Vert.x 源码阅读 (1) - Future 和 Promise

Vert.x 源码阅读 (2) - Stream

Vert.x 源码阅读 (3) - EventBus

Vert.x 源码阅读 (4) - Context

这是 Vert.x 项目源码阅读笔记的第二篇,主要记录一下 Stream 这个抽象。StreamVertx 中消息传递使用的底层抽象。主要代码在这个目录下。其中两个接口比较重要,分别是 ReadStreamWriteStream。这两个接口都扩展了 StreamBase 这个接口。从名字就能看出来,这两个接口分别对应了读取和写入,我们可以不严谨地类比到 C++ 里的 std::cinstd::cout

ReadStream

响应式

ReadStream 提供的接口是响应式的。使用者可以设置数据的回调方法,一旦有数据过来,回调就会被调用。这个和 Reactive 编程方式提供的抽象有点相似。

读取模式

ReadStream 提供了两种读取模式,它们分别是:

  1. Flowing
  2. Fetching

这两种模式可以类比为 Push 和 Pull 模式。ReadStream 通过 pauseresume,还有 fetch 三个方法来切换读取模式。pause 方法会进入 Fetching 模式,而 resume 方法会进入 Flowing 模式。fetch 方法则是用来在 Fetch 模式下指定需要读取多少元素。

下面是简化后的 ReadStream 接口的一个实现 MessageConsumerImpl 的代码。主要省略了边界条件处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface MessageConsumer<T> extends ReadStream<T> {
}

public class MessageConsumerImpl<T> implements MessageConsumer<T> {
// 这个字段用来表示想要读取多少元素。如果是 Long.MAX_VALUE,则表示正处在 Flowing 模式下。
private long demand = Long.MAX_VALUE;

public synchronized MessageConsumer<T> pause() {
demand = 0L;
return this;
}

@Override
public MessageConsumer<T> resume() {
return fetch(Long.MAX_VALUE);
}

@Override
public synchronized MessageConsumer<T> fetch(long amount) {
demand += amount;
if (demand > 0L) {
// 异步 consume 数据
}
return this;
}
}

WriteStream

WriteStream 提供了异步的写入数据的接口,这个接口比较自然,所以不做太多讨论。下面我们主要看看它提供的流量控制的接口。

流量控制 Flow Control

WriteStream 提供的和流量控制的相关接口主要有三个

  1. setWriteQueueMaxSize(int maxSize) 设置了最大的 Write Queue 的大小。
  2. boolean writeQueueFull() 返回当前是否 Write Queue 已写满。
  3. drainHandler(Handler<Void> handler) 用来指定当 Write Queue 从 Full 的状态恢复的时候,应该执行什么逻辑。

有了这三个方法,我们就能在一定程度上保证流量控制了。下面是一个简单的例子,假设我们想从一个 ReadStream 读取数据,然后写到 WriteStream 中。为了避免 WriteStream 的下游承受的住,我们可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 我们想把数据从 readStream 读出来,然后转移到 writeStream 中。
ReadStream<T> readStream = ...;
WriteStream<T> writeStream = ...;

readStream.handler(data -> {
if (writeStream.writeQueueFull()) {
// 发现 Queue 写满了,就暂停一下
readStream.pause();
// 在 Queue 恢复了之后,自动 resume
writeStream.drainHandler(any -> {
readStream.resume();
});
} else {
writeStream.write(data);
}
})

Vert.x 提供了 PumpPipe 两个接口,它们的实现和上面的代码类似。