Netty分布式pipeline管道創(chuàng)建方法跟蹤解析
上一章節(jié)回顧:Netty分布式源碼分析監(jiān)聽讀事件
概述
pipeline, 顧名思義, 就是管道的意思, 在netty中, 事件在pipeline中傳輸, 用戶可以中斷事件, 添加自己的事件處理邏輯, 可以直接將事件中斷不再往下傳輸, 同樣可以改變管道的流向, 傳遞其他事件.這里有點(diǎn)類似于Spring的AOP, 但是比AOP實(shí)現(xiàn)起來(lái)簡(jiǎn)單的多
事件通常分為兩種, 一是inBound事件, 另一種是outBound事件, inBound事件, 顧名思義, 就是從另一端流向自己的事件, 比如讀事件, 連接完成事件等等, outBound, 是從自己流向另一端的事件, 比如連接事件, 寫事件, 刷新緩沖區(qū)事件等等
在netty中, 事件是通過(guò)handler對(duì)象進(jìn)行處理的, 里面封裝著事件的處理邏輯.而每個(gè)handler, 是由HandlerContext進(jìn)行包裝的, 里面封裝了對(duì)事件傳輸?shù)牟僮?/p>
通過(guò)之前的學(xué)習(xí), 我們知道每一個(gè)channel綁定一個(gè)pipeline, 那么pipeline和handler又是什么關(guān)系呢?
其實(shí)pipeline我們可以理解成是一個(gè)雙向鏈表的數(shù)據(jù)結(jié)構(gòu), 只是其中存放的并不是數(shù)據(jù)而是HandlerContext, 而HandlerContext又包裝了handler, 事件傳輸過(guò)程中, 從頭結(jié)點(diǎn)(或者尾節(jié)點(diǎn))開始, 找到下一個(gè)HandlerContext, 執(zhí)行其Handler的業(yè)務(wù)邏輯, 然后再繼續(xù)往下走, 直到執(zhí)行到尾節(jié)點(diǎn)(或者頭結(jié)點(diǎn), 反向)為止, 通過(guò)一幅圖了解其大概邏輯:

這里head代表pipeline的頭結(jié)點(diǎn), tail代表pipeline的尾節(jié)點(diǎn), 這兩個(gè)節(jié)點(diǎn)是會(huì)隨著pipeline的初始化而創(chuàng)建, 并且不會(huì)被刪除
HandlerContext的簡(jiǎn)單繼承關(guān)系比較簡(jiǎn)單, 默認(rèn)的是DefaultChannelHandlerContext, 繼承于AbstractChannelHandlerContext
而Handler分為InboundHandler和outBoundHandler, Inbound專門處理inbound事件, outBound專門用于處理outBound事件
繼承關(guān)系如下圖:

從圖中不難看出, 如果屬于ChannelInboundHandler的子類, 則屬于Inbound類型的handler
如果是ChannelOutboundHandler的子類, 則屬于Outbound類型的handler
了解了其大概邏輯, 我們繼續(xù)跟到源碼中, 看其實(shí)如何體現(xiàn)的:
pipeline的創(chuàng)建
回顧之前NioServerSocketChannel的創(chuàng)建過(guò)程
我們看AbstractChannel的構(gòu)造方法:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}我們跟到newChannelPipeline()中:
protected DefaultChannelPipeline newChannelPipeline() {
//傳入當(dāng)前channel
return new DefaultChannelPipeline(this);
}我們看到這里創(chuàng)建了一個(gè)DefaultChannelPipeline, 并將自身channel傳入
繼續(xù)跟DefaultChannelPipeline的構(gòu)造方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}首先保存了當(dāng)前channel
然后保存了兩個(gè)屬性succeededFuture, voidPromise, 這兩個(gè)屬性是Future相關(guān)的內(nèi)容, 之后的章節(jié)會(huì)講到
首先, 這里初始化了兩個(gè)節(jié)點(diǎn), head節(jié)點(diǎn)和tail節(jié)點(diǎn), 在pipeline中代表頭結(jié)點(diǎn)和尾節(jié)點(diǎn)
我們首先跟到這tail節(jié)點(diǎn)類中,也就是TailContext類:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
//inbound處理器
super(pipeline, null, TAIL_NAME, true, false);
//將當(dāng)前節(jié)點(diǎn)設(shè)置為已添加, head和tail.....
setAddComplete();
}
//自身也是handler
@Override
public ChannelHandler handler() {
return this;
}
//方法省略這個(gè)是DefualtPipline的內(nèi)部類, 首先看其繼承了AbstractChannelHandlerContext類, 說(shuō)明自身是個(gè)HandlerContext, 同時(shí)也實(shí)現(xiàn)ChannelInboundHander接口, 并且其中的handler()方法返回了自身, 說(shuō)明自身也是handler, 而實(shí)現(xiàn)ChannelInboundHander, 說(shuō)明自身只處理Inbound事件
構(gòu)造方法中, 調(diào)用了父類的構(gòu)造器, 看其中參數(shù):
pipeline是自身所屬的pipeline
executor為null
TAIL_NAME是當(dāng)前handler, 也就是自身的命名
true代表自身是inboundHandler
fasle代表自身不是outboundHandler
繼續(xù)跟到父類構(gòu)造方法中:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
//名字
this.name = ObjectUtil.checkNotNull(name, "name");
//pipeline
this.pipeline = pipeline;
//線程處理器
this.executor = executor;
//事件標(biāo)識(shí)
this.inbound = inbound;
this.outbound = outbound;
ordered = executor == null || executor instanceof OrderedEventExecutor;
}這里初始化了自身父類的幾個(gè)屬性
pipeline為自身綁定的pipeline
exeutor是線程執(zhí)行器, 這里為空
inbound和outbound是事件標(biāo)志, 這里分別是true和false, 也就是自身屬于inboundHnadler而不屬于outboundHandler
回到DefaultChannelPipeline的構(gòu)造方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}再看HeadContext類:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
}看過(guò)了tail節(jié)點(diǎn), head節(jié)點(diǎn)就不難理解, 同樣繼承了AbstractChannelHandlerContext, 說(shuō)明自身是一個(gè)HandlerContext, 與tail不同的是, 這里實(shí)現(xiàn)了ChannelOutboundHandler接口和ChannelOutboundHandler接口, 說(shuō)明其既能處理inbound事件也能處理outbound的事件, handler方法返歸自身, 說(shuō)明自身是一個(gè)handler
在構(gòu)造方法中初始化了一個(gè)Unsafe類型的成員變量, 是通過(guò)自身綁定的channel拿到的, 說(shuō)明這個(gè)類中可以進(jìn)行對(duì)channel的讀寫操作
這里同樣調(diào)用了父類的構(gòu)造方法, 不同的是, 這里inbound參數(shù)傳入了false, 而outbound參數(shù)傳入了true, 這里說(shuō)明這里標(biāo)志的事件是outbound事件
同學(xué)們可能疑惑, 為什么同時(shí)執(zhí)行ChannelOutboundHandler接口和ChannelOutboundHandler但是標(biāo)志的事件不同?
其實(shí)這兩個(gè)地方應(yīng)用的場(chǎng)景是不同的, 繼承ChannelOutboundHandler和ChannelOutboundHandler, 說(shuō)明其既能處理inbound事件也能處理outBound的事件, 但是只有outbound屬性為true說(shuō)明自身是一個(gè)outboundhandler, 是一個(gè)可以處理inbound事件的outboundhandler(估計(jì)被繞暈了), 這兩種handler主要是保證在事件傳輸中保證事件的單方向流動(dòng), 在后面事件傳輸我們能領(lǐng)會(huì)到
再跟進(jìn)父類的構(gòu)造方法, 又是我們熟悉的部分:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
//名字
this.name = ObjectUtil.checkNotNull(name, "name");
//pipeline
this.pipeline = pipeline;
//線程處理器
this.executor = executor;
//事件標(biāo)識(shí)
this.inbound = inbound;
this.outbound = outbound;
ordered = executor == null || executor instanceof OrderedEventExecutor;
}初始化了pipeline, executor, 和事件標(biāo)識(shí)的屬性
回到DefaultChannelPipeline的構(gòu)造方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}我們介紹完了head, 和tail這兩個(gè)context, 繼續(xù)往下看:
head.next = tail;
tail.prev = head;
tail節(jié)點(diǎn)和head節(jié)點(diǎn)中的next和prev屬性, 其實(shí)是其父類AbstractChannelHandlerContext, 每一個(gè)handlerContext都擁有這兩個(gè)屬性, 代表自身的下一個(gè)節(jié)點(diǎn)和上一個(gè)節(jié)點(diǎn), 因?yàn)槲覀兏攀鲋薪榻B過(guò)pipeline其實(shí)是一個(gè)雙向鏈表, 所以其中每一個(gè)節(jié)點(diǎn)必須有指向其他節(jié)點(diǎn)的指針, 熟悉雙向鏈接數(shù)據(jù)結(jié)構(gòu)的同學(xué)應(yīng)該不會(huì)陌生
這里head節(jié)點(diǎn)的next屬性是tail節(jié)點(diǎn), tail節(jié)點(diǎn)的prev屬性是head, 說(shuō)明當(dāng)前雙向鏈表只有兩個(gè)節(jié)點(diǎn), head和tail, 其中head下一個(gè)節(jié)點(diǎn)指向tail, tail的上一個(gè)節(jié)點(diǎn)指向head, 如圖所示:

以上就是pipeline的初始化過(guò)程,更多關(guān)于Netty分布式pipeline管道創(chuàng)建的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot?模板模式實(shí)現(xiàn)優(yōu)惠券邏輯的示例代碼
這篇文章主要介紹了SpringBoot?模板模式實(shí)現(xiàn)優(yōu)惠券邏輯,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
JavaWeb實(shí)現(xiàn)注冊(cè)用戶名檢測(cè)
這篇文章主要為大家詳細(xì)介紹了JavaWeb實(shí)現(xiàn)注冊(cè)用戶名檢測(cè),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08
Java 最優(yōu)二叉樹的哈夫曼算法的簡(jiǎn)單實(shí)現(xiàn)
這篇文章主要介紹了Java 最優(yōu)二叉樹的哈夫曼算法的簡(jiǎn)單實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10
SpringBoot整合Redis使用注解進(jìn)行緩存方式
文章介紹了使用Redis進(jìn)行數(shù)據(jù)緩存的幾種方式,包括手動(dòng)配置RedisTemplate、使用Spring的Caching模塊以及配置自定義的RedisCacheManager2025-03-03
Spring Boot Maven Plugin打包異常解決方案
這篇文章主要介紹了Spring Boot Maven Plugin打包異常解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
java項(xiàng)目jar包與jdk的版本不兼容的問(wèn)題解決
這篇文章主要介紹了java項(xiàng)目jar包與jdk的版本不兼容的問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10
Java網(wǎng)絡(luò)編程之TCP程序設(shè)計(jì)
這篇文章主要為大家詳細(xì)介紹了Java網(wǎng)絡(luò)編程之TCP程序設(shè)計(jì),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-08-08

