springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法
如何保證mq隊(duì)列里的消息只被測(cè)試服務(wù)器上的consumer消費(fèi),避免本地環(huán)境誤消費(fèi)?
程序里有一個(gè)應(yīng)用場(chǎng)景使用到了rabbitmq——當(dāng)財(cái)務(wù)確認(rèn)收到企業(yè)的打款金額后,系統(tǒng)會(huì)把企業(yè)訂單生成用戶付款單。由于訂單記錄數(shù)據(jù)量大,改為通過mq來異步實(shí)現(xiàn)。即財(cái)務(wù)確認(rèn)收款操作后,將企業(yè)訂單數(shù)據(jù)放入mq,另一端監(jiān)聽mq消息隊(duì)列,將收到的企業(yè)訂單加工轉(zhuǎn)換成用戶付款單,并做持久化。

本地開發(fā)環(huán)境與測(cè)試環(huán)境共用一套rabbitmq。當(dāng)項(xiàng)目部署到測(cè)試環(huán)境后,QA測(cè)試過程中,總是“莫名其妙”的發(fā)現(xiàn)所保存的用戶付款單數(shù)據(jù)有問題。
當(dāng)然,首先要排查程序,檢查Consumer的數(shù)據(jù)處理的邏輯是否有bug。單元測(cè)試后,發(fā)現(xiàn)并不存在測(cè)試環(huán)境的bug。
原來,消息隊(duì)列被“非正?!毕M(fèi)了!
Q: 什么情況?
A: 幾個(gè)伙伴一起參與的項(xiàng)目,大家總是要調(diào)試自己的程序的。而如果碰巧本地程序監(jiān)聽到消息隊(duì)列里有消息,那么,消息就被本地程序消費(fèi)掉了。問題正是出現(xiàn)在這里!————團(tuán)隊(duì)開發(fā),大家并不會(huì)及時(shí)檢出git上最新的程序版本。如果本地的程序版本不是最新的正確的版本,勢(shì)必會(huì)出現(xiàn)bug。
那么,怎么辦?
每次你改了邏輯,告訴大家獲取最新?
不現(xiàn)實(shí),約定的東西往往不奏效的。
如何保證mq隊(duì)列里的消息只被測(cè)試服務(wù)器上的consumer消費(fèi),避免本地環(huán)境誤消費(fèi)? 或者說,如何實(shí)現(xiàn)消息的定向消費(fèi)呢?
只要肯琢磨,辦法總比困難多!百思可得解!
我們知道,rabbitmq手動(dòng)ack模式。這還不夠,因?yàn)槲覀冊(cè)趺醋宑onsumer來決定是否消費(fèi)呢? 所以,我們需要一個(gè)標(biāo)識(shí)————producer設(shè)定一個(gè)標(biāo)識(shí),consumer如果匹配這個(gè)標(biāo)識(shí),則消費(fèi),否則予以reject放回消息隊(duì)列。

通過查看spring-rabbit/spring-amqp的代碼,發(fā)現(xiàn)可以在spring-amqp里的MessageProperties上做文章。生產(chǎn)者與消費(fèi)者每次消息傳輸都會(huì)攜帶一個(gè)MessageProperties,通常我們是不指定的,走M(jìn)essageProperties的默認(rèn)設(shè)置值。
我的策略:MessageProperties有一個(gè)屬性叫AppId。我們程序所部署的測(cè)試機(jī)器就一臺(tái),即消息Producer和消息Consumer在一臺(tái)機(jī)器上。那么,我就可以利用機(jī)器的IP來識(shí)別消息。只有Producer與Consumer的IP匹配,才消費(fèi)消息。程序員本機(jī)IP與測(cè)試服務(wù)器IP不一樣,就會(huì)拒絕接收消息,會(huì)把消息重新放回消息隊(duì)列,等待測(cè)試服務(wù)器的Consumer消費(fèi)。
話不多說,上代碼吧,
生產(chǎn)者代碼:
package com.sboot.mq;
import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetAddress;
import java.util.UUID;
public class MQProducerTest extends BaseTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test() throws Exception {
for (int i = 1; i <= 5; i++) {
MessageProperties messageProperties = new MessageProperties();
String ip = InetAddress.getLocalHost().getHostAddress();
messageProperties.setAppId(ip);
// messageProperties.setUserId(String.valueOf(i));
MessageConverter messageConverter = new SimpleMessageConverter();
String msg = UUID.randomUUID().toString();
// System.out.println(msg);
Message message1 = messageConverter.toMessage(msg, messageProperties);
rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);
System.out.println("入隊(duì)完成");
Thread.sleep(500L);
}
}
}
消費(fèi)者手動(dòng)ACK,要實(shí)現(xiàn)ChannelAwareMessageListener接口,感知rabbitmq.client.Channel實(shí)例,調(diào)用channel的basicAck、basicReject等方法:
package com.sboot.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
@Component
@Profile(value = "dev")
@Slf4j
public class UserSettlementDevConsumer implements ChannelAwareMessageListener {
@RabbitHandler
@RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());
long tag = message.getMessageProperties().getDeliveryTag();
String appId = message.getMessageProperties().getAppId();
log.info("{}-{}, 消息出隊(duì)", tag, appId);
String receiveMsg = "";
try {
//核對(duì)標(biāo)識(shí),決定是否消費(fèi)消息
String ip = InetAddress.getLocalHost().getHostAddress();
if (!ip.equals(appId)) {
log.info("這不是我需要的消息。放回隊(duì)列。{}", receiveMsg);
// channel.basicNack(tag, false, true);
channel.basicReject(tag, true);
// channel.basicRecover(true);
return;
}
MessageConverter messageConverter = new SimpleMessageConverter();
receiveMsg = String.valueOf(messageConverter.fromMessage(message));
。。。。在這里消費(fèi)消息
log.info("success " + receiveMsg);
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("receive message has an error, ", e);
channel.basicNack(tag, false, true);
}
}
}
說明一下依賴的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解沒有ackMode。
解決本案問題過程中的花絮:

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

@RabbitListener的ackMode的值見枚舉org.springframework.amqp.core.AcknowledgeMode
NONE-- no acks(自動(dòng)消費(fèi) autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手動(dòng)消費(fèi),Consumer端必須顯式調(diào)用ack或nack)AUTO --

設(shè)置了手動(dòng)消費(fèi),上文消費(fèi)端的deliveryTag會(huì)是不同的long值。自動(dòng)消費(fèi)的deliveryTag是重復(fù)的1和2這樣的。并且,自動(dòng)消費(fèi)時(shí),如果要使用channel的ack/nack,會(huì)報(bào)異常:
2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)
到此這篇關(guān)于springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的文章就介紹到這了,更多相關(guān)springboot rabbitmq消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java8中stream和functional interface的配合使用詳解
這篇文章主要給大家介紹了關(guān)于Java8中stream和functional interface配合使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java8具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起看看吧。2017-11-11
java 教你如何給你的頭像添加一個(gè)好看的國(guó)旗
這篇文章主要介紹了java 教你如何給你的頭像添加一個(gè)好看的國(guó)旗,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
Reactor中的onErrorContinue?和?onErrorResume
這篇文章主要介紹了Reactor中的onErrorContinue?和?onErrorResume,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的朋友可以參考一下2022-09-09
Java獲取接口的所有實(shí)現(xiàn)類方法總結(jié)示例
這篇文章主要給大家介紹了關(guān)于Java獲取接口的所有實(shí)現(xiàn)類方法的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2024-06-06
java實(shí)現(xiàn)全局監(jiān)聽鍵盤詳解
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)全局監(jiān)聽鍵盤的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解下2024-01-01
Java生成二維碼的兩種實(shí)現(xiàn)方式(基于Spring?Boot)
這篇文章主要給大家介紹了關(guān)于Java生成二維碼的兩種實(shí)現(xiàn)方式,文中的代碼基于Spring?Boot,本文基于JAVA環(huán)境,以SpringBoot框架為基礎(chǔ)開發(fā),文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07
springboot 設(shè)置CorsFilter跨域不生效的解決
這篇文章主要介紹了springboot 設(shè)置CorsFilter跨域不生效的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11

