flink checkpoint
1 原理
flink通过checkpoint来进行状态保存,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态, 保证应用流图状态的一致性。
Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法,通过checkpoint频率时间定时发送barrier消息。 (分布式快照算)
barrier从Source Task处生成,一直流到Sink Task,期间所有的Task只要碰到barrier,就会触发自身进行快照; 通过保存每个task的快照,即保存整个流的即时时状态。
2 barrier 对齐
当一个opeator有多个输入流的时候,checkpoint barrier n 会进行对齐,就是已到达的会先缓存到buffer里等待其他未到达的,一旦所有流都到达,则会向下游广播,exactly-once 就是利用这一特性实现的,at least once 因为不会进行对齐,就会导致有的数据被重复处理.
举个例子说明: 上述source2个分区数据, 当到达operator处理的时候,如果source1状态保存后,会向下游发送barrier, operator收到后,立即处理算出结果,保存状态。 此时source2由于网络或处理能力问题,延时厉害,当source2终于处理完时,source1又已经发送了其它2条数据数据,处理完毕。那么source2的数据实际上加上到处理完毕,实际计算了多2条数据。此时系统异常重启,恢复状态会把这2条数据重新参与计算导致重复消费。
Exactly Once时必须barrier对齐,如果barrier不对齐就变成了At Least Once;
3 checkpoint过程。
- 1、JobManager端的 CheckPointCoordinator向 所有SourceTask发送CheckPointTrigger,Source Task会在数据流中安插CheckPoint barrier
- 2、当task收到所有的barrier后,向自己的下游继续传递barrier,然后自身执行快照,并将自己的状态异步写入到持久化存储中。增量CheckPoint只是把最新的一部分更新写入到 外部存储;为了下游尽快做CheckPoint,所以会先发送barrier到下游,自身再同步进行快照
- 3、当task完成备份后,会将备份数据的地址(state handle)通知给JobManager的CheckPointCoordinator;
如果CheckPoint的持续时长超过 了CheckPoint设定的超时时间,CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator就会认为本次CheckPoint失败,会把这次CheckPoint产生的所有 状态数据全部删除。 - 4、 最后 CheckPoint Coordinator 会把整个 StateHandle 封装成 completed CheckPoint Meta,写入到hdfs。
4 checkpoint存储方式。
4.1 RocksDB实现增量checkpoint
state backend中提供了一种RocksDb存储checkpoint ,它是Flink提供的唯一可以实现增量checkpoint的方法。原理是每次生成checkpoint是会生成sst文件(不会再修改了),会和之前的文件进行对比,每次上传新增的sst文件即可,大概就是这样
4.2 全量checkpoint
默认情况下是保存在Jm的内存中,由于可能会比较大,可以存在状态后端中,生成中建议放hdfs.
5 checkpoint代码设置:
6 checkpoint配置文件flink-conf.yaml
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html
- state.backend
- 用于指定checkpoint state存储的backend,默认为none[默认保存在JM中]
- 可以设置如下值: 'jobmanager', 'filesystem', 'rocksdb', or the <class-name-of-factory>.
- state.backend.async
- 用于指定backend是否使用异步snapshot(默认为true),有些不支持async或者只支持async的state backend可能会忽略这个参数
- state.backend.fs.memory-threshold,默认为1024,
- 用于指定存储state数据文件的最小值,如果小于该值则会存储在root checkpoint metadata file
- state.backend.incremental,默认为false,
- 用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置
- state.backend.local-recovery,默认为false
- 本地状态恢复。目前仅keyed state backends支持。 (基于同样的key记录创建的状态)
- 参考https://www.cnblogs.com/zackstang/p/11737007.html
- state.checkpoints.dir,默认为none,
- 用于指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见 例如保存HDFS: hdfs://namenode-host:port/flink-checkpoints
- state.checkpoints.num-retained,
- 默认为1,用于指定保留的已完成的checkpoints个数
- state.savepoints.dir,默认为none
- 用于指定savepoints的默认目录
- taskmanager.state.local.root-dirs,默认为none,对应参数state.backend.local-recovery