OPED 是 (我自己发起的) One Paper Each Day 挑战, 即每天读一篇 Paper, 领域不限.

这是 OPED 挑战的第二篇, 论文为 State Management in Apache Flink

这篇论文描述了 Flink 系统中的状态管理. Flink 使用了上一篇论文提供的算法实现了分布式系统的状态管理.

为什么我们需要(本地)状态

对于一个”合格”的流式系统来说, 状态是必要的. 比如我想在 Dashboard 上实时展示我每一篇 blog 截止到现在的 UV, 那么很显然, 我需要一个流式处理系统来处理 nginx 的日志, 并按照每个 blog 计算有多少个不同的 IP 访问过. 但是有一个问题, 因为我的 blog 这么精彩,可能同一个 IP 会访问多次, 那么这个系统需要做 IP 去重, 那么它就需要维护一个状态来帮助我们确定一个 IP 是否是第一次访问. 当然你会说, 我们可以保存在外部系统呀, 比如一个数据库. 对于一个访问量很大的网站, 显然访问数据库的延迟会大大影响数据处理的吞吐量. 同时, 数据库能否承受这么高的流量也是我们需要考虑的问题.

为什么我们需要一致的分布式状态快照

假设你已经被我说服我们需要一个本地状态, 那么我们在进一步的考虑这个问题. 如果上面说的系统挂了怎么办? 我们当然可以重启这个系统, 但是由于本地状态没有了, 我们没法知道一个 IP 是否访问过某篇 blog. Oh, shoot, 我们还得让我们的状态支持错误恢复. (早知道还是用外部数据库了.) 如果我们的处理系统只有一个节点, 那么我们只要定期 checkpoint 一下当前所有访问过的 IP 地址即可, 那么要是有多个节点, 这个数据处理链路很复杂该怎么办呢? 比如说下面这个(不可能发生的)场景:

假设我的博客声名远扬, 每天访问量巨大, 所以我有多台服务器来 host 我的 blog. 同时也需要多台机器才能处理这么大的访问数据. 所以这数据处理的拓扑结构长这样.
Processing

很显然, 从 Parse 到 Count, 我们需要做一次 Shuffle. 如果我们就是简单的定期 checkpoint 每台机器的状态, 那么我们在失败恢复时, 就会遇到一个问题. 如果 ParseLogs 以及处理了 500 行记录, 但是这 500 行记录只有 450 行发给了下游, 还有 50 行还在传输过程中. 这个时候, 系统挂了. 我们只有 ParseLogs 这台机器处理完 500 行的状态和 Count 处理完 450 行的状态, 有 50 行数据丢了. 这个情况, 更 general 一点来说, 就是 checkpoint 的时机不同, 也就是说, 为了解决这个问题, 我们需要一个同步机制, 这简直是分布式系统永恒的话题.

Flink 使用了上一篇博客提到的方法. 它会定期从 source 发布一个 marker. 下游收到 marker 之后就会记录自己的状态. 具体的算法内容参见上一篇博客, 这里讲讲一些不同的地方.

Alignment

在一个常见的流式处理拓扑中, 一个节点可能会从多个节点收到数据 (比如上面的例子). 那也就意味着, 一个节点可能会从多个上游节点收到多个 marker, 但是这个节点已经记录过自己的状态了. 对于这种情况, 上一篇博客引用的原论文已经有了解决方案, 即使用 channel state. 但是这有一个问题, 信道中传输的信息可能会很大, 同时要恢复起来也相对麻烦一点. 为了尽量避免 channel state, Flink 引入的 alignment 这一方法, 即一个节点从一个信道收到 marker 之后会 block 住等到其他信道的 marker 全部收到为止. 如果你用过 Flink 的 Web UI, 你就会看到这个指标.

更早的一篇博客中, 我们讨论了 DataFlow 中的 Exactly-Once 语义. Flink 和 DataFlow 不同, 它使用了上面提到的分布式系统快照来实现. 让我们再回忆一下 DataFlow 中为了保证 Exactly-Once 需要保证:

  1. Shuffle 过程保证 Exactly-Once. DataFlow 通过 BloomFilter 去重和持久化 Shuffle 的结果来保证 Exactly Once.
  2. Source 需要可以 Replayable 来保证 Exactly-Once.
  3. Sink 需要保证幂等性.

那么 Flink 是如何利用分布式快照的呢?

首先, 对于 Source 和 Sink, Flink 也有一样的要求, 唯一不同的是 Shuffle. Flink 使用了有序的基于 TCP 的协议, 并且一旦出现网络异常, 就需要回滚到上一个 Checkpoint 点. 在不考虑成本的情况下, 这种方式很显然是能保证 Shuffle 的恰好一次语义的. 唯一的问题在于一次异常的代价很高, 而 Flink 假设异常是很罕见的. 确实, 很多 Benchmark 和已有的使用体验也印证了这一点.

更多的细节

论文中还有更多的细节, 比如环的处理情况等等, 这里就不详细解释了. 有兴趣的朋友可以去读一读原论文.