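Spark is a fast, general-purpose distributed computing system, and being distributed means that its nodes must communicate with one another. This article explains how the different Spark components talk to each other point-to-point through RPC (Remote Procedure Call). It has three sections:
A simple Spark RPC example and its real-world usage;
The design of the Spark RPC module;
A summary of the core techniques behind Spark RPC.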
1. A simple Spark RPC example and its real-world usage
Spark's RPC lives mainly in two modules:
Spark-core, which wraps the server and client in a friendlier API and integrates them with the Scala language; it depends on the org.apache.spark.spark-network-common module;
org.apache.spark.spark-network-common, which is written in Java. The latest version is built on netty4 and provides full-duplex socket I/O with a multiplexed I/O model. Spark's transport protocol structure (wire protocol) is also custom.
ΪÁ˸üºÃµÄÁ˽âSpark RPCµÄÄÚ²¿ÊµÏÖϸ½Ú£¬ÎÒ»ùÓÚSpark 2.1°æ±¾³éÀëÁËRPCͨÐŵIJ¿·Ö£¬µ¥¶ÀÆôÁËÒ»¸öÏîÄ¿£¬·Åµ½ÁËgithubÒÔ¼°·¢²¼µ½MavenÖÐÑë²Ö¿â×öѧϰʹÓã¬ÌṩÁ˱ȽϺõÄÉÏÊÖÎĵµ¡¢²ÎÊýÉèÖúÍÐÔÄÜÆÀ¹À¡£ÏÂÃæ¾Íͨ¹ýÕâ¸öÄ£¿é¶ÔSpark
RPCÏÈ×öÒ»¸ö¸ÐÐÔµÄÈÏʶ¡£
ÒÔϵĴúÂë¾ù¿ÉÒÔÔÚkraps-rpcÕÒµ½¡£
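1.1 A simple example
Suppose we want to build a Hello service: the client sends a string, and the server replies with hi or bye and echoes the input string back.
Step one: define a HelloEndpoint that extends RpcEndpoint, which indicates that the service may be invoked concurrently. Extending ThreadSafeRpcEndpoint instead indicates that the endpoint does not allow concurrent invocation.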
class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // Called when the endpoint is started.
  override def onStart(): Unit = {
    println("start hello endpoint")
  }

  // Handles request-response messages; pattern matching routes each message type.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(msg) =>
      println(s"receive $msg")
      context.reply(s"hi, $msg")
    case SayBye(msg) =>
      println(s"receive $msg")
      context.reply(s"bye, $msg")
  }

  // Called when the endpoint is stopped.
  override def onStop(): Unit = {
    println("stop hello endpoint")
  }
}

// Message types exchanged between client and server.
case class SayHi(msg: String)
case class SayBye(msg: String)
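Compared with traditional Java RPC solutions, there is no need to define an interface or a method identifier (typically an id or a name); Scala pattern matching routes each message to its handler. The contract between the two peers is tied to the language, here the two case classes SayHi and SayBye, but since Spark RPC is meant for communication between internal components, this is not a real limitation.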
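To make the routing idea concrete without any Spark dependency, here is a minimal standalone sketch. It only uses the Scala standard library; the names RoutingSketch, handler, fallback, and route are made up for illustration and are not part of Spark or kraps-rpc.

```scala
object RoutingSketch {
  // The message types double as the contract between caller and service.
  final case class SayHi(msg: String)
  final case class SayBye(msg: String)

  // A PartialFunction is defined only for the messages it knows how to handle.
  val handler: PartialFunction[Any, String] = {
    case SayHi(msg)  => s"hi, $msg"
    case SayBye(msg) => s"bye, $msg"
  }

  // Hypothetical fallback for anything the handler does not match.
  val fallback: PartialFunction[Any, String] = {
    case other => s"unsupported message: $other"
  }

  // Routing is just "try the handler, otherwise fall back".
  def route(message: Any): String = handler.orElse(fallback)(message)
}
```

Calling RoutingSketch.route(RoutingSketch.SayHi("neo")) goes through the first case, while any value that is not one of the two case classes ends up in the fallback. No interface or method id is declared anywhere.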
Step two: hand the endpoint over to Spark RPC, which manages its lifecycle and uses it to respond to external requests. RpcEnvServerConfig defines some parameters, the server name (merely an identifier), and the bind address and port. The NettyRpcEnvFactory factory creates an RpcEnv, which is the heart of Spark RPC and is covered in detail later. setupEndpoint binds the name "hello-service" to the endpoint defined in step one; a client later needs this name to route its calls to the endpoint. awaitTermination blocks the server so that it keeps listening for and handling requests.
val config = RpcEnvServerConfig(new RpcConf(), "hello-server", "localhost", 52345)
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
// Bind the endpoint to the name that clients will use for routing.
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
// Block and keep serving requests.
rpcEnv.awaitTermination()
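Step three: write a client that calls the server just started. An RpcEnvClientConfig and an RpcEnv are both required. Then use the "hello-service" name mentioned above to create a reference (Ref) to the remote endpoint; it can be thought of as a stub used to make calls. The request is first shown in its asynchronous form.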
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val rpcConf = new RpcConf()
val config = RpcEnvClientConfig(rpcConf, "hello-client")
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
// The endpoint name must match the one registered on the server.
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
future.onComplete {
  case scala.util.Success(value) => println(s"Got the result = $value")
  case scala.util.Failure(e) => println(s"Got error: $e")
}
Await.result(future, Duration.apply("30s"))
The call can also be made synchronously. In recent versions of Spark, askWithRetry has been renamed to askSync.
val result = endPointRef.askWithRetry[String](SayBye("neo"))
That is the whole Spark RPC communication flow. It is clearly very easy to use: the RPC framework hides the socket I/O model, the threading model, and serialization/deserialization, and relies on netty for frame detection, long-lived connections, and network reconnection and retry.
1.2 Real-world usage
Inside Spark, many Endpoints, and the EndpointRefs that talk to them, work this way. For example, the driver and executors interact through a heartbeat mechanism implemented by HeartbeatReceiver, which is itself an Endpoint. It is registered when SparkContext is initialized:
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
It is invoked inside the Executor as follows:
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
  message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
2. The design of the Spark RPC module
First, a note: since Spark 2.0, Akka has been removed as the RPC framework (see SPARK-5293 for details). The reason is simple: many users use Akka for messaging themselves, which conflicts with the version embedded in Spark, while Spark only used Akka for RPC. So from 2.0 onward, a Scala module that resembles Akka's Actor message-passing model was implemented on top of the lower-level org.apache.spark.spark-network-common module and packaged inside core. kraps-rpc simply extracts that part from core into an independent project.
Although Akka is gone, some concepts from the Actor model remain. In today's Spark RPC the mapping is as follows.
RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem
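The underlying communication has been replaced entirely by netty, via the internal library org.apache.spark.spark-network-common.
2.1 Class diagram analysis
The UML diagram below shows the class relationships inside the Spark RPC module. White boxes are Scala classes in Spark-core; yellow boxes are Java classes in org.apache.spark.spark-network-common.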

Do not be intimidated by the diagram; after the explanation below, it should make sense. There is no need to scrutinize how reasonable the design is: Spark is a fast-moving, constantly evolving project, and its code is bound to keep changing.
RpcEndpoint and RpcCallContext
Start with RpcEndpoint on the far left. An RpcEndpoint is a service that can respond to requests, similar to an Actor in Akka. Its method signatures (shown below) reveal that receive is one-way, comparable to UDP, while receiveAndReply is request-response, comparable to TCP. Subclasses can override either function as needed; the HelloEndpoint implemented in Section 1 and Spark's HeartbeatReceiver are both subclasses.
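// One-way messages: by default, reject anything the subclass does not handle.
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new RpcException(self + " does not implement 'receive'")
}

// Request-response messages: by default, reply with a failure.
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case _ => context.sendFailure(new RpcException(self + " won't reply anything"))
}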
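RpcCallContext is the bridge that separates the core business logic from the underlying transport. It also shows that Spark RPC leans on composition, aggregation, and callbacks for its OO abstractions, which keeps three layers apart: business logic -> RPC wrapper (inside the Spark-core module) -> underlying communication (spark-network-common). RpcCallContext can reply with either a normal response or an error, for example: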
reply(response: Any) // Reply with a message, which can be a case class.
sendFailure(e: Throwable) // Reply with an exception, which can be any subclass of Exception. Because Spark RPC uses Java serialization by default, the exception can be fully restored on the client and re-thrown as the cause.
RpcCallContext has two subclasses, LocalNettyRpcCallContext and RemoteNettyRpcCallContext, which are mainly used inside the framework. A local call goes through LocalNettyRpcCallContext and invokes the Endpoint directly; otherwise RemoteNettyRpcCallContext is used and the interaction with the remote side goes over RPC. This reflects the core idea of RPC: executing a function or method in another address space as if it were a local call.
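The local/remote split can be sketched with plain Scala and Java serialization. This is a simplified illustration, not Spark's implementation: CallContext, LocalCallContext, RemoteCallContext, and the transport function are hypothetical stand-ins, and the "wire" is just a byte array held in memory.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.concurrent.Promise
import scala.util.Try

object CallContextSketch {
  // Simplified stand-in for RpcCallContext: the endpoint only sees these two methods.
  trait CallContext {
    def reply(response: Any): Unit
    def sendFailure(e: Throwable): Unit
  }

  // Local flavour: completes a Promise inside the same JVM, no serialization.
  final class LocalCallContext(p: Promise[Any]) extends CallContext {
    def reply(response: Any): Unit = { p.trySuccess(response); () }
    def sendFailure(e: Throwable): Unit = { p.tryFailure(e); () }
  }

  // Remote flavour: serializes the outcome and hands the bytes to a transport function.
  final class RemoteCallContext(transport: Array[Byte] => Unit) extends CallContext {
    def reply(response: Any): Unit = transport(serialize(response))
    def sendFailure(e: Throwable): Unit = transport(serialize(e))
  }

  def serialize(value: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value.asInstanceOf[AnyRef])
    out.close()
    bytes.toByteArray
  }

  def deserialize(data: Array[Byte]): Any = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject() finally in.close()
  }

  // Business logic is written once and does not know which context it was given.
  def handle(message: Any, context: CallContext): Unit = message match {
    case s: String if s.nonEmpty => context.reply(s"hi, $s")
    case other => context.sendFailure(new IllegalArgumentException(s"cannot handle: $other"))
  }

  // Local call: the handler runs synchronously, so the promise is already completed.
  def callLocally(message: Any): Try[Any] = {
    val p = Promise[Any]()
    handle(message, new LocalCallContext(p))
    p.future.value.get
  }

  // "Remote" call: the outcome travels as bytes and is restored on the other side.
  def callRemotely(message: Any): Any = {
    var wire = Array.emptyByteArray
    handle(message, new RemoteCallContext(bytes => wire = bytes))
    deserialize(wire)
  }
}
```

The handle function is identical for both paths, which is the point of the abstraction. Note that a failure sent through the remote path arrives as a deserialized exception object, in line with the remark above about Java serialization restoring exceptions on the client.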
In addition, RpcEndpoint provides a set of callbacks that can be overridden.
onError
onConnected
onDisconnected
onNetworkError
onStart
onStop
stop
Note also that one of its subclasses is ThreadSafeRpcEndpoint, which many Endpoints in Spark extend. The Spark RPC framework does not process messages concurrently for such an Endpoint; that is, only one thread at a time is allowed to invoke it.
There is also a default RpcEndpoint called RpcEndpointVerifier. Every RpcEnv registers it during initialization, because a client must first ask the server whether a given Endpoint exists before calling it.
RpcEndpointRef
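RpcEndpointRef is similar to ActorRef in Akka. As the name suggests, it is a reference to an RpcEndpoint. Its send method is equivalent to !, and its ask method is equivalent to ?. send issues a one-way request (handled by receive in RpcEndpoint) with fire-and-forget semantics, while ask provides request-response semantics (handled by receiveAndReply in RpcEndpoint). ask expects a response by default and has a timeout; the caller can block synchronously, or get back a Future handle so that the requesting worker thread is not blocked.
RpcEndpointRef is the entry point for client requests. It is obtained from an RpcEnv and chooses between a local call and an RPC as appropriate.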
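The difference between the three calling styles can be illustrated with a small in-process sketch. It is not the RpcEndpointRef API: AskSendSketch and its methods are hypothetical, and a single daemon worker thread stands in for the remote endpoint.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AskSendSketch {
  // A single daemon worker thread stands in for the remote endpoint.
  private val worker = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "endpoint-worker")
      t.setDaemon(true)
      t
    }
  })
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(worker)
  private val oneWayReceived = new AtomicInteger(0)

  // send: fire-and-forget, the caller gets nothing back.
  def send(message: Any): Unit =
    worker.execute(new Runnable {
      def run(): Unit = { oneWayReceived.incrementAndGet(); () }
    })

  // ask: request-response, returned as a Future so the caller is not blocked.
  def ask(name: String): Future[String] = Future(s"hi, $name")

  // Synchronous variant: block until the reply arrives or the timeout expires.
  def askSync(name: String, timeoutMillis: Long): String =
    Await.result(ask(name), timeoutMillis.millis)

  def oneWayCount: Int = oneWayReceived.get()
}
```

Because the worker is single-threaded and processes tasks in submission order, a send followed by an askSync is guaranteed to have been handled by the time the reply comes back. If the reply does not arrive in time, Await.result throws a TimeoutException, which mirrors the timeout behaviour described above.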
RpcEnv and NettyRpcEnv
The core of the library is RpcEnv, which, as mentioned, corresponds to ActorSystem. Both server and client use it to communicate.
On the server side, RpcEnv is the runtime environment of an RpcEndpoint and manages its entire lifecycle. It registers and destroys Endpoints, parses and deserializes TCP-level packets, wraps them into RpcMessage, routes each request to the target Endpoint, and invokes the business logic. If the Endpoint needs to respond, the returned object is serialized and sent back to the remote peer over TCP; if the Endpoint throws, RpcCallContext.sendFailure sends the exception back.
On the client side, RpcEnv is used to obtain a reference to an RpcEndpoint, that is, an RpcEndpointRef.
RpcEnv is responsible for interacting with the concrete underlying communication module. Its companion object contains a method for creating an RpcEnv, with the following signature:
def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port,
    securityManager, numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}
RpcEnv instances are created by an RpcEnvFactory. Currently its only subclass is NettyRpcEnvFactory; there used to be an AkkaRpcEnvFactory as well. Calling NettyRpcEnvFactory.create immediately starts a server on the bound address and port.
The RpcEnvConfig it depends on is a container for a SparkConf (renamed RpcConf in kraps-rpc) plus a few parameters. All RpcEnv parameters are read from RpcEnvConfig: the basic hostname and port, and more advanced settings such as connection timeout, retry count, and Reactor thread pool size.
Here are the two most commonly used RpcEnv methods:
// Register an endpoint. A name is required; clients route to the endpoint by this name.
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

// Obtain a reference to an endpoint.
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
NettyRpcEnv is created by NettyRpcEnvFactory.create. It is the bridge between Spark core and org.apache.spark.spark-network-common: internally it leverages the communication capabilities of the lower layer while wrapping them in Actor-like semantics. Of the two core methods above, setupEndpoint registers the Endpoint with the Dispatcher, and setupEndpointRef first calls RpcEndpointVerifier to check whether the endpoint exists locally or remotely, then creates the RpcEndpointRef. More details on server and client calls are covered in the sequence diagrams and are not expanded here.
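Dispatcher and Inbox
NettyRpcEnv contains a Dispatcher, which mainly serves the server side: it routes each message to the correct RpcEndpoint and invokes its business logic.
The Reactor model needs a brief explanation first. Spark RPC's socket I/O is a typical Reactor model, but combined with the mailbox from the Actor pattern, which makes it a hybrid implementation.
With the Reactor model, the EventLoops created by netty perform I/O multiplexing. The Multiple Reactors form is used here, as shown in the figure below. In netty terms, the Main Reactor and Sub Reactor correspond to BossGroup and WorkerGroup: the former listens for TCP connections and handles their establishment and teardown, and the latter does the actual I/O reads and writes. The ThreadPool in the figure is the thread pool inside the Dispatcher, which decouples time-consuming business logic from I/O operations. This makes the system more scalable, since a small number of threads can handle thousands of connections. The idea is a standard divide-and-conquer strategy: offload non-I/O work to a separate thread pool.
The business logic of an RpcEndpoint actually runs in the ThreadPool. In between, a handler on the Reactor thread decodes the bytes into an RpcMessage and posts it to the Inbox, so the compute step happens in the Dispatcher thread pool described below.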

The mailbox from the Actor pattern was mentioned above. Spark RPC originated from Akka, and it still uses this pattern today. Every Endpoint has an Inbox, and the Inbox holds a linked list of InboxMessage. InboxMessage has many subclasses: an RpcMessage from a remote call, a OneWayMessage for a remote fire-and-forget call, and messages for events such as service start and connection established or closed. The Inbox pattern-matches on these messages internally and calls the corresponding RpcEndpoint function (there is a one-to-one mapping).
The Dispatcher contains a MessageLoop that reads posted RpcMessages from a LinkedBlockingQueue, finds the Inbox of the Endpoint identified by the client, and delivers the message to it. Because the queue is blocking, the loop naturally blocks when there are no messages and starts working as soon as one arrives. The Dispatcher's ThreadPool consumes these messages.
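The mailbox idea can be sketched in a few lines of standalone Scala. This is a deliberately reduced model under stated assumptions, not Spark's Dispatcher: the class names only echo the concepts above, there is a single loop thread instead of a pool, and messages are plain values rather than InboxMessage subclasses.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

object MailboxSketch {
  // Per-endpoint mailbox: pending messages plus the endpoint's handler.
  final class Inbox(handler: Any => Unit) {
    private val messages = new java.util.LinkedList[Any]()

    def post(message: Any): Unit = synchronized { messages.add(message); () }

    private def next(): Any = synchronized { messages.poll() } // null when empty

    // Drain the messages that are currently queued, in FIFO order.
    def process(): Unit = {
      var m = next()
      while (m != null) {
        handler(m)
        m = next()
      }
    }
  }

  final class Dispatcher {
    private val inboxes = new ConcurrentHashMap[String, Inbox]()
    // Inboxes that have work; take() blocks while the queue is empty.
    private val ready = new LinkedBlockingQueue[Inbox]()

    private val loop = new Thread(new Runnable {
      def run(): Unit =
        try { while (true) ready.take().process() }
        catch { case _: InterruptedException => () } // leave the loop on interrupt
    }, "dispatcher-event-loop-0")
    loop.setDaemon(true)
    loop.start()

    def register(name: String, handler: Any => Unit): Unit = {
      inboxes.put(name, new Inbox(handler)); ()
    }

    // Called by the I/O side; returns false if no endpoint has that name.
    def post(name: String, message: Any): Boolean = {
      val inbox = inboxes.get(name)
      if (inbox == null) false
      else { inbox.post(message); ready.put(inbox); true }
    }

    def stop(): Unit = loop.interrupt()
  }
}
```

The thread that calls post (the Reactor thread in the description above) never runs the handler itself; it only enqueues and returns. With a single loop thread, each endpoint sees its messages one at a time and in order, which is also the guarantee a ThreadSafeRpcEndpoint relies on.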
The size of the Dispatcher's ThreadPool is controlled by the spark.rpc.netty.dispatcher.numThreads parameter. Running kill -3 against any Spark driver or executor process shows N dispatcher threads:
"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=31 tid=0x00007f8877153800 nid=0x7103 waiting on condition [0x000000011f78b000]
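The next question is who calls the Dispatcher's method for dispatching messages. The answer is NettyRpcHandler, a subclass of RpcHandler, and this is the work done by the Reactor threads. RpcHandler is a handler provided by the underlying org.apache.spark.spark-network-common; once a remote packet has been parsed successfully, this handler is called to process it.
This completes a fully asynchronous flow. Network I/O is handled by the lower layer and then dispatched by the Dispatcher. As long as the InboxMessage linked list in the Dispatcher is large enough, the Dispatcher's ThreadPool can work through the messages at its own pace, decoupled from the underlying I/O and running entirely on separate threads. Once the Endpoint's business logic finishes, the RpcCallContext callback is used to send the result back.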
Outbox
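NettyRpcEnv holds a ConcurrentHashMap[RpcAddress, Outbox], with one Outbox per remote Endpoint (keyed by its address). This mirrors the Inbox above and is likewise a mailbox-style implementation.
Like the Inbox, an Outbox holds a linked list of OutboxMessage. OutboxMessage has two subclasses, OneWayOutboxMessage and RpcOutboxMessage, which correspond to calling receive and receiveAndReply on the RpcEndpoint respectively.
The send and ask methods of NettyRpcEnv call the send method of the Outbox for the target address. If the remote connection has not been established yet, it is established first, and then the OutboxMessages are drained.
Again, the question is how the Outbox's send method pushes the message out over network I/O and, for ask, how the remote response is read back. The answer: send transmits the message through a TransportClient created by org.apache.spark.spark-network-common, and a Reactor thread serializes and sends it. Each message gets a UUID, and the lower layer maintains a HashMap from each outgoing message to its Callback. When netty has received a complete remote RpcResponse, it invokes the corresponding Callback, which deserializes the response and then calls back into the business logic in Spark core, completing the Promise/Future so that the caller above stops blocking.
This is also an asynchronous process: after a message is posted to the Outbox, the call returns immediately and network I/O is handled by the lower layer. Whether the RPC succeeds or fails, the upper-layer function is called back to handle the outcome.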
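The "map from outgoing message to callback" can be modelled roughly as follows. This is a sketch under assumptions: PendingCallsSketch is a made-up name, the request id is a simple incrementing Long rather than whatever the real transport layer generates, and the transport is an arbitrary function supplied by the caller.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{Future, Promise}

object PendingCallsSketch {
  private val nextId = new AtomicLong(0L)
  // Outstanding requests: request id -> promise to complete when the response arrives.
  private val pending = new ConcurrentHashMap[Long, Promise[String]]()

  // Client side: remember the promise under a fresh id, hand the message to the
  // transport, and return immediately with a Future.
  def ask(message: String, transport: (Long, String) => Unit): Future[String] = {
    val id = nextId.incrementAndGet()
    val p = Promise[String]()
    pending.put(id, p)
    transport(id, message)
    p.future
  }

  // Called by the I/O thread when a response for `id` has been fully received.
  // Returns false if the id is unknown (for example, already answered).
  def onResponse(id: Long, body: String): Boolean = {
    val p = pending.remove(id)
    if (p == null) false else p.trySuccess(body)
  }

  // Called by the I/O thread when the call failed.
  def onFailure(id: Long, error: Throwable): Boolean = {
    val p = pending.remove(id)
    if (p == null) false else p.tryFailure(error)
  }

  def outstanding: Int = pending.size()
}
```

The caller of ask holds only the Future; whichever thread later calls onResponse or onFailure completes it, and that completion is what wakes up a caller that chose to block.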
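Classes in spark-network-common
These are not covered in depth here. They are all wrappers around Netty, and interested readers can read the source code. You can also refer to the code of the Navi-pbrpc framework I open-sourced earlier, which works on essentially the same principles.
2.2 Sequence diagram analysis
Service startup
Without further ado, here is the diagram.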

Server response

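Phase one, I/O receive. TransportRequestHandler is the netty callback handler. It parses a complete packet according to the wire format (described later) and hands it to NettyRpcEnv for deserialization. For an RPC call, an RpcMessage is constructed and the RpcHandler method is called back to process it; internally this calls the Dispatcher to post the RpcMessage into the Inbox, and the phase ends there.
Phase two, I/O response. The MessageLoop picks up the pending RpcMessage and hands it to the ThreadPool in the Dispatcher, which in practice means invoking the RpcEndpoint's business logic. The reply is serialized through RpcCallContext, and a callback tells TransportRequestHandler that a message has been processed so the response can be sent back.
The key point here is the convenience of asynchronous processing: combining the Reactor model with the Actor mailbox decouples receiving messages from processing them.
Client request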

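A client generally creates an RpcEnv first and then obtains an RpcEndpointRef.
Phase one, I/O send. The RpcEndpointRef performs a send or an ask; take send as an example. send first serializes the message and then posts it to the Outbox for the target address. If the Outbox finds that the connection has not been established, it tries to establish it first, then calls the underlying TransportClient to send the data directly through the netty API, and returns once that is done. A UUID is returned as the message identifier for the callback in the next phase. From the caller's perspective a Future can be returned, and the client can either block on it or carry on with other work.
Phase two, I/O receive. When TransportResponseHandler receives the remote response, it deserializes it and then completes the Future from phase one, finishing the call. All of this runs on the Reactor thread, and the Future is used to notify the other thread.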
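3. Summary of the core techniques behind Spark RPC
As its RPC transport layer, Spark RPC uses TCP, which provides a reliable, full-duplex binary stream channel.
A high-performance, scalable RPC framework needs, first, a server that handles as many concurrent requests as possible and, second, requests that complete as quickly as possible. CPU and I/O speeds differ inherently, network latency is unpredictable, and CPU time and system processes/threads are precious resources. To keep socket I/O from blocking server and client calls as far as possible, certain patterns can be applied. Because Spark RPC's I/O model is built on Netty, it uses the underlying I/O multiplexing mechanism. This can be configured through the spark.rpc.io.mode parameter; different platforms use different techniques, for example epoll on Linux.
The threading model is the asynchronous Multi-Reactors + mailbox approach described above.
For schema declaration and serialization, Spark RPC uses Java native serialization by default. This choice is driven mainly by compatibility, by the fact that communication happens between internal components on the JVM, and by integration with Scala. As a result it cannot communicate across languages, and it does not aim for peak performance; it does not yet use Kryo or other schemes with better serialization speed and smaller output.
As for the protocol structure, Spark RPC uses the private wire format shown below, organized as header + payload. The header contains the length of the whole frame, the message type, and the request UUID. The logic that deals with TCP packet coalescing and splitting (the so-called sticky-packet and half-packet problems) and assembles complete Messages is all in org.apache.spark.network.protocol.MessageEncoder.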

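Let's analyze it in detail with Wireshark.
First, look at an RPC request, namely a call to the HelloEndpoint from Section 1. The client call is transmitted as two TCP segments, because Spark writes the header and the body with separate writeAndFlush calls when using netty.
The figure below shows the first TCP segment: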

The blue part in the example is the header. Its bytes break down as follows:
00 00 00 00 00 00 05 d2 // decimal 1490, the length of the whole frame
The single byte 03 indicates an RpcRequest. The enum is defined as follows:
RpcRequest(3)
RpcResponse(4)
RpcFailure(5)
StreamRequest(6)
StreamResponse(7)
StreamFailure(8)
OneWayMessage(9)
User(-1)
The remaining bytes mean the following:
4b ac a6 9f 83 5d 17 a9 // these 8 bytes are the UUID
05 bd // decimal 1469, the payload length
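The byte breakdown above can be checked with a short decoder. The field widths here are an assumption made for this sketch: an 8-byte frame length, a 1-byte message type, an 8-byte request id, and a 4-byte payload length whose two leading zero bytes are not shown in the listing. FrameHeaderSketch and its members are illustrative names, not Spark classes.

```scala
import java.nio.ByteBuffer

object FrameHeaderSketch {
  final case class Header(frameLength: Long, messageType: Byte, requestId: Long, bodyLength: Int)

  // Assumed layout: 8 (frame length) + 1 (type) + 8 (request id) + 4 (payload length).
  val HeaderSize: Int = 8 + 1 + 8 + 4

  def encode(h: Header): Array[Byte] = {
    val buf = ByteBuffer.allocate(HeaderSize) // ByteBuffer is big-endian by default
    buf.putLong(h.frameLength)
    buf.put(h.messageType)
    buf.putLong(h.requestId)
    buf.putInt(h.bodyLength)
    buf.array()
  }

  def decode(bytes: Array[Byte]): Header = {
    val buf = ByteBuffer.wrap(bytes)
    val frameLength = buf.getLong()
    val messageType = buf.get()
    val requestId = buf.getLong()
    val bodyLength = buf.getInt()
    Header(frameLength, messageType, requestId, bodyLength)
  }

  // Turns a hex dump such as "00 00 05 d2" into bytes.
  def hex(dump: String): Array[Byte] =
    dump.trim.split("\\s+").map(b => Integer.parseInt(b, 16).toByte)
}
```

Feeding it the bytes from the capture, FrameHeaderSketch.decode(FrameHeaderSketch.hex("00 00 00 00 00 00 05 d2 03 4b ac a6 9f 83 5d 17 a9 00 00 05 bd")), gives a frame length of 1490, type 3, and a payload length of 1469. The difference, 1490 - 1469 = 21, equals the assumed header size, which is at least consistent with this layout.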
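The actual payload looks like the figure below. With Java native serialization, a simple echo request takes 1469 bytes, which is quite large; the serialization is not efficient. However, Spark RPC is designed for internal communication rather than as a general-purpose RPC framework, and the volume of RPC traffic is very small, so this overhead is negligible. Spark Structured Streaming also uses this serialization, and its performance still meets the requirements.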

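In addition, I ran a performance test of Spark RPC in kraps-rpc; see GitHub for details.
Summary
Out of curiosity, I dug into the internals of Spark RPC and extracted a dedicated project, Kraps-rpc, from Spark core 2.1. It is on GitHub and published to Maven Central for learning purposes, with a reasonable getting-started guide, configuration parameters, and a performance evaluation. While putting kraps-rpc together I also found a small improvement and submitted a PR to Spark, [SPARK-21701], which has been merged into master, so I count that as a contribution to the community (and I am very happy about it).
The article then analyzed the class relationships inside the Spark RPC module, using a UML class diagram and sequence diagrams to explain core concepts such as RpcEnv, RpcEndpoint, and RpcEndpointRef, as well as the I/O design patterns involved, including I/O multiplexing, Reactor, and the Actor mailbox. The design philosophy of Spark RPC is worth stressing once more: build an asynchronous communication framework on top of netty's powerful socket I/O. Finally, the wire protocol was analyzed at the level of binary TCP segments.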