当看到 Exactly-Once 这个名词的时候, 有一些同学的第一反应可能就是每条数据, 恰好执行一次. 这个理解有一点问题, 我们之后再讨论. 我们先想一想为什么 Exactly-Once 重要. 其实这一点我觉得就不用做太多解释. 想象一下计算 UV 这个场景, 如果没有这个语义的保证, UV 数据就会出错.

当我们使用 Spark, Storm, Flink 等等流式处理框架的时候, 我们的逻辑往往就是从若干个数据源读取数据, 然后使用我们自定义的代码去处理这些数据. 由于有 Failure 的存在, 这些框架不得不重试多次, which means 我们的代码对同一个数据, 可能会调用多次 (甚至在不同的机器上同时被调用, 比如在出现网络故障的情况下). 如果我们的代码有副作用, 比如每处理一条数据就发一条微博, 那么最后发出来的微博数很有可能和我们的数据总数是不一致的. 一般情况下是大于等于, 如果处理框架提供了 at-least-once 的语义. 我们说的 exactly-once 语义并不保证我们的代码只被执行一次.

那么到底什么是这篇文章想要讨论的 exactly-once 呢? 我们说的 exactly-once 是指保证得到的最终数据等效于每条原始的数据被正确处理了恰好一次, 即使处理过程中出现 worker 挂掉导致同一个数据被实际多次处理了.

DataFlow 怎么保证 Exactly-Once

考虑下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
Pipeline p = Pipeline.create();
PCollection<String, Long> perUserCounts = p.apply(ReadFromUnboundedSource.read())
.apply(new KeyByUser())
.into(FixedWindows.of(Duration.ofMinutes(1)))
.apply(Count.perKey());
perUserCounts.apply(new Sink1());

perUserCounts.apply(Values.<>Create())
.apply(Count.globally())
.apply(new Sink2());

p.run();

这里我们计算了两个数据, 一个是对于每个 1 分钟的时间窗口, 计算每个用户的 count. 另一个是计算每个 1 分钟的时间窗口中, 所有用户的 count 之和.

就像 Spark, M/R 等处理引擎一样, 像 count by key 这种操作会导致 Shuffle. key 一样的数据会 Shuffle 到一个 worker 做处理. 由于还有时间窗口这个东西, 这里的 key 还会包含时间窗口 (的起始值).

那么现在, 为了保证 exactly-once, 我们实际上要保证在三个地方都有 exactly-once 的语义:

  1. Shuffle 的过程中, 一个 record 之后被 shuffle 一次 (同一个 record “等效于”只会被接收端收到一次).
  2. 读 Source 的过程中, 一个 source record 只会被处理一次.
  3. Sink 写数据要保证正确性.

Shuffle 的 Exactly-Once

由于 Shuffle 是通过 RPC 实现的, 而 RPC 会失败, 所以 Dateaflow 使用不断重试直到被 ACK 才算完这种方式保证 at-least-once.

有了 at least once 的保证, 接下来只需要做到去重就可以了. Dataflow 通过给每个发送的 message 加上一个 unique id, 然后接受端通过保存所有已经收到的 id 来做去重. 由于 dataflow 本身就构建于一个 scalable 的 kv 存储, 所以可以直接用.

使用一个唯一的 ID 看似能解决这个问题, 实际上还是有一个坑的. 怎么给 message 赋予这个 ID 呢? 这个 ID 不但需要唯一, 同时还需要和实际发出的消息一一对应, 否则要是重试的话, 相同的消息就会有不同的 ID 从而被下游处理多次. 看到这里, 你可能会想, 那这个 ID 就用一个 HASH 算法根据消息生成一个. 先不说 HASH 碰撞这种小概率事件, 另一个问题在于, 用户的自定义代码不一定是确定的, 两次重试的 output 可能就不一样了, 那么 ID 也会不一样.

那么, 为什么不强制要求自定义代码满足确定性要求呢? 答案很简单, 因为这个限制太强了. 很多数据处理工作可能要查询外部数据 (which can change). 即使不查询外部数据, 但是常见的基于事件时间的 aggregation 操作也会因为数据实际到达时间这个不确定性, 而实际上不确定.

为了解决这个问题, DataFlow 使用了 checkpoint 来保证即使不确定的过程, 也能保证 effectively 确定. 一个 transform 的每一个输出, 都会持久化到某一个稳定的存储. 以后要是 shuffle 需要重试, 就只要重试这些存储了的 record 即可. 这里要保证所有的输出都是同一次执行产生的结果, 这个可以通过 epoch 之类的机制保证.

到这里, 保证 Shuffle 的 Exactly-Once 的方法就确定了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Transform
for each record
let (uniqueId, output) = transform(record)
persist(uniqueId, output)

# Shuffle Sender
for (uniqueId, output) in storage
send(uniqueId, output)

# Shuffle Receiver
for (uniqueId, output) in received
if uniqueId in processedIds
continue
process output

这里实际上还剩一个问题, 这个 UniqueID 到底是怎么来的, 这个下面会解释.

性能问题

上面的方法看上去很美好, 因为逻辑真的很简单, 但是很显然, 这样的方法如果一不小心就会有性能问题.

  1. 每一个 transform 都要持久化
  2. 每收到一个 record 都要查看是否已经处理过

这两个操作一旦数据量一大, 开销就会很大. 为了避免这两个问题, DataFlow 在实现的时候使用了两种方法:

  1. DAG 优化
  2. Bloom Filter

DAG 优化就不多做介绍了, FlumeJava, Spark 这些框架都有这种优化. BloomFilter 的大概原理相信大家也很熟悉. 这里一个实现上的小细节是, 为了避免一个 BF 随着数据越来越多饱和从而让正确率下降, DataFlow 会每隔一段时间就创建一个新的 BF. 每个 Record 会带一个系统时间戳, 然后分配到对应的 BF 中.

既然 BF 是每隔一段时间就创建一个新的 BF, 就需要对很早之前创建的 BF 进行垃圾回收. 这个垃圾回收基于系统时间戳来做, 这实际上就是一个 processing-time watermark.

到这里我想有人会问, 要是一个 record 来了, 但是发现它的时间戳对应的 BF 已经被回收了怎么办? 这个情况实际上不会发生, 因为这个 record 的发送方只有在收到 ACK 之后才会 advance 自己的 watermark, 而它的下游, 也就是持有 BF 的这一方的 watermark 一定小于等于这个发送方的 watermark. 如果出现 BF 被回收, 那么上游已经 advance 过自己的 watermark 了. 这就意味着这个 record 之前一定已经发过了.

Source 的 Exactly-Once

Source 要做到 Exactly-Once 相对比较简单, 但是对于 Source 有一个要求, 即 Source 必须是 Replayable 的. 其实这个很好理解. 由于处理过程中可能出现问题, 要是一条记录处理失败了, 就再也找不回来了, 那真的是回天乏术了. 现在有很多 Source 都支持 Replayable, 比如文件, Kafka, Cloud Pubsub 等等.

Replayable 这个要求很好理解, 但是为什么只要 Replayable 就能保证 Exactly-Once 呢? 有了 Replayable, 我们能很容易的保证 at-least-once 的语义. 而为了保证 exact-once, DataFlow 区分了两种类型的 Source 并分别做了处理:

  1. 确定性的 Source, 比如说文件, Kafka. 一个 Worker 都能以某种确定的顺序读取数据, 那么就能分配一个固定的 ID, 用于去重即可.
  2. 不确定的 Source, 比如 Cloud Pubsub. 一条 Message 可能会被任意的 Worker 读取到. 为了解决这个问题, DataFlow 封装了一层, 从 Cloud Pubsub 拉取消息之后将 Message ID Shuffle 了一下保证每个 Worker 处理的消息是确定的, 这样就可以使用之前 Shuffle 过程中的方式保证 Exact-Once.

(填坑: 这里的 Unique ID 就可以用作 Shuffle 中的 Unique ID)

Sink 的 Exactly-Once

为了保证写 Sink 时能有 Exactly-Once 的语义, 一个基本要求是 Sink 支持幂等的写操作. 如果写操作不幂等, 那么就不能保证 Exactly-Once 语义了, 因为只要写操作失败后重试 (但实际上已写入), 就可能会出现同一个数据写多次的问题.

但是, 有了幂等性并不够. 比如说, 如果处理过程不是确定性的 (比如会调用一个外部服务等等), 那么被写入的数据每次重试都可能不同. 如果第一次重试写了部分数据之后失败了, 第二次重试要写的数据可能不一样了, 这个时候幂等性就爱莫能助了. DataFlow 提出了一个巧妙的解决办法, 在写数据前面加一层 Shuffle. 这样之前的数据都会写到一个持久化的存储. 如果之前的处理写到一半失败了, 之前写的也会被弃用. 这样, 我们能保证 Shuffle 之后的数据是一致的 (是同一次 Attempt 产生的数据). 而写数据的重试, 只会读 Shuffle 后的确定的数据, 有了幂等性的保证, 就可以保证 Exactly-Once 了.