Java redisTemplate阻塞式處理消息隊(duì)列
Redis 消息隊(duì)列

redis五種數(shù)據(jù)結(jié)構(gòu)

隊(duì)列生產(chǎn)者
package cn.stylefeng.guns.knowledge.modular.knowledge.schedule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.Random;
import java.util.UUID;
/**
* <p>
* 隊(duì)列生產(chǎn)者
* </p>
*
* @SINCE 2021/11/30 21:03
* @AUTHOR dispark
* @Date: 2021/11/30 21:03
*/
@Slf4j
public class QueueProducer implements Runnable {
/**
* 生產(chǎn)者隊(duì)列 key
*/
public static final String QUEUE_PRODUCTER = "queue-producter";
private RedisTemplate<String, Object> redisTemplate;
public QueueProducer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public void run() {
Random random = new Random();
while (true) {
try {
Thread.sleep(random.nextInt(600) + 600);
// 1.模擬生成一個(gè)任務(wù)
UUID queueProducerId = UUID.randomUUID();
// 2.將任務(wù)插入任務(wù)隊(duì)列:queue-producter
redisTemplate.opsForList().leftPush(QUEUE_PRODUCTER, queueProducerId.toString());
log.info("生產(chǎn)一條數(shù)據(jù) >>> {}", queueProducerId.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
隊(duì)列消費(fèi)者
package cn.stylefeng.guns.knowledge.modular.knowledge.schedule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.Random;
/**
* <p>
* 隊(duì)列消費(fèi)者
* </p>
*
* @SINCE 2021/11/30 21:14
* @AUTHOR dispark
* @Date: 2021/11/30 21:14
*/
@Slf4j
public class QueueConsumer implements Runnable {
public static final String QUEUE_PRODUCTER = "queue-producter";
public static final String TMP_QUEUE = "tmp-queue";
private RedisTemplate<String, Object> redisTemplate;
public QueueConsumer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 功能描述: 取值 - <brpop:阻塞式> - 推薦使用
*
* @author dispark
* @date 2021/11/30 21:17
*/
@Override
public void run() {
Random random = new Random();
while (true) {
// 1.從任務(wù)隊(duì)列"queue-producter"中獲取一個(gè)任務(wù),并將該任務(wù)放入暫存隊(duì)列"tmp-queue"
Long ququeConsumerId = redisTemplate.opsForList().rightPush(QUEUE_PRODUCTER, TMP_QUEUE);
// 2.處理任務(wù)----純屬業(yè)務(wù)邏輯,模擬一下:睡覺(jué)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3.模擬成功和失敗的偶然現(xiàn)象,模擬失敗的情況,概率為2/13
if (random.nextInt(13) % 7 == 0) {
// 4.將本次處理失敗的任務(wù)從暫存隊(duì)列"tmp-queue"中,彈回任務(wù)隊(duì)列"queue-producter"
redisTemplate.opsForList().rightPush(TMP_QUEUE, QUEUE_PRODUCTER);
log.info(ququeConsumerId + "處理失敗,被彈回任務(wù)隊(duì)列");
} else {
// 5. 模擬成功的情況,將本次任務(wù)從暫存隊(duì)列"tmp-queue"中清除
redisTemplate.opsForList().rightPop(TMP_QUEUE);
log.info(ququeConsumerId + "處理成功,被清除");
}
}
}
}
測(cè)試類
@Test
public void QueueThreadTotalEntry() throws Exception {
// 1.啟動(dòng)一個(gè)生產(chǎn)者線程,模擬任務(wù)的產(chǎn)生
new Thread(new QueueProducer(redisTemplate)).start();
Thread.sleep(15000);
// 2.啟動(dòng)一個(gè)線程者線程,模擬任務(wù)的處理
new Thread(new QueueConsumer(redisTemplate)).start();
// 3.主線程
Thread.sleep(Long.MAX_VALUE);
}
并發(fā)情況下使用increment遞增
線程一:
Long increment = redisTemplate.opsForValue().increment("increment", 1L);
log.info("隊(duì)列消費(fèi)者 >> increment遞增: {}", increment);
線程二:
Long increment = redisTemplate.opsForValue().increment("increment", 1L);
log.info("生產(chǎn)者隊(duì)列 >> increment遞增: {}", increment);

補(bǔ)充
redisTemplate處理/獲取redis消息隊(duì)列
(參考代碼)
/**
* redis消息隊(duì)列
*/
@Component
public class RedisQueue {
@Autowired
private RedisTemplate redisTemplate;
/** ---------------------------------- redis消息隊(duì)列 ---------------------------------- */
/**
* 存值
* @param key 鍵
* @param value 值
* @return
*/
public boolean lpush(String key, Object value) {
try {
redisTemplate.opsForList().leftPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 取值 - <rpop:非阻塞式>
* @param key 鍵
* @return
*/
public Object rpop(String key) {
try {
return redisTemplate.opsForList().rightPop(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 取值 - <brpop:阻塞式> - 推薦使用
* @param key 鍵
* @param timeout 超時(shí)時(shí)間
* @param timeUnit 給定單元粒度的時(shí)間段
* TimeUnit.DAYS //天
* TimeUnit.HOURS //小時(shí)
* TimeUnit.MINUTES //分鐘
* TimeUnit.SECONDS //秒
* TimeUnit.MILLISECONDS //毫秒
* @return
*/
public Object brpop(String key, long timeout, TimeUnit timeUnit) {
try {
return redisTemplate.opsForList().rightPop(key, timeout, timeUnit);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 查看值
* @param key 鍵
* @param start 開(kāi)始
* @param end 結(jié)束 0 到 -1代表所有值
* @return
*/
public List<Object> lrange(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
以上就是Java redisTemplate阻塞式處理消息隊(duì)列的詳細(xì)內(nèi)容,更多關(guān)于Java redisTemplate 處理消息隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Boot 使用 Swagger 構(gòu)建 RestAPI 接口文檔
這篇文章主要介紹了Spring Boot 使用 Swagger 構(gòu)建 RestAPI 接口文檔,幫助大家更好的理解和使用Spring Boot框架,感興趣的朋友可以了解下2020-10-10
解決springboot 2.x集成log4j2調(diào)試日志無(wú)法關(guān)閉的問(wèn)題
這篇文章主要介紹了解決springboot 2.x集成log4j2調(diào)試日志無(wú)法關(guān)閉的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
Java并發(fā)Lock接口實(shí)現(xiàn)示例詳解
這篇文章主要為大家介紹了Java并發(fā)Lock接口,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
MyBatisPlus3.4.3版自動(dòng)生成代碼的使用過(guò)程
這篇文章主要介紹了MyBatisPlus3.4.3版自動(dòng)生成代碼的使用,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04
Spring?Cloud?Hystrix?服務(wù)降級(jí)限流策略詳解
這篇文章主要為大家介紹了Spring?Cloud?Hystrix?服務(wù)降級(jí)限流策略詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01
JAVA實(shí)現(xiàn)監(jiān)測(cè)tomcat是否宕機(jī)及控制重啟的方法
這篇文章主要介紹了JAVA實(shí)現(xiàn)監(jiān)測(cè)tomcat是否宕機(jī)及控制重啟的方法,可實(shí)現(xiàn)有效的檢測(cè)及控制tomcat服務(wù)器運(yùn)行,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-08-08
Java開(kāi)發(fā)之普通web項(xiàng)目轉(zhuǎn)為Maven項(xiàng)目的方法
這篇文章主要給大家介紹了關(guān)于Java開(kāi)發(fā)之普通web項(xiàng)目轉(zhuǎn)為Maven項(xiàng)目的相關(guān)資料,文中通過(guò)圖文將轉(zhuǎn)換的方法步驟介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12
Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換)
這篇文章主要介紹了Java前后端的JSON傳輸方式(前后端JSON格式轉(zhuǎn)換),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04

