RocketMQ生產(chǎn)消息與消費(fèi)消息超詳細(xì)講解
1 RocketMQ簡(jiǎn)介
RocketMQ是阿里開源的一款非常優(yōu)秀中間件產(chǎn)品,脫胎于阿里的另一款隊(duì)列技術(shù)MetaQ,后捐贈(zèng)給Apache基金會(huì)作為一款孵化技術(shù),僅僅經(jīng)歷了一年多的時(shí)間就成為Apache基金會(huì)的頂級(jí)項(xiàng)目。并且它現(xiàn)在已經(jīng)在阿里內(nèi)部被廣泛的應(yīng)用,并且經(jīng)受住了多次雙十一的這種極致場(chǎng)景的壓力(2017年的雙十一,RocketMQ流轉(zhuǎn)的消息量達(dá)到了萬億級(jí),峰值TPS達(dá)到5600萬)
2 MQ的常見產(chǎn)品
ActiveMQ:java語言實(shí)現(xiàn),萬級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),主從架構(gòu),成熟度高
RabbitMQ :erlang語言實(shí)現(xiàn),萬級(jí)數(shù)據(jù)吞吐量,處理速度us級(jí),主從架構(gòu),
RocketMQ :java語言實(shí)現(xiàn),十萬級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),分布式架構(gòu),功能強(qiáng),擴(kuò)展性強(qiáng)
kafka :scala語言實(shí)現(xiàn),十萬級(jí)數(shù)據(jù)吞吐量,處理速度ms級(jí),分布式架構(gòu),功能較少,應(yīng)用于大數(shù)據(jù)較多
3 環(huán)境搭建
創(chuàng)建maven工程

引入依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
4 單生產(chǎn)者單消費(fèi)者模式
生產(chǎn)者:
//生產(chǎn)者,產(chǎn)生消息
public class Producer {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建一個(gè)發(fā)送消息的對(duì)象Producer
DefaultMQProducer producer=new DefaultMQProducer("group1");
//2.設(shè)定發(fā)送的命名服務(wù)器地址
producer.setNamesrvAddr("192.168.23.127:9876");
//3啟動(dòng)發(fā)送的服務(wù)
producer.start();
//4.1創(chuàng)建要發(fā)送的消息對(duì)象,指定topic,指定內(nèi)容body
Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8"));
//4.2發(fā)送消息
SendResult result = producer.send(msg);
System.out.println("返回結(jié)果:"+result);
//5.關(guān)閉連接
producer.shutdown();
}
}
消費(fèi)者:
//消費(fèi)者,消費(fèi)消息
public class Consumer {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
//2.設(shè)定接收的命名服務(wù)器地址
consumer.setNamesrvAddr("192.168.23.127:9876");
//3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意*
consumer.subscribe("topic1","*");
//4.開啟監(jiān)聽,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者
}
});
//5.啟動(dòng)接收消息的服務(wù)
consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行
System.out.println("接收消息服務(wù)已運(yùn)行");
}
}
測(cè)試:


5 單生產(chǎn)者多消費(fèi)者模式
5.1默認(rèn)模式(負(fù)載均衡)
生產(chǎn)者:
//生產(chǎn)者,產(chǎn)生消息
public class Producer {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建一個(gè)發(fā)送消息的對(duì)象Producer
DefaultMQProducer producer=new DefaultMQProducer("group1");
//2.設(shè)定發(fā)送的命名服務(wù)器地址
producer.setNamesrvAddr("192.168.23.127:9876");
//3啟動(dòng)發(fā)送的服務(wù)
producer.start();
for (int i = 1; i <= 10; i++) {
Message msg = new Message("topic1",("生產(chǎn)者2: hello rocketmq "+i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回結(jié)果:"+result);
}
//5.關(guān)閉連接
producer.shutdown();
}
}
消費(fèi)者:
//消費(fèi)者,消費(fèi)消息
public class Consumer {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
//2.設(shè)定接收的命名服務(wù)器地址
consumer.setNamesrvAddr("192.168.23.127:9876");
//3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意*
consumer.subscribe("topic1","*");
//4.開啟監(jiān)聽,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者
}
});
//5.啟動(dòng)接收消息的服務(wù)
consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行
System.out.println("接收消息服務(wù)已運(yùn)行");
}
}
測(cè)試:


5.2廣播模式
生產(chǎn)者的代碼不變,消費(fèi)者的代碼改動(dòng)如下:
//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡)
consumer.setMessageModel(MessageModel.CLUSTERING);
//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式為廣播模式:所有客戶端接收的消息是一樣的
consumer.setMessageModel(MessageModel.BROADCASTING);
具體消費(fèi)者代碼:
//消費(fèi)者,消費(fèi)消息
public class Consumer {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建一個(gè)接收消息的對(duì)象Consumer
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
//2.設(shè)定接收的命名服務(wù)器地址
consumer.setNamesrvAddr("192.168.23.127:9876");
//3.設(shè)置接收消息對(duì)應(yīng)的topic,對(duì)應(yīng)的sub標(biāo)簽為任意*
consumer.subscribe("topic1","*");
//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式(默認(rèn)模式:負(fù)載均衡)
//consumer.setMessageModel(MessageModel.CLUSTERING);
//設(shè)置當(dāng)前消費(fèi)者的消費(fèi)模式為廣播模式:所有客戶端接收的消息是一樣的
consumer.setMessageModel(MessageModel.BROADCASTING);
//4.開啟監(jiān)聽,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
//System.out.println("收到消息:"+msg);
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功處理, mq 收到這個(gè) 標(biāo)記后相同的消息講不會(huì)再次發(fā)給消費(fèi)者
}
});
//5.啟動(dòng)接收消息的服務(wù)
consumer.start();// 開啟多線程 監(jiān)控消息,持續(xù)運(yùn)行
System.out.println("接收消息服務(wù)已運(yùn)行");
}
}測(cè)試:

廣播模式的現(xiàn)象
如果 生產(chǎn)者先發(fā)送消息, 后啟動(dòng)消費(fèi)者, 消息只能被消費(fèi)一次
如果多個(gè)消費(fèi)者先啟動(dòng)(廣播模式),后發(fā)消息,才有廣播的效果
結(jié)論: 必須先啟動(dòng)消費(fèi)者再啟動(dòng)發(fā)送者才有廣播的效果
6 多生產(chǎn)者多消費(fèi)者模式
多生產(chǎn)者產(chǎn)生的消息可以被同一個(gè)消費(fèi)者消費(fèi),也可以被多個(gè)消費(fèi)者消費(fèi)
運(yùn)行多個(gè)生產(chǎn)者,在啟動(dòng)消費(fèi)者
測(cè)試:

到此這篇關(guān)于RocketMQ生產(chǎn)消息與消費(fèi)消息超詳細(xì)講解的文章就介紹到這了,更多相關(guān)RocketMQ生產(chǎn)消息與消費(fèi)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Hibernate validator使用以及自定義校驗(yàn)器注解
這篇文章主要介紹了Hibernate validator使用以及自定義校驗(yàn)器注解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01
Java字節(jié)緩存流的構(gòu)造方法之文件IO流
這篇文章主要介紹了Java字節(jié)緩存流的構(gòu)造方法之文件IO流,同時(shí)也介紹了字符流中的一些相關(guān)的內(nèi)容,并且通過大量的案例供大家理解。最后通過一些經(jīng)典的案例幫助大家對(duì)前面所學(xué)的知識(shí)做了一個(gè)綜合的應(yīng)用,需要的朋友可以參考一下2022-04-04
springboot項(xiàng)目idea熱部署的教程詳解
這篇文章主要介紹了springboot項(xiàng)目idea熱部署,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08
spring學(xué)習(xí)之@SessionAttributes實(shí)例解析
這篇文章主要介紹了spring學(xué)習(xí)之@SessionAttributes實(shí)例解析,分享了相關(guān)代碼示例,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-02-02

