您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
Flink的容错机制
 
作者:大数据面壁者
 
  178  次浏览      50
2021-7-8 
 
编辑推荐:
本文主要介绍了Flink的容错机制状态的一致性、Checkpoint原理、Savepoint原理、checkpoint和savepoint的区别、Kafka+Flink+Kafka 实现端到端严格一次等相关内容。
本文来自csdn,由火龙果软件Anna编辑、推荐。

一、状态的一致性

当在分布式系统中引入状态时,自然也引入了一致性问题。

一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?

一致性级别

在流处理中,一致性可以分为3个级别:

at-most-once(最多一次):

这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。

at-least-once(至少一次):

这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。

exactly-once(严格一次):

这指的是系统保证在发生故障后得到的计数结果与正确值一致.既不多算也不少算

曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二:

保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性

流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。

最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。

Flink的一个重大价值在于,它既保证了exactly-once,又具有低延迟和高吞吐的处理能力。

从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。

端到端的状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。

具体划分如下:

source端

需要外部源可重设数据的读取位置.目前我们使用的Kafka Source具有这种特性: 读取数据的时候可以指定offset

flink内部

依赖checkpoint机制

sink端

需要保证从故障恢复时,数据不会重复写入外部系统. 有2种实现形式:

a)幂等(Idempotent)写入

所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

b)事务性(Transactional)写入

需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)

二、Checkpoint原理

Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。

假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢? 如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。

于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开; 当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。

Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。

Flink的检查点算法

checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性.

快照的实现算法:

a)简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用

b)Flink的改进Checkpoint算法. Flink的checkpoint机制原理来自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照(asynchronous barrier snapshotting)

每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

理解Barrier

流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark).这些barrier不会跨越流中的数据.

每个barrier会把数据流分成两部分: 一部分数据进入当前的快照 , 另一部分数据进入下一个快照 . 每个barrier携带着快照的id. barrier 不会暂停数据的流动, 所以非常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照.

Flink的检查点制作过程

第一步: Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint. 然后Source Task会在数据流中安插CheckPoint barrier

第二步: source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint

第三步: 当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。

第四步: 下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。

第五步: 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。

第六步: 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

严格一次语义: barrier对齐

在多并行度下, 如果要实现严格一次, 则要执行barrier对齐.

当 job graph 中的每个 operator 接收到barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行barrier 对齐(barrier alignment)以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。

1.当operator收到数字流的barrier n时, 它就不能处理(但是可以接收)来自该流的任何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快照 n 的记录和属于快照 n + 1 的记录。

2.接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理。

3.图一中的 Checkpoint barrier n之后的数据 123已结到达了算子, 存入到输入缓冲区没有被处理, 只有等到字母流的Checkpoint barrier n到达之后才会开始处理.

4.一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。

至少一次语义: barrier不对齐

前面介绍了barrier对齐, 如果barrier不对齐会怎么样? 会重复消费, 就是至少一次语义.

假设不对齐, 在字母流的Checkpoint barrier n到达前, 已经处理了1 2 3. 等字母流Checkpoint barrier n到达之后, 会做Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会Checkpoint n之后的数据重新计算. 1 2 3 会被再次被计算, 所以123出现了重复计算.

三、Savepoint原理

Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)

原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点

Flink不会自动创建保存点,因此用户(或外部调度程序)必须明确地触发创建操作

保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等

四、checkpoint和savepoint的区别

五、Kafka+Flink+Kafka 实现端到端严格一次

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证部的状态一致性

source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction

内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

具体的两阶段提交步骤总结如下:

第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”

jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanagerr

sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据

jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成

sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据

外部kafka关闭事务,提交的数据可以正常消费了

六、在代码中测试Checkpoint

package com.flink.charpter07.state;
import org.apache.flink.api .common.functions.FlatMapFunction;
import org.apache.flink.api .common.functions.MapFunction;
import org.apache.flink.api .common.serialization.SimpleStringSchema;
import org.apache.flink.api .java.tuple.Tuple2;
import org.apache.flink.runtime .state.filesystem.FsStateBackend;
import org.apache.flink.streaming .api.CheckpointingMode;
import org.apache.flink.streaming .api.environment.CheckpointConfig;
import org.apache.flink.streaming .api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming .connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming .connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.Properties;
public class S04_CheckPoint {
public static void main(String[] args) throws IOException {
StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment();
env.setParallelism(2);
Properties properties = new Properties();
properties.setProperty ("bootstrap.servers", "hadoop162:9092, hadoop163:9092");
properties.setProperty ("group.id", "S04_CheckPoint");
properties.setProperty ("auto.offset.reset", "latest");
env.setStateBackend(new FsStateBackend ("hdfs://hadoop162:8020/ flink/checkpoints/fs"));
//设置checkpoint的时间间隔
env.enableCheckpointing(2000);
//设置模式为精准一次(默认值)
env.getCheckpointConfig().setCheckpointingMode (CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint之间的时间间隔
env.getCheckpointConfig( ).setMinPauseBetweenCheckpoints(1000);
//checkpoint必须在一分钟内完成,否则被抛弃
env.getCheckpointConfig( ).setCheckpointTimeout(60000);
//同一时间只允许一个checkpoint
env.getCheckpointConfig( ).setMaxConcurrentCheckpoints(1);

//开启在job中终止后任然保留的externalized checkpoints
env.getCheckpointConfig( ).enableExternalizedCheckpoints (CheckpointConfig .ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION);

env.addSource(new FlinkKafkaConsumer<String> ("test10",new SimpleStringSchema( ),properties))
.flatMap(new FlatMapFunction<String, Tuple2<String,Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2< String,Long>> out) throws Exception {
for (String word : value.split(" ")) {
out.collect(Tuple2.of(word,1l));
}
}
})
.keyBy(t->t.f0)
.sum(1)
//.map(t->"("+t.f0+":"+t.f1+")")
.map(new MapFunction<Tuple2<String, Long>, String>() {
@Override
public String map(Tuple2<String, Long> value) throws Exception {

StringBuffer bs = new StringBuffer();
bs.append("(").append(value.f0).append(" :").append(value.f1).append(")");
return bs.toString();
}
})
.addSink(new FlinkKafkaProducer<String> ("hadoop162:9092","test11",new SimpleStringSchema()));


try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}

 
   
178 次浏览       50
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划
 
最新文章
大数据平台下的数据治理
如何设计实时数据平台(技术篇)
大数据资产管理总体框架概述
Kafka架构和原理
ELK多种架构及优劣
最新课程
大数据平台搭建与高性能计算
大数据平台架构与应用实战
大数据系统运维
大数据分析与管理
Python及数据分析
更多...   
成功案例
某通信设备企业 Python数据分析与挖掘
某银行 人工智能+Python+大数据
北京 Python及数据分析
神龙汽车 大数据技术平台-Hadoop
中国电信 大数据时代与现代企业的数据化运营实践
更多...