kafka并發(fā)寫大消息異常TimeoutException排查記錄
前言
先簡單介紹下我們的使用場景,線上5臺Broker節(jié)點的kafka承接了所有binlog訂閱的數據,用于Flink組件接收數據做數據中臺的原始數據。昨兒開發(fā)反饋,線上的binlog大量報錯,都是kafka的異常,而且都是同一條topic拋的錯,特征也很明顯,發(fā)送的消息體非常大,主觀判斷肯定是寫入大消息導致的超時了,異常詳情如下:
thread: kafka-producer-network-thread | producer-1 throwable: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for BIN-LOG-DATA-center-dmz2-TABLE-kk-data-center-ods_contract_finance_info-0 due to 56352 ms has passed since last append
定位異常點
應用拋一個不常見的異常,一般操作是先去百度or谷歌搜索一番的,就上面這個timeout超時的異常,搜索引擎的結果都是producer連不上Borker導致的問題,根本不是我們這個場景的,所以其次我們就需要從源碼中尋找答案了。博主使用的開發(fā)工具是IDEA,借助IDEA很容易定位到異常拋出點。首先定位TimeoutException異常類,然后按住ctrl鍵,點擊這個類,會出現如下圖所有拋TimeoutException異常的點,然后根據異常message內容,尋找相匹配的點擊進去就是拋異常的地方了,如圖,紅色箭頭所指即代碼位置:

分析拋異常的邏輯
程序中的異常,一定是符合某些條件才會拋出的,想要解決異常,只要讓運行時的環(huán)境不滿足拋異常的條件即可,下面就是拋異常的代碼:
boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
boolean expired = expiryErrorMessage != null;
if (expired)
abortRecordAppends();
return expired;
}可以看到,我們的異常是在第一個邏輯判斷時候就滿足了所以拋異常了。在此處有可能會拋出三個不同的timeout異常,用中文語義翻譯條件分別是:
- 沒設置重試,并且發(fā)送批次(batch.size)滿了,并且配置請求超時時間(request.timeout.ms)小于【當前時間減去最后追加批次的時間】
- 沒設置重試,并且配置請求超時時間(request.timeout.ms)小于【創(chuàng)建批次時間減去配置的等待發(fā)送的時間(linger.ms)】
- 設置重試,并且配置請求超時時間(request.timeout.ms)小于【當前時間-最后重試時間-重試需要等待的時間(retry.backoff.ms)】
上面括號中的參數就是kafka producer中配置的相關的參數,這些參數都沒有重新設置過,batch.size默認是10kb大小,而引發(fā)報錯的消息都是36kb的大小,默認的request.timeout.ms超時設置是30s,所以在這個判斷可能過期了的方法中,引發(fā)我們異常的主要原因是batch.size和request.timeout.ms的參數設置問題了。
真實原因-解決方案
從上面代碼看表面原因是參數設置不夠了,實際上呢,博主使用kafka-test啟動了五個Borker集群做復現驗證測試,測試寫入相同的36kb的message,在所有配置也保持默認的情況下,也根本毫無壓力。后面查找相關的錯誤日志,發(fā)現所有的TimeoutException集中在幾乎同一時刻,經查明,是因為業(yè)務批量導入了數據到mysql中,造成binlog消息突然增加,高并發(fā)的往kafka寫大消息導致Borker處理不過來,造成的TimeoutException超時,所以真正解決問題也可以從兩個方面入手:
- 服務端:增加Borker,并設置多個TopicPartition,平攤寫入壓力,這個是根本的解決問題
- 客戶端:加大request.timeout.ms、batch.size參數,或者開啟消息重試,這種方案治標不治本,但是也能大概率的減少因為此類場景導致的TimeoutException
結語
異常不可怕,所有異常都是人為拋的,都是有既定的觸發(fā)條件的,只要定位到觸發(fā)異常的條件對癥下藥即可解決問題。不過博主五年來的經驗發(fā)現,日志打印真的是門藝術,在這個方面,Spring框架和Dubbo以及Apollo配置中心框架就是日志打印的典范,不管發(fā)生什么異常,日志里都會輸出詳細的上下文環(huán)境,異常的原因,建議的解決方法,如果涉及到相關的配置,也會打印該怎么配置最好。反觀kafka client的這條TimeoutException就顯的信息量有點過少了,如果能把相關的配置信息和排查的方向寫明會更好。最后安利一波kafka test,輕松搭建多Borker的kafka集群,一個注解就ok了。詳情參考我的這篇博文《深入研究spring boot集成kafka之spring-kafka底層原理》
以上就是kafka并發(fā)寫大消息異常TimeoutException排查記錄的詳細內容,更多關于kafka并發(fā)異常TimeoutException排查的資料請關注腳本之家其它相關文章!
相關文章
Springboot 整合 Java DL4J 實現智能客服功能
本文主要介紹了如何使用SpringBoot整合JavaDeeplearning4j來構建一個智能客服系統,詳細探討了神經網絡選擇、數據集格式、技術介紹、Maven依賴、代碼示例等內容,為構建高效、便捷、個性化的客戶服務提供了理論支持和實踐指導2024-10-10
解決Springboot @WebFilter攔截器未生效問題
這篇文章主要介紹了解決Springboot @WebFilter攔截器未生效問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10
Mybatis Mybatis-Plus傳入多個參數的處理方式
這篇文章主要介紹了Mybatis Mybatis-Plus傳入多個參數的處理方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05
java通過ssh連接執(zhí)行shell命令,文件傳輸方式
這篇文章主要介紹了java通過ssh連接執(zhí)行shell命令,文件傳輸方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08

