您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
Flink之容错机制
 
作者:herokang
  1824  次浏览      18
 2021-10-18
 
编辑推荐:
本文主要介绍Checkpoint的实现算法,checkpoint对性能的影响,以及如何使用checkpoint等,希望对您的学习有所帮助。
本文来自于CSDN,由Alice编辑、推荐。

一、Checkpoint,一致性检查点

flink故障恢复机制的核心就是checkpoint

有状态的流应用的一致性检查点,其实就是所有任务的状态,在某个时间点的一份快照,这个时间点是指所有任务都恰好处理完一个相同的输入数据的时候

如上图所示:此应用有一个source task,消费一个递增数的流,如1,2,3等等。流中的数据被分区到一个奇数流,一个偶数流。在一个sum operator中,有两个task,分别用于累加奇数与偶数。Source task 存储当前输入流的偏移量作为state。Sum task 将当前的累加和作为state并存储。图中,在输入偏移量为5时,Flink做了一个检查点,此时两个task的累加和分别为6和9。如果任务处理到6/7时发生故障,则重启任务后可以从checkponint中拿到5时所欲任务的状态,从5开始重新计算

Flink的Exactly-once需要从最近的一份快照开始重放数据, 因此这也和数据源的能力有关, 不是所有的数据源都可以提供Exactly-once语义的. 以下是apache官网列出的数据源和Exactly-once语义保障能力列表.

二、Checkpoint的实现算法

基于Chandy-Lamport算法的分布式快照原理

将检查点的保存和数据处理分开,不暂停整个应用

Flink的检查点算法使用了一个特殊的record类型,称为一个检查点分界(checkpoint barrier)。类似于水印,检查点barriers由source operator注入到常规的流记录中,并且无法被其他records 赶超。每个检查点barrier会携带一个检查点ID,用于辨别它属于哪个检查点,并且将一个流在逻辑上分成两部分。在一个barrier之前,对state的所有修改,包含于此barrier的检查点。若是在一个barrier之后对state的所有修改,则包含于下一个检查点。

我们依然用之前的奇偶数sum例子来说明:

JobManager 向每个source task发送一条包含一个新checkpointID的消息,以初始化一个检查点,如下图:

当一个source task 收到这条checkpoint消息时,它会停止释放数据,将offset保存到checkpoint并通知JobManager,广播检查点barrier给下游的所有任务 。在广播消息发出后,source继续它的常规操作。barrier则被注入添加到它后续的输出流。

检查点barrier被广播到所有相连的并行tasks中。当一个task收到一个新检查点的barrier时,它会等待barriers从它所有的输入分区到达(图中等待黄色和蓝色的三角2都到达)。在它等待时,新来的数据蓝色的4不会被立即处理,而是被放入缓存。

一旦一个task从它所有输入分区中,收到了全部的barriers。它开始在state backend初始化检查点,并广播检查点barrier到它所有的下游tasks,如下图:8,8保存到checkpoint

在所有检查点barriers已经被释放后,task开始处理被缓存的记录。在所有被缓存的记录被释放后,task 继续处理它的输入流。下图显示了应用在这个时间点的运行状况:

最终,检查点barriers 到达一个sink task。当一个sink task 收到一个barrier时,它会做一个barrier 调整(alignment),给它自己的状态做检查点,并向JobManager确认(acknowledge)它已收到barrier。JobManager在收到一个application的所有task发送的checkpoint acknowledge后,它会记录:此application的检查点完成。下图显示了检查点算法的最后一步,完成的检查点可以用于从故障中恢复一个application。

三、checkpoint对性能的影响

Flink的检查点算法可以在不停止整个application的情况下,从流应用中生成一致性分布式的检查点。然而,它会增加application的处理延时(processing latency)。Flink 实现了轻微调整,以在某些特定条件下缓解性能影响。

在一个task对它的状态做检查点时,它会阻塞,并缓存它的输入。因为state可以变的很大,并且检查点的操作需要通过网络写入数据到一个远端存储系统,所以做检查点的操作可能会很容易就花费几秒到几分钟,这对于延时敏感的application来说,延时过长了。在Flink的设计中,做一个检查点是由state backend负责的。一个task的state如何精确的被复制,取决于state backend的实现。例如,FileSystem state backend与RocksDB state backend支持异步做检查点。当一个检查点被触发时,state backend在本地创建一个检查点的副本。在本地副本创建完成后,task继续它的正常处理。一个后端线程会异步地复制本地快照到远端存储,并在它完成检查点后提醒task。异步检查点可以显著地降低一个task从暂停到继续处理数据,这中间的时间。另外,RocksDB state backend也有增量检查点的功能,可以减少数据的传输量。

另一个用于减少检查点算法对处理延时影响的技术是:微调barrier排列步骤。若是一个应用需要非常短的延时,并且可以容忍at-least-once 状态保证。Flink可以被配置为在buffer alignment时对所有到达的记录做处理,而不是将这些记录为已经到达的barrier缓存下来。对于一个检查点,在它所有的barriers都到达后,operator为它的状态做检查点,现在这里可能也会包括:本应属于下一个检查点的records对state 做的修改。在错误发生时,这些records会被再次处理,也就是说,这里检查点提供的是at-least-once 一致性保证,而不是excatly-once 一致性保证。

四、保存点(Savepoints)

原理上,保存点与检查点用的是相同的算法创建的,所以保存点其实就是:检查点加上一些额外的元数据。Flink不会自动做一个保存点,所以一个用户(或是外部调度器)需要明确地触发创建保存点。Flink也不会自动清理保存点。

保存点可以做很多事情,比如定期手动备份,更新应用程序(改bug),flink版本升级,集群迁移,暂停和重启应用等

五、使用checkpoint

object CheckPointTest {
def main(args: Array[String]) {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//开启checkpoint每分钟checkpoint一次
env.enableCheckpointing(60000)
//选择checkpoint的状态后端
env.setStateBackend(new FsStateBackend("hdfs:// namenode: 9000 / flink/ checkpoints"))
//设置重启策略,也可以在配置文件配,最大三次重启,每次间隔10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.seconds(10)))
//设置重启策略,也可以在配置文件配,10s内三次重启,每次间隔1s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,
org.apache.flink.api.common.time.Time.seconds(10),
org.apache.flink.api.common.time.Time.seconds(1)))
//默认EXACTLY_ONCE
env.getCheckpointConfig.setCheckpointingMode ( CheckpointingMode . EXACTLY_ONCE)
//设置checkpoint超时时间超时后放弃本次checkpoint
env.getCheckpointConfig.setCheckpointTimeout(600000)
//checkpoint出现异常时是否主动把应用程序job fail掉,
默认是 true 就是 checkpoint失败人物也会失败
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
//最大并行checkpoint任务数,checkpoint太频繁会影响性能
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//两次checkpoint的最小时间间隔ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000)
//是否开启checkpoint外部持久化,
默认任务失败检查点 保存的数据会被删除
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:
即使手动取消任务也不要删除保存点
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:
手动取消任务删除保存点
env.getCheckpointConfig.enableExternalizedCheckpoints
( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
}
}

六、state backend 状态后端

flink的状态后端负责两件事,state backend,负责本地状态管理,状态的存储访问及维护;以及将检查点(checkpoint)状态写入远程存储

可以通过以下代码设置(一般是在公司Flink集群端统一配置):

//选择checkpoint的状态后端
env.setStateBackend(new FsStateBackend
("hdfs://namenode:9000/flink/checkpoints"))

状态后端选择:

MemoryStateBackend,内存及的状态后端,会将健控状态存储在TaskManager的jvm堆上,而降checkpoint存储在JobManager的jvm堆上

特点:快速、低延时、但不稳定,生产不用

FsStateBackend:本地状态和上一个一样,checkpoint存储在远程的持久化文件系统上

特点,快速低延时更好的稳定性,不是超大型数据一般使用这种即可

RocksDBStateBackend:将所有数据序列化后存入本地的RocksDB中存储

更加重量级,不会受服务器内存和gc影响

配置状态后端:

第一种:单任务调整

修改当前任务代码

env.setStateBackend(new FsStateBackend(“ hdfs://namenode:9000/flink/checkpoints”));

或者new MemoryStateBackend()

或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.7.0</version>
</dependency>

第二种:全局调整

修改flink-conf.yaml

state.backend: filesystem

state.checkpoints.dir: hdfs: // namenode : 9000/flink/checkpoints

注意:state.backend的值可以是下面几种:jobmanager (MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

七、保存多个Checkpoint

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

state.checkpoints.num-retained: 20

在HDFS的相应文件夹下面会产生多个checkpoint文件。

八、从Checkpoint进行恢复

如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3 / flink-checkpoints / 582e17d2cc343e6c56255d111bae0191 / chk-860/_metadata flink-app-jobs.jar

从上面我们可以看到,前面Flink Job的ID为 582e17d2cc343e6c56255d111bae0191 ,所有的 Checkpoint文件都在以Job ID为名称的目录里面,当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID(这里是11bbc5d9933e4ff7e25198a760e9792e),而对应的Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863等等。

九、Flink Savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用 Flink 的 Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中。

Flink程序中包含两种状态数据,一种是用户定义的状态(User-defined State),他们是基于Flink的Transformation函数来创建或者修改得到的状态数据;另一种是系统状态(System State),他们是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录。为了能够在创建 Savepoint 过程中,唯一识别对应的Operator的状态数据,Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定Operator ID,Flink也会我们自动生成对应的Operator状态ID。

而且,强烈建议手动为每个Operator设置ID,即使未来Flink应用程序可能会改动很大,比如替换原来的Operator实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。另外,保存的Savepoint状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以如果未来Flink程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的Savepoint正确地恢复。

下面,我们以Flink官网文档中给定的例子,来看下如何设置Operator ID,代码如下所示:

DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID

十、创建Savepoint

创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定:

一种是,需要配置Savepoint的默认路径,需要在Flink的配置文件 conf/flink-conf.yaml 中,添加如下配置,设置Savepoint存储目录,例如如下所示:

state.savepoints.dir: hdfs://namenode01.td.com /flink-1.5.3/ flink-savepoints

另一种是,在手动执行savepoint命令的时候,指定Savepoint存储目录,命令格式如下所示:

bin/flink savepoint :jobId [:targetDirectory]

例如,正在运行的Flink Job对应的ID为 40dcc6d2ba90f13930abce295de8d038,使用默认 state.savepoints.dir 配置指定的Savepoint目录,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

可以看到,在目录hdfs://namenode01.td.com/flink-1.5.3 /flink-savepoints /savepoint-40dcc6-4790807da3b0 下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。

为正在运行的Flink Job指定一个目录存储Savepoint数据,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs:// namenode01.td.com / tmp / flink / savepoints

可以看到,在目录 hdfs://namenode01.td.com/tmp/flink/savepoints/ savepoint-40dcc6-a90008f0f82f下面生成了ID为40dcc6d2ba90f13930abce295de8d038的Job的Savepoint数据。

十一、从Savepoint恢复

现在,我们可以停掉Job 40dcc6d2ba90f13930abce295de8d038,然后通过Savepoint命令来恢复Job运行,命令格式如下所示:

bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/ savepoint-40dcc6-a90008f0f82f flink-app-jobs .jar

可以看到,启动一个新的 Flink Job ,ID为 cdbae3af1b7441839e7c03bab0d0eefd 。

 

   
1824 次浏览       18
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划
信息架构建模(基于UML+EA)3-21[北京]
软件架构设计师 3-21[北京]
图数据库与知识图谱 3-25[北京]
业务架构设计 4-11[北京]
SysML和EA系统设计与建模 4-22[北京]
DoDAF规范、模型与实例 5-23[北京]
 
最新文章
大数据平台下的数据治理
如何设计实时数据平台(技术篇)
大数据资产管理总体框架概述
Kafka架构和原理
ELK多种架构及优劣
最新课程
大数据平台搭建与高性能计算
大数据平台架构与应用实战
大数据系统运维
大数据分析与管理
Python及数据分析
更多...   
成功案例
某通信设备企业 Python数据分析与挖掘
某银行 人工智能+Python+大数据
北京 Python及数据分析
神龙汽车 大数据技术平台-Hadoop
中国电信 大数据时代与现代企业的数据化运营实践
更多...