kafka的消息存儲機(jī)制和原理分析
消息的保存路徑
消息發(fā)送端發(fā)送消息到 broker 上以后,消息是如何持久化的?
數(shù)據(jù)分片
kafka 使用日志文件的方式來保存生產(chǎn)者和發(fā)送者的消息,每條消息都有一個(gè) offset 值來表示它在分區(qū)中的偏移量。
Kafka 中存儲的一般都是海量的消息數(shù)據(jù),為了避免日志文件過大,一個(gè)分片 并不是直接對應(yīng)在一個(gè)磁盤上的日志文件,而是對應(yīng)磁盤上的一個(gè)目錄,這個(gè)目錄的命名規(guī)則是<topic_name>_<partition_id>。
比如創(chuàng)建一個(gè)名為firstTopic的topic,其中有3個(gè)partition,那么在 kafka 的數(shù)據(jù)目錄(/tmp/kafka-log)中就有 3 個(gè)目錄,firstTopic-0~3
多個(gè)分區(qū)在集群中多個(gè)broker上的分配方法
1.將所有 N Broker 和待分配的 i 個(gè) Partition 排序
2.將第 i 個(gè) Partition 分配到第(i mod n)個(gè) Broker 上

log分段
每個(gè)分片目錄中,kafka 通過分段的方式將 數(shù)據(jù) 分為多個(gè) LogSegment,一個(gè) LogSegment 對應(yīng)磁盤上的一個(gè)日志文件(00000000000000000000.log)和一個(gè)索引文件(如上:00000000000000000000.index),其中日志文件是用來記錄消息的。索引文件是用來保存消息的索引。
每個(gè)LogSegment 的大小可以在server.properties 中l(wèi)og.segment.bytes=107370 (設(shè)置分段大小,默認(rèn)是1gb)選項(xiàng)進(jìn)行設(shè)置。

segment 的 index file 和 data file 2 個(gè)文件一一對應(yīng),成對出現(xiàn),后綴".index"和“.log”分別表示為 segment 索引文件、數(shù)據(jù)文件.命名規(guī)則:partion 全局的第一個(gè) segment從 0 開始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment文件最后一條消息的 offset 值進(jìn)行遞增。數(shù)值最大為 64 位long 大小,20 位數(shù)字字符長度,沒有數(shù)字用 0 填充
第一個(gè) log 文件的最后一個(gè) offset 為:5376,所以下一個(gè)segment 的文件命名為: 0000000000000005376.log。
對應(yīng)的 index 為 00000000000000005376.index
kafka 這種分片和分段策略,避免了數(shù)據(jù)量過大時(shí),數(shù)據(jù)文件文件無限擴(kuò)張帶來的隱患,更有助于消息文件的維護(hù)以及被消費(fèi)的消息的清理。
日志和索引文件內(nèi)容分析
通過下面這條命令可以看到 kafka 消息日志的內(nèi)容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
輸出結(jié)果為:
offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
可以看到一條消息,會包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
各字段的意義:
offset:記錄號 ;position:偏移量;createTime:創(chuàng)建時(shí)間、keysize和valuesize表示key和value的大小compresscodec:表示壓縮編碼payload:表示消息的具體內(nèi)容
為了提高查找消息的性能,kafka為每一個(gè)日志文件添加 了2 個(gè)索引文件:OffsetIndex 和 TimeIndex,分別對應(yīng)*.index以及*.timeindex, *.TimeIndex 是映射時(shí)間戳和相對 offset的文件
查看索引內(nèi)容命令:
?sh ?kafka-run-class.shkafka.tools.DumpLogSegments ?--files ?/tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
索引文件和日志文件內(nèi)容關(guān)系如下

如上圖所示,index 文件中存儲了索引以及物理偏移量。
log 文件存儲了消息的內(nèi)容。
索引文件中保存了部分offset和偏移量position的對應(yīng)關(guān)系。
比如 index文件中 [4053,80899],表示在 log 文件中,對應(yīng)的是第 4053 條記錄,物理偏移量(position)為 80899.
在 partition 中通過 offset 查找 message過程
- 根據(jù) offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一個(gè)文件的最后一個(gè)offset 進(jìn)行命名的,所以,使用二分查找算法能夠根據(jù)offset 快速定位到指定的索引文件
- 找到索引文件后,根據(jù) offset 進(jìn)行定位,找到索引文件中的匹配范圍的偏移量position。(kafka 采用稀疏索引的方式來提高查找性能)
- 得到 position 以后,再到對應(yīng)的 log 文件中,從 position處開始查找 offset 對應(yīng)的消息,將每條消息的 offset 與目標(biāo) offset 進(jìn)行比較,直到找到消息
比如說,我們要查找 offset=2490 這條消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個(gè)索引,再到 log 文件中,根據(jù) 49111 這個(gè) position 開始查找,比較每條消息的 offset 是否大于等于 2490。最后查找到對應(yīng)的消息以后返回
日志的清除策略以及壓縮策略
日志的清理策略有兩個(gè)
- 根據(jù)消息的保留時(shí)間,當(dāng)消息在 kafka 中保存的時(shí)間超過了指定的時(shí)間,就會觸發(fā)清理過程
- 根據(jù) topic 存儲的數(shù)據(jù)大小,當(dāng) topic 所占的日志文件大小大于一定的閥值,則可以開始刪除最舊的消息。
通過 log.retention.bytes 和 log.retention.hours 這兩個(gè)參數(shù)來設(shè)置,當(dāng)其中任意一個(gè)達(dá)到要求,都會執(zhí)行刪除。默認(rèn)的保留時(shí)間是:7 天
kafka會啟動(dòng)一個(gè)后臺線程,定期檢查是否存在可以刪除的消息。
日志壓縮策略
Kafka 還提供了“日志壓縮(Log Compaction)”功能,通過這個(gè)功能可以有效的減少日志文件的大小,緩解磁盤緊張的情況,在很多實(shí)際場景中,消息的 key 和 value 的值之間的對應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改一樣,消費(fèi)者只關(guān)心 key 對應(yīng)的最新的 value。
因此,我們可以開啟 kafka 的日志壓縮功能,服務(wù)端會在后臺啟動(dòng)Cleaner線程池,定期將相同的key進(jìn)行合并,只保留最新的 value 值。日志的壓縮原理如下圖:

消息寫入的性能
順序?qū)?/h3>
我們現(xiàn)在大部分企業(yè)仍然用的是機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對應(yīng)的柱面、磁頭以及對應(yīng)的扇區(qū);
這個(gè)過程相對內(nèi)存來說會消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來的時(shí)間消耗,kafka 采用順序?qū)懙姆绞酱鎯?shù)據(jù)。
零拷貝
即使采用順序?qū)?,但是頻繁的 I/O 操作仍然會造成磁盤的性能瓶頸,所以 kafka還有一個(gè)性能策略:零拷貝
消息從發(fā)送到落地保存,broker 維護(hù)的消息日志本身就是文件目錄,每個(gè)文件都是二進(jìn)制保存,生產(chǎn)者和消費(fèi)者使用相同的格式來處理。
在消費(fèi)者獲取消息時(shí),服務(wù)器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動(dòng)的通過 socket 發(fā)送給消費(fèi)者。
雖然這個(gè)操作描述起來很簡單,但實(shí)際上經(jīng)歷了很多步驟。如下:

- 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存
- 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
- 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到 socket 緩存中
- 操作系統(tǒng)將數(shù)據(jù)從 socket 緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這個(gè)過程涉及到 4 次上下文切換以及 4 次數(shù)據(jù)復(fù)制,并且有兩次復(fù)制操作是由 CPU 完成。但是這個(gè)過程中,數(shù)據(jù)完全沒有進(jìn)行變化,僅僅是從磁盤復(fù)制到網(wǎng)卡緩沖區(qū)。通過“零拷貝”技術(shù),可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作,同時(shí)也會減少上下文切換次數(shù)。
現(xiàn)代的 unix 操作系統(tǒng)提供一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)?socket;在 Linux 中,是通過 sendfile 系統(tǒng)調(diào)用來完成的。
Java 提供了訪問這個(gè)系統(tǒng)調(diào)用的方法:FileChannel.transferTo API。
使用 sendfile,只需要一次拷貝就行,允許操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡(luò)上。
所以在這個(gè)優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Mybatis不支持batchInsertOrUpdate返顯id問題
這篇文章主要介紹了Mybatis不支持batchInsertOrUpdate返顯id問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
使用Spring的AbstractRoutingDataSource實(shí)現(xiàn)多數(shù)據(jù)源切換示例
這篇文章主要介紹了使用Spring的AbstractRoutingDataSource實(shí)現(xiàn)多數(shù)據(jù)源切換示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-02-02
MyBatis-Flex+ShardingSphere-JDBC多數(shù)據(jù)源分庫分表實(shí)現(xiàn)
本文介紹了使用MyBatis-Flex和ShardingSphere-JDBC實(shí)現(xiàn)多數(shù)據(jù)源分庫分表的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-10-10
Java使用線程實(shí)現(xiàn)異步運(yùn)行的方法
在Java中,實(shí)現(xiàn)異步運(yùn)行的一個(gè)常用方式是使用Thread類,這篇文章主要介紹了Java使用線程實(shí)現(xiàn)異步運(yùn)行,需要的朋友可以參考下2024-07-07
Java畢業(yè)設(shè)計(jì)之多用戶宿舍管理系統(tǒng)的實(shí)現(xiàn)
這篇文章主要介紹了基于Java實(shí)現(xiàn)的多用戶宿舍管理系統(tǒng),本文采用了jsp、servlet、jdbc等技術(shù),文中示例代碼講解詳細(xì),需要的可以參考一下2022-02-02
Java編寫程序之輸入一個(gè)數(shù)字實(shí)現(xiàn)該數(shù)字階乘的計(jì)算
這篇文章主要介紹了Java編寫程序之輸入一個(gè)數(shù)字實(shí)現(xiàn)該數(shù)字階乘的計(jì)算,本文通過實(shí)例代碼給大家介紹的非常想詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02

