最近工作上有一些变动, 所以有一点时间可以看看源码. 之所以挑选 Netty 是因为之前准备看一下, 但是因为各种原因没能看完, 这次就趁此机会读一读.

NIO 的核心思路

网上各种 I/O 模式的分析已经很多了, 比如阻塞 I/O, 非阻塞 I/O 等等的区别. 大部分都会引用 Unix Network Programming 这本书上的分析, 有兴趣的可以自己去看一看. 这里所说的 NIO 翻译成中文就是非阻塞 I/O (并不是异步 I/O).

我个人感觉, NIO 的优点, 简而言之, 就是可以用很少的线程数, 来管理大量的连接. 如果你用过一些支持 Reactive 的网络框架, 你就会发现框架用来处理请求的 I/O 线程很少 (比如 4 个). 如果你在开发的时候没有意识到这一点, 一不小心使用了阻塞比较久的调用, 你的服务就会无法处理新的请求. 像使用 NodeJS 开发的服务往往也需要注意这一点.

那么怎么使用很少的线程来管理大量连接呢? 这就需要一个历史比较悠久的概念, I/O 多路复用.

In computing, I/O multiplexing can also be used to refer to the concept of processing multiple input/output events from a single event loop, with system calls like poll and select.

简而言之, 就是用很少的 I/O 线程去不断的检测有哪些 I/O 事件可以处理 (并处理这些 I/O 事件). 对于大部分的服务, 处理 I/O 事件需要消耗的时间往往小于等等 I/O 事件的时间, 所以只需要几个 I/O 线程就可以了. 当然, 如果处理 I/O 事件需要比较久的话, 往往会由另一个线程池来处理, 以免阻塞 I/O 线程.

这个不断循环检测 I/O 事件的模式, 往往被称为 Event Loop.

NIO 代码示例

以下是一个使用 Java 原生 NIO 实现的服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Server
class SimpleServer {
public void main(String[] args) {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("localhost", 5454));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// Event Loop
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
// handle key
}
}
}
}

这段代码最重要的部分主要是两个. 一个是在 Selector 上注册关心的事件, 另一个就是下面的事件循环.

Netty 的抽象

Netty 本质上就是一个实现了上述 NIO 的框架, 并且提供了丰富的其他功能, 帮助开发者 focus 在实际的业务中, 而不需要关注网络 I/O 的处理. 它的实现使用的核心抽象包括:

  1. Channel: 包含了实际网络的抽象, 表示一个连接. 它内部封装了实际 Java 的 Channel 和一个 Unsafe 的实现, 用来做实际的网络操作.
  2. EventLoop: 一个事件循环, 底层是一个线程, 不断的处理 I/O 事件. 一个 channel 对应到一个 event loop. 一个 event loop 会接管多个 channel.
  3. EventLoopGroup: 一个事件循环组, 包含了若干个 EventLoop.
  4. ChannelHandler 和 ChannelHandlerContext: ChannelHandler 是自定义的 Interceptor, 被包装成 ChannelHandlerContext 之后加入到了 ChannelPipeline 中. ChannelHandler 分为可以处理 inbound 数据和 outbound 数据的两类 handler.
  5. ChannelPipeline: 和 Channel 一一对应. ChannelPipeline 用类似责任链的形式, 对 Channel 上进出的数据和事件做处理.

处理实现使用的核心抽象之外, 为了方便使用者, Netty 也定义了 Bootstrap 和 ServerBootstrap 两个类, 分别用于创建 Netty 的客户端和服务端.

Netty 服务器启动过程发生了什么

上面的描述很抽象, 接下来还是举两个具体的例子来解释 Netty 的实现原理.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Configure the server.
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
// ================= 配置 ServerBootstrap =========================
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
// ================= 绑定端口并启动服务器 =========================
ChannelFuture f = b.bind(PORT).sync();
// ================= 等待服务器关闭 =========================
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

上面这段代码是 Netty 提供的样例代码. 我加了三行(很明显的)注释.

ServerBootstrap 配置

在配置 ServerBootstrap 部分, 主要关注的就是 group, handler 和 childHandler 这三个方法.

group 方法就是为了指定线程池. bossGroup 包含了一个线程, 用来处理连接事件, workerGroup 包含了多个线程 (默认是是核数个线程), 用来处理所有连接的 I/O 事件.

handler 和 childHandler 分别会被加入到对应的 channel 的 ChannelPipeline 中. handler 的参数很好理解, 实际上就是一个 interceptor, 用来 log 所有关心的网络连接的数据. childHandler 则是用来处理建立好的连接中发生的事件.

绑定端口

绑定端口看上去只有一行代码, 实际上做的最多的工作, 下面用一段伪代码来解释一下大概的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 1. 从 group 中选一个 EventLoop (即线程). 这里的 group 是 bossGroup.
EventLoop loop = group.next();

// 2. 创建一个 Channel. 这个 Channel 的具体子类是配置的使用通过 channel(NioServerSocketChannel.class) 指定的.
// NioServerSocketChannel 的构造函数会被调用.
Channel channel = newChannel(loop);

// 3. 在 loop 这个线程上对 channel 做初始化. 注意这段伪代码非常伪.
// 这个初始化, 会把配置的时候指定的 handler 加入到 channel 的 pipeline 中.
// 同时也会把 ServerBootstrapAcceptor 加入到 pipeline 中.
// ServerBootstrapAcceptor 会对客户端连接创建的新 channel 做一下初始化操作.
init(channel);

// 4. 注册这个 channel.
// 在 channel 上调用的方法, 都是 outbound 事件. 它会转而调用 pipeline 的相关方法. 而 pipeline 会调用它的 handlers 中的方法.
// 这个方法最终会调用到 pipeline 的 head handler. 这个 handler 会使用 channel 中封装的 unsafe, 将 channel 注册到 selector 上.
channel.register(promise);

// 5. 实际的 bind 调用.
// 和上面类似, 也是一个在 channel 上调用的方法, 是一个 outbound 事件. 最终会调用 unsafe 的 bind 方法.
channel.bind(address);

为了更方便理解, 这里也贴出各个步骤的调用栈.

Step 3: 初始化阶段

init

Step 4: 注册阶段

register

Step 5: 绑定阶段

bind

Step 5.1: doBind

doBind

上面就是 Netty 的服务端启动的整个过程. 其中涉及到 Pipeline 的部分都是 outbound 操作. 对于 outbound 操作, 都是从 pipeline 的 tail 出发, 不断的去找下一个可以处理 outbound 事件的 context, 然后调用这个 context 包装的 handler 的对应方法. 这个 handler 也可以选择继续 propagate 这个事件.

以 bind 举例, 这个过程可以描述为

channel::bind -> pipeline::bind -> context::bind -> context::findAndInvokeBind -> nextContext.handler()::bind -> nextContext::bind 然后循环, 直到到 head.

Netty 客户端启动过程发生了什么

Netty 的客户端启动过程类似, 但是更简单. 主要区别是 init 逻辑的精简, 以及调用的方法变成了 connect.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Configure the client.
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
try {
//==================== 配置 bootstrap ====================
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});

//==================== 连接服务器 ====================
ChannelFuture f = b.connect(HOST, PORT).sync();

//==================== 等待客户端关闭 ====================
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}

注意, 对于客户端只需要一个 event loop group. 而这个 group 中往往只有第一个 event loop 被使用.

和上文类似, 这里主要关注连接服务器的过程.

连接服务器

废话不说, 先上很伪的伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1. 从 group 中选一个 EventLoop (即线程).
EventLoop loop = group.next();

// 2. 创建一个 Channel.
Channel channel = newChannel(loop);

// 3. 在 loop 这个线程上对 channel 做初始化. 注意这段伪代码非常伪.
// 这个初始化, 会把配置的时候指定的 handler 加入到 channel 的 pipeline 中.
init(channel);

// 4. 注册这个 channel.
// 在 channel 上调用的方法, 都是 outbound 事件. 它会转而调用 pipeline 的相关方法. 而 pipeline 会调用它的 handlers 中的方法.
// 这个方法最终会调用到 pipeline 的 head handler. 这个 handler 会使用 channel 中封装的 unsafe, 将 channel 注册到 selector 上.
channel.register(promise);

// 5. 实际的 connect 调用.
// 和上面类似, 也是一个在 channel 上调用的方法, 是一个 outbound 事件. 最终会调用 unsafe 的 connect 方法.
channel.connect(address);

上面的前四步和服务端一模一样 (newChannel 和 init 的逻辑会不太一样). 至于最后一步和 bind 也是类似的, 这里就不多做解释了.

Netty 事件循环的核心代码

实际上核心代码很短, 代码如下

1
2
3
4
5
6
7
8
9
// SingleThreadEventLoop.java
assert inEventLoop();
do {
runIo();
if (isShuttingDown()) {
ioHandler.prepareToDestroy();
}
runAllTasks(maxTasksPerRun);
} while (!confirmShutdown());

可以看出, 这个事件循环主要分为两步, 一个是处理 I/O 事件 (runIo), 一个是处理任务. 处理任务这里暂时不做讨论, 因为相对比较简单, 就是从一个 queue 中 poll 任务, 然后执行即可. 下面重点看看 runIo 这个方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
// SingleThreadEventLoop.java
protected int runIo() {
assert inEventLoop();
return ioHandler.run(context);
}

// 伪代码
// NioHandler.java
@Override
public int run(IoExecutionContext runner) {
select(runner, wakenUp.getAndSet(false));
return processSelectedKeys();
}

虽然伪代码做了一定的简化, 简化的部分主要是错误处理, 以及提交任务的唤醒逻辑. 核心实际上就是调用 select 然后处理所有 ready 的事件.

Netty 服务器处理新连接

有了上面的铺垫, 接下来让我们看一下, Netty 服务器是怎么处理新连接的. 在看下去之前, 先想象一下这个流程, 看看你想的和实际是不是差不多 (我也来想一下😆).

这个过程应该是

  1. EventLoop 发现有 ACCEPT 事件
  2. 创建一个新连接
  3. 然后走一遍 Channel Pipeline
  4. 把这个新连接注册到 EventLoop 上, 并监听读事件.

接下来看实际流程

Step 1: Selector 发现 Accept 事件

image-20190716162027904

Step 2: 创建连接:

image-20190716162531765

Step 3: 走一遍 ChannelPipeline

image-20190716162627292

Step 4: 注册连接

image-20190716162759240

image-20190716162833655

一直到这里, 新连接在服务端算是完成了.

注意这里走 pipeline 是由 unsafe.read 触发的. 所有的 inbound 事件都有这个特点. inbound 事件会由 pipeline 的 head 首先处理, 然后不断 propagate 到下面.

Netty 客户端处理连接成功

这个比较容易, 就是在 EventLoop 中监听到 connect 完成事件后, 调用 unsafe 的 finishConnect 方法. 显然这个也是一个 inbound 事件, 要走一遍 pipeline.

总结

虽然一直拖到今天才初步读了一下 Netty 的代码, 但是在很早以前, 我读过 swift-nio 的源码 (那时候刚开源, 代码量相对比较少, 现在可能也不少了…). 感觉 NIO 的核心思想差不多, 更多是实现上的区别. Netty 的代码架构对于刚入门的人来说, 我感觉还是蛮难懂的, 尤其是 pipeline, unsafe, channel 之间的来回调用. 不知道这样设计的目的是什么. 相比之下 swift-nio 当时读起来就相对轻松得多.