一、 响应三级缓存
1. Eureka自动装配spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
2. 自动装配类EurekaServerAutoConfiguration
//实例注册 @Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) { this.eurekaClient.getApplications(); // force initialization return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); } //Eureka节点 @Bean @ConditionalOnMissingBean public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) { return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager); } //Eureka服务器上下文 @Bean public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager); }
3. Eureka服务器上下文初始化
@PostConstruct //bean创建出来,完成属性点的初始化后调用 @Override public void initialize() { logger.info("Initializing ..."); peerEurekaNodes.start();//节点更新任务开启 try { registry.init(peerEurekaNodes);//注册中心初始化 } catch (Exception e) { throw new RuntimeException(e); } logger.info("Initialized"); }
4. 注册中心初始化
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
//初始化响应缓存
initializedResponseCache();
//开启续约阈值更新任务,这个阈值就是计算自我保护的
scheduleRenewalThresholdUpdateTask();
//初始化远程区域注册
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
5. 初始化响应缓存
@Override public synchronized void initializedResponseCache() { if (responseCache == null) { responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this); } }
6. 响应缓存结构
//一级缓存 只读缓存 private final ConcurrentMapreadOnlyCacheMap = new ConcurrentHashMap (); //二级缓存 读写缓存 private final LoadingCache readWriteCacheMap; //三级缓存 本地缓存 private final AbstractInstanceRegistry registry; ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; //是否应该使用只读响应缓存,默认为true,从Eureka服务器配置中可以获取 this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); //自动装配中配置的那个实例注册对象 this.registry = registry; //响应缓存更新时间间隔,默认为30秒,从Eureka服务器配置中可以获取 long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); //读写缓存,使用的是google guava cache,这个缓存具有过期时间 this.readWriteCacheMap = //缓存的初始化容量为1000 CacheBuilder.newBuilder().initialCapacity(1000) //缓存中的每一个key写进去后都会过期时间,默认为180s,从Eureka服务器配置中可以获取 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener () { @Override public void onRemoval(RemovalNotification notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader () { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } //从本地注册表获取 Value value = generatePayload(key); return value; } }); //如果使用了只读缓存,那么会开启一个定时任务,每隔30s更新一次只读缓存 if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e); } }
7. 更新只读缓存
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
//遍历只读缓存
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
//从读写缓存取
Value cacheValue = readWriteCacheMap.get(key);
//从只读缓存取
Value currentCacheValue = readOnlyCacheMap.get(key);
//如果值不一样,说明只读缓存也就是一级缓存失效了
if (cacheValue != currentCacheValue) {
//重新放入只读缓存
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
二、 客户端操作
1. 客户端注册服务信息ApplicationResource
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } //本地注册表中注册服务信息 registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
@Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //将实例信息存入本地注册表,也就是存入一个ConcurrentHashMap中 super.register(info, leaseDuration, isReplication); //向集群中的其他节点同步服务信息 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
2. 客户端拉取服务信息ApplicationsResource
@GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { //从响应缓存中取获取注册服务器信息 response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { //从响应缓存中取获取注册服务器信息 response = Response.ok(responseCache.get(cacheKey)) .build(); } return response; }
三、 自我保护机制
1. 续约阈值任务
private void scheduleRenewalThresholdUpdateTask() { //这是一个定时器,默认每15分钟执行一次 timer.schedule(new TimerTask() { @Override public void run() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); }
2. 自我保护计算
private void updateRenewalThreshold() { try { Applications apps = eurekaClient.getApplications(); int count = 0; //循环遍历本地注册表所有注册的应用 for (Application app : apps.getRegisteredApplications()) { //每一个应用可能会部署多份,每一份都是一个服务 for (InstanceInfo instance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count;//累加服务的个数 } } } synchronized (lock) { //每个服务每30s续约一次,每分钟应该续约两次,因此,总续约数就是 count * 2 //当当前续约次数大于期望续约的最小次数 与 设置的续约百分比的乘积时更新续约的阈值 //因为可能有新的服务注册进来,续约的阈值应该增加;也可能有服务下线,续约的阈值应该减少 //关闭自我保护时,也需要更新续约的阈值 if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin) || (!this.isSelfPreservationModeEnabled())) { //每分钟续约的最小次数更新 this.expectedNumberOfRenewsPerMin = count * 2; //每分钟续约的最小次数阈值更新 this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold()); } } logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold); } catch (Throwable e) { logger.error("Cannot update renewal threshold", e); } }
3. 服务续约
public boolean renew(final String appName, final String id, final boolean isReplication) { if (super.renew(appName, id, isReplication)) { //通过心跳来续约 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } //服务需要与每个Eureka节点续约 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: //心跳命令,就是用来续约的 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); //Eureka节点发送心跳信息 node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
public void heartbeat(final String appName, final String id, final InstanceInfo info, final InstanceStatus overriddenStatus, boolean primeConnection) throws Throwable { if (primeConnection) { // We do not care about the result for priming request. replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); return; } ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) { @Override public EurekaHttpResponseexecute() throws Throwable { //Eureka HTTP客户端发送心跳信息 return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); } @Override public void handleFailure(int statusCode, Object responseEntity) throws Throwable { super.handleFailure(statusCode, responseEntity); if (statusCode == 404) { logger.warn("{}: missing entry.", getTaskName()); if (info != null) { logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}", getTaskName(), info.getId(), info.getStatus()); register(info); } } else if (config.shouldSyncWhenTimestampDiffers()) { InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity; if (peerInstanceInfo != null) { syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo); } } } }; long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime); }
@Override public EurekaHttpResponsesendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { //模拟HTTP请求 WebResource webResource = jerseyClient.resource(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章