redis使用zset實現(xiàn)延時隊列的示例代碼
最近在使用redis時,就想能不能用其實現(xiàn)消息隊列?也在網(wǎng)上看了下其他小伙伴寫的實現(xiàn),結合自身業(yè)務實現(xiàn)了如下消息隊列,希望對大家有用。
廢話不多說,直接開擼。
1、為什么zset可以做消息隊列?
首先我們來看下,設計消息隊列需要考慮的需求:有序性,消息重復性,可靠性。
- 有序性:zset所有元素可以根據(jù)成員關聯(lián)的score來進行從低到高的排序,例如,我們可以利用時間戳來進行排序
- 消息重復性:在zset中每個元素都是唯一的,這也保證了消息的唯一性
- 可靠性:zset會自動維護元素之間的順序,在添加或刪除元素時無需手動排序,提升操作速度。
2、使用的zset命令
| 命令 | 描述 |
|---|---|
| zadd | 將一個給定score的成員添加到有序集合中,返回添加元素的個數(shù) |
| zrange | 根據(jù)元素在有序排序中的位置,從有序集合中獲取多個元素 |
| rank(K key, Object o) | 獲取指定元素在集合中的索引,索引從0開始 |
3、代碼實現(xiàn)
使用zset實現(xiàn)消息隊列時,具體的流程,如下:

生產(chǎn)者流程:
- 用戶獲取消息Id,并封裝消息體
- 用戶發(fā)送數(shù)據(jù)到生產(chǎn)者,先獲取鎖
- 如果獲取到鎖,則校驗該消息體是否已添加到隊列中,已添加則直接返回提醒。
- 若未添加則調(diào)用方法將數(shù)據(jù)保存到zset集合中,否則等到指定時間后再獲取鎖。
- 推送數(shù)據(jù)后,釋放鎖
消費者流程:
- 調(diào)用方法獲取數(shù)據(jù)
- 獲取到數(shù)據(jù),則直接返回,否則到指定時間后再次獲取數(shù)據(jù),直到獲取到數(shù)據(jù)并返回。
統(tǒng)一返回類:
/**
* @Author: jiangjs
* @Description:
* @Date: 2021/11/12 15:46
**/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResultUtil<T> implements Serializable {
private int code;
private String msg;
private T data;
public static <T> ResultUtil<T> success(){
return ResultUtil.<T>builder().code(1000).msg("成功").build();
}
public static <T> ResultUtil<T> success(T data){
return ResultUtil.<T>builder().code(1000).msg("成功").data(data).build();
}
public static <T> ResultUtil<T> error(String msg){
return ResultUtil.<T>builder().code(5000).msg(msg).data(null).build();
}
public static <T> ResultUtil<T> error(int code,String msg){
return ResultUtil.<T>builder().code(code).msg(msg).build();
}
}3.1 消息實體
需添加消息Id,主要防止消息重復提交。
/**
* @author: jiangjs
* @description: 消息實體
* @date: 2023/5/30 11:11
**/
@Data
@Accessors(chain = true)
public class QueueTask<T> {
/**
* 消息Id
*/
private String taskId;
/**
* 任務
*/
private T task;
}3.2 隊列類型
隊列類型可以理解為隊列的名稱,通過枚舉,可以隨意添加隊列名稱。
/**
* @author: jiangjs
* @description: 隊列類型
* @date: 2023/5/30 10:53
**/
public enum QueueTypeEnum {
/**
* 訂單
*/
ORDER("order");
private final String type;
QueueTypeEnum(String type){
this.type = type;
}
public String getType(){
return type;
}
}3.3 創(chuàng)建消息工具
package com.jiashn.springbootproject.redis.utils;
import com.jiashn.springbootproject.redis.domain.QueueTask;
import com.jiashn.springbootproject.utils.ResultUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author: jiangjs
* @description: redis實現(xiàn)消息隊列
* @date: 2023/5/30 10:51
**/
public class RedisQueueUtil<T> {
private static final Logger log = LoggerFactory.getLogger(RedisQueueUtil.class);
private RedisTemplate<String,QueueTask<T>> redisTemplate;
/**
* 隊列類型,即名稱
*/
private final QueueTypeEnum typeEnum;
public RedisQueueUtil(QueueTypeEnum typeEnum,RedisTemplate<String,QueueTask<T>> redisTemplate){
this.typeEnum = typeEnum;
this.redisTemplate = redisTemplate;
}
/**
* 添加消息數(shù)據(jù)
* @param queueTask 消息
* @param time 延遲時間,單位s
*/
public ResultUtil<String> sendQueueTask(QueueTask<T> queueTask, long time){
//加鎖
if (getLock()){
try {
Long rank = redisTemplate.opsForZSet().rank(typeEnum.getType(), queueTask);
if (Objects.nonNull(rank)){
return ResultUtil.error(6000,"消息數(shù)據(jù)已經(jīng)存在,不予添加......");
}
Boolean result = redisTemplate.opsForZSet().add(typeEnum.getType(), queueTask, System.currentTimeMillis() + time*1000);
if (Objects.nonNull(result) && result){
log.info("添加消息數(shù)據(jù)成功:" + queueTask + ",添加時間:" + LocalDateTime.now());
return ResultUtil.success("添加消息數(shù)據(jù)成功");
}
return ResultUtil.error("添加消息數(shù)據(jù)失敗");
}finally {
//釋放鎖
releaseLock();
}
} else {
log.info("未獲取到鎖,稍后再試");
return ResultUtil.error("未獲取到鎖,稍后再試");
}
}
/**
* 獲取zset前count數(shù)據(jù)
* @param count 數(shù)據(jù)數(shù)
* @return 返回獲取到數(shù)據(jù)
*/
public Set<QueueTask<T>> loopGetTask(int count) {
//rangeByScore,根據(jù)score順序獲取zset數(shù)據(jù)的值
return redisTemplate.opsForZSet().rangeByScore(typeEnum.getType(), 0, System.currentTimeMillis(), 0, count-1);
}
/**
* 注銷消息隊列
* @param typeEnum 消息隊列名稱
*/
public void destroy(QueueTypeEnum typeEnum){
redisTemplate.opsForZSet().remove(typeEnum.getType());
}
/**
* 獲取任務Id
* @return 返回消息Id
*/
public String getTaskId(){
return typeEnum.getType() + "_" + UUID.randomUUID().toString().replace("-","");
}
/**
* 獲取鎖
* @return 返回加鎖狀態(tài)
*/
private boolean getLock(){
Boolean absent = redisTemplate.opsForValue().setIfAbsent(typeEnum.getType() + "_Locked", null, 30L, TimeUnit.MINUTES);
return Objects.nonNull(absent) ? absent : false;
}
/**
* 釋放鎖
*/
public void releaseLock(){
redisTemplate.delete(typeEnum.getType() + "_Locked");
}
}在消息工具類中,創(chuàng)建消息任務時添加了鎖,只有在獲取鎖的前提下才能添加消息任務。
提供獲取消息Id的方法是為了讓提交消息任務前,先獲取Id,即使在提交時網(wǎng)絡發(fā)生問題,提交的Id還是同一個,再進行消息消費時,可以根據(jù)這個Id來進行判斷該消息任務是否已被消費,被消費則直接丟棄。
3.4 消費消息
/**
* @author: jiangjs
* @description: 啟動消費
* @date: 2023/5/30 14:27
**/
@Component
public class CustomerTaskLineRunner implements CommandLineRunner {
@Resource
private RedisTemplate<String,QueueTask<String>> redisTemplate;
private final static String QUEUE_TYPE = QueueTypeEnum.ORDER.getType();
private final static Logger log = LoggerFactory.getLogger(CustomerTaskLineRunner.class);
@Override
public void run(String... args) throws Exception {
RedisQueueUtil<String> queueUtil = new RedisQueueUtil<>(QueueTypeEnum.ORDER,redisTemplate);
while (true){
Set<QueueTask<String>> queueTasks = queueUtil.loopGetTask(10);
if (CollectionUtils.isNotEmpty(queueTasks)){
for (QueueTask<String> queueTask : queueTasks) {
//校驗當前消息是否已消費,主要防止網(wǎng)絡延時,導致多次提交同一任務 存在
QueueTask<String> stringQueueTask = redisTemplate.opsForValue().get(QUEUE_TYPE + "_" + queueTask.getTaskId());
if (Objects.nonNull(stringQueueTask)){
log.info("該任務已經(jīng)消費,不能重復消費");
redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
continue;
}
Long removeNum = redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
if (Objects.nonNull(removeNum) && removeNum > 0){
String task = queueTask.getTask();
log.info("消費任務數(shù)據(jù):" + task);
//設置過期時間,10分鐘內(nèi)則默認是重復提交
redisTemplate.opsForValue().set(QUEUE_TYPE + "_" + queueTask.getTaskId(),queueTask,10L, TimeUnit.MINUTES);
}
}
}
log.info("------1分鐘后再次獲取------");
Thread.sleep(60000);
}
}
}校驗重復消息,若消息重復且在10分鐘內(nèi)未被消費,則直接將該消息從隊列中刪除。在消息任務被消費后,將數(shù)據(jù)從隊列中移除。
執(zhí)行結果:

到此這篇關于redis使用zset實現(xiàn)延時隊列的示例代碼的文章就介紹到這了,更多相關redis zset延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Redis特殊數(shù)據(jù)類型Geospatial地理空間
這篇文章主要為大家介紹了Redis特殊數(shù)據(jù)類型Geospatial地理空間,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05

