目录
Vert.x 源码阅读 (2) - Stream
这是 Vert.x 项目源码阅读笔记的第二篇,主要记录一下 Stream
这个抽象。Stream
是 Vertx
中消息传递使用的底层抽象。主要代码在这个目录下。其中两个接口比较重要,分别是 ReadStream
和 WriteStream
。这两个接口都扩展了 StreamBase
这个接口。从名字就能看出来,这两个接口分别对应了读取和写入,我们可以不严谨地类比到 C++ 里的 std::cin
和 std::cout
。
ReadStream
响应式
ReadStream
提供的接口是响应式的。使用者可以设置数据的回调方法,一旦有数据过来,回调就会被调用。这个和 Reactive
编程方式提供的抽象有点相似。
读取模式
ReadStream
提供了两种读取模式,它们分别是:
Flowing
Fetching
这两种模式可以类比为 Push 和 Pull 模式。ReadStream
通过 pause
, resume
,还有 fetch
三个方法来切换读取模式。pause
方法会进入 Fetching
模式,而 resume
方法会进入 Flowing
模式。fetch
方法则是用来在 Fetch
模式下指定需要读取多少元素。
下面是简化后的 ReadStream
接口的一个实现 MessageConsumerImpl
的代码。主要省略了边界条件处理。
1 | public interface MessageConsumer<T> extends ReadStream<T> { |
WriteStream
WriteStream
提供了异步的写入数据的接口,这个接口比较自然,所以不做太多讨论。下面我们主要看看它提供的流量控制的接口。
流量控制 Flow Control
WriteStream
提供的和流量控制的相关接口主要有三个
setWriteQueueMaxSize(int maxSize)
设置了最大的 Write Queue 的大小。boolean writeQueueFull()
返回当前是否 Write Queue 已写满。drainHandler(Handler<Void> handler)
用来指定当 Write Queue 从 Full 的状态恢复的时候,应该执行什么逻辑。
有了这三个方法,我们就能在一定程度上保证流量控制了。下面是一个简单的例子,假设我们想从一个 ReadStream
读取数据,然后写到 WriteStream
中。为了避免 WriteStream
的下游承受的住,我们可以这么写:
1 | // 我们想把数据从 readStream 读出来,然后转移到 writeStream 中。 |
Vert.x 提供了 Pump
和 Pipe
两个接口,它们的实现和上面的代码类似。