Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
ÉîÈë½âÎöSparkÖеÄRPC
 
À´Ô´£ºcsdn ·¢²¼ÓÚ£º2017-10-13
  3828  次浏览      27
 

SparkÊÇÒ»¸ö¿ìËٵġ¢Í¨Óõķֲ¼Ê½¼ÆËãϵͳ£¬¶ø·Ö²¼Ê½µÄÌØÐÔ¾ÍÒâζ×Å£¬±ØÈ»´æÔÚ½Úµã¼äµÄͨÐÅ¡£±¾ÎÄÖ÷Òª½éÉܲ»Í¬µÄSpark×é¼þÖ®¼äÊÇÈçºÎͨ¹ýRPC£¨Remote Procedure Call) ½øÐеã¶ÔµãͨÐŵ쬷ÖΪÈý¸öÕ½ڣº

Spark RPCµÄ¼òµ¥Ê¾ÀýºÍʵ¼ÊÓ¦Óã»

Spark RPCÄ£¿éµÄÉè¼ÆÔ­Àí£»

Spark RPCºËÐļ¼Êõ×ܽᡣ

Ò»¡¢Spark RPCµÄ¼òµ¥Ê¾ÀýºÍʵ¼ÊÓ¦ÓÃ

SparkµÄRPCÖ÷ÒªÔÚÁ½¸öÄ£¿éÖУº

ÔÚSpark-coreÖУ¬Ö÷Òª³ÐÔØÁ˸üºÃµÄ·â×°serverºÍclientµÄ×÷Óã¬ÒÔ¼°ºÍscalaÓïÑÔµÄÈںϣ¬ËüÒÀÀµÓÚÄ£¿éorg.apache.spark.spark-network-common£»

ÔÚorg.apache.spark.spark-network-commonÖУ¬¸ÃÄ£¿éÊÇjavaÓïÑÔ±àдµÄ£¬×îа汾ÊÇ»ùÓÚnetty4¿ª·¢µÄ£¬Ìṩȫ˫¹¤¡¢¶à·¸´ÓÃI/OÄ£Ð͵ÄSocket I/OÄÜÁ¦£¬SparkµÄ´«ÊäЭÒé½á¹¹£¨wire protocol£©Ò²ÊÇ×Ô¶¨ÒåµÄ¡£

ΪÁ˸üºÃµÄÁ˽âSpark RPCµÄÄÚ²¿ÊµÏÖϸ½Ú£¬ÎÒ»ùÓÚSpark 2.1°æ±¾³éÀëÁËRPCͨÐŵIJ¿·Ö£¬µ¥¶ÀÆôÁËÒ»¸öÏîÄ¿£¬·Åµ½ÁËgithubÒÔ¼°·¢²¼µ½MavenÖÐÑë²Ö¿â×öѧϰʹÓã¬ÌṩÁ˱ȽϺõÄÉÏÊÖÎĵµ¡¢²ÎÊýÉèÖúÍÐÔÄÜÆÀ¹À¡£ÏÂÃæ¾Íͨ¹ýÕâ¸öÄ£¿é¶ÔSpark RPCÏÈ×öÒ»¸ö¸ÐÐÔµÄÈÏʶ¡£

ÒÔϵĴúÂë¾ù¿ÉÒÔÔÚkraps-rpcÕÒµ½¡£

1.1 ¼òµ¥Ê¾Àý

¼ÙÉèÎÒÃÇÒª¿ª·¢Ò»¸öHello·þÎñ£¬¿Í»§¶Ë¿ÉÒÔ´«Êästring£¬·þÎñ¶ËÏìÓ¦hi»òÕßbye£¬²¢echo»ØÈ¥ÊäÈëµÄstring¡£

µÚÒ»²½£¬¶¨ÒåÒ»¸öHelloEndpoint¼Ì³Ð×ÔRpcEndpoint±íÃ÷¿ÉÒÔ²¢·¢µÄµ÷Óø÷þÎñ£¬Èç¹û¼Ì³Ð×ÔThreadSafeRpcEndpointÔò±íÃ÷¸ÃEndpoint²»ÔÊÐí²¢·¢¡£

class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
override def onStart(): Unit = {
println("start hello endpoint")
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case SayHi(msg) => {
println(s"receive $msg")
context.reply(s"hi, $msg")
}
case SayBye(msg) => {
println(s"receive $msg")
context.reply(s"bye, $msg")
}
}

override def onStop(): Unit = {
println("stop hello endpoint")
}
}

case class SayHi(msg: String)
case class SayBye(msg: String)

ºÍJava´«Í³µÄRPC½â¾ö·½°¸¶Ô±È£¬¿ÉÒÔ¿´³öÕâÀï²»Óö¨Òå½Ó¿Ú»òÕß·½·¨±êʾ£¨±ÈÈçͨ³£µÄid»òÕßname£©£¬Ê¹ÓÃscalaµÄģʽƥÅä½øÐз½·¨µÄ·ÓÉ¡£ËäÈ»µã¶ÔµãͨÐŵįõÔ¼½»»»ÊÜÖÆÓÚÓïÑÔ£¬ÕâÀï¾ÍÊÇSayHiºÍSayByeÁ½¸öcase class£¬µ«ÊÇSpark RPC¶¨Î»ÓÚÄÚ²¿×é¼þͨÐÅ£¬ËùÒÔÎÞÉË´óÑÅ¡£

µÚ¶þ²½£¬°Ñ¸Õ¸Õ¿ª·¢ºÃµÄEndpoint½»¸øSpark RPC¹ÜÀíÆäÉúÃüÖÜÆÚ£¬ÓÃÓÚÏìÓ¦ÍⲿÇëÇó¡£RpcEnvServerConfig¿ÉÒÔ¶¨ÒåһЩ²ÎÊý¡¢serverÃû³Æ£¨½ö½öÊÇÒ»¸ö±êʶ£©¡¢bindµØÖ·ºÍ¶Ë¿Ú¡£Í¨¹ýNettyRpcEnvFactoryÕâ¸ö¹¤³§·½·¨£¬Éú³ÉRpcEnv£¬RpcEnvÊÇÕû¸öSpark RPCµÄºËÐÄËùÔÚ£¬ºóÎÄ»áÏêϸչ¿ª£¬Í¨¹ýsetupEndpoint½«¡±hello-service¡±Õâ¸öÃû×ֺ͵ÚÒ»²½¶¨ÒåµÄEndpoint°ó¶¨£¬ºóÐøclientµ÷Ó÷Óɵ½Õâ¸öEndpoint¾ÍÐèÒª¡±hello-service¡±Õâ¸öÃû×Ö¡£µ÷ÓÃawaitTerminationÀ´×èÈû·þÎñ¶Ë¼àÌýÇëÇó²¢ÇÒ´¦Àí¡£

val config = RpcEnvServerConfig(new RpcConf(), "hello-server", "localhost", 52345)
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
rpcEnv.awaitTermination()

µÚÈý²½£¬¿ª·¢Ò»¸öclientµ÷ÓÃ¸Õ¸ÕÆô¶¯µÄserver£¬Ê×ÏÈRpcEnvClientConfigºÍRpcEnv¶¼ÊDZØÐëµÄ£¬È»ºóͨ¹ý¸Õ¸ÕÌáµ½µÄ¡±hello-service¡±Ãû×Öн¨Ò»¸öÔ¶³ÌEndpointµÄÒýÓã¨Ref£©£¬¿ÉÒÔ¿´×öÊÇstub£¬ÓÃÓÚµ÷Óã¬ÕâÀïÊ×ÏÈչʾͨ¹ýÒì²½µÄ·½Ê½À´×öÇëÇó¡£

val rpcConf = new RpcConf()
val config = RpcEnvClientConfig (rpcConf, " hello-client")
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create (config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef (RpcAddress ("localhost", 52345), "hell-service")
val future: Future[String] = endPointRef.ask [String] (SayHi("neo"))
future.onComplete {
case scala.util.Success(value) = > println(s"Got the result = $value")
case scala.util.Failure(e) = > println (s"Got error: $e")
}
Await.result (future, Duration.apply ("30s"))

Ò²¿ÉÒÔͨ¹ýͬ²½µÄ·½Ê½£¬ÔÚ×îеÄSparkÖÐaskWithRetryʵ¼ÊÒѸüÃûΪaskSync¡£

val result = endPointRef.askWithRetry [String] (SayBye ("neo"))

Õâ¾ÍÊÇSpark RPCµÄͨÐŹý³Ì£¬Ê¹ÓÃÆðÀ´Ò×ÓÃÐÔ¿ÉÏë¶øÖª£¬·Ç³£¼òµ¥£¬RPC¿ò¼ÜÆÁ±ÎÁËSocket I/OÄ£ÐÍ¡¢Ïß³ÌÄ£ÐÍ¡¢ÐòÁл¯/·´ÐòÁл¯¹ý³Ì¡¢Ê¹ÓÃnetty×öÁ˰üʶ±ð£¬³¤Á¬½Ó£¬ÍøÂçÖØÁ¬ÖØÊԵȻúÖÆ¡£

1.2 ʵ¼ÊÓ¦ÓÃ

ÔÚSparkÄÚ²¿£¬ºÜ¶àµÄEndpointÒÔ¼°EndpointRefÓë֮ͨÐŶ¼ÊÇͨ¹ýÕâÖÖÐÎʽµÄ£¬¾ÙÀýÀ´Ëµ±ÈÈçdriverºÍexecutorÖ®¼äµÄ½»»¥Óõ½ÁËÐÄÌø»úÖÆ£¬Ê¹ÓÃHeartbeatReceiverÀ´ÊµÏÖ£¬ÕâÒ²ÊÇÒ»¸öEndpoint£¬ËüµÄ×¢²áÔÚSparkContext³õʼ»¯µÄʱºò×öµÄ£¬´úÂëÈçÏ£º

_heartbeatReceiver = env.rpcEnv.setupEndpoint (HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

¶øËüµÄµ÷ÓÃÔÚExecutorÄڵķ½Ê½ÈçÏ£º

val message = Heartbeat( executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val response = heartbeatReceiverRef.askWithRetry [HeartbeatResponse] (message, RpcTimeou t(conf, "spark.executor.heartbeatInterval", "10s"))

¶þ¡¢Spark RPCÄ£¿éµÄÉè¼ÆÔ­Àí

Ê×ÏÈ˵Ã÷Ï£¬×ÔSpark 2.0ºóÒѾ­°ÑAkkaÕâ¸öRPC¿ò¼Ü°þÀë³öÈ¥ÁË£¨Ïêϸ¼ûSPARK-5293£©£¬Ô­ÒòºÜ¼òµ¥£¬ÒòΪºÜ¶àÓû§»áʹÓÃAkka×öÏûÏ¢´«µÝ£¬ÄÇô¾Í»áºÍSparkÄÚǶµÄ°æ±¾²úÉú³åÍ»£¬¶øSparkÒ²½ö½öÓÃÁËAkka×öRPC£¬ËùÒÔ2.0Ö®ºó£¬»ùÓڵײãµÄorg.apache.spark.spark-network-commonÄ£¿éʵÏÖÁËÒ»¸öÀàËÆAkka ActorÏûÏ¢´«µÝģʽµÄscalaÄ£¿é£¬·â×°ÔÚÁËcoreÀïÃæ£¬kraps-rpcÒ²¾ÍÊǰÑÕâ¸ö²¿·Ö´ÓcoreÀïÃæ°þÀë³öÀ´¶ÀÁ¢ÁËÒ»¸öÏîÄ¿¡£

ËäÈ»°þÀëÁËAkka£¬µ«ÊÇ»¹ÊÇÑØÏ®ÁËActorģʽÖеÄһЩ¸ÅÄÔÚÏÖÔÚµÄSpark RPCÖÐÓÐÈçÏÂÓ³Éä¹ØÏµ¡£

RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem

µ×²ãͨÐÅÈ«²¿Ê¹ÓÃnetty½øÐÐÁËÌæ»»£¬Ê¹ÓõÄÊÇorg.apache.spark.spark-network-commonÕâ¸öÄÚ²¿lib¡£

2.1 Ààͼ·ÖÎö

ÕâÀïÏÈÉÏÒ»¸öUMLͼչʾÁËSpark RPCÄ£¿éÄÚµÄÀà¹ØÏµ£¬°×É«µÄÊÇSpark-coreÖеÄscalaÀ࣬»ÆÉ«µÄÊÇorg.apache.spark.spark-network-commonÖеÄjavaÀà¡£

²»Òª±»ÕâÕÅͼËùÏŵ¹£¬¾­¹ýÏÂÃæµÄ½âÊÍ·ÖÎö£¬ÏàÐŶÁÕß¿ÉÒÔÁì»áÆäÄÚº­£¬²»ÓÃϸ¾¿ÆäÉè¼ÆµÄºÏÀí¶È£¬SparkÊÇÒ»¸ö·¢Õ¹ºÜ¿ì¡¢²»¶ÏÑݽøµÄÏîÄ¿£¬´úÂë²»ÊÇÒ»³É²»±äµÄ£¬³ÖÐø±ä»¯ÊÇÒ»¶¨µÄ¡£

RpcEndpointºÍRpcCallContext

ÏÈ¿´×î×ó²àµÄRpcEndpoint£¬RpcEndpointÊÇÒ»¸ö¿ÉÒÔÏìÓ¦ÇëÇóµÄ·þÎñ£¬ºÍAkkaÖеÄActorÀàËÆ£¬´ÓËüµÄÌṩµÄ·½·¨Ç©Ãû£¨ÈçÏ£©¿ÉÒÔ¿´³ö£¬receive·½·¨Êǵ¥Ïò·½Ê½µÄ£¬¿ÉÒÔ±È×÷UDP£¬¶øreceiveAndReplyÊÇÓ¦´ð·½Ê½µÄ£¬¿ÉÒÔ±È×÷TCP¡£ËüµÄ×ÓÀàʵÏÖ¿ÉÒÔÑ¡ÔñÐԵĸ²¸ÇÕâÁ½¸öº¯Êý£¬ÎÒÃǵÚÒ»ÕÂʵÏÖµÄHelloEndpointÒÔ¼°SparkÖеÄHeartbeatReceiver¶¼ÊÇËüµÄ×ÓÀà¡£

def receive: PartialFunction[Any, Unit] = {
case _ => throw new RpcException(self + " does not implement 'receive'")
}

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new RpcException(self + " won't reply anything"))
}

ÆäÖÐRpcCallContextÊÇÓÃÓÚ·ÖÀëºËÐÄÒµÎñÂß¼­ºÍµ×²ã´«ÊäµÄÇŽӷ½·¨£¬ÕâÒ²¿ÉÒÔ¿´³öSpark RPC¶àÓÃ×éºÏ£¬¾ÛºÏÒÔ¼°»Øµ÷callbackµÄÉè¼ÆÄ£Ê½À´×öOO³éÏó£¬ÕâÑù¿ÉÒÔ°þÀëÒµÎñÂß¼­->RPC·â×°£¨Spark-coreÄ£¿éÄÚ£©->µ×²ãͨÐÅ£¨spark-network-common£©ÈýÕß¡£RpcCallContext¿ÉÒÔÓÃÓڻظ´Õý³£µÄÏìÓ¦ÒÔ¼°´íÎóÒì³££¬ÀýÈ磺

dreply(response: Any) // »Ø¸´Ò»¸ömessage£¬¿ÉÒÔÊÇÒ»¸öcase class¡£
sendFailure(e: Throwable) // »Ø¸´Ò»¸öÒì³££¬¿ÉÒÔÊÇExceptionµÄ×ÓÀ࣬ÓÉÓÚSpark RPCĬÈϲÉÓÃJavaÐòÁл¯·½Ê½£¬ËùÒÔÒì³£¿ÉÒÔÍêÕûµÄÔÚ¿Í»§¶Ë»¹Ô­²¢ÇÒ×÷Ϊcause re-throw³öÈ¥¡£

RpcCallContextÒ²·ÖΪÁËÁ½¸ö×ÓÀ࣬·Ö±ðÊÇLocalNettyRpcCallContextºÍRemoteNettyRpcCallContext£¬Õâ¸öÖ÷ÒªÊÇ¿ò¼ÜÄÚ²¿Ê¹Óã¬Èç¹ûÊDZ¾µØ¾Í×ßLocalNettyRpcCallContextÖ±½Óµ÷ÓÃEndpoint¼´¿É£¬·ñÔò¾Í×ßRemoteNettyRpcCallContextÐèҪͨ¹ýRPCºÍÔ¶³Ì½»»¥£¬ÕâµãÒ²ÌåÏÖÁËRPCµÄºËÐĸÅÄ¾ÍÊÇÈçºÎÖ´ÐÐÁíÍâÒ»¸öµØÖ·¿Õ¼äÉϵĺ¯Êý¡¢·½·¨£¬¾Í·Â·ðÔÚ±¾µØµ÷ÓÃÒ»Ñù¡£

ÁíÍ⣬RpcEndpoint»¹ÌṩÁËһϵÁлص÷º¯Êý¸²¸Ç¡£

onError

onConnected

onDisconnected

onNetworkError

onStart

onStop

stop

ÁíÍâÐèҪעÒâÏ£¬ËüµÄÒ»¸ö×ÓÀàÊÇThreadSafeRpcEndpoint£¬ºÜ¶àSparkÖеÄEndpoint¼Ì³ÐÁËÕâ¸öÀ࣬Spark RPC¿ò¼Ü¶ÔÕâÖÖEndpoint²»×ö²¢·¢´¦Àí£¬Ò²¾ÍÊÇͬһʱ¼äÖ»ÔÊÐíÒ»¸öÏß³ÌÔÚ×öµ÷Óá£

»¹ÓÐÒ»¸öĬÈϵÄRpcEndpoint½Ð×öRpcEndpointVerifier£¬Ã¿Ò»¸öRpcEnv³õʼ»¯µÄʱºò¶¼»á×¢²áÉÏÕâ¸öEndpoint£¬ÒòΪ¿Í»§¶ËµÄµ÷ÓÃÿ´Î¶¼ÐèÒªÏÈѯÎÊ·þÎñ¶ËÊÇ·ñ´æÔÚijһ¸öEndpoint¡£

RpcEndpointRef

RpcEndpointRefÀàËÆÓÚAkkaÖÐActorRef£¬¹ËÃû˼Ò壬ËüÊÇRpcEndpointµÄÒýÓã¬ÌṩµÄ·½·¨sendµÈͬÓÚ!, ask·½·¨µÈͬÓÚ?£¬sendÓÃÓÚµ¥Ïò·¢ËÍÇëÇó£¨RpcEndpointÖеÄreceiveÏìÓ¦Ëü£©£¬Ìṩfire-and-forgetÓïÒ壬¶øaskÌṩÇëÇóÏìÓ¦µÄÓïÒ壨RpcEndpointÖеÄreceiveAndReplyÏìÓ¦Ëü£©£¬Ä¬ÈÏÊÇÐèÒª·µ»ØresponseµÄ£¬´øÓг¬Ê±»úÖÆ£¬¿ÉÒÔͬ²½×èÈûµÈ´ý£¬Ò²¿ÉÒÔ·µ»ØÒ»¸öFuture¾ä±ú£¬²»×èÈû·¢ÆðÇëÇóµÄ¹¤×÷Ï̡߳£

RpcEndpointRefÊǿͻ§¶Ë·¢ÆðÇëÇóµÄÈë¿Ú£¬Ëü¿ÉÒÔ´ÓRpcEnvÖлñÈ¡£¬²¢ÇÒ´ÏÃ÷µÄ×ö±¾µØµ÷ÓûòÕßRPC¡£

RpcEnvºÍNettyRpcEnv

Àà¿âÖÐ×îºËÐĵľÍÊÇRpcEnv£¬¸Õ¸ÕÌáµ½ÁËÕâ¾ÍÊÇActorSystem£¬·þÎñ¶ËºÍ¿Í»§¶Ë¶¼¿ÉÒÔʹÓÃËüÀ´×öͨÐÅ¡£

¶ÔÓÚserver sideÀ´Ëµ£¬RpcEnvÊÇRpcEndpointµÄÔËÐл·¾³£¬¸ºÔðRpcEndpointµÄÕû¸öÉúÃüÖÜÆÚ¹ÜÀí£¬Ëü¿ÉÒÔ×¢²á»òÕßÏú»ÙEndpoint£¬½âÎöTCP²ãµÄÊý¾Ý°ü²¢·´ÐòÁл¯£¬·â×°³ÉRpcMessage£¬²¢ÇÒ·ÓÉÇëÇóµ½Ö¸¶¨µÄEndpoint£¬µ÷ÓÃÒµÎñÂß¼­´úÂ룬Èç¹ûEndpointÐèÒªÏìÓ¦£¬°Ñ·µ»ØµÄ¶ÔÏóÐòÁл¯ºóͨ¹ýTCP²ãÔÙ´«Êäµ½Ô¶³Ì¶Ô¶Ë£¬Èç¹ûEndpoint·¢ÉúÒì³££¬ÄÇôµ÷ÓÃRpcCallContext.sendFailureÀ´°ÑÒì³£·¢ËÍ»ØÈ¥¡£

¶Ôclient sideÀ´Ëµ£¬Í¨¹ýRpcEnv¿ÉÒÔ»ñÈ¡RpcEndpointÒýÓã¬Ò²¾ÍÊÇRpcEndpointRefµÄ¡£

RpcEnvÊǺ;ßÌåµÄµ×²ãͨÐÅÄ£¿é½»»¥µÄ¸ºÔðÈË£¬ËüµÄ°éÉú¶ÔÏó°üº¬´´½¨RpcEnvµÄ·½·¨£¬Ç©ÃûÈçÏ£º

def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)
}

RpcEnvµÄ´´½¨ÓÉRpcEnvFactory¸ºÔð£¬RpcEnvFactoryĿǰֻÓÐÒ»¸ö×ÓÀàÊÇNettyRpcEnvFactory£¬Ô­À´»¹ÓÐAkkaRpcEnvFactory¡£NettyRpcEnvFactory.create·½·¨Ò»µ©µ÷ÓþͻáÁ¢¼´ÔÚbindµÄaddressºÍportÉÏÆô¶¯server¡£

ËüÒÀÀµµÄRpcEnvConfig¾ÍÊÇÒ»¸ö°üº¬ÁËSparkConfÒÔ¼°Ò»Ð©²ÎÊý£¨kraps-rpcÖиüÃûΪRpcConf£©¡£RpcEnvµÄ²ÎÊý¶¼ÐèÒª´ÓRpcEnvConfigÖÐÄã¬×î»ù±¾µÄhostnameºÍport£¬»¹Óи߼¶Ð©µÄÁ¬½Ó³¬Ê±¡¢ÖØÊÔ´ÎÊý¡¢ReactorÏ̳߳شóСµÈµÈ¡£

ÏÂÃæ¿´¿´RpcEnv×î³£ÓõÄÁ½¸ö·½·¨£º

// ×¢²áendpoint£¬±ØÐëÖ¸¶¨Ãû³Æ£¬¿Í»§¶Ë·ÓɾͿ¿Õâ¸öÃû³ÆÀ´ÕÒendpoint
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

// Äõ½Ò»¸öendpointµÄÒýÓÃ
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef

NettyRpcEnvÓÉNettyRpcEnvFactory.create´´½¨£¬ÕâÊÇÕû¸öSpark coreºÍorg.apache.spark.spark-network-commonµÄÇÅÁº£¬ÄÚ²¿leverageµ×²ãÌṩµÄͨÐÅÄÜÁ¦£¬Í¬Ê±°ü×°ÁËÒ»¸öÀàActorµÄÓïÒå¡£ÉÏÃæÁ½¸öºËÐĵķ½·¨£¬setupEndpoint»áÔÚDispatcherÖÐ×¢²áEndpoint£¬setupEndpointRef»áÏÈÈ¥µ÷ÓÃRpcEndpointVerifier³¢ÊÔÑéÖ¤±¾µØ»òÕßÔ¶³ÌÊÇ·ñ´æÔÚij¸öendpoint£¬È»ºóÔÙ´´½¨RpcEndpointRef¡£¸ü¶à¹ØÓÚ·þÎñ¶Ë¡¢¿Í»§¶Ëµ÷ÓõÄϸ½Ú½«ÔÚʱÐòͼÖвûÊö£¬ÕâÀï²»ÔÙÕ¹¿ª¡£

DispatcherºÍInbox

NettyRpcEnvÖаüº¬Dispatcher£¬Ö÷ÒªÕë¶Ô·þÎñ¶Ë£¬°ïÖú·Óɵ½ÕýÈ·µÄRpcEndpoint£¬²¢ÇÒµ÷ÓÃÆäÒµÎñÂß¼­¡£

ÕâÀïÐèÒªÏȲûÊöÏÂReactorÄ£ÐÍ£¬Spark RPCµÄSocket I/OÒ»¸öµäÐ͵ÄReactorÄ£Ð͵쬵«ÊǽáºÏÁËActor patternÖеÄmailbox£¬¿ÉνÊÇÒ»ÖÖ»ìºÏµÄʵÏÖ·½Ê½¡£

ʹÓÃReactorÄ£ÐÍ£¬Óɵײãnetty´´½¨µÄEventLoop×öI/O¶à·¸´Óã¬ÕâÀïʹÓÃMultiple ReactorsÕâÖÖÐÎʽ£¬ÈçÏÂͼËùʾ£¬´ÓnettyµÄ½Ç¶È¶øÑÔ£¬Main ReactorºÍSub Reactor¶ÔÓ¦BossGroupºÍWorkerGroupµÄ¸ÅÄǰÕ߸ºÔð¼àÌýTCPÁ¬½Ó¡¢½¨Á¢ºÍ¶Ï¿ª£¬ºóÕ߸ºÔðÕæÕýµÄI/O¶Áд£¬¶øÍ¼ÖеÄThreadPool¾ÍÊǵÄDispatcherÖеÄÏ̳߳أ¬ËüÀ´½âñÀ´ºÄʱµÄÒµÎñÂß¼­ºÍI/O²Ù×÷£¬ÕâÑù¾Í¿ÉÒÔ¸üscalabe£¬Ö»ÐèÒªÉÙÊýµÄÏ߳̾ͿÉÒÔ´¦Àí³ÉǧÉÏÍòµÄÁ¬½Ó£¬ÕâÖÖ˼ÏëÊDZê×¼µÄ·ÖÖβßÂÔ£¬offload·ÇI/O²Ù×÷µ½ÁíÍâµÄÏ̳߳ء£

ÕæÕý´¦ÀíRpcEndpointµÄÒµÎñÂß¼­ÔÚThreadPoolÀïÃæ£¬Öм俿ReactorÏß³ÌÖеÄhandler´¦Àídecode³ÉRpcMessage£¬È»ºóͶµÝµ½InboxÖУ¬ËùÒÔcomputeµÄ¹ý³ÌÔÚÁíÍâµÄÏÂÃæ½éÉܵÄDispatcherÏ̳߳ØÀïÃæ×ö¡£

¸Õ¸Õ»¹Ìáµ½ÁËActor patternÖÐmailboxģʽ£¬Spark RPC×îÔçÆðÔ´ÓÚAkka£¬ËùÒÔ½ø»¯µ½ÏÖÔÚ£¬ÈÔÈ»ÁËʹÓÃÁËÕâ¸öģʽ¡£ÕâÀï¾Í½éÉÜInbox£¬Ã¿¸öEndpoint¶¼ÓÐÒ»¸öInbox£¬InboxÀïÃæÓÐÒ»¸öInboxMessageµÄÁ´±í£¬InboxMessageÓкܶà×ÓÀ࣬¿ÉÒÔÊÇÔ¶³Ìµ÷ÓùýÀ´µÄRpcMessage£¬¿ÉÒÔÊÇÔ¶³Ìµ÷ÓùýÀ´µÄfire-and-forgetµÄµ¥ÏòÏûÏ¢OneWayMessage£¬»¹¿ÉÒÔÊǸ÷ÖÖ·þÎñÆô¶¯£¬Á´Â·½¨Á¢¶Ï¿ªµÈMessage£¬ÕâЩMessage¶¼»áÔÚInboxÄÚ²¿µÄ·½·¨ÄÚ×öģʽƥÅ䣬µ÷ÓÃÏàÓ¦µÄRpcEndpointµÄº¯Êý£¨¶¼ÊÇÒ»Ò»¶ÔÓ¦µÄ£©¡£

DispatcherÖаüº¬Ò»¸öMessageLoop£¬Ëü¶ÁÈ¡LinkedBlockingQueueÖеÄͶµÝRpcMessage£¬¸ù¾Ý¿Í»§¶ËÖ¸¶¨µÄEndpoint±êʶ£¬ÕÒµ½EndpointµÄInbox£¬È»ºóͶµÝ½øÈ¥£¬ÓÉÓÚÊÇ×èÈû¶ÓÁУ¬µ±Ã»ÓÐÏûÏ¢µÄʱºò×ÔÈ»×èÈû£¬Ò»µ©ÓÐÏûÏ¢£¬¾Í¿ªÊ¼¹¤×÷¡£DispatcherµÄThreadPool¸ºÔðÏû·ÑÕâЩMessage¡£

DispatcherµÄThreadPoolËüʹÓòÎÊýspark.rpc.netty.dispatcher.numThreadsÀ´¿ØÖÆÊýÁ¿£¬Èç¹ûkill -3 ÿ¸öSpark driver»òÕßexecutor½ø³Ì£¬¶¼»á¿´µ½N¸ödispatcherỊ̈߳º

"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=31 tid=0x00007f8877153800 nid=0x7103 waiting on condition [0x000000011f78b000]

ÄÇôÁíÍâµÄÎÊÌâÊÇË­»áµ÷ÓÃDispatcher·Ö·¢MessageµÄ·½·¨ÄØ£¿´ð°¸ÊÇRpcHandlerµÄ×ÓÀàNettyRpcHandler£¬Õâ¾ÍÊÇReactorÖеÄÏß³Ì×öµÄÊÂÇé¡£RpcHandlerÊǵײãorg.apache.spark.spark-network-commonÌṩµÄhandler£¬µ±Ô¶³ÌµÄÊý¾Ý°ü½âÎö³É¹¦ºó£¬»áµ÷ÓÃÕâ¸öhandler×ö´¦Àí¡£

ÕâÑù¾ÍÍê³ÉÁËÒ»¸öÍêÈ«Òì²½µÄÁ÷³Ì£¬Network IOͨÐÅÓɵײ㸺Ôð£¬È»ºóÓÉDispatcher·Ö·¢£¬Ö»ÒªDispatcherÖеÄInboxMessageµÄÁ´±í×ã¹»´ó£¬ÄÇô¾Í¿ÉÒÔÈÃDispatcherÖеÄThreadPoolÂýÂýÏû»¯ÏûÏ¢£¬ºÍµ×²ãµÄIO½âñÀ´£¬ÍêÈ«ÔÚ¶ÀÁ¢µÄÏß³ÌÖÐÍê³É£¬Ò»µ©Íê³ÉEndpointÄÚ²¿ÒµÎñÂß¼­£¬ÀûÓÃRpcCallContext»Øµ÷À´×öÏûÏ¢µÄ·µ»Ø¡£

Outbox

NettyRpcEnvÖаüº¬Ò»¸öConcurrentHashMap[RpcAddress, Outbox]£¬Ã¿¸öÔ¶³ÌEndpoint¶¼¶ÔÓ¦Ò»¸öOutbox£¬ÕâºÍÉÏÃæInboxÒ£ÏàºôÓ¦£¬ÊÇÒ»¸ömailboxËÆµÄʵÏÖ·½Ê½¡£

ºÍInboxÀàËÆ£¬OutboxÄÚ²¿°üº¬Ò»¸öOutboxMessageµÄÁ´±í£¬OutboxMessageÓÐÁ½¸ö×ÓÀ࣬OneWayOutboxMessageºÍRpcOutboxMessage£¬·Ö±ð¶ÔÓ¦µ÷ÓÃRpcEndpointµÄreceiveºÍreceiveAndReply·½·¨¡£

NettyRpcEnvÖеÄsendºÍask·½·¨»áµ÷ÓÃÖ¸¶¨µØÖ·OutboxÖеÄsend·½·¨£¬µ±Ô¶³ÌÁ¬½Ó佨Á¢Ê±£¬»áÏȽ¨Á¢Á¬½Ó£¬È»ºóÈ¥Ïû»¯OutboxMessage¡£

ͬÑù£¬Ò»¸öÎÊÌâÊÇOutboxÖеÄsend·½·¨ÈçºÎ½«ÏûϢͨ¹ýNetwork IO·¢ËͳöÈ¥£¬Èç¹ûÊÇask·½·¨ÓÖÊÇÈçºÎ¶ÁȡԶ³ÌÏìÓ¦µÄÄØ£¿´ð°¸ÊÇsend·½·¨Í¨¹ýorg.apache.spark.spark-network-common´´½¨µÄTransportClient·¢ËͳöÈ¥ÏûÏ¢£¬ÓÉReactorÏ̸߳ºÔðÐòÁл¯²¢ÇÒ·¢ËͳöÈ¥£¬Ã¿¸öMessage¶¼»á·µ»ØÒ»¸öUUID£¬ÓɵײãÀ´Î¬»¤Ò»¸ö·¢ËͳöÈ¥ÏûÏ¢ÓëÆäCallbackµÄHashMap£¬µ±NettyÊÕµ½ÍêÕûµÄÔ¶³ÌRpcResponseʱºò£¬»Øµ÷ÏìÓ¦µÄCallback£¬×ö·´ÐòÁл¯£¬½ø¶ø»Øµ÷Spark coreÖеÄÒµÎñÂß¼­£¬×öPromise/FutureµÄdone£¬ÉϲãÍ˳ö×èÈû¡£

ÕâÒ²ÊÇÒ»¸öÒì²½µÄ¹ý³Ì£¬·¢ËÍÏûÏ¢µ½Outboxºó£¬Ö±½Ó·µ»Ø£¬Network IOͨÐÅÓɵײ㸺Ôð£¬Ò»µ©RPCµ÷Óóɹ¦»òÕßʧ°Ü£¬¶¼»á»Øµ÷ÉϲãµÄº¯Êý£¬×öÏàÓ¦µÄ´¦Àí¡£

spark-network-commonÖеÄÀà

ÕâÀïÔݲ»×ö¹ý¶àµÄÕ¹¿ª£¬¶¼ÊÇ»ùÓÚNettyµÄ·â×°£¬ÓÐÐËȤµÄ¶ÁÕß¿ÉÒÔ×ÔÐÐÔĶÁÔ´Â룬µ±È»»¹¿ÉÒԲο¼ÎÒ֮ǰ¿ªÔ´µÄNavi-pbrpc¿ò¼ÜµÄ´úÂ룬ÆäÔ­ÀíÊÇ»ù±¾ÏàͬµÄ¡£

2.2 ʱÐòͼ·ÖÎö

·þÎñÆô¶¯

»°²»¶àÊö£¬Ö±½ÓÉÏͼ¡£

·þÎñ¶ËÏìÓ¦

µÚÒ»½×¶Î£¬IO½ÓÊÕ¡£TransportRequestHandlerÊÇnettyµÄ»Øµ÷handler£¬Ëü»á¸ù¾Ýwire format£¨ÏÂÎÄ»á½éÉÜ£©½âÎöºÃÒ»¸öÍêÕûµÄÊý¾Ý°ü£¬½»¸øNettyRpcEnv×ö·´ÐòÁл¯£¬Èç¹ûÊÇRPCµ÷ÓûṹÔìRpcMessage£¬È»ºó»Øµ÷RpcHandlerµÄ·½·¨´¦ÀíRpcMessage£¬ÄÚ²¿»áµ÷ÓÃDispatcher×öRpcMessageµÄͶµÝ£¬·Åµ½InboxÖУ¬µ½´Ë½áÊø¡£

µÚ¶þ½×¶Î£¬IOÏìÓ¦¡£MessageLoop»ñÈ¡´ø´¦ÀíµÄRpcMessage£¬½»¸øDispatcherÖеÄThreadPool×ö´¦Àí£¬Êµ¼Ê¾ÍÊǵ÷ÓÃRpcEndpointµÄÒµÎñÂß¼­£¬Í¨¹ýRpcCallContext½«ÏûÏ¢ÐòÁл¯£¬Í¨¹ý»Øµ÷º¯Êý£¬¸æËßTransportRequestHandlerÕâÓÐÒ»¸öÏûÏ¢´¦ÀíÍê±Ï£¬ÏìÓ¦»ØÈ¥¡£

ÕâÀïÇëÖØµãÌå»áÒì²½´¦Àí´øÀ´µÄ±ãÀû£¬Ê¹ÓÃReactorºÍActor mailboxµÄ½áºÏµÄģʽ£¬½âñîÁËÏûÏ¢µÄ»ñÈ¡ÒÔ¼°´¦ÀíÂß¼­¡£

¿Í»§¶ËÇëÇó

¿Í»§¶ËÒ»°ãÐèÒªÏȽ¨Á¢RpcEnv£¬È»ºó»ñÈ¡RpcEndpointRef¡£

µÚÒ»½×¶Î£¬IO·¢ËÍ¡£ÀûÓÃRpcEndpointRef×ösend»òÕßask¶¯×÷£¬ÕâÀïÒÔsendΪÀý£¬send»áÏȽøÐÐÏûÏ¢µÄÐòÁл¯£¬È»ºóͶµÝµ½Ö¸¶¨µØÖ·µÄOutboxÖУ¬OutboxÈç¹û·¢ÏÖÁ¬½Ó佨Á¢ÔòÏȳ¢ÊÔ½¨Á¢Á¬½Ó£¬È»ºóµ÷ÓõײãµÄTransportClient·¢ËÍÊý¾Ý£¬Ö±½Óͨ¹ý¸ÃnettyµÄAPIÍê³É£¬Íê³Éºó¼´¿É·µ»Ø£¬ÕâÀï·µ»ØÁËUUID×÷ΪÏûÏ¢µÄ±êʶ£¬ÓÃÓÚÏÂÒ»¸ö½×¶ÎµÄ»Øµ÷£¬Ê¹ÓõĽǶÈÀ´Ëµ¿ÉÒÔ·µ»ØÒ»¸öFuture£¬¿Í»§¶Ë¿ÉÒÔ×èÈû»òÕß¼ÌÐø×öÆäËû²Ù×÷¡£

µÚ¶þ£¬IO½ÓÊÕ¡£TransportResponseHandler½ÓÊÕµ½Ô¶³ÌµÄÏìÓ¦ºó£¬»áÏÈ×ö·´ÐòÁкţ¬È»ºó»Øµ÷µÚÒ»½×¶ÎµÄFuture£¬Íê³Éµ÷Óã¬Õâ¸ö¹ý³ÌÈ«²¿ÔÚReactorÏß³ÌÖÐÍê³ÉµÄ£¬Í¨¹ýFuture×öÏ̼߳äµÄ֪ͨ¡£

Èý¡¢Spark RPCºËÐļ¼Êõ×ܽá

Spark RPC×÷ΪRPC´«Êä²ãÑ¡ÔñTCPЭÒ飬×ö¿É¿¿µÄ¡¢È«Ë«¹¤µÄbinary streamͨµÀ¡£

×öÒ»¸ö¸ßÐÔÄÜ/scalableµÄRPC£¬ÐèÒªÄܹ»Âú×ãµÚÒ»£¬·þÎñ¶Ë¾¡¿ÉÄܶàµÄ´¦Àí²¢·¢ÇëÇ󣬵ڶþ£¬Í¬Ê±¾¡¿ÉÄ̵ܶĴ¦ÀíÍê±Ï¡£CPUºÍI/O֮ǰÌìÈ»´æÔÚ×ŲîÒì£¬ÍøÂç´«ÊäµÄÑÓʱ²»¿É¿Ø£¬CPU×ÊÔ´±¦¹ó£¬ÏµÍ³½ø³Ì/Ïß³Ì×ÊÔ´±¦¹ó£¬ÎªÁ˾¡¿ÉÄܱÜÃâSocket I/O×èÈû·þÎñ¶ËºÍ¿Í»§¶Ëµ÷Óã¬ÓÐһЩģʽ£¨pattern£©ÊÇ¿ÉÒÔÓ¦Óõġ£Spark RPCµÄI/O ModelÓÉÓÚ²ÉÓÃÁËNetty£¬Òò´ËʹÓõĵײãµÄI/O¶à·¸´Óã¨I/O Multiplexing£©»úÖÆ£¬ÕâÀï¿ÉÒÔͨ¹ýspark.rpc.io.mode²ÎÊýÉèÖ㬲»Í¬µÄƽ̨ʹÓõļ¼Êõ²»Í¬£¬ÀýÈçlinuxʹÓÃepoll¡£

Ïß³ÌÄ£ÐͲÉÓÃMulti-Reactors + mailboxµÄÒì²½·½Ê½À´´¦Àí£¬ÔÚÉÏÎÄÖÐÒѾ­½éÉܹý¡£

Schema DeclarationºÍÐòÁл¯·½Ã棬Spark RPCĬÈϲÉÓÃJava native serialization·½°¸£¬Ö÷Òª´Ó¼æÈÝÐÔºÍJVMƽ̨ÄÚ²¿×é¼þͨÐÅ£¬ÒÔ¼°scalaÓïÑÔµÄÈںϿ¼ÂÇ£¬ËùÒÔ²»¾ß±¸¿çÓïÑÔͨÐŵÄÄÜÁ¦£¬ÐÔÄÜÉÏÒ²²»ÊÇ×·Çó¼«Ö£¬Ä¿Ç°»¹Ã»ÓÐʹÓÃKyroµÈ¸üºÃÐòÁл¯ÐÔÄܺÍÊý¾Ý´óСµÄ·½°¸¡£

ЭÒé½á¹¹£¬Spark RPC²ÉÓÃ˽ÓеÄwire formatÈçÏ£¬²ÉÓÃheadr+payloadµÄ×éÖ¯·½Ê½£¬headerÖаüÀ¨Õû¸öframeµÄ³¤¶È£¬messageµÄÀàÐÍ£¬ÇëÇóUUID¡£Îª½â¾öTCPÕ³°üºÍ°ë°üÎÊÌ⣬ÒÔ¼°×éÖ¯³ÉÍêÕûµÄMessageµÄÂß¼­¶¼ÔÚorg.apache.spark.network.protocol.MessageEncoderÖС£

ʹÓÃwireshake¾ßÌå·ÖÎöһϡ£

Ê×ÏÈ¿´Ò»¸öRPCÇëÇ󣬾ÍÊǵ÷ÓõÚÒ»ÕÂ˵µÄHelloEndpoint£¬¿Í»§¶Ëµ÷Ó÷ÖÁ½¸öTCP Segment´«Ê䣬ÕâÊÇÒòΪSparkʹÓÃnettyµÄʱºòheaderºÍbody·Ö±ðwriteAndFlush³öÈ¥¡£

ÏÂͼÊǵÚÒ»¸öTCP segment£º

Àý×ÓÖÐÀ¶É«µÄ²¿·ÖÊÇheader£¬Í·ÖеÄ×Ö½Ú½âÎöÈçÏ£º

00 00 00 00 00 00 05 d2 // Ê®½øÖÆ1490£¬ÊÇÕû¸öframeµÄ³¤¶È

03Ò»¸ö×Ö½Ú±íʾµÄÊÇRpcRequest£¬Ã¶¾Ù¶¨ÒåÈçÏ£º

RpcRequest(3)
RpcResponse(4)
RpcFailure(5)
StreamRequest(6)
StreamResponse(7)
StreamFailure(8),
OneWayMessage(9)
User(-1)

ÿ¸ö×Ö½ÚµÄÒâÒåÈçÏ£º

4b ac a6 9f 83 5d 17 a9 // 8¸ö×Ö½ÚÊÇUUID
05 bd // Ê®½øÖÆ1469£¬payload³¤¶È

¾ßÌåµÄPayload¾Í³¤ÏÂÃæÕâ¸öÑù×Ó£¬¿ÉÒÔ¿´³öʹÓÃJava native serialization£¬Ò»¸ö¼òµ¥µÄEchoÇëÇó¾ÍÓÐ1469¸ö×Ö½Ú£¬»¹ÊǺܴóµÄ£¬ÐòÁл¯µÄЧÂʲ»¸ß¡£µ«ÊÇSpark RPC¶¨Î»ÄÚ²¿Í¨ÐÅ£¬²»ÊÇÒ»¸öͨÓõÄRPC¿ò¼Ü£¬²¢ÇÒʹÓõÄÁ¿·Ç³£Ð¡£¬ËùÒÔÕâµãÏûºÄÒ²¾Í¿ÉÒÔºöÂÔÁË£¬»¹ÓÐSpark Structured StreamingʹÓøÃÐòÁл¯·½Ê½£¬ÆäÐÔÄÜ»¹ÊÇ¿ÉÒÔÂú×ãÒªÇóµÄ¡£

ÁíÍ⣬×÷ÕßÔÚkraps-rpcÖл¹¸øSpark-rpc×öÁËÒ»´ÎÐÔÄܲâÊÔ£¬¾ßÌå¿ÉÒԲο¼github¡£

×ܽá

×÷Õß´ÓºÃÆæµÄ½Ç¶ÈÀ´Éî¶ÈÍÚ¾òÁËÏÂSpark RPCµÄÄÚÄ»£¬²¢ÇÒ´Ó2.1°æ±¾µÄSpark coreÖжÀÁ¢³öÁËÒ»¸öרÃŵÄÏîÄ¿Kraps-rpc£¬·Åµ½ÁËgithubÒÔ¼°·¢²¼µ½MavenÖÐÑë²Ö¿â×öѧϰʹÓã¬ÌṩÁ˱ȽϺõÄÉÏÊÖÎĵµ¡¢²ÎÊýÉèÖúÍÐÔÄÜÆÀ¹À£¬ÔÚÕûºÏkraps-rpc»¹·¢ÏÖÁËÒ»¸öСµÄ¸Ä½øµã£¬¸øSparkÌáÁËÒ»¸öPR¡ª¡ª[SPARK-21701]£¬ÒѾ­±»mergeµ½ÁËÖ÷¸É£¬ËãÊÇcontributeÉçÇøÁË£¨10086¸ö¿ªÐÄ£©¡£

½Ó×ÅÉîÈëÆÊÎöÁËSpark RPCÄ£¿éÄÚµÄÀà×éÖ¯¹ØÏµ£¬Ê¹ÓÃUMLÀàͼºÍʱÐòͼ°ïÖú¶ÁÕ߸üºÃµÄÀí½âһЩºËÐĵĸÅÄ°üÀ¨RpcEnv£¬RpcEndpoint£¬RpcEndpointRefµÈ£¬ÒÔ¼°I/OµÄÉè¼ÆÄ£Ê½£¬°üÀ¨I/O¶à·¸´Óã¬ReactorºÍActor mailboxµÈ£¬ÕâÀﻹÊÇÖØµãÌáÏÂSpark RPCµÄÉè¼ÆÕÜѧ£¬ÀûÓÃnettyÇ¿´óµÄSocket I/OÄÜÁ¦£¬¹¹½¨Ò»¸öÒì²½µÄͨÐÅ¿ò¼Ü¡£×îºó£¬´ÓTCP²ãµÄsegment¶þ½øÖƽǶȷÖÎöÁËwire protocol¡£

   
3828 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ