Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
Hadoopѧϰ±Ê¼Ç¡ª3.Hadoop RPC»úÖÆµÄʹÓÃ
 
  1734  次浏览      27
 2019-1-4
 
±à¼­ÍƼö:
±¾ÎÄÀ´×ÔÓÚ¸öÈ˲©¿Í£¬±¾ÎÄÖ÷Ҫͨ¹ýÓ¦ÓÃʵÀý½éÉÜRPC»úÖÆÔËÈçºÎʹÓÃÒÔ¼°ÈçºÎÔÚRPC»ù´¡Ö®ÉÏÔËÐÐHadoop£¬Ï£Íû¶ÔÄúµÄѧϰÓÐËù°ïÖú¡£

Ò»¡¢RPC»ù´¡¸ÅÄî

1.1 RPCµÄ»ù´¡¸ÅÄî

RPC£¬¼´Remote Procdure Call£¬ÖÐÎÄÃû£ºÔ¶³Ì¹ý³Ìµ÷Óã»

£¨1£©ËüÔÊÐíһ̨¼ÆËã»ú³ÌÐòÔ¶³Ìµ÷ÓÃÁíÍâһ̨¼ÆËã»úµÄ×Ó³ÌÐò£¬¶ø²»ÓÃÈ¥¹ØÐĵײãµÄÍøÂçͨÐÅϸ½Ú£¬¶ÔÎÒÃÇÀ´ËµÊÇ͸Ã÷µÄ¡£Òò´Ë£¬Ëü¾­³£ÓÃÓÚ·Ö²¼Ê½ÍøÂçͨÐÅÖС£

RPCЭÒé¼Ù¶¨Ä³Ð©´«ÊäЭÒéµÄ´æÔÚ£¬ÈçTCP»òUDP£¬ÎªÍ¨ÐųÌÐòÖ®¼äЯ´øÐÅÏ¢Êý¾Ý¡£ÔÚOSIÍøÂçͨÐÅÄ£ÐÍÖУ¬RPC¿çÔ½ÁË´«Êä²ãºÍÓ¦Óò㡣RPCʹµÃ¿ª·¢°üÀ¨ÍøÂç·Ö²¼Ê½¶à³ÌÐòÔÚÄÚµÄÓ¦ÓóÌÐò¸ü¼ÓÈÝÒס£

£¨2£©HadoopµÄ½ø³Ì¼ä½»»¥¶¼ÊÇͨ¹ýRPCÀ´½øÐе쬱ÈÈçNamenodeÓëDatanodeÖ±½Ó£¬JobtrackerÓëTasktrackerÖ®¼äµÈ¡£

Òò´Ë£¬¿ÉÒÔ˵£ºHadoopµÄÔËÐоÍÊǽ¨Á¢ÔÚRPC»ù´¡Ö®Éϵġ£

1.2 RPCµÄÏÔÖøÌØµã

£¨1£©Í¸Ã÷ÐÔ£ºÔ¶³Ìµ÷ÓÃÆäËû»úÆ÷ÉϵijÌÐò£¬¶ÔÓû§À´Ëµ¾ÍÏñÊǵ÷Óñ¾µØ·½·¨Ò»Ñù£»

£¨2£©¸ßÐÔÄÜ£ºRPC ServerÄܹ»²¢·¢´¦Àí¶à¸öÀ´×ÔClientµÄÇëÇó£»

£¨3£©¿É¿ØÐÔ£ºjdkÖÐÒѾ­ÌṩÁËÒ»¸öRPC¿ò¼Ü¡ªRMI£¬µ«ÊǸÃPRC¿ò¼Ü¹ýÓÚÖØÁ¿¼¶²¢ÇÒ¿É¿ØÖ®´¦±È½ÏÉÙ£¬ËùÒÔHadoop RPCʵÏÖÁË×Ô¶¨ÒåµÄPRC¿ò¼Ü¡£

1.3 RPCµÄ»ù±¾Á÷³Ì

£¨1£©RPC²ÉÓÃÁËC/SµÄģʽ£»

£¨2£©Client¶Ë·¢ËÍÒ»¸ö´øÓвÎÊýµÄÇëÇóÐÅÏ¢µ½Server£»

£¨3£©Server½ÓÊÕµ½Õâ¸öÇëÇóÒԺ󣬸ù¾Ý·¢Ë͹ýÀ´µÄ²ÎÊýµ÷ÓÃÏàÓ¦µÄ³ÌÐò£¬È»ºó°Ñ×Ô¼º¼ÆËãºÃµÄ½á¹û·¢Ë͸øClient¶Ë£»

£¨4£©Client¶Ë½ÓÊÕµ½½á¹ûºó¼ÌÐøÔËÐУ»

1.4 HadoopÖеÄRPC»úÖÆ

ͬÆäËûRPC¿ò¼ÜÒ»Ñù£¬Hadoop RPC·ÖΪËĸö²¿·Ö£º

£¨1£©ÐòÁл¯²ã£ºClentÓëServer¶ËͨÐÅ´«µÝµÄÐÅÏ¢²ÉÓÃÁËHadoopÀïÌṩµÄÐòÁл¯Àà»ò×Ô¶¨ÒåµÄWritableÀàÐÍ£»

£¨2£©º¯Êýµ÷Óò㣺Hadoop RPCͨ¹ý¶¯Ì¬´úÀíÒÔ¼°java·´ÉäʵÏÖº¯Êýµ÷Óã»

£¨3£©ÍøÂç´«Êä²ã£ºHadoop RPC²ÉÓÃÁË»ùÓÚTCP/IPµÄsocket»úÖÆ£»

£¨4£©·þÎñÆ÷¶Ë¿ò¼Ü²ã£ºRPC ServerÀûÓÃjava NIOÒÔ¼°²ÉÓÃÁËʼþÇý¶¯µÄI/OÄ£ÐÍ£¬Ìá¸ßRPC ServerµÄ²¢·¢´¦ÀíÄÜÁ¦£»

Hadoop RPCÔÚÕû¸öHadoopÖÐÓ¦Ó÷dz£¹ã·º£¬Client¡¢DataNode¡¢NameNodeÖ®¼äµÄͨѶȫ¿¿ËüÁË¡£ÀýÈ磺ÎÒÃÇÆ½Ê±²Ù×÷HDFSµÄʱºò£¬Ê¹ÓõÄÊÇFileSystemÀ࣬ËüµÄÄÚ²¿ÓиöDFSClient¶ÔÏó£¬Õâ¸ö¶ÔÏó¸ºÔðÓëNameNode´ò½»µÀ¡£ÔÚÔËÐÐʱ£¬DFSClientÔÚ±¾µØ´´½¨Ò»¸öNameNodeµÄ´úÀí£¬È»ºó¾Í²Ù×÷Õâ¸ö´úÀí£¬Õâ¸ö´úÀí¾Í»áͨ¹ýÍøÂ磬Զ³Ìµ÷Óõ½NameNodeµÄ·½·¨£¬Ò²ÄÜ·µ»ØÖµ¡£

1.5 Hadoop RPCÉè¼Æ¼¼Êõ

£¨1£©¶¯Ì¬´úÀí

About£º¶¯Ì¬´úÀí¿ÉÒÔÌṩ¶ÔÁíÒ»¸ö¶ÔÏóµÄ·ÃÎÊ£¬Í¬Ê±Òþ²ØÊµ¼Ê¶ÔÏóµÄ¾ßÌåÊÂʵ£¬´úÀí¶ÔÏó¶Ô¿Í»§Òþ²ØÁËʵ¼Ê¶ÔÏó¡£Ä¿Ç°Java¿ª·¢°üÖÐÌṩÁ˶Զ¯Ì¬´úÀíµÄÖ§³Ö£¬µ«ÏÖÔÚÖ»Ö§³Ö¶Ô½Ó¿ÚµÄʵÏÖ¡£

£¨2£©·´É䡪¡ª¶¯Ì¬¼ÓÔØÀà

£¨3£©ÐòÁл¯

£¨4£©·Ç×èÈûµÄÒì²½IO£¨NIO£©

Java NIOÔ­ÀíÇë²Î¿¼ÔĶÁ£ºhttp://weixiaolu.iteye.com/blog/1479656

¶þ¡¢ÈçºÎʹÓÃRPC

2.1 Hadoop RPC¶ÔÍâÌṩµÄ½Ó¿Ú

Hadoop RPC¶ÔÍâÖ÷ÒªÌṩÁËÁ½ÖÖ½Ó¿Ú£¨¼ûÀàorg.apache.hadoop.ipc.RPC£©£¬·Ö±ðÊÇ£º

£¨1£©public static <T> ProtocolProxy <T> getProxy/waitForProxy(¡­)

¹¹ÔìÒ»¸ö¿Í»§¶Ë´úÀí¶ÔÏ󣨸öÔÏóʵÏÖÁËij¸öЭÒ飩£¬ÓÃÓÚÏò·þÎñÆ÷·¢ËÍRPCÇëÇó¡£

£¨2£©public static Server RPC.Builder (Configuration).build()

Ϊij¸öЭÒ飨ʵ¼ÊÉÏÊÇJava½Ó¿Ú£©ÊµÀý¹¹ÔìÒ»¸ö·þÎñÆ÷¶ÔÏó£¬ÓÃÓÚ´¦Àí¿Í»§¶Ë·¢Ë͵ÄÇëÇó¡£

2.2 ʹÓÃHadoop RPCµÄËÄ´ó²½´Õ

£¨1£©¶¨ÒåRPCЭÒé

RPCЭÒéÊǿͻ§¶ËºÍ·þÎñÆ÷¶ËÖ®¼äµÄͨÐŽӿڣ¬Ëü¶¨ÒåÁË·þÎñÆ÷¶Ë¶ÔÍâÌṩµÄ·þÎñ½Ó¿Ú¡£

£¨2£©ÊµÏÖRPCЭÒé

Hadoop RPCЭÒéͨ³£ÊÇÒ»¸öJava½Ó¿Ú£¬Óû§ÐèҪʵÏָýӿڡ£

£¨3£©¹¹ÔìºÍÆô¶¯RPC SERVER

Ö±½ÓʹÓþ²Ì¬ÀàBuilder¹¹ÔìÒ»¸öRPC Server£¬²¢µ÷Óú¯Êýstart()Æô¶¯¸ÃServer¡£

£¨4£©¹¹ÔìRPC Client²¢·¢ËÍÇëÇó

ʹÓþ²Ì¬·½·¨getProxy¹¹Ôì¿Í»§¶Ë´úÀí¶ÔÏó£¬Ö±½Óͨ¹ý´úÀí¶ÔÏóµ÷ÓÃÔ¶³Ì¶ËµÄ·½·¨¡£

Èý¡¢RPCÓ¦ÓÃʵÀý

3.1 ¶¨ÒåRPCЭÒé

ÈçÏÂËùʾ£¬ÎÒÃǶ¨ÒåÒ»¸öIProxyProtocol ͨÐŽӿڣ¬ÉùÃ÷ÁËÒ»¸öAdd()·½·¨¡£

public interface IProxyProtocol extends VersionedProtocol {
static final long VERSION = 23234L; //°æ±¾ºÅ£¬Ä¬ÈÏÇé¿öÏ£¬²»Í¬°æ±¾ºÅµÄRPC ClientºÍServerÖ®¼ä²»ÄÜÏ໥ͨÐÅ
int Add(int number1,int number2);
}

ÐèҪעÒâµÄÊÇ£º

£¨1£©HadoopÖÐËùÓÐ×Ô¶¨ÒåRPC½Ó¿Ú¶¼ÐèÒª¼Ì³ÐVersionedProtocol½Ó¿Ú£¬ËüÃèÊöÁËЭÒéµÄ°æ±¾ÐÅÏ¢¡£

£¨2£©Ä¬ÈÏÇé¿öÏ£¬²»Í¬°æ±¾ºÅµÄRPC ClientºÍServerÖ®¼ä²»ÄÜÏ໥ͨÐÅ£¬Òò´Ë¿Í»§¶ËºÍ·þÎñ¶Ëͨ¹ý°æ±¾ºÅ±êʶ¡£

3.2 ʵÏÖRPCЭÒé

Hadoop RPCЭÒéͨ³£ÊÇÒ»¸öJava½Ó¿Ú£¬Óû§ÐèҪʵÏָýӿڡ£¶ÔIProxyProtocol½Ó¿Ú½øÐмòµ¥µÄʵÏÖÈçÏÂËùʾ£º

public class MyProxy implements IProxyProtocol {
public int Add(int number1,int number2) {
System.out.println("ÎÒ±»µ÷ÓÃÁË!");
int result = number1+number2;
return result;
}

public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
System.out.println("MyProxy.ProtocolVersion=" + IProxyProtocol.VERSION);
// ×¢Ò⣺ÕâÀï·µ»ØµÄ°æ±¾ºÅÓë¿Í»§¶ËÌṩµÄ°æ±¾ºÅÐè±£³ÖÒ»ÖÂ
return IProxyProtocol.VERSION;
}
}

ÕâÀïʵÏÖµÄAdd·½·¨ºÜ¼òµ¥£¬¾ÍÊÇÒ»¸ö¼Ó·¨²Ù×÷¡£ÎªÁ˲鿴Ч¹û£¬ÕâÀïͨ¹ý¿ØÖÆÌ¨Êä³öÒ»¾ä£º¡°ÎÒ±»µ÷ÓÃÁË£¡¡±

3.3 ¹¹ÔìRPC Server²¢Æô¶¯·þÎñ

ÕâÀïͨ¹ýRPCµÄ¾²Ì¬·½·¨getServerÀ´»ñµÃServer¶ÔÏó£¬ÈçÏ´úÂëËùʾ£º

public class MyServer {
public static int PORT = 5432;
public static String IPAddress = "127.0.0.1";

public static void main(String[] args) throws Exception {
MyProxy proxy = new MyProxy();
final Server server = RPC.getServer(proxy, IPAddress, PORT, new Configuration());
server.start();
}
}

Õâ¶Î´úÂëµÄºËÐÄÔÚÓÚµÚ5ÐеÄRPC.getServer·½·¨£¬¸Ã·½·¨ÓÐËĸö²ÎÊý£¬µÚÒ»¸ö²ÎÊýÊDZ»µ÷ÓõÄjava¶ÔÏ󣬵ڶþ¸ö²ÎÊýÊÇ·þÎñÆ÷µÄµØÖ·£¬µÚÈý¸ö²ÎÊýÊÇ·þÎñÆ÷µÄ¶Ë¿Ú ¡£»ñµÃ·þÎñÆ÷¶ÔÏóºó£¬Æô¶¯·þÎñÆ÷¡£ÕâÑù£¬·þÎñÆ÷¾ÍÔÚÖ¸¶¨¶Ë¿Ú¼àÌý¿Í»§¶ËµÄÇëÇó¡£µ½´ËΪֹ£¬·þÎñÆ÷¾Í´¦ÓÚ¼àÌý״̬£¬²»Í£µØµÈ´ý¿Í»§¶ËÇëÇóµ½´ï¡£

3.4 ¹¹ÔìRPC Client²¢·¢³öÇëÇó

ÕâÀïʹÓþ²Ì¬·½·¨getProxy»òwaitForProxy¹¹Ôì¿Í»§¶Ë´úÀí¶ÔÏó£¬Ö±½Óͨ¹ý´úÀí¶ÔÏóµ÷ÓÃÔ¶³Ì¶ËµÄ·½·¨£¬¾ßÌåÈçÏÂËùʾ£º

public class MyClient {

public static void main(String[] args) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(
MyServer.IPAddress, MyServer.PORT);

try {
// ×¢Ò⣺ÕâÀï´«ÈëµÄ°æ±¾ºÅÐèÒªÓë´úÀí±£³ÖÒ»ÖÂ
IProxyProtocol proxy = (IProxyProtocol) RPC.waitForProxy(
IProxyProtocol.class, IProxyProtocol.VERSION, inetSocketAddress,
new Configuration());
int result = proxy.Add(10, 25);
System.out.println("10+25=" + result);

RPC.stopProxy(proxy);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

ÒÔÉÏ´úÂëÖкËÐÄÔÚÓÚRPC.waitForProxy()£¬¸Ã·½·¨ÓÐËĸö²ÎÊý£¬µÚÒ»¸ö²ÎÊýÊDZ»µ÷ÓõĽӿÚÀ࣬µÚ¶þ¸öÊǿͻ§¶Ë°æ±¾ºÅ£¬µÚÈý¸öÊÇ·þÎñ¶ËµØÖ·¡£·µ»ØµÄ´úÀí¶ÔÏ󣬾ÍÊÇ·þÎñ¶Ë¶ÔÏóµÄ´úÀí£¬ÄÚ²¿¾ÍÊÇʹÓÃjava.lang.ProxyʵÏֵġ£

¾­¹ýÒÔÉÏËIJ½£¬ÎÒÃDZãÀûÓÃHadoop RPC´î½¨ÁËÒ»¸ö·Ç³£¸ßЧµÄ¿Í»§»ú¨C·þÎñÆ÷ÍøÂçÄ£ÐÍ¡£

3.5 ²é¿´ÔËÐнá¹û

£¨1£©Æô¶¯·þÎñ¶Ë£¬¿ªÊ¼¼àÌý¿Í»§¶ËÇëÇó

£¨2£©Æô¶¯¿Í»§¶Ë£¬¿ªÊ¼Ïò·þÎñ¶Ë·¢ÇëÇó

£¨3£©²é¿´·þÎñ¶Ë״̬£¬ÊÇ·ñ±»µ÷ÓÃ

SUMMARY£º´ÓÉÏÃæµÄRPCµ÷ÓÃÖУ¬¿ÉÒÔ¿´³ö£ºÔÚ¿Í»§¶Ëµ÷ÓõÄÒµÎñÀàµÄ·½·¨ÊǶ¨ÒåÔÚÒµÎñÀàµÄ½Ó¿ÚÖеġ£¸Ã½Ó¿ÚʵÏÖÁËVersionedProtocal½Ó¿Ú¡£

£¨4£©ÏÖÔÚÎÒÃÇÔÚÃüÁîÐÐÖ´ÐÐjpsÃüÁ²é¿´Êä³öÐÅÏ¢£¬»á³öÏÖÈçÏÂͼËùʾµÄ£º

´ÓÉÏͼÖпÉÒÔ¿´µ½Ò»¸öjava½ø³Ì£¬ÊÇ¡°MyServer¡±£¬¸Ã½ø³ÌÕýÊÇÎÒÃǸոÕÔËÐеÄRPCµÄ·þÎñ¶ËÀàMyServer¡£Òò´Ë£¬´ó¼Ò¿ÉÒÔÁªÏëµ½ÎÒÃǴHadoop»·¾³Ê±£¬Ò²Ö´Ðйý¸ÃÃüÁîÓÃÀ´ÅжÏHadoopµÄÏà¹Ø½ø³ÌÊÇ·ñÈ«²¿Æô¶¯¡£

SUMMARY£ºÄÇô¿ÉÒÔÅжϣ¬HadoopÆô¶¯Ê±²úÉúµÄ5¸öjava½ø³ÌÒ²Ó¦¸ÃÊÇRPCµÄ·þÎñ¶Ë¡£¡¡¡¡

ÏÂÃæÎÒÃǹ۲ìNameNodeµÄÔ´´úÂ룬ÈçÏÂͼËùʾ£¬¿ÉÒÔ¿´µ½NameNodeȷʵ´´½¨ÁËRPCµÄ·þÎñ¶Ë¡£

private void initialize(Configuration conf) throws IOException {
......
// create rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
......
}

   
1734 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ