Netty分布式pipeline管道傳播outBound事件源碼解析
了解了inbound事件的傳播過程, 對(duì)于學(xué)習(xí)outbound事件傳輸?shù)牧鞒? 也不會(huì)太困難
outbound事件傳輸流程
在我們業(yè)務(wù)代碼中, 有可能使用wirte方法往寫數(shù)據(jù):
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().write("test data");
}當(dāng)然, 直接調(diào)用write方法是不能往對(duì)方channel中寫入數(shù)據(jù)的, 因?yàn)檫@種方式只能寫入到緩沖區(qū), 還要調(diào)用flush方法才能將緩沖區(qū)數(shù)據(jù)刷到channel中, 或者直接調(diào)用writeAndFlush方法, 有關(guān)邏輯, 我們會(huì)在后面章節(jié)中詳細(xì)講解, 這里只是以wirte方法為例為了演示outbound事件的傳播的流程
這里我們同樣給出兩種寫法
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//寫法1
ctx.channel().write("test data");
//寫法2
ctx.write("test data");
}這兩種寫法有什么區(qū)別, 我們首先跟到第一種寫法中去:
ctx.channel().write("test data");這里獲取ctx所綁定的channel
我們跟到AbstractChannel的write方法中:
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}這里pipeline是DefaultChannelPipeline
跟到其write方法中:
public final ChannelFuture write(Object msg) {
//從tail節(jié)點(diǎn)開始(從最后的節(jié)點(diǎn)往前寫)
return tail.write(msg);
}這里調(diào)用tail節(jié)點(diǎn)write方法, 這里我們應(yīng)該能分析到, outbound事件, 是通過tail節(jié)點(diǎn)開始往上傳播的, 帶著這點(diǎn)猜想, 我們繼往下看
其實(shí)tail節(jié)點(diǎn)并沒有重寫write方法, 最終會(huì)調(diào)用其父類AbstractChannelHandlerContext的write方法
AbstractChannelHandlerContext的write方法:
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}我們看到這里有個(gè)newPromise()這個(gè)方法, 這里是創(chuàng)建一個(gè)Promise對(duì)象, 有關(guān)Promise的相關(guān)知識(shí)我們會(huì)在以后的章節(jié)剖析
我們繼續(xù)跟write:
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
//代碼省略
write(msg, false, promise);
return promise;
}繼續(xù)跟write:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
//沒有調(diào)flush
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}這里跟我們上一小節(jié)剖析過channelRead方法有點(diǎn)類似, 但是事件傳輸?shù)姆较蛴兴煌? 這里findContextOutbound()是獲取上一個(gè)標(biāo)注outbound事件的HandlerContext
跟到findContextOutbound中
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}這里的邏輯我們似曾相識(shí), 跟我們上一小節(jié)的findContextInbound()方法有點(diǎn)像, 只是過程是反過來的
在這里, 會(huì)找到當(dāng)前context的上一個(gè)節(jié)點(diǎn), 如果標(biāo)注的事件不是outbound事件, 則繼續(xù)往上找, 意思就是找到上一個(gè)標(biāo)注outbound事件的節(jié)點(diǎn)
回到write方法:
AbstractChannelHandlerContext next = findContextOutbound();
這里將找到節(jié)點(diǎn)賦值到next屬性中
因?yàn)槲覀冎胺治龅膚rite事件是從tail節(jié)點(diǎn)傳播的, 所以上一個(gè)節(jié)點(diǎn)就有可能是用戶自定的handler所屬的context
然后判斷是否為當(dāng)前eventLoop線程, 如果是不是, 則封裝成task異步執(zhí)行, 如果不是, 則繼續(xù)判斷是否調(diào)用了flush方法, 因?yàn)槲覀冞@里沒有調(diào)用, 所以會(huì)執(zhí)行到next.invokeWrite(m, promise),
我們繼續(xù)跟invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}這里會(huì)判斷當(dāng)前handler的狀態(tài)是否是添加狀態(tài), 這里返回的是true, 將會(huì)走到invokeWrite0(msg, promise)這一步
繼續(xù)跟invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//調(diào)用當(dāng)前handler的wirte()方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}這里的邏輯也似曾相識(shí), 調(diào)用了當(dāng)前節(jié)點(diǎn)包裝的handler的write方法, 如果用戶沒有重寫write方法, 則會(huì)交給其父類處理
我們跟到ChannelOutboundHandlerAdapter的write方法中看:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}這里調(diào)用了當(dāng)前ctx的write方法, 這種寫法和我們小節(jié)開始的寫法是相同的, 我們回顧一下:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//寫法1
ctx.channel().write("test data");
//寫法2
ctx.write("test data");
}我們跟到其write方法中, 這里走到的是AbstractChannelHandlerContext類的write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
//沒有調(diào)flush
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}又是我們所熟悉邏輯, 找到當(dāng)前節(jié)點(diǎn)的上一個(gè)標(biāo)注事件為outbound事件的節(jié)點(diǎn), 繼續(xù)執(zhí)行invokeWrite方法, 根據(jù)之前的剖析, 我們知道最終會(huì)執(zhí)行到上一個(gè)handler的write方法中
走到這里已經(jīng)不難理解, ctx.channel().write("test data")其實(shí)是從tail節(jié)點(diǎn)開始傳播寫事件, 而ctx.write("test data")是從自身開始傳播寫事件
所以, 在handler中如果重寫了write方法要傳遞write事件, 一定采用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")這種方式, 因?yàn)闀?huì)造成每次事件傳輸?shù)竭@里都會(huì)從tail節(jié)點(diǎn)重新傳輸, 導(dǎo)致不可預(yù)知的錯(cuò)誤
如果用代碼中沒有重寫handler的write方法, 則事件會(huì)一直往上傳輸, 當(dāng)傳輸完所有的outbound節(jié)點(diǎn)之后, 最后會(huì)走到head節(jié)點(diǎn)的wirte方法中
我們跟到HeadContext的write方法中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}我們看到write事件最終會(huì)流向這里, 通過unsafe對(duì)象進(jìn)行最終的寫操作
有關(guān)inbound事件和outbound事件的傳輸, 可通過下圖進(jìn)行說明:

以上就是Netty分布式pipeline管道傳播outBound事件源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式pipeline管道傳播outBound的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決spring?data?jpa?saveAll()?保存過慢問題
這篇文章主要介紹了解決spring?data?jpa?saveAll()保存過慢問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
SpringBoot解析JSON數(shù)據(jù)的三種方案
JSON(JavaScript Object Notation) 是一種輕量級(jí)的數(shù)據(jù)交換格式,易于人閱讀和編寫,同時(shí)也易于機(jī)器解析和生成,本文給大家介紹了SpringBoot解析JSON數(shù)據(jù)的三種方案,需要的朋友可以參考下2024-03-03
關(guān)于弱引用WeakReference所引用的對(duì)象的回收規(guī)則
這篇文章主要介紹了關(guān)于弱引用WeakReference所引用的對(duì)象的回收規(guī)則,如果一個(gè)弱引用實(shí)例的成員變量referent引用了一個(gè)對(duì)象obj,那么就稱這個(gè)弱引用實(shí)例對(duì)obj的引用是弱引用,被一個(gè)弱引用實(shí)例引用的對(duì)象,稱為弱引用對(duì)象,需要的朋友可以參考下2023-09-09
MyBatis學(xué)習(xí)教程(二)—如何使用MyBatis對(duì)users表執(zhí)行CRUD操作
這篇文章主要介紹了MyBatis學(xué)習(xí)教程(二)—如何使用MyBatis對(duì)users表執(zhí)行CRUD操作的相關(guān)資料,需要的朋友可以參考下2016-05-05
java實(shí)現(xiàn)對(duì)excel文件的處理合并單元格的操作
這篇文章主要介紹了java實(shí)現(xiàn)對(duì)excel文件的處理合并單元格的操作,開頭給大家介紹了依賴引入代碼,表格操作的核心代碼,代碼超級(jí)簡單,需要的朋友可以參考下2021-07-07
SpringSecurity微服務(wù)實(shí)戰(zhàn)之公共模塊詳解
這篇文章主要為大家介紹了SpringSecurity微服務(wù)實(shí)戰(zhàn)之公共模塊詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
Java程序員編程性能優(yōu)化必備的34個(gè)小技巧(總結(jié))
這篇文章主要介紹了Java程序員編程性能優(yōu)化必備的34個(gè)小技巧(總結(jié)),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-07-07

