RocketMQ設(shè)計之同步刷盤
同步刷盤方式:在返回寫成功狀態(tài)時,消息已經(jīng)被寫入磁盤。具體流程是,消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,返回消息寫成功的狀態(tài)。

在同步刷盤模式下,當(dāng)消息寫到內(nèi)存后,會等待數(shù)據(jù)寫到磁盤的CommitLog文件。
CommitLog的handleDiskFlush方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
? ? // Synchronization flush
? ? if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
? ? ? ? final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
? ? ? ? if (messageExt.isWaitStoreMsgOK()) {
? ? ? ? ? ? GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
? ? ? ? ? ? service.putRequest(request);
? ? ? ? ? ? boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
? ? ? ? ? ? ? ? ? ? + " client address: " + messageExt.getBornHostString());
? ? ? ? ? ? ? ? putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? service.wakeup();
? ? ? ? }
? ? }
? ? // Asynchronous flush
? ? else {
? ? ? ? if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
? ? ? ? ? ? flushCommitLogService.wakeup();
? ? ? ? } else {
? ? ? ? ? ? commitLogService.wakeup();
? ? ? ? }
? ? }
}
class GroupCommitService extends FlushCommitLogService {
? ? ? ? private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
? ? ? ? private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
? ? ?? ?//提交刷盤任務(wù)到任務(wù)列表
? ? ? ? public synchronized void putRequest(final GroupCommitRequest request) {
? ? ? ? ? ? synchronized (this.requestsWrite) {
? ? ? ? ? ? ? ? this.requestsWrite.add(request);
? ? ? ? ? ? }
? ? ? ? ? ? if (hasNotified.compareAndSet(false, true)) {
? ? ? ? ? ? ? ? waitPoint.countDown(); // notify
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? private void swapRequests() {
? ? ? ? ? ? List<GroupCommitRequest> tmp = this.requestsWrite;
? ? ? ? ? ? this.requestsWrite = this.requestsRead;
? ? ? ? ? ? this.requestsRead = tmp;
? ? ? ? }
? ? ? ? private void doCommit() {
? ? ? ? ? ? synchronized (this.requestsRead) {
? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) {
? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) {
? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of
? ? ? ? ? ? ? ? ? ? ? ? // two times the flush
? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false;
? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) {
? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? this.requestsRead.clear();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it
? ? ? ? ? ? ? ? ? ? // will come to this process
? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? public void run() {
? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service started");
? ? ? ? ? ? while (!this.isStopped()) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? this.waitForRunning(10);
? ? ? ? ? ? ? ? ? ? this.doCommit();
? ? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ? CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // Under normal circumstances shutdown, wait for the arrival of the
? ? ? ? ? ? // request, and then flush
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(10);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? CommitLog.log.warn("GroupCommitService Exception, ", e);
? ? ? ? ? ? }
? ? ? ? ? ? synchronized (this) {
? ? ? ? ? ? ? ? this.swapRequests();
? ? ? ? ? ? }
? ? ? ? ? ? this.doCommit();
? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service end");
? ? ? ? }
? ? ? ? @Override
? ? ? ? protected void onWaitEnd() {
? ? ? ? ? ? this.swapRequests();
? ? ? ? }
? ? ? ? @Override
? ? ? ? public String getServiceName() {
? ? ? ? ? ? return GroupCommitService.class.getSimpleName();
? ? ? ? }
? ? ? ? @Override
? ? ? ? public long getJointime() {
? ? ? ? ? ? return 1000 * 60 * 5;
? ? ? ? }
? ? }GroupCommitRequest是刷盤任務(wù),提交刷盤任務(wù)后,會在刷盤隊列中等待刷盤,而刷盤線程
GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。之所以不直接寫是磁盤io壓力大,寫入性能低,每隔10毫秒寫一次可以提升磁盤io效率和寫入性能。
- putRequest(request) 提交刷盤任務(wù)到任務(wù)列表
- request.waitForFlush同步等待
GroupCommitService將任務(wù)列表中的任務(wù)刷盤完成。
兩個隊列讀寫分離,requestsWrite是寫隊列,用戶保存添加進(jìn)來的刷盤任務(wù),requestsRead是讀隊列,在刷盤之前會把寫隊列的數(shù)據(jù)放入讀隊列。
CommitLog的doCommit方法:
private void doCommit() {
? ? ? ? ? ? synchronized (this.requestsRead) {
? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) {
? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) {
? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of
? ? ? ? ? ? ? ? ? ? ? ? // two times the flush
? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false;
? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? //根據(jù)offset確定是否已經(jīng)刷盤
? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) {
? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
? ? ? ? ? ? ? ? ? ? }
?? ??? ??? ??? ??? ?//清空已刷盤的列表
? ? ? ? ? ? ? ? ? ? this.requestsRead.clear();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it
? ? ? ? ? ? ? ? ? ? // will come to this process
? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }- 刷盤的時候依次讀取
requestsRead中的數(shù)據(jù)寫入磁盤, - 寫入完成后清空
requestsRead。
讀寫分離設(shè)計的目的是在刷盤時不影響任務(wù)提交到列表。
CommitLog.this.mappedFileQueue.flush(0);是刷盤操作:
public boolean flush(final int flushLeastPages) {
? ? boolean result = true;
? ? MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
? ? if (mappedFile != null) {
? ? ? ? long tmpTimeStamp = mappedFile.getStoreTimestamp();
? ? ? ? int offset = mappedFile.flush(flushLeastPages);
? ? ? ? long where = mappedFile.getFileFromOffset() + offset;
? ? ? ? result = where == this.flushedWhere;
? ? ? ? this.flushedWhere = where;
? ? ? ? if (0 == flushLeastPages) {
? ? ? ? ? ? this.storeTimestamp = tmpTimeStamp;
? ? ? ? }
? ? }
? ? return result;
}通過MappedFile映射的CommitLog文件寫入磁盤
這就是RocketMQ高可用設(shè)計之同步刷盤的基本情況了,大體思路就是一個讀寫分離的隊列來刷盤,同步刷盤任務(wù)提交后會在刷盤隊列中等待刷盤完成后再返回,而GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。
到此這篇關(guān)于RocketMQ設(shè)計之同步刷盤的文章就介紹到這了,更多相關(guān)RocketMQ同步刷盤內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于IDEA2020.1新建項目maven PKIX 報錯問題解決方法
這篇文章主要介紹了關(guān)于IDEA2020.1新建項目maven PKIX 報錯問題解決方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06
java中rss解析器(rome.jar和jdom.jar)示例
這篇文章主要介紹了java中rss解析器(rome.jar和jdom.jar)示例,需要的朋友可以參考下2014-03-03
java WebSocket實現(xiàn)聊天消息推送功能
這篇文章主要為大家詳細(xì)介紹了java WebSocket實現(xiàn)聊天消息推送功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-07-07
詳解Java如何實現(xiàn)與JS相同的Des加解密算法
這篇文章主要介紹了如何在Java中實現(xiàn)與JavaScript相同的DES(Data Encryption Standard)加解密算法,確保在兩個平臺之間可以無縫地傳遞加密信息,希望對大家有一定的幫助2025-04-04
深入淺出重構(gòu)Mybatis與Spring集成的SqlSessionFactoryBean(上)
通常來講,重構(gòu)是指不改變功能的情況下優(yōu)化代碼,但本文所說的重構(gòu)也包括了添加功能。這篇文章主要介紹了重構(gòu)Mybatis與Spring集成的SqlSessionFactoryBean(上)的相關(guān)資料,需要的朋友可以參考下2016-11-11

