
之前总是听到别人说 Flink 本身的反压机制有多么优秀,直到自己真正在应用中碰巧踩到坑,仔细研究后才发现,这个反压机制貌似并不完美。

Network Stack

关于 Apache Flink 本身的网络栈研究,不少人都写过博客叙述:

熟悉网络栈之后,我们知道,在 RemoteChannel 的接收数据部分,提供了 m+n 即 Floating Buffer + Exclusive Buffer 的接收数据空间,由于每个 RemoteInputChannel 都有自己独立的 Exclusive Buffer,所以同一个 Taskmanager 的不同 RemoteInputChannel 在接收数据时不会互相阻塞。

从接收端看,确实没有什么问题,好像确实消费慢的 InputChannel 没有阻塞到消费快的 InputChannel。然而,从发送端看,就不是这样了,我画了一个示意图,为了方便,在示意图中就没有画出接收端的 credit-based 机制:


从图中可以看到,同一个 Task 的不同的 subpartition本身是共享一个 LocalBufferPool 的,并没有采取类似 credit-based 机制,当有数据发送时,直接从 LocalBufferPool 获取MemorySegment封装当前的数据然后发送。那么此时,有没有可能发送慢的 subpartition 抢占了 LocalBufferPool的所有资源,而发送快的 subpartition 由于迟迟获取不到资源而阻塞呢?

答案显然肯定的,我在这里贴出我的 demo 代码,大家可以试一下。我在代码中强制让一个 task 的消费速率变慢,由于 MapTask 的消费速率小于了 SourceTask 的生产速率,而我们的数据均衡机制又是采用 Round-Robin 的方式,所以导致 LocalBufferPool 的所有资源都被应该发送到消费慢的 MapTask 的 subpartition 抢占,而应该发送到消费快的 MapTask 的 subpartition 获取不到资源而被阻塞。

def testRebalance(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment


  val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {

    override def run(ctx: SourceContext[(Int, Int)]): Unit = {
      while (true) {
        0 until 10 foreach {
          // keys '1' and '2' hash to different buckets
          i => ctx.collect((1 + MathUtils.murmurHash(i) % 5, i))

    override def cancel(): Unit = {}

  sourceStream.rebalance.map(new RichMapFunction[(Int, Int), String] {

    var block = false

    override def open(parameters: Configuration): Unit = {
      if (getRuntimeContext.getIndexOfThisSubtask == 0) {
        block = true

    override def map(value: (Int, Int)): String = {
      if (block) {
      s"Subtask Index: ${getRuntimeContext.getIndexOfThisSubtask}"



在上述代码 demo 中,我采取的是 round-robin 的方式来均衡数据,也就是说,数据发送到哪个 subpartition 其实我并不关心,我只关心后续处理不要产生倾斜,反压等问题或现象。针对这种情况,优化方法当然是有的。Credit-Based 机制在发送端加入了一个 backlog 的概念,代表着有多少数据应该发送但是还没有发送,接收端会根据发送端 backlog 的大小来决定是否需要征用 Floating Buffer 的资源。同时,我们也可以利用 backlog 知道每个 subpartition 分别抢占了 LocalBufferPool 的多少资源。在这里,放一些关键代码:

新建一个特征 BacklogBasedSelector,让 RebalancePartitioner 实现这个特征。在 RecordWriter 中判断当前 channelSelector 是否具有 BacklogBasedSelector 的特征,如果具有,检查 targetChannel 是否存在堵塞现象。

if (targetPartition instanceof ResultPartition && channelSelector instanceof BacklogBasedSelector) {
    targetChannel = ((ResultPartition) targetPartition).changIfBlockingChannel(targetChannel);

在 ResultPartition 中加入 changeIfBlockingChannel 函数。

public int changIfBlockingChannel(int subpartitionIndex) {
    int bufferSize = bufferPool.getNumBuffers();
    int backlogSize = subpartitions[subpartitionIndex].getBuffersInBacklog();
    // 根据 bufferPool 的大小和 subpartition 的 backlog 大小来选择是否需要更换 targetChannel

当然,这个方式只能解决接收端的 RemoteInputChannel 地位平等的情况,如果是 keyBy 这样的操作,暂时就没有其他更好的办法了。