flink状态和容错(三)

it2024-03-15  54

目录

state容错语义at most onceat least onceexactly-once 容错续跑算子容错续跑task异常作业逻辑不变CP/SP策略bugfix升级续跑SP策略 source容错续跑sink 容错续跑

state


flink 支持有状态的流,存储历史的状态信息。

状态分类 keystate keyBy/groupBy/PartitonBy 后,每个key都有属于自己的一个state

liststatevaluestatemapstate

operatorstate-flink source connector 的实现会用operatorState 来记录source 数据读取的offset

broadcaststateliststateunionliststate 三种状态存储方式机使用场景 flink 默认是RocksDB+HDFS的方式进行存储。

容错语义


源头是多个数据流,同步的增加barrier ,同时在job处理的过程中,为了保证job失败的额时候可以从错误中恢复,flink 还对barrier进行align 对其操作 snapshots 全局一致性镜像,快照的内容是state,比如source offset 或者 算子累加的结果。

at most once

no replay resource

at least once

replay resource

exactly-once

checpointed replay source flink 内部的exactly-onece exactlyOnce 不是数据只处理一次,而是数据只影响数据结果一次

barrier对齐就是exactly once ,barrier不对齐就是at least once barrier从source Task处生成,一直流到Sink Task,期间所有的Task只要碰到barrier,就会触发自身进行快照 flink 容错的核心时barrier alignment(组标记栏) 设置方式: env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

容错续跑


算子容错续跑

只设置重启策略,不设置checkpoint state不会保留,每次重启都会重新计算设置重启策略,设置checkpoint方式 state 会保留在checkpoint中,计算结果包含以前的结果。默认state存储在jobmanager内存中。设置重启策略,设置checkpoint,设置存储后端是fs。 state 存储在后端fs,作业停止后,默认会删除checkpoint。设置重启策略,设置checkpoint,设置存储后端是fs,开启保存cp. 支持继续跑 senv.enableCheckpointing(1000); senv.setStateBackend(new FsStateBackend("file:///D:\\DATA")); senv.ssetRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(1,TimeUnit.MILLISECONDS) )); senv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

task异常作业逻辑不变CP/SP策略

作业恢复续跑命令

从checkpoint 恢复 bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\chk-10 控制台页面提交submit job支持恢复从savepoint 恢复 1.停止作业savepoint bin/flink cancle -s 作业id 创建savepoint 并进行停止作业 2.从savepoint 恢复 bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd

bugfix升级续跑SP策略

作业拓扑变化的升级续跑

作业不能启动 bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd作业启动,但是state没恢复 bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState 添加参数allowNonRestoredState,作业能过成功,但是state的关系没有恢复。作业能启动,state也能恢复 bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState 添加uid逻辑,统一的算子和state文件映射关系

source容错续跑

source外部数据源支持提供数据重放,例如kafka,可以提供的postion进行数据拉取。 要实现CheckpointedFunction接口,实现保存kafka的offset ,实现数据源续跑 snapshotState,initializeState 方法

# taskid 和offset 续跑不对应 getListState list结构,只存offset # taskid 和offset 续跑相对应 getUnionListState,是map结构,存储<taskid,offset> #offset的时候要使用SourceContext.getCheckpointLock来进行同步操作,保证确定性一次语义。 synchronized (ctx.getCheckpointLock()) { ctx.collect(new Tuple3<>("key", ++offset, String.valueOf(System.currentTimeMillis()))); }

sink 容错续跑

两阶段提交,sink operator 需要感知整个checkpoint 的完成,并在完成后才将数据存储。 数据满足两个要求

下游组件对事物的支持通过flink两阶段提交协议和预提交阶段,确保提交或回滚一致性的问题

步骤 # cp 完成之前结果写入临时存储 #上一个cp之后生成当前cp的事务ID #cp将当前事务进行持久化 #cp完成后,将当前事务对应的结果数据写入正式外部系统
最新回复(0)