新聞中心
本系列Netty源碼解析文章基于 4.1.56.Final版本。

大家第一眼看到這幅流程圖,是不是腦瓜子嗡嗡的呢?
大家先不要驚慌,問題不大,本文筆者的目的就是要讓大家清晰的理解這幅流程圖,從而深刻的理解Netty Reactor的啟動全流程,包括其中涉及到的各種代碼設計實現(xiàn)細節(jié)。
在上篇文章??《聊聊Netty那些事兒之Reactor在Netty中的實現(xiàn)(創(chuàng)建篇)》??中我們詳細介紹了Netty服務端核心引擎組件主從Reactor組模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的創(chuàng)建過程。最終我們得到了netty Reactor模型的運行骨架如下:
現(xiàn)在Netty服務端程序的骨架是搭建好了,本文我們就基于這個骨架來深入剖析下Netty服務端的啟動過程。
我們繼續(xù)回到上篇文章提到的Netty服務端代碼模板中,在創(chuàng)建完主從Reactor線程組:bossGroup,workerGroup后,接下來就開始配置Netty服務端的啟動輔助類ServerBootstrap了。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
.option(ChannelOption.SO_BACKLOG, 100)//設置主Reactor中channel的option選項
.handler(new LoggingHandler(LogLevel.INFO))//設置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer() {//設置從Reactor中注冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 綁定端口啟動服務,開始監(jiān)聽accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
} 在上篇文章中我們對代碼模板中涉及到ServerBootstrap的一些配置方法做了簡單的介紹,大家如果忘記的話,可以在返回去回顧一下。
ServerBootstrap類其實沒有什么特別的邏輯,主要是對Netty啟動過程中需要用到的一些核心信息進行配置管理,比如:
- Netty的核心引擎組件主從Reactor線程組:bossGroup,workerGroup。通過ServerBootstrap#group方法配置。
- Netty服務端使用到的Channel類型:NioServerSocketChannel ,通過ServerBootstrap#channel方法配置。以及配置NioServerSocketChannel時用到的SocketOption。SocketOption用于設置底層JDK NIO Socket的一些選項。通過ServerBootstrap#option方法進行配置。
主ReactorGroup中的MainReactor管理的Channel類型為NioServerSocketChannel,如圖所示主要用來監(jiān)聽端口,接收客戶端連接,為客戶端創(chuàng)建初始化NioSocketChannel,然后采用round-robin輪詢的方式從圖中從ReactorGroup中選擇一個SubReactor與該客戶端NioSocketChannel進行綁定。
從ReactorGroup中的SubReactor管理的Channel類型為NioSocketChannel,它是netty中定義客戶端連接的一個模型,每個連接對應一個。如圖所示SubReactor負責監(jiān)聽處理綁定在其上的所有NioSocketChannel上的IO事件。
- 保存服務端NioServerSocketChannel和客戶端NioSocketChannel對應pipeline中指定的ChannelHandler。用于后續(xù)Channel向Reactor注冊成功之后,初始化Channel里的pipeline。
不管是服務端用到的NioServerSocketChannel還是客戶端用到的NioSocketChannel,每個Channel實例都會有一個Pipeline,Pipeline中有多個ChannelHandler用于編排處理對應Channel上感興趣的IO事件。
ServerBootstrap結(jié)構中包含了netty服務端程序啟動的所有配置信息,在我們介紹啟動流程之前,先來看下ServerBootstrap的源碼結(jié)構:
ServerBootstrap
ServerBootstrap的繼承結(jié)構比較簡單,繼承層次的職責分工也比較明確。
ServerBootstrap主要負責對主從Reactor線程組相關的配置進行管理,其中帶child前綴的配置方法是對從Reactor線程組的相關配置管理。從Reactor線程組中的Sub Reactor負責管理的客戶端NioSocketChannel相關配置存儲在ServerBootstrap結(jié)構中。
父類AbstractBootstrap則是主要負責對主Reactor線程組相關的配置進行管理,以及主Reactor線程組中的Main Reactor負責處理的服務端ServerSocketChannel相關的配置管理。
一、 配置主從Reactor線程組
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
public class ServerBootstrap extends AbstractBootstrap{
//Main Reactor線程組
volatile EventLoopGroup group;
//Sub Reactor線程組
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//父類管理主Reactor線程組
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
}
二、 配置服務端ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap{
//用于創(chuàng)建ServerSocketChannel ReflectiveChannelFactory
private volatile ChannelFactory extends C> channelFactory;
public B channel(Class extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
@Deprecated
public B channelFactory(ChannelFactory extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
}
在向ServerBootstrap配置服務端ServerSocketChannel的channel方法中,其實是創(chuàng)建了一個ChannelFactory工廠實例ReflectiveChannelFactory,在Netty服務端啟動的過程中,會通過這個ChannelFactory去創(chuàng)建相應的Channel實例。
我們可以通過這個方法來配置netty的IO模型,下面為ServerSocketChannel在不同IO模型下的實現(xiàn):
EventLoopGroupReactor線程組在不同IO模型下的實現(xiàn):
我們只需要將IO模型的這些核心接口對應的實現(xiàn)類前綴改為對應IO模型的前綴,就可以輕松在Netty中完成對IO模型的切換。
1、 ReflectiveChannelFactory
public class ReflectiveChannelFactoryimplements ChannelFactory {
//NioServerSocketChannelde 構造器
private final Constructor extends T> constructor;
public ReflectiveChannelFactory(Class extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//反射獲取NioServerSocketChannel的構造器
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
//創(chuàng)建NioServerSocketChannel實例
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
從類的簽名我們可以看出,這個工廠類是通過泛型加反射的方式來創(chuàng)建對應的Channel實例。
- 泛型參數(shù)T extends Channel表示的是要通過工廠類創(chuàng)建的Channel類型,這里我們初始化的是NioServerSocketChannel。
- 在ReflectiveChannelFactory的構造器中通過反射的方式獲取NioServerSocketChannel的構造器。
- 在newChannel方法中通過構造器反射創(chuàng)建NioServerSocketChannel實例。
注意這時只是配置階段,NioServerSocketChannel此時并未被創(chuàng)建。它是在啟動的時候才會被創(chuàng)建出來。
三、為NioServerSocketChannel配置ChannelOption
ServerBootstrap b = new ServerBootstrap();
//設置被MainReactor管理的NioServerSocketChannel的Socket選項
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {
//serverSocketChannel中的ChannelOption配置
private final Map, Object> options = new LinkedHashMap, Object>();
public B option(ChannelOption option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
} 無論是服務端的NioServerSocketChannel還是客戶端的NioSocketChannel它們的相關底層Socket選項ChannelOption配置全部存放于一個Map類型的數(shù)據(jù)結(jié)構中。
由于客戶端NioSocketChannel是由從Reactor線程組中的Sub Reactor來負責處理,所以涉及到客戶端NioSocketChannel所有的方法和配置全部是以child前綴開頭。
ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap{
//客戶端SocketChannel對應的ChannelOption配置
private final Map, Object> childOptions = new LinkedHashMap , Object>();
publicServerBootstrap childOption(ChannelOption childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
}
相關的底層Socket選項,netty全部枚舉在ChannelOption類中,筆者這里就不一一列舉了,在本系列后續(xù)相關的文章中,筆者還會為大家詳細的介紹這些參數(shù)的作用。
public class ChannelOptionextends AbstractConstant > {
..................省略..............
public static final ChannelOptionSO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOptionSO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOptionSO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOptionSO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOptionSO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOptionSO_LINGER = valueOf("SO_LINGER");
public static final ChannelOptionSO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOptionSO_TIMEOUT = valueOf("SO_TIMEOUT");
..................省略..............
}
四、為服務端NioServerSocketChannel中的Pipeline配置ChannelHandler
//serverSocketChannel中pipeline里的handler(主要是acceptor)
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
向NioServerSocketChannel中的Pipeline添加ChannelHandler分為兩種方式:
- 顯式添加:顯式添加的方式是由用戶在main線程中通過ServerBootstrap#handler的方式添加。如果需要添加多個ChannelHandler,則可以通過ChannelInitializer向pipeline中進行添加。
關于ChannelInitializer后面筆者會有詳細介紹,這里大家只需要知道ChannelInitializer是一種特殊的ChannelHandler,用于初始化pipeline。適用于向pipeline中添加多個ChannelHandler的場景。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(channelhandler1)
.addLast(channelHandler2)
......
.addLast(channelHandler3);
}
})
- 隱式添加:隱式添加主要添加的就是主ReactorGroup的核心組件也就是下圖中的acceptor,Netty中的實現(xiàn)為ServerBootstrapAcceptor,本質(zhì)上也是一種ChannelHandler,主要負責在客戶端連接建立好后,初始化客戶端NioSocketChannel,在從Reactor線程組中選取一個Sub Reactor,將客戶端NioSocketChannel 注冊到Sub Reactor中的selector上。
隱式添加ServerBootstrapAcceptor是由Netty框架在啟動的時候負責添加,用戶無需關心。
在本例中,NioServerSocketChannel的PipeLine中只有兩個ChannelHandler,一個由用戶在外部顯式添加的LoggingHandler,另一個是由Netty框架隱式添加的ServerBootstrapAcceptor。
其實我們在實際項目使用的過程中,不會向netty服務端NioServerSocketChannel添加額外的ChannelHandler,NioServerSocketChannel只需要專心做好自己最重要的本職工作接收客戶端連接就好了。這里額外添加一個LoggingHandler只是為了向大家展示ServerBootstrap的配置方法。
五、 為客戶端NioSocketChannel中的Pipeline配置ChannelHandler
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer() {//設置從Reactor中注冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
//socketChannel中pipeline中的處理handler
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
向客戶端NioSocketChannel中的Pipeline里添加ChannelHandler完全是由用戶自己控制顯式添加,添加的數(shù)量不受限制。
由于在Netty的IO線程模型中,是由單個Sub Reactor線程負責執(zhí)行客戶端NioSocketChannel中的Pipeline,一個Sub Reactor線程負責處理多個NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler添加的太多,就會影響Sub Reactor線程執(zhí)行其他NioSocketChannel上的Pipeline,從而降低IO處理效率,降低吞吐量。
所以Pipeline中的ChannelHandler不易添加過多,并且不能再ChannelHandler中執(zhí)行耗時的業(yè)務處理任務。
在我們通過ServerBootstrap配置netty服務端啟動信息的時候,無論是向服務端NioServerSocketChannel的pipeline中添加ChannelHandler,還是向客戶端NioSocketChannel的pipeline中添加ChannelHandler,當涉及到多個ChannelHandler添加的時候,我們都會用到ChannelInitializer,那么這個ChannelInitializer究竟是何方圣神,為什么要這樣做呢?我們接著往下看。
ChannelInitializer
首先ChannelInitializer它繼承于ChannelHandler,它自己本身就是一個ChannelHandler,所以它可以添加到childHandler中。
其他的父類大家這里可以不用管,后面文章中筆者會一一為大家詳細介紹。
那為什么不直接添加ChannelHandler而是選擇用ChannelInitializer呢?
這里主要有兩點原因:
- 前邊我們提到,客戶端NioSocketChannel是在服務端accept連接后,在服務端NioServerSocketChannel中被創(chuàng)建出來的。但是此時我們正處于配置ServerBootStrap階段,服務端還沒有啟動,更沒有客戶端連接上來,此時客戶端NioSocketChannel還沒有被創(chuàng)建出來,所以也就沒辦法向客戶端NioSocketChannel的pipeline中添加ChannelHandler。
- 客戶端NioSocketChannel中Pipeline里可以添加任意多個ChannelHandler,但是Netty框架無法預知用戶到底需要添加多少個ChannelHandler,所以Netty框架提供了回調(diào)函數(shù)ChannelInitializer#initChannel,使用戶可以自定義ChannelHandler的添加行為。
當客戶端NioSocketChannel注冊到對應的Sub Reactor上后,緊接著就會初始化NioSocketChannel中的Pipeline,此時Netty框架會回調(diào)ChannelInitializer#initChannel執(zhí)行用戶自定義的添加邏輯。
public abstract class ChannelInitializerextends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//當channelRegister事件發(fā)生時,調(diào)用initChannel初始化pipeline
if (initChannel(ctx)) {
.................省略...............
} else {
.................省略...............
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//此時客戶單NioSocketChannel已經(jīng)創(chuàng)建并初始化好了
initChannel((C) ctx.channel());
} catch (Throwable cause) {
.................省略...............
} finally {
.................省略...............
}
return true;
}
return false;
}
protected abstract void initChannel(C ch) throws Exception;
.................省略...............
}
這里由netty框架回調(diào)的ChannelInitializer#initChannel方法正是我們自定義的添加邏輯。
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer() {//設置從Reactor中注冊channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
到此為止,Netty服務端啟動所需要的必要配置信息,已經(jīng)全部存入ServerBootStrap啟動輔助類中。
接下來要做的事情就是服務端的啟動了。
// Start the server. 綁定端口啟動服務,開始監(jiān)聽accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();
Netty服務端的啟動
經(jīng)過前面的鋪墊終于來到了本文的核心內(nèi)容----Netty服務端的啟動過程。
如代碼模板中的示例所示,Netty服務端的啟動過程封裝在io.netty.bootstrap.AbstractBootstrap#bind(int)函數(shù)中。
接下來我們看一下Netty服務端在啟動過程中究竟干了哪些事情?
大家看到這副啟動流程圖先不要慌,接下來的內(nèi)容筆者會帶大家各個擊破它,在文章的最后保證讓大家看懂這副流程圖。
我們先來從netty服務端啟動的入口函數(shù)開始我們今天的源碼解析旅程:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
//校驗Netty核心組件是否配置齊全
validate();
//服務端開始啟動,綁定端口地址,接收客戶端連接
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//異步創(chuàng)建,初始化,注冊ServerSocketChannel到main reactor上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
........serverSocketChannel向Main Reactor注冊成功后開始綁定端口....,
} else {
//http://www.fisionsoft.com.cn/article/cdsohce.html


咨詢
建站咨詢
