rabbitmq結(jié)合spring實(shí)現(xiàn)消息隊(duì)列優(yōu)先級(jí)的方法
1.1項(xiàng)目背景:做一個(gè)災(zāi)情預(yù)警的消息平臺(tái),災(zāi)情檢查系統(tǒng)需要向消息平臺(tái)里面推送消息,這里是典型的異構(gòu)系統(tǒng)的消息傳遞,我們需要選擇一個(gè)中間件作為消息隊(duì)列,調(diào)研分析了rabbitmq,zeromq,activemq,kafka等消息中間件,綜合性能,安全,可持久化等角度果斷選擇了rabbitmq作為我們的消息中間件 (其實(shí)這里是因?yàn)閞abbitmq 是spring官方支持的,開發(fā)起來(lái)方便)。需求上我們有多種類型的消息,這里有緊急推送的和一般的等區(qū)分,高并發(fā)時(shí),就會(huì)有對(duì)消息進(jìn)行優(yōu)先推送的情況出現(xiàn),于是rabbitmq消息隊(duì)優(yōu)先級(jí)的推送功能是我們需要解決的首個(gè)技術(shù)點(diǎn).
1.2技術(shù)調(diào)研:這里一個(gè)概念需要說(shuō)明,為什么說(shuō)是消息隊(duì)列的優(yōu)先級(jí)而不是消息的優(yōu)先級(jí),來(lái)看下消息隊(duì)列的工作原理

生產(chǎn)者生成消息打到交換機(jī)里面(如果沒有聲明交換機(jī),會(huì)打到default exchange里面),交換機(jī)綁定一個(gè)或多個(gè)隊(duì)列,消息進(jìn)入隊(duì)列里面,消費(fèi)者一直在監(jiān)聽隊(duì)列,發(fā)現(xiàn)隊(duì)列里面有消息就開始消費(fèi),這里就是一個(gè)消息傳遞的過(guò)程,queue是一個(gè)棧隊(duì)列,棧是先進(jìn)先出的,就是說(shuō)消息來(lái)了依次排隊(duì),一個(gè)隊(duì)列并不能實(shí)現(xiàn)消息的插隊(duì)和優(yōu)先推送的功能。但是如果說(shuō)我們的多個(gè)隊(duì)列有不同的優(yōu)先級(jí),不同優(yōu)先級(jí)的消息通過(guò)roatingkey進(jìn)入不同的隊(duì)列,優(yōu)先級(jí)高的隊(duì)列消息被優(yōu)先消費(fèi),這樣也能形成一個(gè)相對(duì)意義上的優(yōu)先級(jí),所以說(shuō)這里不是消息的優(yōu)先級(jí)而是隊(duì)列的優(yōu)先級(jí).
1.2.1 為什么說(shuō)是相對(duì)意義上的優(yōu)先級(jí)
有并發(fā)才有優(yōu)先級(jí),如果每個(gè)消息都能被瞬間處理也不會(huì)有消息優(yōu)先推送的需求,那我們看看消息會(huì)在哪里阻塞
1,queue,很明顯高并發(fā)的時(shí)候隊(duì)列里面是會(huì)存在很多消息的,2,eschange ,高并發(fā)的時(shí)候producer發(fā)送給exchange的時(shí)候也會(huì)產(chǎn)生阻塞。
第一種情況由于我們隊(duì)列已經(jīng)定義優(yōu)先級(jí)了,所以進(jìn)入隊(duì)列的消息都是同種優(yōu)先級(jí)別的,并不需要插隊(duì)。而對(duì)于第二種情況,消息在exchange時(shí)阻塞時(shí)并不能實(shí)現(xiàn)消息優(yōu)先進(jìn)入隊(duì)列,依然是一個(gè)依次處理的情景,但是由于exchang到queue的處理速度極快,所有我們忽略了這塊的優(yōu)先級(jí)。
1.2.3 代碼實(shí)現(xiàn)
在rabbitmq3.5版本之前,官方并沒有實(shí)現(xiàn)隊(duì)列優(yōu)先級(jí)的功能,但論壇里面有一些插件可以實(shí)現(xiàn)(末尾附鏈接),這里我們主要說(shuō)3.5版本之后的實(shí)現(xiàn)
1.2.3.1 Java代碼
Connectionconn =RabbitMQConnectionUtil.getRabbitmqConnection();//創(chuàng)建連接
Channelchannel = conn.createChannel();//創(chuàng)建channel
Map<String,Object> arg = newHashMap<String, Object>();
arg.put("x-max-priority",10); //隊(duì)列的屬性參數(shù) 有10個(gè)優(yōu)先級(jí)別
// 聲明(創(chuàng)建)隊(duì)列
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueDeclare(QUEUE_NAME,true,false, false, arg);
// 消息內(nèi)容
String message ="Hello World!";
channel.basicPublish("",QUEUE_NAME, null, message.getBytes());
BasicPropertiesprop =new BasicProperties(null, null, null, null, 1,
null, null, null, null, null, null, null, null,null);//消息的參數(shù),聲明該消息的優(yōu)先級(jí)是1
channel.basicPublish("",QUEUE_NAME, prop, message.getBytes()); //消息發(fā)布
System.out.println("[x] Sent '" + message + "'");
//關(guān)閉通道和連接
channel.close();
conn.close();
客戶端看下結(jié)果:

1.2.3.2結(jié)合spring實(shí)現(xiàn):
1.2.3.2.1 xml配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
<description>rabbitmq 連接服務(wù)配置</description>
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" username="${rabbit.username}"
password="${rabbit.password}" port="${rabbit.port}" virtual-host="${rabbit.vhost}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<!-- spring template聲明-->
<!-- 聲明一個(gè)隊(duì)列 -->
<rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value>//這個(gè)地方一定是integer的,別的不好使??!
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 監(jiān)聽配置queues:監(jiān)聽的隊(duì)列,多個(gè)的話用逗號(hào)(,)分隔 ref:監(jiān)聽器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queue-names="test_queue_key" ref="queueListenter" method="onMessage"/>
</rabbit:listener-container>
<bean id="queueListenter" class="com.DF.spring.springAMQP.QueueListener" />
1.2.3.2.2代碼部分:
producter:
AbstractApplicationContext ctx = new
ClassPathXmlApplicationContext("classpath:/spring/rabbitmq-contextDemo2.xml");
RabbitTemplate amqpTemplate = ctx.getBean(RabbitTemplate.class);
Random random = new Random();
for (int i=0; i< 1000; i++){
final int priority = random.nextInt(10 - 1 + 1) + 1;//隨機(jī)的優(yōu)先級(jí)
amqpTemplate.convertAndSend("test_queue_key", (Object)("hello world"), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(priority);
return message;
}
});
}
customer:
public class QueueListener implements MessageListener{
@Override
public void onMessage(Message message) {
try{
System.out.print("[x] 接收到的消息:"+new String(message.getBody(),"utf-8")+"&&&"+"優(yōu)先級(jí)"+message.getMessageProperties().getPrority());
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
}
}
從客戶端看下隊(duì)列里面的消息:

我們發(fā)送隨機(jī)優(yōu)先級(jí)的消息進(jìn)入隊(duì)列,看看消費(fèi)端打印出來(lái)的消息:

到這里,rabbitmq結(jié)合spring的demo功能實(shí)現(xiàn)......
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- springboot實(shí)現(xiàn)rabbitmq的隊(duì)列初始化和綁定
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
- 消息隊(duì)列 RabbitMQ 與 Spring 整合使用的實(shí)例代碼
- Spring學(xué)習(xí)筆記3之消息隊(duì)列(rabbitmq)發(fā)送郵件功能
- Spring項(xiàng)目集成RabbitMQ及自動(dòng)創(chuàng)建隊(duì)列
相關(guān)文章
Java?Valhalla?Project項(xiàng)目介紹
這篇文章主要介紹了Java?Valhalla?Project項(xiàng)目介紹,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09
Java?Mybatis查詢數(shù)據(jù)庫(kù)舉例詳解
這篇文章主要給大家介紹了關(guān)于Java?Mybatis查詢數(shù)據(jù)庫(kù)的相關(guān)資料,在MyBatis中可以使用遞歸查詢實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)中樹形結(jié)構(gòu)數(shù)據(jù)的查詢,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-10-10
使用迭代器模式來(lái)進(jìn)行Java的設(shè)計(jì)模式編程
這篇文章主要介紹了使用迭代器模式來(lái)進(jìn)行Java的設(shè)計(jì)模式編程,文中對(duì)迭代器模式中的容器封裝方面的知識(shí)進(jìn)行了講解,需要的朋友可以參考下2016-02-02
Java 中Json中既有對(duì)象又有數(shù)組的參數(shù)如何轉(zhuǎn)化成對(duì)象(推薦)
Gson庫(kù)是一個(gè)功能強(qiáng)大、易于使用的Java序列化/反序列化庫(kù),它提供了豐富的API來(lái)支持Java對(duì)象和JSON之間的轉(zhuǎn)換,這篇文章主要介紹了Java 中Json中既有對(duì)象又有數(shù)組的參數(shù)如何轉(zhuǎn)化成對(duì)象,需要的朋友可以參考下2024-07-07
RabbitMQ實(shí)現(xiàn)消息可靠性傳遞過(guò)程講解
消息的可靠性傳遞是指保證消息百分百發(fā)送到消息隊(duì)列中去,這篇文章主要介紹了RabbitMQ實(shí)現(xiàn)消息可靠性傳遞過(guò)程,感興趣想要詳細(xì)了解可以參考下文2023-05-05

