Netty源码-3-ServerBootstrap

前面的两篇博客分别分析了NioEventLoopGroup和NioEventLoop这两个类的创建以及重要功能,这为这篇博客全面分析Netty的启动类ServerBootstrap奠定了基础。

先回顾一下Server的核心启动代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

可以看到创建ServerBootstrap的过程,最后调用b.bind(PORT)将服务绑定到端口上。

ServerBootstrap的构造函数什么也没有干,核心的内容都在bind方法中,所以接下来我们重点分析bind方法,看看netty究竟是怎么启动的。

bind方法

所有的重载的bind方法最终都调用了doBind方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

这里面干了两件事情:

  • 调用initAndRegister方法创建Channel并注册Nio事件
  • 调用doBind0触发pipeline中的事件链

initAndRegister方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

这个方法里面主要干了三件事情:

  • 使用channelFactory创建channel
  • 使用init方法初始化channel
  • 最后调用config().group().register(channel)将刚刚创建的channel绑定到一个EventLoop上(就是一个NioEventLoop上)

1. channelFactory创建channel

首先弄清楚channelFactory是什么,当我们调用ServerBootstrap的channel方法时:

1
2
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)

调用的实际上是父类AbstractBootstrap的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}

this.channelFactory = channelFactory;
return self();
}

调用AbstractBootstrap#channel方法时,创建了一个ReflectiveChannelFactory对象,并最终赋值给了channelFactory。

到这里我们就知道了channelFactory其实是一个ReflectiveChannelFactory实例,那继续看看ReflectiveChannelFactory是什么东西:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

private final Class<? extends T> clazz;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}

ReflectiveChannelFactory其实就是用反射去创建一个新的Channel,也就是我们传入的NioServerSocketChannel,所以看到这里就明白了,initAndRegister方法初始化和注册的是NioServerSocketChannel这个Channel。

看一下NioServerSocketChannel创建的大致过程:

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

这里调用了父类的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

这里最重要的是,将SelectionKey.OP_ACCEPT赋值给了readInterestOp属性,这个在之后向Selector中注册的时候会用到。

这里分析一下为什么只关心SelectionKey.OP_ACCEPT事件,记得ServerBootstrap传入了两个EventLoopGroup,分别命名为bossGroup和workerGroup,代码如下:

1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

bossGroup和workerGroup的职责不同,bossGroup专门负责接收客户端的链接,一旦连接建立,就会把接下来的io读写工作交给workerGroup,这也是为什么bossGroup在new的时候只需要一个线程了。workerGroup才是执行io读写工作的线程,所以命名为工作线程。NioServerSocketChannel是处理客户端连接的Channel,所以它关心的事件只有SelectionKey.OP_ACCEPT。

在AbstractNioChannel的构造函数中还调用了父类AbstractChannel的构造函数,AbstractChannel构造函数干了三件事情:

1
2
3
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();

比较重要的是创建了unsafe实例和pipeline实例,这里的unsafe不同于sun包中的unsafe,后面会仔细分析,这里只用知道这两个实例被初始化了。pipeline是Netty中另一个核心构建,如果说NioEventLoop是心脏,那pipeline就是血管了。这里创建的pipeline是DefaultChannelPipeline,记住这一点后,后面分析的pipeline的方法实现都在DefaultChannelPipeline中。这里简化了我们的分析,不用去找多个父类和多个实现才知道最终的方法。

当使用channelFactory创建完Channel之后,调用init方法去初始化这个channel。

2. init方法初始化channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 剔除掉一些无用的方法
@Override
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

剔除掉一些不管心的方法之后,这里的功能就比较清晰了,首先通过channel.pipeline()方法获取pipeline,这里的pipeline就是上文中创建的DefaultChannelPipeline。

然后pipeline中增加了一个ChannelHandle,ChannelHandle在对应事件触发的时候回调ChannelHandle里面的一些方法,所以这个类里面的方法是异步执行的。

稍微看一下这个ChannelInitializer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
PlatformDependent.newConcurrentHashMap();

protected abstract void initChannel(C ch) throws Exception;

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (initChannel(ctx)) {
ctx.pipeline().fireChannelRegistered();
} else {
ctx.fireChannelRegistered();
}
}

initChannel方法实际是在channelRegistered被回调的时候调用的,这里mark一下。

在initChannel方法中,最重要的是往自己的事件处理连中添加了一个ServerBootstrapAcceptor。

上文已经分析过了,NioServerSocketChannel的作用其实就是监听并接受客户端的连接请求,连接建立完成之后,就会扔给worker线程。这里的ServerBootstrapAcceptor就是干这个用的,看一下ServerBootstrapAcceptor的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

当NioServerSocketChannel监听到SelectionKey.OP_ACCEPT时间后,会触发pipeline中的channelRead事件链,最终会执行ServerBootstrapAcceptor中的channelRead方法。这个方法调用childGroup.register(child)将客户端和服务端建立的连接,也就是这里传进来的msg(一个Channel实例)注册进childGroup。childGroup(一个NioEventLoopGroup实例)会从自己的children(一个NioEventLoop数组)中选出一个NioEventLoop去接纳这个新的Channel。

到这里init方法就执行完了,注意这里的很多动作其实都仅仅注册了一个回调函数,还没有被真正的执行。

接下来看看config().group().register(channel)。

3. config().group().register(channel)

config方法返回的是一个ServerBootstrapConfig实例,这部分代码朋友们可以自己跟踪,比较简单。

ServerBootstrapConfig#group方法返回的其实就是ServerBootstrap中的group,这里的group是bossGroup,同样大家跟踪一下这部分代码,不再赘述了。关键我们看group.register(channel)干了什么:

首先,调用NioEventLoopGroup#register方法实际调用了父类MultithreadEventLoopGroup#register方法:

1
2
3
4
5
// MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

然后先调用了一个next方法,调用的是父类MultithreadEventExecutorGroup#next方法:

1
2
3
4
@Override
public EventExecutor next() {
return chooser.next();
}

这里调用了chooser.next(),看过上篇博客的朋友们肯定有印象,这个chooser其实就是从NioEventLoopGroup的children数组中选出一个NioEventLoop。

回到MultithreadEventLoopGroup#register方法中,这里的next().register(channel)其实调用的是NioEventLoop#register方法:

(这里比较绕,多看几遍这部分)

1
2
3
4
5
6
7
8
9
10
11
12
// SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

然后调用的是unsafe的register方法:

这里调用register方法时,将自己传进去了,也就是一个EventLoop,这步的作用其实就是将这个Channel和一个EventLoop绑定起来了,或者说将Channel注册到了EventLoop中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// AbstractChannel.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

这里面有一个AbstractChannel.this.eventLoop = eventLoop;,可以印证我们刚刚分析的,这里将传进来的EventLoop赋值到了自己的eventLoop对象上,还记的上面的init方法中,最后有一段代码:

1
2
3
4
5
6
7
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});

这里ch.eventLoop()拿到的其实就是刚刚赋值的eventLoop对象。

哎这里是不是很奇怪,ch.eventLoop()这步是在eventLoop对象被赋值之前调用的啊,这时拿到的eventLoop难道不是null嘛?这就是上面反复提到过的,这里的ch.eventLoop()其实并没有执行,只是注册了一个回调函数,当它真正被调用执行的时候,eventLoop已经被赋值了。

继续看AbstractChannel的register方法,这个方法调用了register0:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

这段代码干了两件重要的事情:

  • doRegister
  • beginRead

这两个后面分析。这里还做了一件重要的事情,调用了pipeline.fireChannelRegistered();,这里触发了pipeline中的注册事件链,注册事件链比较特殊,上文提到过,ChannelInitializer的initChannel方法实际是由channelRegistered方法触发的。

1.doRegister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}

看到了重要的一个调用:

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);,这里selectionKey是JDK的selectionKey,javaChannel返回的是JDK的Channel,然后将eventLoop中的Selector注册进了这个Channel中,到这里总算是落地到JDK的代码上了,到这里,Nio Server算是真正的启动了。

这里还没完,这里注册的interestOps是0,0不是任何一个Nio事件,所以这里其实是借助register方法初始化selectionKey,并没有开始真正的监听Nio事件。这里将自己作为attachment传进SelectionKey,之后会反过来从SelectionKey中取这个NettyChannel。

2.beginRead

刚刚说了,doRegister方法中并没有开启真正的事件监听,那唯一的可能就是在beginRead中开启监听了:

进过一顿寻找,发现最终beginRead调用了这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// AbstractNioChannel.java
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

这里将刚刚创建的selectionKey中的interestOps换成了在AbstractNioChannel构造的时候传进来的interestOps,文章开头看到了,最初传进来的interestOps是SelectionKey.OP_ACCEPT事件,所以最后selectionKey绑定的事件就是SelectionKey.OP_ACCEPT事件。

doBind方法

这个方法就很简单了,就是触发了pipeline中的bind事件链,并最终调用JDK绑定到端口,这里最后调用JDK的代码比较难找,最开始我找了好久也没有找到是在哪里调用的,仔细梳理了一遍之后发现,触发pipeline中的bind事件链中,bind事件被定义为出站事件,所以事件会从tail流到head,我们去看那一下DefaultChannelPipeline中的headContext:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

private final Unsafe unsafe;

HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}

@Override
public ChannelHandler handler() {
return this;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

// ... 省略
}

最终委托给了unsafe#bind方法,后面的代码跟踪就比较简单了。

总结

我们看到,在注册这一步的时候,绕了好大一个弯,从NioEventLoopGroup#register到NioEventLoop#register再到Unsafe#register方法。

从NioEventLoopGroup#register到NioEventLoop#register这步,其实是从EventLoopGroup选出一个EventLoop(通过调用next方法),NioEventLoop#register到Unsafe#register这步,其实是将NioEventLoop绑定到Channel,这里Unsafe是Channel的内部类,最终,Unsafe调用了JDK的Nio register方法创建了一个selectionKey。Unsafe最后还点燃了pipeline的register事件链,并最终绑定了SelectionKey.OP_ACCEPT事件。

这里分一下Unsafe这个命名,与JDK打交道的功能封装在Unsafe中,它是连接Netty Nio与JDK Nio的桥梁,那为什么要命名为Unsafe呢?这里就很有趣了:

我们熟悉Sun提供的Unsafe工具,这个工具可以与一些底层直接进行交互,比如CAS,比如堆外内存的使用,这里命名为Unsafe的意思是说,这些东西都不属于JVM管理的,请知晓,JVM是不保证这些操作的安全性的!

在Netty中就很有意思了,Netty是在说谁Unsafe呢?其实说的是JDK,Netty说JDK是Unsafe的,因为对于Netty来说,JDK中的代码是不受自己控制的,调用JDK出了问题Netty是无能为力的,Netty同样不能保证JDK的安全!所以与JDK打交道的代码被称为Unsafe。

这样的话,在以后自己写框架的过程中,如果需要对其他的依赖的框架做一些封装,那这个封装的类也可以被命名为Unsafe,告诉别人,这个类里面的功能都是别人的!我不能保证安全性!我只是提供一层封装!(虽然我自己可能不会这么去做^_^)。