SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列的示例詳解
如何保證消息不丟失
rabbitmq消息投遞路徑
生產(chǎn)者->交換機(jī)->隊(duì)列->消費(fèi)者
總的來說分為三個(gè)階段。
- 1.生產(chǎn)者保證消息投遞可靠性。
- 2.mq內(nèi)部消息不丟失。
- 3.消費(fèi)者消費(fèi)成功。
什么是消息投遞可靠性
簡(jiǎn)單點(diǎn)說就是消息百分百發(fā)送到消息隊(duì)列中。
我們可以開啟confirmCallback
生產(chǎn)者投遞消息后,mq會(huì)給生產(chǎn)者一個(gè)ack.根據(jù)ack,生產(chǎn)者就可以確認(rèn)這條消息是否發(fā)送到mq.
開啟confirmCallback
修改配置文件
#NONE:禁用發(fā)布確認(rèn)模式,是默認(rèn)值,CORRELATED:發(fā)布消息成功到交換器后會(huì)觸發(fā)回調(diào)方法
spring:
rabbitmq:
publisher-confirm-type: correlated
測(cè)試代碼
@Test
public void testConfirmCallback() throws InterruptedException {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置
* @param ack 交換機(jī)是否收到消息,true是成功,false是失敗
* @param cause 失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm=====>");
System.out.println("confirm==== ack="+ack);
System.out.println("confirm==== cause="+cause);
//根據(jù)ACK狀態(tài)做對(duì)應(yīng)的消息更新操作 TODO
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "雞你太美");
Thread.sleep(10000);
}通過returnCallback保證消息從交換器發(fā)送到隊(duì)列成功。 修改配置文件
spring:
rabbitmq:
#開啟returnCallback
publisher-returns: true
#交換機(jī)處理消息到路由失敗,則會(huì)返回給生產(chǎn)者
template:
mandatory: true
測(cè)試代碼
@Test
void testReturnCallback() {
//為true,則交換機(jī)處理消息到路由失敗,則會(huì)返回給生產(chǎn)者 配置文件指定,則這里不需指定
rabbitTemplate.setMandatory(true);
//開啟強(qiáng)制消息投遞(mandatory為設(shè)置為true),但消息未被路由至任何一個(gè)queue,則回退一條消息
rabbitTemplate.setReturnsCallback(returned -> {
int code = returned.getReplyCode();
System.out.println("code="+code);
System.out.println("returned="+ returned);
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","測(cè)試returnCallback");
}消費(fèi)者消費(fèi)消息時(shí)需要通過ack手動(dòng)確認(rèn)消息已消費(fèi)。
修改配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual編寫測(cè)試代碼
@RabbitHandler
public void consumer(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+ message);
System.out.println("body="+body);
//成功確認(rèn),使用此回執(zhí)方法后,消息會(huì)被 rabbitmq broker 刪除
channel.basicAck(msgTag,false);
// channel.basicNack(msgTag,false,true);
}
deliveryTags是消息投遞序號(hào),每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會(huì)增加
ttl死信隊(duì)列
什么是死信隊(duì)列
沒有被及時(shí)消費(fèi)的消息存放的隊(duì)列
消息有哪幾種情況成為死信
- 消費(fèi)者拒收消息 (basic.reject/ basic.nack) ,并且沒有重新入隊(duì) requeue=false
- 消息在隊(duì)列中未被消費(fèi),且超過隊(duì)列或者消息本身的過期時(shí)間TTL(time-to-live)
- 隊(duì)列的消息長(zhǎng)度達(dá)到極限
- 結(jié)果:消息成為死信后,如果該隊(duì)列綁定了死信交換機(jī),則消息會(huì)被死信交換機(jī)重新路由到死信隊(duì)列
死信隊(duì)列經(jīng)常用來做延遲隊(duì)列消費(fèi)。
延遲隊(duì)列
生產(chǎn)者投遞到mq中并不希望這條消息立馬被消費(fèi),而是等待一段時(shí)間后再去消費(fèi)。
springboot整合rabbitmq實(shí)現(xiàn)訂單超時(shí)自動(dòng)關(guān)閉
package com.fandf.test.rabbit;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author fandongfeng
* @date 2023/4/15 15:38
*/
@Configuration
public class RabbitMQConfig {
/**
* 訂單交換機(jī)
*/
public static final String ORDER_EXCHANGE = "order_exchange";
/**
* 訂單隊(duì)列
*/
public static final String ORDER_QUEUE = "order_queue";
/**
* 訂單路由key
*/
public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";
/**
* 死信交換機(jī)
*/
public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
/**
* 死信隊(duì)列 routingKey
*/
public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";
/**
* 死信隊(duì)列
*/
public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";
/**
* 創(chuàng)建死信交換機(jī)
*/
@Bean("orderDeadLetterExchange")
public Exchange orderDeadLetterExchange() {
return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
}
/**
* 創(chuàng)建死信隊(duì)列
*/
@Bean("orderDeadLetterQueue")
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
}
/**
* 綁定死信交換機(jī)和死信隊(duì)列
*/
@Bean("orderDeadLetterBinding")
public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
}
/**
* 創(chuàng)建訂單交換機(jī)
*/
@Bean("orderExchange")
public Exchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE, true, false);
}
/**
* 創(chuàng)建訂單隊(duì)列
*/
@Bean("orderQueue")
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>(3);
//消息過期后,進(jìn)入到死信交換機(jī)
args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);
//消息過期后,進(jìn)入到死信交換機(jī)的路由key
args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);
//過期時(shí)間,單位毫秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
}
/**
* 綁定訂單交換機(jī)和隊(duì)列
*/
@Bean("orderBinding")
public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
}
}消費(fèi)者
package com.fandf.test.rabbit;
import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author fandongfeng
* @date 2023/4/15 15:42
*/
@Component
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {
@RabbitHandler
public void consumer(String body, Message message, Channel channel) throws IOException {
System.out.println("收到消息:" + DateUtil.now());
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag=" + msgTag);
System.out.println("message=" + message);
System.out.println("body=" + body);
channel.basicAck(msgTag, false);
}
}測(cè)試類
@Test
void testOrder() throws InterruptedException {
//為true,則交換機(jī)處理消息到路由失敗,則會(huì)返回給生產(chǎn)者 配置文件指定,則這里不需指定
rabbitTemplate.setMandatory(true);
//開啟強(qiáng)制消息投遞(mandatory為設(shè)置為true),但消息未被路由至任何一個(gè)queue,則回退一條消息
rabbitTemplate.setReturnsCallback(returned -> {
int code = returned.getReplyCode();
System.out.println("code=" + code);
System.out.println("returned=" + returned);
});
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "測(cè)試訂單延遲");
System.out.println("發(fā)送消息:" + DateUtil.now());
Thread.sleep(20000);
}程序輸出
發(fā)送消息:2023-04-16 15:14:34
收到消息:2023-04-16 15:14:44
msgTag=1
message=(Body:'測(cè)試訂單延遲' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=測(cè)試訂單延遲
到此這篇關(guān)于SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列的示例詳解的文章就介紹到這了,更多相關(guān)SpringBoot RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)簡(jiǎn)單郵件發(fā)送
這篇文章主要介紹了Java實(shí)現(xiàn)簡(jiǎn)單郵件發(fā)送的相關(guān)資料,實(shí)例講解了java郵件發(fā)送實(shí)現(xiàn)方法,感興趣的小伙伴們可以參考一下2016-02-02
Idea配置Maven阿里云鏡像加速的實(shí)現(xiàn)
這篇文章主要介紹了Idea配置Maven阿里云鏡像加速的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
解決Spring security5.5.7報(bào)錯(cuò)Encoded password does
這篇文章主要介紹了解決Spring security5.5.7出現(xiàn)Encoded password does not look like BCrypt異常問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
解決SpringBoot框架因post數(shù)據(jù)量過大沒反應(yīng)問題(踩坑)
這篇文章主要介紹了解決SpringBoot框架因post數(shù)據(jù)量過大沒反應(yīng)問題(踩坑),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09
使用IDEA搭建MyBatis環(huán)境詳細(xì)過程
這篇文章主要介紹了使用IDEA搭建MyBatis環(huán)境的相關(guān)知識(shí),包括創(chuàng)建項(xiàng)目的過程及導(dǎo)入mybatis的核心jar包的詳細(xì)說明,本文通過圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-05-05
Java使用XML與注解方式實(shí)現(xiàn)CRUD操作代碼
MyBatis提供了靈活的配置和使用方式,使得數(shù)據(jù)庫(kù)操作更加簡(jiǎn)潔和高效,通過本文,我們介紹了如何使用MyBatis框架,通過XML映射文件和注解兩種方式來實(shí)現(xiàn)數(shù)據(jù)庫(kù)的增刪改查操作,感興趣的朋友跟隨小編一起看看吧2024-02-02

