RocketMQ消息隊(duì)列實(shí)現(xiàn)隨機(jī)消息發(fā)送當(dāng)做七夕禮物
正文
都在過情人節(jié),前端的小哥哥們給女朋友畫個(gè)頁面,美美的,寫個(gè)chrome插件,好看的,俺們后端程序員咋辦。
我給媳婦寫首詩,哈哈

我決定,把想對(duì)媳婦說的,今天發(fā)送到一個(gè)MQ里邊,然后在七夕當(dāng)天,打開消費(fèi)者,將這一段話給俺媳婦看。你看,這就是我好久前對(duì)你說的話,這就是我們后端程序員的浪漫。當(dāng)然也可以多發(fā)送幾個(gè),到時(shí)候跟根據(jù)topic控制到底發(fā)什么,哈哈。
這里首先得用順序消息,當(dāng)然,消息過期時(shí)間得設(shè)置的長一點(diǎn)。
1 下載并啟動(dòng)RocketMQ
點(diǎn)擊下載,這是個(gè)windows版本的。
下載完成解壓后長這樣:

然后后還需要配置環(huán)境變量

這個(gè)時(shí)候就可以進(jìn)入到RocketMQ的bin目錄啟動(dòng)MQ了
1.1 首先啟動(dòng)name server
start mqnamesrv.cmd

1.2 然后啟動(dòng)Broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

這個(gè)時(shí)候就啟動(dòng)成功了
2 生產(chǎn)者
需要注意的是,消息必須是順序消息,不然發(fā)送的消息順序就亂了。一首情詩順序亂了,讀不下去,豈不是很尷尬。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class RocketMQOrderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
//讀取文件
List<String> messages = getMessages();
for (int i = 0; i < messages.size(); i++) {
String body = messages.get(i);
Message msg = new Message("topic_luke", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = ((Integer)arg).longValue();
long index = id % mqs.size();
return mqs.get((int) index);
}
}, i);
}
producer.shutdown();
}
static List<String> getMessages() throws IOException {
String temp = null;
File f = new File("E:\Code\online-taxi-three\demo\luke.txt");
InputStreamReader read = new InputStreamReader(new FileInputStream(f));
ArrayList readList = new ArrayList();
BufferedReader reader = new BufferedReader(read);
while ((temp = reader.readLine()) != null && !"".equals(temp)) {
readList.add(temp);
}
return readList;
}
}
3 消費(fèi)者
這里需要注意的是setConsumeThreadMax和setConsumeThreadMin都需要設(shè)置成1,單線程取消息這樣就可以通過sleep控制消息的顯示速度,不然并發(fā)取消息就很快顯示完了。不夠唯美。
import lombok.SneakyThrows;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RockerMQConsumer {
public static void main(String[] args) throws Exception {
//實(shí)例化消息消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
//指定nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setPullBatchSize(1);
//訂閱topic
consumer.subscribe("topic_luke","*");
// 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
TimeUnit.SECONDS.sleep(3);
}
// 標(biāo)記該消息已經(jīng)被成功消費(fèi)
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動(dòng)消費(fèi)者實(shí)例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
發(fā)送的內(nèi)容在這里自由編寫哈,路徑和文件名能對(duì)上就行,謝謝觀看,最近突發(fā)奇想,把技術(shù)編成故事講出來,會(huì)不會(huì)比較受歡迎呢。
以上就是RocketMQ消息隊(duì)列實(shí)現(xiàn)隨機(jī)消息發(fā)送當(dāng)做七夕禮物的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息隊(duì)列隨機(jī)消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot項(xiàng)目中使用Groovy腳本的示例代碼
本文主要介紹了SpringBoot項(xiàng)目中使用Groovy腳本的示例代碼,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
從ElasticSearch中刪除數(shù)據(jù)的幾種常見方式
這篇文章主要給大家介紹了關(guān)于從ElasticSearch中刪除數(shù)據(jù)的幾種常見方式,在Elasticsearch中刪除數(shù)據(jù)可以通過刪除索引或刪除文檔兩種方式實(shí)現(xiàn),需要的朋友可以參考下2024-10-10
IOC?容器啟動(dòng)和Bean實(shí)例化兩個(gè)階段詳解
這篇文章主要為大家介紹了IOC?容器啟動(dòng)和Bean實(shí)例化兩個(gè)階段詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
Java中將字符串String轉(zhuǎn)換為整數(shù)int的多種方法
在Java中將String類型轉(zhuǎn)換為int類型是一個(gè)常見的操作,下面這篇文章主要給大家介紹了關(guān)于Java中將字符串String轉(zhuǎn)換為整數(shù)int的多種方法,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-07-07
利用Java Apache POI 生成Word文檔示例代碼
本篇文章主要介紹了利用Java Apache POI 生成Word文檔示例代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05
SpringBoot配置SSL同時(shí)支持http和https訪問實(shí)現(xiàn)
本文主要介紹了SpringBoot配置SSL同時(shí)支持http和https訪問實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
spring boot thymeleaf 圖片上傳web項(xiàng)目根目錄操作步驟
這篇文章主要介紹了spring boot thymeleaf 圖片上傳web項(xiàng)目根目錄步驟,本文給大家提到了thymeleaf的基礎(chǔ)知識(shí),需要的朋友可以參考下2018-03-03
在springboot文件中如何創(chuàng)建mapper.xml文件
這篇文章主要介紹了在springboot文件中如何創(chuàng)建mapper.xml文件問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
基于java實(shí)現(xiàn)簡單發(fā)紅包功能
這篇文章主要為大家詳細(xì)介紹了基于java實(shí)現(xiàn)簡單發(fā)紅包功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-11-11

