RocketMQ?Broker消息如何刷盤源碼解析
前言
我們在學(xué)習(xí)RocketMQ的時(shí)候,我們知道RocketMQ的刷盤策略有兩個(gè)刷盤策略
- 同步刷盤
同步刷盤即Broker消息已經(jīng)被持久化到硬盤后才會向客戶端返回成功。同步刷盤的優(yōu)點(diǎn)是能保證消息不丟失,但是這是以犧牲寫入性能為代價(jià)的。
- 異步刷盤
異步刷盤是指Broker將信息存儲到pagecache后就立即向客戶端返回成功,然后會有一個(gè)異步線程定時(shí)將內(nèi)存中的數(shù)據(jù)寫入磁盤,默認(rèn)時(shí)間間隔為500ms。
Broker中的刷盤策略是通過Broker配置文件中flushDiskType進(jìn)行配置,可以配置ASYNC_FLUSH(異步刷盤)和SYNC_FLUSH(同步刷盤),默認(rèn)配置是ASYNC_FLUSH。
Broker的刷盤采用基于JDK NIO技術(shù),消息首先會存儲到內(nèi)存中,然后再根據(jù)不同的刷盤策略在不同時(shí)間刷盤,如果有不了解的小伙伴可以參考這篇文章《【NIO實(shí)戰(zhàn)】深入理解FileChannel》
刷盤相關(guān)類介紹
CommitLog中的內(nèi)部類FlushCommitLogService及其子類CommitRealTimeService、GroupCommitService、FlushRealTimeService分別是用于不同場景下用于刷盤的刷盤行為,他們會單獨(dú)或者配合起來使用。具體類圖如下所示。

如果是同步刷盤會使用GroupCommitService。如果是異步刷盤,并且關(guān)閉了堆外緩存(TransientStorePool),則采用FlushRealTimeService刷盤。如果是異步刷盤,并且開啟了堆外緩存,則會使用FlushRealTimeService與CommitRealTimeService配合刷盤。
默認(rèn)的輸盤策略是異步且關(guān)閉堆外緩存,因此默認(rèn)是采用FlushRealTimeService進(jìn)行刷盤

Broker刷盤源碼分析
消息刷盤相關(guān)邏輯都是圍繞在CommitLog,因此要想知道消息時(shí)如何刷盤的關(guān)鍵是研究CommitLog
CommitLog構(gòu)造&屬性賦值
CommitLog中與刷盤相關(guān)的屬性有flushCommitLogService、commitLogService。如果是同步刷盤則在構(gòu)造函數(shù)中會給flushCommitLogService賦值GroupCommitService,如果是異步刷盤則給flushCommitLogService賦值FlushRealTimeService。commitLogService的值是CommitRealTimeService,從上面我們可以很明顯的看出它只有在異步且開啟TransientStorePoolEnabled時(shí)才會被使用。
public class CommitLog {
// 如果是同步刷盤,則是GroupCommitService。如果是異步刷盤則是FlushRealTimeService
// 默認(rèn)是異步刷盤,因此是CommitLog$FlushRealTimeService
private final FlushCommitLogService flushCommitLogService;
// 開啟TransientStorePoolEnable時(shí)使用CommitRealTimeService
private final FlushCommitLogService commitLogService;
// 構(gòu)造函數(shù)
public CommitLog(final DefaultMessageStore defaultMessageStore) {
// 默認(rèn)是異步刷盤,因此這里是false
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
// 消息回調(diào)
this.appendMessageCallback = new DefaultAppendMessageCallback();
flushDiskWatcher = new FlushDiskWatcher();
}
}
TransientStorePoolEnabled介紹
transientStorePoolEnabled配置的默認(rèn)值為false,開啟transientStorePoolEnabled需要手動(dòng)開啟。如果開啟transientStorePoolEnabled會開啟堆外內(nèi)存存儲池,Broker在啟動(dòng)時(shí)會申請5個(gè)與CommitLog大小(1GB)相同的堆外內(nèi)存交給TransientStorePool,創(chuàng)建MappedFile時(shí)會向TransientStorePool“借”一個(gè)堆外內(nèi)存ByteBuffer,保存消息時(shí)會先將消息保存到堆外內(nèi)存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盤中。TransientStorePool屬性和一些核心方法源碼如下,堆外內(nèi)存ByteBuffer都是由它來管理。
// org.apache.rocketmq.store.TransientStorePool
public class TransientStorePool {
// 存儲池大小,默認(rèn)是5
private final int poolSize;
// CommitLog MappedFile文件大小,默認(rèn)1GB
private final int fileSize;
// 默認(rèn)存5個(gè)ByteBuffer
private final Deque<ByteBuffer> availableBuffers;
// 消息存儲配置
private final MessageStoreConfig storeConfig;
// TransientStorePool初始化
public void init() {
// 默認(rèn)是5
for (int i = 0; i < poolSize; i++) {
// 分配1GB的直接內(nèi)存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
// 生成的緩存保存到隊(duì)列中
availableBuffers.offer(byteBuffer);
}
}
// 歸還緩沖
public void returnBuffer(ByteBuffer byteBuffer) {
// 修改position和limit,"清空"緩沖
byteBuffer.position(0);
byteBuffer.limit(fileSize);
// 緩沖入隊(duì)
this.availableBuffers.offerFirst(byteBuffer);
}
// 向TransientStorePool借緩沖
public ByteBuffer borrowBuffer() {
// 緩沖出隊(duì)
ByteBuffer buffer = availableBuffers.pollFirst();
return buffer;
}
}
消息保存源碼分析
前面文章《【RocketMQ | 源碼分析】Broker是如何保存消息的? 》我們雖然介紹了消息的保存過程,但是開啟或者關(guān)閉TransientStorePoolEnabled時(shí),消息保存的細(xì)節(jié)是不同的,我們再打開消息保存MappedFile的源碼如下,下面代碼中如果writeBuffer不空,則會將消息先追加到writeBuffer,否者直接寫入到MappedFile的內(nèi)存映射文件中。
// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
// 如果寫文件位置小于文件size
if (currentPos < this.fileSize) {
// 如果writeBuffer不空,則獲取writeBuffer的淺拷貝,否則獲取MappedFile的內(nèi)存映射(MappedByteBuffer)的淺拷貝
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
// 如果是單條消息
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件長度-當(dāng)前寫位置,可以寫的長度*/,
(MessageExtBrokerInner) messageExt, putMessageContext);
} // ...如果是批量消息
return result;
}
}
那么什么情況下MappedFile中的writeBuffer為空,什么情況下writeBuffer不為空呢?我們可以先來了解MappedFile是如何創(chuàng)建的,MappedFile是由AllocateMappedFileService創(chuàng)建的,具體源碼如下,如果開啟了TransientStorePoolEnabled,則在創(chuàng)建MappedFile時(shí)會向TransientStorePool“借”一個(gè)ByteBuffer,如果沒有開啟TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存數(shù)據(jù)時(shí)會將數(shù)據(jù)直接保存到MappedFile的直接內(nèi)存映射(MappedByteBuffer)中。
private boolean mmapOperation() {
// ...
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
// 初始化mappedFile會向TransientStorePool"借"一個(gè)writeBuffer
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 創(chuàng)建MappedFile,沒有writeBuffer
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
// ...
}
由上可知,消息保存如下圖所示

消息刷盤入口方法源碼分析
消息保存和刷盤的入口方法CommitLog#asyncPutMessage,消息保存到mappedFile的緩存后,最后會調(diào)用submitFlushRequest方法提交刷盤請求,Broker會根據(jù)刷盤策略進(jìn)行刷盤。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
//... 保存消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
// ...
// 提交刷盤請求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 提交復(fù)制請求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// 合并提交刷盤請求和提交復(fù)制請求結(jié)果
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
提交了刷盤請求后,根據(jù)刷盤策略,是否開啟堆外緩存,推送消息中是否要等待消息保存有如下四種刷盤方式
- 異步刷盤(關(guān)閉TransientStorePoolEnabled)
異步刷盤(關(guān)閉TransientStorePoolEnabled)是默認(rèn)的刷盤方案,這個(gè)刷盤方案先會**異步喚醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于關(guān)閉了TransientStorePoolEnabled,消息是保存到MappedFile中的內(nèi)存映射文件MappedByteBuffer,F(xiàn)lushRealTimeService將定時(shí)MappedByteBuffer刷到磁盤。
- 異步刷盤(開啟TransientStorePoolEnabled)
異步刷盤(開啟TransientStorePoolEnabled)會先**異步喚醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于開啟了TransientStorePoolEnabled,消息會保存到MappedFile中的內(nèi)存映射文件ByteBuffer,CommitRealTimeService定時(shí)將ByteBuffer中的數(shù)據(jù)刷到FileChannel中。
- 同步刷盤(等待消息保存)
同步刷盤(等待消息保存)會先創(chuàng)建一個(gè)刷盤請求(GroupCommitRequest),然后向GroupCommitService提交刷盤請求,最后等待刷盤結(jié)果并返回
- 同步刷盤(不等待消息保存)
同步刷盤(不等待消息保存)也是通過GroupCommitService刷盤,與等待消息保存不同的是不等待的方式異步喚醒(wakeup)GroupCommitService后,直接返回消息保存成功。
四種刷盤方式源碼如下所示
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// 同步刷盤
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 獲取同步刷盤Service
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
// 創(chuàng)建GroupCommitRequest 刷盤偏移量nextOffset = 當(dāng)前寫入偏移量 + 當(dāng)前消息寫入大小
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
// 向刷盤監(jiān)視器(flushDistWatch)提交刷盤請求
flushDiskWatcher.add(request);
// 提交刷盤請求,并且喚醒同步刷盤線程
service.putRequest(request);
return request.future();
} else {
// 同步刷盤,但是不需要等待刷盤結(jié)果,那么喚醒同步刷盤線程
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// 異步刷盤
else {
// 是否啟動(dòng)了堆外緩存
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// 如果沒有啟動(dòng)堆外緩存,則喚醒異步刷盤服務(wù) flushRealTimeService
flushCommitLogService.wakeup();
} else {
// 如果啟動(dòng)了堆外緩存,則喚醒異步轉(zhuǎn)存服務(wù)CommitRealTimeService
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
將上面四種場景及調(diào)用關(guān)系如下圖所示

總結(jié)
本篇文章介紹了TransientStorePool機(jī)制以及開啟和管理隊(duì)消息保存的影響,我們還介紹了RocketMQ中四種刷盤策略
- 同步刷盤-等待消息保存到磁盤
- 同步刷盤-不等待消息保存到磁盤上
- 異步刷盤-開啟堆外緩存
- 異步刷盤-不開啟堆外緩存
以上就是RocketMQ Broker消息如何刷盤源碼解析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Broker消息刷盤的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解析Java中所有錯(cuò)誤和異常的父類java.lang.Throwable
這篇文章主要介紹了Java中所有錯(cuò)誤和異常的父類java.lang.Throwable,文章中簡單地分析了其源碼,說明在代碼注釋中,需要的朋友可以參考下2016-03-03
springboot中rabbitmq實(shí)現(xiàn)消息可靠性機(jī)制詳解
這篇文章主要介紹了springboot中rabbitmq實(shí)現(xiàn)消息可靠性機(jī)制詳解,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-09-09
Java終止循環(huán)體的具體實(shí)現(xiàn)
這篇文章主要介紹了Java終止循環(huán)體的具體實(shí)現(xiàn),需要的朋友可以參考下2014-02-02
http basic authentication通過post方式訪問api示例分享 basic認(rèn)證示例
在HTTP中,基本認(rèn)證是一種用來允許Web瀏覽器或其他客戶端程序在請求時(shí)提供以用戶名和口令形式的憑證,這篇文章主要介紹了http basic authentication通過post方式訪問api示例,大家參考使用吧2014-01-01
校驗(yàn)非空的注解@NotNull如何取得自定義的message
這篇文章主要介紹了校驗(yàn)非空的注解@NotNull如何取得自定義的message,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
IDEA取消SVN關(guān)聯(lián),再重新分享項(xiàng)目的操作
這篇文章主要介紹了IDEA取消SVN關(guān)聯(lián),再重新分享項(xiàng)目的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
Java通過Lambda表達(dá)式實(shí)現(xiàn)簡化代碼
我們在編寫代碼時(shí),常常會遇到代碼又長又重復(fù)的情況,就像調(diào)用第3方服務(wù)時(shí),每個(gè)方法都差不多,?寫起來啰嗦,?改起來麻煩,?還容易改漏,所以本文就來用Lambda表達(dá)式簡化一下代碼,希望對大家有所幫助2023-05-05
淺析Java關(guān)鍵詞synchronized的使用
Synchronized是java虛擬機(jī)為線程安全而引入的。這篇文章主要為大家介紹一下Java關(guān)鍵詞synchronized的使用與原理,需要的可以參考一下2022-12-12

