Flink四大基石——4.Checkpoint容错机制

it2025-09-28  5

1.State Vs Checkpoint

State:

维护/存储的是某一个Operator的运行的状态/历史值,是维护在内存中!

一般指一个具体的Operator的状态(operator的状态表示一些操作/算子在运行的过程中会产生的一些历史结果,如前面的maxBy底层会维护当前的最大值,也就是会维护一个keyedOperator,这个State里面存放就是maxBy这个Operator中的最大值)

State数据默认保存在Java的堆内存中/TaskManage节点的内存中

State可以被记录,在失败的情况下数据还可以恢复

Checkpoint:

某一时刻,Flink中所有的Operator的当前State的全局快照,一般存在磁盘上

表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有Operator的状态

可以理解为Checkpoint是把State数据定时持久化存储了

比如FlinkKafkaConsumer算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取

Checkpoint就是State的全局的分布式快照

注意:

Flink中的Checkpoint底层使用了Chandy-Lamport algorithm分布式快照算法可以保证数据的在分布式环境下的一致性!

https://zhuanlan.zhihu.com/p/53482103

Chandy-Lamport algorithm算法的作者也是ZK中Paxos 一致性算法的作者

https://www.cnblogs.com/shenguanpu/p/4048660.html

Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后续Spark的StructuredStreaming也借鉴了该算法

2.Checkpoint的执行流程

JobManager创建CheckpointCoordinator,并根据设置的Checkpoint时间间隔,向SourceOperator发送Barrier栅栏/其实就是告诉SourceOperator要将State进行快照/进行Checkpoint的命令!

SourceOperator接收到Barrier会暂停当前的工作,并异步调用API将当前的State状态数据保存到指定位置,一般为HDFS,并和CheckpointCoordinator确认已经完成Checkpoint操作,同时将Barrier发送给下游的TransformationOperator,同时恢复自己的工作!

下游的TransformationOperator接收到Barrier,同样也暂停当前的工作,并异步调用API将当前的State状态数据保存到指定位置,一般为HDFS,并和CheckpointCoordinator确认已经完成Checkpoint操作,同时将Barrier发送给下游,同时恢复自己的工作!

直到Barrier被发送给SinkOperator,SinkOperator同样也暂停当前的工作,并异步调用API将当前的State状态数据保存到指定位置,一般为HDFS,并和CheckpointCoordinator确认已经完成Checkpoint操作,同时恢复自己的工作!

CheckpointCoordinator接收到所有的Operator的确认消息,那么本次Checkpoint结束!如果有没有收到的,超时之后可以认为失败,Checkpoint失败可以让任务失败也可以不管,直接进行下一次Checkpoint!这些都可以通过配置参数来实现!   注意: 1.在往介质(如HDFS)中写入快照数据的时候是异步的(为了提高效率) 2.分布式快照执行时的数据一致性由Chandy-Lamport algorithm分布式快照算法保证!

3.State状态后端/State存储介质

注意:

前面学习了Checkpoint其实就是Flink中某一时刻,所有的Operator的State的全局快照,

那么快照应该要有一个地方进行存储,而这个存储的地方叫做状态后端

Flink中的State状态后端有很多种:

MemStateBackend[了解]

FsStateBackend–一般情况使用

RocksDBStateBackend—超大状态使用

三种状态存储介质的配置方式

全局配置

修改flink-conf.yaml #这里可以配置 #jobmanager(即MemoryStateBackend), #filesystem(即FsStateBackend), #rocksdb(即RocksDBStateBackend) state.backend: filesystem state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints

代码配置

/1.MemoryStateBackend--开发中不用 env.setStateBackend(new MemoryStateBackend) //2.FsStateBackend--开发中可以使用--适合一般状态--秒级/分钟级窗口... env.setStateBackend(new FsStateBackend("hdfs路径或测试时的本地路径")) //3.RocksDBStateBackend--开发中可以使用--适合超大状态--天级窗口... env.setStateBackend(new RocksDBStateBackend(filebackend, true)) 注意:RocksDBStateBackend还需要引入RocksDB依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.7.2</version> </dependency>

总结:

后面的学习测试和开发都使用:

env.setStateBackend(new FsStateBackend(“hdfs路径或测试时的本地路径”))

特殊情况下的超大状态用:

env.setStateBackend(new RocksDBStateBackend(filebackend, true))

最新回复(0)