redis redisson 限流器的實(shí)例(RRateLimiter)
redis redisson 限流器實(shí)例
作用:限制一段時間內(nèi)對數(shù)據(jù)的訪問數(shù)量
相關(guān)接口
RRateLimiter
public interface RRateLimiter extends RRateLimiterAsync, RObject {
boolean trySetRate(RateType var1, long var2, long var4, RateIntervalUnit var6);
//設(shè)置訪問速率,var2為訪問數(shù),var4為單位時間,var6為時間單位
void acquire(); //訪問數(shù)據(jù)
void acquire(long var1); //占var1的速度計算值
boolean tryAcquire(); //嘗試訪問數(shù)據(jù)
boolean tryAcquire(long var1); //嘗試訪問數(shù)據(jù),占var1的速度計算值
boolean tryAcquire(long var1, TimeUnit var3); //嘗試訪問數(shù)據(jù),設(shè)置等待時間var3
boolean tryAcquire(long var1, long var3, TimeUnit var5); //嘗試訪問數(shù)據(jù),占數(shù)據(jù)計算值var1,設(shè)置等待時間var3
RateLimiterConfig getConfig();
}
RateType:速度類型
public enum RateType {
OVERALL, //所有客戶端加總限流
PER_CLIENT; //每個客戶端單獨(dú)計算流量
private RateType() {
}
}
RateInternalUnit:速度單位
public enum RateIntervalUnit {
MILLISECONDS {
public long toMillis(long value) {
return value;
}
},
SECONDS {
public long toMillis(long value) {
return TimeUnit.SECONDS.toMillis(value);
}
},
MINUTES {
public long toMillis(long value) {
return TimeUnit.MINUTES.toMillis(value);
}
},
HOURS {
public long toMillis(long value) {
return TimeUnit.HOURS.toMillis(value);
}
},
DAYS {
public long toMillis(long value) {
return TimeUnit.DAYS.toMillis(value);
}
};
private RateIntervalUnit() {
}
public abstract long toMillis(long var1);
}
示例
public class MyTest8 {
public static void main(String[] args){
Config config=new Config();
config.useSingleServer().setAddress("redis://192.168.57.120:6379").setPassword("123456");
RedissonClient client= Redisson.create(config);
RRateLimiter rateLimiter=client.getRateLimiter("rate_limiter");
rateLimiter.trySetRate(RateType.PER_CLIENT,5,2, RateIntervalUnit.MINUTES);
ExecutorService executorService= Executors.newFixedThreadPool(10);
for (int i=0;i<10;i++){
executorService.submit(()->{
try{
rateLimiter.acquire();
System.out.println("線程"+Thread.currentThread().getId()+"進(jìn)入數(shù)據(jù)區(qū):"+System.currentTimeMillis());
}catch (Exception e){
e.printStackTrace();
}
});
}
}
}
控制臺輸出
線程49進(jìn)入數(shù)據(jù)區(qū):1574672546522
線程55進(jìn)入數(shù)據(jù)區(qū):1574672546522
線程56進(jìn)入數(shù)據(jù)區(qū):1574672546526
線程50進(jìn)入數(shù)據(jù)區(qū):1574672546523
線程48進(jìn)入數(shù)據(jù)區(qū):1574672546523
線程51進(jìn)入數(shù)據(jù)區(qū):1574672666627
線程53進(jìn)入數(shù)據(jù)區(qū):1574672666627
線程54進(jìn)入數(shù)據(jù)區(qū):1574672666627
線程57進(jìn)入數(shù)據(jù)區(qū):1574672666628
線程52進(jìn)入數(shù)據(jù)區(qū):1574672666628
說明:兩分鐘之內(nèi)最多只有5個線程在執(zhí)行
分布式限流redission RRateLimiter使用及原理
前提:
最近公司在做有需求在做分布式限流,調(diào)研的限流框架大概有
- 1、spring cloud gateway集成redis限流,但屬于網(wǎng)關(guān)層限流
- 2、阿里Sentinel,功能強(qiáng)大、帶監(jiān)控平臺
- 3、srping cloud hystrix,屬于接口層限流,提供線程池與信號量兩種方式
- 4、其他:redission、手?jǐn)]代碼
實(shí)際需求情況屬于業(yè)務(wù)端限流,redission更加方便,使用更加靈活,下面介紹下redission分布式限流如何使用及原理:
一、使用
使用很簡單、如下
// 1、 聲明一個限流器 RRateLimiter rateLimiter = redissonClient.getRateLimiter(key); // 2、 設(shè)置速率,5秒中產(chǎn)生3個令牌 rateLimiter.trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS); // 3、試圖獲取一個令牌,獲取到返回true rateLimiter.tryAcquire(1)
二、原理
1、getRateLimiter
// 聲明一個限流器 名稱 叫key redissonClient.getRateLimiter(key)
2、trySetRate
trySetRate方法跟進(jìn)去底層實(shí)現(xiàn)如下:
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"
+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",
Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());
}
舉個例子,更容易理解:
比如下面這段代碼,5秒中產(chǎn)生3個令牌,并且所有實(shí)例共享(RateType.OVERALL所有實(shí)例共享、RateType.CLIENT單實(shí)例端共享)
trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);
那么redis中就會設(shè)置3個參數(shù):
hsetnx,key,rate,3
hsetnx,key,interval,5
hsetnx,key,type,0
接著看tryAcquire(1)方法:底層源碼如下
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');" //1
+ "local interval = redis.call('hget', KEYS[1], 'interval');" //2
+ "local type = redis.call('hget', KEYS[1], 'type');" //3
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" //4
+ "local valueName = KEYS[2];" //5
+ "if type == 1 then "
+ "valueName = KEYS[3];" //6
+ "end;"
+ "local currentValue = redis.call('get', valueName); " //7
+ "if currentValue ~= false then "
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then " //8
+ "return redis.call('pttl', valueName); "
+ "else "
+ "redis.call('decrby', valueName, ARGV[1]); " //9
+ "return nil; "
+ "end; "
+ "else " //10
+ "redis.call('set', valueName, rate, 'px', interval); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end;",
Arrays.<Object>asList(getName(), getValueName(), getClientValueName()),
value, commandExecutor.getConnectionManager().getId().toString());
}
第1、2、3備注行是獲取上一步set的3個值:rate、interval、type,如果這3個值沒有設(shè)置,直接返回rateLimiter沒有被初始化。
第5備注行聲明一個變量叫valueName 值為KEYS[2],KEYS[2]對應(yīng)的值是getValueName()方法,getValueName()返回的就是上面第一步getRateLimiter我們設(shè)置的key;如果type=1,表示全局共享,那么valueName 的值改為取KEYS[3],KEYS[3]對應(yīng)的值為getClientValueName(),查看getClientValueName()源碼:
String getClientValueName() {
return suffixName(getValueName(), commandExecutor.getConnectionManager().getId().toString());
}
ConnectionManager().getId()如下:
public interface ConnectionManager {
UUID getId();
省略...
}
這個getId()是每個客戶端初始化的時候生成的UUID,即每個客戶端的getId是唯一的,這也就驗(yàn)證了trySetRate方法中RateType.ALL與RateType.PER_CLIENT的作用。
- 接著看第7標(biāo)準(zhǔn)行,獲取valueName對應(yīng)的值currentValue;首次獲取肯定為空,那么看第10標(biāo)準(zhǔn)行else的邏輯
- set valueName 3 px 5,設(shè)置key=valueName value=3 過期時間為5秒
- decrby valueName 1,將上面valueName的值減1
- 那么如果第二次訪問,第7標(biāo)注行返回的值存在,將會走第8標(biāo)注行,緊接著走如下判斷
- 如果當(dāng)前valueName的值也就是3,小于要獲得的令牌數(shù)量(tryAcquire方法中的入?yún)?,那么說明當(dāng)前時間內(nèi)(key的有效期5秒內(nèi)),令牌的數(shù)量已經(jīng)被用完,返回pttl(key的剩余過期時間);反之說明桶中有足夠的令牌,獲取之后將會把桶中的令牌數(shù)量減1,至此結(jié)束。
總結(jié):
redission分布式限流采用令牌桶思想和固定時間窗口,trySetRate方法設(shè)置桶的大小,利用redis key過期機(jī)制達(dá)到時間窗口目的,控制固定時間窗口內(nèi)允許通過的請求量。
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Jpa中Specification的求和sum不生效原理分析
這篇文章主要為大家介紹了Jpa中Specification的求和sum不生效原理示例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
Springboot RocketMq實(shí)現(xiàn)過程詳解
這篇文章主要介紹了Springboot RocketMq實(shí)現(xiàn)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-05-05
Java詳細(xì)分析String類與StringBuffer和StringBuilder的使用方法
當(dāng)對字符串進(jìn)行修改的時候,需要使用 StringBuffer 和 StringBuilder類,和String類不同的是,StringBuffer和 StringBuilder類的對象能夠被多次的修改,并且不產(chǎn)生新的未使用對象2022-04-04
Spring boot中filter類不能注入@Autowired變量問題
這篇文章主要介紹了Spring boot中filter類不能注入@Autowired變量問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
關(guān)于Jsoup將相對路徑轉(zhuǎn)為絕對路徑的方法
這篇文章主要介紹了關(guān)于Jsoup將相對路徑轉(zhuǎn)為絕對路徑的方法,jsoup 是一款Java 的HTML解析器,可直接解析某個URL地址、HTML文本內(nèi)容,需要的朋友可以參考下2023-04-04
MyBatis?超詳細(xì)講解動態(tài)SQL的實(shí)現(xiàn)
動態(tài)?SQL?是?MyBatis?的強(qiáng)大特性之一。如果你使用過?JDBC?或其它類似的框架,你應(yīng)該能理解根據(jù)不同條件拼接?SQL?語句有多痛苦,例如拼接時要確保不能忘記添加必要的空格,還要注意去掉列表最后一個列名的逗號。利用動態(tài)?SQL,可以徹底擺脫這種痛苦2022-03-03
springboot時間格式化的五種方法總結(jié)(解決后端傳給前端的時間顯示不一致)
這篇文章主要給大家介紹了關(guān)于springboot時間格式化的五種方法,文中介紹的方法解決了后端傳給前端的時間顯示不一致,文中通過圖文以及代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01

