8張圖帶你全面了解Java?kafka的核心機制
kafka基礎(chǔ)架構(gòu)
現(xiàn)在假如有100T大小的消息要發(fā)送到kafka中,數(shù)據(jù)量非常大,一臺機器存儲不下,面對這種情況,你該如何設(shè)計呢?
很簡單,分而治之,一臺不夠,那就多臺,這就形成了一個kafka集群。如下圖所示,一個broker就是一個kafka節(jié)點,100T數(shù)據(jù)就有3個節(jié)點分擔(dān),每個節(jié)點約33T,這樣就能解決問題了,還能提高吞吐量。

- Topic: 可以理解為一個隊列,一個kafka集群中可以定義很多的topic,比如上圖中的
topicA。 - Partition: 為了實現(xiàn)擴展性,提高吞吐量,一個非常大的
topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。比如上圖中的topicA被分成了3個partition。 - Replica: 副本,如果數(shù)據(jù)只放在一個
broker中,萬一這個broker宕機了怎么辦?為了實現(xiàn)高可用,一個topic的每個分區(qū)都有若干個副本,一個Leader和若干個Follower。比如上圖中的虛線連接的就是它的副本。 - Leader: 每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費者消費數(shù)據(jù)的對象都是
Leader。 - Follower: 每個分區(qū)多個副本中的“從”,實時從
Leader中同步數(shù)據(jù),保持和Leader數(shù)據(jù)的同步。Leader發(fā)生故障時,某個Follower會成為新的Leader。 - Producer: 消息生產(chǎn)者,就是向
Kafka broker發(fā)消息的客戶端,后面詳細講解。 - Consumer: 消息消費者,向
Kafka broker取消息的客戶端,多個Consumer會組成一個消費者組,后面詳細講解。 - Zookeeper:用來記錄kafka中的一些元數(shù)據(jù),比如kafka集群中的broker,leader是誰等等,但
Kafka2.8.0版本以后也支持非zk的方式,大大減少了和zk的交互。
kafka生產(chǎn)者流程
前面通過一張圖片講解了kafka整體的架構(gòu),那現(xiàn)在我們來看看kafka生產(chǎn)者發(fā)送的整個過程,這里面也是大有文章。
在消息發(fā)送的過程中,涉及到了兩個線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個雙端隊列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker。

- 在主線程中由
kafkaProducer創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator, 也稱為消息收集器)中。
- 攔截器: 可以用來在消息發(fā)送前做一些準備工作,比如按照某個規(guī)則過濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計類工作。
- 序列化器: 用于在網(wǎng)絡(luò)傳輸中將數(shù)據(jù)序列化為字節(jié)流進行傳輸,保證數(shù)據(jù)不會丟失。
- 分區(qū)器: 用于按照一定的規(guī)則將數(shù)據(jù)分發(fā)到不同的kafka broker節(jié)點中
Sender線程負責(zé)從RecordAccumulator獲取消息并將其發(fā)送到Kafka中。
RecordAccumulator主要用來緩存消息以便Sender線程可以批量發(fā)送,進而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。
RecordAccumulator緩存的大小可以通過生產(chǎn)者客戶端參數(shù)buffer.memory配置,默認值為33554432B,即32M。
- 主線程中發(fā)送過來的消息都會被迫加到
RecordAccumulator的某個雙端隊列(Deque)中,RecordAccumulator內(nèi)部為每個分區(qū)都維護了一個雙端隊列,即Deque<ProducerBatch>, 消息寫入緩存時,追加到雙端隊列的尾部。 Sender讀取消息時,從雙端隊列的頭部讀取。ProducerBatch是指一個消息批次;與此同時,會將較小的ProducerBatch湊成一個較大ProducerBatch,也可以減少網(wǎng)絡(luò)請求的次數(shù)以提升整體的吞吐量。ProducerBatch大小可以通過batch.size控制,默認16kb。Sender線程會在有數(shù)據(jù)積累到batch.size,默認16kb,或者如果數(shù)據(jù)遲遲未達到batch.size,Sender線程等待linger.ms設(shè)置的時間到了之后就會獲取數(shù)據(jù)。linger.ms單位ms,默認值是0ms,表示沒有延遲。
Sender從RecordAccumulator獲取緩存的消息之后,會將數(shù)據(jù)封裝成網(wǎng)絡(luò)請求<Node,Request>的形式,這樣就可以將Request請求發(fā)往各個Node了。- 請求在從
sender線程發(fā)往Kafka之前還會保存到InFlightRequests中,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到服務(wù)端響應(yīng)的請求。InFlightRequests默認每個分區(qū)下最多緩存5個請求,可以通過配置參數(shù)為max.in.flight.request.per. connection修改。 - 請求
Request通過通道Selector發(fā)送到kafka節(jié)點。 - 發(fā)送后,需要等待kafka的應(yīng)答機制,取決于配置項
acks.
- 0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等待數(shù)據(jù)落盤就應(yīng)答。
- 1:生產(chǎn)者發(fā)送過來的數(shù)據(jù),
Leader收到數(shù)據(jù)后應(yīng)答。 - -1(all):生產(chǎn)者發(fā)送過來的數(shù)據(jù),Leader和副本節(jié)點收齊數(shù)據(jù)后應(yīng)答。默認值是-1,-1 和all 是等價的。
Request請求接受到kafka的響應(yīng)結(jié)果,如果成功的話,從InFlightRequests清除請求,否則的話需要進行重發(fā)操作,可以通過配置項retries決定,當(dāng)消息發(fā)送出現(xiàn)錯誤的時候,系統(tǒng)會重發(fā)消息。retries表示重試次數(shù)。默認是 int 最大值,2147483647。- 清理消息累加器
RecordAccumulator中的數(shù)據(jù)。
kafka消費者流程
原來kafka生產(chǎn)者發(fā)送經(jīng)過了這么多流程,我們現(xiàn)在來看看kafka消費者又是如何進行的呢?
Kafka 中的消費是基于拉取模式的。消息的消費一般有兩種模式:推送模式和拉取模式。推模式是服務(wù)端主動將消息推送給消費者,而拉模式是消費者主動向服務(wù)端發(fā)起請求來拉取消息。
kafka是以消費者組進行消費的,一個消費者組,由多個consumer組成。形成一個消費者組的條件,是所有消費者的groupid相同。

- 消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費者消費。如果向消費組中添加更多的消費者,超過主題分區(qū)數(shù)量,則有一部分消費者就會閑置,不會接收任何消息。
- 消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。
那么問題來了,kafka是如何指定消費者組的每個消費者消費哪個分區(qū)?每次消費的數(shù)量是多少呢?
一、如何制定消費方案

- 消費者consumerA,consumerB, consumerC向kafka集群中的協(xié)調(diào)器
coordinator發(fā)送JoinGroup的請求。coordinator主要是用來輔助實現(xiàn)消費者組的初始化和分區(qū)的分配。
coordinator老大節(jié)點選擇 =groupid的hashcode值 % 50(__consumer_offsets內(nèi)置主題位移的分區(qū)數(shù)量)例如:groupid的hashcode值 為1,1% 50 = 1,那么__consumer_offsets主題的1號分區(qū),在哪個broker上,就選擇這個節(jié)點的coordinator作為這個消費者組的老大。消費者組下的所有的消費者提交offset的時候就往這個分區(qū)去提交offset。
- 選出一個
consumer作為消費中的leader,比如上圖中的ConsumerB。 - 消費者
leader制定出消費方案,比如誰來消費哪個分區(qū)等 - 把消費方案發(fā)給
coordinator - 最后
coordinator就把消費方 案下發(fā)給各個consumer, 圖中只畫了一條線,實際上是有下發(fā)各個consumer。
注意,每個消費者都會和coordinator保持心跳(默認3s),一旦超時(session.timeout.ms=45s),該消費者會被移除,并觸發(fā)再平衡;或者消費者處理消息的時間過長(max.poll.interval.ms=5分鐘),也會觸發(fā)再平衡,也就是重新進行上面的流程。
二、消費者消費細節(jié)
現(xiàn)在已經(jīng)初始化消費者組信息,知道哪個消費者消費哪個分區(qū),接著我們來看看消費者細節(jié)。

- 消費者創(chuàng)建一個網(wǎng)絡(luò)連接客戶端
ConsumerNetworkClient, 發(fā)送消費請求,可以進行如下配置:
fetch.min.bytes: 每批次最小抓取大小,默認1字節(jié)fetch.max.bytes: 每批次最大抓取大小,默認50Mfetch.max.wait.ms:最大超時時間,默認500ms
- 發(fā)送請求到kafka集群
- 成功的回調(diào),會將數(shù)據(jù)保存到
completedFetches隊列中 - 消費者從隊列中抓取數(shù)據(jù),根據(jù)配置
max.poll.records一次拉取數(shù)據(jù)返回消息的最大條數(shù),默認500條。 - 獲取到數(shù)據(jù)后,需要經(jīng)過反序列化器、攔截器等。
kafka的存儲機制
我們都知道消息發(fā)送到kafka,最終是存儲到磁盤中的,我們看下kafka是如何存儲的。

一個topic分為多個partition,每個partition對應(yīng)于一個log文件,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機制,每個partition分為多個segment。每個segment包括:“.index”文件、“.log”文件和.timeindex等文件,Producer生產(chǎn)的數(shù)據(jù)會被不斷追加到該log文件末端。

上圖中t1即為一個topic的名稱,而“t1-0/t1-1”則表明這個目錄是t1這個topic的哪個partition。

kafka中的索引文件以稀疏索引(sparseindex)的方式構(gòu)造消息的索引,如下圖所示:

1.根據(jù)目標offset定位segment文件
2.找到小于等于目標offset的最大offset對應(yīng)的索引項
3.定位到log文件
4.向下遍歷找到目標Record
注意:index為稀疏索引,大約每往log文件寫入4kb數(shù)據(jù),會往index文件寫入一條索引。通過參數(shù)log.index.interval.bytes控制,默認4kb。
那kafka中磁盤文件保存多久呢?
kafka 中默認的日志保存時間為 7 天,可以通過調(diào)整如下參數(shù)修改保存時間。
log.retention.hours,最低優(yōu)先級小時,默認 7 天。log.retention.minutes,分鐘。log.retention.ms,最高優(yōu)先級毫秒。log.retention.check.interval.ms,負責(zé)設(shè)置檢查周期,默認 5 分鐘。
總結(jié)
其實kafka中的細節(jié)十分多,本文也只是對kafka的一些核心機制從理論層面做了一個總結(jié),更多的細節(jié)還是需要自行去實踐,去學(xué)習(xí)。
以上就是8張圖帶你全面了解Java kafka的核心機制的詳細內(nèi)容,更多關(guān)于Java kafka核心機制的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot引入遠程nacos配置文件錯誤的解決方案
本文為解決Spring Cloud Alibaba中配置導(dǎo)入問題,提供了詳細的步驟說明,包括引入依賴、配置nacos、創(chuàng)建bootstrap.yml文件以及測試配置導(dǎo)入是否成功的方法,幫助開發(fā)者快速解決相關(guān)問題2024-09-09
Mybatis之通用Mapper動態(tài)表名及其原理分析
這篇文章主要介紹了Mybatis之通用Mapper動態(tài)表名及其原理分析,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08
SpringBoot中的@ApiModelProperty注解作用
這篇文章主要介紹了SpringBoot中的@ApiModelProperty注解作用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教。2022-01-01
解決@RequestBody使用不能class類型匹配的問題
這篇文章主要介紹了解決@RequestBody使用不能class類型匹配的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07

