本文已收录在合集Apche Flink原理与实践中.
GeoMesa已经成为时空数据存储领域重要的索引中间件, 京东城市时空数据引擎JUST和阿里云的HBase Ganos均是在GeoMesa的基础上扩展而来. GeoMesa采用键值存储, 支持多种类型的存储后端, 如HBase, Kafka, Redis等. 相对于PostgreSQL+PostGIS这种基于R-tree索引的关系型存储, GeoMesa的存储方案更容易与HBase等现有的分布式数据库相结合, 从而直接利用底层数据库的分布式特性, 更适合时空大数据的存储以及实时场景的应用.
为在时空流计算中利用GeoMesa的高效写入和时空查询能力, Glink扩展Flink SQL Connector框架形成了Flink GeoMesa SQL Connector(简称GeoMesa SQL Connector), 支持使用Flink SQL读写GeoMesa. 本文通过实际的应用案例, 讲述如何在Flink SQL中使用GeoMesa. 在流计算中Flink+GeoMesa主要有以下两种使用场景:
- 时空数据管道 & ETL: 以GeoMesa作为时空数据存储引擎, 通过Flink SQL构建实时的时空数据ETL管道, 将时空数据从文件, Kafka等数据源导入到GeoMesa;
- Lookup Join: 将维表存储在GeoMesa中, 通过Flink SQL进行流表与维表的空间Join, 在Glink中称为Spatial Dimention Join.
本文需要glink-1.0.0及以上版本, 可在Zepplin中运行, 关于Glink及zepplin的安装配置参考Glink文档. 为方便复现笔者提供了可直接运行的glink-geomesa.zpln, 下载后可直接在Zepplin打开运行.
时空数据管道 & ETL
在IoT等行业, 产生的大量时空数据一般会接入到Kafka, 之后经过清洗, 转换, 增强存入时空数据库, 这就需要建设时空数据ETL管道. Flink在实时数据ETL管道建设中已经起到了重要作用. 然而Flink不支持空间数据类型, 同时也缺乏与空间数据库, 如GeoMesa等的Connector. 为此Glink增加了GeoMesa SQL Connector, 支持与GeoMesa进行交互, 方便了时空数据ETL管道的建设.
在Glink中, 所有空间数据类型均用WKT格式的STRING
类型表示, 同时通过Connector参数geomesa.spatial.fields
指定空间类型字段和表示的几何类型. GeoMesa SQL Connector在写入GeoMesa时会将WKT转化为实际的几何对象. 下面通过一个实际的案例讲述如何利用Glink构建时空数据ETL管道.
在该案例中我们将CSV文件中的数据通过Flink SQL导入到GeoMesa中. CSV文件中每行代表一个空间点, 总共包含四列, 每列的含义是: 点ID, 点生成时间, 经度, 纬度. 以下是一个简单的案例.
1 | 1,2008-02-02 13:30:40,116.31412,70.89454 |
这里将CSV文件作为source只是为了方便, source可以是Kafka, MySQL等Flink支持的任意组件.
首先创建Source Table, DDL语句如下.
1 | CREATE TABLE csv_table ( |
接着在使用命令行工具在GeoMesa中创建对应的物理表, 建表命令如下.
1 | geomesa-hbase create-schema -c geomesa_table -s "pid:String,dtg:Date,pt:Point" -f geomesa_table |
然后创建GeoMesa Sink Table.
1 | CREATE TABLE geomesa_table ( |
最后通过如下语句即可实现将数据从CSV文件导入GeoMesa, 完成数据管道的构建.
1 | INSERT INTO geomesa_table |
Lookup Join
在流计算中, 流表与维表的关联是一项重要的基础功能. Flink可通过Lookup Join实现流表与维表的关联. 然而, 目前Flink的Lookup Join只支持等值Join, 对于时空数据而言, 通常需要基于流表与维表中对象的空间关系进行Join. 为此, Glink抽象出了Spatial Dimension Join, 支持基于空间关系的Lookup Join, 目前Glink的Spatial Dimension Join支持距离Join, 相交Join和包含Join.
Spatial Dimension Join具有大量的应用场景, 比如:
- 地理围栏应用, 流表中每条记录表示行人或车辆的轨迹点, 维表存储在GeoMesa中, 每条记录都是一个由多边形表示的地理围栏. 为了判断流表中的轨迹点是否出入了某个地理围栏, 可以将流表与维表做一个包含Join, 若某个轨迹点被包含在某个多边形围栏中, 则这两条记录会执行Join.
- 订单调度应用, 流表中每条记录都表示一个订单, 包含订单送达目的地的经纬度坐标, 维表存储在GeoMesa中, 每条记录都是由经纬度点表示的仓库位置. 为了获取与订单位置在某个距离范围内的仓库, 可以将流表与维表做一个距离Join, 若订单位置与仓库位置在距离范围内, 则这两条记录会执行Join.
在Glink中可以通过geomesa.temporal.join.predict
这一Connector参数指定进行何种类型的空间join:
R:<distance>
表示距离Join, 流表中空间对象与维表中空间对象距离在distance
之内的记录都会被Join,distance
的单位为米;I
表示相交Join, 流表中空间对象与维表中空间对象在空间上相交的记录都会被Join;+C
表示正相交Join, 流表中空间对象若在空间上包含维表中空间对象, 则两条记录会被Join;-C
表示负相交Join, 维表中空间对象若在空间上包含流表中空间对象, 则两条记录会被Join.
下面通过具体的案例讲述如何使用Glink进行Spatial Dimension Join.
相交/包含join
我们通过地理围栏应用讲述如何在Glink中进行相交Join或包含Join. 在地理围栏应用中, 流表中的一条记录通常是行人或车辆的轨迹点, 包含一些非空间属性及轨迹点的经纬度坐标. 维表中的一条记录通常代表一块地理区域, 包含一些非空间属性及地理区域的空间范围(由多边形表示). 在本例中, 流表来自CSV文件, 维表存储在GeoMesa中. 通过相交Join或负相交Join可以实现轨迹点与地理围栏的关联.
首先定义流表, DDL语句如下.
1 | CREATE TABLE csv_point ( |
GeoMesa维表的定义语句如下, geomesa.temporal.join.predict
用于指定空间join的类型, 在地理围栏应用中使用I
和-C
可以达到相同的结果. 但是使用I
有更高的效率.
1 | CREATE TABLE geomesa_area ( |
通过如下语句进行Spatial Dimension Join.
1 | SELECT A.id AS point_id, A.dtg, ST_AsText(ST_Point(A.lng, A.lat)) AS point, B.pid AS area_id |
距离join
我们通过订单调度应用讲述如何在Glink中进行距离join. 在订单调度应用中, 流表中每条记录都表示一个订单, 包含相关的非空间属性及订单送达目的地的经纬度坐标; 维表存储在GeoMesa中, 每条记录都是由经纬度点表示的仓库位置. 在订单调度应用中, 通常需要为每个订单关联某个距离范围内的仓库, 用于订单的分发调度. 这可以通过Glink的距离join实现.
首先定义流表, DDL语句如下.
1 | CREATE TABLE csv_order ( |
然后定义GeoMesa维表.
1 | CREATE TABLE geomesa_warehouse ( |
最后通过如下语句即可实现距离join.
1 | SELECT A.id AS order_id, A.dtg, ST_AsText(ST_Point(A.lng, A.lat)) AS order_point, B.id AS warehouse_id |
本博客所有文章除特别声明外, 均采用CC BY-NC-SA 3.0 CN许可协议. 转载请注明出处!
关注笔者微信公众号获得最新文章推送