解讀Redis秒殺優(yōu)化方案(阻塞隊(duì)列+基于Stream流的消息隊(duì)列)
Redis秒殺優(yōu)化方案(阻塞隊(duì)列+Stream流的消息隊(duì)列)
下面是我們的秒殺流程:

對(duì)于正常的秒殺處理,我們需要多次查詢(xún)數(shù)據(jù)庫(kù),會(huì)給數(shù)據(jù)庫(kù)造成相當(dāng)大的壓力,這個(gè)時(shí)候我們需要加入緩存,進(jìn)而緩解數(shù)據(jù)庫(kù)壓力。
在上面的圖示中,我們可以將一條流水線的任務(wù)拆成兩條流水線來(lái)做,如果我們直接將判斷秒殺庫(kù)存與校驗(yàn)一人一單放在流水線A上,剩下的放在另一條流水線B,那么如果流水線A就可以相當(dāng)于服務(wù)員直接判斷是否符合資格,如果符合資格那么直接生成信息給另一條流水線B去處理業(yè)務(wù),這里的流水線就是咱們的線程,而流水線A也是基于數(shù)據(jù)庫(kù)進(jìn)行查詢(xún),也會(huì)壓力數(shù)據(jù)庫(kù),那么這種情況我們就可以將待查詢(xún)信息保存在Redis緩存中。
但是我們不能再流水線A判斷完成后去直接調(diào)用流水線B,這樣的效率是大打折扣的,這種情況我們需要開(kāi)啟獨(dú)立線程去執(zhí)行流水線B的操作,如何知道給哪個(gè)用戶(hù)創(chuàng)建訂單呢?這個(gè)時(shí)候就要流水線A在判斷成功后去生成信息給獨(dú)立線程。
最后的業(yè)務(wù)就變成,用戶(hù)直接訪問(wèn)流水線A,通過(guò)流水線A去判斷,如果通過(guò)則生成信息給流水線B去創(chuàng)建訂單,過(guò)程如下圖:

那么什么樣的數(shù)據(jù)結(jié)構(gòu)滿足下面條件:
- ① 一個(gè)key能夠保存很多值
- ②唯一性:一人一單需要保證用戶(hù)id不能重復(fù)。
所以我們需要使用set:

那么如何判斷校驗(yàn)用戶(hù)的購(gòu)買(mǎi)資格呢?

而上述判斷需要保證原子性,所以我們需要使用Lua腳本進(jìn)行編寫(xiě):

local voucherId = ARGV[1]; -- 優(yōu)惠劵id
local userId = ARGV[2]; -- 用戶(hù)id
-- 庫(kù)存key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 訂單key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 判斷庫(kù)存是否充足
if(tonumber(redis.call('get',stockKey) <= 0)) then
-- 庫(kù)存不足,返回1
return 1;
end;
-- 判斷用戶(hù)是否下單
if(redis.call('sismember',orderKey,userId)) then
-- 存在,說(shuō)明重復(fù)下單,返回2
return 2;
end
-- 扣減庫(kù)存 incrby stockKey -1
redis.call('incrby',stockKey,-1);
-- 下單(保存用戶(hù)) sadd orderKey userId
redis.call('sadd',orderKey,userId);
return 0;之后我們按照下面步驟來(lái)實(shí)現(xiàn)代碼:

在方法體內(nèi)執(zhí)行Lua腳本來(lái)原子性判斷,然后判斷是否能夠處理并傳入阻塞隊(duì)列:
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型內(nèi)填入返回值類(lèi)型
static { // 靜態(tài)屬性要使用靜態(tài)代碼塊進(jìn)行初始化
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setResultType(Long.class);
SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
}
public Result seckillVoucherMax(Long voucherId) {
// 獲取用戶(hù)信息
Long userId = UserHolder.getUser().getId();
// 1.執(zhí)行Lua腳本來(lái)判斷用戶(hù)資格
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), // Lua無(wú)需接受key
voucherId.toString(),
userId.toString()
);
// 2.判斷結(jié)果是否為0
int r = result.intValue();
if(r != 0) {
// 不為0代表無(wú)資格購(gòu)買(mǎi)
return Result.fail(r == 1 ? "庫(kù)存不足" : "不能重復(fù)下單");
}
// 3.有購(gòu)買(mǎi)資格則將下單信息保存到阻塞隊(duì)列中
// ...
return Result.ok();
}
}接下來(lái)我們創(chuàng)建阻塞隊(duì)列,線程池以及線程方法,隨后使用Springboot提供的注解在@PostConstruct去給線程池傳入線程方法:
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型內(nèi)填入返回值類(lèi)型
static { // 靜態(tài)屬性要使用靜態(tài)代碼塊進(jìn)行初始化
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setResultType(Long.class);
SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
}
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 創(chuàng)建阻塞隊(duì)列
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); // 創(chuàng)建線程池
// 讓大類(lèi)在開(kāi)始初始化時(shí)就能夠執(zhí)行線程任務(wù)
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
}
// 創(chuàng)建線程任務(wù)
private class VoucherOrderTask implements Runnable {
@Override
public void run() {
while(true){
try {
// 獲取隊(duì)列中的訂單信息
VoucherOrder voucherOrder = orderTasks.take();// 取出頭部信息
// 創(chuàng)建訂單
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("處理訂單異常",e);
}
}
}
}
// 創(chuàng)建訂單
private void handleVoucherOrder(VoucherOrder voucherOrder) {
RLock lock = redissonClient.getLock("lock:order:" + voucherOrder.getUserId().toString());
boolean isLock = lock.tryLock();
// 判斷是否獲取鎖成功
if (!isLock) {
// 獲取鎖失敗,返回錯(cuò)誤或重試
log.error("不允許重復(fù)下單");
return ;
}
try {
proxy.createVoucherOrderMax(voucherOrder);
} finally {
lock.unlock();
}
}
@Override
public void createVoucherOrderMax(VoucherOrder voucherOrder) {
// 一人一單
Long userId = voucherOrder.getUserId();
// 查詢(xún)訂單
int count = query().eq("user_id",userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 判斷是否存在
if(count > 0){
// 用戶(hù)已經(jīng)購(gòu)買(mǎi)過(guò)
log.error("用戶(hù)已經(jīng)購(gòu)買(mǎi)過(guò)");
return ;
}
// CAS改進(jìn):將庫(kù)存判斷改成stock > 0以此來(lái)提高性能
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).eq("stock",0) // where id = ? and stock > 0
.update();
if (!success) {
//扣減庫(kù)存
log.error("庫(kù)存不足!");
return ;
}
//6.創(chuàng)建訂單
save(voucherOrder);
}
private IVoucherOrderService proxy; // 代理對(duì)象
public Result seckillVoucherMax(Long voucherId) {
// 獲取用戶(hù)信息
Long userId = UserHolder.getUser().getId();
// 1.執(zhí)行Lua腳本來(lái)判斷用戶(hù)資格
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(), // Lua無(wú)需接受key
voucherId.toString(),
userId.toString()
);
// 2.判斷結(jié)果是否為0
int r = result.intValue();
if(r != 0) {
// 不為0代表無(wú)資格購(gòu)買(mǎi)
return Result.fail(r == 1 ? "庫(kù)存不足" : "不能重復(fù)下單");
}
// 3.有購(gòu)買(mǎi)資格則將下單信息保存到阻塞隊(duì)列中
Long orderId = redisIdWorker.nextId("order");
// 創(chuàng)建訂單
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放入阻塞隊(duì)列
orderTasks.add(voucherOrder);
// 4.獲取代理對(duì)象(線程異步執(zhí)行,需要手動(dòng)在方法內(nèi)獲取)
proxy = (IVoucherOrderService)AopContext.currentProxy(); // 獲取當(dāng)前類(lèi)的代理對(duì)象 (需要引入aspectjweaver依賴(lài),并且在實(shí)現(xiàn)類(lèi)加入@EnableAspectJAutoProxy(exposeProxy = true)以此來(lái)暴露代理對(duì)象)
return Result.ok();
}
}在上面代碼中,我們使用下面代碼創(chuàng)建了一個(gè)單線程的線程池。它保證所有提交的任務(wù)都按照提交的順序執(zhí)行,每次只有一個(gè)線程在工作。
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
下面代碼是一個(gè)常見(jiàn)的阻塞隊(duì)列實(shí)現(xiàn),具有固定大小(在這里是 1024 * 1024),它的作用是緩沖和排隊(duì)任務(wù)。ArrayBlockingQueue 是一個(gè)線程安全的隊(duì)列,它會(huì)自動(dòng)處理線程之間的同步問(wèn)題。當(dāng)隊(duì)列滿時(shí),調(diào)用 put() 方法的線程會(huì)被阻塞,直到隊(duì)列有空間;當(dāng)隊(duì)列為空時(shí),調(diào)用 take() 方法的線程會(huì)被阻塞,直到隊(duì)列中有數(shù)據(jù)。
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
在下面代碼中,orderTasks 阻塞隊(duì)列用于存放需要處理的訂單對(duì)象,每個(gè)訂單的處理邏輯都由 VoucherOrderTask 線程池中的線程異步執(zhí)行:
VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder);
之后我們需要調(diào)用 Runnable 接口去實(shí)現(xiàn)VoucherOrderTask類(lèi)以此來(lái)創(chuàng)建線程方法:
private class VoucherOrderTask implements Runnable {
@Override
public void run() {
while (true) {
try {
// 獲取隊(duì)列中的訂單信息
VoucherOrder voucherOrder = orderTasks.take(); // 獲取訂單
// 創(chuàng)建訂單
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("處理訂單異常", e);
}
}
}
}隨后將線程方法通過(guò) submit() 方法將 VoucherOrderTask 提交到線程池中,這個(gè)任務(wù)是一個(gè)無(wú)限循環(huán)的任務(wù),它會(huì)不斷從阻塞隊(duì)列中取出訂單并處理,直到線程池關(guān)閉。
這種方式使得訂單處理任務(wù)可以異步執(zhí)行,而不阻塞主線程,提高了系統(tǒng)的響應(yīng)能力:
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
}但是在高并發(fā)的情況下就會(huì)產(chǎn)生大量訂單,就會(huì)超出JVM阻塞隊(duì)列的上線,并且每當(dāng)服務(wù)重啟或者宕機(jī)的情況發(fā)生,阻塞隊(duì)列的所有訂單任務(wù)就都會(huì)丟失。
所以為了解決這種情況,我們就要使用消息隊(duì)列去解決這個(gè)問(wèn)題:
什么是消息隊(duì)列?
消息隊(duì)列(Message Queue, MQ)是一種用于在應(yīng)用程序之間傳遞消息的通信方式。它允許應(yīng)用程序通過(guò)發(fā)送和接收消息來(lái)解耦,從而提高系統(tǒng)的可擴(kuò)展性、可靠性和靈活性。消息隊(duì)列通常用于異步通信、任務(wù)隊(duì)列、事件驅(qū)動(dòng)架構(gòu)等場(chǎng)景。
消息隊(duì)列的核心概念 :
- 生產(chǎn)者(Producer):發(fā)送消息到消息隊(duì)列的應(yīng)用程序。
- 消費(fèi)者(Consumer):從消息隊(duì)列中接收并處理消息的應(yīng)用程序。
- 隊(duì)列(Queue):消息的存儲(chǔ)區(qū)域,生產(chǎn)者將消息發(fā)送到隊(duì)列,消費(fèi)者從隊(duì)列中獲取消息。
- 消息(Message):在生產(chǎn)者與消費(fèi)者之間傳遞的數(shù)據(jù)單元。
- Broker:消息隊(duì)列的服務(wù)器,負(fù)責(zé)接收、存儲(chǔ)和轉(zhuǎn)發(fā)消息。

消息隊(duì)列是在JVM以外的一個(gè)獨(dú)立的服務(wù),能夠不受JVM內(nèi)存的限制,并且存入MQ的信息都可以做持久化存儲(chǔ)。
詳細(xì)教學(xué)可以查詢(xún)下面鏈接:微服務(wù)架構(gòu) --- 使用RabbitMQ進(jìn)行異步處理
但是這樣的方式是需要額外提供服務(wù)的,所以我們可以使用Redis提供的三種不同的方式來(lái)實(shí)現(xiàn)消息隊(duì)列:
- List 結(jié)構(gòu)實(shí)現(xiàn)消息隊(duì)列
- Pub/Sub(發(fā)布/訂閱)模式
- Stream 結(jié)構(gòu)(Redis 5.0 及以上版本)(推薦使用)(詳細(xì)介紹)
使用 List 結(jié)構(gòu)實(shí)現(xiàn)消息隊(duì)列:
Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,支持從頭部或尾部插入和彈出元素。我們可以利用 LPUSH 和 BRPOP 命令實(shí)現(xiàn)一個(gè)簡(jiǎn)單的消息隊(duì)列。
實(shí)現(xiàn)步驟:
- 生產(chǎn)者:使用
LPUSH將消息推入隊(duì)列。 - 消費(fèi)者:使用
BRPOP阻塞地從隊(duì)列中獲取消息。
生產(chǎn)者代碼:
import redis.clients.jedis.Jedis;
public class ListProducer {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
String queueName = "myQueue";
// 發(fā)送消息
for (int i = 1; i <= 5; i++) {
String message = "Message " + i;
jedis.lpush(queueName, message); // 將消息推入隊(duì)列
System.out.println("Sent: " + message);
}
jedis.close(); // 關(guān)閉連接
}
}消費(fèi)者代碼:
import redis.clients.jedis.Jedis;
public class ListConsumer {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
String queueName = "myQueue";
while (true) {
// 阻塞獲取消息,超時(shí)時(shí)間為 0(無(wú)限等待)
var result = jedis.brpop(0, queueName);
String message = result.get(1); // 獲取消息內(nèi)容
System.out.println("Received: " + message);
}
}
}- 優(yōu)點(diǎn):簡(jiǎn)單易用,適合輕量級(jí)場(chǎng)景。
- 缺點(diǎn):不支持消息確認(rèn)機(jī)制,消息一旦被消費(fèi)(從隊(duì)列內(nèi)取出)就會(huì)從隊(duì)列中刪除。并且只支持單消費(fèi)者(一個(gè)消息只能拿出一次)
使用 Pub/Sub 模式實(shí)現(xiàn)消息隊(duì)列:
Redis 的 Pub/Sub 模式是一種發(fā)布-訂閱模型,生產(chǎn)者將消息發(fā)布到頻道,消費(fèi)者訂閱頻道以接收消息。
實(shí)現(xiàn)步驟:
- 生產(chǎn)者:使用
PUBLISH命令向頻道發(fā)布消息。 - 消費(fèi)者:使用
SUBSCRIBE命令訂閱頻道。
生產(chǎn)者代碼:
import redis.clients.jedis.Jedis;
public class PubSubProducer {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
String channelName = "myChannel";
// 發(fā)布消息
for (int i = 1; i <= 5; i++) {
String message = "Message " + i;
jedis.publish(channelName, message); // 發(fā)布消息到頻道
System.out.println("Published: " + message);
}
jedis.close(); // 關(guān)閉連接
}
}消費(fèi)者代碼:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PubSubConsumer {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
String channelName = "myChannel";
// 創(chuàng)建訂閱者
JedisPubSub subscriber = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println("Received: " + message);
}
};
// 訂閱頻道
jedis.subscribe(subscriber, channelName);
}
}- 優(yōu)點(diǎn):支持一對(duì)多的消息廣播。
- 缺點(diǎn):消息是即時(shí)的,如果消費(fèi)者不在線,消息會(huì)丟失。
但是上面兩方式都是有缺點(diǎn)的:
- 不支持消息確認(rèn)機(jī)制,消息一旦被消費(fèi)(從隊(duì)列內(nèi)取出)就會(huì)從隊(duì)列中刪除。并且只支持單消費(fèi)者(一個(gè)消息只能拿出一次)
- 消息是即時(shí)的,如果消費(fèi)者不在線,消息會(huì)丟失。
所以根據(jù)上面的兩種方式,我們推出一款全新的方式 ->
使用 Stream 結(jié)構(gòu)實(shí)現(xiàn)消息隊(duì)列:
Redis Stream 是一種強(qiáng)大的數(shù)據(jù)結(jié)構(gòu),用于管理消息流。它將消息存儲(chǔ)在 Redis 中,并允許消費(fèi)者按順序獲取消息。Stream 具有以下特點(diǎn):
- 有序消息:消息按插入順序排列。
- 消費(fèi)者組:一個(gè)消費(fèi)者組可以有多個(gè)消費(fèi)者,每個(gè)消費(fèi)者可以獨(dú)立消費(fèi)不同的消息。
- 消息 ID:每條消息都有唯一的 ID(如:
1588890470850-0),ID 按時(shí)間戳生成。 - 自動(dòng)分配消息:多個(gè)消費(fèi)者可以從 Stream 中并行消費(fèi)消息,保證消息不會(huì)重復(fù)消費(fèi)。



在 Redis Stream 中,一個(gè)隊(duì)列可以有多個(gè)消費(fèi)者組,每個(gè)消費(fèi)者組可以獨(dú)立地消費(fèi)隊(duì)列中的消息。每個(gè)消費(fèi)者組內(nèi)有多個(gè)消費(fèi)者,而消費(fèi)者是基于 消費(fèi)者名稱(chēng) 進(jìn)行識(shí)別的。
消費(fèi)者組的工作方式
- 每個(gè)消費(fèi)者組擁有自己的 消費(fèi)進(jìn)度,也就是每個(gè)消費(fèi)者組會(huì)從 自己獨(dú)立的消息 ID 開(kāi)始消費(fèi)。
- 多個(gè)消費(fèi)者組之間是相互獨(dú)立的,即使它們消費(fèi)的是同一個(gè)隊(duì)列,它們也可以從不同的位置開(kāi)始消費(fèi)隊(duì)列中的消息。
- 每個(gè)消費(fèi)者組都可以有多個(gè) 消費(fèi)者(在同一個(gè)組內(nèi),多個(gè)消費(fèi)者可以并行消費(fèi)同一個(gè)隊(duì)列的消息,但每個(gè)消息在消費(fèi)者組內(nèi)只能被一個(gè)消費(fèi)者處理一次)。
假設(shè)有一個(gè)隊(duì)列(Stream)mystream,可以為它創(chuàng)建多個(gè)消費(fèi)者組:
XGROUP CREATE mystream group1 $ MKSTREAM XGROUP CREATE mystream group2 $ MKSTREAM
這樣,mystream 隊(duì)列上就有了兩個(gè)消費(fèi)者組:group1 和 group2。每個(gè)消費(fèi)者組可以有自己的消費(fèi)者并從該隊(duì)列中讀取消息。此時(shí),group1 和 group2 都在消費(fèi)同一個(gè)隊(duì)列 mystream,但它們的消費(fèi)進(jìn)度是獨(dú)立的,它們各自有自己的消息 ID 記錄。
每個(gè)消費(fèi)者組可以有多個(gè)消費(fèi)者,而每個(gè)消費(fèi)者通過(guò)一個(gè) 唯一的消費(fèi)者名稱(chēng) 來(lái)標(biāo)識(shí)。
每個(gè)消費(fèi)者組有獨(dú)立的消費(fèi)進(jìn)度
每個(gè)消費(fèi)者組會(huì)記錄自己的消費(fèi)進(jìn)度,也就是它消費(fèi)到隊(duì)列中的 哪個(gè)消息 ID。即使多個(gè)消費(fèi)者組在消費(fèi)同一個(gè)消息隊(duì)列,它們每個(gè)組都會(huì)從 不同的消費(fèi)位置(消息 ID)開(kāi)始讀取消息。
例如,假設(shè)有一個(gè)隊(duì)列 mystream,同時(shí)有兩個(gè)消費(fèi)者組 group1 和 group2,它們都從 mystream 隊(duì)列中讀取消息:
group1從mystream隊(duì)列中的消息id1開(kāi)始消費(fèi),group1的進(jìn)度會(huì)記錄在 Redis 中。group2從mystream隊(duì)列中的消息id2開(kāi)始消費(fèi),group2的進(jìn)度也會(huì)記錄在 Redis 中。
消費(fèi)進(jìn)度互不干擾,即便 group1 和 group2 都在消費(fèi) mystream 隊(duì)列,它們的消費(fèi)位置是獨(dú)立的。
消費(fèi)者組內(nèi)部的消息消費(fèi)
一個(gè)消費(fèi)者組內(nèi)的消費(fèi)者會(huì) 共享 組內(nèi)的消息。即使有多個(gè)消費(fèi)者,每條消息 在消費(fèi)者組內(nèi)部只會(huì)被 一個(gè)消費(fèi)者 消費(fèi)。消費(fèi)者之間會(huì)并行處理消息,但每條消息只會(huì)被一個(gè)消費(fèi)者處理。
舉個(gè)例子:假設(shè) group1 中有三個(gè)消費(fèi)者 consumer1、consumer2、consumer3,如果隊(duì)列 mystream 有 6 條消息,那么它們會(huì)如下消費(fèi):
consumer1處理消息1、2consumer2處理消息3、4consumer3處理消息5、6
但對(duì)于消費(fèi)者組 group2,如果它有自己的消費(fèi)者,group2 內(nèi)的消費(fèi)者也會(huì)并行消費(fèi) mystream 中的消息,而 group1 和 group2 之間沒(méi)有直接關(guān)系。
首先初始化一個(gè)消息隊(duì)列:
在項(xiàng)目啟動(dòng)時(shí),確保 Redis 中存在對(duì)應(yīng)的 Stream 和消費(fèi)者組??梢酝ㄟ^(guò)程序在啟動(dòng)時(shí)檢查并創(chuàng)建(如果不存在的話)。
@Configuration
public class RedisStreamConfig {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String STREAM_KEY = "mystream";
private static final String GROUP_NAME = "mygroup";
@PostConstruct
public void init() {
// 檢查消費(fèi)者組是否存在,若不存在則創(chuàng)建
try {
// 如果消費(fèi)者組不存在則會(huì)拋出異常,我們捕獲異常進(jìn)行創(chuàng)建
redisTemplate.opsForStream().groups(STREAM_KEY);
} catch (Exception e) {
// 創(chuàng)建消費(fèi)者組,起始位置為 $ 表示從末尾開(kāi)始消費(fèi)新消息
redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
}
}
}注意:
opsForStream().groups(STREAM_KEY):查詢(xún)消費(fèi)者組是否已存在。opsForStream().createGroup(STREAM_KEY, GROUP_NAME):如果沒(méi)有消費(fèi)者組,則創(chuàng)建一個(gè)新的組。
隨后我們生產(chǎn)者發(fā)送消息示例:
@Service
public class RedisStreamProducerService { // 定義生產(chǎn)者服務(wù)類(lèi) RedisStreamProducerService
private static final String STREAM_KEY = "mystream"; // 定義 Redis Stream 的名稱(chēng),這里指定隊(duì)列名為 "mystream"
@Autowired
private StringRedisTemplate redisTemplate;
public void sendMessage(String content) { // 定義一個(gè)方法,發(fā)送消息到 Redis Stream,參數(shù) content 是消息的內(nèi)容
Map<String, String> map = new HashMap<>(); // 創(chuàng)建一個(gè) Map 用來(lái)存儲(chǔ)消息內(nèi)容
map.put("content", content); // 將消息內(nèi)容添加到 Map 中,鍵是 "content",值是傳入的內(nèi)容
// 在消息隊(duì)列中添加消息,調(diào)用 StringRedisTemplate 的 opsForStream 方法
RecordId recordId = redisTemplate.opsForStream() // 獲取操作 Redis Stream 的操作對(duì)象
.add(StreamRecords.objectBacked(map) // 創(chuàng)建一個(gè) Stream 記錄,將 Map 轉(zhuǎn)化為對(duì)象記錄
.withStreamKey(STREAM_KEY)); // 設(shè)置該記錄屬于的 Stream(消息隊(duì)列)的名稱(chēng)
// 輸出記錄的 ID,表示消息已經(jīng)成功發(fā)送
System.out.println("消息發(fā)送成功,id: " + recordId.getValue()); // 打印消息的 ID,表明該消息已經(jīng)被成功加入到 Stream 中
}
}RecordId 是 Spring Data Redis 中的一個(gè)類(lèi),用來(lái)表示 消息的唯一標(biāo)識(shí)符。它對(duì)應(yīng) Redis Stream 中的 消息 ID,該 ID 是 Redis Stream 中每條消息的唯一標(biāo)識(shí)。Redis 中的消息 ID 通常是由時(shí)間戳和序號(hào)組成的(如 1588890470850-0)。
主要功能:
- 表示消息 ID:
RecordId是一個(gè)封裝類(lèi),表示 Redis Stream 中消息的 ID。 - 用于識(shí)別和操作消息:在消費(fèi)和確認(rèn)消息時(shí),
RecordId用來(lái)標(biāo)識(shí)每條消息的唯一性,并幫助 Redis 確定消息是否已經(jīng)被消費(fèi)。
使用場(chǎng)景:
RecordId 用來(lái)標(biāo)識(shí)從 Stream 中讀取到的消息,我們可以通過(guò) RecordId 來(lái)進(jìn)行消息的確認(rèn)、刪除或其他操作。
RecordId recordId = redisTemplate.opsForStream().add(StreamRecords.objectBacked(map).withStreamKey("mystream"));通過(guò) StreamRecords.objectBacked(map) 將 map 對(duì)象作為消息內(nèi)容,并用 add 方法將其寫(xiě)入 Stream。
在然后編寫(xiě)消費(fèi)者服務(wù):
使用 RedisTemplate 的 read 方法(底層執(zhí)行的是 XREADGROUP 命令)從消費(fèi)者組中拉取消息,并進(jìn)行處理。消費(fèi)者可以采用定時(shí)任務(wù)或后臺(tái)線程不斷輪詢(xún)。
@Slf4j
@Service
public class RedisStreamConsumerService {
private static final String STREAM_KEY = "mystream"; // Redis Stream 的名稱(chēng),這里指定隊(duì)列名為 "mystream"
private static final String GROUP_NAME = "mygroup"; // 消費(fèi)者組的名稱(chēng),多個(gè)消費(fèi)者可以通過(guò)組名共享消費(fèi)隊(duì)列
private static final String CONSUMER_NAME = "consumer-1"; // 消費(fèi)者的名稱(chēng),消費(fèi)者名稱(chēng)在同一消費(fèi)者組內(nèi)必須唯一
@Autowired
private StringRedisTemplate redisTemplate;
@PostConstruct // 使用該注解能讓方法在 Spring 完成依賴(lài)注入后自動(dòng)調(diào)用,用于初始化任務(wù)
@Async // 將該方法標(biāo)記為異步執(zhí)行,允許它在單獨(dú)的線程中運(yùn)行,不會(huì)阻塞主線程,@EnableAsync 需要在配置類(lèi)中啟用
public void start() { // 啟動(dòng)方法,在應(yīng)用啟動(dòng)時(shí)執(zhí)行
// 無(wú)限循環(huán),不斷從 Redis Stream 中讀取消息(可以改為定時(shí)任務(wù)等方式)
while (true) {
try {
// 設(shè)置 Stream 讀取的阻塞超時(shí),設(shè)置最多等待 2 秒
StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(2));
// 從指定的消費(fèi)者組中讀取消息,">" 表示只消費(fèi)未被消費(fèi)過(guò)的消息
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
Consumer.from(GROUP_NAME, CONSUMER_NAME), // 指定消費(fèi)者組和消費(fèi)者名稱(chēng)
options, // 設(shè)置讀取選項(xiàng),包含阻塞時(shí)間
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()) // 從最后消費(fèi)的消息開(kāi)始讀取
);
// 如果沒(méi)有消息,繼續(xù)循環(huán)讀取
if (messages == null || messages.isEmpty()) {
continue;
}
// 處理每一條讀取到的消息
for (MapRecord<String, Object, Object> message : messages) {
String messageId = message.getId(); // 獲取消息的唯一標(biāo)識(shí)符(ID)
Map<Object, Object> value = message.getValue(); // 獲取消息內(nèi)容(以 Map 形式存儲(chǔ))
log.info("接收到消息,id={},內(nèi)容={}", messageId, value); // 打印日志,記錄消息 ID 和內(nèi)容
// 在這里加入業(yè)務(wù)邏輯處理
// 例如處理消息并執(zhí)行相應(yīng)的操作
// ...
// 消息處理成功后,需要確認(rèn)消息已經(jīng)被消費(fèi)(通過(guò) XACK 命令)
redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId); // 確認(rèn)消費(fèi)的消息
}
} catch (Exception e) {
log.error("讀取 Redis Stream 消息異常", e); // 異常捕獲,記錄錯(cuò)誤日志
}
}
}
}MapRecord<String, Object, Object> 是 Spring Data Redis 用來(lái)表示 Redis Stream 中的 消息記錄 的類(lèi)。它不僅包含了消息的 ID,還包含了消息的內(nèi)容(即消息數(shù)據(jù))。
在 Redis 中,每條消息都存儲(chǔ)為一個(gè) key-value 對(duì)。
主要功能:
- 封裝消息 ID 和消息內(nèi)容:
MapRecord用來(lái)封裝消息的 ID 和消息的內(nèi)容。 - 消息的內(nèi)容:消息的內(nèi)容通常是一個(gè) 鍵值對(duì)(
Map<String, Object>),可以是任意對(duì)象的數(shù)據(jù)結(jié)構(gòu)(例如,JSON、Map 或其他序列化對(duì)象)。
字段:
getId():返回消息的 ID(RecordId類(lèi)型)。getValue():返回消息的內(nèi)容,以Map<Object, Object>的形式。
使用場(chǎng)景:
MapRecord 是用來(lái)表示從 Stream 中讀取到的消息,它將消息的 ID 和內(nèi)容(鍵值對(duì))封裝在一起。你可以使用 MapRecord 來(lái)獲取消息的 ID 和內(nèi)容并處理。
MapRecord<String, Object, Object> message = redisTemplate.opsForStream().read(Consumer.from("mygroup", "consumer1"), options, StreamOffset.create("mystream", ReadOffset.lastConsumed()));在這個(gè)例子中,message 是一個(gè) MapRecord 實(shí)例,它封裝了從 mystream 隊(duì)列中讀取到的消息。我們可以通過(guò) message.getId() 獲取消息 ID,通過(guò) message.getValue() 獲取消息內(nèi)容。
在消費(fèi)者中,我們使用 MapRecord<String, Object, Object> 來(lái)封裝消息,獲取 message.getId() 來(lái)獲取消息的 ID(RecordId),以及通過(guò) message.getValue() 獲取消息的內(nèi)容。 隨后在處理完消息后,調(diào)用 acknowledge() 來(lái)確認(rèn)消息已經(jīng)被消費(fèi)。
最后啟動(dòng)異步支持:
@SpringBootApplication
@EnableAsync // 啟動(dòng)異步支持
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}通過(guò)這種方式,Spring Data Redis 提供了高效且類(lèi)型安全的接口來(lái)操作 Redis Stream,幫助我們?cè)诜植际较到y(tǒng)中實(shí)現(xiàn)高效的消息隊(duì)列。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
redis數(shù)據(jù)一致性之延時(shí)雙刪策略詳解
在使用redis時(shí),需要保持redis和數(shù)據(jù)庫(kù)數(shù)據(jù)的一致性,最流行的解決方案之一就是延時(shí)雙刪策略,今天我們就來(lái)詳細(xì)刨析一下,需要的朋友可以參考下2023-09-09
Redis優(yōu)惠券秒殺企業(yè)實(shí)戰(zhàn)
本文主要介紹了Redis優(yōu)惠券秒殺企業(yè)實(shí)戰(zhàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
Redis實(shí)現(xiàn)訂單過(guò)期刪除的方法步驟
本文主要介紹了Redis實(shí)現(xiàn)訂單過(guò)期刪除的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06
深入解析Redis中常見(jiàn)的應(yīng)用場(chǎng)景
這篇文章主要給大家介紹了關(guān)于Redis中常見(jiàn)的應(yīng)用場(chǎng)景的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09
Win10下通過(guò)Ubuntu安裝Redis的過(guò)程
這篇文章主要介紹了Win10下通過(guò)Ubuntu安裝Redis,在安裝Ubuntu需要先打開(kāi)Windows功能,接著創(chuàng)建一個(gè)用戶(hù)及密碼,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04

