kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)的解決
kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)
發(fā)送端

接收端

問(wèn)題
采用內(nèi)置的zookeeper,發(fā)送端發(fā)送數(shù)據(jù),接收端能夠接收數(shù)據(jù)
但是采用外置的zookeeper,發(fā)送端發(fā)送數(shù)據(jù),接收端一直接收不到數(shù)據(jù)
解決
先判斷主題是否一致,如果一致就在關(guān)閉kafka
./kafka-server-stop.sh ../config/server.properties
修改一下配置,確保這些配置已加上,不要用localhost,在listeners的ip地址和端口號(hào)要和消費(fèi)者,生產(chǎn)者的的地址端口號(hào)一直
vim ../config/server.propertiesst3


最后把log.dirs后面的文件刪除或者重新?lián)Q個(gè)地址
rm -rf /tmp/kafka

重新在前臺(tái)啟動(dòng)kafka,注意查看打印在桌面的日志,有無(wú)報(bào)錯(cuò)信息
./kafka-server-start.sh ../config/server.properties
如果沒(méi)有報(bào)錯(cuò)信息,啟動(dòng)正常,那么就可以在后臺(tái)啟動(dòng)了
./kafka-server-start.sh -daemon ../config/server.properties
創(chuàng)建生產(chǎn)者
./kafka-console-producer.sh --broker-list 172.16.193.175:9092 --topic test3
創(chuàng)建消費(fèi)者
./kafka-console-consumer.sh --bootstrap-server 172.16.193.175:9092 --topic test3 --from-beginning
關(guān)于kafka-console-consumer.sh消費(fèi)者的一些思考
(人物設(shè)定初步了解kafka的我)
我司現(xiàn)在有三臺(tái)kafka服務(wù)器作為一個(gè)集群
需求是我寫了一個(gè)監(jiān)聽器去監(jiān)聽活動(dòng)失敗的情況,如果活動(dòng)失敗則調(diào)用一個(gè)統(tǒng)計(jì)接口 做數(shù)據(jù)統(tǒng)計(jì)
我需要從失敗事件的隨路數(shù)據(jù)中取一些數(shù)據(jù),做一些判斷.
現(xiàn)在我想從集群中看一下失敗事件中的隨路數(shù)據(jù)是否完整正確
于是,我xshell連接上了三臺(tái)服務(wù)器并且運(yùn)行以下命令
./kafka-console-consumer.sh --bootstrap-server broker1IP:9092 --topic topicname ? ./kafka-console-consumer.sh --bootstrap-server broker2IP:9092 --topic topicname ? ./kafka-console-consumer.sh --bootstrap-server broker3IP:9092 --topic topicname
發(fā)現(xiàn)只要發(fā)送一個(gè)事件三個(gè)服務(wù)器都可以收到事件中的消息
怪了,為什么三臺(tái)都會(huì)顯示.
我第一反應(yīng)是:這是否是傳說(shuō)中的leader和follower 同步策略
我問(wèn)了一下我的leader ,
leader:.....,你知道你這個(gè)命令是什么意思嗎?
這個(gè)命令就是相當(dāng)于創(chuàng)建了一個(gè)消費(fèi)者去消費(fèi)了隊(duì)列中的消息!
你這個(gè)3個(gè)服務(wù)器相當(dāng)于啟動(dòng)了3個(gè)消費(fèi)者去消費(fèi)了,同一個(gè)消息三次!
我:不對(duì)啊,同一個(gè)消息不能被消費(fèi)三次啊!?
leader:........,你知道什么是消費(fèi)者組嗎?你這相當(dāng)于三個(gè)消費(fèi)者組 不信你看看
./kafka-console-consumer.sh -help ? ... ? --group <String: consumer group id> ? ? ?The consumer group id of the consumer.? ? ...
看到了么?這里可以指定消費(fèi)者組,你不指定他就默認(rèn)是一個(gè)新的消費(fèi)者組
我: 牛啊!
kafka-console-consumer.sh相關(guān)知識(shí)拓展
kafka-console-consumer.sh腳本是一個(gè)簡(jiǎn)易的消費(fèi)者控制臺(tái)。該 shell 腳本的功能通過(guò)調(diào)用 kafka.tools 包下的 ConsoleConsumer 類,并將提供的命令行參數(shù)全部傳給該類實(shí)現(xiàn)。
./kafka-console-consumer.sh --bootstrap-server node:9092 --topic topicName //表示從 latest 位移位置開始消費(fèi)該主題的所有分區(qū)消息,即僅消費(fèi)正在寫入的消息。 bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName //?表示從指定主題中有效的起始位移位置開始消費(fèi)所有分區(qū)的消息。 bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName //?消費(fèi)出的消息結(jié)果將打印出消息體的 key 和 value。
| 參數(shù) | 值類型 | 說(shuō)明 | 有效值 |
|---|---|---|---|
| --topic | string | 被消費(fèi)的topic | |
| --whitelist | string | 正則表達(dá)式,指定要包含以供使用的主題的白名單 | |
| --partition | integer | 指定分區(qū) 除非指定’–offset’,否則從分區(qū)結(jié)束(latest)開始消費(fèi) | |
| --offset | string | 執(zhí)行消費(fèi)的起始o(jì)ffset位置 默認(rèn)值:latest | |
| --from-beginning | 從存在的最早消息開始,而不是從最新消息開始 | ||
| --max-messages | integer | 消費(fèi)的最大數(shù)據(jù)量,若不指定,則持續(xù)消費(fèi)下去 | |
| --timeout-ms | integer | 在指定時(shí)間間隔內(nèi)沒(méi)有消息可用時(shí)退出 | |
| --bootstrap-server | string | 必需(除非使用舊版本的消費(fèi)者),要連接的服務(wù)器 | |
| --key-deserializer | string | ||
| --value-deserializer | string | ||
| --group | string | 指定消費(fèi)者所屬組的ID | |
| --zookeeper | string | 必需(僅當(dāng)使用舊的使用者時(shí))連接zookeeper的字符串。 可以給出多個(gè)URL以允許故障轉(zhuǎn)移 |
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
IntelliJ IDEA自定義代碼提示模板Live Templates的圖文教程
這篇文章主要介紹了IntelliJ IDEA自定義代碼提示模板Live Templates,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
SpringBoot服務(wù)設(shè)置禁止server.point端口的使用
本文主要介紹了SpringBoot服務(wù)設(shè)置禁止server.point端口的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-01-01
eclipse的web項(xiàng)目實(shí)現(xiàn)Javaweb購(gòu)物車的方法
這篇文章主要介紹了eclipse的web項(xiàng)目實(shí)現(xiàn)Javaweb購(gòu)物車的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10
關(guān)于ArrayList的動(dòng)態(tài)擴(kuò)容機(jī)制解讀
這篇文章主要介紹了關(guān)于ArrayList的動(dòng)態(tài)擴(kuò)容機(jī)制解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10
如何解決getReader() has already been called&
這篇文章主要介紹了如何解決getReader() has already been called for this request問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
SpringBoot基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)定時(shí)任務(wù)過(guò)程解析
這篇文章主要介紹了SpringBoot基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)定時(shí)任務(wù)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
如何實(shí)現(xiàn)Spring?Event(異步事件)
這篇文章主要介紹了如何實(shí)現(xiàn)Spring?Event(異步事件)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
Java設(shè)計(jì)模式之工廠方法模式實(shí)例簡(jiǎn)析
這篇文章主要介紹了Java設(shè)計(jì)模式之工廠方法模式,較為詳細(xì)的分析了Java工廠模式的功能、定義并給出了實(shí)例代碼加以總結(jié)分析,需要的朋友可以參考下2015-11-11

