本文已收录在合集Apche Flink原理与实践中.
Flink作为一个分布式计算系统, 其组件间的通信是由RPC实现的. 为方便使用, Flink抽象了一套RPC框架, 并提供了基于Akka的实现. 本文首先介绍Flink RPC框架的整体设计, 之后介绍其基于Akka的实现. 理解Flink的RPC实现是理解其底层组件之间通信原理的基础.
由于Akka将许可协议从Apache 2.0改为了Business Source License (BSL) v1.1, 为了避免商业风险, FLINK-32468暂时将Akka替换为了Apache Pekko(Fork自Akka 2.6.x), 已经在Flink 1.18中发布, 由于Pekko源自Akka, 因此本文在介绍时不作区分. 远期看社区有计划将Akka替换为gRPC(FLINK-29281).
Flink抽象了一套RPC接口, 基于这套接口可以有不同的底层实现, 例如当前基于Akka的实现, 以及计划中基于gRPC的实现. Flink的RPC实现都在独立的flink-rpc模块下, 如果有需要完全可以引用相关依赖并在其他项目中使用.
使用案例
在进一步介绍Flink RPC接口之前, 我们先通过一个简单的RPC远程调用案例来展示一下Flink RPC接口的使用方法. 完整代码可见此处.
定义服务接口.
1
2
3public interface HelloGateway extends RpcGateway {
String hello();
}定义服务端组件.
1
2
3
4
5
6
7
8
9
10public class HelloRpcEndpoint extends RpcEndpoint implements HelloGateway {
public HelloRpcEndpoint(RpcService rpcService) {
super(rpcService);
}
@Override
public String hello() {
return "Hello";
}
}启动服务端组件.
1
2
3
4
5
6
7
8
9rpcService1 =
PekkoRpcServiceUtils.createRemoteRpcService(
new Configuration(),
"localhost",
"0",
null,
Optional.empty());
HelloRpcEndpoint helloRpcEndpoint = new HelloRpcEndpoint(rpcService1);
helloRpcEndpoint.start();客户端进行远程调用.
1
2
3
4
5
6
7
8
9
10
11
12rpcService2 =
PekkoRpcServiceUtils.createRemoteRpcService(
new Configuration(),
"localhost",
"0",
null,
Optional.empty());
HelloGateway helloGateway =
rpcService2.connect(
helloRpcEndpoint.getAddress(), HelloGateway.class).get();
String result = helloGateway.hello();
System.out.println(result);
从上述案例可以看到, Flink的RPC框架提供了非常高级的抽象, 客户端拿到服务接口后的远程调用就跟本地方法调用一样. 需要说明的是, 本例中的客户端和服务端区分只是为了方便描述, 实际上在Flink中JabManager中的各个组件和TaskManager在通信角色上是对等的, 并没有严格的客户端和服务端区分, 或者说客户端和服务端的角色会发生转变, 谁发送请求谁就是客户端.
接口详解
RpcEndpoint
, RpcService
, RpcGateway
和RpcServer
是Flink RPC框架中的核心抽象接口.
RpcEndpoint
代表一个可提供RPC服务的组件, 它持有其所属的RpcService
引用, 在Flink中其实现类如下. JobManager中的ResourceManager
, Dispatcher
和JobMaster
组件以及TaskManager中的TaskExecutor
都是RpcEndpoint
组件.
RpcService
是RPC服务的底层实现, 一个RpcService
可服务于多个RpcEndpoint
, 例如JobManager中的ResourceManager
, Dispatcher
和JobMaster
就共用一个RpcService
. RpcService
可通过connect()
连接到另一个RpcService
.
RpcGateway
是所有RPC服务的抽象接口, 用户可继承RpcGateway
并扩展更多接口. 通常情况下, RpcEndpoint
会继承RpcGateway
并实现具体的方法. RpcServer
也继承了RpcGateway
, 它可以看做是RpcGateway
的代理, 实际上RpcService.connect()
方法返回的都是RpcServer
, 在调用RpcGateway
的实现方法时, 都由RpcServer
代理实现, 这样即可根据调用方与被调用方的关系选择远程调用或本地调用.
这里读者可能会有一些疑惑, 有了
RpcGateway
为什么还需要RpcServer
? 这是由于RpcGateway
只是一个普通的Java接口, 其方法都是本地实现, 如果RpcService.connect()
返回的是RpcGateway
实例, 那么并不能实现RPC调用.RpcServer
通过动态代理包装了RpcGateway
的方法, 从而实现RPC调用.
上述接口在Flink中使用时的联动关系如下图所示.
RpcSystem
是一个工厂接口, 具体实现由RpcSystemLoader
加载得到. 它创建的RpcServiceBuilder
可启动一个RpcService
;RpcEndpoint
会持有一个RpcService
, 在初始化时会根据传入的RpcService
启动一个RpcServer
用于为当前RpcEndpoint
提供RPC服务;- 通过
RpcService
可连接到本地或远程RpcService
中对应的RpcServer
服务.
基于Akka的RPC实现
Akka简介
Akka是Actor模型的一种实现, Actor模型是一种通用的消息传递编程模型, 在分布式系统构建中被广泛运用. Actor的核心思想是独立维护隔离状态, 并基于消息传递实现异步通信. Actor通常包含以下特征:
- 每个Actor持有一个邮箱(即队列), 用于存储消息;
- 每个Actor可以发送消息至任何Actor;
- 每个Actor可以通过处理消息来更新内部状态, 对于外部而言, Actor的状态是隔离的.
在Akka中, 用户可继承Actor
, 从而实现自定义的消息处理逻辑.
Actor实现
Flink RPC基于Akka的实现中, 扩展了多种类型的Actor
, 其继承关系如下图所示.
其中:
SupervisorActor
用于启动PekkoRpcActor
, 并监控其何时终止.PekkoRpcActor
用于真正处理RPC调用.DeadLettersActor
用于处理DeadLetter
(即发送给已经停止的Actor
的消息).
基于Actor的RPC调用流程
上文已经说到, RpcService.connect()
方法返回的都是RpcServer
, PekkoInvocationHandler
是基于Akka的实现. 对于RpcGateway
的调用都会被代理到PekkoInvocationHandler.invoke()
中, PekkoInvocationHandler
在创建时就持有一个Actor
的引用, 在调用时即可与该Actor
通信, PekkoRpcActor
会通过动态代理调用RpcEnpoint
中的具体实现方法, 并将结果返回.
总结
本文通过一个示例直观的介绍了Flink RPC的使用方法. 之后介绍了几个核心接口的含义, 以及基于Akka的实现. 从整体上看, Flink的RPC框架是比较优雅的实现, 其中使用了多种设计模式, 除了理解Flink的底层通信方式, 也值得我们从中学习框架的设计方法.
参考
[1] FLIP-6: Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.
[2] [FLINK-4346] Implement basic RPC abstraction
[3] Spark与Flink中的RPC实现
[4] 浅谈Actor模型
本博客所有文章除特别声明外, 均采用CC BY-NC-SA 3.0 CN许可协议. 转载请注明出处!
关注笔者微信公众号获得最新文章推送