Flink的核心概念及运行流程

一、什么是Flink

flink是数据流上的有状态计算,可以用来处理有界和*数据(实时和批次)。
1、应用的场景:
Flink的核心概念及运行流程
• 事件驱动的应用
• 数据管道 & ETL
• 流式、批次数据分析

二、概念及运行流程

1、概念

1.1、flink的组件堆栈

Flink的核心概念及运行流程
Programs and Dataflows
1、DataFlow的基本套路:
Flink的核心概念及运行流程
构建运行环境-》Source(一个或多个)-》转换(算子) -》Sink(一个或多个)
Flink的核心概念及运行流程
多个DataFlow组成DAG。

2、并行化DataFlow(Distributed Streaming DataFlow)
Flink的核心概念及运行流程

  • 一个Stream会有多个Stream Partition

  • 一个Operator会还有多个Operator subtasks.Operator subtasks的数据就是这个Operator的并行度

  • Operator(算子)间数据的传递模式:

    • One-to-one streams保持元素的分区和顺序,如mao()
    • Redistributing streams 改变流的分区
  • 重新分区策略取决于使用的算子

    • KeyBy() (re-partitions by hashing the key)
    • broadcast()
    • rebalance() (which re-partitions randomly)

1.2、Flink的API分层架构

Flink的核心概念及运行流程
越底层API越灵活,越上层的API越轻便
1、Stateful Stream Processing (有状态的流式处理)

位于最底层, 是core API 的底层实现,主要通过ProcessFunction实现。 灵活性高,但开发比较复杂.
ProcessFunction是一个低阶的流处理操作,它可以访问流处理程序的基础构建模块如:event,state,timers

2、CoreAPIs:

• DataStreamAPI:批处理API • DataSetAPI:流处理API

3、Table API&SQL

• SQL 构建在Table 之上,都需要构建Table 环境 • 不同的类型的Table 构建不同的Table 环境 • Table
可以与DataStream或者DataSet进行相互转换 • Streaming SQL不同于存储的SQL,最终会转化为流式执行计划 •
Table API 程序在执行前有优化器基于优化规则对其进行优化

1.3、核心概念

1、Window(Stream特有)
主要是解决聚合(count,sum)等操作的问题,因为流是无边界的,所以无法对一个流做聚合操作(永远无法输出结果)
但是流可以在一定边界内进行聚合操作,而这个边界就由window给出。
Window可以由
• 数据驱动,例如每100个元素. --CountWindow
• 时间驱动,例如每30s
Tumbling Window(翻滚窗口,无重叠)
Sliding Window(滑动窗口,有重叠)
Session Window(类似于Web编程里的Session,以不活动间隙作为分隔)
• 自定义Window
Flink的核心概念及运行流程
2、Time
Flink的核心概念及运行流程

  • Event Time:记录产生的时间
  • Ingestion Time:记录进入source的时间
  • Processing Time:记录被处理的时间

常用:ProcessTime和EventTime
Flink的核心概念及运行流程
3、Stateful Operations(有状态的操作)
state一般指一个具体的task/operator的状态。为了容错。分为:

  • Operator State
    Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。
  • Keyed State
    基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key, 可能都对应一个state。
    Flink的核心概念及运行流程
    Operator State和Keyed State可以以两种形式存在:
  • 托管状态(Flink框架管理的状态:如ValueState, ListState, MapState等)
  • 原始状态(由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知)

4、Checkpoint(检查点即备份,轻量级容错)
Checkponit是指在某一时刻,将所有task的状态(state)做一个快照(snapshot), 然后存储到State Backend.
State Backend (rocksdb + hdfs):rocksdb会在本地文件系统中维护状态,KeyedStateBackend等会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。

  • 保证exactly-once 语义
  • 用于内部失败的恢复(自动),无需人工干预
  • 基本原理:通过往source 注入barrier(特殊的event),barrier作为checkpoint的标志
    Flink的核心概念及运行流程
    5、Savepoint(保存点)
    savepoint可以理解为是一种特殊的checkpoint, savepoint就是指向checkpoint的一个指针,需要手动触发,而且不会过期,不会被覆盖,除非手动删除
  • 流处理过程中的状态历史版本
  • 具有可以replay的功能
  • 外部恢复(应用重启和升级)
  • 两种方式触发
    Cancel with savepoint
    手动主动触发

6、Batch on Streaming(基于流的批处理)
批处理程序的容错功能不使用检查点。通过完全重播流来进行恢复

2、运行流程:

2.1 Client&JobManager&TaskManager

Flink的核心概念及运行流程
当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

  • Client:准备和提交 dataflow 到 JobManager
  • JobManager(master节点):协调分布式计算。它们负责调度任务、协调 checkpoints、协调故障恢复等。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个TaskManager 去执行。
  • TaskManger(slave节点):执行 dataflow 中的 tasks(准确来说是 subtasks ),并且缓存和交换数据 streams
  • 角色间的通信(Akka)
  • 数据的传输(Netty)
    拓展:scheduler调度:
    Flink的核心概念及运行流程

2.2 Task & OperatorChain

Flink 将算子(operator)的 subtask 链接(chain)成 task。每个 task 由一个线程执行。把算子链接成 tasks 能够减少线程间切换和缓冲的开销,在降低延迟的同时提高了整体吞吐量
Flink的核心概念及运行流程
SubTask(=线程):算子的并行度表示该算子在运行时可被拆解为多少个子任务 (subtask)
OperatorChain:没有 shuffle 的多个算子合并在一个 subTask 中,就形成了 Operator Chains
Task:Task 是一个阶段多个功能相同 subTask 的集合
=》task由多个subtask组成(个数由subtask的并行度确定),一个subtask包含一个或多个Operator(多个operator时会形成Operator Chain)

使用OperatorChain的好处:

  • 减少线程切换
  • 减少序列化与反序列化
  • 减少数据在缓冲区的交换
  • 减少延迟并且提高吞吐能力

OperatorChain 组成条件:

  • 没有禁用Chain
  • 上下游算子并行度一致
  • 下游算子的入度为1(也就是说下游节点没有来自其他节点的输入)
  • 上下游算子在同一个slot group(通过slot group先分配到同一个solt,然后才能chain)
  • 下游节点的 chain 策略为 ALWAYS(可以与上下游链接, map、 flatmap、 filter等默认是ALWAYS)
  • 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接, Source默认是HEAD)
  • 上下游算子之间没有数据shuffle (数据分区方式是 forward)

编程改变Operator chain的行为:

  • 可以通过在DataStream的operator后面(如someStream.map(…))调用startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。
  • 调用disableChaining()来指示该operator不参与chaining(不会与前后的operator chain一起)。
  • 通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。
  • 设置Slot group,例如someStream.filter(…).slotSharingGroup(“name” )。
  • 调整并行度

2.3 TaskSlot

Flink的核心概念及运行流程
TaskManager是JVM进程,每个subtask(线程)下可运行一个或者多个operator,即OperatorChain。为了控制 worker 接收 task 的数量,worker 拥有所谓的 task slots(仅做内存管理,没做CPU隔离).
Task Slot共享
默认情况下, Flink 允许subtasks共享slot, 条件是它们都来自同一个Job的不同task的subtask。
slot共享有以下两点好处:

  • Flink 集群需要与 job 中使用的最高并行度一样多的 slots(前提,保持默认SlotSharingGroup)。
  • 更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作keyAggregation/sink 一样多的资源。将示例中的并行度从 2 增加到 6 可以充分利用 slot 的资源,同时确保繁重的 subtask 在 TaskManagers 之间公平地获取资源。
    Flink的核心概念及运行流程

实现Slot共享的两个类:
SlotSharingGroup(soft)
SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot

  • 算子的默认group为default(即默认一个job下的subtask都可以共享一个slot)
  • 同一个group并且并行度相同的sub-tasks 共享同一个slots
  • 为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:
    someStream.filter(…).slotSharingGroup(“group1”);就强制指定了filter的slot共享组为group1。
  • 怎么确定一个算子的SlotSharingGroup什么呢(根据上游算子的group 和自身是否设置group共同确定)
  • 适当设置可以减少每个slot运行的线程数,从而整体上减少机器负载

CoLocationGroup(hard)
• 保证所有的并行度相同的sub-tasks运行在同一个slot
• 主要用于迭代流(训练机器学习模型)

2.4、Flink 中的执行图(Graph)

Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
    Flink的核心概念及运行流程
    这里对一些名词进行简单的解释。
  • StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。
  • StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
  • StreamEdge:表示连接两个StreamNode的边。
  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。
  • JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
  • IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。
  • JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有和并发度一样多的 ExecutionVertex。
  • ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。
  • IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。
  • IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。
  • ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。
  • Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
  • Task:Execution被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
  • ResultPartition:代表由一个Task的生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。
  • ResultSubpartition:是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。
  • InputGate:代表Task的输入封装,和JobGraph中JobEdge一一对应。每个InputGate消费了一个或多个的ResultPartition。
  • InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。

那么 Flink 为什么要设计这4张图呢,其目的是什么呢?Spark 中也有多张图,数据依赖图以及物理执行的DAG。其目的都是一样的,就是解耦,每张图各司其职,每张图对应了 Job 不同的阶段,更方便做该阶段的事情。我们给出更完整的 Flink Graph 的层次图。
Flink的核心概念及运行流程
首先我们看到,JobGraph 之上除了 StreamGraph 还有 OptimizedPlan。OptimizedPlan 是由 Batch API 转换而来的。StreamGraph 是由 Stream API 转换而来的。为什么 API 不直接转换成 JobGraph?因为,Batch 和 Stream 的图结构和优化方法有很大的区别,比如 Batch 有很多执行前的预分析用来优化图的执行,而这种优化并不普适于 Stream,所以通过 OptimizedPlan 来做 Batch 的优化会更方便和清晰,也不会影响 Stream。JobGraph 的责任就是统一 Batch 和 Stream 的图,用来描述清楚一个拓扑图的结构,并且做了 chaining 的优化,chaining 是普适于 Batch 和 Stream 的,所以在这一层做掉。ExecutionGraph 的责任是方便调度和各个 tasks 状态的监控和跟踪,所以 ExecutionGraph 是并行化的 JobGraph。而“物理执行图”就是最终分布式在各个机器上运行着的tasks了。所以可以看到,这种解耦方式极大地方便了我们在各个层所做的工作,各个层之间是相互隔离的。

拓展:Flink如何保证数据流的容错:

Flink提供了一种容错机制,可以一致地恢复数据流应用程序的状态。该机制确保即使在出现故障的情况下,程序的状态最终也将恰好一次反映出数据流中的每个记录。
Flink容错机制的实现是通过连续绘制分布式流数据流的快照。并将状态存储在可配置的位置(例如主节点或HDFS)。当程序故障时,Flink将停止分布式流数据流。系统会重启Operator,并将他们重置为最新的成功检查点。输入流将重置为状态快照的点。确保作为重新启动的并行数据流的一部分处理的任何记录都不属于先前的检查点状态。

1、Checkpoint(检查点)

Flink绘制这些快照的机制依赖于异步屏障快照(asynchronous barrier snapshotting, ABS)算法。
Flink的核心概念及运行流程

1.1、Barriers & Snapshot(屏障与快照)

Barriers
由Flink的JobManager周期性产生(周期长度由StreamExecutionEnvironment.enableCheckpointing()方法来指定),并广播给所有Source算子,沿着数据流流动下去。下图示出一条带有屏障的数据流。
可见,第n - 1个屏障之后、第n个屏障之前的所有数据都属于第n个检查点。下游算子如果检测到屏障的存在,就会触发快照动作,不必再关心时间无法静止的问题。下面继续了解快照阶段是如何执行的。
Snapshotting
Flink的核心概念及运行流程
快照算法的步骤如下:

  • a) Source算子接收到JobManager产生的屏障,生成自己状态的快照(其中包含数据源对应的offset/position信息),并将屏障广播给下游所有数据流;
  • b)、c) 下游非Source的算子从它的某个输入数据流接收到屏障后,会阻塞这个输入流,继续接收其他输入流,直到所有输入流的屏障都到达(图中的count-2算子接收的两个屏障就不是同时到达的)。一旦算子收齐了所有屏障,它就会生成自己状态的快照,并继续将屏障广播给下游所有数据流;
  • d) 快照生成后,算子解除对输入流的阻塞,继续进行计算。Sink算子接收到屏障之后会向JobManager确认,所有Sink都确认收到屏障标记着这一周期checkpoint过程结束,快照成功。

可见,如果算子只有一个输入流的话,问题就比较简单,只需要在收到屏障之后立即做快照。但是如果有多个输入流,就必须要等待收到所有屏障才能做快照,以避免将检查点n与检查点n + 1的数据混淆。这个等待的过程就叫做对齐(alignment)。注意算子内部有个输入缓冲区,用来在对齐期间缓存数据。
Flink的核心概念及运行流程
下图是从Flink系统的角度示出整个checkpoint流程里屏障的流动,以及快照数据向状态后端的写入。注意Source记录的offset值以及Sink收到所有屏障后的ack信号。
Flink的核心概念及运行流程

1.2、Exactly Once & At Least Once(恰好一次与最少一次语义)

上面讲到的屏障对齐过程是Flink Exactly-Once语义的基础,因为屏障对齐能够保证多输入流的算子正常处理不同checkpoint区间的数据,避免它们发生交叉,即不会有数据被处理两次。
但是对齐过程需要时间,有一些对延迟特别敏感的应用可能对准确性的要求没有那么高。所以Flink也允许在StreamExecutionEnvironment.enableCheckpointing()方法里指定At-Least-Once语义,会取消屏障对齐,即算子收到第一个输入的屏障之后不会阻塞。这样一来,部分属于检查点n + 1的数据也会包括进检查点n的数据里, 当恢复时,这部分数据就会被重复处理。

1.3 、Asynchronous State Snapshots(异步状态快照)

屏障”和“快照”都讲过了,“异步”呢?这个词实际上指的是快照数据写入的异步性:算子收齐屏障并触发快照之后,不会等待快照数据全部写入状态后端,而是一边后台写入,一边立刻继续处理数据流,并将屏障发送到下游,实现了最小化延迟。
当然,引入异步性之后,checkpoint成功的条件除了所有Sink都报告ack之外,还得加上一条:所有有状态的算子都报告ack,否则JobManager就无法确认异步写入到底完成没有。

2、Recovery(恢复)

在这种机制下的恢复非常简单:失败时,Flink选择最新完成的检查点k。然后,系统重新部署整个分布式数据流,并为每个操作员提供作为检查点k的一部分快照的状态。设置源以从位置S k开始读取流。例如,在Apache Kafka中,这意味着告诉使用者开始从偏移量S k获取。
如果状态是增量快照,则操作员将从最新完整快照的状态开始,然后对该状态应用一系列增量快照更新。