rabbitmq中routingkey的作用說明
對(duì)于消息發(fā)布者而言它只負(fù)責(zé)把消息發(fā)布出去,甚至它也不知道消息是發(fā)到哪個(gè)queue,消息通過exchange到達(dá)queue,exchange的職責(zé)非常簡(jiǎn)單,就是一邊接收發(fā)布者的消息一邊把這些消息推到queue中。
而exchange是怎么知道消息應(yīng)該推到哪個(gè)queue呢,這就要通過綁定queue與exchange時(shí)的routingkey了,通過代碼進(jìn)行綁定并且指定routingkey,下面有一張關(guān)系圖,p(發(fā)布者) —> x(exchange) bindding(綁定關(guān)系也就是我們的routingkey) 紅色代表著queue

我們來看代碼:
在消息的生產(chǎn)者端:
@Component
public class RabbitOrderSender {
//自動(dòng)注入RabbitTemplate模板類
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;
//回調(diào)函數(shù): confirm確認(rèn)
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
String messageId = correlationData.getId();
if(ack){
//如果confirm返回成功 則進(jìn)行更新
brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
} else {
//失敗則進(jìn)行具體的后續(xù)操作:重試 或者補(bǔ)償?shù)仁侄?
System.err.println("異常處理...");
}
}
};
//發(fā)送消息方法調(diào)用: 構(gòu)建自定義對(duì)象消息
public void sendOrder(Order order) throws Exception {
// 通過實(shí)現(xiàn) ConfirmCallback 接口,消息發(fā)送到 Broker 后觸發(fā)回調(diào),確認(rèn)消息是否到達(dá) Broker 服務(wù)器,也就是只確認(rèn)是否正確到達(dá) Exchange 中
rabbitTemplate.setConfirmCallback(confirmCallback);
//消息唯一ID
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend("order-exchange", "order.ABC", order, correlationData);
}
}
利用rabbitTemplate(import org.springframework.amqp.rabbit.core.RabbitTemplate;需要在pom.xml中導(dǎo)入amqp的依賴)的convertAndSend方法就可以發(fā)送,這里order-exchange為交換機(jī)exchange,order.ABC為routingKey,并沒有指定對(duì)應(yīng)消息需要發(fā)往哪個(gè)隊(duì)列,還有指定消息回調(diào)。
在消息的消費(fèi)者端:
@Component
public class OrderReceiver {
//配置監(jiān)聽的哪一個(gè)隊(duì)列,同時(shí)在沒有queue和exchange的情況下會(huì)去創(chuàng)建并建立綁定關(guān)系
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(name="order-exchange",durable = "true",type = "topic"),
key = "order.*"
)
)
@RabbitHandler//如果有消息過來,在消費(fèi)的時(shí)候調(diào)用這個(gè)方法
public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws IOException {
//消費(fèi)者操作
System.out.println("---------收到消息,開始消費(fèi)---------");
System.out.println("訂單ID:"+order.getId());
/**
* Delivery Tag 用來標(biāo)識(shí)信道中投遞的消息。RabbitMQ 推送消息給 Consumer 時(shí),會(huì)附帶一個(gè) Delivery Tag,
* 以便 Consumer 可以在消息確認(rèn)時(shí)告訴 RabbitMQ 到底是哪條消息被確認(rèn)了。
* RabbitMQ 保證在每個(gè)信道中,每條消息的 Delivery Tag 從 1 開始遞增。
*/
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
/**
* multiple 取值為 false 時(shí),表示通知 RabbitMQ 當(dāng)前消息被確認(rèn)
* 如果為 true,則額外將比第一個(gè)參數(shù)指定的 delivery tag 小的消息一并確認(rèn)
*/
boolean multiple = false;
//ACK,確認(rèn)一條消息已經(jīng)被消費(fèi)。不然的話,在rabbitmq首頁會(huì)有Unacked顯示為未處理數(shù)1.
channel.basicAck(deliveryTag,multiple);
}
}
消費(fèi)者需要指定監(jiān)聽的隊(duì)列,routingkey,和exchage,如果在localhost:15672的rabbitmq的首頁沒有手動(dòng)創(chuàng)建,@RabbitListener會(huì)自動(dòng)幫我們創(chuàng)建的并綁定關(guān)系。rabbitmq的routingkey還可以用來過濾從隊(duì)列中取的的信息。
對(duì) rabbitmq 基本理解(exchange queue binding-key routing-key)
一 exchange queue binding-key routing-key概念及相互間的關(guān)系
1.queue :存儲(chǔ)消息的隊(duì)列,可以指定name來唯一確定
2.exchange:交換機(jī)(常用有三種),用于接收生產(chǎn)者發(fā)來的消息,并通過binding-key 與 routing-key 的匹配關(guān)系來決定將消息分發(fā)到指定queue
2.1 Direct(路由模式):完全匹配 > 當(dāng)消息的routing-key 與 exchange和queue間的binding-key完全匹配時(shí),將消息分發(fā)到該queue
2.2 Fanout (訂閱模式):與binding-key和routing-key無關(guān),將接受到的消息分發(fā)給有綁定關(guān)系的所有隊(duì)列(不論binding-key和routing-key是什么)
2.3 Topic (通配符模式):用消息的routing-key 與 exchange和queue間的binding-key 進(jìn)行模式匹配,當(dāng)滿足規(guī)則時(shí),分發(fā)到滿足規(guī)則的所有隊(duì)列
二 exchange queue binding-key routing-key的創(chuàng)建與使用
1. Fanout
ConnectionFactory connectionFactory = new ConnectionFactory(); // 獲取到tcp連接 Connection connection = connectionFactory.newConnection(); //從tcp連接中創(chuàng)建通道 Channel channel = connection.createChannel(); / 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明隊(duì)列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null); // 綁定隊(duì)列到交換機(jī) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 綁定隊(duì)列到交換機(jī) - aaa 是路由鍵3名稱(其實(shí)這里無作用) channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "aaa");
這樣就創(chuàng)建好隊(duì)列和交換機(jī)并且將它們綁定好了,只要交換機(jī)EXCHANGE_NAME收到消息就會(huì)分發(fā)給隊(duì)列1和2
// 消息內(nèi)容 String message = "qqqqqqqq"; channel.basicPublish(EXCHANGE_NAME, "aaa", null, message.getBytes()); // 這里路由鍵aaa有沒有都一樣,可以寫任何值
2.不顯式聲明交換機(jī)時(shí)并且發(fā)送消息不指定交換機(jī)
則默認(rèn)使用Direct,并且聲明隊(duì)列時(shí),不顯式綁定隊(duì)列與交換機(jī),則隊(duì)列以隊(duì)列名為routing-key綁定到默認(rèn)的direct交換機(jī),發(fā)送消息不指定交換機(jī)時(shí),則將消息發(fā)到默認(rèn)的direct交換機(jī)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解Java如何實(shí)現(xiàn)在PDF中插入,替換或刪除圖像
圖文并茂的內(nèi)容往往讓人看起來更加舒服,如果只是文字內(nèi)容的累加,往往會(huì)使讀者產(chǎn)生視覺疲勞。搭配精美的文章配圖則會(huì)使文章內(nèi)容更加豐富。那我們要如何在PDF中插入、替換或刪除圖像呢?別擔(dān)心,今天為大家介紹一種高效便捷的方法2023-01-01
詳解json在SpringBoot中的格式轉(zhuǎn)換
這篇文章主要介紹了詳解json在SpringBoot中的格式轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
關(guān)于java.lang.NumberFormatException: null的問題及解決
這篇文章主要介紹了關(guān)于java.lang.NumberFormatException: null的問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
Java計(jì)算一個(gè)數(shù)加上100是完全平方數(shù),加上168還是完全平方數(shù)
這篇文章主要介紹了Java計(jì)算一個(gè)數(shù)加上100是完全平方數(shù),加上168還是完全平方數(shù),需要的朋友可以參考下2017-02-02

