Flink进阶(五):Flink中的反压问题

有段时间没写flink系列文章了,今天写一写Flink中的反压问题。

何为反压?

在短时间内的负载高峰导致系统接收的数据远大于它能处理数据的速率,如果反压问题不能解决,就会导致系统资源耗尽甚至系统崩溃。那么Flink自身是有反压机制的,它能够自己检测到哪些Operator被阻塞了,然后自适应地降低源头或上游数据的发送速率,从而维持整个系统的稳定。

1.5之前Flink的反压机制

将反压机制之前,先得了解下Flink各个Task之间是如何传递数据的

  • Task A 从外部 Source 端读取到数据后将数据序列化放到 Send Buffer 中,再由 Task A 的 Send Buffer 发送到 Task B 的 Receive Buffer;
  • Task B 的算子从 Task B 的 Receive Buffer 中将数据反序列后进行处理,将处理后数据序列化放到 Task B 的Send Buffer 中,再由 Task B 的 Send Buffer 发送到 Task C 的 Receive Buffer;
  • Task C 再从 Task C 的 Receive Buffer 中将数据反序列后输出到外部 Sink端,这就是所有数据的传输和处理流程。

在Flink1.5之前,反压机制是这样的:

  • Task C 由于各种原因吞吐量急剧降低,那么肯定会造成 Task C 的 Receive Buffer 中堆积大量数据,此时 Task B 还在给 Task C 发送数据,但是毕竟内存是有限的,持续一段时间后 Task C 的 Receive Buffer 满了,此时Task B 发现 Task C 的 Receive Buffer 满了后,就不会再往 Task C 发送数据了
  • Task B 处理完的数据就开始往 Task B 的 Send Buffer 积压,一段时间后 Task B 的 Send Buffer也满了,Task B 的处理就会被阻塞,这时 Task A 还在往 Task B 的 Receive Buffer 发送数据。同样的道理,Task B 的 Receive Buffer 很快满了,导致 Task A 不再往 Task B 发送数据,
  • Task A 的 Send Buffer 也会被用完,Task A 是 Source Task 没有上游,所以 Task A直接降低从外部 Source 端读取数据的速率甚至完全停止读取数据。

所以:Flink就这样将下游的压力传递给上游一直到Source Task,Source Task就会减少拉取数据的速率。

如果下游 Task C 的负载能力恢复后,如何将负载提升的信息反馈给上游呢?
Task B 会一直向 Task C 发送探测信号,检测 Task C 的 Receive Buffer 是否有足够的空间,当 Task C 的负载能力恢复后,Task C 会优先消费 Task C Receive Buffer 中的数据,Task C Receive Buffer 中有足够的空间时,Task B 会从 Send Buffer 继续发送数据到 Task C 的 Receive Buffer,Task B 的 Send Buffer 有足够空间后,Task B 又开始正常处理数据,很快 Task B 的 Receive Buffer 中也会有足够空间,同理,Task A 会从 Send Buffer 继续发送数据到 Task B 的 Receive Buffer,Task A 的 Receive Buffer 有足够空间后,Task A 就可以从外部的 Source 端开始正常读取数据了。

但是有个问题:如果一个TaskManager中有多个Task,只要其中一个Task A触发了反压机制,但是Task B是正常的,此时Task A和Task B对于上游数据的获取是来自不同的TaskManager,两个Task是共享一个网络传输的TCP信道,A的反压会导致这个公用的TCP信道阻塞,会导致Task B也接受不到上游的数据了。
所以存在的问题就是:其中一个Task出现反压时可能会导致其他Task也接收不到数据。为什么是可能,因为只有是跨TaskManager传输数据且下游TaskManager中有多个Task,其中只有一个Task出现反压。

1.5之后Flink的反压机制

首先了解下Flink数据接受发送的模型:

  • Send Buffer
    ResultPartition:生产者生产的数据首先写入到 ResultPartition 中,一个 Operator 实例对应一个ResultPartition。
    ResultSubpartition:一个ResultPartition 是由多个 ResultSubpartition 组成。当 Producer Operator实例生产的数据要发送给下游 Consumer Operator n 个实例时,那么该 Producer Operator 实例对应的ResultPartition 中就包含 n 个 ResultSubpartition。
  • Receiver Buffer
    InputGate:消费者消费的数据来自于 InputGate 中,一个 Operator 实例对应一个InputGate。网络中传输的数据会写入到 Task 的 InputGate。
    InputChannel:一个InputGate 是由多个 InputChannel 组成。当 Consumer Operator 实例读取的数据来自于上游Producer Operator n 个实例时,那么该 Consumer Operator 实例对应的 InputGate 中就包含n 个 InputChannel。
  • 读写数据
    RecordReader:用于将记录从Buffer中读出。
    RecordWriter:用于将记录写入Buffer。
  • 缓存空间
    LocalBufferPool:为 ResultPartition 或 InputGate 分配内存,每一个 ResultPartition 或 InputGate分别对应一个 LocalBufferPool。
    NetworkBufferPool:为 LocalBufferPool 分配内存,NetworkBufferPool 是
    Task之间共享的,每个 TaskManager 只会实例化一个。
    Flink进阶(五):Flink中的反压问题

基于 Credit 的反压机制:

  • 上游 SubTask 给下游 SubTask 发送数据时,会把 Buffer 中要发送的数据和上游 ResultSubPartition堆积的数据量 Backlog size 发给下游,下游接收到上游发来的 Backlog size 后,会向上游反馈现在的 Credit值,Credit 值表示目前下游可以接收上游的 Buffer 量,1 个Buffer 等价于 1 个 Credit。上游接收到下游反馈的Credit 值后,上游下次最多只会发送 Credit 个数据到下游,保障不会有数据积压在 Socket 这一层。
  • 当下游 SubTask 反压比较严重时,可能就会向上游反馈 Channel Credit = 0,此时上游就知道下游目前对应的InputChannel 没有可用空间了,所以就不向下游发送数据了。
  • 此时,上游还会定期向下游发送探测信号,检测下游返回的 Credit 是否大于 0,当下游返回的 Credit 大于 0 表示下游有可用的Buffer 空间,上游就可以开始向下游发送数据了。

通过这种基于 Credit 的反馈策略,就可以保证每次上游发送的数据都是下游 InputChannel 可以承受的数据量,所以在公用的 TCP 这一层就不会产生数据堆积而影响其他 SubTask 通信。

如何通过webUI界面来确定反压的位置

  • 当 Flink 的某个 Task 出现故障导致吞吐量严重下降时,在 Flink 的反压页面,我们会看到该 Task 的反压状态为OK,而该 Task 上游所有 Task 的反压状态为 HIGH。所以,我们根据 Flink 的 BackPressure 页面去定位哪个Task 出故障时,首先要找到反压状态为 HIGH 的最后一个 Task,该 Task 紧跟的下一个 Task 就是我们要找的有故障的Task。
  • 网络瓶颈也会造成反压,上游数据不能迅速发到下游,这种情况比较少见。

利用 Flink Metrics 定位产生反压的位置

  1. 如果上游的operator对应两个sink,而sink端出现了反压问题,此时就不能分辨出来到底是哪个sink端出了反压。此时需要借助Flink
    Metrics提供的参数与协助我们判断
  2. 参数说明
    inputFloatingBuffersUsage:每个 Operator 实例对应一个
    FloatingBuffers,inputFloatingBuffersUsage 表示 Operator 对应的FloatingBuffers 使用率。
    inputExclusiveBuffersUsage: 每个 Operator实例的每个远程输入 通道(RemoteInputChannel)都有自己的一组独占缓冲区(ExclusiveBuffer),inputExclusiveBuffersUsage表示 ExclusiveBuffer 的使用率。
    inPoolUsage:Flink 1.5 - 1.8 中的inPoolUsage表示inputFloatingBuffersUsage。Flink 1.9 及以上版本 inPoolUsage 表示
    inputFloatingBuffersUsage 和 inputExclusiveBuffersUsage 的总和。
  3. 建议观察inPoolUsage参数。

反压的原因和解决

  • 在实践中,很多情况下的反压是由于数据倾斜造成的,这点我们可以通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。
  • 此外,最常见的问题可能是用户代码的执行效率问题(频繁被阻塞或者性能问题)。
  • TaskManager 的内存以及 GC 问题也可能会导致反压。