Spring?Cloud?Gateway實(shí)現(xiàn)分布式限流和熔斷降級(jí)的示例代碼
一、限流
思考:為啥需要限流?
在一個(gè)流量特別大的業(yè)務(wù)場(chǎng)景中,如果不進(jìn)行限流,會(huì)造成系統(tǒng)宕機(jī),當(dāng)大批量的請(qǐng)求到達(dá)后端服務(wù)時(shí),會(huì)造成資源耗盡【CPU、內(nèi)存、線程、網(wǎng)絡(luò)帶寬、數(shù)據(jù)庫(kù)連接等是有限的】,進(jìn)而拖垮系統(tǒng)。
1.常見(jiàn)限流算法
- 漏桶算法
- 令牌桶算法
1.1漏桶算法(不推薦)
1.1.1.原理
將請(qǐng)求緩存到一個(gè)隊(duì)列中,然后以固定的速度處理,從而達(dá)到限流的目的
1.1.2.實(shí)現(xiàn)
將請(qǐng)求裝到一個(gè)桶中,桶的容量為固定的一個(gè)值,當(dāng)桶裝滿之后,就會(huì)將請(qǐng)求丟棄掉,桶底部有一個(gè)洞,以固定的速率流出。
1.1.3.舉例
桶的容量為1W,有10W并發(fā)請(qǐng)求,最多只能將1W請(qǐng)求放入桶中,其余請(qǐng)求全部丟棄,以固定的速度處理請(qǐng)求
1.1.4.缺點(diǎn)
處理突發(fā)流量效率低(處理請(qǐng)求的速度不變,效率很低)
1.2.令牌桶算法(推薦)
1.2.1.原理
將請(qǐng)求放在一個(gè)緩沖隊(duì)列中,拿到令牌后才能進(jìn)行處理
1.2.2.實(shí)現(xiàn)
裝令牌的桶大小固定,當(dāng)令牌裝滿后,則不能將令牌放入其中;每次請(qǐng)求都會(huì)到桶中拿取一個(gè)令牌才能放行,沒(méi)有令牌時(shí)即丟棄請(qǐng)求/繼續(xù)放入緩存隊(duì)列中等待
1.2.3.舉例
桶的容量為10w個(gè),生產(chǎn)1w個(gè)/s,有10W的并發(fā)請(qǐng)求,以每秒10W個(gè)/s速度處理,隨著桶中的令牌很快用完,速度又慢慢降下來(lái)啦,而生產(chǎn)令牌的速度趨于一致1w個(gè)/s
1.2.4.缺點(diǎn)
處理突發(fā)流量提供了系統(tǒng)性能,但是對(duì)系統(tǒng)造成了一定的壓力,桶的大小不合理,甚至?xí)嚎逑到y(tǒng)(處理1億的并發(fā)請(qǐng)求,將桶的大小設(shè)置為1,這個(gè)系統(tǒng)一下就涼涼啦)
2.網(wǎng)關(guān)限流(Spring Cloud Gateway + Redis實(shí)戰(zhàn))
2.1.pom.xml配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>2.2.yaml配置
spring:
application:
name: laokou-gateway
cloud:
gateway:
routes:
- id: LAOKOU-SSO-DEMO
uri: lb://laokou-sso-demo
predicates:
- Path=/sso/**
filters:
- StripPrefix=1
- name: RequestRateLimiter #請(qǐng)求數(shù)限流,名字不能亂打
args:
key-resolver: "#{@ipKeyResolver}"
redis-rate-limiter.replenishRate: 1 #生成令牌速率-設(shè)為1方便測(cè)試
redis-rate-limiter.burstCapacity: 1 #令牌桶容量-設(shè)置1方便測(cè)試
redis:
database: 0
cluster:
nodes: x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005,x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005
password: laokou #密碼
timeout: 6000ms #連接超時(shí)時(shí)長(zhǎng)(毫秒)
jedis:
pool:
max-active: -1 #連接池最大連接數(shù)(使用負(fù)值表示無(wú)極限)
max-wait: -1ms #連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒(méi)有限制)
max-idle: 10 #連接池最大空閑連接
min-idle: 5 #連接池最小空間連接2.3.創(chuàng)建bean
@Configuration
public class RequestRateLimiterConfig {
@Bean(value = "ipKeyResolver")
public KeyResolver ipKeyResolver(RemoteAddressResolver remoteAddressResolver) {
return exchange -> Mono.just(remoteAddressResolver.resolve(exchange).getAddress().getHostAddress());
}
@Bean
public RemoteAddressResolver remoteAddressResolver() {
// 遠(yuǎn)程地址解析器
return XForwardedRemoteAddressResolver.trustAll();
}
}
3.測(cè)試限流(編寫(xiě)java并發(fā)測(cè)試)
@Slf4j
public class HttpUtil {
public static void apiConcurrent(String url,Map<String,String> params) {
Integer count = 200;
//創(chuàng)建線程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
//同步工具
CountDownLatch latch = new CountDownLatch(count);
Map<String,String> dataMap = new HashMap<>(1);
dataMap.put("authorize","XXXXXXX");
for (int i = 0; i < count; i++) {
pool.execute(() -> {
try {
//訪問(wèn)網(wǎng)關(guān)的API接口
HttpUtil.doGet("http://localhost:1234/sso/laokou-demo/user",dataMap);
} catch (IOException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static String doGet(String url, Map<String, String> params) throws IOException {
//創(chuàng)建HttpClient對(duì)象
CloseableHttpClient httpClient = HttpClients.createDefault();
String resultString = "";
CloseableHttpResponse response = null;
try {
//創(chuàng)建uri
URIBuilder builder = new URIBuilder(url);
if (!params.isEmpty()) {
for (Map.Entry<String, String> entry : params.entrySet()) {
builder.addParameter(entry.getKey(), entry.getValue());
}
}
URI uri = builder.build();
//創(chuàng)建http GET請(qǐng)求
HttpGet httpGet = new HttpGet(uri);
List<NameValuePair> paramList = new ArrayList<>();
RequestBuilder requestBuilder = RequestBuilder.get().setUri(new URI(url));
requestBuilder.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
httpGet.setHeader(new BasicHeader("Content-Type", "application/json;charset=UTF-8"));
httpGet.setHeader(new BasicHeader("Accept", "*/*;charset=utf-8"));
//執(zhí)行請(qǐng)求
response = httpClient.execute(httpGet);
//判斷返回狀態(tài)是否是200
if (response.getStatusLine().getStatusCode() == 200) {
resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
}
} catch (Exception e) {
log.info("調(diào)用失敗:{}",e);
} finally {
if (response != null) {
response.close();
}
httpClient.close();
}
log.info("打?。簕}",resultString);
return resultString;
}
}說(shuō)明這個(gè)網(wǎng)關(guān)限流配置是沒(méi)有問(wèn)題的
4.源碼查看
Spring Cloud Gateway RequestRateLimiter GatewayFilter Factory文檔地址
工廠 RequestRateLimiter GatewayFilter使用一個(gè)RateLimiter實(shí)現(xiàn)來(lái)判斷當(dāng)前請(qǐng)求是否被允許繼續(xù)。如果不允許,HTTP 429 - Too Many Requests則返回默認(rèn)狀態(tài)。
4.1.查看 RequestRateLimiterGatewayFilterFactory
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder
.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
if (EMPTY_KEY.equals(key)) {
if (denyEmpty) {
setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
// 執(zhí)行限流
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
setResponseStatus(exchange, config.getStatusCode());
return exchange.getResponse().setComplete();
});
});
}4.2.查看 RedisRateLimiter
@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
// 這里如何加載配置?請(qǐng)思考
Config routeConfig = loadConfiguration(routeId);
// 令牌桶每秒產(chǎn)生令牌數(shù)量
int replenishRate = routeConfig.getReplenishRate();
// 令牌桶容量
int burstCapacity = routeConfig.getBurstCapacity();
// 請(qǐng)求消耗的令牌數(shù)
int requestedTokens = routeConfig.getRequestedTokens();
try {
// 鍵
List<String> keys = getKeys(id);
// 參數(shù)
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + "");
// 調(diào)用lua腳本
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
return flux.onErrorResume(throwable -> {
if (log.isDebugEnabled()) {
log.debug("Error calling rate limiter lua", throwable);
}
return Flux.just(Arrays.asList(1L, -1L));
}).reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
// 判斷是否等于1,1表示允許通過(guò),0表示不允許通過(guò)
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
}
catch (Exception e) {
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}
static List<String> getKeys(String id) {
String prefix = "request_rate_limiter.{" + id;
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}思考:redis限流配置是如何加載?
其實(shí)就是監(jiān)聽(tīng)動(dòng)態(tài)路由的事件并把配置存起來(lái)
4.3.重點(diǎn)來(lái)了,令牌桶 /META-INF/scripts/request_rate_limiter.lua 腳本剖析
-- User Request Rate Limiter filter
-- See https://stripe.com/blog/rate-limiters
-- See https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34
-- 令牌桶算法工作原理
-- 1.系統(tǒng)以恒定速率往桶里面放入令牌
-- 2.請(qǐng)求需要被處理,則需要從桶里面獲取一個(gè)令牌
-- 3.如果桶里面沒(méi)有令牌可獲取,則可以選擇等待或直接拒絕并返回
-- 令牌桶算法工作流程
-- 1.計(jì)算填滿令牌桶所需要的時(shí)間(填充時(shí)間 = 桶容量 / 速率)
-- 2.設(shè)置存儲(chǔ)數(shù)據(jù)的TTL(過(guò)期時(shí)間),為填充時(shí)間的兩倍(存儲(chǔ)時(shí)間 = 填充時(shí)間 * 2)
-- 3.從Redis獲取當(dāng)前令牌的剩余數(shù)量和上一次調(diào)用的時(shí)間戳
-- 4.計(jì)算距離上一次調(diào)用的時(shí)間間隔(時(shí)間間隔 = 當(dāng)前時(shí)間 - 上一次調(diào)用時(shí)間)
-- 5.計(jì)算填充的令牌數(shù)量(填充令牌數(shù)量 = 時(shí)間間隔 * 速率)【前提:桶容量是固定的,不存在無(wú)限制的填充】
-- 6.判斷是否有足夠多的令牌滿足請(qǐng)求【 (填充令牌數(shù)量 + 剩余令牌數(shù)量) >= 請(qǐng)求數(shù)量 && (填充令牌數(shù)量 + 剩余令牌數(shù)量) <= 桶容量 】
-- 7.如果請(qǐng)求被允許,則從桶里面取出相應(yīng)數(shù)據(jù)的令牌
-- 8.如果TTL為正,則更新Redis鍵中的令牌和時(shí)間戳
-- 9.返回兩個(gè)兩個(gè)參數(shù)(allowed_num:請(qǐng)求被允許標(biāo)志。1允許,0不允許)、(new_tokens:填充令牌后剩余的令牌數(shù)據(jù))
-- 隨機(jī)寫(xiě)入
redis.replicate_commands()
-- 令牌桶Key -> 存儲(chǔ)當(dāng)前可用令牌的數(shù)量(剩余令牌數(shù)量)
local tokens_key = KEYS[1]
-- 時(shí)間戳Key -> 存儲(chǔ)上次令牌刷新的時(shí)間戳
local timestamp_key = KEYS[2]
-- 令牌填充速率
local rate = tonumber(ARGV[1])
-- 令牌桶容量
local capacity = tonumber(ARGV[2])
-- 當(dāng)前時(shí)間
local now = tonumber(ARGV[3])
-- 請(qǐng)求數(shù)量
local requested = tonumber(ARGV[4])
-- 填滿令牌桶所需要的時(shí)間
local fill_time = capacity / rate
-- 設(shè)置key的過(guò)期時(shí)間(填滿令牌桶所需時(shí)間的2倍)
local ttl = math.floor(fill_time * 2)
-- 判斷當(dāng)前時(shí)間,為空則從redis獲取
if now == nil then
now = redis.call('TIME')[1]
end
-- 獲取當(dāng)前令牌的剩余數(shù)量
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
-- 獲取上一次調(diào)用的時(shí)間戳
local last_refreshed = tonumber(redis.call('get', timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
-- 計(jì)算距離上一次調(diào)用的時(shí)間間隔
local delta = math.max(0, now - last_refreshed)
-- 當(dāng)前的令牌數(shù)量(剩余 + 填充 <= 桶容量)
local now_tokens = math.min(capacity, last_refreshed + (rate * delta))
-- 判斷是否有足夠多的令牌滿足請(qǐng)求
local allowed = now_tokens >= requested
-- 定義當(dāng)前令牌的剩余數(shù)量
local new_tokens = now_tokens
-- 定義被允許標(biāo)志
local allowed_num = 0
if allowed then
new_tokens = now_tokens - requested
-- 允許訪問(wèn)
allowed_num = 1
end
-- ttl > 0,將當(dāng)前令牌的剩余數(shù)量和當(dāng)前時(shí)間戳存入redis
if ttl > 0 then
redis.call('setex', tokens_key, ttl, new_tokens)
redis.call('setex', timestamp_key, ttl, now)
end
-- 返回參數(shù)
return { allowed_num, new_tokens }4.4.查看 GatewayRedisAutoConfiguration 腳本初始化
@Bean
@SuppressWarnings("unchecked")
public RedisScript redisRequestRateLimiterScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(
// 根據(jù)指定路徑獲取lua腳本來(lái)初始化配置
new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
redisScript.setResultType(List.class);
return redisScript;
}
@Bean
@ConditionalOnMissingBean
public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
@Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
ConfigurationService configurationService) {
return new RedisRateLimiter(redisTemplate, redisScript, configurationService);
}思考:請(qǐng)求限流過(guò)濾器是如何開(kāi)啟?
1.通過(guò)yaml配置開(kāi)啟
spring:
cloud:
gateway:
server:
webflux:
filter:
request-rate-limiter:
enabled: true2.GatewayAutoConfiguration自動(dòng)注入bean
@Bean
@ConditionalOnBean({ RateLimiter.class, KeyResolver.class })
@ConditionalOnEnabledFilter
public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter,
KeyResolver resolver) {
return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver);
}重點(diǎn)來(lái)了,真正加載這個(gè)bean的是 @ConditionalOnEnabledFilter 注解進(jìn)行判斷
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@Documented
@Conditional(OnEnabledFilter.class)
public @interface ConditionalOnEnabledFilter {
// 這里value是用來(lái)指定滿足條件的某些類,換一句話說(shuō),就是這些類都加載或注入到ioc容器,這個(gè)注解修飾的自動(dòng)裝配類才會(huì)生效
Class<? extends GatewayFilterFactory<?>> value() default OnEnabledFilter.DefaultValue.class;
}我們繼續(xù)跟進(jìn)代碼,查看@Conditional(OnEnabledFilter.class)
眾所周知,@Conditional可以用來(lái)加載滿足條件的bean,所以,我們分析一下OnEnabledFilter
public class OnEnabledFilter extends OnEnabledComponent<GatewayFilterFactory<?>> {}我分析它的父類,這里有你想要的答案!
public abstract class OnEnabledComponent<T> extends SpringBootCondition implements ConfigurationCondition {
private static final String PREFIX = "spring.cloud.gateway.server.webflux.";
private static final String SUFFIX = ".enabled";
private ConditionOutcome determineOutcome(Class<? extends T> componentClass, PropertyResolver resolver) {
// 拼接完整名稱
// 例如 => spring.cloud.gateway.server.webflux.request-rate-limiter.enabled
String key = PREFIX + normalizeComponentName(componentClass) + SUFFIX;
ConditionMessage.Builder messageBuilder = forCondition(annotationClass().getName(), componentClass.getName());
if ("false".equalsIgnoreCase(resolver.getProperty(key))) {
// 不滿足條件不加載bean
return ConditionOutcome.noMatch(messageBuilder.because("bean is not available"));
}
// 滿足條件加載bean
return ConditionOutcome.match();
}
}5.優(yōu)化限流響應(yīng)[使用全限定類名直接覆蓋類]
小伙伴們,有沒(méi)有發(fā)現(xiàn),這個(gè)這個(gè)響應(yīng)體封裝的不太好,因此,我們來(lái)自定義吧,我們直接覆蓋類,代碼修改如下
@Getter
@ConfigurationProperties("spring.cloud.gateway.server.webflux.filter.request-rate-limiter")
public class RequestRateLimiterGatewayFilterFactory
extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {
private static final String EMPTY_KEY = "____EMPTY_KEY__";
private final RateLimiter<?> defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
/**
* Switch to deny requests if the Key Resolver returns an empty key, defaults to true.
*/
@Setter
private boolean denyEmptyKey = true;
/** HttpStatus to return when denyEmptyKey is true, defaults to FORBIDDEN. */
@Setter
private String emptyKeyStatusCode = HttpStatus.FORBIDDEN.name();
public RequestRateLimiterGatewayFilterFactory(RateLimiter<?> defaultRateLimiter, KeyResolver defaultKeyResolver) {
super(Config.class);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
RateLimiter<?> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder
.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
if (EMPTY_KEY.equals(key)) {
if (denyEmpty) {
setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
Assert.notNull(route, "Route is null");
routeId = route.getId();
}
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
// 主要修改這行
return responseOk(exchange, Result.fail("Too_Many_Requests", "請(qǐng)求太頻繁"));
});
});
}
private Mono<Void> responseOk(ServerWebExchange exchange, Object data) {
return responseOk(exchange, JacksonUtils.toJsonStr(data), MediaType.APPLICATION_JSON);
}
private Mono<Void> responseOk(ServerWebExchange exchange, String str, MediaType contentType) {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(str.getBytes(StandardCharsets.UTF_8));
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(contentType);
response.getHeaders().setContentLength(str.getBytes(StandardCharsets.UTF_8).length);
return response.writeWith(Flux.just(buffer));
}
private <T> T getOrDefault(T configValue, T defaultValue) {
return (configValue != null) ? configValue : defaultValue;
}
public static class Config implements HasRouteId {
@Getter
private KeyResolver keyResolver;
@Getter
private RateLimiter<?> rateLimiter;
@Getter
private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS;
@Getter
private Boolean denyEmptyKey;
@Getter
private String emptyKeyStatus;
private String routeId;
public Config setKeyResolver(KeyResolver keyResolver) {
this.keyResolver = keyResolver;
return this;
}
public Config setRateLimiter(RateLimiter<?> rateLimiter) {
this.rateLimiter = rateLimiter;
return this;
}
public Config setStatusCode(HttpStatus statusCode) {
this.statusCode = statusCode;
return this;
}
public Config setDenyEmptyKey(Boolean denyEmptyKey) {
this.denyEmptyKey = denyEmptyKey;
return this;
}
public Config setEmptyKeyStatus(String emptyKeyStatus) {
this.emptyKeyStatus = emptyKeyStatus;
return this;
}
@Override
public void setRouteId(String routeId) {
this.routeId = routeId;
}
@Override
public String getRouteId() {
return this.routeId;
}
}
}二、熔斷降級(jí)
思考:為什么需要熔斷降級(jí)?
當(dāng)某個(gè)服務(wù)發(fā)生故障時(shí)(超時(shí),響應(yīng)慢,宕機(jī)),上游服務(wù)無(wú)法及時(shí)獲取響應(yīng),進(jìn)而也導(dǎo)致故障,出現(xiàn)服務(wù)雪崩【服務(wù)雪崩是指故障像滾雪球一樣沿著調(diào)用鏈向上游擴(kuò)展,進(jìn)而導(dǎo)致整個(gè)系統(tǒng)癱瘓】
熔斷降級(jí)的目標(biāo)就是在故障發(fā)生時(shí),快速隔離問(wèn)題服務(wù)【快速失敗,防止資源耗盡】,保護(hù)系統(tǒng)資源不被耗盡,防止故障擴(kuò)散,保護(hù)核心業(yè)務(wù)可用性。
1.技術(shù)選型
1.1.熔斷降級(jí)框架選型對(duì)比表
| 對(duì)比維度 | Hystrix (Netflix) | Sentinel (Alibaba) | Resilience4j |
|---|---|---|---|
| 當(dāng)前狀態(tài) | ? 停止更新 (維護(hù)模式) | ? 持續(xù)更新 | ? 持續(xù)更新 |
| 熔斷機(jī)制 | 滑動(dòng)窗口計(jì)數(shù) | 響應(yīng)時(shí)間/異常比例/QPS | 錯(cuò)誤率/響應(yīng)時(shí)間閾值 |
| 流量控制 | ? 僅基礎(chǔ)隔離 | ? QPS/并發(fā)數(shù)/熱點(diǎn)參數(shù)/集群流控 | ? RateLimiter |
| 隔離策略 | 線程池(開(kāi)銷大)/信號(hào)量 | 并發(fā)線程數(shù)(無(wú)線程池開(kāi)銷) | 信號(hào)量/Bulkhead |
| 降級(jí)能力 | Fallback 方法 | Fallback + 系統(tǒng)規(guī)則自適應(yīng) | Fallback + 自定義組合策略 |
| 實(shí)時(shí)監(jiān)控 | ? Hystrix Dashboard | ? 原生控制臺(tái)(可視化動(dòng)態(tài)規(guī)則) | ? 需整合 Prometheus/Grafana |
| 動(dòng)態(tài)配置 | ? 依賴 Archaius | ? 控制臺(tái)實(shí)時(shí)推送 | ? 需編碼實(shí)現(xiàn)(如Spring Cloud Config) |
| 生態(tài)集成 | ? Spring Cloud Netflix | ? Spring Cloud Alibaba/多語(yǔ)言網(wǎng)關(guān) | ? Spring Boot/響應(yīng)式編程 |
| 性能開(kāi)銷 | 高(線程池隔離) | 低(無(wú)額外線程) | 極低(純函數(shù)式) |
| 適用場(chǎng)景 | 遺留系統(tǒng)維護(hù) | 高并發(fā)控制/秒殺/熱點(diǎn)防護(hù) | 云原生/輕量級(jí)微服務(wù) |
| 推薦指數(shù) | ?? (不推薦新項(xiàng)目) | ????? (Java高并發(fā)首選) | ????? (云原生/響應(yīng)式首選) |
1.2選型決策指南
| 需求場(chǎng)景 | 推薦方案 | 原因 |
|---|---|---|
| 電商秒殺/API高頻調(diào)用管控 | ? Sentinel | 精細(xì)流量控制+熱點(diǎn)防護(hù)+實(shí)時(shí)看板 |
| Kubernetes云原生微服務(wù) | ? Resilience4j | 輕量化+無(wú)縫集成Prometheus+響應(yīng)式支持 |
| Spring Cloud Netflix舊系統(tǒng) | ?? Hystrix | 兼容現(xiàn)存代碼(短期過(guò)渡) |
| 多語(yǔ)言混合架構(gòu)(如Go+Java) | ? Sentinel | 通過(guò)Sidecar代理支持非Java服務(wù) |
| 響應(yīng)式編程(WebFlux) | ? Resilience4j | 原生Reactive API支持 |
2.Resilience4j使用
Resilience4j 可以看作是 Hystrix 的替代品,Resilience4j支持 熔斷器和單機(jī)限流
Resilience4j 是一個(gè)專為函數(shù)式編程設(shè)計(jì)的輕量級(jí)容錯(cuò)庫(kù)。Resilience4j 提供高階函數(shù)(裝飾器),可通過(guò)斷路器、速率限制器、重試或隔離功能增強(qiáng)任何函數(shù)式接口、lambda 表達(dá)式或方法引用。您可以在任何函數(shù)式接口、lambda 表達(dá)式或方法引用上堆疊多個(gè)裝飾器。這樣做的好處是,您可以只選擇所需的裝飾器,而無(wú)需考慮其他因素。
2.1.網(wǎng)關(guān)熔斷降級(jí)(Spring Cloud Gateway + Resilience4j實(shí)戰(zhàn))
2.1.1.pom依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId> </dependency>
2.1.2.yaml配置
spring:
application:
name: laokou-gateway
cloud:
gateway:
server:
webflux:
routes:
- id: LAOKOU-SSO-DEMO
uri: lb://laokou-sso-demo
predicates:
- Path=/sso/**
filters:
- name: CircuitBreaker
args:
name: default
fallbackUri: "forward:/fallback"
filter:
circuit-breaker:
enabled: true2.1.3.CircuitBreakerConfig配置
/**
* @author laokou
*/
@Configuration
public class CircuitBreakerConfig {
@Bean
public RouterFunction<ServerResponse> routerFunction() {
return RouterFunctions.route(
RequestPredicates.path("/fallback").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
(request) -> ServerResponse.status(HttpStatus.SC_OK)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(Result.fail("Service_Unavailable", "服務(wù)正在維護(hù)"))));
}
@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> reactiveResilience4JCircuitBreakerFactoryCustomizer() {
return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
// 3秒后超時(shí)時(shí)間
.timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(3)).build())
.circuitBreakerConfig(io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.ofDefaults())
.build());
}
}到此這篇關(guān)于Spring Cloud Gateway實(shí)現(xiàn)分布式限流和熔斷降級(jí)的文章就介紹到這了,更多相關(guān)Spring Cloud Gateway分布式限流和熔斷降級(jí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java gui實(shí)現(xiàn)計(jì)算器小程序
這篇文章主要為大家詳細(xì)介紹了java gui實(shí)現(xiàn)計(jì)算器小程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07
SpringBoot使用郵箱發(fā)送驗(yàn)證碼實(shí)現(xiàn)注冊(cè)功能
這篇文章主要為大家詳細(xì)介紹了SpringBoot使用郵箱發(fā)送驗(yàn)證碼實(shí)現(xiàn)注冊(cè)功能實(shí)例,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-02-02
Java中CyclicBarrier和CountDownLatch的用法與區(qū)別
CyclicBarrier和CountDownLatch這兩個(gè)工具都是在java.util.concurrent包下,并且平時(shí)很多場(chǎng)景都會(huì)使用到。本文將會(huì)對(duì)兩者進(jìn)行分析,記錄他們的用法和區(qū)別,感興趣的可以了解一下2021-08-08
Java生成的隨機(jī)數(shù)靠譜嗎?多少次會(huì)重復(fù)?
今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí),文章圍繞著Java生成的隨機(jī)數(shù)靠不靠譜展開(kāi),文中有非常詳細(xì)的介紹,需要的朋友可以參考下2021-06-06
Spring Boot容器加載時(shí)執(zhí)行特定操作(推薦)
這篇文章主要介紹了Spring Boot容器加載時(shí)執(zhí)行特定操作及spring內(nèi)置的事件,需要的朋友可以參考下2018-01-01
SpringBoot使用JavaMailSender實(shí)現(xiàn)發(fā)送郵件
JavaMailSender是Spring Framework中的一個(gè)接口,用于發(fā)送電子郵件,本文主要為大家詳細(xì)介紹了SpringBoot如何使用JavaMailSender實(shí)現(xiàn)發(fā)送郵件,需要的可以參考下2023-12-12

