Flink SQL源码 - Table API & SQL概览

本文已收录在合集Apche Flink原理与实践中.

本文是Flink SQL执行框架源码分析系列的第二篇, 将从整体上介绍与Table API和SQL实现相关的模块, 并解析内部是如何通过组合各个模块实现SQL的解析, 优化与执行的. 最后通过对现有模块的简单扩展来实现一个新的用户接口executeStatements, 该接口可直接运行用户提供的整个SQL脚本. 需要注意的是, 本文仅从宏观角度介绍Table API和SQL内部的各个模块是如何组合使用的, 具体实现原理会在后续的文章的详细介绍.

Table

Table是Table API的核心抽象, 它只有一个实现类TableImpl. Table仅是对如何获取和转换数据的描述, 并不持有数据. 作为类比, Table可以理解为SQL中的视图.

TableEnvironment提供了以下几个接口用于创建Table对象:

  • fromValues()
  • from()
  • sqlQuery()
  • fromDataStream(), fromChangelogStream()

TableResultTable.execute()TableEnvironment.executeSql()的返回结果类型. Table.execute()返回的TableResult是对JobClient的封装, 后续可以从中取回执行结果. 而TableEnvironment.executeSql()返回的TableResult仅用于标识执行成功, 是一个只有一行一列的表, 其列名为result, 值为OK.

TableResult的实现关系如下图所示, 其核心实现在TableResultImpl中.

Table API & SQL核心实现模块

TableEnviroment是创建Table API和SQL程序的核心入口, 它提供了统一的用于处理有界和无界数据的接口. TableEnviroment主要负责:

  • 注册并管理Catalog, Table, View以及Function等元素, 如registerCatalog(), from(), createTemporaryView()createFunction()等接口;
  • 执行SQL语句, 如sqlQuery()executeSql()接口.

TableEnviroment的继承关系如下图所示, 其中核心的实现逻辑在TableEnviromentImpl中. StreamTableEnviromentTableEnviroment进行了扩展, 添加了Table与DataStream之间的转换接口, 如fromDataStream()toDataStream()等.

在纯Table API和SQL程序中建议使用TableEnviroment, 如果需要在Table和DataStream之间转换, 则使用StreamTableEnviroment.

TableEnviroment的核心成员如下, 它正是组合使用这些核心成员来将Table API或SQL描述的Flink程序转化为Transformation并提交给Flink集群运行.

  • CatalogManager: 用于管理Catalog, 并提供了操作Table元数据的接口.
  • FunctionCatalog: 用于管理UDF, 提供了函数的注册和删除接口. 长期来看它会被废弃, 相关的操作会合并到CatalogManager中.
  • ModuleManager: 用于管理Module, 默认会加载CoreModule, 在Hive兼容模式下需要加载HiveModule.
  • ResourceManager: 用于管理各种用户资源, 如依赖的Jar包, 文件等.
  • TableConfig: 用于配置当前TableEnviroment会话.
  • Executor: 提供了将Transformation图转化为StreamGraph, 以及执行StreamGraph的接口.
  • OperationTreeBuilder: 用于将Table API中的各种操作转化为Operation.
  • Planner: 用于将Operation翻译为Transformation图.

CatalogManager

CatalogManager用于管理Catalog, Table, View等对象. 在Flink SQL中, 存在类似于关系型数据库中从Cataog到Database到Table/View的层级关系, 每个上一级对象都可以包含多个下一级对象. CatalogManager借助Catalog管理Table和View.

Catalog

Catalog用于管理Database, Teble/View, UDF等元数据, 需要注意的是Catalog仅管理永久对象, 临时的Table/View或UDF分别由CatalogManagerFunctionCatalog管理.

Catalog的继承关系如下图所示:

  • 默认情况下使用的是GenericInMemoryCatalog, 即将所有元数据信息都保存在内存中.
  • HiveCatalog支持与HiveMetaStore集成.
  • AbstractJdbcCatalog的几个实现类并不支持创建对象, 主要是用于Flink CDC集成数据时获取MySQL或PostgreSQL中已有的表.

FunctionCatalog

FunctionCatalog的实现较为简单, 它直接管理临时UDF, 并借助Catalog管理永久UDF. 从长远来看, FunctionCatalog应当是CatalogManager的一部分.

ModuleManager

ModuleManager用于管理Mudule, 目前的Mudule支持扩展UDF, 未来可能会考虑支持扩展数据类型, 算子, 优化规则等. 默认情况下会加载CoreModule, Flink也实现了HiveModule以加载Hive内置的函数.

TableConfig

TableConfig用于配置当前TableEnviroment会话, 其配置加载顺序如下:

  1. config.yaml文件中的配置;
  2. 客户端启动时指定的参数;
  3. StreamExecutionEnvironment中配置的参数;
  4. EnvironmentSettings.Builder.withConfiguration(Configuration), TableEnvironment.create(Configuration)创建的配置;
  5. 通过set(ConfigOption, Object) / set(String, String)配置的参数.

Executor

Executor用于执行Planner生成的Transformation图, 可将TableEnviroment与具体的运行时解耦. DefaultExecutor的实现逻辑比较简单, 即调用StreamExecutionEnvironment创建或执行StreamGraph.

OperationTreeBuilder

Flink在实现中为了统一处理Table API和SQL, 抽象了Operation接口.

  • Table API中的各种操作, 如project, aggregate, join等都会通过OperationTreeBuilder转化为QueryOperation.
  • SQL语句经过Calcite解析之后会转化为PlannerQueryOperation, 其内部持有SQL语句转化来的RelNode树.

Planner

PlannerTableEnviroment中最核心的成员, 它主要有两个用处:

  • 作为SQL解析器(通过getParser()获取), 将SQL文本解析为Operation树;
  • 关系代数优化器, 将ModifyOperation优化并转化为Transformation树.

Planner的两个实现类为StreamPlannerBatchPlanner, 分别为流批作业执行优化, 在内部会使用不同的优化器.

Table API & SQL执行流程

TableEnvironment/StreamTableEnvironment的创建流程

TableEnvironmentStreamTableEnvironment的创建流程基本一致, 分别在TableEnvironmentImplStreamTableEnvironmentImplcreate()方法中.

Table API最佳实践

上文介绍了与Table API和SQL执行相关的模块, 为了加深理解, 接下来我们通过一个案例来说明如何使用这些模块进行开发,以扩展现有API的能力.

如果要对Flink进行平台化, SQL脚本的执行是一个十分重要的能力, 但是目前开源Flink中的TableEnvironment并不支持直接执行SQL脚本. 为了实现执行SQL脚本的能力, 我们当然可以先将SQL脚本按分号分割, 然后调用TableEnvironment.executeSql(). 不过这种实现方式相对繁琐, 实际上Flink已经有了一个支持SQL脚本的解析器, 我们只需要稍加扩展就能实现SQL脚本的执行, 而无需编写繁杂的SQL解析代码.

完成本文的案例之后, 可以通过如下代码执行一个SQL脚本. 笔者提供的可运行示例见StatementsExample.

1
2
3
4
5
6
7
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String statements = Utils.readResourceFile(
"my/statements.sql", StatementsExample.class.getClassLoader());

tEnv.executeStatements(statements);

要实现上述功能, 我们只需要围绕ParserTableEnvironment做一些扩展. 具体实现如下:

  • Parser接口中新增一个parseStatements()方法用于解析SQL脚本, 并在ParserImpl中实现该方法. 这里需要考虑的一点是, parseStatements()实际上进行了SQL解析和验证两个操作, 最终需要返回的是经过验证的Operation集合. 而对于SQL脚本而言, 后面的DML语句依赖于前面的DDL语句, 只有DDL语句对应的Operation执行完成后, DML语句才能通过验证. 因此parseStatements()不能直接返回List<Operation>, 比较合适的做法是返回一个Iterator<Operation>以实现延迟验证, 将验证的过程添加到Iterator<Operation>的迭代过程中.
  • TableEnvironment中新增一个executeStatements()方法用于执行SQL脚本, 并在TableEnvironmentImpl中实现该方法.

为了实现上文所说的延迟验证, 并更好地处理Statement set, 我们添加一个OperationsConverter来帮助将SqlNode进行验证并转化为Operation. 其代码如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class OperationsConverter implements Iterator<Operation> {

private final Iterator<SqlNode> sqlNodeIterator;
private final FlinkPlannerImpl planner;
private final CatalogManager catalogManager;

private boolean isInStatementSet;
private List<ModifyOperation> statementSetOperations;

public OperationsConverter(
SqlNodeList sqlNodeList,
FlinkPlannerImpl planner,
CatalogManager catalogManager) {
this.sqlNodeIterator = sqlNodeList.iterator();
this.planner = planner;
this.catalogManager = catalogManager;
}

@Override
public boolean hasNext() {
return sqlNodeIterator.hasNext();
}

@Override
public Operation next() {
Optional<Operation> operation = Optional.empty();
while (!operation.isPresent()) {
operation = convert();
}
return operation.get();
}

private Optional<Operation> convert() {
if (!sqlNodeIterator.hasNext() && isInStatementSet) {
throw new ValidationException(
"'BEGIN STATEMENT SET;' must be used with 'END;', but no 'END;' founded.");
}

SqlNode sqlNode = sqlNodeIterator.next();
Operation operation =
SqlNodeToOperationConversion.convert(planner, catalogManager, sqlNode)
.orElseThrow(
() ->
new TableException(
"Unsupported statement: " + sqlNode.toString()));
if (isInStatementSet
&& !(operation instanceof SinkModifyOperation
|| operation instanceof StatementSetOperation
|| operation instanceof EndStatementSetOperation)) {
throw new ValidationException(
"Only 'INSERT INTO' statement is allowed in statement set.");
}

if (operation instanceof ModifyOperation) {
if (isInStatementSet) {
statementSetOperations.add((ModifyOperation) operation);
return Optional.empty();
} else {
return Optional.of(operation);
}
} else if (operation instanceof BeginStatementSetOperation) {
isInStatementSet = true;
statementSetOperations = new ArrayList<>();
} else if (operation instanceof EndStatementSetOperation) {
if (!isInStatementSet) {
throw new ValidationException(
"'END;' must be used after 'BEGIN STATEMENT SET;', "
+ "but no 'BEGIN STATEMENT SET;' founded.");
}
if (statementSetOperations.isEmpty()) {
throw new ValidationException("NO statements in statement set.");
}

isInStatementSet = false;
StatementSetOperation statementSetOperation =
new StatementSetOperation(statementSetOperations);
return Optional.of(statementSetOperation);
} else {
return Optional.of(operation);
}
return Optional.empty();
}
}

总结

本文从整体上介绍了Table API和SQL内部实现中的几个相关模块, 了解这些模块是进一步了解Flink Table API和SQL实现原理的基础. 这些模块经过精心的设计, 具有强大的扩展性, 基于这些抽象接口进行扩展开发, 可使得Flink支持更多的上下游组件.

Flink SQL源码 - 整体架构及处理流程

本博客所有文章除特别声明外, 均采用CC BY-NC-SA 3.0 CN许可协议. 转载请注明出处!



关注笔者微信公众号获得最新文章推送

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×