SkyWalking?自定義插件(Spring?RabbitMQ)具體分析過(guò)程
SkyWalking 自定義插件(Spring RabbitMQ) 官方
RabbitMQ插件問(wèn)題
skywalking官方提供的RabbitMQ插件存在缺陷,其只針對(duì)RabbitMQ官方原生Client實(shí)現(xiàn)擴(kuò)展,但我們?cè)陧?xiàng)目中一般不直接使用原生Client,而是使用Spring RabitMQ Client,因Spring RabitMQ Consumer中存在跨線程操作,導(dǎo)致跟蹤ID斷鏈。
具體分析過(guò)程
1.官方插件源碼的攔截點(diǎn)是原生Consumer的handleDelivery方法,源碼如下:

2.而Spring RabbitMQ消費(fèi)者的默認(rèn)實(shí)現(xiàn)是BlockingQueueConsumer, handleDelivery核心邏輯是把消息放到內(nèi)部的BlockingQueue隊(duì)列,不做真正的消費(fèi)處理,因此攔截此處無(wú)法關(guān)聯(lián)到消費(fèi)者邏輯,源碼如下
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
...
try {
if (BlockingQueueConsumer.this.abortStarted > 0) {
if (!BlockingQueueConsumer.this.queue.offer(
new Delivery(consumerTag, envelope, properties, body, this.queueName),
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
Channel channelToClose = super.getChannel();
RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
// Defensive - should never happen
BlockingQueueConsumer.this.queue.clear();
if (!this.canceled) {
RabbitUtils.cancel(channelToClose, consumerTag);
}
try {
channelToClose.close();
catch (@SuppressWarnings("unused") TimeoutException e) {
// no-op
}
}
else {
BlockingQueueConsumer.this.queue
.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
catch (Exception e) {
BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
}3.真正的消費(fèi)處理在SimpleMessageListenerContainer,SimpleMessageListenerContainer繼承Runnable接口,在其run方法中while循環(huán)調(diào)用mainLoop方法,整體調(diào)用鏈路為
4.SimpleMessageListenerContainer.run() -> SimpleMessageListenerContainer.mainLoop() -> SimpleMessageListenerContainer.receiveAndExecute() -> SimpleMessageListenerContainer.doReceiveAndExecute() -> AbstractMessageListenerContainer.executeListener()。最終在executeListener中執(zhí)行消費(fèi)邏輯
protected void executeListener(Channel channel, Object data) {
...
try {
// 執(zhí)行消費(fèi)邏輯
doExecuteListener(channel, data);
if (sample != null) {
this.micrometerHolder.success(sample, data instanceof Message
? ((Message) data).getMessageProperties().getConsumerQueue()
: queuesAsListString());
}
}
catch (RuntimeException ex) {
....
}
}實(shí)現(xiàn)自定義插件
從上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的攔截點(diǎn)
實(shí)現(xiàn)源碼已放到碼云倉(cāng)庫(kù):https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/
效果展示
SkyWalking調(diào)用鏈路

logback日志

到此這篇關(guān)于SkyWalking 自定義插件(Spring RabbitMQ)的文章就介紹到這了,更多相關(guān)SkyWalking 自定義插件內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
nacos在liunx系統(tǒng)中啟動(dòng)成功瀏覽器卻訪問(wèn)不了的解決方法
在linux下搭建nacos,現(xiàn)在想要啟動(dòng),訪問(wèn)nacos頁(yè)面,訪問(wèn)不了,所以本文小編將給大家介紹nacos在liunx系統(tǒng)中啟動(dòng)成功,瀏覽器卻訪問(wèn)不了?全面的解決辦法,需要的朋友可以參考下2023-09-09
基于JavaSwing+mysql開發(fā)一個(gè)學(xué)生社團(tuán)管理系統(tǒng)設(shè)計(jì)和實(shí)現(xiàn)
項(xiàng)目使用Java swing+mysql開發(fā),可實(shí)現(xiàn)基礎(chǔ)數(shù)據(jù)維護(hù)、用戶登錄注冊(cè)、社團(tuán)信息列表查看、社團(tuán)信息添加、社團(tuán)信息修改、社團(tuán)信息刪除以及退出注銷等功能、界面設(shè)計(jì)比較簡(jiǎn)單易學(xué)、適合作為Java課設(shè)設(shè)計(jì)以及學(xué)習(xí)技術(shù)使用,需要的朋友參考下吧2021-08-08
通過(guò)Docker啟動(dòng)Solace并在Spring?Boot通過(guò)JMS整合Solace的操作方法
本文將介紹如何在Spring中使用,雖然代碼使用的是Spring Boot,但并沒有使用相關(guān)starter,跟Spring的整合一樣,可通用,JMS是通過(guò)的消息處理框架,可以深入學(xué)習(xí)一下,不同的MQ在JMS的整合上都是類似的,感興趣的朋友跟隨小編一起看看吧2023-01-01
Java 中的 getDeclaredMethods() 方法(使用與原理)
文章介紹了Java反射機(jī)制中的`getDeclaredMethods()`方法,詳細(xì)講解了其使用方法、原理、注意事項(xiàng)以及實(shí)際應(yīng)用場(chǎng)景,幫助讀者更好地理解和應(yīng)用這一強(qiáng)大的工具,感興趣的朋友一起看看吧2024-12-12
SpringSecurity rememberme功能實(shí)現(xiàn)過(guò)程解析
這篇文章主要介紹了SpringSecurity rememberme功能實(shí)現(xiàn)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03

