Flink源码解读系列 | Flink中发送端反压以及Credit机制

点击上方蓝色字体,选择“设为星标”

回复”资源“获取更多惊喜

Flink源码解读系列 | Flink中发送端反压以及Credit机制

Flink源码解读系列 | Flink中发送端反压以及Credit机制

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

Flink源码解读系列 | Flink中发送端反压以及Credit机制

Flink源码解读系列 | Flink中发送端反压以及Credit机制

暴走大数据

点击右侧关注,暴走大数据!

Flink源码解读系列 | Flink中发送端反压以及Credit机制

上一篇《Flink接收端反压机制》说到因为Flink每个Task的接收端和发送端是共享一个bufferPool的,形成了天然的反压机制,当Task接收数据的时候,接收端会根据积压的数据量以及可用的buffer数量(可用的memorySegment数)来决定是否向上游发送Credit(简而言之就是当我还有空间的时候,我向上游也就是上一个Task的发送端发送一个ack消息,表明我还有空间你可以发送数据过来,如果下游没有给你Credit就证明下游已经堵了,没有空间了也就不能继续往下游发送了)

现在从源码来看一下Task的数据发送端,也就是Netty的Server端的实现

先看Task初始化的时候TaskManagerRunner.java中startTaskManager()方法中

Flink源码解读系列 | Flink中发送端反压以及Credit机制

Flink源码解读系列 | Flink中发送端反压以及Credit机制

Flink源码解读系列 | Flink中发送端反压以及Credit机制

这个connectionManager其实分为两种,Netty,local一看就知道netty这种肯定是对应需要通过网络传输,本地模式这里就不讲了。

Flink源码解读系列 | Flink中发送端反压以及Credit机制

这个地方看到Flink的client和server都初始化了,需要注意的是其实这个地方client端只是初始化了一些配置,并没有调用bind()方法启动起来,这里看过上一篇文章的同学就会知道,client只有当第一次需要拉取上游subpatition数据的时候才会启动起来也就是bind(),

而server端在这里也就是task启动的时候就启动起来了,继续看server端如何启动的server.init()方法。

Flink源码解读系列 | Flink中发送端反压以及Credit机制

 init方法中,这里可以看到,这是Flink1.6以前只有基于netty的tcp网络层反压,这里是通过bootstrap的两个参数

ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK  大小为两倍的memorySegment大小

ChannelOption.WRITE_BUFFER_LOW_WATER_MARK   大小为memorySegment + 1

接着

Flink源码解读系列 | Flink中发送端反压以及Credit机制

Flink源码解读系列 | Flink中发送端反压以及Credit机制

1处2处常规的Netty定长编解码器,没有什么好说的

看看3处,4这里先不讲后面会提到

Flink源码解读系列 | Flink中发送端反压以及Credit机制

看到3是一个inboundHandler,反压机制时他的用处是用来接收来自下游响应的Credit,来看他的ChannelRead0方法

Flink源码解读系列 | Flink中发送端反压以及Credit机制

当接收到的消息是一个Credit信任的时候

Flink源码解读系列 | Flink中发送端反压以及Credit机制

先是

Flink源码解读系列 | Flink中发送端反压以及Credit机制

增加了这个reader的可用的Credit可用数

然后

Flink源码解读系列 | Flink中发送端反压以及Credit机制

其实了解了接收端的反压,发送端接收到了下游的credit,那发送数据的时候肯定有一个地方会先判断是否有可用的Credit才决定是否往下发数据

其实就是这个带星号的地方判断,然后下面就是常规的从queue中拉取reader往netty下游writeAndFlash()数据了,没什么好讲的

来看一下他判断Credit是否满足的地方

Flink源码解读系列 | Flink中发送端反压以及Credit机制

可以看到只有当

    有数据且可用的Credit数量大于0

    或者有数据且数据是一个事件而不是record的时候,才返回true往下游发送

 

可以看到这个 enqueueAvailableReader()方法比较重要,里面包含了判断credit以及往后下游发送数据的逻辑

那这个enqueueAvailableReader()方法除了会在接收到下游的Credit的时候触发一次,还有哪会被触发呢

既然是往下游发送数据那我们task处理完数据以后应该也会调用这个方法

于是来看一下Task发送数据,以前的文章讲过,这里就不赘述了,直接看到RecordWriterOutput的emit()

Flink源码解读系列 | Flink中发送端反压以及Credit机制

Flink源码解读系列 | Flink中发送端反压以及Credit机制

会先将record写入到这个Serializer里面去

然后copyFromSerializerToTargetChannel()方法中

Flink源码解读系列 | Flink中发送端反压以及Credit机制

先去localBufferPool中请求buffer,这里就是反压了

请求到buffer了以后

Flink源码解读系列 | Flink中发送端反压以及Credit机制

这个调用链有点长不全列出来了

最后

Flink源码解读系列 | Flink中发送端反压以及Credit机制

这个requestQueue其实是前面Netty初始化时具体逻辑中的4,是一个ChannelInboundHandlerAdapter

这个Inbound一开始我也很疑惑,这个Inbound没有重写他的channelRead()方法,那这个不就直接转发数据了吗,那他的作用是干嘛的呢

继续往下看

Flink源码解读系列 | Flink中发送端反压以及Credit机制

原来发送数据的时候会触发这个inbound的eventTrigger

看下userEventTriggered()具体触发了什么

Flink源码解读系列 | Flink中发送端反压以及Credit机制

这个方法就很眼熟了,就是前面到接收到下游发送过来的Credit时会触发一次的方法,用来判断是否有Credit以及通过netty往下游发送数据

这里在发送数据的时候果然又触发了,后面就是判断是否有Credit满足往下游发送数据的条件,然后往下游发送数据

也就是说:

当接收到下游返回的Credit的时候会触发一次,是否能往下游写数据的判断并拉queue数据写数据。

每次Task处理完数据以后emit,也会触发一次判断并拉queue数据写数据。

Flink源码解读系列 | Flink中发送端反压以及Credit机制

Flink源码解读系列 | Flink中发送端反压以及Credit机制

版权声明:

本文为《暴走大数据》整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶

微信公众号|暴走大数据

欢迎点赞+收藏

欢迎转发至朋友圈

Flink源码解读系列 | Flink中发送端反压以及Credit机制

文章不错?点个【在看】吧! ????