Editor's note:
This article comes from a personal blog. Using a worked example, it introduces how the RPC mechanism is used and how Hadoop runs on top of RPC. We hope it helps with your studies.
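I. RPC Basics
1.1 Basic concepts of RPC
RPC stands for Remote Procedure Call.
(1) It lets a program on one computer call a subroutine on another computer without dealing with the underlying network communication details, which stay transparent to the caller. For this reason it is widely used in distributed network communication.
The RPC protocol assumes that some transport protocol, such as TCP or UDP, exists to carry the message data between the communicating programs. In the OSI network model, RPC spans the transport layer and the application layer. RPC makes it easier to develop applications, including networked, distributed, multi-program ones.
(2) Hadoop's inter-process interactions all go through RPC, for example between the NameNode and the DataNodes, and between the JobTracker and the TaskTrackers.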

It is therefore fair to say that Hadoop runs on top of RPC.
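1.2 Notable features of RPC
(1) Transparency: calling a program on another machine looks, to the user, just like calling a local method.
(2) High performance: the RPC server can handle requests from multiple clients concurrently.
(3) Controllability: the JDK already ships an RPC framework, RMI, but it is too heavyweight and offers few points of control, so Hadoop RPC implements its own custom RPC framework.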
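1.3 The basic RPC flow

(1) RPC uses the client/server (C/S) model.
(2) The client sends a request message carrying parameters to the server.
(3) On receiving the request, the server calls the corresponding procedure based on the parameters it was sent, then sends the computed result back to the client.
(4) The client receives the result and continues running.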
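The four steps above can be sketched with plain Java sockets. This is only an illustration of the request/response flow, not Hadoop code: the class name RpcFlowSketch, the one-line text format "add 10 25", and the helper methods are all assumptions made for the example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical names throughout; this is not how Hadoop encodes its calls.
class RpcFlowSketch {

    // Server side: accept one request, run the named procedure, send the result back.
    static void serveOnce(ServerSocket listener) throws IOException {
        try (Socket socket = listener.accept();
             BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            String[] request = in.readLine().split(" ");   // e.g. "add 10 25"
            if (request[0].equals("add")) {
                int sum = Integer.parseInt(request[1]) + Integer.parseInt(request[2]);
                out.println(sum);
            } else {
                out.println("error: unknown procedure " + request[0]);
            }
        }
    }

    // Client side: send the request with its parameters, then block until the result arrives.
    static int callAdd(String host, int port, int a, int b) throws IOException {
        try (Socket socket = new Socket(host, port);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            out.println("add " + a + " " + b);
            return Integer.parseInt(in.readLine());
        }
    }

    // Runs the server and the client in one process so the sketch is self-contained.
    static int demo() throws Exception {
        try (ServerSocket listener = new ServerSocket(0)) {   // port 0: let the OS pick a free port
            Thread server = new Thread(() -> {
                try {
                    serveOnce(listener);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            server.start();
            int result = callAdd("127.0.0.1", listener.getLocalPort(), 10, 25);
            server.join();
            return result;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("10+25=" + demo());
    }
}
```

The client thread stays blocked in callAdd until the server replies, which is step (4): from the caller's point of view the remote call behaves like an ordinary method call.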
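1.4 The RPC mechanism in Hadoop
Like other RPC frameworks, Hadoop RPC is divided into four parts:
(1) Serialization layer: the messages passed between client and server use the serialization classes that Hadoop provides, or custom Writable types.
(2) Function call layer: Hadoop RPC implements function calls through dynamic proxies and Java reflection.
(3) Network transport layer: Hadoop RPC uses a socket mechanism based on TCP/IP.
(4) Server-side framework layer: the RPC server uses Java NIO with an event-driven I/O model to increase its concurrent processing capacity.
Hadoop RPC is used very widely throughout Hadoop: communication among the Client, DataNode, and NameNode relies entirely on it. For example, when we work with HDFS we use the FileSystem class, which holds a DFSClient object internally; that object is responsible for talking to the NameNode. At run time, DFSClient creates a local proxy for the NameNode and operates on that proxy; the proxy calls the NameNode's methods remotely over the network and passes back their return values.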
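To make the serialization layer in item (1) more concrete, here is a minimal sketch of a Writable-style type. To avoid a Hadoop dependency it defines its own SimpleWritable interface, which only mirrors the shape of Hadoop's Writable (a write method taking a DataOutput and a readFields method taking a DataInput). AddRequest and WritableSketch are hypothetical names invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-in for the shape of Hadoop's Writable; defined locally so the sketch runs without Hadoop.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical request type: a method name plus two int parameters.
class AddRequest implements SimpleWritable {
    String method = "";
    int number1;
    int number2;

    AddRequest() { }   // empty constructor so a receiver can create the object before readFields

    AddRequest(String method, int number1, int number2) {
        this.method = method;
        this.number1 = number1;
        this.number2 = number2;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(method);
        out.writeInt(number1);
        out.writeInt(number2);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written.
        method = in.readUTF();
        number1 = in.readInt();
        number2 = in.readInt();
    }
}

class WritableSketch {
    // Turns the object into the bytes that would travel over the socket.
    static byte[] serialize(SimpleWritable writable) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writable.write(out);
        }
        return bytes.toByteArray();
    }

    // Serializes a request and rebuilds it on the "other side".
    static AddRequest roundTrip(AddRequest original) throws IOException {
        byte[] wire = serialize(original);
        AddRequest copy = new AddRequest();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(wire)));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        AddRequest copy = roundTrip(new AddRequest("Add", 10, 25));
        System.out.println(copy.method + "(" + copy.number1 + ", " + copy.number2 + ")");
    }
}
```

The point of this style is that each type controls its own compact byte layout, instead of relying on general-purpose Java object serialization.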

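1.5 Techniques behind the Hadoop RPC design
(1) Dynamic proxies
About: a dynamic proxy provides access to another object while hiding the details of the real object; the proxy object hides the real object from the client. The Java development kit currently supports dynamic proxies, but only for implementing interfaces.
(2) Reflection: loading classes dynamically
(3) Serialization
(4) Non-blocking asynchronous I/O (NIO)
For background on how Java NIO works, see: http://weixiaolu.iteye.com/blog/1479656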
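As an illustration of technique (4), the sketch below shows an event-driven server loop built on java.nio: one thread and one Selector handle both new connections and incoming requests. It is a simplified assumption of how such a loop can look, not Hadoop's server code. The wire format (two 4-byte ints in, one 4-byte int out) and the class name NioAddServerSketch are invented for the example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NioAddServerSketch {

    // One thread, one selector: react to accept and read events until `requests` calls are answered.
    static void serve(ServerSocketChannel server, int requests) throws IOException {
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int answered = 0;
            while (answered < requests) {
                selector.select();                                   // block until some channel is ready
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            // The attached buffer collects the 8 request bytes across read events.
                            client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(8));
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer request = (ByteBuffer) key.attachment();
                        if (client.read(request) < 0) {
                            client.close();                          // peer closed before sending a full request
                        } else if (!request.hasRemaining()) {        // both ints have arrived
                            request.flip();
                            ByteBuffer reply = ByteBuffer.allocate(4);
                            reply.putInt(request.getInt() + request.getInt());
                            reply.flip();
                            while (reply.hasRemaining()) {
                                client.write(reply);
                            }
                            client.close();
                            answered++;
                        }
                    }
                }
                selector.selectedKeys().clear();                     // mark this batch of events as handled
            }
        }
    }

    // Blocking client: send two ints, wait for the 4-byte answer.
    static int callAdd(int port, int a, int b) throws IOException {
        try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
            ByteBuffer request = ByteBuffer.allocate(8);
            request.putInt(a).putInt(b);
            request.flip();
            while (request.hasRemaining()) {
                channel.write(request);
            }
            ByteBuffer reply = ByteBuffer.allocate(4);
            while (reply.hasRemaining() && channel.read(reply) >= 0) {
                // keep reading until the reply is complete or the server closes the connection
            }
            reply.flip();
            return reply.getInt();
        }
    }

    // Runs the event loop in a background thread and sends it two requests.
    static int[] demo() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));     // port 0: let the OS pick a free port
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            Thread loop = new Thread(() -> {
                try {
                    serve(server, 2);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            loop.start();
            int[] results = { callAdd(port, 10, 25), callAdd(port, 1, 2) };
            loop.join();
            return results;
        }
    }

    public static void main(String[] args) throws Exception {
        int[] results = demo();
        System.out.println(results[0] + " " + results[1]);
    }
}
```

Because the server thread never blocks on a single connection, the same loop can keep many clients registered at once. A real server would also hand the actual work to handler threads rather than computing inside the event loop.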
II. How to Use RPC
2.1 Interfaces exposed by Hadoop RPC
Hadoop RPC exposes two main interfaces (see the class org.apache.hadoop.ipc.RPC):
(1) public static <T> ProtocolProxy<T> getProxy/waitForProxy(…)
Constructs a client-side proxy object (which implements some protocol), used to send RPC requests to the server.
(2) public static Server RPC.Builder(Configuration).build()
Constructs a server object for an instance of some protocol (in practice, a Java interface), used to handle the requests sent by clients.
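2.2 The four steps of using Hadoop RPC
(1) Define the RPC protocol
The RPC protocol is the communication interface between client and server; it defines the service interface that the server exposes.
(2) Implement the RPC protocol
A Hadoop RPC protocol is usually a Java interface, which the user needs to implement.
(3) Construct and start the RPC server
Use the static class Builder directly to construct an RPC server, then call start() to start it.
(4) Construct an RPC client and send requests
Use the static method getProxy to construct a client proxy object, then call the remote methods directly through that proxy.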
III. An RPC Example
3.1 Defining the RPC protocol
As shown below, we define a communication interface, IProxyProtocol, which declares one method, Add().
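import org.apache.hadoop.ipc.VersionedProtocol;

public interface IProxyProtocol extends VersionedProtocol {
    // Version number: by default, an RPC client and server with different
    // version numbers cannot communicate with each other.
    static final long VERSION = 23234L;

    int Add(int number1, int number2);
}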
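Points to note:
(1) Every custom RPC interface in Hadoop must extend the VersionedProtocol interface, which describes the protocol's version information.
(2) By default, an RPC client and server with different version numbers cannot communicate with each other, so the client and the server identify themselves by version number.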
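The version rule in note (2) can be pictured with a small stand-alone sketch. It does not reproduce Hadoop's actual check: the Versioned interface, the checkVersion helper, and the use of IllegalStateException are assumptions made for illustration. Only the idea comes from the text, namely that the client compares its own version with the one the server reports and refuses to proceed on a mismatch.

```java
// Hypothetical stand-in for a versioned protocol; not a Hadoop interface.
interface Versioned {
    long getProtocolVersion(String protocol, long clientVersion);
}

class VersionCheckSketch {
    static final long SERVER_VERSION = 23234L;

    // What a client-side factory could do before handing a proxy to the caller.
    static <T extends Versioned> T checkVersion(T server, String protocol, long clientVersion) {
        long serverVersion = server.getProtocolVersion(protocol, clientVersion);
        if (serverVersion != clientVersion) {
            throw new IllegalStateException("Protocol " + protocol + " version mismatch: client="
                    + clientVersion + ", server=" + serverVersion);
        }
        return server;
    }

    public static void main(String[] args) {
        Versioned server = (protocol, clientVersion) -> SERVER_VERSION;

        checkVersion(server, "IProxyProtocol", 23234L);   // versions match: no exception

        try {
            checkVersion(server, "IProxyProtocol", 1L);   // versions differ: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```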
3.2 Implementing the RPC protocol
A Hadoop RPC protocol is usually a Java interface, which the user needs to implement. A simple implementation of the IProxyProtocol interface looks like this:
import java.io.IOException;

public class MyProxy implements IProxyProtocol {
    // The remote method: adds the two numbers sent by the client.
    public int Add(int number1, int number2) {
        System.out.println("I was called!");
        int result = number1 + number2;
        return result;
    }

    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        System.out.println("MyProxy.ProtocolVersion=" + IProxyProtocol.VERSION);
        // Note: the version returned here must match the version supplied by the client.
        return IProxyProtocol.VERSION;
    }
}
The Add method implemented here is trivial: a single addition. To make the call visible, it prints one line to the console: "I was called!"
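3.3 Constructing the RPC server and starting the service
Here we obtain the Server object through RPC's static method getServer, as the following code shows. (This static factory belongs to the older Hadoop RPC API; the Builder class described in 2.1 is the newer way to construct a server.)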
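import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class MyServer {
    public static final int PORT = 5432;
    public static final String IPAddress = "127.0.0.1";

    public static void main(String[] args) throws Exception {
        MyProxy proxy = new MyProxy();
        // Expose the MyProxy instance at IPAddress:PORT.
        final Server server = RPC.getServer(proxy, IPAddress, PORT, new Configuration());
        server.start();
    }
}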
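The core of this code is the call to RPC.getServer, which takes four parameters: the first is the Java object to be invoked, the second is the server's address, the third is the server's port, and the fourth is the Hadoop Configuration. Once the server object is obtained, we start it. The server then listens on the given port for client requests; from this point on it stays in the listening state, waiting for client requests to arrive.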
3.4 Constructing the RPC client and sending a request
Here we use the static method getProxy or waitForProxy to construct a client proxy object, then call the remote method directly through that proxy, as follows:
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyClient {
    public static void main(String[] args) {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(
                MyServer.IPAddress, MyServer.PORT);
        try {
            // Note: the version passed in here must match the one returned by MyProxy.
            IProxyProtocol proxy = (IProxyProtocol) RPC.waitForProxy(
                    IProxyProtocol.class, IProxyProtocol.VERSION,
                    inetSocketAddress, new Configuration());
            int result = proxy.Add(10, 25);
            System.out.println("10+25=" + result);
            RPC.stopProxy(proxy);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
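The core of the code above is RPC.waitForProxy(), which takes four parameters: the first is the interface class to be called, the second is the client's version number, the third is the server's address, and the fourth is the Configuration. The returned proxy object is a proxy for the server-side object; internally it is implemented with java.lang.reflect.Proxy.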
With these four steps, we have used Hadoop RPC to build an efficient client-server network model.
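To show what a proxy built on java.lang.reflect.Proxy does, here is a minimal local sketch combining a dynamic proxy with reflection. It leaves the network out entirely. The proxy's handler passes the method name, parameter types, and arguments to a dispatch method, which looks the method up by reflection, roughly the information an RPC client would have to send to a server. AddProtocol, AddService, and DynamicProxySketch are hypothetical names, and this is not Hadoop's implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical protocol interface, mirroring the Add example without the Hadoop base interface.
interface AddProtocol {
    int Add(int number1, int number2);
}

// The "remote" implementation that would normally live on the server.
class AddService implements AddProtocol {
    @Override
    public int Add(int number1, int number2) {
        return number1 + number2;
    }
}

class DynamicProxySketch {

    // "Server side": find the method by name and parameter types, then invoke it reflectively.
    static Object dispatch(Object target, String methodName, Class<?>[] paramTypes, Object[] args)
            throws Exception {
        Method method = target.getClass().getMethod(methodName, paramTypes);
        return method.invoke(target, args);
    }

    // "Client side": every call on the returned object is routed through the handler.
    // In a real RPC framework the handler would serialize the call and send it over a socket;
    // here it calls dispatch directly to keep the sketch local.
    static AddProtocol createProxy(Object remoteTarget) {
        InvocationHandler handler = (proxy, method, args) ->
                dispatch(remoteTarget, method.getName(), method.getParameterTypes(), args);
        return (AddProtocol) Proxy.newProxyInstance(
                AddProtocol.class.getClassLoader(),
                new Class<?>[] { AddProtocol.class },
                handler);
    }

    public static void main(String[] args) {
        AddProtocol proxy = createProxy(new AddService());
        System.out.println("10+25=" + proxy.Add(10, 25));
    }
}
```

The caller only ever sees the AddProtocol interface, which is why dynamic proxies are limited to interfaces and why the protocol in this article is defined as one.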
3.5 Checking the results
(1) Start the server; it begins listening for client requests.
(2) Start the client; it begins sending requests to the server.
(3) Check the server's state to see whether it was called.

SUMMARY: the RPC call above shows that the business method invoked on the client is defined in the business class's interface, and that interface extends the VersionedProtocol interface.
(4) Now run the jps command on the command line and look at its output.

The output lists a Java process named "MyServer", which is the RPC server class MyServer we just ran. This should be familiar: when setting up a Hadoop environment, we run the same command to check whether all the Hadoop processes have started.
SUMMARY: we can therefore infer that the five Java processes started along with Hadoop are RPC servers as well.
Next, look at the NameNode source code shown below: NameNode does indeed create RPC servers.
private void initialize(Configuration conf) throws IOException {
    ......
    // create rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
        int serviceHandlerCount =
            conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                        DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
        this.serviceRpcServer = RPC.getServer(this,
            dnSocketAddr.getHostName(),
            dnSocketAddr.getPort(), serviceHandlerCount,
            false, conf, namesystem.getDelegationTokenSecretManager());
        this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
        setRpcServiceServerAddress(conf);
    }
    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf,
        namesystem.getDelegationTokenSecretManager());
    ......
}