本文已收录在合集Apche Flink原理与实践中.
本文是Flink SQL执行框架源码分析系列的第二篇, 将从整体上介绍与Table API和SQL实现相关的模块, 并解析内部是如何通过组合各个模块实现SQL的解析, 优化与执行的. 最后通过对现有模块的简单扩展来实现一个新的用户接口executeStatements
, 该接口可直接运行用户提供的整个SQL脚本. 需要注意的是, 本文仅从宏观角度介绍Table API和SQL内部的各个模块是如何组合使用的, 具体实现原理会在后续的文章的详细介绍.
Table
Table
是Table API的核心抽象, 它只有一个实现类TableImpl
. Table
仅是对如何获取和转换数据的描述, 并不持有数据. 作为类比, Table
可以理解为SQL中的视图.
TableEnvironment
提供了以下几个接口用于创建Table
对象:
fromValues()
from()
sqlQuery()
fromDataStream()
,fromChangelogStream()
TableResult
是Table.execute()
和TableEnvironment.executeSql()
的返回结果类型. Table.execute()
返回的TableResult
是对JobClient
的封装, 后续可以从中取回执行结果. 而TableEnvironment.executeSql()
返回的TableResult
仅用于标识执行成功, 是一个只有一行一列的表, 其列名为result
, 值为OK
.
TableResult
的实现关系如下图所示, 其核心实现在TableResultImpl
中.
Table API & SQL核心实现模块
TableEnviroment
是创建Table API和SQL程序的核心入口, 它提供了统一的用于处理有界和无界数据的接口. TableEnviroment
主要负责:
- 注册并管理Catalog, Table, View以及Function等元素, 如
registerCatalog()
,from()
,createTemporaryView()
和createFunction()
等接口; - 执行SQL语句, 如
sqlQuery()
和executeSql()
接口.
TableEnviroment
的继承关系如下图所示, 其中核心的实现逻辑在TableEnviromentImpl
中. StreamTableEnviroment
对TableEnviroment
进行了扩展, 添加了Table与DataStream之间的转换接口, 如fromDataStream()
和toDataStream()
等.
在纯Table API和SQL程序中建议使用
TableEnviroment
, 如果需要在Table和DataStream之间转换, 则使用StreamTableEnviroment
.
TableEnviroment
的核心成员如下, 它正是组合使用这些核心成员来将Table API或SQL描述的Flink程序转化为Transformation
并提交给Flink集群运行.
CatalogManager
: 用于管理Catalog
, 并提供了操作Table元数据的接口.FunctionCatalog
: 用于管理UDF, 提供了函数的注册和删除接口. 长期来看它会被废弃, 相关的操作会合并到CatalogManager
中.ModuleManager
: 用于管理Module
, 默认会加载CoreModule
, 在Hive兼容模式下需要加载HiveModule
.ResourceManager
: 用于管理各种用户资源, 如依赖的Jar包, 文件等.TableConfig
: 用于配置当前TableEnviroment
会话.Executor
: 提供了将Transformation
图转化为StreamGraph
, 以及执行StreamGraph
的接口.OperationTreeBuilder
: 用于将Table API中的各种操作转化为Operation
.Planner
: 用于将Operation
翻译为Transformation
图.
CatalogManager
CatalogManager
用于管理Catalog, Table, View等对象. 在Flink SQL中, 存在类似于关系型数据库中从Cataog到Database到Table/View的层级关系, 每个上一级对象都可以包含多个下一级对象. CatalogManager
借助Catalog
管理Table和View.
Catalog
Catalog
用于管理Database, Teble/View, UDF等元数据, 需要注意的是Catalog
仅管理永久对象, 临时的Table/View或UDF分别由CatalogManager
和FunctionCatalog
管理.
Catalog
的继承关系如下图所示:
- 默认情况下使用的是
GenericInMemoryCatalog
, 即将所有元数据信息都保存在内存中. HiveCatalog
支持与HiveMetaStore集成.AbstractJdbcCatalog
的几个实现类并不支持创建对象, 主要是用于Flink CDC集成数据时获取MySQL或PostgreSQL中已有的表.
FunctionCatalog
FunctionCatalog
的实现较为简单, 它直接管理临时UDF, 并借助Catalog
管理永久UDF. 从长远来看, FunctionCatalog
应当是CatalogManager
的一部分.
ModuleManager
ModuleManager
用于管理Mudule
, 目前的Mudule
支持扩展UDF, 未来可能会考虑支持扩展数据类型, 算子, 优化规则等. 默认情况下会加载CoreModule
, Flink也实现了HiveModule
以加载Hive内置的函数.
TableConfig
TableConfig
用于配置当前TableEnviroment
会话, 其配置加载顺序如下:
config.yaml
文件中的配置;- 客户端启动时指定的参数;
StreamExecutionEnvironment
中配置的参数;EnvironmentSettings.Builder.withConfiguration(Configuration)
,TableEnvironment.create(Configuration)
创建的配置;- 通过
set(ConfigOption, Object) / set(String, String)
配置的参数.
Executor
Executor
用于执行Planner
生成的Transformation
图, 可将TableEnviroment
与具体的运行时解耦. DefaultExecutor
的实现逻辑比较简单, 即调用StreamExecutionEnvironment
创建或执行StreamGraph
.
OperationTreeBuilder
Flink在实现中为了统一处理Table API和SQL, 抽象了Operation
接口.
- Table API中的各种操作, 如
project
,aggregate
,join
等都会通过OperationTreeBuilder
转化为QueryOperation
. - SQL语句经过Calcite解析之后会转化为
PlannerQueryOperation
, 其内部持有SQL语句转化来的RelNode
树.
Planner
Planner
是TableEnviroment
中最核心的成员, 它主要有两个用处:
- 作为SQL解析器(通过
getParser()
获取), 将SQL文本解析为Operation
树; - 关系代数优化器, 将
ModifyOperation
优化并转化为Transformation
树.
Planner
的两个实现类为StreamPlanner
和BatchPlanner
, 分别为流批作业执行优化, 在内部会使用不同的优化器.
Table API & SQL执行流程
TableEnvironment/StreamTableEnvironment的创建流程
TableEnvironment
和StreamTableEnvironment
的创建流程基本一致, 分别在TableEnvironmentImpl
和StreamTableEnvironmentImpl
的create()
方法中.
Table API最佳实践
上文介绍了与Table API和SQL执行相关的模块, 为了加深理解, 接下来我们通过一个案例来说明如何使用这些模块进行开发,以扩展现有API的能力.
如果要对Flink进行平台化, SQL脚本的执行是一个十分重要的能力, 但是目前开源Flink中的TableEnvironment
并不支持直接执行SQL脚本. 为了实现执行SQL脚本的能力, 我们当然可以先将SQL脚本按分号分割, 然后调用TableEnvironment.executeSql()
. 不过这种实现方式相对繁琐, 实际上Flink已经有了一个支持SQL脚本的解析器, 我们只需要稍加扩展就能实现SQL脚本的执行, 而无需编写繁杂的SQL解析代码.
完成本文的案例之后, 可以通过如下代码执行一个SQL脚本. 笔者提供的可运行示例见StatementsExample.
1 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
要实现上述功能, 我们只需要围绕Parser
和TableEnvironment
做一些扩展. 具体实现如下:
- 在
Parser
接口中新增一个parseStatements()
方法用于解析SQL脚本, 并在ParserImpl
中实现该方法. 这里需要考虑的一点是,parseStatements()
实际上进行了SQL解析和验证两个操作, 最终需要返回的是经过验证的Operation
集合. 而对于SQL脚本而言, 后面的DML语句依赖于前面的DDL语句, 只有DDL语句对应的Operation
执行完成后, DML语句才能通过验证. 因此parseStatements()
不能直接返回List<Operation>
, 比较合适的做法是返回一个Iterator<Operation>
以实现延迟验证, 将验证的过程添加到Iterator<Operation>
的迭代过程中. - 在
TableEnvironment
中新增一个executeStatements()
方法用于执行SQL脚本, 并在TableEnvironmentImpl
中实现该方法.
为了实现上文所说的延迟验证, 并更好地处理Statement set, 我们添加一个OperationsConverter
来帮助将SqlNode
进行验证并转化为Operation
. 其代码如下.
1 | public class OperationsConverter implements Iterator<Operation> { |
总结
本文从整体上介绍了Table API和SQL内部实现中的几个相关模块, 了解这些模块是进一步了解Flink Table API和SQL实现原理的基础. 这些模块经过精心的设计, 具有强大的扩展性, 基于这些抽象接口进行扩展开发, 可使得Flink支持更多的上下游组件.
本博客所有文章除特别声明外, 均采用CC BY-NC-SA 3.0 CN许可协议. 转载请注明出处!
关注笔者微信公众号获得最新文章推送