flink 支持有状态的流,存储历史的状态信息。
状态分类 keystate keyBy/groupBy/PartitonBy 后,每个key都有属于自己的一个state
liststatevaluestatemapstateoperatorstate-flink source connector 的实现会用operatorState 来记录source 数据读取的offset
broadcaststateliststateunionliststate 三种状态存储方式机使用场景 flink 默认是RocksDB+HDFS的方式进行存储。源头是多个数据流,同步的增加barrier ,同时在job处理的过程中,为了保证job失败的额时候可以从错误中恢复,flink 还对barrier进行align 对其操作 snapshots 全局一致性镜像,快照的内容是state,比如source offset 或者 算子累加的结果。
no replay resource
replay resource
checpointed replay source flink 内部的exactly-onece exactlyOnce 不是数据只处理一次,而是数据只影响数据结果一次
barrier对齐就是exactly once ,barrier不对齐就是at least once barrier从source Task处生成,一直流到Sink Task,期间所有的Task只要碰到barrier,就会触发自身进行快照 flink 容错的核心时barrier alignment(组标记栏) 设置方式: env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
作业恢复续跑命令
从checkpoint 恢复 bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\chk-10 控制台页面提交submit job支持恢复从savepoint 恢复 1.停止作业savepoint bin/flink cancle -s 作业id 创建savepoint 并进行停止作业 2.从savepoint 恢复 bin/flink run -s file:///D:\\DATA\\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd作业拓扑变化的升级续跑
作业不能启动 bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd作业启动,但是state没恢复 bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState 添加参数allowNonRestoredState,作业能过成功,但是state的关系没有恢复。作业能启动,state也能恢复 bin/flink run -s file:///D:\\DATA\cbefc1f3110c2db01f1dc5e25b8e5ba0\\savepoint-10erfdgdfgddfd --allowNonRestoredState 添加uid逻辑,统一的算子和state文件映射关系source外部数据源支持提供数据重放,例如kafka,可以提供的postion进行数据拉取。 要实现CheckpointedFunction接口,实现保存kafka的offset ,实现数据源续跑 snapshotState,initializeState 方法
# taskid 和offset 续跑不对应 getListState list结构,只存offset # taskid 和offset 续跑相对应 getUnionListState,是map结构,存储<taskid,offset> #offset的时候要使用SourceContext.getCheckpointLock来进行同步操作,保证确定性一次语义。 synchronized (ctx.getCheckpointLock()) { ctx.collect(new Tuple3<>("key", ++offset, String.valueOf(System.currentTimeMillis()))); }两阶段提交,sink operator 需要感知整个checkpoint 的完成,并在完成后才将数据存储。 数据满足两个要求
下游组件对事物的支持通过flink两阶段提交协议和预提交阶段,确保提交或回滚一致性的问题 步骤 # cp 完成之前结果写入临时存储 #上一个cp之后生成当前cp的事务ID #cp将当前事务进行持久化 #cp完成后,将当前事务对应的结果数据写入正式外部系统