Redis分布式限流的幾種實現(xiàn)
一、 簡介
分布式限流是指通過將限流策略嵌入到分布式系統(tǒng)中,以控制流量或保護服務(wù),保證系統(tǒng)在高并發(fā)訪問情況下不被過載。
分布式限流可以防止系統(tǒng)因大量請求同時到達導致壓力過大而崩潰,從而提高系統(tǒng)的穩(wěn)定性和可靠性。同時,它可以使得業(yè)務(wù)資源能夠更好地分配,提高系統(tǒng)的效率。
二、分布式限流
1 數(shù)據(jù)結(jié)構(gòu)
1.1 Redis List
Redis List 是一個可以輸入相同元素和非唯一元素的集合且支持在兩端進行快速(O(1))插入和刪除元素。
1.2 Redis Set
Redis Set 是一個無序,但不允許重復元素的集合。您可以使用這些命令對Redis集進行常規(guī)操作: SADD,SREM,SISMEMBER,SMEMBERS等。
1.3 Redis Sorted Set
Redis Sorted Set 是一個有序的、不重復的元素集合。每個元素都關(guān)聯(lián)著一個浮點數(shù)值(稱為 Score)。通過 Score 可以從小到大排序得到一個有序集合。
2 實現(xiàn)分布式限流
在Redis中可以使用令牌桶算法來實現(xiàn)分布式限速。具體方法為:
Step 1:創(chuàng)建一個列表作為Redis令牌桶
String key = "rate_limit:" + userId; // 模擬用戶請求訪問 List<String> tokens = redis.lrange(key, 0, -1);
Step 2:設(shè)定令牌桶的基準參數(shù)
int maxTokens = 50; long timeInterval = 1 * 1000; long now = System.currentTimeMillis();
Step 3:計算Redis中令牌數(shù)量
long expiredTokens = tokens.stream().filter(t -> Long.parseLong(t) < now - timeInterval).count(); tokens = tokens.subList((int) expiredTokens, tokens.size()); long remainTokens = maxTokens - tokens.size();
Step 4:基于令牌數(shù)量,判斷是否超過限制
if (remainTokens < 1) {
throw new RateLimitException("請求太頻繁,請稍后再試!");
}
Step 5:如果沒有超出限制,則更新Redis中令牌數(shù)并設(shè)置過期時間
Long expiresIn = now + timeInterval; redis.multi(); redis.rpush(key, String.valueOf(expiresIn)); redis.pexpire(key, timeInterval); redis.exec();
3 實現(xiàn)原理分析
以上代碼所示首先需要建立Redis List用于存儲Token,其次需要設(shè)定令牌桶的基準參數(shù)(比如最大Token數(shù)量和Token過期間隔等)。在用戶訪問請求時,需要計算Redis中的令牌數(shù)量,根據(jù)規(guī)則對訪問量進行限制。如果沒有超過限制,則需要更新Redis List中令牌數(shù)并設(shè)置過期時間;如果超過了限制,則需要返回錯誤信息并拒絕服務(wù)。
整個過程中,需要注意并發(fā)訪問情況下的線程安全問題,并確保流量控制配置的公共協(xié)商,如最大QPS(Queries Per Second),哪些接口需限制流量等。
三、分布式限流算法
在實際的系統(tǒng)設(shè)計中,為了防止某一時刻出現(xiàn)大量請求導致系統(tǒng)崩潰,我們通常會采用限流策略來控制流量,而Redis作為分布式NoSQL數(shù)據(jù)庫,在限流中也有著廣泛的應用。下面介紹一些Redis分布式限流的經(jīng)典算法。
1. 計數(shù)器算法
計數(shù)器算法比較簡單,直接利用Redis存儲每個IP或者用戶的請求次數(shù),當請求次數(shù)超過預設(shè)閾值時拒絕服務(wù)。代碼如下:
public boolean isAllowed(String key, int limit, int timeout) {
Jedis jedis = getJedis();
long count = jedis.incr(key);
if (count == 1) {
jedis.expire(key, timeout);
}
boolean allowed = count <= limit;
if (!allowed) {
jedis.del(key);
}
jedis.close();
return allowed;
}
key:需要限流的用戶標識,可根據(jù)IP、UserID等進行定義limit:閾值,即允許的最大請求數(shù)timeout:過期時間,對于計數(shù)器算法,一定要設(shè)置過期時間,否則緩存中的請求次數(shù)會一直不斷累加
2. 漏斗算法
漏斗算法的核心思想是將請求按照恒定的速率轉(zhuǎn)換為水流,有效控制請求超出服務(wù)處理能力的情況。漏斗算法實現(xiàn)代碼如下:
public boolean isAllowed(String key, int capacity, double leakRate, int reqCount) {
Jedis jedis = getJedis();
long nowTime = System.currentTimeMillis();
String luaScript =
"local currentCapacity = tonumber(redis.call('hget', KEYS[1], 'leftCapacity'))\n"
+ "if currentCapacity == nil then\n"
+ " redis.call('hset', KEYS[1], 'lastTime', ARGV[2])\n"
+ " redis.call('hset', KEYS[1], 'leftCapacity', ARGV[1] - 1)\n"
+ " return 1\n"
+ "end\n"
+ "local changeTime = tonumber(redis.call('hget', KEYS[1], 'lastTime'))\n"
+ "local delayMillSeconds = nowTime - changeTime\n"
+ "local currentDelayCount = tonumber(delayMillSeconds*ARGV[3])\n"
+ "local currentCapacity = math.min(currentDelayCount+currentCapacity, ARGV[1])\n"
+ "if currentCapacity >= ARGV[4] then\n"
+ " return 0\n"
+ "else\n"
+ " redis.call('hset', KEYS[1], 'leftCapacity', currentCapacity-1)\n"
+ " redis.call('hset', KEYS[1], 'lastTime', nowTime)\n"
+ " return 1\n"
+ "end";
Object result = jedis.eval(
luaScript,
Collections.singletonList(key),
Arrays.asList(String.valueOf(capacity), String.valueOf(nowTime), String.valueOf(leakRate), String.valueOf(reqCount))
);
boolean allowed = (result instanceof Long ? (Long) result : 0L) == 1L;
jedis.close();
return allowed;
}
key:需要進行限流的用戶標識capacity:漏斗容量,即最大允許請求數(shù)量leakRate:漏嘴流水速率,保證有序的請求到達reqCount:預計請求量,用于計算漏斗每次流出的數(shù)量
3. 令牌桶算法
令牌桶算法的特點是以一個固定的速率不斷產(chǎn)生令牌,并將令牌放入到桶中,訪問時若桶為空,則表示請求數(shù)超限。令牌桶算法實現(xiàn)代碼如下:
public boolean isAllowed(String key, int capacity, double rate, int reqCount) {
long nowTime = System.currentTimeMillis();
Jedis jedis = getJedis();
String luaScript =
"local currentLimit = tonumber(redis.call('get', KEYS[1]) or '0')\n"
+ "if currentLimit + ARGV[1] > tonumber(KEYS[2]) then\n"
+ " return false\n"
+ "else\n"
+ " redis.call('incrby', KEYS[1], ARGV[1])\n"
+ " redis.call('expire', KEYS[1], ARGV[2])\n"
+ " return true\n"
+ "end";
Object result = jedis.eval(luaScript, 2, key, String.valueOf(capacity), String.valueOf(reqCount), String.valueOf(rate * (nowTime / 1000)));
boolean allowed = (result instanceof Boolean ? (Boolean) result : false);
jedis.close();
return allowed;
}
key:需要進行限流的用戶標識capacity:桶容量rate:令牌發(fā)放速率reqCount:請求數(shù)量
四、分布式限流實戰(zhàn)
1. 單機限流實現(xiàn)
假設(shè)我們有一個需求,需要限制每個IP一分鐘內(nèi)最多只能發(fā)送100個請求??梢酝ㄟ^Redis的INCR、EXPIRE等API操作來簡單實現(xiàn)單機限流。
public boolean isAllowed(String ip, int limit, int interval) {
Jedis jedis = getJedis();
String key = "ip:" + ip;
long count = jedis.incr(key);
if (count == 1) {
jedis.expire(key, interval);
}
boolean allowed = count <= limit;
if (!allowed) {
jedis.del(key);
}
jedis.close();
return allowed;
}
2. 基于Redis Clusters的分布式限流實現(xiàn)
當業(yè)務(wù)規(guī)模擴大時,單機的限流已經(jīng)無法滿足需求,這時候需要考慮使用Redis Clusters實現(xiàn)分布式限流。Clusers擴展了原先Redis的功能,不僅支持橫向擴展,而且提高了整個集群的可用性。限流算法同上,只是需要使用把數(shù)據(jù)分配到Cluser內(nèi)不同的節(jié)點上。
public boolean isAllowed(String ip, int limit, int interval) {
JedisCluster jedis = getJedisCluster();
String key = "ip:" + ip;
long count = jedis.incr(key);
if (count == 1) {
jedis.expire(key, interval);
}
boolean allowed = count <= limit;
if (!allowed) {
jedis.del(key);
}
return allowed;
}
五、基于Redis分布式限流的優(yōu)化
1. 緩存擊穿
1.1 問題描述
在高并發(fā)場景下,如果存在大量的緩存未命中請求,將會導致訪問底層數(shù)據(jù)存儲系統(tǒng),這種情況被稱為緩存擊穿。
1.2 解決方案
1.2.1 使用互斥鎖
/**
* 獲取緩存值方法
* @param key 緩存鍵值
* @return 緩存值
*/
public String getCacheValue(String key) {
String value = cache.get(key);
if (value == null) { //緩存未命中
//使用互斥鎖
Lock lock = redisson.getLock(key);
if (lock.tryLock()) { //嘗試獲取鎖
try {
value = cache.get(key); //再次嘗試獲取緩存
if (value == null) { //如果仍然未命中,從數(shù)據(jù)庫中獲取
value = db.get(key);
cache.put(key, value); //將查詢結(jié)果放入緩存
}
} finally {
lock.unlock(); //釋放鎖
}
} else {
Thread.sleep(100); //自旋一段時間后重試
return getCacheValue(key);
}
}
return value;
}
1.2.2 使用預熱機制
預熱機制是指在系統(tǒng)啟動的時候,提前加載熱點數(shù)據(jù)到緩存中,以減少緩存未命中請求。預熱的方式可以使用定時任務(wù)或者其他方式,在系統(tǒng)低峰期進行加載。
2. 熱點key問題的解決方案
2.1 問題描述
在高并發(fā)場景下,如果某個key的請求量過大,將會導致這個key成為熱點key,從而導致緩存雪崩問題。
2.2 解決方案
2.2.1 分布式鎖
/**
* 獲取緩存值方法
* @param key 緩存鍵值
* @return 緩存值
*/
public String getCacheValue(String key) {
String value = cache.get(key);
if (value == null) { //緩存未命中
//使用分布式鎖
RLock lock = redisson.getFairLock(key);
if (lock.tryLock()) { //嘗試獲取鎖
try {
value = cache.get(key); //再次嘗試獲取緩存
if (value == null) { //如果仍然未命中,從數(shù)據(jù)庫中獲取
value = db.get(key);
cache.put(key, value); //將查詢結(jié)果放入緩存
}
} finally {
lock.unlock(); //釋放鎖
}
} else {
Thread.sleep(100); //自旋一段時間后重試
return getCacheValue(key);
}
}
return value;
}
2.2.2 分布式緩存
使用分布式緩存將數(shù)據(jù)均勻地分散到多個節(jié)點上,從而避免單點瓶頸問題。
3. 并發(fā)競爭優(yōu)化
3.1 問題描述
在高并發(fā)場景下,對于某些資源的并發(fā)訪問將會導致性能瓶頸,需要進行并發(fā)競爭優(yōu)化。
3.2 解決方案
3.2.1 使用限流器
限流器類似于信號燈,用于控制并發(fā)請求的數(shù)量,在高峰期可以采用漏桶算法或令牌桶算法進行限流。
3.2.2 使用異步線程池
對于一些耗時的操作,可以使用異步線程池進行處理,從而避免阻塞主線程,提升系統(tǒng)的并發(fā)能力。
到此這篇關(guān)于Redis分布式限流的幾種實現(xiàn)的文章就介紹到這了,更多相關(guān)Redis分布式限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Redis實現(xiàn)分布式限流的幾種方法
- redis+lua實現(xiàn)分布式限流的示例
- 詳解Redisson分布式限流的使用及原理
- Redisson分布式限流的實現(xiàn)原理分析
- Redisson分布式限流器RRateLimiter的使用及原理小結(jié)
- Redisson分布式限流的實現(xiàn)原理解析
- redisson分布式限流RRateLimiter源碼解析
- Redis分布式限流組件設(shè)計與使用實例
- 基于Redis+Lua腳本實現(xiàn)分布式限流組件封裝的方法
- Redis和Lua實現(xiàn)分布式限流器的方法詳解
- Redis分布式限流生產(chǎn)環(huán)境落地方案
相關(guān)文章
Redis快速實現(xiàn)分布式session的方法詳解
Session是客戶端與服務(wù)器通訊會話跟蹤技術(shù),服務(wù)器與客戶端保持整個通訊的會話基本信息。本文主要介紹了Redis快速實現(xiàn)分布式session的方法,感興趣的可以學習一下2022-01-01

