剑客 (Jianke)
Covering technology and the internet

Spring Cloud Eureka Service Cancel (Deregistration) Source Code Analysis

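Abstract: This article analyzes the source code behind Eureka's Cancel (service deregistration). When a Service Provider shuts down, it needs to notify Eureka Server promptly so that it is removed from the registry. Otherwise other clients may keep calling an instance that is already offline, and those calls fail.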

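Cancel (Service Deregistration)

Overview

When a Service Provider shuts down, it must promptly tell Eureka Server to remove it, so that clients do not call a service that is already offline.

Service provider side source analysis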

  1. In eureka-client-1.4.1, shutdown() is at line 867 of com.netflix.discovery.DiscoveryClient:

    /**
     * Shuts down Eureka Client. Also sends a deregistration request to the
     * eureka server.
     */
    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks();

            // If APPINFO was registered
            if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                // Call the cancel (deregistration) endpoint
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            logger.info("Completed shut down of DiscoveryClient");
        }
    }
    

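Tips
shutdown() is the entry point for service cancel. Because it is annotated with @PreDestroy, it runs when the container destroys the client bean, and it can also be called directly.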
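
Since shutdown() can be reached both through @PreDestroy and through a direct call, the compareAndSet guard at the top of the method is what keeps a second invocation from deregistering twice. A minimal sketch of that guard in isolation follows; ShutdownGuardSketch and its counter are hypothetical stand-ins for illustration, not part of DiscoveryClient.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client; this is NOT the real DiscoveryClient.
class ShutdownGuardSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    // Counts how many times the simulated deregistration actually ran.
    final AtomicInteger unregisterCalls = new AtomicInteger();

    // Returns true only for the call that actually performed the shutdown.
    synchronized boolean shutdown() {
        // compareAndSet flips false -> true exactly once; later calls see true and skip.
        if (isShutdown.compareAndSet(false, true)) {
            unregister();
            return true;
        }
        return false;
    }

    // Simulated deregistration; a real client would send a request to the server here.
    private void unregister() {
        unregisterCalls.incrementAndGet();
    }
}
```

Calling shutdown() twice on the same ShutdownGuardSketch performs the simulated deregistration once; the second call returns false without doing any work.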

  1. In eureka-client-1.4.1, unregister() is at line 897 of com.netflix.discovery.DiscoveryClient:

    /**
     * unregister w/ the eureka service.
     */
    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                // Send the cancel (deregistration) request
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + appPathIdentifier + " - deregister status: " + httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);
            }
        }
    }
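
The cancel(appName, id) call above ends up as an HTTP DELETE against the Eureka Server. The sketch below only shows how such a request URL could be assembled. It assumes the path layout apps/{appName}/{instanceId} relative to the service URL, which is how Eureka's REST interface is generally described. CancelRequestSketch, the service URL, the application name, and the instance id are all hypothetical values chosen for illustration.

```java
import java.net.URI;

// Hypothetical helper for illustration; not part of the Eureka client API.
class CancelRequestSketch {
    // Assumed path layout, relative to the Eureka service URL: apps/{appName}/{instanceId}
    static String cancelPath(String appName, String instanceId) {
        return "apps/" + appName + "/" + instanceId;
    }

    // Joins the service URL and the cancel path, tolerating a missing trailing slash.
    static URI cancelUri(String serviceUrl, String appName, String instanceId) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return URI.create(base + cancelPath(appName, instanceId));
    }
}
```

For example, CancelRequestSketch.cancelUri("http://localhost:8761/eureka", "MY-APP", "host1:my-app:8080") yields http://localhost:8761/eureka/apps/MY-APP/host1:my-app:8080, and the client would issue a DELETE to that address.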
    

Eureka Server cancel implementation details

  1. The cancelLease() method at line 280 of com.netflix.eureka.resources.InstanceResource:

    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        // Delegate to the registry's cancel
        boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

        if (isSuccess) {
            logger.debug("Found (Cancel): " + app.getName() + " - " + id);
            return Response.ok().build();
        } else {
            logger.info("Not Found (Cancel): " + app.getName() + " - " + id);
            return Response.status(Status.NOT_FOUND).build();
        }
    }
    
  2. The cancel() method at line 95 of org.springframework.cloud.netflix.eureka.server.InstanceRegistry:

    @Override
    public boolean cancel(String appName, String serverId, boolean isReplication) {
        handleCancelation(appName, serverId, isReplication);
        // Call cancel in the parent class
        return super.cancel(appName, serverId, isReplication);
    }
    
  3. The cancel() method at line 376 of com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl:

    @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
            // After a successful cancel, replicate the change to the other Eureka Server nodes
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            return true;
        }
        return false;
    }
    
  4. replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); calls the class's own replicateToPeers() method, at line 618 of com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl. The flow is essentially the same as for register: the server first updates the service state in its own registry, then replicates the change to the other Eureka Servers.

    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // Replicate the action (here, the cancel) to the other Eureka Server nodes
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // Replicate the operation that corresponds to the action
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
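
The server-side steps above can be summarized in a small self-contained model: remove the lease, map the result to 200 or 404, lower the renewal threshold by two renewals per minute, and forward the cancel to peers unless the request is itself a replication. This is a simplified sketch, not the real registry classes. CancelFlowSketch, its fields, the peer URLs, and the 0.85 renewal percent threshold used in the example are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of one Eureka Server node; NOT the real registry classes.
class CancelFlowSketch {
    final String selfUrl;
    final List<String> peerUrls;
    final double renewalPercentThreshold;
    final Set<String> leases = new HashSet<>();          // ids of registered instances
    final List<String> replicatedTo = new ArrayList<>(); // peers a cancel was forwarded to
    int expectedNumberOfRenewsPerMin;
    int numberOfRenewsPerMinThreshold;

    CancelFlowSketch(String selfUrl, List<String> peerUrls,
                     double renewalPercentThreshold, List<String> instanceIds) {
        this.selfUrl = selfUrl;
        this.peerUrls = peerUrls;
        this.renewalPercentThreshold = renewalPercentThreshold;
        this.leases.addAll(instanceIds);
        // Assumption: each instance renews every 30 seconds, i.e. 2 renewals per minute.
        this.expectedNumberOfRenewsPerMin = 2 * this.leases.size();
        recomputeThreshold();
    }

    // Returns false when the lease is unknown; the REST layer maps that to 404.
    boolean cancel(String id, boolean isReplication) {
        if (!leases.remove(id)) {
            return false;
        }
        replicateToPeers(id, isReplication);
        if (expectedNumberOfRenewsPerMin > 0) {
            expectedNumberOfRenewsPerMin -= 2; // one instance fewer: 2 renewals per minute fewer
            recomputeThreshold();
        }
        return true;
    }

    // Forwards the cancel to every peer except this node; replicated requests are never re-forwarded.
    private void replicateToPeers(String id, boolean isReplication) {
        if (peerUrls.isEmpty() || isReplication) {
            return;
        }
        for (String peer : peerUrls) {
            if (peer.equals(selfUrl)) {
                continue; // do not replicate to yourself
            }
            replicatedTo.add(peer);
        }
    }

    private void recomputeThreshold() {
        numberOfRenewsPerMinThreshold = (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
    }

    // Mirrors the Found / Not Found branches of cancelLease().
    static int httpStatus(boolean cancelled) {
        return cancelled ? 200 : 404;
    }
}
```

With 10 registered instances and a threshold of 0.85, the model starts at 20 expected renewals per minute and a threshold of 17. After one cancel these drop to 18 and 15. The isReplication check is what stops two peers from forwarding the same cancel back and forth indefinitely.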
    

This concludes the source code analysis of Eureka service cancel. Interested readers can read through the code themselves.
