SpringBoot整合RabbitMQ實戰(zhàn)教程附死信交換機
前言
使用springboot,實現(xiàn)以下功能,有兩個隊列1、2,往里面發(fā)送消息,如果處理失敗發(fā)生異常,可以重試3次,重試3次均失敗,那么就將消息發(fā)送到死信隊列進行統(tǒng)一處理,例如記錄數(shù)據(jù)庫、報警等
完整demo項目代碼https://gitee.com/daenmax/rabbit-mq-demo
環(huán)境
Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.雙擊C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat啟動MQ服務(wù)
2.然后訪問http://localhost:15672/,默認賬號密碼均為guest,
3.手動添加一個虛擬主機為admin_host,手動創(chuàng)建一個用戶賬號密碼均為admin
pom.xml
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.0</version>
</dependency>配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: admin_host
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
retry:
enabled: true #開啟失敗重試
max-attempts: 3 #最大重試次數(shù)
initial-interval: 1000 #重試間隔時間 毫秒配置文件
RabbitConfig
package com.example.rabitmqdemo.mydemo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Broker:它提供一種傳輸服務(wù),它的角色就是維護一條從生產(chǎn)者到消費者的路線,保證數(shù)據(jù)能按照指定的方式進行傳輸,
* Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
* Queue:消息的載體,每個消息都會被投到一個或多個隊列。
* Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來.
* Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進行消息投遞。
* vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權(quán)限分離。
* Producer:消息生產(chǎn)者,就是投遞消息的程序.
* Consumer:消息消費者,就是接受消息的程序.
* Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
*/
@Slf4j
@Component
public class RabbitConfig {
//業(yè)務(wù)交換機
public static final String EXCHANGE_PHCP = "phcp";
//業(yè)務(wù)隊列1
public static final String QUEUE_COMPANY = "company";
//業(yè)務(wù)隊列1的key
public static final String ROUTINGKEY_COMPANY = "companyKey";
//業(yè)務(wù)隊列2
public static final String QUEUE_PROJECT = "project";
//業(yè)務(wù)隊列2的key
public static final String ROUTINGKEY_PROJECT = "projectKey";
//死信交換機
public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";
//死信隊列1
public static final String QUEUE_COMPANY_DEAD = "company_dead";
//死信隊列2
public static final String QUEUE_PROJECT_DEAD = "project_dead";
//死信隊列1的key
public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";
//死信隊列2的key
public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";
// /**
// * 解決重復確認報錯問題,如果沒有報錯的話,就不用啟用這個
// *
// * @param connectionFactory
// * @return
// */
// @Bean
// public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// factory.setConnectionFactory(connectionFactory);
// factory.setMessageConverter(new Jackson2JsonMessageConverter());
// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// return factory;
// }
/**
* 聲明業(yè)務(wù)交換機
* 1. 設(shè)置交換機類型
* 2. 將隊列綁定到交換機
* FanoutExchange: 將消息分發(fā)到所有的綁定隊列,無routingkey的概念
* HeadersExchange :通過添加屬性key-value匹配
* DirectExchange:按照routingkey分發(fā)到指定隊列
* TopicExchange:多關(guān)鍵字匹配
*/
@Bean("exchangePhcp")
public DirectExchange exchangePhcp() {
return new DirectExchange(EXCHANGE_PHCP);
}
* 聲明死信交換機
@Bean("exchangePhcpDead")
public DirectExchange exchangePhcpDead() {
return new DirectExchange(EXCHANGE_PHCP_DEAD);
* 聲明業(yè)務(wù)隊列1
*
* @return
@Bean("queueCompany")
public Queue queueCompany() {
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);
//綁定該隊列到死信交換機的隊列1
arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);
return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();
* 聲明業(yè)務(wù)隊列2
@Bean("queueProject")
public Queue queueProject() {
//綁定該隊列到死信交換機的隊列2
arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);
return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();
* 聲明死信隊列1
@Bean("queueCompanyDead")
public Queue queueCompanyDead() {
return new Queue(QUEUE_COMPANY_DEAD);
* 聲明死信隊列2
@Bean("queueProjectDead")
public Queue queueProjectDead() {
return new Queue(QUEUE_PROJECT_DEAD);
* 綁定業(yè)務(wù)隊列1和業(yè)務(wù)交換機
* @param queue
* @param directExchange
@Bean
public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);
* 綁定業(yè)務(wù)隊列2和業(yè)務(wù)交換機
public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);
* 綁定死信隊列1和死信交換機
public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);
* 綁定死信隊列2和死信交換機
public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);
}
生產(chǎn)者
RabbltProducer
package com.example.rabitmqdemo.mydemo.producer;
import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 初始化消息確認函數(shù)
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
}
/**
* 發(fā)送消息服務(wù)器確認函數(shù)
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息發(fā)送成功" + correlationData);
} else {
System.out.println("消息發(fā)送失敗:" + cause);
}
}
/**
* 消息發(fā)送失敗,消息回調(diào)函數(shù)
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
String str = new String(returnedMessage.getMessage().getBody());
System.out.println("消息發(fā)送失?。? + str);
}
/**
* 處理消息發(fā)送到隊列1
* @param str
*/
public void sendCompany(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);
}
/**
* 處理消息發(fā)送到隊列2
* @param str
*/
public void sendProject(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/json");
Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
//也可以用下面的方式
//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);
}
}
業(yè)務(wù)消費者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 監(jiān)聽業(yè)務(wù)交換機
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumer {
/**
* 監(jiān)聽業(yè)務(wù)隊列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company")
public void company(Message message, Channel channel) throws IOException {
try{
System.out.println("次數(shù)" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("處理消息"+s);
//下面兩行是嘗試手動拋出異常,用來測試重試次數(shù)和發(fā)送到死信交換機
//String str = null;
//str.split("1");
//處理成功,確認應(yīng)答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("處理消息時發(fā)生異常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("異常重試次數(shù)已到達設(shè)置次數(shù),將發(fā)送到死信交換機");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即將返回隊列處理重試");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* 監(jiān)聽業(yè)務(wù)隊列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project")
public void project(Message message, Channel channel) throws IOException {
try{
System.out.println("次數(shù)" + message.getMessageProperties().getDeliveryTag());
channel.basicQos(1);
Thread.sleep(2000);
String s = new String(message.getBody());
log.info("處理消息"+s);
//下面兩行是嘗試手動拋出異常,用來測試重試次數(shù)和發(fā)送到死信交換機
//String str = null;
//str.split("1");
//處理成功,確認應(yīng)答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("處理消息時發(fā)生異常:"+e.getMessage());
Boolean redelivered = message.getMessageProperties().getRedelivered();
if(redelivered){
log.error("異常重試次數(shù)已到達設(shè)置次數(shù),將發(fā)送到死信交換機");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}else {
log.error("消息即將返回隊列處理重試");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
死信消費者
RabbitConsumer
package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 監(jiān)聽死信交換機
* @author JeWang
*/
@Component
@Slf4j
public class RabbitConsumerDead {
/**
* 處理死信隊列1
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "company_dead")
public void company_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("處理死信"+s);
//在此處記錄到數(shù)據(jù)庫、報警之類的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收異常:"+e.getMessage());
}
}
/**
* 處理死信隊列2
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = "project_dead")
public void project_dead(Message message, Channel channel) throws IOException {
try{
channel.basicQos(1);
String s = new String(message.getBody());
log.info("處理死信"+s);
//在此處記錄到數(shù)據(jù)庫、報警之類的操作
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("接收異常:"+e.getMessage());
}
}
}
測試
MqController
package com.example.rabitmqdemo.mydemo.controller;
import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RequestMapping("/def")
@RestController
@Slf4j
public class MsgController {
@Resource
private RabbltProducer rabbltProducer;
@RequestMapping("/handleCompany")
public void handleCompany(@RequestBody String jsonStr){
rabbltProducer.sendCompany(jsonStr);
}
}
到此這篇關(guān)于SpringBoot整合RabbitMQ實戰(zhàn)附加死信交換機的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ死信交換機內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實現(xiàn)優(yōu)雅停止線程的有效方法詳解
這篇文章主要為大家詳細如何安全有效停止 Java 線程的,確保多線程應(yīng)用程序平穩(wěn)運行并實現(xiàn)最佳資源管理,感興趣的小伙伴可以跟隨小編一起學習一下2023-12-12
spring?security?自定義Provider?如何實現(xiàn)多種認證
這篇文章主要介紹了spring?security?自定義Provider實現(xiàn)多種認證方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
Java 和 Javascript 的 Date 與 .Net 的 DateTime 之間的相互轉(zhuǎn)換
這篇文章主要介紹了Java 和 Javascript 的 Date 與 .Net 的 DateTime 之間的相互轉(zhuǎn)換的相關(guān)資料,非常不錯具有參考借鑒價值,需要的朋友可以參考下2016-06-06
Java如何使用遞歸查詢多級樹形結(jié)構(gòu)數(shù)據(jù)(多級菜單)
這篇文章主要介紹了Java如何使用遞歸查詢多級樹形結(jié)構(gòu)數(shù)據(jù)(多級菜單),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07

