Netty源码-5-Accept和Read事件监听过程

前面我们仔细分析过NioEventLoop的源码,以及找到了Netty事件驱动的源头代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

讲NioEventLoop的时候,后面就没有接着讲事件被触发之后的操作了,我们当时只知道了,这里是Nio事件触发的源头,是监听事件的地方。

这篇博客我们来看一看,当SelectionKey.OP_READ | SelectionKey.OP_ACCEPT这两个事件被Netty监听到之后,Netty会怎么操作。

先看看SelectionKey.OP_ACCEPT。看到这篇文章应该知道了,Server端的是有两个层次的:boss和worker,boss用来接收ACCEPT事件,worker用来持有建立的连接以及继续监听连接的读写事件。

所以这里SelectionKey.OP_ACCEPT事件触发后,最后一定会创建一个连接,并交给worker线程池。下面我们分析源码:

ServerBootstrapAcceptor

先必须要回忆一下ServerBootstrapAcceptor,这个类是一个ChannelHanlder,是boss Channel创建的时候注册到pipeline中去的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ServerBootstrap#init()方法,截取一部分
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
// ...
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

最后调用pipeline.addLast,将一个ServerBootstrapAcceptor实例注册进了pipeline。

ServerBootstrapAcceptor.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}

这个类主要实现了channelRead方法,这个方法会被pipeline#fireChannelRead方法回调。也就是说,read事件链被触发的时候,这个方法会被回调,这里记一下这个方法被触发的地方。

先看一下这个channelRead方法干了什么,首先传进来的参数msg被强制转换为Channel类型,这个应该是在调用方保证的,可以猜想到这个Channel是刚刚Client和Server建立的连接。接下来调用child.pipeline().addLast(childHandler)往子Channel的pipeline中注册一个事件处理类:childHandler,这个类是在调用ServerBootstrap#childHandler的时候设置进来的。最后将Channel注册进childGroup,这个childGroup是一个NioEventLoopGroup,也就是worker线程池。

这里验证了我们的猜想,总结一下:

初始化NioServerSocketChannel(init方法)的时候,在NioServerSocketChannel的pipeline中注册了一个ServerBootstrapAcceptor,当这个类的channelRead方法被回调时,建立的连接,也就是一个新的Channel被注册到worker线程组中。

这就是NioServerSocketChannel负责的功能啦。

好了,大致流程已经分析清楚了,接下来仔仔细细的看一下整个流程:

SelectionKey.OP_ACCEPT事件

这个事件被触发的时候,调用了unsafe.read(),这个unsafe就是NioServerSocketChannel对应的unsafe,调用的unsafe#read方法实际是调用NioServerSocketChannel的父类AbstractNioMessageChannel中的内部类NioMessageUnsafe#read方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}

这里面干了两件事:

  • doReadMessages(readBuf)
  • pipeline.fireChannelRead(readBuf.get(i))

惯例一个个看:

1. doReadMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

这个方法在子类NioServerSocketChannel中,很简单的调用了SocketUtils.accept(javaChannel()),其实就是调用JDK原生的accept方法,接纳一个新的客户端,并返回一个客户端的句柄SocketChannel,然后包装成Netty中的NioSocketChannel类,add到buf中。

当这个方法返回之后,List<Object> buf中被填充了所有刚刚Accept的Client端的连接。

2. pipeline.fireChannelRead(readBuf.get(i))

这个方法在一个for循环中,挨个触发channelRead事件链。这里的pipeline是NioServerSocketChannel这个类的pipeline,所以最终会调用ServerBootstrapAcceptor的channelRead方法,传入的参数是NioSocketChannel实例。

这里调用完后,后面的逻辑上面已经讲过了,ServerBootstrapAcceptor将这个客户端连接实例注册到了worker线程组中,开始监听并处理之后的读写事件。

总结

这里比较容易疑惑的是,SelectionKey.OP_READ | SelectionKey.OP_ACCEPT这两个事件被同时监听,并都触发的是unsafe.read()事件,但是,如果调用了NioServerSocketChannel的pipeline的channelRead事件链的话,可以保证一定是SelectionKey.OP_ACCEPT事件,因为NioServerSocketChannel监听的只有OP_ACCEPT事件,所以NioServerSocketChannel绑定的EventLoop中触发出来的事件只可能是SelectionKey.OP_ACCEPT事件被触发。

趁热打铁看看Read事件

Read事件由刚刚accept之后,new出来的NioSocketChannel来负责监听。看看这个类的创建:

NioSocketChannel.java构造函数:

1
2
3
4
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

调用了父类AbstractNioByteChannel的构造函数,注意:NioSocketChannel.java的父类是AbstractNioByteChannel,NioServerSocketChannel的父类是AbstractNioMessageChannel,这两个父类是不一样的,名字很相似

【BTW】NioServerSocketChannel和NioSocketChannel两个句柄前面是服务端的,后面是客户端的,和JDK命名规则一样。

看AbstractNioByteChannel的构造:

1
2
3
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}

好了这里得到了一个重要的信息,NioSocketChannel监听的是OP_READ事件。剩下两个参数很好理解,parent就是NioServerSocketChannel实例,ch是Accept事件被处理之后,创建的java原生的SelectableChannel。

所以当监听到OP_READ事件之后,会调用unsafe.read(),这里的unsafe是在AbstractNioByteChannel中实现的unsafe,看看它的NioByteUnsafe#read方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}

allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());

allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}

对应于NioMessageUnsafe的read方法,这里同样干了两个重要的事情:

  • doReadBytes
  • fireChannelRead

doReadBytes

这个方法是一个模板方法,在NioSocketChannel中实现:

1
2
3
4
5
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

这里其实就是将javaChannel中的数据读到了ByteBuf中,然后返回了,具体过程不再分析。

fireChannelRead

在获取到javaChannel中的读取的数据之后,就发起了channelRead事件链,这里的pipeline是NioSocketChannel的事件链(看pipeline事件触发时,要看清楚pipeline是属于哪个Channel,这将影响后面的逻辑分析)。

最终会运行childHandler的channelRead方法,也就是开发者自定义的handler,这里其实就是把事件传给了开发人员的逻辑里面了。

以Netty提供的Echo例子为例,它的childHandler是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

直接将读到的数据使用ctx.write(msg)方法写回到客户端,也就是echo服务。

这里解释一下@Sharable:

@Sharable

1
2
3
4
5
6
7
8
9
10
11
12
13

/**
* Indicates that the same instance of the annotated {@link ChannelHandler}
* can be added to one or more {@link ChannelPipeline}s multiple times
* without a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared
* state such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
*/

表示一个ChannelHandler是否是可以多个Pipeline共享的,可以和:是否是可重入的、是否是可以并发调用的、是否是线程安全的、是否是单例的这几个问题结合起来理解Sharable。

1️⃣