上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【Spark源码分析】Spark的RPC通信一-初稿

guduadmin31天前

Spark的RPC通信一-初稿

文章目录

  • Spark的RPC通信一-初稿
    • Spark的RPC顶层设计
      • 核心类`NettyRpcEnv`
      • 核心类`RpcEndpoint`
      • 核心类`RpcEndpointRef`
      • Spark RPC消息的发送与接收实现
        • 核心类`Inbox`
        • 核心类`Dispatcher`
        • 核心类`Outbox`

          Spark的RPC顶层设计

          在RpcEnv中定义了RPC通信框架的启动、停止和关闭等抽象方法,表示RPC的顶层环境。唯一的子类NettyRpcEnv。

          RpcEndpoints 需要向 RpcEnv 注册自己的名称,以便接收信息。然后,RpcEnv 将处理从 RpcEndpointRef 或远程节点发送的信息,并将它们传送到相应的 RpcEndpoints。对于 RpcEnv 捕捉到的未捕获异常,RpcEnv 会使用 RpcCallContext.sendFailure 将异常发回给发送者,如果没有发送者或出现 NotSerializableException,则记录异常。

          RpcEnv 还提供了一些方法来检索给定名称或 uri 的 RpcEndpointRefs。

          在RpcEnvFactory中定义了创建RpcEnv的抽象方法,在NettyRpcEnv和NettyRpcEnvFactory中使用Netty对继承的方式进行了实现。

          在NettRpcEnv中启动终端点方法setEndpoint中,会将RpcEndpoint和RpcEndpointRef相互以键值对的形式存储到ConcurrentHashMap中,最后在RpcEnv的object类中通过反射方式实现创建RpcEnv的实例的静态方法。

          核心类NettyRpcEnv

          NettyRpcEnv的核心成员和核心方法

          • transportConf:TransportConf的实例对象,加载一些关于RPC的配置项
          • dispatcher:Dispatcher的实例对象,消息转发器,将RPC消息路由到要该对此消息处理的RpcEndpoint。
          • streamManager:NettyStreamManager的实例对象,流的管理器,为NettyRpcEnv提供流式服务。
          • transportContext:TransportContext的实例对象
          • clientFactory: 用于构造发送和接收响应的TransportClient
          • fileDownloadFactory: 用于文件下载的独立客户端工厂。这样可以避免使用与主 RPC 上下文相同的 RPC 处理程序,从而将这些客户端引起的事件与主 RPC 流量隔离开来。它还允许对某些属性进行不同的配置,例如每个对等节点的连接数。
          • server:TransportServer,提供高效的底层流媒体服务。
          • ConcurrentHashMap[RpcAddress, Outbox] outboxes:远程地址与Outbox的映射map。
          • startServer(bindAddress: String, port: Int)
            • 创建一个TransportServer
            • 向消息转发器中注册RpcEndpointVerifier,RpcEndpointVerifier的注册名称为endpoint-verifier,用来校验RpcEndpoint是否存在的RpcEndpoint服务
            • send(message: RequestMessage): Unit
              • 发送消息时,将本地消息交于InBox,远程消息交于OutBox
              • ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout)
                • 若请求消息的接收者的地址与当前的NettyRpcEnv的地址相同,将消息交通过dispatcher.postLocalMessage(message, p)方法处理,p中是成功和失败的回调函数。
                • 若请求消息的的接收者的地址与当前的NettyRpcEnv的地址不同时,将消息通过postToOutbox(message.receiver, rpcMessage)方法处理,主要是将消息放入outbox,然后传输到远程地址上。
                • 在方法的最后设定了一个定时器,实现消息请求的超时机制。
                • postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage):将消息传到远程节点上
                  • 如果receiver.client不为空,那么消息将直接通过TransportClient发送到远端节点
                  • 如果receiver.client为空,则获取远端结点地址对应的Outbox,若没有则新建一个
                  • 如果NettyRpcEnv已经停止,移除该Outbox并停止,否则调用Outbox.send()发送消息。

                    核心类RpcEndpoint

                    RpcEndpoint是对能够处理RPC请求,给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint。

                    RPC 的RpcEndpoint,它定义了给定消息时要触发的函数。保证按调用顺序为 onStart、receive 和 onStop。RpcEndpoint的生命周期为constructor -> onStart -> receive* -> onStop。receive 可以并发调用。如果希望接收是线程安全的,则需要请使用 ThreadSafeRpcEndpoint。如果 RpcEndpoint 方法(onError 除外)抛出任何错误,onError 将被调用并说明原因。如果 onError 抛出错误,RpcEnv会将忽略。

                    ThreadSafeRpcEndpoint是继承自RpcEndpoint的特质,需要 RpcEnv 以线程安全方式向其发送消息的特性。主要用于对消息的处理,必须是线程安全的场景。ThreadSafeRpcEndpoint对消息的处理都是串行的,即前一条消息处理完才能接着处理下一条消息。

                    核心类RpcEndpointRef

                    远程 RpcEndpoint 的引用。RpcEndpointRef 是线程安全的。用于消息发送方持有并发送消息。

                    核心成员

                    • maxRetries:最大尝试连接次数。可以通过spark.rpc.numRetries参数指定,默认3次
                    • retryWaitMs:每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait,默认3秒
                    • defaultAskTimeout:RPC ask操作的超时时间。可以通过spark.rpc.askTimeout,默认120秒
                    • address:远程RpcEndpoint引用的地址
                    • name:远程RpcEndpoint引用的名称

                      核心方法

                      • send():发送单向异步信息。只管发送,不管结果。
                      • ask()系列:向远程的RpcEndpoint.receiveAndReply()方法发送消息,并带有超时机制的Future。该类方法只发送一次消息,从不重试。
                      • askSync()系列:向相应的 RpcEndpoint.receiveAndReply 发送消息,并在指定超时内获取结果,如果失败则抛出异常。

                        这是一个阻塞操作,可能会耗费大量时间,因此不要在 RpcEndpoint 的消息循环中调用它。

                        NettyRpcEndpointRef是其唯一的继承类。重写了ask()和send()方法,主要是消息封装成RequestMessage,然后通过nettyEnv的ask和send方法将消息发送出去。

                        客户端发送请求简单示例图

                        1. 若是向本地节点的RpcEndpoint发送消息
                          1. 通过调用NettyRpcEndpointRef的send()和ask()方法向本地节点的RpcEndpoint发送消息。由于是在同一节点,所以直接调用Dispatcher的postLocalMessage()或postOneWayMessage()方法将消息放入EndpointData内部Inbox的messages中。
                          2. InboxMessage放入后Inbox后,Inbox所属的endPointData就会放入receivers一旦receivers中有数据,原本阻塞的MessageLoop就可以取到数据,
                          3. MessageLoop将调用inbox.process()方法消息的处理。对不同的消息类型调用endpoint的不同回调函数,即完成了消息的处理。
                        2. 通过调用NettyRpcEndpointRef的send()和ask()方法向远端节点的RpcEndpoint发送消息。消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outbox的messages中。
                        3. 每个Outbox的drainOutbox()方法通过循环,不断从messages列表中取得OutboxMessage,并通过TransportClient发送,底层依赖Netty。
                        4. TransportClient和远端NettyRpcEnv的TransportServer建立了连接后,请求消息首先经过Netty管道的处理,由TransportChannelHandler将消息分发给TransportRequestHandler,最终会调用NettyRpcHandler或StreamManager处理。如果是RPC消息则会调用NettyRpcHandler.receive()方法,之后与第一步所述一致,调用Dispatcher的postRemoteMessage()或``postOneWayMessage()`方法。
                        5. 如果TransportRequestHandler处理的是RpcRequest,那么server端的TransportRequestHandler处理消息时还会对client端进行响应,依赖Netty将响应消息发送给client端。client端接收到消息时由TransportChannelHandler将消息分发给TransportResponseHandler处理。

                        Spark RPC消息的发送与接收实现

                        OutboxMessage在客户端使用,是对外发送消息的封装。InboxMessage在服务端使用,是对接收消息的封装。

                        InboxMessage是一个scala特质类,所有的RPC消息都继承自InboxMessage。下面是继承自InboxMessage的子类

                        • OneWayMessage:RpcEndpoint处理此类型的消息后不需要向客户端回复信息。
                        • RpcMessage:RpcEndpoint处理完此消息后需要向客户端回复信息。
                        • OnStart:Inbox实例化后,再通知与此Inbox相关联的RpcEndpoint启动。
                        • OnStop:Inbox停止后,通知与此Inbox相关联的RpcEndpoint停止。
                        • RemoteProcessConnected:告诉所有的RpcEndpoint,有远端的进程已经与当前RPC服务建立了连接。
                        • RemoteProcessDisconnected:告诉所有的RpcEndpoint,有远端的进程已经与当前RPC服务断开了连接。
                        • RemoteProcessConnectionError:告诉所有的RpcEndpoint,与远端某个地址之间的连接发生了错误。

                          核心类Inbox

                          Inbox为RpcEndpoint存储了消息即InboxMessage,并线程安全地发送给RpcEndPoint。

                          private[netty] class Inbox(
                              val endpointRef: NettyRpcEndpointRef,
                              val endpoint: RpcEndpoint)
                            extends Logging {
                            //相当于给this起了一个别名为inbox,
                            inbox =>  
                            
                          }
                          

                          重要的属性

                          • messages:所有的消息以消息盒子的方式,通过LinkedList链式存储
                          • enableConcurrent:是否同时允许多线程同时处理消息
                          • numActiveThreads:Inbox中正在处理消息的线程数

                            重要方法

                            • post():将InboxMessage投递到box中,从下面的代码可以看出使用了synchronized保证线程安全,如果该box已经关闭,消息将会丢弃。

                              def post(message: InboxMessage): Unit = inbox.synchronized {
                                if (stopped) {
                                  // 日志进行warning输出
                                  onDrop(message)
                                } else {
                                  messages.add(message)
                                  false
                                }
                              }
                              
                            • process():处理存储在messages中的消息。

                                def process(dispatcher: Dispatcher): Unit = {
                                  var message: InboxMessage = null
                                  // 1.以synchronized进行并发检查,开启并发则取消息,numActiveThreads自增1。
                                  inbox.synchronized {
                                    if (!enableConcurrent && numActiveThreads != 0) {
                                      return
                                    }
                                    message = messages.poll()
                                    if (message != null) {
                                      numActiveThreads += 1
                                    } else {
                                      return
                                    }
                                  }
                                  while (true) {
                                    // 安全回调?处理异常的
                                    safelyCall(endpoint) {
                                      //对不同消息,通过模式匹配进行通过不同的endpoint进行处理
                                      message match {
                                        case RpcMessage(_sender, content, context) =>
                                          try {
                                            endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                                              throw new SparkException(s"Unsupported message $message from ${_sender}")
                                            })
                                          } catch {
                                            case e: Throwable =>
                                              context.sendFailure(e)
                                              // Throw the exception -- this exception will be caught by the safelyCall function.
                                              // The endpoint's onError function will be called.
                                              throw e
                                          }
                                        case OneWayMessage(_sender, content) =>
                                          endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
                                            throw new SparkException(s"Unsupported message $message from ${_sender}")
                                          })
                                        case OnStart =>
                                          endpoint.onStart()
                                          if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
                                            inbox.synchronized {
                                              if (!stopped) {
                                                enableConcurrent = true
                                              }
                                            }
                                          }
                                        case OnStop =>
                                          val activeThreads = inbox.synchronized { inbox.numActiveThreads }
                                          assert(activeThreads == 1,
                                            s"There should be only a single active thread but found $activeThreads threads.")
                                          dispatcher.removeRpcEndpointRef(endpoint)
                                          endpoint.onStop()
                                          assert(isEmpty, "OnStop should be the last message")
                                        case RemoteProcessConnected(remoteAddress) =>
                                          endpoint.onConnected(remoteAddress)
                                        case RemoteProcessDisconnected(remoteAddress) =>
                                          endpoint.onDisconnected(remoteAddress)
                                        case RemoteProcessConnectionError(cause, remoteAddress) =>
                                          endpoint.onNetworkError(cause, remoteAddress)
                                      }
                                    }
                                    inbox.synchronized {
                                      // 调用 `onStop` 后,"enableConcurrent "将被设置为 false,所以需要每次都检查它。
                                      if (!enableConcurrent && numActiveThreads != 1) {
                                        // 此线程退出,降低并发,最终归于一个线程处理剩下的消息
                                        numActiveThreads -= 1
                                        return
                                      }
                                      message = messages.poll()
                                      // 没有消息之后,退出当前循环
                                      if (message == null) {
                                        numActiveThreads -= 1
                                        return
                                      }
                                    }
                                  }
                                }
                              
                            • stop():enableConcurrent赋值为false,保证当前是唯一活跃的线程。并在messages中添加onStop消息。

                              def stop(): Unit = inbox.synchronized {
                                // 优雅关闭,是关闭并发只留一个线程处理消息。确保OnStop为最后一个消息,这样,"RpcEndpoint.onStop "就可以安全地释放资源了。
                                if (!stopped) {
                                  enableConcurrent = false
                                  stopped = true
                                  messages.add(OnStop)
                                }
                              }
                              

                              核心类Dispatcher

                              Dispatcher负责将RPC消息路由到要该对此消息处理的RpcEndpoint。

                              内部类

                              • EndpointData:包装一个Inbox类。一个RpcEndpoint与NettyRpcEndpointRef映射关联在一起。即一个Inbox只为一个映射关系服务。
                              • MessageLoop:用于转发信息的循环任务类,从receivers中获取有消息的inbox进行处理。

                                重要属性

                                • endpoints:储存name和EndpointData的映射关系。EndpointData包含了name,RpcEndpoint, NettyRpcEndpointRef和Inbox,采用ConcureentHashMap保证线程安全
                                • endpointRefs:储存RpcEndpoint和RpcEndpointRef的映射关系。采用ConcureentHashMap保证线程安全
                                • receivers:存储inbox中可能包含message的EndpointData。在MessageLoop中取出并处理消息。使用阻塞队列LinkedBlockingQueue存储。
                                • threadpool:用于调度消息的线程池。根据spark.rpc.netty.dispatcher.numThreads创建固定大小的线程池,启动与线程池大小相同个数的MessageLoop任务。

                                  重要方法

                                  • registerRpcEndpoint():在调度器中注册endpoint。由name和RpcEndpoint构建NettyRpcEndpointRef,并加入到endpoints, endpointRefs, receivers中
                                  • postToAll():将message投递到在注册到该Dispatcher的所有RpcEndpoint。postMessage()将message投递到注册到该Dispatcher指定name的RpcEndpoint中,并将EndpointData放入receivers中,该方法中还传入了失败回调函数
                                  • unregisterRpcEndpoint(), stop():注销所有已注册的RpcEndpoint,从endpoints中移除并在inbox中增加了onstop消息。在receivers中插入哨兵,等待receivers中的所有消息都处理完毕后,关闭线程池。

                                    Dispatcher中的消息处理流程。

                                    1. postToAll()或者postxx()方法会调用postMessage()方法将InboxMessage放到对应endPointData里inbox的messages列表(调用inbox.post())
                                    2. InboxMessage放入后inbox后,inbox所属的endPointData就会放入receivers
                                    3. 一旦receivers中有数据,原本阻塞的MessageLoop就可以取到数据,因为receivers是一个阻塞队列
                                    4. MessageLoop将调用inbox.process()方法消息的处理。利用模式匹配,对不同的消息类型调用endpoint的不同回调函数,即完成了消息的处理。

                                    核心类Outbox

                                    OutboxMessage是一个特质,内部只有未实现的SendWith方法和onFailure方法。OneWayOutboxMessage和RpcOutboxMessage都继承自OutboxMessage特质,实现的SendWith通过调用TransportClient的sendRpc()方法发送信息,其中RpcOutboxMessage还增加了超时和发送成功的回调方法。

                                    Outbox的重要属性

                                    • messages: 保存要发送的OutboxMessage。LinkedList类型,线程不安全
                                    • client: TransportClient
                                    • stopped: 当前Outbox是否停止的标识
                                    • draining: 表示当前Outbox内正有线程在处理messages中消息的状态

                                      重要方法

                                      • send():将要发送的OutboxMessage首先保存到成员变量链表messages中,若Outbox未停止则调用drainOutbox()方法处理messages中的信息。因为messages是LinkedList类型,线程不安全,所以在添加和删除时使用了同步机制。之后调用了私有的drainOutbox()方法发送消息。发送信息。如果没有活动连接,则缓存并启动新连接。如果[[发件箱]]被停止,发送者将收到[[SparkException]]通知。

                                          def send(message: OutboxMessage): Unit = {
                                            val dropped = synchronized {
                                              if (stopped) {
                                                true
                                              } else {
                                                messages.add(message)
                                                false
                                              }
                                            }
                                            if (dropped) {
                                              message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
                                            } else {
                                              drainOutbox()
                                            }
                                          }
                                        
                                      • drainOutbox():先判断是否已停止,client是否空等前置条件。取出一条消息,并将draining置为true,接下来将messages中所有消息调用sendWith()方法发送。耗尽消息队列。如果有其他线程正在排空,则直接退出。如果尚未建立连接,则在 nettyEnv.clientConnectionExecutor 中启动一个任务来建立连接。

                                      • launchConnectTask(): 初始化client

                                      • stop():停止Outbox

                                        • 将Outbox的停止状态stopped置为true
                                        • 关闭TransportClient
                                        • 清空messages中的消息

                                          之所以要使用这种机制来发消息,是保证并发发送消息时,所有消息依次添加到Outbox中,并依次传输,同时不会阻塞send()方法

网友评论

搜索
最新文章
热门文章
热门标签