RabbitMQ實(shí)現(xiàn)Work Queue工作隊(duì)列的示例詳解
RabbitMQ Work Queue工作隊(duì)列

工作隊(duì)列(又稱任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。
相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù)。

多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會被一個(gè)消費(fèi)者處理。
但是對于工作隊(duì)列,可以提高消息的處理速度,避免隊(duì)列中的消息堆積。
我們以一個(gè)例子來解釋work queue工作隊(duì)列。在生產(chǎn)者的服務(wù)中添加測試方法,通過循環(huán)的方式,向名為simple.queue隊(duì)列中發(fā)送50條消息,代碼和詳細(xì)描述如下:
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName="simple.queue";//隊(duì)列名稱
String message = "hello, message_";//發(fā)送的消息
for (int i=1;i<=50;i++){
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
}在消費(fèi)者的服務(wù)模塊中,定義兩個(gè)消息監(jiān)聽,分別為listenSimpleQueue1和listenSimpleQueue2,讓它們都監(jiān)聽simple.queue隊(duì)列,并且設(shè)置休眠時(shí)間,使得消費(fèi)者1每秒處理50條消息,消費(fèi)者2每秒處理10條消息。
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Component
public class SpringRabbitListener {
@RabbitListener(queues="simple.queue")
public void listenSimpleQueue1(String msg) throws InterruptedException {
System.out.println("消費(fèi)者1已經(jīng)接收到simple.queue的消息:[" + msg + "]"+ LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues="simple.queue")
public void listenSimpleQueue2(String msg) throws InterruptedException {
System.err.println("消費(fèi)者2已經(jīng)接收到simple.queue的消息:[" + msg + "]"+LocalTime.now());
Thread.sleep(200);
}
}
消費(fèi)者的application.yaml文件,設(shè)置消費(fèi)者每次只能獲取一條消息,生產(chǎn)者和消費(fèi)者的配置文件相似。
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 192.168.220.13*
port: 5672
username: user
password: ******
virtual-host: /
Listener:
simple:
prefetch: 1 #每次只能獲取一條消息,處理完成才能獲取下一條消息 控制消費(fèi)者預(yù)取消息的上限
處理完成后,運(yùn)行項(xiàng)目,可以得到消費(fèi)者1和消費(fèi)者2都能消費(fèi)消息,并且可以根據(jù)休眠時(shí)間有序進(jìn)行工作。

到此這篇關(guān)于RabbitMQ實(shí)現(xiàn)Work Queue工作隊(duì)列的示例詳解的文章就介紹到這了,更多相關(guān)RabbitMQ Work Queue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java加載properties文件的六種方法總結(jié)
這篇文章主要介紹了java加載properties文件的六種方法總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-05-05
JAVA基于Slack實(shí)現(xiàn)異常日志報(bào)警詳解
這篇文章主要為大家介紹了JAVA基于Slack實(shí)現(xiàn)異常日志報(bào)警詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
Java Spring的數(shù)據(jù)庫開發(fā)詳解
這篇文章主要介紹了Spring的數(shù)據(jù)庫開發(fā),主要圍繞SpringJDBC和Spring Jdbc Template兩個(gè)技術(shù)來講解,文中有詳細(xì)的代碼示例,需要的小伙伴可以參考一下2023-04-04
IDEA之IDEA連接gitlab協(xié)同開發(fā)方式
通過IDEA克隆GitLab項(xiàng)目實(shí)現(xiàn)代碼協(xié)同開發(fā)相較于使用SourceTree,?通過IDEA連接GitLab進(jìn)行代碼協(xié)同開發(fā)更顯便捷,方法包括通過VersionControl創(chuàng)建新項(xiàng)目,輸入項(xiàng)目的git?HTTP地址和本地路徑,測試連接成功后克隆項(xiàng)目,修改代碼后2024-11-11

