解決springboot集成rocketmq關(guān)于tag的坑
springboot集成rocketmq關(guān)于tag的坑
新項(xiàng)目使用springboot的若依框架集成rocketmq,選擇集成RocketMQTemplate這種方式實(shí)現(xiàn)消息的發(fā)送和接收。
1.客戶端發(fā)送代碼
此處回調(diào)方法里有些業(yè)務(wù)不用關(guān)注,只關(guān)心發(fā)送方法
@Component
public class RocketMqHelper {
Logger logger = LoggerFactory.getLogger(RocketMqHelper.class);
@Resource
private RocketMQTemplate rocketMQTemplate;
public void send(ReqMsg msg){
rocketMQTemplate.asyncSend(msg.getMsg().getTopic()+":"+msg.getMsg().getTags(),
msg.getMsg(),
new SendCallback(){
@Override
public void onSuccess(SendResult sendResult) {
logger.debug("msgid:{} 發(fā)送成功" , sendResult.getMsgId());
logger.debug("發(fā)送mq成功后要執(zhí)行的service:
{}",msg.getMsg().getSendAfterMethod());
IsaveSendAfterMqLog saveSendAfterMqLog =
SpringUtils.getBean(msg.getMsg().getSendAfterMethod());
saveSendAfterMqLog.saveSendAfterMqLog(new
SendAfterLog(msg.getMsg(),sendResult,"0"));
}
@Override
public void onException(Throwable throwable) {
logger.error("mq發(fā)送異常!{}",throwable.toString());
logger.debug("發(fā)送mq失敗后執(zhí)行的service:
{}",msg.getMsg().getSendAfterMethod());
//異常描述截取500 length入庫
msg.getMsg().putUserProperty("exceptionDesc",throwable.toString());
IsaveSendAfterMqLog saveSendAfterMqLog =
SpringUtils.getBean(msg.getMsg().getSendAfterMethod());
saveSendAfterMqLog.saveSendAfterMqLog(new
SendAfterLog(msg.getMsg(),"1"));
}
});
}
}
2.服務(wù)端監(jiān)聽消息
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}",
selectorExpression="${rocketmq.tags}")
public class CbiRocketmqConsumer implements RocketMQListener<CbiMsg> {
Logger logger = LoggerFactory.getLogger(CbiRocketmqConsumer.class);
@Override
public void onMessage(CbiMsg message) {
String msgBody = new String(message.getBody());
String serviceName = message.getTags();
logger.info("本次消費(fèi)服務(wù)名稱:{}",serviceName);
AbSaveReceiveAfter saveReceiveAfter = SpringUtils.getBean(serviceName);
saveReceiveAfter.saveReceiveAfter(new RecevieAfterLog(message,
Constants.CONSUME_SUCCESS));//默認(rèn)消費(fèi)成功
}
}
@RocketMQMessageListener這個(gè)注解里selectorExpression默認(rèn)是*,接收topic下全部消息。想動(dòng)態(tài)對(duì)tags進(jìn)行配置。于是利用springboot獲取yml配置。寫死的時(shí)候沒有問題,但是改成$表達(dá)式配置后怎么都收不到消息,經(jīng)排查居然是selectorExpression這個(gè)不支持配置,會(huì)原封的按表達(dá)式進(jìn)入MQ容器初始化。然而注解里面的topic,comsumerGroup都可以正常拿到配置值。
翻源碼發(fā)現(xiàn)問題所在,項(xiàng)目啟動(dòng)時(shí),在ListenerContainerConfiguration在這個(gè)類里初始化mq容器時(shí),對(duì)配置進(jìn)行賦值
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new
DefaultRocketMQListenerContainer();
container.setNameServer(rocketMQProperties.getNameServer());
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
container.setConsumerGroup(environment.resolvePlaceholders
(annotation.consumerGroup()));
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener) bean);
container.setObjectMapper(objectMapper);
return container;
}
topic和comsumerGroup都在springboot環(huán)境里獲取配置值了,唯獨(dú)selectorExpression這個(gè)沒有,直接默認(rèn)注解里的。下面的問題就是需要自己在項(xiàng)目啟動(dòng),springboot容器起來,但是rocketmq容器未起的時(shí)候,動(dòng)態(tài)去改注解里配置的值。然后讓Rocketmq啟動(dòng)。
**
* 因?yàn)镽ocketMQMessageListener不提供動(dòng)態(tài)配置功能
* springboot初始化后rocket容器初始化前利用反射動(dòng)態(tài)改變
* RocketMQMessageListener注解selectorExpression的值
*
*
*/
@Component
public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean {
@Value("${rocketmq.consumer.tags}")
private String tags;
@Override
public void afterPropertiesSet() throws Exception {
RocketMQMessageListener annoTable =
CbiRocketmqConsumer.class.getAnnotation(RocketMQMessageListener.class);
// 獲取代理處理器
InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable);
// 獲取私有 memberValues 屬性
Field f = invocationHandler.getClass().getDeclaredField("memberValues");
f.setAccessible(true);
// 獲取實(shí)例的屬性map
Map<String, Object> memberValues = (Map<String, Object>)
f.get(invocationHandler);
// 修改屬性值
memberValues.put("selectorExpression", tags);
}
}
問題解決。。
SpringBoot集成RocketMQ及報(bào)錯(cuò)處理
項(xiàng)目場景:
說明:springBoot集成RocketMQ開發(fā)
環(huán)境:阿里云+Centos8+RocketMQ+SpringBoot+Docker
啟動(dòng):docker start rmqserver rmqbroker[因?yàn)镽ocketMQ安裝在Docket容器中,所以這樣啟動(dòng)]
服務(wù)器broker.conf配置信息:
brokerIP1=外網(wǎng)ip namesrvAddr=外網(wǎng)ip:9876 brokerName=broker_tanhua autoCreateTopicEnable=true
說明:
1.brokerIP1 當(dāng)前broker監(jiān)聽的IP
2.Broker是RocketMq的核心,負(fù)責(zé)消息的傳遞(提供者=》消費(fèi)者)以及消息的持久化存儲(chǔ),消息的HA機(jī)制以及服務(wù)器過濾功能。
3.autoCreateTopicEnable:自動(dòng)創(chuàng)建Topic路由
問題一描述:
我第一次配置時(shí),broker.conf配置文件中沒有配置autoCreateTopicEnable,因此在程序運(yùn)行時(shí)會(huì)提示沒有路由信息:No route info of this topic: tanhua-sso-login
我發(fā)送消息路由名字是tanhua-sso-login

錯(cuò)誤信息:
No route info of this topic: tanhua-sso-login
錯(cuò)誤信息截圖:我沒有截圖網(wǎng)上找了一個(gè),差不多

解決方式:
我當(dāng)時(shí)也在網(wǎng)上找了很多,有在啟動(dòng)時(shí)添加自動(dòng)創(chuàng)建的也有說防火墻開啟的原因,但是我感覺會(huì)這個(gè)的話應(yīng)該都知道關(guān)防火墻。
在啟動(dòng)時(shí)添加自動(dòng)創(chuàng)建可能也好使,但是我沒試過,因?yàn)槲以谒阉鲿r(shí)發(fā)現(xiàn)問題統(tǒng)一指向說沒有自動(dòng)創(chuàng)建,因此我想的是直接在配置文件中進(jìn)行修改,然后重啟
解決方式:
在broker.conf配置文件中添加如下配置:
autoCreateTopicEnable=true
SpringBoot集成信息:
application.properties:
# RocketMQ相關(guān)配置 rocketmq.nameServer=外網(wǎng)IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
【注】:這里配置的開通沒有spring,我之前加spring怎么也連接不上
pom.xml:
<!--RocketMQ相關(guān)-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.5.1</version>
</dependency>
問題二描述:
我在修改上面的錯(cuò)誤后,緊接著又報(bào)
錯(cuò)誤信息:
RemotingTooMuchRequestException: sendDefaultImpl call timeout
錯(cuò)誤信息截圖:也是沒有截圖網(wǎng)上找了一個(gè),差不多

思路:錯(cuò)誤信息中提示call timeout,timeout一般想到到時(shí)連接或響應(yīng)超時(shí),因此在網(wǎng)上找到的是在發(fā)送MQ時(shí)出錯(cuò),網(wǎng)上解決方案是:修改Mq配置文件中的sendMsgTimeout,因此想到修改可以修改SpringBoot連接MQ時(shí)的配置設(shè)置
解決方案:添加rocketmq.producer.send-message-timeout= 6000
說明:給大一點(diǎn)發(fā)送信息超時(shí)時(shí)間。
說明:同時(shí)在SpringBoot集成RoctetMQ配置中沒有sendMsgTimeout因此用rocketmq=>輸入'.'=>輸入sendtimeout=>查看有哪些關(guān)于這個(gè)的配置。
完整配置:
# RocketMQ相關(guān)配置 rocketmq.nameServer=外網(wǎng)IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
基于Springboot實(shí)現(xiàn)JWT認(rèn)證的示例代碼
本文主要介紹了基于Springboot實(shí)現(xiàn)JWT認(rèn)證,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11
Maven依賴junit?@Test報(bào)錯(cuò)的解決方案
這篇文章主要介紹了Maven依賴junit?@Test報(bào)錯(cuò)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
基于SpringMVC中的路徑參數(shù)和URL參數(shù)實(shí)例
這篇文章主要介紹了基于SpringMVC中的路徑參數(shù)和URL參數(shù)實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-02-02
IDEA源碼修改器JarEditor使用(反編譯-打包一步到位)
JarEditor是一個(gè)IDEA插件,用于修改jar包中的類文件,它允許用戶在不解壓jar包的情況下,直接在IDEA中編輯和修改類文件的源碼,修改完成后,可以一鍵編譯并生成新的jar包,替換原jar包2025-01-01
java讀取JSON文件的多種實(shí)現(xiàn)方式
這篇文章主要介紹了java讀取JSON文件的多種實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
如何獲取springboot打成jar后的classpath
這篇文章主要介紹了如何獲取springboot打成jar后的classpath問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
Mybatis配置映射文件中parameterType的用法講解
這篇文章主要介紹了Mybatis配置映射文件中parameterType的用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09

