kafka-console-consumer.sh使用2次grep管道無法提取消息的解決
問題
在使用kafka自帶腳本消費(fèi)時(shí),想用grep關(guān)鍵詞過濾出來想要的信息
./kafka-console-consumer.sh \ --bootstrap-server node1:9092,node2:9092,node3:9092 \ --topic test \ --from-beginning \ --consumer-property group.id=gaofeng_test\ | grep "b786aba6f3a6" | grep 388aabd
但是發(fā)現(xiàn)grep命令在這里有問題,無法取到想要消息

已經(jīng)消費(fèi)到latest了,都沒有提取到想要的msg
這里確認(rèn)了producer端已經(jīng)發(fā)送成功了,回調(diào)函數(shù)里都沒有任何異常
懷疑
懷疑是否kafka存在丟數(shù)據(jù)的情況
排查問題后發(fā)現(xiàn)kafka集群一切正常,性能也很強(qiáng)勁
并且在存儲消息的日志文件中也已經(jīng)查詢到了我想要的消息了,為什么就是消費(fèi)不出來呢
分析
將kafka的這個(gè)主題 --from-beginning 消費(fèi)到文件topic.log中去
對topic.log這個(gè)靜態(tài)文件進(jìn)行g(shù)rep “b786aba6f3a6” | grep 388aabd抽取,發(fā)現(xiàn)取到了自己想要的這條消息
kafka消費(fèi)時(shí)結(jié)果是動態(tài)的,grep這里用法不對,只有對靜態(tài)的文件可以抽取到數(shù)據(jù)
--line-buffered
Force output to be line buffered. By default, output is line buffered when standard output is
a terminal and block buffered otherwise
上面的意思是
- 強(qiáng)制輸出結(jié)果使用行緩沖
- 默認(rèn)情況下,如果標(biāo)準(zhǔn)輸入時(shí)終端,則使用line bufferred
- 否則,使用塊緩沖,(默認(rèn)的大小為4096 bytes,因系統(tǒng)和配置而異)
所以,這也就解釋了為什么雙重grep過濾沒有內(nèi)容,因?yàn)闆]有達(dá)到塊緩沖限制。
驗(yàn)證
生產(chǎn)消息
a,b,c

消費(fèi)時(shí)兩次grep則無消息

消費(fèi)時(shí)一次grep則都出來

結(jié)論
- 剛剛
/kafka-console-consumer.sh第一次grep過濾出來的的消息小于4k了,所以傳給第二次grep是不可以直接輸出出來的,因?yàn)樵诰彌_區(qū) - 如果第一次grep出來的數(shù)據(jù)大于4k,傳給第二個(gè)grep時(shí)則不會緩沖著,第二個(gè)grep收到就立刻傳入到terminal輸出了
- 反之第一次grep過濾出來的的消息大于還是小于4k,一旦后面沒有管道符號,直接輸出給terminal的話,則遵循line bufferred規(guī)則立馬輸出
解決
使用如下命令即可
./kafka-console-consumer.sh \ --bootstrap-server node1:9092,node2:9092,node3:9092 \ --topic test \ --from-beginning \ --consumer-property group.id=gaofeng_test\ | grep --line-buffered "b786aba6f3a6" | grep 388aabd
【注意】
這里用了grep 的 --line-buffered參數(shù)
具體用法
查閱http://www.dhdzp.com/article/277299.htm
最后
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
解決MybatisPlus批量插入數(shù)據(jù)報(bào)錯:Error getting generated 
在使用MybatisPlus進(jìn)行批量插入數(shù)據(jù)時(shí)遇到空指針異常錯誤,分析原因是由于主鍵生成策略導(dǎo)致的,嘗試通過設(shè)置useGeneratedKeys屬性解決問題,但因批量插入方法限制,該方法未能成功,最終通過自定義mapper方法實(shí)現(xiàn)批量插入,解決了問題2024-09-09
使用Java和SNMP4J實(shí)現(xiàn)SNMP操作完整代碼
這篇文章主要介紹了如何使用Java和SNMP4J庫進(jìn)行SNMP操作,包括初始化SNMP、創(chuàng)建目標(biāo)、創(chuàng)建PDU、發(fā)送SNMP請求和處理響應(yīng)等內(nèi)容,通過編寫SnmpUtil類,展示了完整的SNMP操作流程,需要的朋友可以參考下2024-12-12
IDEA安裝阿里巴巴編碼規(guī)范插件的兩種方式詳解(在線安裝和離線安裝)
這篇文章主要介紹了IDEA安裝阿里巴巴編碼規(guī)范插件的兩種方式詳解(在線安裝和離線安裝),本文通過截圖給大家展示的非常詳細(xì),需要的朋友可以參考下2021-09-09
idea創(chuàng)建springboot項(xiàng)目,Application.java不能運(yùn)行問題及解決
這篇文章主要介紹了idea創(chuàng)建springboot項(xiàng)目,Application.java不能運(yùn)行問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11
如何正確控制springboot中bean的加載順序小結(jié)篇
這篇文章主要介紹了如何正確控制springboot中bean的加載順序總結(jié),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07

