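Spring Cloud Request Retry Mechanism: Core Code Analysis
Scenario
Releasing a microservice usually means building a package from the new code, killing the running application, replacing the package, and starting the application again.
Spring Cloud uses Eureka as its service registry, and Eureka tolerates stale service lists. Even after an instance has dropped out of the registry, clients may keep sending requests to its address for a while. As a result, requests can hit an instance that is in the middle of being redeployed, and fail.
We can shorten the service-list refresh intervals to make the registry data fresher, but there will always be a window during which the data is inconsistent.
So the idea was to use a retry mechanism. While machine A is restarting, machine B in the same cluster can still serve requests normally. With retries in place, the failed request in the scenario above can be retried against B and still get a correct response.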
Setup
The following configuration is needed:
ribbon:
  ReadTimeout: 10000
  ConnectTimeout: 10000
  MaxAutoRetries: 0
  MaxAutoRetriesNextServer: 1
  OkToRetryOnAllOperations: false
Add the spring-retry dependency:
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
Taking Zuul as an example, retries also have to be switched on explicitly:
zuul.retryable=true
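For reference, Ribbon reads MaxAutoRetries as the number of retries on the same server (not counting the first call) and MaxAutoRetriesNextServer as the number of other servers to try, so the worst case is (MaxAutoRetries + 1) × (MaxAutoRetriesNextServer + 1) calls. With the values above that is two calls: one to the original server and one to a different server, which is exactly the failover this scenario needs. The small class below is a hypothetical helper, not part of Ribbon; it only does this arithmetic and lists which server each attempt would target.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Ribbon API: it only models how the two
// retry counters combine into a worst-case attempt plan.
class RibbonRetryPlan {

    // Total calls = (retries on same server + 1) * (extra servers + 1).
    static int maxAttempts(int maxAutoRetries, int maxAutoRetriesNextServer) {
        return (maxAutoRetries + 1) * (maxAutoRetriesNextServer + 1);
    }

    // Server index targeted by each attempt: 0 is the first choice,
    // 1 the next server, and so on.
    static List<Integer> attemptTargets(int maxAutoRetries, int maxAutoRetriesNextServer) {
        List<Integer> targets = new ArrayList<>();
        for (int server = 0; server <= maxAutoRetriesNextServer; server++) {
            for (int attempt = 0; attempt <= maxAutoRetries; attempt++) {
                targets.add(server);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Values from the ribbon configuration above.
        System.out.println(maxAttempts(0, 1));    // 2
        System.out.println(attemptTargets(0, 1)); // [0, 1]
    }
}
```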
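The problem
Of course, things rarely go that smoothly. Testing showed that the retry mechanism did take effect, but the retried request was not sent to the other, healthy machine as I had expected. So I had to dig into the source code, and it eventually turned out to be a bug in the source. It has since been fixed, so upgrading to a newer version is enough.
Code analysis
Versions used: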
spring-cloud-netflix-core:1.3.6.RELEASE
spring-retry:1.2.1.RELEASE
Spring Cloud dependency management:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Because retries are enabled, a request to the application is executed by RetryableRibbonLoadBalancingHttpClient.execute:
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    final RequestConfig.Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    builder.setConnectTimeout(config.get(
            CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
    builder.setSocketTimeout(config.get(
            CommonClientConfigKey.ReadTimeout, this.readTimeout));
    builder.setRedirectsEnabled(config.get(
            CommonClientConfigKey.FollowRedirects, this.followRedirects));
    final RequestConfig requestConfig = builder.build();
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
    RetryCallback retryCallback = new RetryCallback() {
        @Override
        public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
            //on retries the policy will choose the server and set it in the context
            //extract the server and update the request being made
            RibbonApacheHttpRequest newRequest = request;
            if(context instanceof LoadBalancedRetryContext) {
                ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
                if(service != null) {
                    //Reconstruct the request URI using the host and port set in the retry context
                    newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
                            newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
                            newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
                            newRequest.getURI().getFragment()));
                }
            }
            newRequest = getSecureRequest(request, configOverride);
            HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
            final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
            if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
                if(CloseableHttpResponse.class.isInstance(httpResponse)) {
                    ((CloseableHttpResponse)httpResponse).close();
                }
                throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
                        httpResponse.getStatusLine().getStatusCode());
            }
            return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
        }
    };
    return this.executeWithRetry(request, retryPolicy, retryCallback);
}
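The step in doWithRetry that makes a retry land on a different machine is the URI rebuild: scheme, host and port come from the ServiceInstance stored in the retry context, while user info, path, query and fragment are kept from the original request. Below is a minimal sketch of just that step, using java.net.URI's seven-argument constructor as the code above does. The class name, method name and IP addresses are hypothetical, and unlike the real code the scheme is passed in as a plain parameter.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper mirroring the URI rebuild inside doWithRetry:
// scheme/host/port come from the server picked for this attempt,
// everything else is copied from the original request URI.
class UriRebuild {

    static URI retarget(URI original, String scheme, String host, int port) {
        try {
            return new URI(scheme, original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot rebuild " + original, e);
        }
    }

    public static void main(String[] args) {
        URI first = URI.create("http://10.0.0.1:8080/orders/42?expand=items");
        // Retry against a second instance of the same service.
        URI retry = retarget(first, "http", "10.0.0.2", 8080);
        System.out.println(retry); // http://10.0.0.2:8080/orders/42?expand=items
    }
}
```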
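We can see that execute first creates a RetryCallback and then calls this.executeWithRetry(request, retryPolicy, retryCallback).
RetryCallback.doWithRetry is clearly the code that actually sends the request, so this.executeWithRetry must eventually call RetryCallback.doWithRetry. executeWithRetry runs the callback through spring-retry's RetryTemplate, and the retry loop itself lives in RetryTemplate.doExecute: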
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
        RecoveryCallback<T> recoveryCallback, RetryState state)
        throws E, ExhaustedRetryException {
    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;
    // Allow the retry policy to initialise itself...
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("RetryContext retrieved: " + context);
    }
    // Make sure the context is available globally for clients who need
    // it...
    RetrySynchronizationManager.register(context);
    Throwable lastException = null;
    boolean exhausted = false;
    try {
        // Give clients a chance to enhance the context...
        boolean running = doOpenInterceptors(retryCallback, context);
        if (!running) {
            throw new TerminatedRetryException(
                    "Retry terminated abnormally by interceptor before first attempt");
        }
        // Get or Start the backoff context...
        BackOffContext backOffContext = null;
        Object resource = context.getAttribute("backOffContext");
        if (resource instanceof BackOffContext) {
            backOffContext = (BackOffContext) resource;
        }
        if (backOffContext == null) {
            backOffContext = backOffPolicy.start(context);
            if (backOffContext != null) {
                context.setAttribute("backOffContext", backOffContext);
            }
        }
        /*
         * We allow the whole loop to be skipped if the policy or context already
         * forbid the first try. This is used in the case of external retry to allow a
         * recovery in handleRetryExhausted without the callback processing (which
         * would throw an exception).
         */
        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Retry: count=" + context.getRetryCount());
                }
                // Reset the last exception, so if we are successful
                // the close interceptors will not think we failed...
                lastException = null;
                return retryCallback.doWithRetry(context);
            }
            catch (Throwable e) {
                lastException = e;
                try {
                    registerThrowable(retryPolicy, state, context, e);
                }
                catch (Exception ex) {
                    throw new TerminatedRetryException("Could not register throwable",
                            ex);
                }
                finally {
                    doOnErrorInterceptors(retryCallback, context, e);
                }
                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                    try {
                        backOffPolicy.backOff(backOffContext);
                    }
                    catch (BackOffInterruptedException ex) {
                        lastException = e;
                        // back off was prevented by another thread - fail the retry
                        if (this.logger.isDebugEnabled()) {
                            this.logger
                                    .debug("Abort retry because interrupted: count="
                                            + context.getRetryCount());
                        }
                        throw ex;
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(
                            "Checking for rethrow: count=" + context.getRetryCount());
                }
                if (shouldRethrow(retryPolicy, context, state)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Rethrow in retry for policy: count="
                                + context.getRetryCount());
                    }
                    throw RetryTemplate.<E>wrapIfNecessary(e);
                }
            }
            /*
             * A stateful attempt that can retry may rethrow the exception before now,
             * but if we get this far in a stateful retry there's a reason for it,
             * like a circuit breaker or a rollback classifier.
             */
            if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
            }
        }
        if (state == null && this.logger.isDebugEnabled()) {
            this.logger.debug(
                    "Retry failed last attempt: count=" + context.getRetryCount());
        }
        exhausted = true;
        return handleRetryExhausted(recoveryCallback, context, state);
    }
    catch (Throwable e) {
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
        close(retryPolicy, context, state, lastException == null || exhausted);
        doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
}
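The retry mechanism is implemented as a while loop. When retryCallback.doWithRetry(context) throws, the exception is caught and the retryPolicy decides whether to try again. Pay special attention to registerThrowable(retryPolicy, state, context, e): it not only records the failure that the retry decision is based on, but, when another attempt is going to be made, also selects a new machine and puts it into the context. That context is passed into the next call to retryCallback.doWithRetry(context), which is how a retry switches to a different machine.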
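To make the control flow concrete, here is a heavily simplified, hypothetical model of that loop in plain Java. It is not spring-retry's API: Context stands in for LoadBalancedRetryContext, Policy for the load-balanced retry policy, and there is no back-off, no interceptors and no stateful retry. What it shows is that the callback never picks a server itself; it only reads whatever the policy last stored in the context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified model of the retry loop in
// RetryTemplate.doExecute plus a load-balanced retry policy.
// None of these types are the real Spring classes.
class RetryLoopSketch {

    interface Callback<T> {
        T doWithRetry(Context context) throws Exception;
    }

    // Stand-in for LoadBalancedRetryContext: carries the server to call.
    static class Context {
        int retryCount;
        String server;
    }

    // Stand-in for the retry policy: decides whether another attempt is
    // allowed and, after a failure, stores the next server in the context.
    static class Policy {
        private final List<String> servers;
        private final int maxAttempts;

        Policy(List<String> servers, int maxAttempts) {
            this.servers = servers;
            this.maxAttempts = maxAttempts;
        }

        Context open() {
            Context context = new Context();
            context.server = servers.get(0);
            return context;
        }

        boolean canRetry(Context context) {
            return context.retryCount < maxAttempts;
        }

        void registerThrowable(Context context, Throwable t) {
            context.retryCount++;
            context.server = servers.get(context.retryCount % servers.size());
        }
    }

    static <T> T execute(Policy policy, Callback<T> callback) {
        Context context = policy.open();
        Exception last = null;
        while (policy.canRetry(context)) {
            try {
                return callback.doWithRetry(context);
            } catch (Exception e) {
                last = e;
                // Records the failure and switches the context to another server.
                policy.registerThrowable(context, e);
            }
        }
        throw new IllegalStateException("retries exhausted", last);
    }

    public static void main(String[] args) {
        List<String> attempted = new ArrayList<>();
        Policy policy = new Policy(Arrays.asList("10.0.0.1", "10.0.0.2"), 2);
        String result = execute(policy, ctx -> {
            attempted.add(ctx.server);
            if (ctx.server.equals("10.0.0.1")) {
                // Simulates the machine that is being redeployed.
                throw new java.io.IOException("connection refused");
            }
            return "ok from " + ctx.server;
        });
        System.out.println(attempted); // [10.0.0.1, 10.0.0.2]
        System.out.println(result);    // ok from 10.0.0.2
    }
}
```

In the demo the first address fails, registerThrowable moves the context to the second address, and the second attempt succeeds. If every address fails, the loop stops once canRetry returns false.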
So why did my setup not switch machines? Debugging showed that the machine chosen in registerThrowable(retryPolicy, state, context, e) was correct: it was the new, healthy one. Yet when retryCallback.doWithRetry(context) ran, the request still went to the machine that was down.
So let's take a closer look at the code of retryCallback.doWithRetry(context). It contains this line:
newRequest = getSecureRequest(request, configOverride);
protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) {
    if (isSecure(configOverride)) {
        final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
                .scheme("https").build(true).toUri();
        return request.withNewUri(secureUri);
    }
    return request;
}
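By this point newRequest has already been rebuilt from the server in the retry context, but request is the original request captured by the callback, which still points at the first machine. getSecureRequest(request, configOverride) returns either request itself or a copy of it switched to https, so whenever this line runs, newRequest is overwritten with data derived from request and the newly selected machine is thrown away. Only at this point did it become clear that this was a bug in the source code: the line should pass newRequest rather than request.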
Issue: https://github.com/spring-cloud/spring-cloud-netflix/issues/2667
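The effect of that single line can be reproduced without Spring. In the sketch below, Request is a hypothetical stand-in for RibbonApacheHttpRequest that only carries a URI, getSecureRequest has the same shape as the method above, and the two attempt methods differ only in whether they pass request or newRequest. Passing newRequest is the natural correction, though the exact upstream patch may differ in detail.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical reproduction of the overwrite bug; Request stands in for
// RibbonApacheHttpRequest and only keeps the target URI.
class SecureRequestBug {

    static final class Request {
        final URI uri;

        Request(URI uri) {
            this.uri = uri;
        }

        Request withNewUri(URI newUri) {
            return new Request(newUri);
        }
    }

    // Copies user info/path/query/fragment and replaces scheme, host and port.
    static URI rebuild(URI u, String scheme, String host, int port) {
        try {
            return new URI(scheme, u.getUserInfo(), host, port,
                    u.getPath(), u.getQuery(), u.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Same shape as getSecureRequest: switch to https only when secure,
    // otherwise hand back the very request it was given.
    static Request getSecureRequest(Request request, boolean secure) {
        if (secure) {
            return request.withNewUri(rebuild(request.uri, "https",
                    request.uri.getHost(), request.uri.getPort()));
        }
        return request;
    }

    // Order used in the analyzed version: the retargeted request is discarded.
    static URI buggyAttempt(Request request, String host, int port, boolean secure) {
        Request newRequest = request.withNewUri(
                rebuild(request.uri, request.uri.getScheme(), host, port));
        newRequest = getSecureRequest(request, secure); // original request again
        return newRequest.uri;
    }

    // Corrected order: apply the https switch on top of newRequest.
    static URI fixedAttempt(Request request, String host, int port, boolean secure) {
        Request newRequest = request.withNewUri(
                rebuild(request.uri, request.uri.getScheme(), host, port));
        newRequest = getSecureRequest(newRequest, secure);
        return newRequest.uri;
    }

    public static void main(String[] args) {
        Request original = new Request(URI.create("http://10.0.0.1:8080/orders/42"));
        // The retry policy picked 10.0.0.2, but only the fixed order uses it.
        System.out.println(buggyAttempt(original, "10.0.0.2", 8080, false)); // http://10.0.0.1:8080/orders/42
        System.out.println(fixedAttempt(original, "10.0.0.2", 8080, false)); // http://10.0.0.2:8080/orders/42
    }
}
```

Note that the buggy variant ignores the new server whether or not the client is secure, which matches the observation that every retry went back to the machine that was down.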
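Summary
This was a fairly ordinary troubleshooting process. When I found that the configuration did not behave as expected, I first checked what each setting meant and tried several times without success. I then switched to breakpoint debugging. Because the scenario requires one machine to be healthy while the other goes offline, I reproduced it hundreds of times before finally tracking the problem down to this one line. Even excellent open-source projects inevitably have bugs, so don't trust them blindly. Being able to read source code is also an important problem-solving skill; in my case, finding the entry point and locating the relevant code took a great deal of time.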