Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

Snapshot Algorithm

分布式快照算法是拿来干嘛的?

在缺乏全局时钟或者全局时钟不可靠的分布式系统确定全局状态

A snapshot algorithm is used to create a consistent snapshot of the global state of a distributed system. Due to the lack of globally shared memory and a global clock, this isn’t trivially possible.

Global Snapshot

全局快照我们也可以理解为全局状态,主要用于在Failure Recovery。

我们把分布式系统简化为以节点表示的进程状态图,进程之间以消息队列进行通信。

这里的消息队列有两类input channel 和 output channel,channel可以看作是一个无限大的FIFO队列。

队列中收到的message都是有序无重复的。

C-L分布式快照算法通过记录每个进程的local state和input channel中有序的message作为一个局部快照。

那么global snapshot就是把所有进程的local snapshot全部合并起来

Chandy-Lamport

考虑一个分布式系统,这些进程运行在不同的物理机器。一个分布式系统全局状态就是进程的状态和队列中的message

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

考虑一个分布式系统,其中有两个节点,即两个进程再循环传递令牌。p和q,s0,s1对应的是两个状态。token可以理解为一个令牌,全局只有一个,进程之间通过消息队列发送令牌

s0表示进程不持有令牌,s1表示进程持有令牌。

两个进程之间的连线代表消息队列,token可能处于消息队列中,这时候两个进程状态都是s0,都没有令牌。

进程是有可能崩溃的
我们要保证进程崩溃重启之后,系统仍然能够正常运行,于是我们需要从某个检查点恢复程序的运行状态,这就需要定时把系统中在某个时间点的状态保存起来,也就是做一次snapshot

保存什么东西?

保存每个节点在当前的状态以及消息队列在当前的状态(消息队列的message)。

eg:

对上图来说,我们对右上角的状态做一个snapshot,保存的状态如下:

p s0
q s0
p->q token
q->p empty

但是由于pq是两个进程,时间不同步,例如p进程在发送token之后进行了快照存储,q在p发送token之前进行snapshot.

1.p在snapshot的时候由于p已经发送了token,现在token在q进程队列中,所以p进程保存快照的时候认定token既不在p也在p的接受队列
2.q在p发送之前快照,这时候token还在p受伤,因此q进程认定token不在q也不在q进程的队列中

导致全局快照中token消失不见

解决办法
Marker-Receiving Rule

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制
p发送完token后发起snapshot,发送marker给q,q收到marker就保存本地状态。由于queue是FIFO的,q先接收到token,再接收到marker,所以保存状态的时候肯定会知道自己已经拿了token,所以记录自己为s1.并且p->q=empty

然后q再发送marker给p,p接收到marker,检查p保存状态后到接收到marker之前有没有收到信息,由于没有收到,所以设置q->p为empty

这就是保存了右下角图片的状态。

接下来具体介绍一下CL算法的流程,主要三个部分:

1.initiate a snapshot 系统中任意一个进程发起一个snapshot
2. Propagting a snapshot 系统中其他进程逐个创建snapshot
3. Terminating a snapshot 算法结束

  • Initiating a snapshot
    1.进程Pi发起,记录自己的进程状态,同时产生一个标志信息marker,marker和进程通信的message不同
    2.将marker信息通过output channel发送到其他系统的进程
    3.记录所有从input channel接收到的message

  • Propagting a snapshot

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

1.进程i从input channel接收到marker信息
2.如果P还没有记录到自己的进程状态,则:
P记录自己的进程状态,同时将 channel C 置为空,并且向 output channel 发送 marker 信息
3.记录其他 channel 在收到 marker 之前的 channel 中收到所有 message

啥意思?

比如 Pj 做完 local snapshot 之后 Ckj 中发送过来的 message 为 [a,b,c,marker,x,y,z] 那么 a, b, c 就是进程 Pk 做 local snapshot 前的数据,Pj 对于这部分数据需要记录下来,比如记录在 log 里面。而 marker 后面 message 正常处理掉就可以了。

  • Terminating a snapshot

所有的进程都收到 marker 信息并且记录下自己的状态和 channel 的状态(包含的 message)

例子:蓝色表示全局snapshot

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制
初始状态如上:

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

  • P1发起全局snapshot记录
  • P1先记录本身的进程状态,然后向P2发送marker信息
  • 在marker信息到达P2之前,P2向P1发送message: M

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

  • P2 收到 P1发送过来的marker信息之后,记录自己的状态。
  • 然后 P1 收到 P2 之前发送过来的 message: M。
  • 对于 P1 来说,从 P2 channel 发送过来的信息相当于是 [M, marker],由于 P1 已经做了 local snapshot,所以 P1 需要记录 message M。
    Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

所以我们就保持了全局snapshot的状态
Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

Flink 分布式快照机制

flink通过使用分布式快照来提供容错服务。这些快照就充当checkpoint,系统在发生故障时候就回滚。

当某个节点宕机之后,flink将会停止分布式数据流。然后系统会重新启动操作算子并且将其重新设置为最新的checkpoint.
输入流将重置为状态快照记录的位置,作为重新启动的并行数据流的一部分。

flink的checkpint就是基于分布式快照实现的。

  • checkpoint 与 state

checkpoint 在flink中指的是一个执行操作,最终产生结果作为分布式快照提供容错机制

state 构成checkpoint的数据构成,指的是流式计算中持久化的状态

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

checkpoint执行机制

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

ckpoint执行流程:

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制
Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制最后Coordinator把完整的checkpoint数据存储到checkpoint coordinator.

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制
checkpoint执行流程

  • checkpoint仅支持flink在内部实现EXACTLY ONCE 语义,端到端的exactly once需要source 和sink的支持

但是讲到这里还是很懵

barriers是什么鬼?

barriers

  • barriers被注入数据流并与记录一起作为数据流的一部分,向前流动

  • barriers永远不会超过记录,数据流保证严格有序

  • barriers把数据流中的记录分为当前进入快照的记录和下一个快照的记录

  • 每个barrier都有快照的ID,并且barrier之前的记录都进入了该快照。

  • barrier不会中断流的流动,非常轻量级

  • 来自不同快照的多个barrier可以同时在流中出现,也就是不同的快照可以同时进行
    Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

  • 快照n的barrier被插入的位置是快照n所有数据最大的数据,eg: kafka最后一条记录的偏移量。并且最后会把这个位置报告给checkpoint协调器。

  • 然后barrier向下流动,当一个算子从所有输入流都受到快照n的barrier的时候,算子就会为快照n发出barrier,进入到其所有输出流中。

  • 一旦sink操作算子从其所有输入流接收到barrier n,它就会向checkpoint协调器确认快照n完成。

  • 所有sink确认快照后,意味着快照已经完成

一旦完成快照n,job不会再去请求快照n之前的记录,因为这些记录已经通过整个数据流拓扑,就是被处理结束了。

Chandy-Lamport分布式快照学习记录和Flink与实现分布式快照的机制

  • 一旦操作算子从输入流接收到快照barrier n,就不能够处理来自该流的其他任何记录。知道从其他输入接收到barrier n为止 否则会搞混快照n和快照n+1的记录

  • barrier n 所属于的数据流暂时会被搁置,从这些流接受的记录不会被处理,而是放入输入缓冲区

  • 一旦从最后一个流接收到barrier n,操作算子就会发出所有挂起的向后传送的记录,然后再自己发出快照n的barrier

  • 之后会恢复处理所有来自输入流的记录,并且会优先处理来自输入缓冲区的记录

这就是checkpoints的对齐机制

state

当运算符包含任何形式的状态的时候,快照也应该包含这个状态。

状态 state具体指的是?

1.系统状态 :作为运算符计算一部分的数据缓冲区,典型例子是窗口缓冲区。系统在其中收集窗口里的记录,直到窗口被计算或者抛弃
2.用户定义的状态 :可以理解为算子map,filter等
3.操作算子在从输入流接收到所有快照barrier的时候,以及向其输出流发出barrier之前,会对其状态进行写快照。这个时候,在barrier之前的数据的状态更新已经完成。barrier之后的数据不会更新状态。
4.快照的状态占的空间很大,因此可以存储在可配置的状态后端,例如JobManager的内存。在生产环境下,也可以存储在HDFS中。
5.存储状态之后,操作算子确认checkpoint完成,会把快照barrier发送到输出流中,然后继续。

异步状态快照

上面介绍的快照过程是同步过程,当操作算子把状态快照存储到状态后端的时候,会停止处理输入的记录。每次写快照的时候,延迟会比较大。

我们可以把这个过程改为异步,具体做法为

操作算子必须能够生成一个状态对象,该状态对象应以某种方式存储,以便对操作算子状态的进一步修改不会影响该状态对象。copy-on-write是一种解决方法。

至于copy-on-write是什么,请google,这里不赘述。

输入checkpoint的barrier之后,操作算子启动其状态的异步快照复制。会立刻释放其barrier到输出,并继续进行常规流处理。复制过程是放在后台完成的,它会向checkpoint协调器(JobManager)确认快照已经完成。

checkpoint仅仅在所有sink都已经收到barrier并且所有有状态操作算子都确认完成了备份之后才算完成。

Recovery

当失败时,Flink选择最新完成的checkpoint k。 然后,系统重新部署整个分布式数据流,并为每个操作算子重置作为checkpoint k的一部分的快照的状态。 数据源设置为从位置Sk开始读取。 例如在Apache Kafka中,这意味着告诉消费者从偏移量Sk开始读取。

ref

snapshot
snapshot flink