Flink源码 - 从Kafka Connector看Source接口重构

本文已收录在合集Apche Flink原理与实践中.

Flink原始的Source接口(SourceFunction)随着Flink在数据集成和流批一体上的不断发展, 暴露出了越来越多的问题. 为了实现更优雅的数据接入, 社区提出了FLIP-27来重构Source接口. 新的Source接口已经在Flink 1.12中得到实现, 该接口将成为Flink数据接入的新标准. 虽然FLIP-27为流式数据的读取抽象了优雅的接口, 但是这些接口的实现和交互逻辑较为复杂, 如果不能准确理解其实现原理, 就很难写出正确的Connector. 本文以Kafka Connector为例, 详细介绍FLIP-27 Source接口的实现原理.

Source接口重构动机

随着Flink的快速发展, 旧的Source接口(SourceFunction)出现了诸多问题(关于重构动机的更详细介绍可参考漫谈Flink Source接口重构):

  • 无法实现流处理和批处理的统一: 原来的SourceFunction针对流处理设计, 难以适配到批处理. 从社区的发展来看, Flink对流批一体是有很大”野心”的, 流批一体的Source接口是重要的推进步骤.
  • 数据源的发现和数据读取耦合在一起: 这一问题使得对一些需要进行动态分区发现的数据源的Connector实现异常复杂, 如Kafka Connector.
  • 分区(Partition/Shard/Split)并没有抽象的接口: 这一问题使得难以实现一些数据源无关的通用特性, 例如Event Time对齐, Watermark对齐等.
  • 需要显示使用Checkpoint Lock: 由于Checkpoint Lock是非公平锁, SourceFunction可能长期持有而导致Checkpoint阻塞. 另外显示加锁的方式也不符合Flink算子Actor/Mailbox风格的线程模型设计.
  • 没有统一的线程模型: 为了避免阻塞主线程, Source通常需要实现IO线程来进行数据读取. 目前由于缺乏统一的线程管理, 提高了实现新的Source的门槛.

从结果来看, FLIP-27以及之后的FLIP-182FLIP-217已经基本解决了上述问题. File, Kafka, Pulsar等常见数据源的Connector也已经基于新的接口进行了重构.

整体执行原理

在介绍具体的接口实现之前, 我们先来看下新的Source实现在Runtime层面的整体运行逻辑.

  • 在JobMaster中, 引入了一个新的组件SourceCoordinator, 其包含的SourceCoordinatorContext 负责通过RPC与Task中的SourceOperator通信. SplitEnumerator是一个抽象接口, 用户可针对不同的数据源做具体的实现, 主要用于发现Split并分配给指定的Reader. SplitEnumeratorSourceCoordinator共享一个SourceCoordinatorContext, 并通过它进行交互.
  • 在TaskManager中, 引入了一个新的SourceOperator, 负责与SourceCoordinator交互, 并在内部调用SourceReader获取数据. SourceReader同样是一个抽象接口, 用户需要根据数据源做具体实现.

SourceCoordinatorContextSourceOperator之间通过OperatorEvent进行交互, 具体来说:

  • SourceOperator主要会向SourceCoordinatorContext发送:
    • RequestSplitEvent来向SourceCoordinator请求需要读取的Split, 在Kafka Connector中, SourceCoordinator总是主动向SourceOperator分配Split.
    • ReaderRegistrationEvent来向SourceCoordinator注册当前Reader.
    • ReportedWatermarkEvent来向SourceCoordinator报告当前Reader的Watermark, 以便SourceCoordinator决定是否需要进行Watermark对齐.
  • SourceCoordinatorContext主要会向SourceOperator发送:
    • AddSplitEvent来向SourceOperator分配Split.
    • WatermarkAlignmentEvent来通知SourceOperator要进行Watermark对齐.
    • NoMoreSplitsEvent来通知SourceOperator没有更多的Split了.

从宏观角度来看, FLIP-27最重要的变动就是将数据源的发现和数据的读取拆分开来, 作为两个独立的组件分别运行在JobMaster和Task中, 彼此之间通过RPC进行通信. 在这个整体框架下, 很多细节问题就变得很好解决了, 比如Watermark的对齐可以通过全局的SourceCoordinator方便地实现.

核心接口

在FLIP-27中, 最核心的两个接口是SplitEnumeratorSourceReader. 其中:

  • SplitEnumerator用于发现Splits并将其分配给SourceReader.
  • SourceReader用于从给定的Splits中读取数据.

除上述两个接口是公共接口外, 其他几个公共接口如下, 在实现Connector时需要频繁使用这些接口:

  • Source: 实际上是一个工厂类, 用于创建SplitEnumeratorSourceReader.
  • SourceSplit: 分区的抽象接口, 用于表示一个Partition/Shard/Split.
  • SplitEnumeratorContext: 用于为SplitEnumerator提供上下文信息, 保存已分配的分区, 并向SourceReader发送事件消息(如分配一个新的分区).
  • SplitAssignment: 用于保存分配给各个子任务的分区.
  • SourceReaderContext: 用于SourceReaderSplitEnumerator之间的通信.
  • SplitReader: 从一个或多个分区中读取数据.
  • SplitFetcherManager: 用于管理SplitFetcher, SplitFetcher调用SplitReader进行数据读取, 运行在SplitFetcherManager的线程池中.
  • SourceOutput: Collector样式的接口, 用于获取SourceReader发出的记录和时间戳.
  • WatermarkOutput: 用于发送Watermark, 并指示Source是否空闲.
  • Watermark: 这是在org.apache.flink.api.common.eventtime包中新建的Watermark类. 这个类最终将替换org.apache.flink.stream.api.Watermark. 这个更改允许flink-core独立于其他模块. 因为最终Watermark的生成将会被放到Source中. (关于为什么要将Watermark的生成放到Source中可参考笔者之前的博文Flink最佳实践 - Watermark原理及实践问题解析)

KafkaSource执行流程

上文介绍了FLIP-27中引入的几个新接口, 接下来本文将以KafkaSource为例, 介绍这些接口的执行和交互流程. 如果不做特殊说明, 本文在描述时将以Kafka Connector的具体实现类为准, 其中某些方法可能实现在父类中, 在阅读源码时需要稍加注意.

Split分配流程

Split分配流程由KafkaSourceEnumerator实现, 它是SplitEnumerator的实现类, 运行在JobMaster中. 整个Split的分配流程如下图所示:

  • 在JobMaster中会启动一个SourceCoordinator, 在它的start()方法中会调用KafkaSourceEnumeratorstart()方法来启动Split分配.
  • KafkaSourceEnumeratorstart()方法会根据用户配置, 选择定时或一次调用getSubscribedTopicPartitions()checkPartitionChanges(). 其中:
    • getSubscribedTopicPartitions()通过Kafka客户端获取指定Topic的所有Partition, 并将结果传递给checkPartitionChanges();
    • checkPartitionChanges()会调用getPartitionChange()来根据已经分配的Partition判断哪些Partition是新增的, 哪些是删除的(实际上删除的Partition在目前的实现中没有做任何处理).
  • 当获取到新增和删除的Partition之后, checkPartitionChanges()会调用initializePartitionSplits()handlePartitionSplitChanges(), 其中:
    • initializePartitionSplits()用于初始化各个Partition的起始和结束Offset. 结果会传递给handlePartitionSplitChanges();
    • handlePartitionSplitChanges()会先调用addPartitionSplitChangeToPendingAssignments()来计算各个Partition所属的SourceReader, 之后调用assignPendingPartitionSplits()来向SourceOperator发送消息分配Partition.
  • 真正分配Partition的消息由SourceCoordinatorContextassignSplits()方法通过发送AddSplitEvent来实现. TaskManager接收到消息后会交由SourceOperator进行处理, 详细的处理过程会在下文分析.

可以看到KafkaSourceEnumerator的Partition分配流程还是比较复杂的, 不过在把握整体流程之后再阅读各个函数的代码, 其实也不难理解.

数据读取流程

KafkaSourceReader接收到来自KafkaSourceEnumerator分配的Partition之后, 就会开始真正进行数据读取了. 数据读取的整体流程还是比较简单的, KafkaSourceReader运行在Task主线程中, 非阻塞地从FutureCompletingBlockingQueue中获取数据, 如果有数据就使用KafkaRecordEmitter向下游发送数据. SplitFetcher是真正的IO线程, 通过KafkaPartitionSplitReader从Kafka读取数据后放入FutureCompletingBlockingQueue.

数据读取流程的核心在于IO线程模型的实现, 可通过SplitFetcherManager进行控制. 多数实现都是以下线程模型中的一种:

  • Sequential Single Split, 即单个IO线程依次顺序读取各个Split, 这种模型一般用在文件, 数据库查询等有界数据场景.

  • Multi-split Multiplexed, 即单个IO线程复用以读取多个Split, 一般用在支持多路复用客户端的组件上, 如Kafka.

  • Multi-split Multi-threaded, 即一个IO线程读取一个Split, 一般用在Amazon Kinesis, 阿里云SLS等产品上.

SplitFetcher生命周期

通过上文的分析我们已经知道, SplitFetcher是真正从数据源读取数据的任务, 它继承自Runnable并运行在SplitFetcherManager的线程池中. SplitFetcher任务的状态在运行过程中会不断发生变化, 笔者将SplitFetcher的生命周期总结为以下四个状态(需要注意的是这几个状态是笔者总结的逻辑上的状态, 并不与Java线程的状态完全对应):

  • Running: 当SplitFetcherManager创建并向线程池提交SplitFetcher任务之后就进入了Runnning状态. Running状态的SplitFetcher可能是在真正地读取数据(对应的JVM线程状态为RUNNABLE, WAITING或TIME_WAITING), 也可能在等待Split的分配(如刚启动时的场景, 对应的JVM线程状态为WAITING).
  • Pause: 为了在多个Split之间进行Watermark对齐, SplitFetcher提供了三个接口:
    • pauseOrResumeSplits(): 会调用SplitReader.pauseOrResumeSplits(), 适用于KafkaConsumer这类提供了暂停或继续读取指定Split的客户端接口.
    • pause(): 通过设置SplitFetcherpaused标志位为true来暂停SplitFetcher任务.
    • resume(): 通过设置SplitFetcherpaused标志位为false, 并调用resumed.signal()来继续执行SplitFetcher任务.
  • Idle: 如果SplitFetcher完成了所有Split的数据读取, 就会进入Idle状态. 对应的JVM线程状态为WAITING.
  • Closed: 进入Idle状态的SplitFetcher将会被关闭而进入Closed状态.

从JVM线程的角度来看, SplitFetcher线程进入WAITING/TIME_WAITING状态的可能原因有两个:

  • 在读取数据时, 即在SplitReader.fetch()中等待, 如KafkaConsumer.poll()会因为暂时获取不到数据而等待.
  • assignedSplitstaskQueue均为空, 等待需要等待任务.

SplitFetcher的生命周期受SourceReader的控制, 其控制逻辑如下图所示. 其中紫色阴影块会调用SplitFetcher对其生命周期进行控制.

上图展示了SourceReader是如何通过SplitFetcherManager来控制SplitFetcher任务的. 接下来我们再来分析下SplitFetcher内部的执行逻辑. 其整体执行流程如下图所示:

  • SplitFetcherrun()方法中会通过一个while循环不断运行runOnce()方法.
  • SourceReaderpollNext()方法中会调用maybeShutdownFinishedFetchers()来判断对应的SplitFetcher是否处于Idle状态, 如果是则调用SplitFetcher.shutdown()来关闭该SplitFetcher(设置closedtrue), 之后runOnce()将进入步骤3.1.
  • 如果各个Split之间的Watermark差距过大, SourceReader会调用SplitReaderpauseOrResumeSplits()pause()来暂停当前任务, 之后runOnce()将进入步骤4.1. 后续会调用pauseOrResumeSplits()resume()来继续任务.
  • 如果当前没有被分配的Split, 也没有需要执行的任务(如添加Split), 那么SplitFetcher将进入WAITING状态, 直到addSplits(), pauseOrResumeSplits()enqueueTask()(在KafkaSource中提交Offset时将调用次方法)被调用.

SpliteFetcher是真正实现数据读取的任务, 理解它的生命周期和执行流程是理解新版Source接口的核心之一.

Watermark对齐流程

新Source接口中一个十分重要的特性就是Watermark对齐, 用于解决Event Time倾斜问题. 在Flink 1.15及之后的版本中可以通过如下方式指定对齐参数.

1
2
3
4
5
6
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withWatermarkAlignment(
"alignment-group-1", // watermarkGroup: Watermark对齐的group名称
Duration.ofSeconds(20), // maxAllowedWatermarkDrift: 最大Watermark偏移
Duration.ofSeconds(1)); // updateInterval: SourceOperator上报Watermark及SourceCoordinator下发Watermark对齐的时间间隔

FLIP-296对SQL层Watermark相关的支持进行了完善, 支持在WITH参数中指定上述参数, 示例代码如下, 这一特性在Flink 1.18中发布.

1
2
3
4
5
6
7
8
9
10
CREATE TABLE user_actions (
...
user_action_time TIMESTAMP(3),
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
'scan.watermark.alignment.group'='alignment-group-1',
'scan.watermark.alignment.max-drift'='1min',
'scan.watermark.alignment.update-interval'='1s',
...
);

Watermark的对齐需要SourceCoordinatorSourceOperator配合进行. 整体的交互流程如下图所示:

  • SourceOperator会定期上报Watermark(时间间隔由updateInterval参数指定), 在实现上Mailbox线程最终会调用emitNext(), 如果SourceOperator不处于READING状态, 就会调用emitNextNotReading(), 它会在初始化时通过ProcessingTimeService启动一个定时任务, 定期调用emitLatestWatermark()SourceCoordinator发送ReportWatermarkEvent事件, 最终SourceCoordinator.handleEventFromOperator()会处理该事件.
  • SourceCoordinator会定期下发Watermark对齐事件(时间间隔由updateInterval参数指定), 在实现上其构造函数中会利用SourceCoordinator启动一个定时任务, 定期调用announceCombinedWatermark()SourceOperator发送WatermarkAlignmentEvent, 最终SourceOperator.handleOperatorEvent()会处理该事件.

上文介绍了Watermark对齐的整体交互流程. 在具体实现上, SourceCoordinator端的实现比较简单, 就是将各个SourceOperator上报的Watermark聚合, 选出最小的Watermark值, 将其加上maxAllowedWatermarkDrift之后定期下发给SourceOperator, 作为允许读取的最大Watermark.

SourceOperator端的Watermark对齐流程相对复杂一些. [FLIP-182](FLIP-182: Support watermark alignment of FLIP-27 Sources - Apache Flink - Apache Software Foundation)提出了SourceOperator级别的Watermark对齐, 可以支持一个SourceOperator仅读取一个Split的场景, 在Flink 1.15及之后的版本中可用. FLIP-217提出了Split级别的Watermark对齐, 支持一个SourceOperator读取多个Split的场景, 在Flink 1.17及之后的版本中可用. 整体的对齐流程如下图所示.

SourceOperator级别的对齐在checkWatermarkAlignment()中实现, 其核心代码如下. 如果当前SourceOperator处于READING状态, 且需要对齐则重置waitingForAlignmentFuture变量, Mailbox主线程会调用getAvailableFuture()来获取SourceOperator的可用性, 如果此时waitingForAlignmentFuture不处于结束状态则代表当前SourceOperator需要阻塞等待Watermark对齐; 如果当前SourceOperator处于WAITING_FOR_ALIGNMENT状态, 且不需要对齐则结束waitingForAlignmentFuture, SourceOperator重新回到运行状态.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
implements OperatorEventHandler,
PushingAsyncDataInput<OUT>,
TimestampsAndWatermarks.WatermarkUpdateListener {
private CompletableFuture<Void> waitingForAlignmentFuture =
CompletableFuture.completedFuture(null);

private void checkWatermarkAlignment() {
if (operatingMode == OperatingMode.READING) {
checkState(waitingForAlignmentFuture.isDone());
if (shouldWaitForAlignment()) {
operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT;
waitingForAlignmentFuture = new CompletableFuture<>();
}
} else if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
checkState(!waitingForAlignmentFuture.isDone());
if (!shouldWaitForAlignment()) {
operatingMode = OperatingMode.READING;
waitingForAlignmentFuture.complete(null);
}
}
}

// 如果lastEmittedWatermark大于currentMaxDesiredWatermark, 则需要等待
private boolean shouldWaitForAlignment() {
return currentMaxDesiredWatermark < lastEmittedWatermark;
}

// 由StreamTask触发调用, 当返回的CompletableFuture阻塞时, Mailbox线程将阻塞
@Override
public CompletableFuture<?> getAvailableFuture() {
switch (operatingMode) {
case WAITING_FOR_ALIGNMENT:
return availabilityHelper.update(waitingForAlignmentFuture);
case OUTPUT_NOT_INITIALIZED:
case READING:
return availabilityHelper.update(sourceReader.isAvailable());
case SOURCE_STOPPED:
case SOURCE_DRAINED:
case DATA_FINISHED:
return AvailabilityProvider.AVAILABLE;
default:
throw new IllegalStateException("Unknown operating mode: " + operatingMode);
}
}
}

Split级别的Watermark对齐在checkSplitWatermarkAlignment()中实现, 其调用链较长, 如上图所示. 最终的实现在KafkaPartitionSplitReader.pauseOrResumeSplits()中, 通过KafkaConsumer提供的pause()resume()两个方法分别用于停止和继续读取相应的Partition. 此外, 对于Multi-split Multi-threaded模式的实现, 由于一个SplitFetcher仅读取一个Split, 在需要对齐时可直接在SplitFetcherManager.pauseOrResumeSplits()通过SplitFetcher.pause()SplitFetcher.resume()将指定的SplitFetcher暂停或唤醒.

Checkpoint和Failover流程

除了Split分配和数据读取流程, 在生产环境中还需要关注的是Checkpoint流程和Failover流程.

KafkaSourceEnumerator的Checkpoint流程比较简单, 当Checkpoint触发时SourceCoordinatorcheckpointCoordinatot()会调用KafkaSourceEnumeratorsnapshotState(), 需要在状态中保存的仅有KafkaSourceEnumeratorSet<TopicPartition> assignedPartitions, 即已经分配的Partititions. 当Checpoint成功或失败时会分别回调notifyCheckpointComplete()notifyCheckpointAborted(), 这两个函数其实都没有做任何实现.

KafkaSourceReader的Checkpoint流程如下. 当Checkpoint触发时, KafkaSourceReadersnapshotState()将被调用用于保存当前Reader已经被分配的Split(SourceReaderBase中的splitStates变量保存了当前Reader已经被分配的所有Split). 当Checkpoint完成时notifyCheckpointComplete()会被调用, 用于向Kafka提交Offset.

KafkaSourceEnumerator的Failover流程如下, 主要分为两部分:

  1. 如果出现Global Failover那么SourceCoordinator将会调用resetToCheckpoint()来重置整个KafkaSourceEnumerator, 其中的assignedPartitions会被初始化为Checkpoint中的状态, 之后按启动流程重新启动即可.
  2. 如果出现Partial Failover那么SourceCoordinatorsubtaskReset()将会调用KafkaSourceEnumeratoraddSplitsBack()来将上次Checkpoint之后还没有分配给Reader的Split分配给对应的Reader.

KafkaReader的Failover流程比较简单, 重启时在SourceOperatoropen()函数中会重新加载当前Reader中已经被分配的所有Split(即原来SourceReaderBase中的splitStates变量保存的内容), 并重新调用addSplits()分配给SplitFetcher.

总结

本文以Kafka Connetcor为例, 详细介绍了FLIP-27中提出的新版Source接口的实现原理. 可以看到, 新版的Source接口在设计上还是比较巧妙的, 主要的亮点是解耦了分区发现和数据读取的流程, 简化了动态分区发现的实现. 另外通过对分区的抽象在流批处理上进行了统一. 不过Source的实现本身比较复杂, 实现一个生产可用的Source并不简单, 需要理解各个接口的执行和交互细节, 本文对其中的细节进行了详细介绍, 相信有助于实现新的Source Connector.

参考

[1] FLIP-27: Refactor Source Interface
[2] FLIP-27: Refactor Source Interface-Apache Mail Archives
[3] FLIP-182: Support watermark alignment of FLIP-27 Sources
[4] FLIP-217: Support watermark alignment of source splits
[5] 漫谈Flink Source接口重构

Apche Calcite查询优化概述 Apache Calcite关系代数

本博客所有文章除特别声明外, 均采用CC BY-NC-SA 3.0 CN许可协议. 转载请注明出处!



关注笔者微信公众号获得最新文章推送

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×