SpringBoot整合RabbitMQ及生產(chǎn)全場景高級特性實戰(zhàn)
摘要
整合場景含 topic 工作模式(通過 routingKey 可滿足簡單/工作隊列/發(fā)布訂閱/路由等四種工作模式)和 confirm(消息確認(rèn))、return(消息返回)、basicAck(消息簽收)、basicNack(拒絕簽收)、DLX(Dead Letter Exchange死信隊列)實現(xiàn)延時/定時任務(wù)等。
整合
依賴與配置
以下內(nèi)容消費者同生產(chǎn)者
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
server.port=8090 spring.rabbitmq.host=192.168.168.10 spring.rabbitmq.port=5672 spring.rabbitmq.username=zheng123 spring.rabbitmq.password=zheng123 spring.rabbitmq.virtual-host=/zheng spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.listener.direct.acknowledge-mode=manual
生產(chǎn)者配置消息隊列規(guī)則
下邊是兩種配置方式,本次整合示例中使用第一個配置
@Configuration
public class TopicConfig {
// 聲明隊列
@Bean
public Queue topicQ1() {
return new Queue("topic_sb_mq_q1");
}
// 聲明隊列并綁定該隊列到死信交換機(jī)(返回值有兩種寫法,任選一種都可以)
// 測試死信需要關(guān)閉原隊列的監(jiān)聽
@Bean
public Queue topicQ2() {
return QueueBuilder.durable("topic_sb_mq_q2")
.withArgument("x-dead-letter-exchange", "topicExchange")
.withArgument("x-dead-letter-routing-key", "changsha.f")
.withArgument("x-message-ttl", 10000)
.build();
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange","topicExchange");
arguments.put("x-dead-letter-routing-key","changsha.f");
arguments.put("x-message-ttl",10000);
return new Queue("topic_sb_mq_q2",true,false,false,arguments);
}
//聲明exchange
@Bean
public TopicExchange setTopicExchange() {
return new TopicExchange("topicExchange");
}
//聲明binding,需要聲明一個routingKey
@Bean
public Binding bindTopicHebei1() {
return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*");
}
@Bean
public Binding bindTopicHebei2() {
return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing");
}
}
@Configuration
public class RabbitMqConfig {
//定義交換機(jī)的名字
public static final String EXCHANGE_NAME = "boot_topic_exchange";
//定義隊列的名字
public static final String QUEUE_NAME = "boot_queue";
//1、聲明交換機(jī)
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2、聲明隊列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3、隊列與交換機(jī)進(jìn)行綁定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
// topic模式兼容廣播模式,路由模式。with("#")則類似廣播模式匹配所有訂閱者;with("boot.1")則類似路由模式匹配指定訂閱者
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
生產(chǎn)者發(fā)布消息
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/topicSend")
public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
// 定義 confirm 回調(diào)
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// confirmed
} else {
// nack-ed
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routKey)->{
// return message
});
if(null == routingKey) {
routingKey="changsha.kf";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里發(fā)送消息。分發(fā)到exchange下的所有queue
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : routingKey >"+routingKey+";message > "+message;
}
}
消費者監(jiān)聽消息
@Component
public class ConcumerReceiver {
//topic 模式
//注意這個模式會有優(yōu)先匹配原則。例如發(fā)送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不會再去匹配*.ITd
@RabbitListener(queues="topic_sb_mq_q1")
public void topicReceiveq1(String msg,Message message, Channel channel) throws IOException {
// 消息id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// message.getBody() todosomething
// 簽收消息
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 拒絕簽收
// 第三個參數(shù):requeue:重回隊列。如果設(shè)置為true,則消息重新回到queue,broker會重新發(fā)送該消息給消費端
channel.basicNack(deliveryTag, true, true);
}
}
@RabbitListener(queues="topic_sb_mq_q2")
public void topicReceiveq2(String message) {
System.out.println("Topic模式 topic_sb_mq_q2 received message : " +message);
}
}
到此這篇關(guān)于SpringBoot整合RabbitMQ及生產(chǎn)全場景高級特性實戰(zhàn)的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ的5種模式實戰(zhàn)
- SpringBoot整合RabbitMQ實現(xiàn)消息確認(rèn)機(jī)制
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- Springboot整合Rabbitmq之Confirm和Return機(jī)制
- Springboot整合RabbitMq測試TTL的方法詳解
- 詳解SpringBoot整合RabbitMQ如何實現(xiàn)消息確認(rèn)
- SpringBoot整合RabbitMQ實現(xiàn)交換機(jī)與隊列的綁定
- SpringBoot整合RabbitMQ實戰(zhàn)教程附死信交換機(jī)
- springboot整合消息隊列RabbitMQ
相關(guān)文章
使用Java實現(xiàn)HTTP和HTTPS代理服務(wù)詳解
這篇文章主要為大家詳細(xì)介紹了如何使用Java實現(xiàn)HTTP和HTTPS代理服務(wù),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-04-04
spring @Scheduled定時任務(wù)注解使用方法及注意事項小結(jié)
Spring的@Scheduled注解用于定時任務(wù)調(diào)度,默認(rèn)單線程依次執(zhí)行,可以通過配置多線程調(diào)度器或使用@Async注解實現(xiàn)并行執(zhí)行,常見參數(shù)包括cron、fixedRate、fixedDelay、initialDelay等,本文介紹spring @Scheduled定時任務(wù)注解使用方法,感興趣的朋友一起看看吧2025-02-02
SpringBoot整合MP通過Redis實現(xiàn)二級緩存方式
這篇文章主要介紹了SpringBoot整合MP通過Redis實現(xiàn)二級緩存方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
詳細(xì)分析Java并發(fā)集合LinkedBlockingQueue的用法
這篇文章主要介紹了詳細(xì)分析Java并發(fā)集合LinkedBlockingQueue的用法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-04-04

