上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Eureka

guduadmin 21天前

一、 响应三级缓存

1. Eureka自动装配spring.factories

# Lists EurekaServerAutoConfiguration under the EnableAutoConfiguration key.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

2. 自动装配类EurekaServerAutoConfiguration

//实例注册
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                                serverCodecs, this.eurekaClient,
                                                           this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
//Eureka节点
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
                                       ServerCodecs serverCodecs) {
    return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
       this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
​
//Eureka服务器上下文
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
    PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
       registry, peerEurekaNodes, this.applicationInfoManager);
}

3. Eureka服务器上下文初始化

@PostConstruct //bean创建出来,完成属性点的初始化后调用
@Override
public void initialize() {
    logger.info("Initializing ...");
    peerEurekaNodes.start();//节点更新任务开启
    try {
        registry.init(peerEurekaNodes);//注册中心初始化
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

4. 注册中心初始化

@Override

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {

    this.numberOfReplicationsLastMin.start();

    this.peerEurekaNodes = peerEurekaNodes;

    //初始化响应缓存

    initializedResponseCache();

    //开启续约阈值更新任务,这个阈值就是计算自我保护的

    scheduleRenewalThresholdUpdateTask();

    //初始化远程区域注册

    initRemoteRegionRegistry();

    try {

        Monitors.registerObject(this);

    } catch (Throwable e) {

        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);

    }

}

5. 初始化响应缓存

@Override
public synchronized void initializedResponseCache() {
    if (responseCache == null) {
        responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
    }
}

6. 响应缓存结构

// Level 1: read-only cache, refreshed from the read-write cache by the timer task.
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// Level 2: read-write cache, a loading cache whose entries expire after write.
private final LoadingCache<Key, Value> readWriteCacheMap;
// Level 3: the registry itself, the source the loading cache is populated from.
private final AbstractInstanceRegistry registry;

/**
 * Builds the response cache.
 *
 * <p>The read-write cache is created with an initial capacity of 1000 and a
 * write-expiry taken from the server config. When the read-only cache is
 * enabled, a timer task is scheduled to refresh it periodically.
 *
 * @param serverConfig server configuration (expiry, update interval, read-only flag)
 * @param serverCodecs codecs stored on this cache
 * @param registry     registry stored on this cache and passed to the monitor
 */
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs,
                  AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    // Whether the read-only cache is used (the article states the default is true; confirm in config).
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    // Read-only cache refresh interval (the article states the default is 30s; confirm in config).
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();

    // Entries expire a fixed time after being written
    // (the article states the default is 180s; confirm in config).
    this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(1000)
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
            .removalListener(new RemovalListener<Key, Value>() {
                @Override
                public void onRemoval(RemovalNotification<Key, Value> notification) {
                    // Drop the region-specific mapping recorded when this key was loaded.
                    Key removedKey = notification.getKey();
                    if (removedKey.hasRegions()) {
                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                    }
                }
            })
            .build(new CacheLoader<Key, Value>() {
                @Override
                public Value load(Key key) throws Exception {
                    // Remember which region-specific keys belong to the region-less key.
                    if (key.hasRegions()) {
                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                        regionSpecificKeys.put(cloneWithNoRegions, key);
                    }
                    // Cache miss: build the payload for this key.
                    return generatePayload(key);
                }
            });

    if (shouldUseReadOnlyResponseCache) {
        // First run is aligned to the next whole multiple of the interval,
        // then the task repeats every interval.
        timer.schedule(getCacheUpdateTask(),
                       new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                       responseCacheUpdateIntervalMs);
    }

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}

7. 更新只读缓存

private TimerTask getCacheUpdateTask() {

    return new TimerTask() {

        @Override

        public void run() {

            logger.debug("Updating the client cache from response cache");

            //遍历只读缓存

            for (Key key : readOnlyCacheMap.keySet()) {

                if (logger.isDebugEnabled()) {

                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",

                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType());

                }

                try {

                    CurrentRequestVersion.set(key.getVersion());

                    //从读写缓存取

                    Value cacheValue = readWriteCacheMap.get(key);

                    //从只读缓存取

                    Value currentCacheValue = readOnlyCacheMap.get(key);

                    //如果值不一样,说明只读缓存也就是一级缓存失效了

                    if (cacheValue != currentCacheValue) {

                        //重新放入只读缓存

                        readOnlyCacheMap.put(key, cacheValue);

                    }

                } catch (Throwable th) {

                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);

                }

            }

        }

    };

}

二、 客户端操作

1. 客户端注册服务信息ApplicationResource

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }
​
    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }
    //本地注册表中注册服务信息
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    //将实例信息存入本地注册表,也就是存入一个ConcurrentHashMap中
    super.register(info, leaseDuration, isReplication);
    //向集群中的其他节点同步服务信息
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

2. 客户端拉取服务信息ApplicationsResource

@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {
​
    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    }
​
    // Check if the server allows the access to the registry. The server can
    // restrict access if it is not
    // ready to serve traffic depending on various reasons.
    if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }
    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }
​
    Key cacheKey = new Key(Key.EntityType.Application,
                           ResponseCacheImpl.ALL_APPS,
                           keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
                          );
​
    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        //从响应缓存中取获取注册服务器信息
        response = Response.ok(responseCache.getGZIP(cacheKey))
            .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
            .header(HEADER_CONTENT_TYPE, returnMediaType)
            .build();
    } else {
        //从响应缓存中取获取注册服务器信息
        response = Response.ok(responseCache.get(cacheKey))
            .build();
    }
    return response;
}

三、 自我保护机制

1. 续约阈值任务

private void scheduleRenewalThresholdUpdateTask() {
    //这是一个定时器,默认每15分钟执行一次
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            updateRenewalThreshold();
        }
    }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                   serverConfig.getRenewalThresholdUpdateIntervalMs());
}

2. 自我保护计算

private void updateRenewalThreshold() {
    try {
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        //循环遍历本地注册表所有注册的应用
        for (Application app : apps.getRegisteredApplications()) {
            //每一个应用可能会部署多份,每一份都是一个服务
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;//累加服务的个数
                }
            }
        }
        synchronized (lock) {
            //每个服务每30s续约一次,每分钟应该续约两次,因此,总续约数就是 count * 2
            
            //当当前续约次数大于期望续约的最小次数 与 设置的续约百分比的乘积时更新续约的阈值
            //因为可能有新的服务注册进来,续约的阈值应该增加;也可能有服务下线,续约的阈值应该减少
          
            //关闭自我保护时,也需要更新续约的阈值
            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
                || (!this.isSelfPreservationModeEnabled())) {
                //每分钟续约的最小次数更新
                this.expectedNumberOfRenewsPerMin = count * 2;
                //每分钟续约的最小次数阈值更新
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}

3. 服务续约

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        //通过心跳来续约
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
​
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            //服务需要与每个Eureka节点续约
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
private void replicateInstanceActionsToPeers(Action action, String appName,String id, 
                                             InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat: //心跳命令,就是用来续约的
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                //Eureka节点发送心跳信息
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
public void heartbeat(final String appName, final String id,
                      final InstanceInfo info, final InstanceStatus overriddenStatus,
                      boolean primeConnection) throws Throwable {
    if (primeConnection) {
        // We do not care about the result for priming request.
        replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        return;
    }
    ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
        @Override
        public EurekaHttpResponse execute() throws Throwable {
            //Eureka HTTP客户端发送心跳信息
            return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }
​
        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            super.handleFailure(statusCode, responseEntity);
            if (statusCode == 404) {
                logger.warn("{}: missing entry.", getTaskName());
                if (info != null) {
                    logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                                getTaskName(), info.getId(), info.getStatus());
                    register(info);
                }
            } else if (config.shouldSyncWhenTimestampDiffers()) {
                InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                if (peerInstanceInfo != null) {
                    syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
@Override
public EurekaHttpResponse sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        //模拟HTTP请求
        WebResource webResource = jerseyClient.resource(serviceUrl)
            .path(urlPath)
            .queryParam("status", info.getStatus().toString())
            .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

网友评论

搜索
最新文章
热门文章
热门标签