Flink最佳实践 - Table与DataStream互相转换

本文已收录在合集Apche Flink原理与实践中.

Flink SQL在很多场景下可以简化实时数据处理管道的开发,然而SQL的表达能力毕竟有限, 一些复杂的处理逻辑还是不得不借助DataStream API实现, 如复杂Lookup Join, 自定义定时器处理等. 然而如果所有处理逻辑都用DataStream API实现, 则又需要编写大量的Java代码, 不仅效率低下, 而且相比于SQL更难维护. 这时候比较好的方法是用SQL进行尽可能多的处理, 然后将结果转换为DataStream借助DataStream API实现复杂的自定义处理逻辑. 这就需要在Flink中实现Table与DataStream的互相转换.

本文首先介绍Table与DataStream互相转换的使用场景, 之后具体介绍转换方法及需要注意的细节问题.

应用场景

文章开头已经说到, 混合使用Table API & SQL和DataStream API既可以借助SQL的表达能力提升开发效率, 又可以借助DataStream API实现SQL难以表达的复杂处理逻辑. 结合笔者的经验, 在以下这些场景中, 都适合混用Table API & SQL和DataStream API.

  • 以SQL为主的作业: 整个实时数据处理管道的大部分处理逻辑都可以用SQL表达, 但是存在部分难以表达的处理逻辑, 比如复杂Lookup Join, 自定义定时器处理等. 这种场景下可以在适当的时候将处理结果转换为DataStream进行操作.
  • 以DataStream为主的作业: 整个处理管道都比较复杂, SQL几乎不能进行处理, 需借助DataStream API实现. 这种场景下可以借助Flink的Table & SQL Connector进行数据的读写, 中间的处理逻辑使用DataStream实现. 因为多数场景下, Table & SQL Connector的完善度是高于DataStream Connector的, 例如开源版本中的HBase Connector并不支持DataStream.

具体案例

入门案例

以下是一个来自Flink文档中的入门案例, 可以看到, Flink提供了十分简单的API用于支持Table和DataStream的转换. 其转换接口在StreamTableEnvironment中. 在该案例中用到了以下两个接口:

  • StreamTableEnvironment.fromDataStream用于将DataStream转为Table, 该接口只能转换Insert-Only的DataStream. 对应的还有StreamTableEnvironment.fromChangelogStream接口, 用于转换Changelog的DataStream.
  • StreamTableEnvironment.toChangelogStream用于将Table转为DataStream, 该接口可转换为Changelog的DataStream. 对应的还有StreamTableEnvironment.toDataStream可转换为Insert-Only的DataStream.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TableToDataStreamExample {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// create a DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100));

// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");

// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
"SELECT name, SUM(score) FROM InputTable GROUP BY name");

// interpret the updating Table as a changelog DataStream
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);

// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();
}
}

仿真场景

从上述入门案例中, 我们可以对Flink中Table与DataStream的转换有一个整体的了解. 为了更深入地介绍Table与DataStream相互转换的应用场景, 笔者设计了一个在现实生活中可能出现的一个场景. 希望借此能够启发读者在更多的场景中应用Table与DataStream的互转, 以更高效的方式实现流处理管道.

在生活中我们都有使用高德等地图软件和美团等生活软件的经历, 现在假设地图软件推出了一个新的功能, 可以根据你当前所处的位置实时推荐距离最近的餐厅. 这个功能在实现上就需要将用户的实时轨迹点跟存储于数据库中的餐厅表做Join. 这个功能显然不能直接用Flink SQL实现, 因为Flink SQL的Lookup Join只支持等值连接, 而这里需要从数据库中查询距离当前点最近的餐厅, 然后将二者关联.

这里笔者默认使用Flink解决该场景, 在实际的应用架构中可能并不适合使用Flink实现.

上述场景在DataStream API中很容易实现, 我们只需利用ProcessFunction, 对于每一条记录, 都去餐厅表中查询到最近的餐厅后关联即可. 为了演示上述场景, 我们首先在MySQL中创建一个restaurant_table表, 并在其中插入3条记录, SQL语句如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE restaurant_table (
id BIGINT auto_increment NOT NULL,
name varchar(100) NOT NULL,
x FLOAT NOT NULL,
y FLOAT NOT NULL,
CONSTRAINT restaurant_table_PK PRIMARY KEY (id)
)
ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_0900_ai_ci;

INSERT INTO restaurant_table (name, x, y) VALUES
('KFC', 1, 2), ('McDonald', 5, 5), ('Burger King', 8, 8);

有了上述准备后, 我们就可以在Flink中通过以下代码实现任意的Lookup Join, 这段代码主要由以下几部分组成:

  • 第一部分使用SQL定义了源表和结果表, 这里为了方便使用了Datagen作为源表, 使用Print作为结果表, 在真实环境中可替换成对应的Connector.
  • 第二部分从源码中读取数据并进行解析处理, 示例中并没有过多的解析工作, 只是使用SELECT *将数据全部读取出来, 在实践中可以在这里加入所需的解析处理或计算逻辑. 最后将Table转换为了DataStream.
  • 第三部分使用DataStream API进行了Lookup Join, 为每个轨迹点关联了最近的餐厅名称. 主要的实现在JdbcRowLookupFunction中, 这是笔者自定义的一个继承于ProcessFunction的类, 其实现逻辑参考了Flink源码中的JdbcRowDataLookupFunction.
  • 第四部分将关联后的结果重新转换为Table, 并使用INSERT INTO写入结果表中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class LookupJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// PART 1: define source and result tables
tableEnv.executeSql(
"CREATE TABLE traj_point (\n" +
" id BIGINT,\n" +
" `user` STRING,\n" +
" x FLOAT,\n" +
" y FLOAT,\n" +
" `time` TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'fields.id.min' = '1',\n" +
" 'fields.id.max' = '100',\n" +
" 'fields.user.length' = '5',\n" +
" 'fields.x.min' = '0',\n" +
" 'fields.x.max' = '10',\n" +
" 'fields.y.min' = '0',\n" +
" 'fields.y.max' = '10'\n" +
")");
tableEnv.executeSql(
"CREATE TABLE traj_point_with_restaurant (\n" +
" id BIGINT,\n" +
" `user` STRING,\n" +
" x FLOAT,\n" +
" y FLOAT,\n" +
" `time` TIMESTAMP(3),\n" +
" restaurant STRING\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")");
// Part 2: get source table and convert to DataStream
Table table = tableEnv.sqlQuery("SELECT * FROM traj_point");
DataStream<Row> dataStream = tableEnv.toDataStream(table);
// Part 3: do lookup join with DataStream API
JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions
.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUrl("jdbc:mysql://localhost:3306/test")
.withUsername("root")
.withPassword("0407")
.build();
JdbcLookupOptions lookupOptions = new JdbcLookupOptions
.Builder()
.setCacheMaxSize(100)
.setCacheExpireMs(10000)
.setMaxRetryTimes(3)
.setCacheMissingKey(false)
.build();
String lookupSQL =
"SELECT name FROM restaurant_table " +
"ORDER BY SQRT(POW(x - ?, 2) + POW(y - ?, 2)) LIMIT 1";
JoinFunction<Row, Row, Row> joinFunction = Row::join;
RowLookupFunction lookupFunction = new JdbcRowLookupFunction(
connectionOptions,
lookupOptions,
lookupSQL,
new int[] {2, 3},
joinFunction,
true);
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[] {
Types.LONG,
Types.STRING,
Types.FLOAT,
Types.FLOAT,
Types.LOCAL_DATE_TIME,
Types.STRING},
new String[] {"id", "user", "x", "y", "time", "restaurant"});
DataStream<Row> resultDataStream = dataStream.process(lookupFunction).returns(rowTypeInfo);
// PART 4: convert join result to Table
Table resultTable = tableEnv.fromDataStream(resultDataStream);
tableEnv.executeSql("INSERT INTO traj_point_with_restaurant SELECT * FROM " + resultTable);

env.execute();
}
}

从上述案例中可以看到, 借助Table与DataStream的互相转换, 可以很方便地实现复杂的Lookup Join. 其实上述代码中很大一部分都是模板代码, 如果有需要可以更进一步把这个Lookup Join进行封装, 最终做到只需要传入相应的配置参数和SQL即可. 实际上在生产实践中, 我们完全可以把这些常用的功能构建成一个通用的模板, 这样后续根据具体场景传入具体的参数和SQL即可, 而不需要编写任何Java代码.

注意事项

相信通过上述案例读者已经对Table和DataStream互转的方法和使用场景有了比较深入的了解, 本节讲述在互转过程中需要注意的几个细节, 这些细节在生产实践中都需要格外注意.

Insert-Only和Changelog流的区别

上文也说到了, 对于Insert-Only流和Changelog流, Flink提供了两类互转接口:

  • 对于Insert-Only流:
    • StreamTableEnvironment.fromDataStream将DataStream转为Table;
    • StreamTableEnvironment.toDataStream将Table转为DataStream.
  • 对于Changelog流:
    • StreamTableEnvironment.fromChangelogStream将DataStream转为Table;
    • StreamTableEnvironment.toChangelogStream将Table转为DataStream.

在转换过程中需要注意的是, 对于Changelog流, 如果使用了fromDataStream接口和toDataStream接口将会出错, 其错误提示大致如下.

1
2
3
4
5
# 误用fromDataStream接口将Changelog的DataStream转为Table
Error during input conversion. Conversion expects insert-only records but DataStream API record contains: UPDATE_BEFORE

# 误用toChangelogStream接口将Changelog的Table转为DataStream
Table sink '*anonymous_datastream_sink$2*' doesn't support consuming update changes which is produced by node [...]

事实上, Insert-Only流只是Changelog流的一种特例, 为了安全起见, 在进行Table和DataStream转换时, 我们可以总是使用fromChangelogStream接口和toChangelogStream. 在性能上不会有任何影响.

DataStream转Table的Schema确定

Table包含的是结构化的数据, 因此在将DataStream转为Table时需要有一个Schema来指定如何将DataStream中的Java对象对应到Table中的一行记录.

对于入门案例中的代码, 也就是以下代码中的EXAMPLE 1, Flink会自动推断出其Schema类型, 默认的Schema是(f0STRING,f1INT)这样的形式, 其字段名是自动分配的. 可以在转换为Table时通过as方法指定别名. 使用fromCollection接口生成的DataStream同样可以自动类型推断.

然而如果不是由上述两个接口生成的DataStream, Flink的自动类型推断很可能失效, 比如EXAMPLE 2中将DataStream中的记录识别为了RAW类型, 它其实把整个ROW识别为了一个字段, 而无法将ROW中的字段分别识别. 这时候可以通过returns方法为DataStream明确指定结果类型, 例如EXAMPLE 3中通过RowTypeInfo指定了每个字段的类型和名称.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// === EXAMPLE 1 ===
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100));

Table inputTable = tableEnv.fromChangelogStream(dataStream).as("name", "score");
inputTable.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT
// )

// === EXAMPLE 2 ===
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100));

DataStream<Row> processedStream = dataStream.process(new ProcessFunction<Row, Row>() {
@Override
public void processElement(Row row,
Context context,
Collector<Row> collector) {
collector.collect(row);
}
});

Table inputTable = tableEnv.fromChangelogStream(processedStream);
inputTable.printSchema();
// prints:
// (
// `f0` RAW('org.apache.flink.types.Row', '...')
// )

// === EXAMPLE 3 ===
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100));

RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation<?>[] {Types.STRING, Types.INT},
new String[] {"name", "score"});
DataStream<Row> processedStream = dataStream.process(new ProcessFunction<Row, Row>() {
@Override
public void processElement(Row row,
Context context,
Collector<Row> collector) {
collector.collect(row);
}
}).returns(rowTypeInfo);

Table inputTable = tableEnv.fromChangelogStream(processedStream);
inputTable.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT
// )

TypeInformation和DataType的映射关系

在Flink中, DataStream API使用TypeInformation来描述流中传输的记录类型, 而Table API在内部使用RowData表示记录, 用户可以使用DataType声明RowData转换成的外部数据类型. DataType用于表示SQL数据类型, 比TypeInformation更丰富, 因此在进行Table与DataStream互转时需要注意其记录类型的转换关系.

在将Table转换为DataStream时, Flink会保证将Table的DataType转换为合适的TypeInformation类型. 重点是在DataStream转换为Table时, 需要注意其对应的转换关系, 完整的转换关系可参考TypeInfoDataTypeConverter, 其中需要格外注意的几种类型是:

  • Types.LONG转换为BIGINT, 对应Java中的Long类型, 注意不要使用Types.BIG_INT.
  • 时间类型中
    • Types.LOCAL_DATE转换为DATE, 对应Java中的LocalDate.
    • Types.LOCAL_TIME转换为TIME, 对应Java中的LocalTime.
    • Types.LOCAL_DATE_TIME转换为TIMESTAMP, 对应Java中的LocalDateTime.
    • Types.INSTANT转换为TIMESTAMP_LTZ, 对应Java中的Instant.
    • Types.SQL_DATE转换为DATE, 对应Java中的java.sql.Date.
    • Types.SQL_TIME转换为TIME, 对应Java中的java.sql.Time.
    • Types.SQL_TIMESTAMP转换为TIMESTAMP, 对应Java中的java.sql.Timestamp.

Watermark传播

Watermark的传播也是在进行Table与DataStream互转时格外需要注意的一点.

  • 从Table转为DataStream时, Event-time和Watermark都会被保留, 即如果在Table中指定了Watermark, 在DataStream中不再需要指定Watermark即可进行时间类型操作.
  • 从DataStream转为Table时, 在默认情况下Event-time和Watermark都不会进行传播, 必须通过Schema指定Watermark策略, 具体如以下代码所示. 指定Watermark策略的表达式与SQLWATERMARK FOR rowtime_column AS watermark_strategy中的watermark_strategy一致. 如果在DataStream中已经申明了Watermark, 可以使用SOURCE_WATERMARK()直接复用.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class WatermarkExample {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql(
"CREATE TABLE source (\n" +
" id BIGINT,\n" +
" price DECIMAL(32,2),\n" +
" buyer STRING,\n" +
" order_time TIMESTAMP(3),\n" +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'fields.id.min' = '1',\n" +
" 'fields.id.max' = '1000',\n" +
" 'fields.price.min' = '0',\n" +
" 'fields.price.max' = '10000',\n" +
" 'fields.buyer.length' = '5'\n" +
")");
Table table = tableEnv.sqlQuery("SELECT * FROM source");
DataStream<Row> dataStream = tableEnv.toDataStream(table);

Table resultTable = tableEnv.fromDataStream(dataStream,
Schema.newBuilder()
.column("id", "BIGINT")
.column("price", "DECIMAL(32,2)")
.column("buyer", "STRING")
.column("order_time", "TIMESTAMP(3)")
.watermark("order_time", "SOURCE_WATERMARK()")
.build());
resultTable.printSchema();
tableEnv.createTemporaryView("result_table", resultTable);

Table table1 = tableEnv.sqlQuery("SELECT window_start, window_end, SUM(price)\n" +
" FROM TABLE(\n" +
" TUMBLE(TABLE result_table, DESCRIPTOR(order_time), INTERVAL '10' SECONDS))\n" +
" GROUP BY window_start, window_end");
table1.execute().print();

env.execute();
}
}

总结

本文通过一个仿真案例介绍了Flink中Table与DataStream互转的使用场景, 虽然随着Flink SQL的不断完善, 我们可以借助SQL快速实现绝大多数的实时处理管道, 然而还是有部分场景难以直接使用SQL实现. 这时候通过SQL构建整个实时处理管道的主体, 在适当位置插入DataStream API代码是当前来看最好的选择. 本文最后总结了几个Table与DataStream互转的几个注意事项, 都是在生产实践中需要格外注意的.

参考

[1] Flink Documentation - DataStream API Integration

Apache Calcite关系代数 论文阅读 - Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores

本博客所有文章除特别声明外, 均采用CC BY-NC-SA 3.0 CN许可协议. 转载请注明出处!



关注笔者微信公众号获得最新文章推送

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×