從log4j2到Disruptor詳解
- log4j2實現(xiàn)原理可查看://www.dhdzp.com/article/232602.htm
- 文章同樣基于log4j-2.7版本,disruptor-3.3.6
相信看過log4j2的源碼后大家應(yīng)該明白為什么第二代日志性能會提升那么多,這其中最大的功臣莫過于Disruptor并發(fā)編程框架。
下面我們就跟著log4j2來走進Disruptor這個神奇的框(wang)架(zhan)
log4j2異步日志簡要回顧
從日志工廠(Log4jLoggerFactory)中獲取日志Logger實例
從日志上下文工廠(Log4jContextFactory)獲取日志上下文
啟用日志上下文(AsyncLoggerContext)
啟動Disruptor(AsyncLoggerDisruptor)
序列號屏障(ProcessingSequenceBarrier)等待序列號發(fā)布
等待策略(WaitStrategy)等待序列號
返回Logger等待序列號(即等待日志寫入)
異步日志(AsyncLogger)寫入
日志內(nèi)容與轉(zhuǎn)化者(RingBufferLogEventTranslator)綁定
Disruptor嘗試發(fā)布轉(zhuǎn)化者tryPublish
RingBuffer嘗試發(fā)布事件tryPublishEvent
獲取下一個可用序號
轉(zhuǎn)化并發(fā)布序號,日志與序號對應(yīng)的事件綁定,并發(fā)布序號
RingBuffer發(fā)布序號,MultiProducerSequencer發(fā)布
等待策略(waitStrategy)喚醒阻塞
Disruptor在log4j2中的應(yīng)用
AsyncLoggerDisruptor
異步日志Disruptor啟動
創(chuàng)建事件工廠EventFactory
計算ringBufferSize:AsyncLogger.RingBufferSize屬性
創(chuàng)建等待策略:AsyncLogger.WaitStrategy屬性
創(chuàng)建守護線程執(zhí)行器executor
創(chuàng)建異步隊列滿時處理策略AsyncQueueFullPolicy(非Disruptor步驟)
創(chuàng)建Disruptor
- 創(chuàng)建RingBuffer與Disruptor綁定
- RingBuffer根據(jù)生產(chǎn)者類型創(chuàng)建對應(yīng)的實例,例如多生產(chǎn)者:MultiProducerSequencer
- 創(chuàng)建多生產(chǎn)者序號(bufferSize,waitStrategy)
綁定異常句柄(Disruptor.handleExceptionsWith)
綁定事件處理句柄(Disruptor.handleEventsWith)
- 根據(jù)handle列表創(chuàng)建事件處理器createEventProcessors
- RingBuffer為Sequence(MultiProducerSequencer)序列創(chuàng)建序列屏障ProcessingSequenceBarrier
- 創(chuàng)建事件批處理器BatchEventProcessor
- 為事件批處理器綁定異常處理句柄
- 消費者倉庫(consumerRepository)添加消費者,創(chuàng)建事件處理信息EventProcessorInfo添加至消費者信息列表consumerInfos
- RingBuffer添加處理序列號列表processorSequences為序列號閘
- 如果存在序列號屏障,從閘門中移除屏障序列號并標識endOfChain為false
啟動Disruptor
- 遍歷消費者倉庫放入執(zhí)行器中執(zhí)行消費者EventProcessorInfo
- 啟動事件批處理器BatchEventProcessor
- 事件批處理器序列號自增1
- 死循環(huán)
- 序列號屏障ProcessingSequenceBarrier等待下個有效序列號,默認為超時等待策略,超時會繼續(xù)下輪循環(huán)
- 事件批處理器序列號如果小于等于有效序列號
- 從RingBuffer中按照序列號獲取event事件
- 通知回調(diào)事件句柄eventHandler.onEvent如果當前消費下標等于有效序列號availableSequence說明是當前批次的最后一個消息,endOfBatch為true:eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
- 事件批處理器序列號設(shè)置為有效序列號
異步日志Disruptor寫入
嘗試發(fā)布tryPublish事件轉(zhuǎn)化器EventTranslator:RingBufferLogEventTranslator
Disruptor獲取RingBuffer嘗試發(fā)布事件tryPublishEvent
序列號獲取下個有效序號,步進為1,例如:MultiProducerSequencer.tryNext
游標按照步進移動
判斷是否有足夠的空間,沒有則拋出InsufficientCapacityException異常
返回有效序列號
轉(zhuǎn)化器轉(zhuǎn)化消息為對應(yīng)有效序列號的事件放入entries
發(fā)布序列號
- 設(shè)置有效序列號至緩存availableBuffer
- 等待策略喚醒阻塞waitStrategy.signalAllWhenBlocking
架構(gòu)及流程
紅色數(shù)字標識流程為獲取logger時Disruptor創(chuàng)建消費者流程
黑色數(shù)字標識流程為logger寫入日志時Disruptor創(chuàng)建事件并通知消費者流程
RingBuffer對于所有消費者、生產(chǎn)者是同一個實例
- 環(huán)形隊列,dataProvide,數(shù)據(jù)的存儲與提供者
Sequencer:生產(chǎn)者
- 對于所有消費者、生產(chǎn)者(可能是多生產(chǎn)者序列類型對于Multi類型)是同一個實例,包含一個游標序列號Sequence
SequenceBarrier:序列號屏障
- 對于所有消費者、生產(chǎn)者也是同一個實例,序列號屏障包含一個等待策略、一個RingBuffer引用、一個游標序列號、一個依賴序列號(可能是組序列號類型)
BatchEventProcessor:消費者
- 消費者包含一個RingBuffer引用
- 一個序列號屏障,可以包含多個屏障序列號,默認為0個則使用RingBuffer的MultiProducerSequencer的游標序列號Sequence
- 一個EventHandler:RingBufferLogEventHandler
- 遍歷EventHandler列表將其封裝為BatchEventProcessor,將其與原始eventHandler、barrier屏障注冊至消費者資源庫consumerRepository。
- 獲取batchEventProcessor序列號默認為-1,將其緩存至processorSequences標識正在處理,并將processorSequences、disruptor、consumerRepository綁定至EventHandlerGroup。Disruptor啟動遍歷消費者資源庫啟動消費者:BatchEventProcessor
消費者入口
- 消費者消費前先自增本地序列號(即-1+1=0序號),向序列號屏障申請該序列號的消費,默認為Timeout策略申請。
- 屏障收到申請waitFor序列號,當前屏障游標序列號小于申請的消費序列號,等待生產(chǎn)者生產(chǎn)至當前序列號,如果超時則拋出異常(本地序列號不更新繼續(xù)重試);如果沒有超時,將屏障的dependentSequence序列號(如果不是非多序列號屏障類型,log4j2使用的是非多序列號屏障,則是屏障的本地游標)賦值為availableSequence返回。
- 如果availableSequence有效的序列號(即屏障的游標序列號)小于申請要消費的序列號直接返回availableSequence(即消費超出的生產(chǎn)的速度,消費者申請的序列號向后回移至有效序列號)。否則getHighestPublishedSequence判斷申請的序列號至availableSequence序列號之間的每個序列號對應(yīng)的消息事件均是有效的則返回有效序列號(即生產(chǎn)者生產(chǎn)很快,消費者申請消費的序列號很小,向前移動至有效的,可能是本身也可能會跳躍多個下標),根據(jù)生產(chǎn)者的availableBuffer判斷是否有效,因為生產(chǎn)者先發(fā)布序列號再寫入數(shù)據(jù),此處避免了讀取數(shù)據(jù)異常,如果數(shù)據(jù)沒有寫入,有效序列號緩存標識沒有寫入(即無效),消費者會進行剛剛所說的“重試”,如果之間存在無效序列號則返回申請序列號-1(即回滾一個值,進入邏輯時增加了一個值,也就是回滾至申請前的點,可以理解為與超時相同,即重試)
- 如果申請的序列號小于等于有效的序列號,則消費序列號對應(yīng)的消息事件并更新本地BatchEventProcessor的序列號,按照下標去dataProvide(RingBuffer.entries)中提取對應(yīng)位置的數(shù)據(jù)消費
- 如果申請的序列號大于有效的序列號,則將消費者本地序列號設(shè)置為有效序列號(即消費超出的生產(chǎn)的速度,消費的序列號向后回移)
- 如果期間出現(xiàn)任何未catch住的異常則會跳過當前下標,異常出現(xiàn)時的下標及對應(yīng)的事件會交由exceptionHandler處理,默認為AsyncLoggerDefaultExceptionHandler異步處理,會將異常事件輸出至系統(tǒng)的標準錯誤管道,雖然是異步也是會占用消費者線程池資源
Disruptor:生產(chǎn)者入口
- 獲取RingBuffer嘗試發(fā)布消息,生產(chǎn)者(例如:MultiProducerSequencer)
- 生產(chǎn)者游標序列號嘗試自增,判斷當前是否有足夠的空間,當前游標+步進-bufferSize是否大于最小的閘門序列號(gatingSequences,即:所有消費者的本地游標序列號processorSequences列表),最小序列號會緩存至本地gatingSequenceCache用于下次判斷減少進行所有閘門序列號的遍歷次數(shù),如果是說明已經(jīng)沒有空間(因為生產(chǎn)者生產(chǎn)申請的序列號已經(jīng)追上了消費者消費序列號的最小值。RingBuffer是一個環(huán)形隊列結(jié)構(gòu)。上面已經(jīng)講到消費者序列號會與生產(chǎn)者序列號同步,同步指消費者申請序列號小于有效序列號時會前進至有效序列號,即使有延遲也保證了有大于等于buffer值的緩沖空間供生產(chǎn)者生產(chǎn)),如果沒有空間返回false進入下一輪生產(chǎn)

- 自增成功后,將消息轉(zhuǎn)化為對應(yīng)序列號下標位置的事件數(shù)據(jù)
- Sequencer發(fā)布序列號,將當前序列號設(shè)置為有效(availableBuffer),并根據(jù)等待策略喚醒等待的消費者,被喚醒的消費者根據(jù)發(fā)布的序列號獲取相應(yīng)下標處事件數(shù)據(jù)進行處理

Disruptor為什么這么快?
Disruptor采用無鎖并發(fā)編程,框架中主要使用CAS與volatile關(guān)鍵字保證并發(fā)安全
使用環(huán)形數(shù)據(jù)結(jié)構(gòu)(另一個典型的應(yīng)用是時鐘算法也是使用的環(huán)形數(shù)據(jù)結(jié)構(gòu)),為環(huán)形結(jié)構(gòu)添加序列號屏障來控制對環(huán)形隊列讀寫操作,保證存儲數(shù)據(jù)的并發(fā)安全
另一個點便是神奇的緩沖行填充了
Log4j2為什么這么快?
使用Disruptor并發(fā)編程框架
使用NIO寫入日志數(shù)據(jù)
當然log4j2中有很多細節(jié),如果我們想要獲取線程棧信息,可以同樣學習一下這樣的寫法
// LOG4J2-1029 new Throwable().getStackTrace is faster than Thread.currentThread().getStackTrace().
final StackTraceElement[] stackTrace = new Throwable().getStackTrace();
StackTraceElement last = null;
for (int i = stackTrace.length - 1; i > 0; i--) {
final String className = stackTrace[i].getClassName();
if (fqcnOfLogger.equals(className)) {
return last;
}
last = stackTrace[i];
}
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Netty分布式ByteBuf使用subPage級別內(nèi)存分配剖析
這篇文章主要為大家介紹了Netty分布式ByteBuf使用subPage級別內(nèi)存分配剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03
Java數(shù)據(jù)結(jié)構(gòu)之位圖的簡單實現(xiàn)和使用
位圖,?是一種非常常見的結(jié)構(gòu),?它使用每個二進制位來存放一個值的狀態(tài),?就類似于?Java?當中?HashSet?存儲元素的功能。本文主要來介紹一下位圖的簡單實現(xiàn)和使用,需要的可以參考一下2023-05-05
SpringBoot中使用AOP切面編程實現(xiàn)登錄攔截功能
本文介紹了如何使用AOP切面編程實現(xiàn)Spring Boot中的登錄攔截,通過實例代碼給大家介紹的非常詳細,感興趣的朋友一起看看吧2024-12-12
springboot 設(shè)置server.port不生效的原因及解決
這篇文章主要介紹了springboot 設(shè)置server.port不生效的原因及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08

