Netty源码-2-NioEventLoop

长文预警!!!

一、NioEventLoop概要

NioEventLoop就是上文中NioEventLoopGroup的children,可以先通过名字分析一下这个类的作用,这个类名有三个单词:Nio、Event、Loop。

通过Loop可以猜测,这个类里面有一个死循环,死循环的逻辑一般都会扔给一个单独的线程,所以可以猜测NioEventLoop中会管理一个线程或者线程池,这个线程中用来执行死循环逻辑,既然涉及到线程或者线程池,那么还要有管理线程生命周期的方法。如果是线程池,那就会有线程池相关的一些逻辑,比如线程池的拒绝策略等等。

通过Event可以猜测到,死循环执行的逻辑是在监听某一个或一类事件,监听过程一定是阻塞的。

通过Nio可以知道,上面提到的Event是Nio中的事件,而且是SocketNio的事件,SocketNio的事件定义在SelectionKey中,有四种事件,读事件,写事件,客户端的连接事件,服务端的接收事件。

1
2
3
4
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

通过上面的猜测总结一下NioEventLoop的功能:开启一个线程循环监听Socket Nio中的事件。

接下来看下源码:

接着上文,创建NioEventLoop的入口在NioEventLoopGroup中,上文讲了NioEventLoopGroup构造时,会根据nThread参数创建相应大小的children数组,children数组实际上就是NioEventLoop数组,MultithreadEventExecutorGroup中的newChild方法是一个模板方法,实现在NioEventLoopGroup中:

1
2
3
4
5
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可以看到就是new了一个NioEventLoop,这里注意一下传入的参数:

  • executor
  • SelectorProvider
  • SelectStrategyFactory (先不管)
  • RejectedExecutionHandler

这里印证了一些上文的猜测,我们挨个看一下这几个参数(其实这里面的参数都在上文中提到过):

1. executor

上文中讲过了executor的创建,这里复习一下,executor是一个netty中自己定义的线程池:ThreadPerTaskExecutor,从名字就能看出,这个线程池为每一个新的任务都创建一个新的线程去执行,这个线程池最主要的作用是,从这个线程池中创建出来的每一个线程都是Netty自定义的线程:FastThreadLocalThread,这个线程池中使用的是FastThreadLocal而不是jdk原生的ThreadLocal,FastThreadLocal比ThreadLocal更快,因为FastThreadLocal没有算hash,并解决了伪共享的问题。

这里创建FastThreadLocal并不是ThreadPerTaskExecutor本身的功能,而是因为MultithreadEventExecutorGroup创建ThreadPerTaskExecutor时传入的是DefaultThreadFactory,真正负责创建FastThreadLocal的类实际是DefaultThreadFactory。

2. SelectorProvider

这是Java中的原生类,用来获取一个Selector。

3. RejectedExecutionHandler

对应的executor的拒绝策略。

二、构造NioEventLoop

看了构造函数的参数之后,再看一下构造的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

首先调用了父类的构造函数,看一下SingleThreadEventExecutor这个父类的构造函数:

1
2
3
4
5
6
7
8
9
10
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

上文中提到的几个参数,在这里都有体现了,父类构造也比较简单,只是进行了简单的赋值。

继续看NioEventLoop的构造函数,构造函数中,除了一些赋值之外,执行了一个重要的方法:openSelector(),这个方法用来获取Selector:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});

if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);

if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}

Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}

selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});

if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

这个方法有看起来非常复杂,但其实有一个开关:DISABLE_KEYSET_OPTIMIZATION:

1
2
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

默认是false。

先看如果是true怎么办,如果是true,那么直接返回了SelectorProvider的openSelector方法返回的Selector,这里简单包装了一下,SelectorTuple,但是其实什么也没干,然后这个方法就直接返回了。

这里有两个问题:SelectorProvider的openSelector获得的是什么Selector?如果DISABLE_KEYSET_OPTIMIZATION是false怎么办?咱们一个一个看:

1. SelectorProvider的openSelector方法

这里的SelectorProvider是使用SelectorProvider.provider()方法获取的SelectorProvider是sun.nio.ch.DefaultSelectorProvider.create()返回的SelectorProvider:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

而sun.nio.ch.DefaultSelectorProvider.create()返回的是KQueueSelectorProvider。所以SelectorProvider的openSelector方法实际是调用的KQueueSelectorProvider的openSelector方法:

1
2
3
4
5
6
7
8
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}

public static SelectorProvider create() {
return new KQueueSelectorProvider();
}
}

这里可以体现出JDK的跨平台特性了,我使用的是iOS系统,所以返回的是KQueue,可以猜想,如果使用的是Linux系统,那这里返回的应该是Epoll,因为iOS和Linux的epoll系统调用不一样。

那第一个问题解决了,最终创建的Selector是KQueueSelectorImpl,使用了k-queue系统调用。

2. DISABLE_KEYSET_OPTIMIZATION为false

传统,先看名字:DISABLE_KEYSET_OPTIMIZATION,禁用KeySet优化,默认是false,即默认是启用KeySet优化。

当DISABLE_KEYSET_OPTIMIZATION == true的时候,直接返回了KQueueSelectorImpl。

如果是false的时候,逻辑就稍微比较复杂一些,首先通过反射找到了sun.nio.ch.SelectorImpl这个类,这个类是KQueueSelectorImpl的父类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;

protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}

}
// ... 略
}

继续看下面的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

// ... 省略
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});

可以看到,通过反射拿到了SelectorImpl的selectedKeys和publicSelectedKeys这两个Field,然后调用了set方法,替换掉了这两个Field的值。

用什么替换掉的呢?SelectedSelectionKeySet,为什么要用这个替换呢,替换掉的是什么呢?

看看SelectorImpl的构造函数,构造selectedKeys和publicSelectedKeys使用的是HashSet,所以实际上使用SelectedSelectionKeySet替换掉了HashSet。

看看SelectedSelectionKeySet:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

SelectionKey[] keys;
int size;

SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}

@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}

keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}

return true;
}

@Override
public boolean remove(Object o) {
return false;
}

@Override
public boolean contains(Object o) {
return false;
}

@Override
public int size() {
return size;
}

@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;

@Override
public boolean hasNext() {
return idx < size;
}

@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}

void reset() {
reset(0);
}

void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}

private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}

HashSet本身是基于HashMap实现的,而HashMap的插入会有很复杂的操作,链表、扩容、rehash、链表转红黑树等等操作,而这里这里并不需要这么复杂的操作,所以Netty直接用了一个数组代替,数组的顺序插入比HashMap的插入要高效得多。

所以简而言之,这里不需要使用HashSet,Netty认为SelectorImpl的实现不够高效,所以使用反射,将SelectorImpl中的selectedKeys和publicSelectedKeys替换为自己用数组实现的性能更好的SelectedSelectionKeySet。

到这里就能完全看出DISABLE_KEYSET_OPTIMIZATION的意思了,是否开启KeySet优化。

【BTW】,这种使用数组替换hash的方法在Netty中其实并不陌生,FastThreadLocal就是用数组的顺序添加替代JDK原生ThreadLocal中的hash。

总结:这里没有Thread的启动逻辑诶,不是说NioEventLoop会管理自己的Thread,为什么没有在构造函数里面体现呢?这里可以这么认为,NioEventLoop的构造函数属于NioEventLoop的生命周期,而内部Thread的生命周期需要单独管理。换个角度思考,这里也可以说这是一种懒加载,只有在真正需要启动Thread的时候,再去启动,毕竟Thread是比较昂贵的系统资源。

三、内部Thread的启动

那内部的Thread是怎么启动的呢?

在构造函数中,我们看到executor被SingleThreadEventExecutor持有,我很好奇这个executor会怎么用呢?通过查找executor,找到了一个doStartThread方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ... 省略 更新状态位等收尾工作。
}
}
});
}

thread是SingleThreadEventExecutor持有的一个Thread引用。

这里调用了executor.execute方法,这里面的方法贼好玩,我们知道这个executor是ThreadPerTaskExecutor,看一下ThreadPerTaskExecutor的execute方法:

1
2
3
4
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}

这里从threadFactory创建了一个新的线程(threadFactory是DefaultThreadFactory,创建的是FastThreadLocalThread,后面不再赘述了,已经讲了很多遍了),然后doStartThread调用executor.execute方法中,进行了一步赋值:thread = Thread.currentThread()。

这个就很有味道了,首先Thread.currentThread()是什么,就是ThreadPerTaskExecutor刚刚创建出来的新线程,然后把这个新线程给了SingleThreadEventExecutor持有的Thread引用。

这样,SingleThreadEventExecutor持有的Thread引用其实是由DefaultThreadFactory创建出来的。

在给thread赋值之后,后面接着调用了SingleThreadEventExecutor.this.run(),即自己的run方法,待会着重看一下run方法。

最后做了一些收尾工作。所以我们能看到,前面说了这么久的内部线程,最后是在这里创建并启动的。

那么,doStartThread是什么时候调用的呢?直接查找可以看到startThread方法调用了doStartThread,而SingleThreadEventExecutor的execute方法调用了startThread方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

inEventLoop判断当前的线程是否是自己持有的thread,判断的方法是”==”,这样如果thread是null的话,就调用startThread创建线程。

到这里,我们就知道了NioEventLoop是如何创建并启动持有的thread了。

有朋友可能要问,哎,这个thread没有start啊,其实我们回头看ThreadPerTaskExecutor就知道了:

1
2
3
4
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}

这里调用了start,所以在Runnable内部通过Thread.currentThread()拿到的thread其实就已经启动了。这段代码真的玩的很花。

四、NioEventLoop中的run方法

通过SingleThreadEventExecutor.this.run();调用的实际上是NioEventLoop中的run方法,这个方法是NioEventLoop的核心方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));

// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).

if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

先要介绍一下:ioRatio,NioEventLoop并不是所有的时间都在做nio相关的事情,还有提交到NioEventLoop中执行的普通任务,提交的入口就在SingleThreadEventExecutor#execute方法中,当调用这个方法之后,会将task加入到内部的一个队列中,这部分逻辑和JDK中的线程池很像,不再赘述了。

看一下addTask:

1
2
3
4
5
6
7
8
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}

细看一下究竟,首先offerTask:

1
2
3
4
5
6
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}

taskQueue是什么呢?是SingleThreadEventExecutor构造的时候创建的,回头看一下这部分代码:

1
2
3
4
5
6
7
8
9
10
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

这里调用了newTaskQueue方法:

1
2
3
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}

这里创建了一个LinkedBlockingQueue,到这里taskQueue是什么算是明白了,再看看reject方法:

有两个reject方法:

  • reject()
  • reject(Runnable task)
1
2
3
4
5
6
7
protected static void reject() {
throw new RejectedExecutionException("event executor terminated");
}

protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}

这里调用了rejectedExecutionHandler,这个rejectedExecutionHandler不知道还记不记得,在创建NioEventLoopGroup的时候就传入了这个参数:

1
2
3
4
5
6
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};

这里面不管是哪个reject方法,最终都是抛出了一个RejectedExecutionException异常。

到这里,addTask方法内的逻辑就看清楚了。

知道这这些,再看一下ioRatio这个参数,这个参数代表单位时间内处理nio和普通task的时间分配比率,默认是50,即用在两边的时间一样。

再仔细看看run方法内的逻辑:

1
2
3
4
5
6
7
8
9
10
11
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}

calculateStrategy方法是什么呢:

1
2
3
4
5
6
7
8
9
10
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

private DefaultSelectStrategy() { }

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}

hasTasks()方法返回taskQueue中是否有任务,如果没有任务的话,返回SelectStrategy.SELECT,那样就就会调用select(wakenUp.getAndSet(false))方法,如果任务不为空,则调用selectSupplier.get(),selectSupplier在NioEventLoop中定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};

int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}

get方法调用了selectNow()方法,这个方法也很简单,调用了jdk selector的selectNow方法,这个selector是在NioEventLoop构造的时候创建的。selectNow返回的值是大于等于0的,所以会跳过switch中的所有逻辑。

switch内的逻辑总结一下,如果有task,则调用selectNow并跳出switch,如果没有task,则调用select(wakenUp.getAndSet(false))。

这两条路都看一下。

1. select(wakenUp.getAndSet(false))

先说一下这个方法干了啥,这个方法其实就是NioEventLoop没事情做了,现在只能跑过来等Nio事件,并且一边等Nio事件一边还要检查taskQueue中有没有任务到来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}

int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);

rebuildSelector();
selector = this.selector;

selector.selectNow();
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}

首先算出一个selectDeadLineNanos,是由下一个定时任务的开始时间算出来的。看一下循环方法内部:

首先看一下下一个定时任务是否会在500微秒内开始,如果在接下来500微秒开始则退出循环。如果hasTask,则退出循环。然后调用selector.select(timeoutMillis);,这里会阻塞线程,除非有一个以上的Channel被唤醒或者时间到了,才会唤醒线程,唤醒之后又会检查hasTask和是否有Channel被唤醒(selectedKeys是否>0)等检查条件。

在下面这里解决了一个bug,那就是JDK本身select被无限唤醒导致CPU空转的bug,原理是,一旦bug触发,由于selector.select这步会被一直唤醒而不会阻塞,所以selectCnt会增加的很快,并且消耗的时间将会非常的短,当在非常短的时间内,selectCnt增加到512之后,就会任务bug被触发了,因为selector.select的阻塞失效了。这时候会调用rebuildSelector()。

rebuildSelector比较简单,就是重新创建一个Selector,然后把旧的Selector中注册过的Channel重新注册到新的Selector中,然后替换掉旧的Selector就可以了,最后调用close关闭掉旧的Selector。

2. 当switch跳出后,就会根据ioRatio将时间分比率分配到Nio事件和Task上

跳出上面的select循环后,有两个重要的函数:processSelectedKeys(),runAllTasks()。分别对应nio事件处理和task任务。

processSelectedKeys:

首先在processSelectedKeysPlain中循环遍历了所有的SelectionKey:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}

Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}

if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();

// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}

然后对于每个SelectionKey调用processSelectedKey去处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

这里,就是处理NioEvent的核心内容了,对于每个SelectionKey,都看一下有没有就绪的事件。

对于就绪的事件,会调用unsafe相应的方法,这里面会触发Netty中Pipeline中的事件链,去调用各个Handler中的处理逻辑。这部分之后会讲。

到这里就知道了,Netty的事件入口了,可以说,这里是Netty的引擎,是驱动Netty工作的核心逻辑!

runAllTasks

这个没什么好说的了,挨个执行taskQueue中的任务,留意一下分配的处理时间就完事了。

五、总结

NioEventLoop可以说是Netty的心脏,他驱动了各种Nio事件,并处理各种提交过来的任务。

我们一段一段的分析了NioEventLoop中的核心方法,看看它到底是如何运作的,是如何创建自己以及如何创建内部的Thread的,也看到了NioEventLoop是如何消除JDK中的bug的。

NioEventLoop的触发方式还是水平触发,和JDK是一样的。

还有一个比较重要的方法没有分析,那就是shutdownGracefully方法,里面封装了Netty著名的优雅关机逻辑,回头我们看一下这个方法。

这里只讲了事件是如何触发的,之后我们会看到事件是如何在Pipeline中的各个Handler中流动的。

希望你喜欢这篇文章~