Netty分布式pipeline管道傳播事件的邏輯總結(jié)分析
我們在第一章和第三章中, 遺留了很多有關(guān)事件傳輸?shù)南嚓P(guān)邏輯, 這里帶大家一一回顧
問題分析
首先看兩個問題:
1.在客戶端接入的時候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))為什么會調(diào)用到ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor中的channelRead()方法
2.客戶端handler是什么時候被添加的?
首先看第一個問題
1.在客戶端接入的時候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))為什么會調(diào)用到ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor中的channelRead()方法?
我們首先看這段代碼:
public void read() {
//必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用
assert eventLoop().inEventLoop();
//服務(wù)端channel的config
final ChannelConfig config = config();
//服務(wù)端channel的pipeline
final ChannelPipeline pipeline = pipeline();
//處理服務(wù)端接入的速率
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
//設(shè)置配置
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//創(chuàng)建jdk底層的channel
//readBuf用于臨時承載讀到鏈接
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//分配器將讀到的鏈接進(jìn)行計數(shù)
allocHandle.incMessagesRead(localRead);
//連接數(shù)是否超過最大值
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
//遍歷每一條客戶端連接
for (int i = 0; i < size; i ++) {
readPending = false;
//傳遞事件, 將創(chuàng)建NioSokectChannel進(jìn)行傳遞
//最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//代碼省略
} finally {
//代碼省略
}
}重點看pipeline.fireChannelRead(readBuf.get(i))
首先, 這里pipeline是服務(wù)端channel的pipeline, 也就是NioServerSocketChannel的pipeline
我們學(xué)習(xí)過pipeline之后, 對這種寫法并不陌生, 就是傳遞channelRead事件, 這里通過傳遞channelRead事件走到了ServerBootstrapAcceptor的channelRead()方法, 說明在這步之前, ServerBootstrapAcceptor作為一個handler添加到了服務(wù)端channel的pipeline中, 那么這個handler什么時候添加的呢?
我們回顧下第一章, 初始化NioServerSocketChannel的時候, 調(diào)用了ServerBootstrap的init方法:
void init(Channel channel) throws Exception {
//獲取用戶定義的選項(1)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
//獲取用戶定義的屬性(2)
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//獲取channel的pipline(3)
ChannelPipeline p = channel.pipeline();
//work線程組(4)
final EventLoopGroup currentChildGroup = childGroup;
//用戶設(shè)置的Handler(5)
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//選項轉(zhuǎn)化為Entry對象(6)
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
//屬性轉(zhuǎn)化為Entry對象(7)
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//添加服務(wù)端handler(8)
p.addLast(new ChannelInitializer<Channel>() {
//初始化channel
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}這個方法比較長, 我們重點關(guān)注第8步, 添加服務(wù)端channel, 這里的pipeline, 是服務(wù)服務(wù)端channel的pipeline, 也就是NioServerSocketChannel綁定的pipeline, 這里添加了一個ChannelInitializer類型的handler
我們看一下ChannelInitializer這個類的繼承關(guān)系
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
//省略類體
}我們看到其繼承了ChannelInboundHandlerAdapter, 說明是一個inbound類型的handler
這里我們可能會想到, 添加完handler會執(zhí)行handlerAdded, 然后再handlerAdded方法中做了添加ServerBootstrapAcceptor這個handler
但是, 實際上并不是這樣的, 當(dāng)程序執(zhí)行到這里, 并沒有馬上執(zhí)行handlerAdded, 我們緊跟addLast方法
最后會跟到DefualtChannelPipeline的一個addLast方法中去:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//判斷handler是否被重復(fù)添加(1)
checkMultiplicity(handler);
//創(chuàng)建一個HandlerContext并添加到列表(2)
newCtx = newContext(group, filterName(name, handler), handler);
//添加HandlerContext(3)
addLast0(newCtx);
//是否已注冊
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
//回調(diào)用戶事件
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//回調(diào)添加事件(4)
callHandlerAdded0(newCtx);
return this;
}首先完成了handler的添加, 但是并沒有馬上執(zhí)行回調(diào)
這里我們重點關(guān)注if (!registered)這個條件判斷, 其實在注冊完成, registered會變成true, 但是走到這一步的時候NioServerSockeChannel并沒有完成注冊(可以回顧第一章看注冊在哪一步), 所以會進(jìn)到if里并返回自身
我們重點關(guān)注callHandlerCallbackLater這個方法, 我們跟進(jìn)去:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
//判斷是否已添加, 未添加, 進(jìn)行添加, 已添加進(jìn)行刪除
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
//獲取第一個Callback任務(wù)
PendingHandlerCallback pending = pendingHandlerCallbackHead;
//如果第一個Callback任務(wù)為空
if (pending == null) {
//將第一個任務(wù)設(shè)置為剛創(chuàng)建的任務(wù)
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}因我們調(diào)用這個方法的時候added傳的true, 所以PendingHandlerCallback task賦值為new PendingHandlerAddedTask(ctx)
PendingHandlerAddedTask這個類, 我們從名字可以看出, 這是一個handler添加的延遲任務(wù), 用于執(zhí)行handler延遲添加的操作, 同樣也對應(yīng)一個名字為PendingHandlerRemovedTask的類, 用于執(zhí)行延遲刪除handler的操作, 這兩個類都繼承抽象類PendingHandlerCallback
我們看PendingHandlerAddedTask類構(gòu)造方法:
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}這里調(diào)用了父類的構(gòu)造方法, 再跟進(jìn)去:
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}在父類中, 保存了要添加的context, 也就是ChannelInitializer類型的包裝類
回到callHandlerCallbackLater方法中
PendingHandlerCallback pending = pendingHandlerCallbackHead;
這表示獲取第一個PendingHandlerCallback的任務(wù), 其實PendingHandlerCallback是一個單向鏈表, 自身維護(hù)一個PendingHandlerCallback類型的next, 指向下一個任務(wù), 在DefaultChannelPipeline這個類中, 定義了個PendingHandlerCallback類型的引用pendingHandlerCallbackHead, 用來指向延遲回調(diào)任務(wù)的中的第一個任務(wù)
之后判斷這個任務(wù)是為空, 如果是第一次添加handler, 那么這里就是空, 所以將第一個任務(wù)賦值為我們剛創(chuàng)建的添加任務(wù)
如果不是第一次添加handler, 則將我們新創(chuàng)建的任務(wù)添加到鏈表的尾部, 因為這里我們是第一次添加, 所以第一個回調(diào)任務(wù)就指向了我們創(chuàng)建的添加handler的任務(wù)
完成這一系列操作之后, addLast方法返歸, 此時并沒有完成添加操作
而什么時候完成添加操作的呢?
在服務(wù)端channel注冊時候的會走到AbstractChannel的register0方法:
private void register0(ChannelPromise promise) {
try {
//做實際的注冊(1)
doRegister();
neverRegistered = false;
registered = true;
//觸發(fā)事件(2)
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//觸發(fā)注冊成功事件(3)
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
//傳播active事件(4)
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
//省略代碼
}
}重點關(guān)注第二步pipeline.invokeHandlerAddedIfNeeded(), 這里已經(jīng)通過doRegister()方法完成了實際的注冊, 我們跟到該方法中:
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}這里會判斷是否第一次注冊, 這里返回true, 然后會執(zhí)行callHandlerAddedForAllHandlers()方法, 我們跟進(jìn)去:
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
//獲取task
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
//執(zhí)行添加handler方法
task.execute();
task = task.next;
}
}這里拿到第一個延遲執(zhí)行handler添加的task其實就是我們之前剖析過的, 延遲執(zhí)行handler添加的task, 就是PendingHandlerAddedTask對象
在while循環(huán)中, 通過執(zhí)行execute()方法將handler添加
我們跟到PendingHandlerAddedTask的execute()方法中:
void execute() {
//獲取當(dāng)前eventLoop線程
EventExecutor executor = ctx.executor();
//是當(dāng)前執(zhí)行的線程
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
//添加到隊列
executor.execute(this);
} catch (RejectedExecutionException e) {
//代碼省略
}
}
}終于在這里, 我們看到了執(zhí)行回調(diào)的方法
再回到init方法中:
void init(Channel channel) throws Exception {
//獲取用戶定義的選項(1)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
//獲取用戶定義的屬性(2)
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//獲取channel的pipline(3)
ChannelPipeline p = channel.pipeline();
//work線程組(4)
final EventLoopGroup currentChildGroup = childGroup;
//用戶設(shè)置的Handler(5)
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//選項轉(zhuǎn)化為Entry對象(6)
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
//屬性轉(zhuǎn)化為Entry對象(7)
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//添加服務(wù)端handler(8)
p.addLast(new ChannelInitializer<Channel>() {
//初始化channel
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}我們繼續(xù)看第8步添加服務(wù)端handler
因為這里的handler是ChannelInitializer, 所以完成添加之后會調(diào)用ChannelInitializer的handlerAdded方法
跟到handlerAdded方法:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//默認(rèn)情況下, 會返回true
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}因為執(zhí)行到這步服務(wù)端channel已經(jīng)完成注冊, 所以會執(zhí)行到initChannel方法
跟到initChannel方法:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
//這段代碼是否被執(zhí)行過
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
//調(diào)用之后會刪除當(dāng)前節(jié)點
remove(ctx);
}
return true;
}
return false;
}我們關(guān)注initChannel這個方法, 這個方法是在ChannelInitializer的匿名內(nèi)部來實現(xiàn)的, 這里我們注意, 在initChannel方法執(zhí)行完畢之后會調(diào)用remove(ctx)刪除當(dāng)前節(jié)點
我們繼續(xù)跟進(jìn)initChannel方法:
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}這里首先添加用戶自定義的handler, 這里如果用戶沒有定義, 則添加不成功, 然后, 會調(diào)用addLast將ServerBootstrapAcceptor這個handler添加了進(jìn)去, 同樣這個handler也繼承了ChannelInboundHandlerAdapter, 在這個handler中, 重寫了channelRead方法, 所以, 這就是第一個問題的答案
緊接著我們看第二個問題
2.客戶端handler是什么時候被添加的?
我們這里看ServerBootstrapAcceptor的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//添加channelHadler, 這個channelHandler, 就是用戶代碼添加的ChannelInitializer
child.pipeline().addLast(childHandler);
//代碼省略
try {
//work線程注冊channel
childGroup.register(child).addListener(new ChannelFutureListener() {
//代碼省略
});
} catch (Throwable t) {
forceClose(child, t);
}
}這里真相可以大白了, 服務(wù)端再創(chuàng)建完客戶端channel之后, 將新創(chuàng)建的NioSocketChannel作為參數(shù)觸發(fā)channelRead事件(可以回顧NioMessageUnsafe的read方法, 代碼這里就不貼了), 所以這里的參數(shù)msg就是NioSocketChannel
拿到channel時候再將客戶端的handler添加進(jìn)去, 我們回顧客戶端handler的添加過程:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});和服務(wù)端channel的邏輯一樣, 首先會添加ChannelInitializer這個handler但是沒有注冊所以沒有執(zhí)行添加handler的回調(diào), 將任務(wù)保存到一個延遲回調(diào)的task中
等客戶端channel注冊完畢, 會將執(zhí)行添加handler的回調(diào), 也就是handlerAdded方法, 在回調(diào)中執(zhí)行initChannel方法將客戶端handler添加進(jìn)去, 然后刪除ChannelInitializer這個handler
因為在服務(wù)端channel中這塊邏輯已經(jīng)進(jìn)行了詳細(xì)的剖析, 所以這邊就不在贅述, 同學(xué)們可以自己跟進(jìn)去走一遍流程
這里注意, 因為每創(chuàng)建一個NioSoeketChannel都會調(diào)用服務(wù)端ServerBootstrapAcceptor的channelRead方法, 所以這里會將每一個NioSocketChannel的handler進(jìn)行添加
章節(jié)總結(jié)
本章剖析了事件傳輸?shù)南嚓P(guān)邏輯, 包括handler的添加, 刪除, inbound和outbound以及異常事件的傳輸, 最后結(jié)合第一章和第三章, 剖析了服務(wù)端channel和客戶端channel的添加過程, 同學(xué)們可以課后跟進(jìn)源碼, 將這些功能自己再走一遍以加深印象.其他的有關(guān)事件傳輸?shù)倪壿? 可以結(jié)合這一章的知識點進(jìn)行自行剖析
更多關(guān)于Netty分布式pipeline管道傳播事件的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot關(guān)于自定義stater的yml無法提示問題解決方案
這篇文章主要介紹了Springboot關(guān)于自定義stater的yml無法提示問題及解決方案,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-06-06
Java用戶交互scanner及運算結(jié)構(gòu)代碼詳解
這篇文章主要介紹了Java用戶交互scanner及運算結(jié)構(gòu)代碼詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-12-12
MyBatis動態(tài)SQL標(biāo)簽的用法詳解
這篇文章主要介紹了MyBatis動態(tài)SQL標(biāo)簽的用法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
如何使用Jenkins編譯并打包SpringCloud微服務(wù)目錄
這篇文章主要介紹了如何使用Jenkins編譯并打包SpringCloud微服務(wù)目錄,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11

