Rpc In Flink

Apache Flink 在 Dispatcher、JobMaster、ResourceManager、TaskExecutor 几大组件之间采用 akka 进行通信。


Dispatcher、JobMaster、ResourceManager、TaskExecutor 这几个组件都同时继承了 RpcEndpoint 抽象类。我们来看一下 RpcEndpoint 的构造函数。

/**
 * Initializes the RPC endpoint.
 *
 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
 * @param endpointId Unique identifier for this endpoint
 */
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");

    this.rpcServer = rpcService.startServer(this);

    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}

入参有一个 RpcService 实例,rpcService 实例是用来启动或者连接 RpcEndpoint。比如,TaskExecutor 使用 rpcService 实例启动了自己的 TaskExecutor 对应的 Actor,而对 JobMaster 的连接则是调用 rpcService 的 connect 方法获取 JobMaster 对应的 jobMasterGateway,从而可以通过 gateway 远程调用 JobMaster 的方法。

而构造函数中初始化的 RpcServer 对应的就是 Actor 的一个封装了,里面包含了 address、host 等属性。

如何通过 gateway 远程调用 JobMaster 的方法?

我们先看看 gateway 本身是如何生成的。从 connect 方法进去,找到 connectInternal 的末尾:

private <C extends RpcGateway> CompletableFuture<C> connectInternal(
        final String address,
        final Class<C> clazz,
        Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
    checkState(!stopped, "RpcService is stopped");

    ........

    return actorRefFuture.thenCombineAsync(
        handshakeFuture,
        (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
            InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

            // Rather than using the System ClassLoader directly, we derive the ClassLoader
            // from this class . That works better in cases where Flink runs embedded and all Flink
            // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
            ClassLoader classLoader = getClass().getClassLoader();

            @SuppressWarnings("unchecked")
            C proxy = (C) Proxy.newProxyInstance(
                classLoader,
                new Class<?>[]{clazz},
                invocationHandler);

            return proxy;
        },
        actorSystem.dispatcher());
}

发现这里使用了代理模式,而代理的类是 AkkaInvocationHandler,观察其中的 invokeRpc 方法,它将调用的方法以及参数封装成了 RpcInvocation(即 Message),通过 Akka 自身的机制传递给了 JobMaster 对应的 Actor。而 JobMaster 的 Actor 接收到对应的消息之后,通过反射机制调用了相对应的方法。