Spring+Redis+RabbitMQ開發(fā)限流和秒殺項目功能
本文將圍繞高并發(fā)場景中的限流和秒殺需求綜合演示Spring Boot整合JPA、Redis緩存和RabbitMQ消息隊列的做法。
本項目將通過整合Springboot和Redis以及Lua腳本來實現(xiàn)限流和秒殺的效果,將通過RabbitMQ消息隊列來實現(xiàn)異步保存秒殺結果的效果。
一、項目概述
本項目將要實現(xiàn)的秒殺是指商家在某個時間段以非常低的價格銷售商品的一種營銷活動。
由于商品價格非常低,因此單位時間內發(fā)起購買商品的請求會非常多,從而會對系統(tǒng)造巨大的壓力。對此,在一些秒殺系統(tǒng)中往往會整合限流的功能,同時會通過消息隊列異步地保存秒殺結果。
本章將要實現(xiàn)的限流和秒殺功能歸納如下:
(1)通過Spring Boot的控制器類對外接收秒殺請求。
(2)針對請求進行限流操作,比如秒殺商品的數(shù)量是10個,就限定在秒殺開始后的20秒內只有100個請求能參加秒殺,該操作是通過Redis來實現(xiàn)的。
(3)通過限流檢驗的這些請求將會同時競爭若干個秒殺商品。該操作將通過基于Redis的Lua腳本來實現(xiàn)。
(4)為了降低數(shù)據(jù)庫的壓力,秒殺成功的記錄將通過RabbitMQ隊列以異步的方式記錄到數(shù)據(jù)庫中。
(5)同時,將通過RestTemple對象以多線程的方式模擬發(fā)送秒殺請求,以此來觀察本秒殺系統(tǒng)的運行效果。
也就是說,本系統(tǒng)會綜合用到Spring Boot、JPA、Redis和RabbitMQ,相關組件之間的關系如圖所示。

二、基于Redis的Lua腳本分析
Lua使用標準C語言開發(fā)而成的,它是一種輕量級的腳本語言,可嵌入基于Redis等的應用程序中。Lua腳本可以駐留在內存中,所以具有較高的性能,適用于處理高并發(fā)的場景。
Lua腳本的特性
Lua腳本語言是由巴西一所大學的Roberto lerusalimschy 、 Waldemar Celes和 LnHenrique de Figuciredo設計而成的,它具有如下兩大特性
(1)輕量性:Lua只具有一些核心和最基本的庫,所以非常輕便,非常適合嵌入由其他語言編寫的代碼中。
(2)擴展性:Lua語言中預留了擴展接口和相關擴展機制,這樣在Lua語言中就能很方便地引入其他開發(fā)語言的功能,
本章給出的秒殺場景中會向Redis服務器發(fā)送多條指令,為了降低網(wǎng)絡調用的開銷,會把相關Redis命令放在Lua腳本里。通過調用Lua腳本只需要耗費少量的網(wǎng)絡調用代價就能執(zhí)行多條Redis命令。
此外,秒殺相關的Redis語句還需要具備原子性,即這些語句要么全都執(zhí)行,要么全都不執(zhí)行。而Lua腳本是作為一個整體來執(zhí)行的,所以可以充分地確保相關秒殺語句的原子性。
在Redis中引入Lua腳本
在啟動Redis服務器以后,可以通過redis-cli命令運行l(wèi)ua腳本,具體步驟如下:
- 可以在
C:work\redisConf\lua目錄中創(chuàng)建redisCallLua.lua文件,在其中編寫Lua腳本,注意,Lua腳本文件的擴展名一般都是.lua。 - 在第一步創(chuàng)建的
redisCallLua.lua文件中加入一行代碼,在其中通過redis.call命令執(zhí)行set name Peter的命令,
redis.call('set', 'name', 'Peter')通過rdis.call方法在Redis中調用Lua腳本時,第一個參數(shù)是Redis命令,比如這里是set,第二個參數(shù)以及之后的參數(shù)是執(zhí)行該條Redis命令的參數(shù)。
- 通過如下的
--eval命令執(zhí)行第二步定義的Lua腳本,其中C:work\redisConf\lua是這條Lua腳本所在的路徑,而redisCallLua.lua是腳本名。
redis-cli --eval C:\work\redisConf\lua\redisCallLua.lua
上述命令運行后,得到的返回結果是空(nil),原因是該Lua腳本只是通過set命令設置了值,并沒有返回結果。不過通過get name命令就能看到通過這條Lua腳本緩存的name值,具體是Peter。
如果Lua腳本包含的語句很少,那么還可以直接用eval命令來執(zhí)行該腳本,具體做法是,
先通過redis-cli語句連接到Redis服務器,隨后再執(zhí)行如下eval命令:
eval "redis.call('set','BookName','Spring Boot')" 0從上述語句中能看到,在該條eval命令之后通過雙引號引入了待執(zhí)行的Lua腳本,在該腳本中依然是通過redis.call語句執(zhí)行Redis的set命令,進行設置緩存的操作。
在該eval命令之后還指定了Lua腳本中KEYS類型參數(shù)的個數(shù),這里是0,表示該Lua腳本沒有KEYS類型的參數(shù)。注意,這里設置的是KEYS類型的參數(shù),而不是ARGV類型的參數(shù),下文將詳細說明這兩種參數(shù)的差別。
Lua腳本的返回值和參數(shù)
在Lua腳本中,可以通過retum語句返回執(zhí)行的結果,這部分對應的語法比較簡單。
同時,Redis在通過eval命令執(zhí)行Lua腳本時,可以傳入KEYS和ARGV這兩種不同類型的參數(shù),它們的區(qū)別是,可以用KEYS參數(shù)來傳入Redis命令所需要的參數(shù),可以用ARGV參數(shù)來傳入自定義的參數(shù),通過如下兩個eval執(zhí)行Lua腳本的命令,可以看到這兩種參數(shù)的差別。
127.0.0.1:6379> eval "return {KEYS[1],ARGV[1],ARGV[2]" 1 keyono argvone argvtwo
1) "keyone"
2) "argvone"
3) "argvtwo"
127.0.0.1:6379> eval "return {KEYS[1].ARGV[1],ARGV[2]}" 2 keyone argvone argvtwo
1) "key1"
2) "argvtwo"在第1行eval語句中,KEYS[1]表示KEYS類型的第一個參數(shù),而ARGV[1]和ARGV[2]對應地表示第一個和第二個ARGV類型的參數(shù)。
在第1行eval語句中,雙引號之后的1表示KEYS類型的參數(shù)個數(shù)是1,所以統(tǒng)計參數(shù)個數(shù)時并不把ARGV自定義類型的參數(shù)統(tǒng)計在內,隨后的keyone, argvone和argvtwo分別對應KEYS[1]、ARGV[1]和ARGV[2].
執(zhí)行第一行對應的Lua腳本時,會看到如第2~4行所示的輸出結果,這里輸出了KEYS[1]、
ARGV[1]和ARGV[2]這3個參數(shù)對應的值。
第5行腳本和第1行的差別是,表示KEYS參數(shù)個數(shù)的值從1變成了2。但這里第2個參數(shù)是ARGV類型的,而不是KEYS類型的,所以這條Lua腳本語句會拋棄第2個參數(shù),即ARGV[1],通過第6行和第7行的輸出結果能驗證這點。
所以,在通過eval命令執(zhí)行Lua腳本時,一定要確保參數(shù)個數(shù)和類型的正確性。同時,這里再次提醒,eval命令之后傳入的參數(shù)個數(shù)是KEYS類型參數(shù)的個數(shù),而不是ARGV類型的。
分支語句
在Lua腳本中,可以通過if…else語句來控制代碼的執(zhí)行流程,具體語法如下:
if(布爾表達式) then 布爾表達式是true時執(zhí)行的語句 else 布爾表達式是false時執(zhí)行的語句 end
通過如下的ifDemo.lua范例,讀者可以看到在Lua腳本中使用分支語句的做法。
if redis.call('exists','studentID')==1 then
return 'Existed'
else
redis.call('set','StudentID','001');
return 'Not Existed'
end在第1行中,通過if語句判斷redis.call命令執(zhí)行的exists語句是否返回1,如果是,則表示StudentID鍵存在,就會執(zhí)行第2行的returm 'Existed’語句返回Existed,否則走第3行的else流程,執(zhí)行第4行和第5行的語句,設置StudentID的值,并通過retum語句返回Not Existed。
由此可以看到在Lua腳本中使用if分支語句的做法。該腳本的運行結果是:第一次運行時,由于StudentID鍵不存在,因此會走else流程,從而看到Not Existed的輸出,而在第二次運行時,由于此時該鍵已經(jīng)存在,因此會直接輸出’Existed’的結果。
三、實現(xiàn)限流和秒殺功能
本節(jié)將要創(chuàng)建的QuickBuyDemo項目中,一方面會用到上文提到的Lua腳本實現(xiàn)限流和秒殺的功能,另一方面將通過RabbitMQ消息隊列實現(xiàn)異步保存秒殺結果的功能。
創(chuàng)建項目并編寫配置文件
可以在IDEA集成開發(fā)環(huán)境中創(chuàng)建名為QuickBuyDemo的Maven項目,在該項目的pom.xml文件中通過如下關鍵代碼引入所需要的依賴包:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.10</version>
</dependency>
</dependencies>這里通過第2-5行代碼引入了SpringBoot的依賴包,通過第6-9行代碼引入了RabbitMQ消息隊列相關的依賴包,通過第10-13行代碼引入了Redis相關的依賴包,通過第14-23行代碼引入了HTTP客戶端相關的依賴包,在本項目中將通過HTTP客戶端模擬客戶請求,從而驗證秒殺效果。
在本項目resources目錄的application.properties配置文件中,將通過如下代碼配置消息隊列和Redis緩存:
rabbitmq.host=127.0.0.1 rabbitmq.port=5672 rabbitmq.username=guest rabbitmq.password=guest redis.host=localhost redis.port=6379
在該配置文件中,通過第1~4行代碼配置了RabbitMQ的連接參數(shù),通過第5行和第6行代碼配置了Redis的連接參數(shù)。
編寫啟動類和控制器類
本項目的啟動類如下,由于和大多數(shù)的Spring Boot項目啟動類完全一致,因此不再重復講述。
package prj;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootApp {
public static void main(String[] args) {
SpringApplication.run(SpringBootApp.class, args);
}
}本項目的控制器類代碼如下,在該Controller控制器類的第11-25行代碼中封裝了實現(xiàn)秒殺服務的quickBuy方法,該方法是以quickBuy/{item}/{person}格式的URL請求對外提供服務的,其中item參數(shù)表示商品,而person參數(shù)則表示商品的購買人。
package prj.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import prj.receiver.BuyService;
@RestController
public class Controller {
@Autowired
private BuyService buyService;
@RequestMapping("/quickBuy/{item}/{person}")
public String quickBuy(@PathVariable String item, @PathVariable String person){
//20秒里限流100個請求
if(buyService.canVisit(item, 20,100)) {
String result = buyService.buy(item, person);
if (!result.equals("0")) {
return person + " success";
} else {
return person + " fail";
}
}
else{
return person + " fail";
}
}在quickBuy方法中,首先通過第14行的buyService.canVisit方法對請求進行了限流操作,這里在20秒中只允許有100個請求訪問,如果通過限流驗證,那么會繼續(xù)通過第15行的buyService.buy方法進行秒殺操作。注意,這里的實現(xiàn)限流和秒殺功能的代碼都封裝在第10行定義的BuyService類中。
消息隊列的相關配置
在本項目的RabbitMQConfig類中將配置RabbitMQ的消息隊列和消息交換機,具體代碼如下:
package prj;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig{
//定義含主題的消息隊列
@Bean
public Queue objectQueue() {
return new Queue("buyRecordQueue");
}
//定義交換機
TopicExchange myExchange() {
return new TopicExchange("myExchange");
Binding bindingObjectQueue(Queue objectQueue,TopicExchange exchange) {
return BindingBuilder.bind(objectQueue).to(exchange).with("buyRecordQueue");
}其中通過第9行的objectQueue方法創(chuàng)建了名為buyRecordQucue的消息隊列,該消息隊同將向用戶傳輸秒殺的結果,通過第14行的myExchange方法創(chuàng)建了名為myExhnge的清息交換機,并通過第18行的bindingObjectQueue方法根據(jù)buyRecordQucue主題綁定了上述消息以列和消息交換機。
實現(xiàn)秒殺功能的Lua腳本
在本項目中,實現(xiàn)秒殺效果的Lua腳本代碼如下:
local item = KEYS[1]
local person = ARGV[1]
local left = tonumber(redis.call('get',item))
if (left>=1) then
redis.call ('decrby',item,1)
redis.call ('rpush", 'personList',person)
return 1
else在該腳本中,首先通過KEYS[1]參數(shù)傳入待秒殺的商品,并賦予item對象,再通過ARGV[1]參數(shù)傳入發(fā)起秒殺請求的用戶,并賦子person對象。
隨后在第3行中,通過get item命令從Redis緩存中獲取該商品還有多少庫存,再通過第4行的if語句進行判斷。
如果發(fā)現(xiàn)該商品剩余的庫存數(shù)量大于等于1,就會執(zhí)行第5~7行的Lua腳本,先通過decrby命令把庫存數(shù)減1,再調用rpush命令記錄當前秒殺成功的用戶,并通過第7行的return語句返回1,表示秒殺成功。如果發(fā)現(xiàn)庫存數(shù)已經(jīng)小于1,那么會直接通過第9行的語句返且0,表示秒殺失敗。
在業(yè)務實現(xiàn)類中實現(xiàn)限流和秒殺
在BuyService.java中,將調用Redis和Lua腳本實現(xiàn)限流和秒殺的功能,具體代碼如下:
package prj.receiver;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import prj.model.buyrecord;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Service
public class BuyService {
@Resource
private RedisTemplate redisTemplate;
@Autowired
private AmqpTemplate amqpTemplate;
public boolean canVisit(String item, int limitTime, int limitNum) {
long curTime = System.currentTimeMillis();
// 在zset里存入請求
redisTemplate.opsForZSet().add(item, curTime, curTime);
// 移除時間范圍外的請求
redisTemplate.opsForZSet().removeRangeByScore(item,0,curTime - limitTime * 1000);
// 統(tǒng)計時間范圍內的請求個數(shù)
Long count = redisTemplate.opsForZSet().zCard(item);
// 統(tǒng)一設置所有請求的超時時間
redisTemplate.expire(item, limitTime, TimeUnit.SECONDS);
return limitNum >= count;
}
public String buy(String item, String person){
String luaScript = "local person = ARGV[1]\n" +
"local item = KEYS[1] \n" +
"local left = tonumber(redis.call('get',item)) \n" +
"if (left >= 1) \n" +
"then redis.call('decrby',item,1) \n" +
" redis.call('rpush','personList',person) \n" +
"return 1 \n" +
"else \n" +
"return 0\n" +
"end\n" +
"\n" ;
String key=item;
String args=person;
DefaultRedisScript<String> redisScript = new DefaultRedisScript<String>();
redisScript.setScriptText(luaScript);
//調用lua腳本,請注意傳入的參數(shù)
Object luaResult = redisTemplate.execute((RedisConnection connection) -> connection.eval(
redisScript.getScriptAsString().getBytes(),
ReturnType.INTEGER,
1,
key.getBytes(),
args.getBytes()));
//如果秒殺成功,向消息隊列發(fā)消息,異步插入到數(shù)據(jù)庫
if(!luaResult.equals("0") ){
buyrecord record = new buyrecord();
record.setItem(item);
record.setPerson(person);
amqpTemplate.convertAndSend("myExchange","buyRecordQueue",record);
}
//根據(jù)lua腳本的執(zhí)行情況返回結果
return luaResult.toString();
}
}在上述代碼中,首先通過第2-11行的import語句引入了本類所要用到的依賴包,隨后在第15行中定義了調用Redis會用到的redisTemplate對象,在第17行中定義了向RabbitMQ消息隊列發(fā)送消息所要用到的amqpTemplate對象。
第18行的canVisit方法實現(xiàn)了限流效果,該方法的item參數(shù)表示待限流的商品,limitTime和LimitNum參數(shù)分別表示在指定時間內需要限流的請求個數(shù)。
在該方法中使用Redis的有序集合實現(xiàn)了限流效果,具體的做法是,在第21行的代碼中,通過zadd方法把表示操作類型的item作為鍵插入有序集合,插入時用表示當前時間的curTime作為值,以保證值的唯一性,同樣再用curTime值作為有序集合中元素的score值。
隨后在第23行中,通過removeRangeByScore命令移除從0到距當前時間limitTime范圍內的數(shù)據(jù),比如限流的時間范圍是20秒,那么通過這條命令就能在有序集合中移除score范圍從0到距離當前時間20秒的數(shù)據(jù),從而確保有序集合只保存最近20秒內的請求。

在此基礎上,通過第25行代碼用zcard命令統(tǒng)計有序集合內鍵為item的個數(shù),如果通過第28行的布爾語句發(fā)現(xiàn)當前個數(shù)還沒達到限流的上限,該方法就會返回true,表示該請求能繼續(xù),否則返回false,表示該請求將會被限流。
同時,需要通過第27行的expire語句設置有序集合中數(shù)據(jù)的超時時間,這樣就能確保在限流以及秒殺動作完成后這些鍵能自動刪除。
第30行定義的buy方法將會實現(xiàn)秒殺的功能,其中先通過第31~41行代碼定義實現(xiàn)秒殺功能的Lua腳本,該腳本之前分析過,隨后再通過第47一52行代碼使用redisTemplate.execute方法執(zhí)行這段Lua腳本。
在執(zhí)行時,會通過第50行代碼指定KEYS類型參數(shù)的個數(shù),通過第51行和第52行代碼傳入該腳本執(zhí)行時所需要用到的KEYS和ARGVS參數(shù)。
隨后會通過第54行的f語句判斷秒殺腳本的執(zhí)行結果,如果秒殺成功,那么會通過第55~58行代碼用amqpTemplate對象向buyRecordQueue隊列發(fā)送包含秒殺結果的record對象。最后,再通過第61行的語句返回秒殺的結果。
觀察秒殺效果
至此,可以通過如下步驟啟動Redis、RabbitMQ和QuickBuyDemo項目,并觀察秒殺效果。
- 在命令行中通過
rabbitmq-server.bat start命令啟動RabbitMQ。 - 通過運行redis-server.exe啟動Redis服務器,并通過運行redis-cli.exe啟動Redis客戶端,隨后在Redis客戶端通過
set Computer 10命令向Redis中緩存一條庫存數(shù)據(jù),表示有10個Computer可供秒殺。 - 在QuickBuyDemo項目中,通過運行SpringBootApp.java啟動類啟動該項目。成功啟動后,在瀏覽器中輸入
http:localhost:8080/quickBuy/Computer/Tom發(fā)起秒殺請求,其中Computer參數(shù)表示秒殺的商品,而Tom則表示發(fā)起秒殺請求的人。
輸入后,能在瀏覽器中看到Tom success的結果,隨后到Redis客戶端窗口運行get Computer命令,能看到Computer的庫存數(shù)量會降到9,由此可以確認秒殺成功。同時,可以通過lindex personList 0命令觀察到成功發(fā)起秒殺請求的人是Tom。
四、以異步方式保存秒殺結果
如果在上述QuickBuyDemo項目中直接把秒殺結果插入MySQL數(shù)據(jù)庫,那么當秒殺請求并發(fā)量很高時會對數(shù)據(jù)庫造成很大的壓力,所以在該項目中會通過消息隊列把秒殺結果傳輸?shù)紻BHandlerPrj項目中,用異步的方式保存數(shù)據(jù),從而降低數(shù)據(jù)庫的負載壓力。
創(chuàng)建項目并設計數(shù)據(jù)庫
首先需要創(chuàng)建名為DBHandlerPrj的Maven項目,在其中實現(xiàn)異步保存秒殺數(shù)據(jù)的功能,該項目的pom.xml文件如下,其中通過第2-5行代碼引入了Spring Boot依賴包,通過第6-9行代碼引入了RabbitMO消息隊列的依賴包,通過第10~18行代碼引入了JPA和MySQL的依賴包。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>本項目將會用到如表所示的buyrecord表,該表是創(chuàng)建在本地MySQL的QuickBuy數(shù)據(jù)表(schema)中的,在其中將會保存秒殺結果。
| 字段名 | 類型 | 說明 |
|---|---|---|
| item | 字符串 | 秒殺成功的商品名 |
| person | 字符串 | 秒殺成功的用戶 |
而本項目的啟動類SpringBootAppjava和QuickBuyDemo項目中的完全一致,所以不再重復說明。
配置消息隊列和數(shù)據(jù)庫參數(shù)
在本項目resources目錄的application.yml文件中,將通過如下代碼配置消息隊列和數(shù)據(jù)庫連接參數(shù)。
server:
port: 8090
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
spring:
jpa:
show-sql: true
hibernate:
dll-auto: validate
datasource:
url: jdbc:mysql://localhost:3306/QuickBuy?serverTimezone=GMT
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver由于之前的QuickBuyDemo項目已經(jīng)占用了8080端口,因此本配置文件將通過第1行和第2行代碼設置工作端口為8090。隨后,本配置文件將通過第3~7行代碼設置RabbiMQ消息隊列的連接參數(shù),具體是連接到本地5672端口,且連接所用的用戶名和密碼都是guest。
由于本項目是通過JPA的方式連接MySQL庫的,因此本配置文件通過第8-12行代碼配置了JPA的參數(shù),通過第13-17行代碼配置了MySQL的連接參數(shù)。
此外,和QuickBuyDemo項目一樣,本項目依然是在RabbitMQConfg.java配置文件中設置RabbitMQ消息隊列和交換機,具體代碼如下,其中配置的消息隊列名字buyRecordQueue與交換機的名字myExchange需要和QuickBuyDemo項目中的定義保持一致。
package prj;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig{
//定義含主題的消息隊列
@Bean
public Queue objectQueue() {
return new Queue("buyRecordQueue");
}
//定義交換機
TopicExchange myExchange() {
return new TopicExchange("myExchange");
Binding bindingObjectQueue(Queue objectQueue,TopicExchange exchange) {
return BindingBuilder.bind(objectQueue).to(exchange).with("buyRecordQueue");
}監(jiān)聽消息隊列并保存秒殺結果
在本項目的QuickBuySevivce.java文件中將會監(jiān)聽buyRecordQueue消息隊列,并把秒殺結果存入MySOL數(shù)據(jù)表,具體代碼如下:
package prj.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import prj.model.buyrecord;
import prj.repo.BuyRecordRepo;
@Component
@RabbitListener(queues = "buyRecordQueue")
public class QuickBuyService {
@Autowired
private AmqpTemplate amqpTemplate;
private BuyRecordRepo buyRecordRepo;
@RabbitHandler
public void saveBuyRecord(buyrecord record){
buyRecordRepo.save(record);
}
}在本類的第10行通過@RabbitListener注解說明將要監(jiān)聽buyRecordQueue消息隊列,當該消息隊列有消息時,會觸發(fā)本類第17行的saveBuyRecord方法,該方法被第16行的@RabbitHandler注解所修飾。在該方法中會調用JPA類buyRecordRepo的save方法向數(shù)據(jù)表中保存秒殺結果。
QuickBuyServce類中用到的模型類buyrecord和QuickBuyDemo項目中的很相似,由于該類需要通過消息隊列在網(wǎng)絡中傳輸,因此需要像第9行那樣實現(xiàn)Serializable接口。
package prj.model;
import java.io.Serializable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity
@Table(name="buyrecord")
public class buyrecord implements Serializable {
@Id
@Column(name = "person")
private String person;
@Column(name = "item")
private String item;
public void setItem(String item) {
this.item = item;
}
public void setPerson(String person) {
this.person = person;
public String getItem() {
return item;
public String getPerson() {
return person;
}全鏈路效果演示
開發(fā)好上述兩個項目以后,可以用對如下步驟觀察全鏈路的秒殺效果:
- 啟動RabbitMQ、Redis服務器和客戶端,通過
set Computer 10命令緩存秒殺商品的數(shù)量,同時通過運行啟動類 - 啟動QuickBuyDemo項目。
啟動DBHandlerPrj項目
在QuickBuyDemo項日中開發(fā)如下的QuickBuyThread.java文件,在其中用多線程的方式模擬多個秒殺情求,代碼如下:
package prj.client;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
class QuickBuyThread extends Thread{
public void run() {
RestTemplate restTemplate = new RestTemplate();
String user = Thread.currentThread().getName();
ResponseEntity<String> entity = restTemplate.
getForEntity("http://localhost:8080/quickBuy/Computer/"+user , String.class);
System.out.println(entity.getBody());
}
}
public class MockQuickBuy {
public static void main(String[] args){
for (int i = 0; i < 15; i++) {
new QuickBuyThread().start();
}第4行定義的QuickBuyThread類以繼承Thread類的方式實現(xiàn)了線程的效果,在第5行線程的run方法中用restTemplate.getForEntity方法模擬發(fā)送了秒殺的請求,其中用當前線程的名字作為發(fā)起秒殺的用戶。
public class MockQuickBuy {
public static void main(String[] args){
for (int i = 0; i < 15; i++) {
new QuickBuyThread().start();
}
}
}在第12行MockQuickBuy類的main方法中,通過第14行的for循環(huán)啟動了15個線程發(fā)起秒殺請求。由于之前在Redis緩存中設置的Computer商品數(shù)量是10個,因此會有10個請求秒殺成功。5個請求不成功。如下輸出語句能確認這一結果。
此外,如果再到 MySQL數(shù)據(jù)庫用select from QuickBuy.buyrecord語句觀察秒殺結果,能看到成功秒殺的用戶,這些用戶名和上述輸出結果中的用戶名完全一致。
本文來自于《Spring Boot+Vue.js+分布式組件全棧開發(fā)訓練營(視頻教學版)》第17章
到此這篇關于Spring+Redis+RabbitMQ限流和秒殺項目的開發(fā)的文章就介紹到這了,更多相關Spring Redis RabbitMQ限流秒殺內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Redis數(shù)據(jù)遷移RedisShake的實現(xiàn)方法
本文主要介紹了Redis數(shù)據(jù)遷移RedisShake的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-04-04
Redis Subscribe timeout 報錯的問題解決
最近系統(tǒng)偶爾報出org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)的錯誤,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2025-08-08
Redis五大基本數(shù)據(jù)類型及對應使用場景總結
Redis有五種基本數(shù)據(jù)類型,分別是字符串(String)、哈希(Hash)、列表(List)、集合(Set)和有序集合(Sorted?Set),這些基本數(shù)據(jù)類型使得Redis具備了豐富的數(shù)據(jù)結構和功能,適用于各種不同的應用場景,本文就給大家詳細的介紹一下這五大類型2023-08-08

