Netty源码-1-NioEventLoopGroup

建议大家手头都有一份netty的源码对照阅读。

从EchoServer入手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final class EchoServer {

static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}

// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

SSL的部分先不管,先看看netty的服务启动过程,主要有两个重要的点:

  • 创建NioEventLoopGroup,服务端有两个,bossGroup和workerGroup
  • ServerBootstrap绑定端口。

在创建NioEventLoopGroup的过程中,会创建NioEventLoop,这部分比较复杂需要单独写成一篇,然后创建NioEventLoopGroup的过程和创建ServerBootstrap的过程各写成一篇博客,一共用三篇博客来看netty的启动过程。

创建NioEventLoopGroup

看看创建NioEventLoopGroup的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// NioEventLoopGroup.java

public NioEventLoopGroup() {
this(0);
}

public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

可以看到如果我们没有传入线程数的话,nThreads默认为零,之后可以看到如果nThreads为零的话,会使用availableProcessors() * 2代替,即可用线程数的两倍。

三个默认参数:

在构造方法的传递过程中,传入了几个默认参数:

  • SelectorProvider.provider()
  • DefaultSelectStrategyFactory.INSTANCE
  • RejectedExecutionHandlers.reject()

稍微点进去看一下这几个类是干什么的:

SelectorProvider负责打开一个Selector,这是java nio中的原生Selector。

DefaultSelectStrategyFactory.INSTANCE是一个SelectStrategy的工厂实例,返回的是DefaultSelectStrategy.INSTANCE,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();

private DefaultSelectStrategyFactory() { }

@Override
public SelectStrategy newSelectStrategy() {
return DefaultSelectStrategy.INSTANCE;
}
}

final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

private DefaultSelectStrategy() { }

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}

public interface SelectStrategy {

/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* Indicates the IO loop to poll for new events without blocking.
*/
int BUSY_WAIT = -3;

/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

这个类具体干嘛用的之后会看到,现在不用纠结。

RejectedExecutionHandlers.reject()返回的也是一个单例的RejectedExecutionHandler(),这个类是一个拒绝策略,熟悉线程池的同学都知道。这个类简单的抛出一个异常RejectedExecutionException。

最后,NioEventLoopGroup中的构造函数调用了父类MultithreadEventLoopGroup的构造函数:

1
2
3
4
// MultithreadEventLoopGroup.java
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里可以看到,如果nThreads为0的话,使用DEFAULT_EVENT_LOOP_THREADS代替,也就是availableProcessors() * 2。这里又调用了父类MultithreadEventExecutorGroup的构造函数:

1
2
3
4
// MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

真正的构造函数:

最终委托给了真正的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

MultithreadEventExecutorGroup持有一个EventExecutor[] children,是真正执行任务的线程,ExecutorGroup可以看做一个线程容器,负责管理持有的EventExecutor。这里可以对照java原生的线程池,EventExecutorGroup可以看做ExecutorService,EventExecutor可以看做线程池中的Thread,不过是升级版的。

这个构造函数主要干了三件事情:

1. 创建executor

看这个构造函数,如果executor是null(这个例子中就是null,可以回顾一下之前的构造函数,第二个构造函数就传入了一个null的executor),就new了一个ThreadPerTaskExecutor,这个ThreadPerTaskExecutor对于每个提交的任务(顾名思义,Thread Per Task),都使用ThreadFactory创建一个新的Thread去 执行这个任务。关键是ThreadFactory,是使用newDefaultThreadFactory()获得的,newDefaultThreadFactory方法创建了一个DefaultThreadFactory,DefaultThreadFactory除了定制了Thread的名字之类的属性外,最重要的是他返回的是FastThreadLocalThread类型的Thread,FastThreadLocalThread里面使用的ThreadLocal是FastThreadLocal,并将任务Runnable包装成一个新的Runable:FastThreadLocalRunnable.wrap(r)

所以这个executor的逻辑是对于每个新的任务,都创建一个FastThreadLocalThread类型的线程去执行这个任务。

netty中所有的线程都是使用FastThreadLocalThread+FastThreadLocal这样的搭配,原因是FastThreadLocal比java原生的ThreadLocal要快,并且杜绝了原生ThreadLocal潜在的内存泄漏,这部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
        // MultithreadEventExecutorGroup的构造函数
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}

// netty的默认ThreadFactory,创建出来的Thread是FastThreadLocalThread
public class DefaultThreadFactory implements ThreadFactory {

// ... 省略

@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}

if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}

protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}

// 一个简单的Executor,对于每个新的任务都创建一个新的线程去执行任务
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

// netty中的runable都被包装成这个,目的是在run方法执行完后,主动调用FastThreadLocal.removeAll();
final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;

private FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}

@Override
public void run() {
try {
runnable.run();
} finally {
// 手动清理这ThreadLocal,防止内存泄漏
FastThreadLocal.removeAll();
}
}

static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
}

这里不讲FastThreadLocalThread和FastThreadLocal的代码。

2. 创建所有的children

好了,接下来就是其创建children,每一个children都是一个NioEventLoop,看一下这个类:

这个类是一个SingleThreadEventLoop,简而言之就是一个单线程的线程池。这个类非常复杂,之后再看这个类,现在只用知道,EventExecutorGroup现在初始化了一个NioEventLoop数组并持有。

3. 创建chooser

先看一下chooser是什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@UnstableApi
public interface EventExecutorChooserFactory {

/**
* Returns a new {@link EventExecutorChooser}.
*/
EventExecutorChooser newChooser(EventExecutor[] executors);

/**
* Chooses the next {@link EventExecutor} to use.
*/
@UnstableApi
interface EventExecutorChooser {

/**
* Returns the new {@link EventExecutor} to use.
*/
EventExecutor next();
}
}

首先传入了一个EventExecutor[],然后又一个next方法用来从EventExecutor[]中选出一个EventExecutor。这里的EventExecutor就是NioEventLoop。

构造函数中使用的EventExecutorChooserFactory是DefaultEventExecutorChooserFactory.INSTANCE:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

private DefaultEventExecutorChooserFactory() { }

@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}

最终创建了PowerOfTwoEventExecutorChooser或者GenericEventExecutorChooser作为chooser。这里可以看出,如果传给chooser的EventExecutor数组是2的整数幂,那么next方法的执行效率会快很多。

这里有一个快速判断是个数是否是2的整数次幂的方法,学习到了吗,这里使用到了补码的性质。

Q&A以及总结

Q:NioEventLoopGroup是什么?

A:netty内部的线程池,用来管理netty内部的线程。

Q:NioEventLoop是什么?

A:包装了一个Thread,NioEventLoop本质是一个只有一个线程的线程池,但是比起线程池来说干了很多其他的事情,比如注册一些事件,注册Selector等功能。

Q:NioEventLoopGroup如何选出一个NioEventLoop?

A:使用chooser,chooser有两种,当NioEventLoop数量是2的整数幂的时候,使用与运算,否则使用取余从数组中取出NioEventLoop,但是两种方法都是轮询的取,去的顺序没有区别,只是前者更快。

Q:Thread和FastThreadLocalThread的区别?

A:FastThreadLocalThread使用的是FastThreadLocal,FastThreadLocal内部是线性的存取数据,而java原生的ThreadLocal使用的是hash,所以FastThreadLocal要快不少。FastThreadLocalThread中跑的Runnable是经过包装后的FastThreadLocalRunnable,在执行完run方法后,会主动清除FastThreadLocal中的数据。

Q:NioEventLoopGroup的创建过程是怎样的?

A:首先创建executor,最主要的作用是使用netty的ThreadFactory,保证创建的线程都是FastThreadLocalThread,然后根据nThread创建NioEventLoop数组,作为children持有,最后根据children创建一个chooser。

Q:NioEventLoopGroup持有的children是什么?

A:就是一个NioEventLoop数组,用来真正干活的线程组。

Q:bossGroup和workerGroup的区别:

A:bossGroup用来监听客户端连接请求,连接之后就将其他的任务交给workerGroup了,所以bossGroup内部只有一个线程,workerGroup的默认大小是availableProcessors() * 2

我不喝咖啡,但是我相信知识有价。