Spring?Kafka中如何通過參數(shù)配置解決超時(shí)問題詳解
背景
這是我們團(tuán)隊(duì)負(fù)責(zé)的一個(gè)不太核心的服務(wù)。之前與外部交互時(shí)應(yīng)外部要求由普通kafka集群改成加密kafka集群。我們是數(shù)據(jù)生產(chǎn)端。

改的過程中并跑上線,60%的請(qǐng)求耗時(shí)增加了2倍,也還是在百毫秒的量級(jí)可以接受。但是每次重啟的第一個(gè)請(qǐng)求要5s以上,會(huì)超過;運(yùn)行過程中,一兩個(gè)月也會(huì)有一次超時(shí)。因?yàn)槲覀冇腥沃卦嚕w沒有影響成功率。
上線的時(shí)候我們問過網(wǎng)絡(luò)組,還專門請(qǐng)教過公司專業(yè)負(fù)責(zé)kafka的團(tuán)隊(duì)。結(jié)論是:第一,這個(gè)慢是外部交互方的問題,不是咱們這邊可以處理的;第二,參數(shù)上也沒有什么可以調(diào)優(yōu)的。
我們團(tuán)隊(duì)內(nèi)部還是不信邪,調(diào)了幾個(gè)參數(shù),加測(cè)之后上線了。頻繁度降到了現(xiàn)在的一兩個(gè)月一次超時(shí),但是沒有根治。因?yàn)楸旧磉@個(gè)服務(wù)不是特別核心,本身外部是允許有一定失敗率的,而且現(xiàn)在實(shí)際上也沒有失敗,幾年內(nèi)業(yè)務(wù)量也是很平穩(wěn)的:1分鐘4筆。
而我上班時(shí)間的狀態(tài)基本上是我站在兩個(gè)人中間,我目的是想問一個(gè)人問題,結(jié)果卻先要回答另外一個(gè)人的問題,這時(shí)候還會(huì)出現(xiàn)第四個(gè)人說別的事。這個(gè)優(yōu)先級(jí)排不上。但是心疼開發(fā)小哥哥,每一兩個(gè)月就要處理一下因?yàn)檫@件事引起的告警。雖然實(shí)際不影響,告警出來了,我們就要排查核對(duì)是否還是這個(gè)問題,并且確實(shí)通過重試將消息推送出去了。
所以本次利用周末,希望可以根治這個(gè)疑難雜癥,減少運(yùn)維成本。
思路
前期已經(jīng)明確了這個(gè)外部的加密集群建立連接和數(shù)據(jù)傳輸速度都慢于之前的普通集群。之所以第一次慢和每一兩個(gè)月會(huì)慢一次都是連接斷開重連造成的。之前我們進(jìn)行過參數(shù)調(diào)優(yōu),調(diào)優(yōu)做的就是因?yàn)?分鐘4筆請(qǐng)求,線上以最小部署單元3臺(tái)機(jī)器部署,每臺(tái)機(jī)器1分鐘預(yù)計(jì)處理一筆請(qǐng)求。根據(jù)這個(gè)數(shù)據(jù)調(diào)整了空閑自動(dòng)斷開連接的時(shí)間間隔,保證連接不會(huì)因?yàn)榭臻e自動(dòng)斷開。線上驗(yàn)證有效,也側(cè)面證實(shí)了是連接過程慢引起的超時(shí)。
因?yàn)榻⑦B接過程慢,這個(gè)主要是外部提供的集群就是如此。既然目前并不影響實(shí)際發(fā)送成功率。人家代表的是大佬,我們也不好太強(qiáng)硬的去推他們解決。所以我的思路有兩個(gè):
第一,探索將建立連接與發(fā)送數(shù)據(jù)分離的可行性:程序啟動(dòng)后先將連接建立好再提供服務(wù)。如果生產(chǎn)端是這樣實(shí)現(xiàn)的。那也許還可以進(jìn)行連接自動(dòng)探測(cè),如果連接斷開則自動(dòng)重連,不要等發(fā)送數(shù)據(jù)時(shí)再發(fā)現(xiàn)連接已斷開。
第二,其實(shí)第一種思路的可行性渺茫,只是需要驗(yàn)證一下自己的想法。一般的這種消息中間件,消費(fèi)端是這樣實(shí)現(xiàn)的。但是生產(chǎn)端采用了更簡(jiǎn)單的方式:讀寫數(shù)據(jù)的時(shí)候再探測(cè)連接是否可用,不可用則重新建立連接。這種用在發(fā)送本來就是異步的,對(duì)發(fā)送延遲本身敏感度也不高的場(chǎng)景。生產(chǎn)端本來就是這種場(chǎng)景,并且通過測(cè)試實(shí)際上也確實(shí)是在發(fā)送時(shí)建立的第一次連接。kafka生產(chǎn)端原本就是這種設(shè)計(jì)的可能性極大。如果是這種情況,那就在生產(chǎn)端真正使用異步,給調(diào)用方返回“受理成功”,保證調(diào)用方不超時(shí)。自己再通過接受回調(diào)保證實(shí)際的成功。
這個(gè)事情真要做,還有兩個(gè)隱形需求:
1、因?yàn)橥獠坑行枨?,?shù)據(jù)可以偶爾少發(fā),但是不能重復(fù)發(fā)送。所以不能使用業(yè)務(wù)級(jí)別的數(shù)據(jù)發(fā)送來實(shí)現(xiàn)探測(cè)功能。重試也要保證上條確實(shí)沒有收到。
2、改造不能太大,研發(fā)成本要小。
過程
因?yàn)槲以诰W(wǎng)上搜到的這方面都是入門級(jí),沒有什么解決這個(gè)問題的相關(guān)資料。所以采用的主要方法是讀源碼和官方文檔。當(dāng)然,本文的方法是有前提知識(shí)儲(chǔ)備基礎(chǔ)的。就是《白話TCP/IP原理》系列的相關(guān)內(nèi)容:https://mp.weixin.qq.com/s/Y2k3AW2ZjWbB1w63gsSRag
步驟一,查詢版本特性
我們目前用到的kafka客戶端版本是
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>spring-kafka對(duì)應(yīng)的官網(wǎng)的大版本是2.5,所以先點(diǎn)開了2.5.17.RELEASE對(duì)應(yīng)的參考文檔??吹揭痪溆杏眯畔ⅲ?/p>
The default consumer and producer factories can now invoke a callback whenever a consumer or producer is created or closed.
默認(rèn)消費(fèi)者和生產(chǎn)者工廠現(xiàn)在已經(jīng)可以在生產(chǎn)者和消費(fèi)者創(chuàng)建和關(guān)閉時(shí)引發(fā)一個(gè)回調(diào)。耗時(shí)的連接建立過程是可以監(jiān)聽的,我們可以通過打日志進(jìn)行監(jiān)控。
步驟二,查源碼

首先我們看一下類圖,看不清楚沒有關(guān)系??催@里就好:
首先發(fā)現(xiàn)Producer、Consumer和Sender都是通過KafkaClient(接口),也就是NetworkClient(實(shí)現(xiàn)類)進(jìn)行網(wǎng)絡(luò)活動(dòng)的。其次發(fā)現(xiàn)NetworkClient是在傳輸層和應(yīng)用層之間起了一個(gè)緩沖的作用,解耦了各個(gè)部件。
Producer、Consumer和AdminClient主要管理requests;NetworkClient主要管理connection;Selector主要管理sockets channel。這些被管理對(duì)象我在之前的網(wǎng)絡(luò)系列里都講過。
如果不看代碼,我站在設(shè)計(jì)者角度結(jié)合類圖猜想:生產(chǎn)端實(shí)際使用的是KafkaTemplate的send方法,具體的參數(shù)都是由DefaultKafkaProducerFactory接收。實(shí)際上連接的建立是Producer類進(jìn)行。而在Producer類依賴于NetworkClient。而實(shí)際上進(jìn)行連接應(yīng)該在Sender類。Sender是一個(gè)Runnable異步線程來做,那實(shí)際建立連接的是run方法中。
我跟蹤源碼驗(yàn)證了猜想。NetworkClient里有個(gè)initiateConnect的私有方法,是建立連接用的,跟蹤它就可以知道調(diào)用的地方。跟蹤下來,主要入口在NetworkClient的poll方法,注釋如下:
/**
* Do actual reads and writes to sockets.
*
* @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
* must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
* metadata timeout
* @param now The current time in milliseconds
* @return The list of responses received
*/
@Override
public List<ClientResponse> poll(long timeout, long now) {人家明確說了是讀寫時(shí)才會(huì)調(diào)用。證實(shí)了思路一不可行。
步驟三,查自身的代碼
按照思路二,進(jìn)行異步化。本身生產(chǎn)端就應(yīng)該是異步的,為什么異步?jīng)]有生效呢?結(jié)合KafkaTemplate的send方法源代碼和項(xiàng)目中自己寫的代碼。異步部分大體是這樣:
SettableListenableFuture future = new SettableListenableFuture();
future.set("OK");
future.get();
future.addCallback((sendResult) -> {
try {
System.out.println("成功");
} catch (Exception e) {
}
}, r -> {
System.out.println("失敗");
});
System.out.println("============end==============");就是說KafkaTemplate的異步是靠使用SettableListenableFuture實(shí)現(xiàn)的,實(shí)際上它的set方法會(huì)馬上觸發(fā)callback,是同步的。代碼是先同步調(diào)用set,并且還手動(dòng)調(diào)用了get(這個(gè)方法會(huì)等待直到返回結(jié)果)。所以整體是同步的?;蛘咧苯舆@么看,future實(shí)現(xiàn)異步要有一個(gè)Callable或者Runnable的線程方法,人家SettableListenableFuture第一行源碼就禁用了Callable。這個(gè)我看了2.5.17.RELEASE這個(gè)更高版本的spring-kafka,實(shí)現(xiàn)沒有做更改。
也就是說spring-kafka自身起碼在2.5.X版本里異步?jīng)]有起到作用。
問題清楚了修改也很簡(jiǎn)單,比如可以加個(gè)異步注解將整個(gè)發(fā)送方法做異步,重試等邏輯也放到這個(gè)方法中。給調(diào)用方只返回受理成功。具體怎么解決交給開發(fā)小哥哥。
總結(jié)
幸虧我上周已經(jīng)提前規(guī)劃好周一要休假。否則現(xiàn)在都2點(diǎn)半了明天上班也沒精神。主要時(shí)間花在異步不生效的問題上。其實(shí)排查異步不生效的思路是很簡(jiǎn)單清晰的。耗時(shí)長(zhǎng)是因?yàn)椋旱谝?,不敢相信spring官方實(shí)現(xiàn)的,竟然使用異步的代碼實(shí)際效果沒有異步;第二,關(guān)于異步我在網(wǎng)上搜索了一下,都是按照項(xiàng)目中配置的那樣。官方這樣說,大家這樣說,我總得考慮是不是自己搞錯(cuò)了。
所以我反復(fù)的驗(yàn)證、反復(fù)的debug之后也不敢下結(jié)論。仔細(xì)研究了源碼仍然不敢下結(jié)論。直到終于搜索到一篇文章說要實(shí)現(xiàn)異步除了要使用addCallback之外還要加異步標(biāo)簽。人間清醒的我,馬上意識(shí)到文章實(shí)際用了兩種不同方法實(shí)現(xiàn)異步。作者之所以認(rèn)為這是一個(gè)方法的兩個(gè)部分大概也是發(fā)現(xiàn)其實(shí)spring-kafka的異步?jīng)]好使吧。
到此這篇關(guān)于Spring Kafka中如何通過參數(shù)配置解決超時(shí)問題的文章就介紹到這了,更多相關(guān)Spring Kafka參數(shù)配置解決超時(shí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JAVA中StackOverflowError錯(cuò)誤的解決
這篇文章主要介紹了JAVA中StackOverflowError錯(cuò)誤的解決,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
Java微信公眾平臺(tái)開發(fā)(11) 微信三大平臺(tái)的關(guān)聯(lián)
這篇文章主要介紹了Java微信公眾平臺(tái)開發(fā)第十一步,微信開發(fā)中微信公眾平臺(tái)、開放平臺(tái)和商戶平臺(tái)的關(guān)聯(lián),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04
java實(shí)現(xiàn)socket從服務(wù)器連續(xù)獲取消息的示例
這篇文章主要介紹了java實(shí)現(xiàn)socket從服務(wù)器連續(xù)獲取消息的示例,需要的朋友可以參考下2014-04-04
Java如何通過反射方式生成數(shù)據(jù)庫實(shí)體類
這篇文章主要介紹了Java如何通過反射方式生成數(shù)據(jù)庫實(shí)體類問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12
MyBatis-Flex實(shí)現(xiàn)分頁查詢的示例代碼
在MyBatis-Flex中實(shí)現(xiàn)分頁查詢時(shí),需要注意維護(hù)一個(gè)獲取數(shù)據(jù)庫總數(shù)的方法,詳細(xì)介紹了UserService、UserServiceImpl類以及Mapper.xml配置,感興趣的可以了解一下2024-10-10
通過jstack分析解決進(jìn)程死鎖問題實(shí)例代碼
這篇文章主要介紹了通過jstack分析解決進(jìn)程死鎖問題實(shí)例代碼,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01
Mybatis配置錯(cuò)誤:java.lang.ExceptionInInitializerError
這篇文章主要介紹了Mybatis配置錯(cuò)誤:java.lang.ExceptionInInitializerError的相關(guān)資料,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12
Java?C++題解leetcode消失的兩個(gè)數(shù)字實(shí)例
這篇文章主要介紹了Java?C++題解leetcode消失的兩個(gè)數(shù)字實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
Java PriorityQueue優(yōu)點(diǎn)和缺點(diǎn)面試精講
這篇文章主要為大家介紹了Java面試中PriorityQueue的優(yōu)點(diǎn)和缺點(diǎn)及使用注意詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
Springboot實(shí)現(xiàn)從controller中跳轉(zhuǎn)到指定前端頁面
Springboot實(shí)現(xiàn)從controller中跳轉(zhuǎn)到指定前端頁面方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-10-10

