Springboot使用RabbitMq延遲隊(duì)列和死信隊(duì)列詳解
前言
在最近的項(xiàng)目中,結(jié)合minio文件服務(wù)器的一些特性。
需要做一個(gè)分片上傳的功能:用戶上傳文件到md5的桶下,合并文件后刪除這個(gè)臨時(shí)桶。
會出現(xiàn)這樣一種情況,用戶上傳文件傳到一半就不再上傳了,那么如何去刪除,什么時(shí)候去刪除時(shí)需要解決問題。
一、業(yè)務(wù)解決方案
1.quartz定時(shí)器
如果是單體項(xiàng)目,可以考慮使用quartz定時(shí)器。在創(chuàng)建桶的時(shí)候加入到定時(shí)任務(wù)里。
2.redis定時(shí)器
redis定時(shí)器需要修改配置文件,并且對redis進(jìn)行監(jiān)聽,在創(chuàng)建桶時(shí),設(shè)置過時(shí)時(shí)間,一旦時(shí)間超時(shí),可以對key進(jìn)行捕捉,最好對名字進(jìn)行規(guī)范設(shè)計(jì)以便于業(yè)務(wù)
3.mq消息隊(duì)列
使用延遲隊(duì)列和死信隊(duì)列進(jìn)行定時(shí)任務(wù)
這篇主要講解mq的方式解決問題
二、RabbitMq延遲隊(duì)列
1.延遲隊(duì)列
延遲隊(duì)列也是一個(gè)普通的隊(duì)列,和普通的隊(duì)列相比,他多了幾個(gè)屬性,比如:
1)延遲的時(shí)間:表示隊(duì)列中消息的生命周期,在指定時(shí)間后,要么拋棄這個(gè)消息,要么投遞到死信隊(duì)列中
2)指定死信交換機(jī):如果不希望丟棄這個(gè)消息,那么可以將這個(gè)過期的消息丟到死信隊(duì)列中
定義一個(gè)延遲隊(duì)列:
//桶延遲隊(duì)列
@Bean(BUCKET_TTL_QUEUE)
public Queue bucketTtlQueue(){
Map<String,Object> deadParamsMap = new HashMap<>();
// 設(shè)置死信隊(duì)列的Exchange
deadParamsMap.put("x-dead-letter-exchange",BUCKET_DEAD_EXCHANGE);
//設(shè)置死信隊(duì)列的RouteKey
deadParamsMap.put("x-dead-letter-routing-key",BUCKET_DEAD_QUEUE);
// 設(shè)置對接過期時(shí)間"x-message-ttl"
deadParamsMap.put("x-message-ttl",60000*5);//5分鐘
// 設(shè)置對接可以存儲的最大消息數(shù)量
//deadParamsMap.put("x-max-length",10);
return new Queue(BUCKET_TTL_QUEUE,true,false,false,deadParamsMap);
}
延遲隊(duì)列交換機(jī)
如上所說,延遲隊(duì)列本就是一個(gè)普通的隊(duì)列,如果你想更細(xì)粒的對他進(jìn)行控制,那么需要綁定交換機(jī),如果不綁定交換機(jī),會綁定到默認(rèn)交換機(jī),在發(fā)送消息時(shí),交換機(jī)寫""就行,默認(rèn)交換機(jī)為直連交換機(jī);
我這里指定了延遲隊(duì)列的交換機(jī),因?yàn)闆]有做消息冪等性,所以采用直連交換機(jī)應(yīng)對在集群下消息只被消費(fèi)一次
//桶延遲交換機(jī)
@Bean(BUCKET_TTL_EXCHANGE)
public DirectExchange bucketTtlExchange() {
return new DirectExchange(BUCKET_TTL_EXCHANGE,true,false);
}
// 綁定
@Bean
public Binding bucketTtlBinding() {
return BindingBuilder.bind(bucketTtlQueue())
.to(bucketTtlExchange())
.with(BUCKET_TTL_QUEUE);
}
2.死信交換機(jī)
DLX也是一個(gè)正常的Exchange,和一般的Exchange沒有區(qū)別,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。
當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去,進(jìn)而被路由到另一個(gè)隊(duì)列
死信交換機(jī)也是普通交換機(jī),他只是你指定接收過期消息的交換機(jī)而已
/**
* 死信隊(duì)列
*
* @return
*/
@Bean(BUCKET_DEAD_QUEUE)
public Queue bucketDeadQueue() {
//屬性參數(shù) 隊(duì)列名稱 是否持久化
return new Queue(BUCKET_DEAD_QUEUE, true);
}
/**
* 死信隊(duì)列交換機(jī)
*
* @return
*/
@Bean(BUCKET_DEAD_EXCHANGE)
public DirectExchange bucketDeadExchange() {
return new DirectExchange(BUCKET_DEAD_EXCHANGE,true,false);
}
/**
* 給死信隊(duì)列綁定交換機(jī)
*
* @return
*/
@Bean
public Binding bucketDeadBinding() {
return BindingBuilder.bind(bucketDeadQueue()).to(bucketDeadExchange()).with(BUCKET_DEAD_QUEUE);
}
3.監(jiān)聽器
消息處理的邏輯,在消息過期后,送到死信交換機(jī)里,監(jiān)聽器監(jiān)聽到死信交換機(jī)的消息進(jìn)行刪除桶以及文件的業(yè)務(wù)邏輯處理
/**
* @description:死信隊(duì)列監(jiān)聽器,用來刪除過期的桶
* @author manchao
* @date 2022/2/17 9:52
*/
@Configuration
public class BucketDeadConsumer {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private MinioTemplate minioTemplate;
@Bean
public SimpleMessageListenerContainer BucketDeadListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
// 監(jiān)聽隊(duì)列名
container.setQueueNames(MyMqConfig.BUCKET_DEAD_QUEUE);
// 當(dāng)前消費(fèi)者數(shù)量 開啟幾個(gè)線程去處理數(shù)據(jù) 支持運(yùn)行時(shí)動(dòng)態(tài)修改
container.setConcurrentConsumers(5);
// 最大消費(fèi)者數(shù)量 , 消息堵塞太多的時(shí)候,會幫我自動(dòng)擴(kuò)展到我的最大消費(fèi)者數(shù)量
container.setMaxConcurrentConsumers(10);
// 是否重回隊(duì)列
container.setDefaultRequeueRejected(true);
// 手動(dòng)確認(rèn)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 設(shè)置監(jiān)聽器
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, Channel channel) throws IOException {
// 消息的唯一性ID deliveryTag:該消息的index 自增長
long deliveryTag = message.getMessageProperties().getDeliveryTag();
byte[] messageBody = message.getBody();
String s = new String(messageBody);
System.out.println("消息: " + s);
System.out.println("消息來自: "+message.getMessageProperties().getConsumerQueue());
System.out.println("交換機(jī): "+message.getMessageProperties().getReceivedExchange());
//刪除桶
try {
minioTemplate.removeBuckets(s, "");
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
e.printStackTrace();
channel.basicReject(deliveryTag, false);
}
}
});
return container;
}
}
注:我設(shè)置了消息發(fā)送和確認(rèn)的回調(diào)函數(shù),為什么沒有觸發(fā)這個(gè)函數(shù)?因?yàn)槲沂菑暮笈_管理頁面發(fā)的消息,沒有通過rabbitteplate進(jìn)行發(fā)送,不會是這個(gè)原因吧!
總結(jié)
業(yè)務(wù)的解決方法有太多種了,找到一個(gè)高可用以及簡便的方法才是解決問題的關(guān)鍵
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java super關(guān)鍵字知識點(diǎn)詳解
在本篇文章里小編給大家整理的是一篇關(guān)于java super關(guān)鍵字知識點(diǎn)詳解內(nèi)容,有興趣的朋友們可以參考下。2021-01-01
java基于控制臺的學(xué)生學(xué)籍管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java基于控制臺的學(xué)生學(xué)籍管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-07-07
基于java實(shí)現(xiàn)websocket代碼示例
這篇文章主要介紹了基于java實(shí)現(xiàn)websocket代碼示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-12-12
Java簡易學(xué)生成績系統(tǒng)寫法實(shí)例
在本篇文章里小編給大家分享的是關(guān)于Java簡易學(xué)生成績系統(tǒng)寫法實(shí)例以及相關(guān)知識點(diǎn),有需要的朋友們可以學(xué)習(xí)下。2019-09-09
SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信
MQTT非常適用于物聯(lián)網(wǎng)領(lǐng)域,本文主要介紹了SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-08-08
java實(shí)現(xiàn)堆排序以及時(shí)間復(fù)雜度的分析
本文主要介紹了java實(shí)現(xiàn)堆排序以及時(shí)間復(fù)雜度,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
使用Java實(shí)現(xiàn)KMZ和KML數(shù)據(jù)的直接解析
本文主要講解如何用JAVA語言,直接解析KMZ數(shù)據(jù),文章首先介紹google地圖中的KMZ和KML數(shù)據(jù),然后使用代碼的方式實(shí)現(xiàn)數(shù)據(jù)的解析,最后展示解析成果以及如何將數(shù)據(jù)轉(zhuǎn)換成空間WKT數(shù)據(jù),需要的朋友可以參考下2024-06-06
圖解Java經(jīng)典算法快速排序的原理與實(shí)現(xiàn)
快速排序是基于二分的思想,對冒泡排序的一種改進(jìn)。主要思想是確立一個(gè)基數(shù),將小于基數(shù)的數(shù)放到基數(shù)左邊,大于基數(shù)的數(shù)字放到基數(shù)的右邊,然后在對這兩部分進(jìn)一步排序,從而實(shí)現(xiàn)對數(shù)組的排序2022-09-09

