netty服務(wù)端輔助類ServerBootstrap創(chuàng)建邏輯分析
ServerBootstrap創(chuàng)建
ServerBootstrap 為 netty 建立服務(wù)端的輔助類, 以 NIO為例,創(chuàng)建代碼如下:
public static void main(String[] args) throws Exception {
ServerBootstrap bs = new ServerBootstrap();
bs.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65535))
.addLast(new Controller());
}
}).bind(8080).sync().channel().closeFuture().sync();
}核心參數(shù)
//配置屬性,如 SO_KEEPALIVE 等private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
//acceot 的 子channel所綁定的 事件循環(huán)組"
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;初始化流程
主要為 綁定本地端口 -> 注冊(cè)自身到 EventLoop , 并注冊(cè) accept 和 read 事件 -> EventLoop的主循環(huán)中會(huì)不斷的select注冊(cè)的channel的事件,并處理。
首先執(zhí)行綁定
核心邏輯位于
io.netty.bootstrap.AbstractBootstrap.doBind(SocketAddress) 和 io.netty.bootstrap.AbstractBootstrap.initAndRegister()中
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
..........if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
//綁定邏輯
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}注冊(cè)自身到 EventLoop
先來看 initAndRegister , 核心邏輯就是利用channelFactory初始化一個(gè)NioServerSocketChannel實(shí)例,并為其設(shè)置上config中的參數(shù),然后將其注冊(cè)到EventLoop中,實(shí)際上是委托的channel的Unsafe來實(shí)現(xiàn)注冊(cè)的,核心邏輯位于 AbstractUnsafe.register0 中 完成注冊(cè)
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//本例子中實(shí)際調(diào)用的是 NioServerSocketChannel的構(gòu)造參數(shù), 并為其設(shè)置感興趣的事件類型為 OP_ACCEPT
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
} void init(Channel channel) throws Exception {
//設(shè)置屬性
..........
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//為NioServerSocketChannel 設(shè)置一個(gè) 默認(rèn)的 channelhandler : ServerBootstrapAcceptor , 當(dāng)發(fā)生 accept事件時(shí),將 accept的channel注冊(cè)到 childEventLoop中
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//執(zhí)行channel到 eventloop的 selector
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);//觸發(fā) InboundChannelHnader.channelRegistered 事件
pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) {
//觸發(fā)channelActive事件,并會(huì)為 channel 綁定上 read 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}綁定端口邏輯
initAndRegister注冊(cè)成功后,開始執(zhí)行真正的綁定端口邏輯,核心邏輯位于 NioSocketChannel.doBind0(SocketAddress) 中
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
}至此 綁定個(gè)成功, 當(dāng)觸發(fā) ACCEPT 事件時(shí), 會(huì)觸發(fā) NioServerSocketChannel.doReadMessages -> ServerBootstrapAcceptor.channelRead , 并將 子channel 注冊(cè)到 childEventLoop中
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//注冊(cè)channel
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}以上就是netty服務(wù)端輔助類ServerBootstrap創(chuàng)建邏輯分析的詳細(xì)內(nèi)容,更多關(guān)于netty輔助類ServerBootstrap創(chuàng)建邏輯的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于SpringBoot多線程@Async的使用體驗(yàn)
這篇文章主要介紹了SpringBoot多線程@Async的使用體驗(yàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
spring事務(wù)之事務(wù)掛起和事務(wù)恢復(fù)源碼解讀
這篇文章主要介紹了spring事務(wù)之事務(wù)掛起和事務(wù)恢復(fù)源碼解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-11-11
Java之MyBatis的Dao方式以及Dao動(dòng)態(tài)代理詳解
這篇文章主要介紹了Java之MyBatis的Dao方式以及Dao動(dòng)態(tài)代理詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12
使用Java自帶的mail?API實(shí)現(xiàn)郵件發(fā)送功能全過程
電子郵件的應(yīng)用非常廣泛,例如在某網(wǎng)站注冊(cè)了一個(gè)賬戶,自動(dòng)發(fā)送一封歡迎郵件,通過郵件找回密碼,自動(dòng)批量發(fā)送活動(dòng)信息等,下面這篇文章主要給大家介紹了關(guān)于如何使用Java自帶的mail?API實(shí)現(xiàn)郵件發(fā)送功能的相關(guān)資料,需要的朋友可以參考下2023-04-04
淺析Bean?Searcher?與?MyBatis?Plus?區(qū)別介紹
Bean?Searcher號(hào)稱任何復(fù)雜的查詢都可以一行代碼搞定,但?Mybatis?Plus?似乎也有類似的動(dòng)態(tài)查詢功能,最近火起的?Bean?Searcher?與?MyBatis?Plus?倒底有啥區(qū)別?帶著這個(gè)問題一起通過本文學(xué)習(xí)下吧2022-05-05
MyBatis-Plus實(shí)現(xiàn)多表聯(lián)查的方法實(shí)戰(zhàn)
這篇文章主要給大家介紹了關(guān)于MyBatis-Plus實(shí)現(xiàn)多表聯(lián)查的方法,MyBatis Plus是一款針對(duì)MyBatis框架的增強(qiáng)工具,它提供了很多方便的方法來實(shí)現(xiàn)多表聯(lián)查,需要的朋友可以參考下2023-07-07

