深入理解SpringCloud之Eureka注冊過程分析
eureka是一種去中心化的服務(wù)治理應(yīng)用,其顯著特點是既可以作為服務(wù)端又可以作為服務(wù)向自己配置的地址進行注冊。那么這篇文章就來探討一下eureka的注冊流程。
一、Eureka的服務(wù)端
eureka的服務(wù)端核心類是EurekaBootstrap,該類實現(xiàn)了一個ServletContextListener的監(jiān)聽器。因此我們可以斷定eureka是基于servlet容器實現(xiàn)的。關(guān)鍵代碼如下:
public class EurekaBootStrap implements ServletContextListener {
//...省略相關(guān)代碼
/**
* Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.
*
* @see
* javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)
*/
@Override
public void contextInitialized(ServletContextEvent event) {
try {
initEurekaEnvironment();
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
//省略相關(guān)代碼.....
}
我們可以看到在ServletContext初始化完成時,會初始化Eureka環(huán)境,然后初始化EurekaServerContext,那么我們在看一看initEurekaServerContext方法:
/**
* init hook for server context. Override for custom logic.
*/
protected void initEurekaServerContext() throws Exception {
// .....
ApplicationInfoManager applicationInfoManager = null;
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
//....省略部分代碼
}
在這個方法里會創(chuàng)建許多與eureka服務(wù)相關(guān)的對象,在這里我列舉了兩個核心對象分別是eurekaClient與PeerAwareInstanceRegistry,關(guān)于客戶端部分我們等會再說,我們現(xiàn)在來看看PeerAwareInstanceRegistry到底是做什么用的,這里我寫貼出關(guān)于這個類的類圖:

根據(jù)類圖我們可以清晰的發(fā)現(xiàn)PeerAwareInstanceRegistry的最頂層接口為LeaseManager與LookupService,其中LookupService定義了最基本的發(fā)現(xiàn)示例的行為而LeaseManager定義了處理客戶端注冊,續(xù)約,注銷等操作。那么在這篇文章我們還是重點關(guān)注一下LeaseManager的相關(guān)接口的實現(xiàn)?;剡^頭來我們在看PeerAwareInstanceRegistry,其實這個類用于多個節(jié)點下復(fù)制相關(guān)信息,比如說一個節(jié)點注冊續(xù)約與下線那么通過這個類將會相關(guān)復(fù)制(通知)到各個節(jié)點。我們來看看它是怎么處理客戶端注冊的:
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
* the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
* true if this is a replication event from other replica nodes,
* false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
我們可以看到它調(diào)用了父類的register方法后又通過replicateToPeers復(fù)制對應(yīng)的行為到其他節(jié)點,具體如何復(fù)制的先不在這里討論,我們重點來看看注冊方法,我們在父類里找到register()方法:
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
//。。。省略部分代碼
}
通過源代碼,我們來簡要梳理一下流程:
1)首先根據(jù)appName獲取一些列的服務(wù)實例對象,如果為Null,則新創(chuàng)建一個map并把當(dāng)前的注冊應(yīng)用程序信息添加到此Map當(dāng)中,這里有一個Lease對象,這個類描述了泛型T的時間屬性,比如說注冊時間,服務(wù)啟動時間,最后更新時間等,大家可以關(guān)注一下它的實現(xiàn):
/*
* Copyright 2012 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.eureka.lease;
import com.netflix.eureka.registry.AbstractInstanceRegistry;
/**
* Describes a time-based availability of a {@link T}. Purpose is to avoid
* accumulation of instances in {@link AbstractInstanceRegistry} as result of ungraceful
* shutdowns that is not uncommon in AWS environments.
*
* If a lease elapses without renewals, it will eventually expire consequently
* marking the associated {@link T} for immediate eviction - this is similar to
* an explicit cancellation except that there is no communication between the
* {@link T} and {@link LeaseManager}.
*
* @author Karthik Ranganathan, Greg Kim
*/
public class Lease<T> {
enum Action {
Register, Cancel, Renew
};
public static final int DEFAULT_DURATION_IN_SECS = 90;
private T holder;
private long evictionTimestamp;
private long registrationTimestamp;
private long serviceUpTimestamp;
// Make it volatile so that the expiration task would see this quicker
private volatile long lastUpdateTimestamp;
private long duration;
public Lease(T r, int durationInSecs) {
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
duration = (durationInSecs * 1000);
}
/**
* Renew the lease, use renewal duration if it was specified by the
* associated {@link T} during registration, otherwise default duration is
* {@link #DEFAULT_DURATION_IN_SECS}.
*/
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
/**
* Cancels the lease by updating the eviction time.
*/
public void cancel() {
if (evictionTimestamp <= 0) {
evictionTimestamp = System.currentTimeMillis();
}
}
/**
* Mark the service as up. This will only take affect the first time called,
* subsequent calls will be ignored.
*/
public void serviceUp() {
if (serviceUpTimestamp == 0) {
serviceUpTimestamp = System.currentTimeMillis();
}
}
/**
* Set the leases service UP timestamp.
*/
public void setServiceUpTimestamp(long serviceUpTimestamp) {
this.serviceUpTimestamp = serviceUpTimestamp;
}
/**
* Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
*/
public boolean isExpired() {
return isExpired(0l);
}
/**
* Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
*
* Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
* what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
* instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
* not be fixed.
*
* @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
*/
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
/**
* Gets the milliseconds since epoch when the lease was registered.
*
* @return the milliseconds since epoch when the lease was registered.
*/
public long getRegistrationTimestamp() {
return registrationTimestamp;
}
/**
* Gets the milliseconds since epoch when the lease was last renewed.
* Note that the value returned here is actually not the last lease renewal time but the renewal + duration.
*
* @return the milliseconds since epoch when the lease was last renewed.
*/
public long getLastRenewalTimestamp() {
return lastUpdateTimestamp;
}
/**
* Gets the milliseconds since epoch when the lease was evicted.
*
* @return the milliseconds since epoch when the lease was evicted.
*/
public long getEvictionTimestamp() {
return evictionTimestamp;
}
/**
* Gets the milliseconds since epoch when the service for the lease was marked as up.
*
* @return the milliseconds since epoch when the service for the lease was marked as up.
*/
public long getServiceUpTimestamp() {
return serviceUpTimestamp;
}
/**
* Returns the holder of the lease.
*/
public T getHolder() {
return holder;
}
}
2)根據(jù)當(dāng)前注冊的ID,如果能在map中取到則做以下操作:
2.1)根據(jù)當(dāng)前存在節(jié)點的觸碰時間和注冊節(jié)點的觸碰時間比較,如果前者的時間晚于后者的時間,那么當(dāng)前注冊的實例就以已存在的實例為準
2.2)否則更新其每分鐘期望的續(xù)約數(shù)量及其閾值
3)將當(dāng)前的注冊節(jié)點存到map當(dāng)中,至此我們的注冊過程基本告一段落了
二、eureka客戶端
在服務(wù)端servletContext初始化完畢時,會創(chuàng)建DiscoveryClient。熟悉eureka的朋友,一定熟悉這兩個屬性:fetchRegistry與registerWithEureka。在springcloud中集成eureka獨立模式運行時,如果這兩個值不為false,那么啟動會報錯,為什么會報錯呢?其實答案就在DiscoveryClient的構(gòu)造函數(shù)中:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//....省略部分代碼
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
//....省略部分代碼
initScheduledTasks();
//....
}
根據(jù)源代碼,我們可以得出以下結(jié)論:
1)如果shouldRegisterWithEureka與shouldFetchRegistry都為false,那么直接return。
2)創(chuàng)建發(fā)送心跳與刷新緩存的線程池
3)初始化創(chuàng)建的定時任務(wù)
那么我們在看看initScheduledTasks()方法里有如下代碼:
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
此處是觸發(fā)一個定時執(zhí)行的線程,以秒為單位,根據(jù)renewalIntervalInSecs值定時執(zhí)行發(fā)送心跳,HeartbeatThread線程執(zhí)行如下:
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
我們可以看到run方法里很簡單執(zhí)行renew方法,如果成功記錄一下時間。renew方法:
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
在這里發(fā)送心跳如果返回的是404,那么會執(zhí)行注冊操作,注意我們根據(jù)返回值httpResponse可以斷定這一切的操作都是基于http請求的,到底是不是呢?我們繼續(xù)看一下register方法:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
在這里又調(diào)用了eurekaTransport里registrationClient的方法:
private static final class EurekaTransport {
private ClosableResolver bootstrapResolver;
private TransportClientFactory transportClientFactory;
private EurekaHttpClient registrationClient;
private EurekaHttpClientFactory registrationClientFactory;
private EurekaHttpClient queryClient;
private EurekaHttpClientFactory queryClientFactory;
void shutdown() {
if (registrationClientFactory != null) {
registrationClientFactory.shutdown();
}
if (queryClientFactory != null) {
queryClientFactory.shutdown();
}
if (registrationClient != null) {
registrationClient.shutdown();
}
if (queryClient != null) {
queryClient.shutdown();
}
if (transportClientFactory != null) {
transportClientFactory.shutdown();
}
if (bootstrapResolver != null) {
bootstrapResolver.shutdown();
}
}
}
在這里我們可以看到,eureka的客戶端是使用http請求進行注冊服務(wù)的,也就是說當(dāng)我們創(chuàng)建DiscoveryClient就會向服務(wù)端進行實例的注冊。
三、服務(wù)端提供的rest服務(wù)
服務(wù)端提供用于處理客戶端注冊請求的代碼我們已經(jīng)看過了,既然客戶端是通過走HTTP協(xié)議進行注冊的,那服務(wù)端總要有處理這個http請求的地址吧,其實eureka服務(wù)端是采用jax-rs標準提供rest方式進行暴露服務(wù)的,我們可以看一下這個類ApplicationResoure的addInstance方法:
/**
* Registers information about a particular instance for an
* {@link com.netflix.discovery.shared.Application}.
*
* @param info
* {@link InstanceInfo} information of the instance.
* @param isReplication
* a header parameter containing information whether this is
* replicated from other nodes.
*/
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot項目中使用Sharding-JDBC實現(xiàn)讀寫分離的詳細步驟
Sharding-JDBC是一個分布式數(shù)據(jù)庫中間件,它不僅支持數(shù)據(jù)分片,還可以輕松實現(xiàn)數(shù)據(jù)庫的讀寫分離,本文介紹如何在Spring Boot項目中集成Sharding-JDBC并實現(xiàn)讀寫分離的詳細步驟,需要的朋友可以參考下2024-08-08
java實現(xiàn)阿拉伯?dāng)?shù)字轉(zhuǎn)漢字數(shù)字
這篇文章主要為大家詳細介紹了java實現(xiàn)阿拉伯?dāng)?shù)字轉(zhuǎn)換為漢字數(shù)字源代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-04-04
SpringAop @Aspect織入不生效,不執(zhí)行前置增強織入@Before方式
這篇文章主要介紹了SpringAop @Aspect織入不生效,不執(zhí)行前置增強織入@Before方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
詳解JUC并發(fā)編程中的進程與線程學(xué)習(xí)
這篇文章主要為大家詳細介紹了JUC并發(fā)編程中的進程與線程學(xué)習(xí),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03

