RocketMQ消息存儲(chǔ)文件的加載與恢復(fù)機(jī)制源碼分析
前言
前面文章我們介紹了Broker是如何將消息全量存儲(chǔ)到CommitLog文件中,并異步生成dispatchRequest任務(wù)更新ConsumeQueue,IndexFile的過程以及ConsumeQueue和IndexFile的文件結(jié)構(gòu)。由于是異步轉(zhuǎn)發(fā)消息,就可能出現(xiàn)消息成功存儲(chǔ)到CommitLog文件,轉(zhuǎn)發(fā)請(qǐng)求任務(wù)執(zhí)行失敗,Broker宕機(jī)了,此時(shí)CommitLog和Index消息并未處理完,導(dǎo)致CommitLog與ConsumeQueue和IndexFile文件中的數(shù)據(jù)不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么這部分消息Consumer將永遠(yuǎn)無法消費(fèi)到了,那么Broker是如何保證數(shù)據(jù)一致性的呢?
StoreCheckPoint介紹
StoreCheckPoint的作用是記錄CommitLog,ConsumeQueue和IndexFile的刷盤點(diǎn),當(dāng)Broker異常結(jié)束時(shí)會(huì)根據(jù)StoreCheckPoint的數(shù)據(jù)恢復(fù),StoreCheckPoint屬性如下
public class StoreCheckpoint {
// commitLog最后一條信息的刷盤時(shí)間戳
private volatile long physicMsgTimestamp = 0;
// consumeQueue最后一個(gè)存儲(chǔ)單元刷盤時(shí)間戳
private volatile long logicsMsgTimestamp = 0;
// 最近一個(gè)已經(jīng)寫完IndexFile的最后一條記錄刷盤時(shí)間戳
private volatile long indexMsgTimestamp = 0;
}
StoreCheckPoint文件的存儲(chǔ)位置是${user.home}/store/checkpoint,文件的固定長(zhǎng)度為4K,但StoreCheckPoint只占用了前24個(gè)字節(jié),存儲(chǔ)格式如下圖所示

StoreCheckPoint時(shí)間戳更新時(shí)機(jī)
physicMsgTimestamp
FlushRealTimeService刷盤時(shí)更新
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
// ...
// 更新CommitLog刷盤時(shí)間戳
if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
}
GroupCommitService刷盤時(shí)更新
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
// ...
// 更新CommitLog刷盤時(shí)間戳
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
}
logicsMsgTimestamp
ConsumeQueue保存消息存儲(chǔ)單元時(shí)更新
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
// ...
// 如果consumeQueue保存成功,則更新ConsumeQueue存儲(chǔ)點(diǎn)信息
if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
}
}
ConsumeQueue刷盤時(shí)更新并觸發(fā)StoreCheckPoint刷盤
// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush
private void doFlush(int retryTimes) {
// ...
// 更新ConsumeQueue存儲(chǔ)時(shí)間戳,并刷盤
if (0 == flushConsumeQueueLeastPages) {
if (logicsMsgTimestamp > 0) {
DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
}
// 更新存儲(chǔ)點(diǎn)
DefaultMessageStore.this.getStoreCheckpoint().flush();
}
}
indexMsgTimestamp
// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile
public IndexFile getAndCreateLastIndexFile() {
// 獲取最新IndexFile,如果IndexFile已經(jīng)滿了,需要?jiǎng)?chuàng)建一個(gè)新的IndexFile
if (indexFile == null) {
indexFile =
new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
lastUpdateIndexTimestamp);
// 如果創(chuàng)建新的IndexFile成功,原IndexFile刷盤
if (indexFile != null) {
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@Override
public void run() {
// indexFile刷盤
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");
flushThread.setDaemon(true);
flushThread.start();
}
}
return indexFile;
}
// org.apache.rocketmq.store.index.IndexService#flush
public void flush(final IndexFile f) {
if (null == f)
return;
long indexMsgTimestamp = 0;
if (f.isWriteFull()) {
indexMsgTimestamp = f.getEndTimestamp();
}
f.flush();
if (indexMsgTimestamp > 0) {
// 更新checkPoint的indexMsgTimestamp并觸發(fā)刷盤
this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
this.defaultMessageStore.getStoreCheckpoint().flush();
}
}
- 保存消息Index,獲取最新的IndexFile如果滿了,則會(huì)創(chuàng)建一個(gè)新的IndexFile,并且更新IndexMsgTimestamp并觸發(fā)StoreCheckPoint刷盤
StoreCheckPoint刷盤源碼
StoreCheckPoint刷盤源碼如下所示,就是將CommitLog,ConsumeQueue和IndexFile刷盤時(shí)間戳持久化到硬盤上,由上面源碼可知它的刷盤觸發(fā)時(shí)機(jī)
- ConsumeQueue刷盤時(shí)觸發(fā)
- 創(chuàng)建新IndexFile文件時(shí)觸發(fā)
StoreCheckPoint刷盤源碼如下
// org.apache.rocketmq.store.StoreCheckpoint#flush
public void flush() {
this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
this.mappedByteBuffer.force();
}
消息加載源碼分析
在BrokerController啟動(dòng)時(shí)會(huì)調(diào)用DefaultMessageStore#load加載存儲(chǔ)文件加載和恢復(fù)過程主要分為下面幾步
- 判斷Broker上次是否正常退出。這個(gè)判斷邏輯是根據(jù)
${user.home}/store/abort是否存在。如果文件存在,說明上次是異常退出,如果文件不存在,則說明是正常退出。 - 加載CommitLog
- 加載ConsumeQueue
- 加載StoreCheckPoint
- 加載IndexFile
- 恢復(fù)ConsumeQueue與IndexFile
- 加載延遲隊(duì)列服務(wù)
// org.apache.rocketmq.store.DefaultMessageStore#load
public boolean load() {
boolean result = true;
try {
// 1. Broker上次是否正常退出
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
// 2. 加載commitLog
result = result && this.commitLog.load();
// 3. 加載consumeQueue
result = result && this.loadConsumeQueue();
if (result) {
// 4. 加載StoreCheckPoint
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
// 5. 加載IndexFile
this.indexService.load(lastExitOK);
// 6. 恢復(fù)ConsumeQueue與IndexFile
this.recover(lastExitOK);
// 7. 延遲隊(duì)列服務(wù)加載
if (null != scheduleMessageService) {
result = this.scheduleMessageService.load();
}
}
}
return result;
}
CommitLog加載
前面文章介紹過,CommitLog文件的存儲(chǔ)目錄是${user.home}/store/commitlog/,并且CommitLog文件的底層是MappedFile,由MappedFileQueue管理。

CommitLog文件的加載其實(shí)調(diào)用的是MappedFileQueue#load方法,代碼如下所示,load()中首先加載CommitLog文件目錄下的所有文件,并調(diào)用doLoad()方法加載CommitLog。
// org.apache.rocketmq.store.MappedFileQueue#load
public boolean load() {
File dir = new File(this.storePath/*${user.home}/store/commitlog/*/);
File[] ls = dir.listFiles();
if (ls != null) {
return doLoad(Arrays.asList(ls));
}
return true;
}
MappedFile的加載過程如下所示,核心邏輯主要分為下面三步
- 按照文件名稱將文件排序,排序好的文件就會(huì)按照消息保存的先后順序存放在列表中
- 校驗(yàn)文件大小與mappedFile是否一致,如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改
- 創(chuàng)建mappedFile,并且設(shè)置wrotePosition,flushedPosition,committedPosition為mappedFileSize
public boolean doLoad(List<File> files) {
// 按照文件名稱排序
files.sort(Comparator.comparing(File::getName));
for (File file : files) {
// 如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改
if (file.length() != this.mappedFileSize) {
return false;
}
try {
// 創(chuàng)建MappedFile
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
}
}
return true;
}
看到這里肯定會(huì)有疑問,加載后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都為mappedFileSize,如果最后一個(gè)MappedFile沒有使用完,Broker啟動(dòng)后還會(huì)從最后一個(gè)MappedFile開始寫么?我們可以在后面消息文件恢復(fù)源碼分析找到答案。
ConsumeQueue加載
從前面文章我們知道,ConsumeQueue文件底層其實(shí)也是MappedFile,因此ConsumeQueue文件的加載與CommitLog加載差別不大。ConsumeQueue加載邏輯為
- 獲取ConsumeQueue目錄下存儲(chǔ)的所有Topic目錄,遍歷Topic目錄
- 遍歷每個(gè)Topic目錄下的所有queueId目錄,逐個(gè)加載ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue
private boolean loadConsumeQueue() {
// 獲取consumeQueue目錄
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */));
// topic文件夾數(shù)組
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
// 遍歷topic
for (File fileTopic : fileTopicList) {
// 獲取topic名稱
String topic = fileTopic.getName();
// 獲取queueId文件夾數(shù)組
File[] fileQueueIdList = fileTopic.listFiles();
// 遍歷queueId
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {
int queueId;
// 文件夾名稱就是queueId
queueId = Integer.parseInt(fileQueueId.getName());
// 構(gòu)建consumeQueue
ConsumeQueue logic = new ConsumeQueue(/* ... */);
this.putConsumeQueue(topic, queueId, logic);
// ConsumeQueue加載
if (!logic.load()) {
return false;
}
}
}
}
}
return true;
}
IndexFile加載
IndexFile文件加載過程調(diào)用的是IndexService#load,首先獲取${user.home}/store/index目錄下的所有文件,遍歷所有文件,如果IndexFile最后存儲(chǔ)時(shí)間大于StoreCheckPoint中indexMsgTimestamp,則會(huì)先刪除IndexFile
// org.apache.rocketmq.store.index.IndexService#load
public boolean load(final boolean lastExitOK) {
// indexFile文件目錄
File dir = new File(this.storePath);
// indexFile文件列表
File[] files = dir.listFiles();
if (files != null) {
// 文件排序
Arrays.sort(files);
for (File file : files) {
try {
IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
f.load();
if (!lastExitOK) {
// 文件最后存儲(chǔ)時(shí)間戳大于刷盤點(diǎn),則摧毀indexFile,重建
if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存儲(chǔ)點(diǎn)時(shí)間*/
.getIndexMsgTimestamp()) {
f.destroy(0);
continue;
}
}
this.indexFileList.add(f);
}
}
}
return true;
}
ConsumeQueue與IndexFile恢復(fù)
如果是正常退出,數(shù)據(jù)都已經(jīng)正常刷盤,前面我們說到CommitLog在加載時(shí)的wrotePosition,flushedPosition,committedPosition都設(shè)置為mappedFileSize,
因此即使是正常退出,也會(huì)調(diào)用CommitLog#recoverNormally找到最后一條消息的位置,更新這三個(gè)屬性。
// org.apache.rocketmq.store.DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {
// consumeQueue中最大物理偏移量
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
// 正常退出文件恢復(fù)
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
// 異常退出文件恢復(fù)
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
// 恢復(fù)topicQueueTable
this.recoverTopicQueueTable();
}
正?;謴?fù)的源碼如下,由于Broker是正常關(guān)閉,因此CommitLog,ConsumeQueue與IndexFile都已經(jīng)正確刷盤,并且三者的消息是一致的。正?;謴?fù)的主要目的是找到找到最后一條消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盤點(diǎn)(flushWhere)和提交點(diǎn)(committedWhere),
- 從最后3個(gè)mappedFile開始恢復(fù),如果mappedFile總數(shù)不足3個(gè),則從第0個(gè)mappedFile開始恢復(fù)
- 逐個(gè)遍歷mappedFile,找到每個(gè)MappedFile的最后一條消息的偏移量,并將其更新到CommitLog中MappedFileQueue的刷盤點(diǎn)和提交點(diǎn)中
- 清除ConsumeQueue冗余數(shù)據(jù)
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
// 確認(rèn)消息是否完整,默認(rèn)是true
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// 默認(rèn)從最后3個(gè)mappedFile開始恢復(fù)
int index = mappedFiles.size() - 3;
// 如果commitLog不足三個(gè),則從第一個(gè)文件開始恢復(fù)
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 最后一個(gè)MappedFile的文件起始偏移量
long processOffset = mappedFile.getFileFromOffset();
// mappedFileOffset偏移量
long mappedFileOffset = 0;
// 遍歷CommitLog文件
while (true) {
// 校驗(yàn)消息完整性
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
// 獲取消息size
int size = dispatchRequest.getMsgSize();
// 返回結(jié)果為true并且消息size>0,說明消息是完整的
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
}
// 最大物理偏移量
processOffset += mappedFileOffset;
// 更新flushedWhere和committedPosition指針
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// 清除ConsumeQueue冗余數(shù)據(jù)
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/);
}
}
}
異?;謴?fù)源碼如下,由于上次Broker沒有正常關(guān)閉,因此由可能存在CommitLog、ConsumeQueue與IndexFile不一致的情況,因此在異?;謴?fù)時(shí)可能需要恢復(fù)ConsumeQueue和IndexFile,異?;謴?fù)核心邏輯主要包括
- 倒序查CommitLog的mappedFile文件,找到第一條消息存儲(chǔ)的時(shí)間戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,該mappedFile至少有一部分消息是被正常轉(zhuǎn)發(fā),正常存儲(chǔ),正常刷盤的
- 從該mappedFile開始逐條轉(zhuǎn)發(fā)消息,重新恢復(fù)ConsumeQueue和IndexFile
- 當(dāng)遍歷到最后一條消息,將其偏移量更新到CommitLog中MappedFileQueue的刷盤點(diǎn)和提交點(diǎn)中
- 清除ConsumeQueue冗余數(shù)據(jù)
// org.apache.rocketmq.store.CommitLog#recoverAbnormally
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// 是否CRC校驗(yàn)
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// 最后一個(gè)mappedFile的index
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
// 倒序遍歷mappedFile數(shù)組,
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
// 1. 如果第一條消息的時(shí)間戳小于存儲(chǔ)點(diǎn)時(shí)間戳
if (this.isMappedFileMatchedRecover(mappedFile)) {
break;
}
}
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
mappedFileOffset += size;
// 2. 轉(zhuǎn)發(fā)消息
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重復(fù),默認(rèn)是false*/) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
}
// 3. 更新MappedFileQueue中的刷盤位置和提交位置
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// 清除ConsumeQueue中的冗余數(shù)據(jù)
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
}
總結(jié)
Broker啟動(dòng)時(shí)會(huì)分別加載CommitLog、ConsumeQueue與IndexFile。加載完成后,如果Broker上次是正常退出,只需要找到CommitLog的最后一條消息,并更新刷盤點(diǎn)和提交點(diǎn)。如果Broker上次是異常退出,就有可能出現(xiàn)ConsumeQueue、IndexFile與CommitLog不一致的情況,需要根據(jù)StoreCheckPoint存儲(chǔ)的時(shí)間戳從CommitLog找到消息,逐條恢復(fù)ConsumeQueue與IndexFile。
以上就是RocketMQ | 源碼分析】消息存儲(chǔ)文件的加載與恢復(fù)機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 消息存儲(chǔ)文件加載恢復(fù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java String.split 無法使用小數(shù)點(diǎn)分割的問題
這篇文章主要介紹了java String.split 無法使用小數(shù)點(diǎn)分割的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
Java計(jì)時(shí)器StopWatch實(shí)現(xiàn)方法代碼實(shí)例
這篇文章主要介紹了Java計(jì)時(shí)器StopWatch實(shí)現(xiàn)方法代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
SpringBoot實(shí)現(xiàn)過濾敏感詞的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何利用SpringBoot實(shí)現(xiàn)過濾敏感詞功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以動(dòng)手嘗試一下2022-08-08
java Spring 5 新特性函數(shù)式Web框架詳細(xì)介紹
正如昨天Juergen博客中所提到的,Spring 5.0的第二個(gè)里程碑是引入了一個(gè)新的函數(shù)式web框架。在這篇文章中,我們將給出關(guān)于這個(gè)框架的更多信息,,需要的朋友可以參考下2016-12-12
Spring @Profile注解實(shí)現(xiàn)多環(huán)境配置
這篇文章主要介紹了Spring @Profile注解實(shí)現(xiàn)多環(huán)境配置,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
Java老手該當(dāng)心的13個(gè)錯(cuò)誤
這篇文章主要介紹了Java老手該當(dāng)心的13個(gè)錯(cuò)誤,需要的朋友可以參考下2015-04-04
解決Swagger2返回map復(fù)雜結(jié)構(gòu)不能解析的問題
這篇文章主要介紹了解決Swagger2返回map復(fù)雜結(jié)構(gòu)不能解析的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07

