Flink__Flink1.10.0Concepts__Distributed Runtime Environment

Tasks and Operator Chains

对于分布式计算,Flink 把operator subtasks 运行子任务串连在一起,组合成Tasks。每个线程执行一个Task。 把算子任务串连到一个 Task 中运行是一种非常有用的优化:它减少了线程到线程间切换和缓存的开销,并且提高了整体吞吐量,减少了数据延迟。这种运行子任务的串连操作是可以配置的:更多细节请查看这里

下图中的数据流有5个运行子任务(subtasks),因此有5个并行线程:
Flink__Flink1.10.0Concepts__Distributed Runtime Environment

Job Managers, Task Managers, Clients

Flink 运行时包含2种进程:

  • Job Managers (又称 Masters): Job Managers 负责协调分布式任务的运行。Master调度Tasks,协调Checkpoint执行,协调故障恢复等工作。
    Flink 的执行环境中至少有一个 Job Manager。如果配置了 Flink 的HA (高可用),会有多个 Job Manager,其中一个 Job Manager 始终是 Leader,其他Job Manager 是 Standby (备用)。

  • Task Managers (又称 Workers): Task Managers 负责执行数据流的tasks (更确切的说是: subtasks),并缓存和交换数据流的数据。
    Flink 的执行环境中至少有一个 TaskManager。

Job Managers 和 Task Managers 有多种不同启动方式:直接以 standalone cluster 形式在Linux 机器中启动,或者在 资源管理框架 YARN 或 Mesos 的 容器中(containers) 中启动。Task Manager 与Job Manager 保持连接上报自身状态,并接收Master分配的任务。

Client客户端(Job Client)不是运行时环境和程序执行的一部分,但它是任务执行的起点。Client 负责准备dataflow 任务执行流程 并发送到 JobManager。之后,Client 可以断开连接,或者保持连接用于接收Job执行进度相关的信息。

Flink__Flink1.10.0Concepts__Distributed Runtime Environment

Task Slots and Resources

每个 Worker(TaskManager) 都是一个 JVM 进程,每个 TaskManager会在彼此隔离的线程中执行 一个或多个 subtasks 子任务。为了控制一个 Worker可以执行多少个Tasks 任务,Flink 引入了称为Task Slots 任务槽的概念,每个Worker至少包括一个Task Slot 任务槽。

每个task slot代表TaskManager的一些固定资源。例如: 一个 TaskManager 有3个 Slots,那 TaskManager为每个任务槽分配他自身1/3的资源。分配任务槽意味着subtasks 子任务不会与其他Jobs 作业争抢内存,而是为每个 Slot 预留一定数量的内存。注意:目前 Flink任务槽 Slots 仅隔离分配给TaskManager内存, 不会隔离分配给 TaskManager 的 CPU。

通过调整任务槽的数量,用户可以定义subtasks 子任务的隔离程度。TaskManager 有一个 Slot,表示每个Task group 任务组都在单独的 JVM 进程中运行。TaskManager 有多个 Slot,表示多个subtasks 子任务共享一个 JVM。在同一个 JVM 进程中的 subtasks 子任务 共享 TCP 连接 (通过多路复用技术) 和 心跳消息。多个Slots之间也会共享数据集和数据结构,这样可以减少每个subtasks 子任务的开销。

Flink__Flink1.10.0Concepts__Distributed Runtime Environment

默认情况下,Flink 允许同一个 Job的 subtasks 子任务之间共享 Slot,即使这些subtasks 子任务属于不同的Tasks (这个 Task 可以理解为 Spark 的 Stage) ,只要这些Tasks属于同一个Flink-Job,subtasks就可以共享Slot。这样做的结果是一个 Slot可能负责整个Flink-Job的作业流水线(Pipeline)。Flink 允许 Slot 共享 带来2个好处:

  • Flink 集群所需要的 任务Slot 数与 Flink Job 中使用的并行度一致。不需要再额外计算一个程序要包含多个Tasks。
  • 更好的利用系统资源。没有 Slot Sharing 任务槽共享,非资源密集型的子任务source()/map() 占用的资源 将与资源密集的window() 窗口子任务占用的一样多。在我们的例子中,通过 Slot Sharing 任务槽共享,任务的并行度由2增加到6 可以充分利用 Slot 资源,同时确保重型任务能在 TaskManager 之间公平分配。

Flink__Flink1.10.0Concepts__Distributed Runtime Environment

Flink API 包含一种 resource group 的机制 来阻止不希望发生的 Slot Sharing。
根据经验来看,较合理的共享槽 Slots 数量 应该与 CPU 的核数相一致。通过hyper-threading 超线程技术,每个任务槽将运行2个,或者多个线程。

State Backends

存储key/value 索引的确切数据结构依赖于所选择的 state backend。一种state backend在内存中使用 Hash Map 结构来存储数据,另一种state backend使用 RocksDB 来存储 Key/Value。除了定义数据结构来存储状态值之外,state backend也实现了获取 Key/Value 状态的时间点快照,并将状态值快照做为 Checkpoint 的一部分。
Flink__Flink1.10.0Concepts__Distributed Runtime Environment

Savepoints

Flink 的Data Stream API可以从savepoint中恢复异常。savepoints能够保证在不丢失任何 状态数据 的情况下更新 Flink程序 和 Flink集群。

Savepoints 是手动触发的checkpoints,savepoints 会生成 程序快照 并将快照写入 state backend 中。savepoint 依赖常规的checkpoint机制。在执行执行过程中会定期在 worker 节点上生成 快照 和 检查点。状态恢复只需要最后一次完成的checkpoint,当最新的 checkpoint生成之后,就可以安全的删除之前完成的checkpoint。

savepoint特别像这些定期生成的 checkpoint,区别就是savepoint是用户触发的并且当生成新的checkpoint时,savepoint不会自动过期。可以用command line 命令行来创建 savepoint,或在取消一个 job 时,通过REST API来生成 Savepoint。