Key Points of Hadoop YARN Architecture Design

Author: Uri Margalit  Source: InfoQ  Published: 2015-8-13

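YARN is the resource management system of the open-source Hadoop project. It was originally designed to solve resource management for Hadoop's MapReduce computing framework, but it has since become a more general-purpose resource manager: MapReduce itself now runs as one application on top of YARN, with YARN managing its resources. If your own application needs YARN's resource management, you can implement the programming API that YARN provides and run the application on YARN, handing resource allocation and reclamation over to YARN, which greatly simplifies the development of resource management features. Many applications can already be built on YARN today, including computing frameworks such as Storm and Spark.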

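YARN Overall Architecture

YARN is a distributed architecture based on the Master/Slave model. Let us first look at its architectural design, as shown in the figure (from the official documentation):

The figure above defines, at the logical level, the core components of a YARN system and their main interaction flows. The components are described below:

YARN Client

The YARN Client submits an Application to the RM. It first creates an Application context object and sets the resource request information that the AM requires, then submits it to the RM. The YARN Client can also communicate with the RM to obtain, for example, the status of an Application that has already been submitted and is running. See the later discussion of the ApplicationClientProtocol protocol for details.

ResourceManager (RM)

The RM is the Master of a YARN cluster and is responsible for managing and allocating the resources of the whole cluster. Because the RM acts as the cluster's resource manager and scheduler, a single point of failure there makes the resources of the entire cluster unusable. The RM HA feature was only added in version 2.4.0, which improved the availability of the RM.

NodeManager (NM)

The NM is the Slave of a YARN cluster: a worker node that actually owns the cluster's physical resources. After we submit a Job, the Tasks that make up the Job are scheduled onto the corresponding NMs for execution. In a Hadoop cluster, to obtain the Locality property of distributed computing, the DN (DataNode) and the NM run on the same node, so that the corresponding HDFS Block may already be local and no data needs to be transferred over the network.

Container

A Container is the abstraction of resources in a YARN cluster. The resources on an NM are quantified and assembled into Containers as needed, which then serve computing tasks that have been granted resources. After a task finishes, the system reclaims the resources so that later tasks can request them. A Container covers two kinds of resources, memory and CPU; later Hadoop versions may add others such as disk and network.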
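
The allocate-and-reclaim cycle described for Containers can be illustrated with a small bookkeeping sketch. This is not YARN code: the class names `ContainerSketch`, `Node` and `Container`, and the use of megabytes and virtual cores as units, are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a node hands out memory/CPU as containers and reclaims them. */
class ContainerSketch {

    /** A granted slice of a node's resources (memory in MB, virtual cores). */
    static final class Container {
        final int memoryMb;
        final int vcores;

        Container(int memoryMb, int vcores) {
            this.memoryMb = memoryMb;
            this.vcores = vcores;
        }
    }

    /** Tracks what is still free on one worker node. */
    static final class Node {
        private int freeMemoryMb;
        private int freeVcores;
        private final List<Container> running = new ArrayList<>();

        Node(int totalMemoryMb, int totalVcores) {
            this.freeMemoryMb = totalMemoryMb;
            this.freeVcores = totalVcores;
        }

        /** Returns a container if the request fits, otherwise null. */
        Container allocate(int memoryMb, int vcores) {
            if (memoryMb > freeMemoryMb || vcores > freeVcores) {
                return null;
            }
            freeMemoryMb -= memoryMb;
            freeVcores -= vcores;
            Container c = new Container(memoryMb, vcores);
            running.add(c);
            return c;
        }

        /** Reclaims the resources of a finished container so later requests can use them. */
        void release(Container c) {
            if (running.remove(c)) {
                freeMemoryMb += c.memoryMb;
                freeVcores += c.vcores;
            }
        }

        int freeMemoryMb() { return freeMemoryMb; }
        int freeVcores() { return freeVcores; }
    }

    public static void main(String[] args) {
        Node node = new Node(8192, 4);
        Container a = node.allocate(4096, 2);
        Container b = node.allocate(6144, 1);     // does not fit: only 4096 MB remain
        System.out.println(a != null);            // prints true
        System.out.println(b == null);            // prints true
        node.release(a);
        System.out.println(node.freeMemoryMb());  // prints 8192
    }
}
```

The point of the sketch is only that a request either fits into what is free or is refused, and that releasing a container returns exactly what it took.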

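ApplicationMaster (AM)

The AM manages and monitors an Application deployed on the YARN cluster. Taking MapReduce as an example, the MapReduce Application is a service framework program for processing MapReduce computations, and it provides runtime support for user-written MapReduce programs. A MapReduce program we write typically contains multiple Map Tasks or Reduce Tasks, and the MapReduce Application is responsible for running, managing and monitoring each Task. For example, the resources needed to run a Task are requested by the AM from the RM, and starting or stopping the Container for a Task on an NM is done by the AM sending a request to that NM.

Below, we explore how YARN works internally, based on the YARN source code of Hadoop 2.6.0.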

YARN Protocols

YARN is a distributed resource management system made up of multiple distributed components. We can explain it through the interaction protocols designed between these components, as shown in the figure:

The protocols, and the components each one connects, are as follows:

ApplicationClientProtocol (Client -> RM)

ResourceTracker (NM -> RM)

ApplicationMasterProtocol (AM -> RM)

ContainerManagementProtocol (AM -> NM)

ResourceManagerAdministrationProtocol (RM Admin -> RM)

HAServiceProtocol (Active RM HA Framework Standby RM)

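YARN RPC Implementation

Hadoop 1.X used its default Writable-based protocol as the RPC protocol. In 2.X the RPC framework was rewritten, and Protobuf became Hadoop's default RPC communication protocol. The implementation of YARN RPC is shown in the class diagram below:

As the diagram shows, RpcEngine has two implementations, WritableRpcEngine and ProtobufRpcEngine, with ProtobufRpcEngine used by default. We can choose to use the 1.X default RPC protocol instead, or even plug in a custom implementation.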

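ResourceManager Internals

The RM is the master node of the YARN distributed system. Inside the ResourceManager service process, many components provide further services, including externally facing RPC services and services that maintain the state of internal objects. The internal structure of the RM is shown in the figure:

For the function of each component inside the RM in the figure above (Dispatcher/EventHandler/Service), consult the source code.

The ResourceScheduler component deserves a mention here. It is the most important component inside the RM and implements the allocation and reclamation of resources. It provides scheduling algorithms, and at run time resources are scheduled according to the policy the algorithm provides. YARN ships three scheduling policy implementations: FifoScheduler, FairScheduler and CapacityScheduler, with CapacityScheduler as the default. CapacityScheduler allocates resources at a finer granularity: it supports multi-level queues, each queue has a certain capacity (that is, an upper and a lower resource limit set on the queue), and each level of queue is then scheduled with a suitable policy of its own (such as FIFO).

If we want to implement our own scheduling policy, we can implement YARN's scheduling interface ResourceScheduler directly and then change the yarn.resourcemanager.scheduler.class setting in yarn-site.xml.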
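
The idea of selecting a scheduler by a configured class name can be sketched in plain Java. Only the key `yarn.resourcemanager.scheduler.class` comes from the text. The `SimpleScheduler` interface, the two toy policies and the `Map`-based configuration are hypothetical stand-ins, far smaller than YARN's real ResourceScheduler interface.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of choosing a scheduler implementation by configured class name. */
class SchedulerConfigSketch {

    /** Stand-in for a scheduling interface; NOT YARN's ResourceScheduler. */
    interface SimpleScheduler {
        void submit(String app);
        String next();   // next application to receive resources, or null if none
    }

    /** First-in, first-out policy. */
    static class FifoSketch implements SimpleScheduler {
        private final Queue<String> queue = new ArrayDeque<>();
        public void submit(String app) { queue.add(app); }
        public String next() { return queue.poll(); }
    }

    /** Last-in, first-out policy, present only to show that the policy is swappable. */
    static class LifoSketch implements SimpleScheduler {
        private final ArrayDeque<String> stack = new ArrayDeque<>();
        public void submit(String app) { stack.push(app); }
        public String next() { return stack.poll(); }
    }

    static final String SCHEDULER_KEY = "yarn.resourcemanager.scheduler.class";

    /** Instantiates whatever class the configuration names, falling back to FIFO. */
    static SimpleScheduler create(Map<String, String> conf) {
        String className = conf.getOrDefault(SCHEDULER_KEY, FifoSketch.class.getName());
        try {
            Class<?> clazz = Class.forName(className);
            return (SimpleScheduler) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate scheduler " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(SCHEDULER_KEY, LifoSketch.class.getName());
        SimpleScheduler scheduler = create(conf);
        scheduler.submit("app-1");
        scheduler.submit("app-2");
        System.out.println(scheduler.next());   // prints app-2
    }
}
```

Because the caller only ever sees the interface, switching policies is a configuration change rather than a code change, which is the property the yarn-site.xml setting relies on.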

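NodeManager Internals

The NM is the slave node that actually holds resources in a YARN system, and it is also the host node on which user programs actually run. Its internal structure is shown in the figure:

For the function of each component inside the NM in the figure above (Dispatcher/EventHandler/Service), consult the source code; they are not described again here.

Event Handling Mechanism

Event handling falls into two broad categories. The first is synchronous handling: handling the event blocks the caller, so the handling logic is usually very simple and does not block for long. The second is asynchronous handling: after an event is received, a Dispatcher sends it to the corresponding event queue. This follows the producer-consumer pattern, in which the consumer watches the queue and takes events off it to handle them asynchronously.
Event handling shows up everywhere in YARN. A somewhat special case is the use of a state machine (StateMachine) as an event handler, so that events trigger state changes on specific objects, and object state is managed in this way. Let us first look at the event handling mechanism in YARN, taking the ResourceManager side as an example, as shown in the figure below:

Generated events are dispatched through the Dispatcher and then handled. If the EventHandler's logic is simple, the event is handled synchronously; otherwise it may be handled asynchronously. While an EventHandler is handling an event it may also produce new Events, which are again dispatched through the RM's Dispatcher and then handled.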
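
The two styles of handling, and a handler that produces a follow-up event, can be contrasted in a short sketch. Everything here is hypothetical: events are plain strings, and `SyncDispatcher` and `QueueDispatcher` are illustrative names rather than YARN classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch contrasting synchronous and queue-based (asynchronous) handling. */
class DispatchSketch {

    /** Synchronous: the handler runs on the caller's thread, which blocks until it returns. */
    static final class SyncDispatcher {
        private final Consumer<String> handler;
        SyncDispatcher(Consumer<String> handler) { this.handler = handler; }
        void dispatch(String event) { handler.accept(event); }
    }

    /** Asynchronous: dispatch() only enqueues; a single worker thread consumes the queue. */
    static final class QueueDispatcher {
        private static final String STOP = "__STOP__";   // sentinel that ends the worker
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final Thread worker;

        QueueDispatcher(Consumer<String> handler) {
            this.worker = new Thread(() -> {
                try {
                    for (String e = queue.take(); !e.equals(STOP); e = queue.take()) {
                        handler.accept(e);
                    }
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        void start() { worker.start(); }
        void dispatch(String event) { queue.add(event); }   // producer side, returns at once
        void stop() throws InterruptedException {
            queue.add(STOP);
            worker.join();
        }
    }

    static List<String> runSyncDemo() {
        List<String> handled = new ArrayList<>();
        new SyncDispatcher(handled::add).dispatch("NODE_ADDED");
        return handled;   // already populated when dispatch() returns
    }

    /** The handler emits a follow-up event, which goes back through the same dispatcher. */
    static List<String> runAsyncDemo() {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch followUpHandled = new CountDownLatch(1);
        QueueDispatcher[] self = new QueueDispatcher[1];
        self[0] = new QueueDispatcher(event -> {
            handled.add(event);
            if (event.equals("APP_SUBMITTED")) {
                self[0].dispatch("APP_ACCEPTED");   // new event produced while handling
            } else if (event.equals("APP_ACCEPTED")) {
                followUpHandled.countDown();
            }
        });
        self[0].start();
        self[0].dispatch("APP_SUBMITTED");
        try {
            followUpHandled.await();   // wait for the follow-up before stopping the worker
            self[0].stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(runSyncDemo());    // prints [NODE_ADDED]
        System.out.println(runAsyncDemo());   // prints [APP_SUBMITTED, APP_ACCEPTED]
    }
}
```

The latch exists only to make the demo deterministic: without it, the stop sentinel could be enqueued before the follow-up event.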

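State Machine

Take the RMAppImpl object managed on the RM side as an example. It represents the state of an Application that the RM maintains while the Application runs. All states of this object and the transition paths between them are shown in the figure below:

Adding the events that trigger each transition, and their types, would make that figure very cluttered, so I drew a separate, more detailed diagram showing which type of event triggers each state change. This diagram makes the source code easier to read, as shown below: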

NMLivelinessMonitor Source Code Walkthrough

YARN is built mainly on the Dispatcher + EventHandler + Service abstraction, and all internal and external components are implemented with this mechanism. Because there are so many Services and EventHandlers, and some components are both a Service and an EventHandler at once, reading the code can be disorienting. Here I give a worked example of reading the NMLivelinessMonitor service, as a reference for anyone who wants to study the source.

NMLivelinessMonitor is a monitoring service on the ResourceManager side. It monitors the liveliness of registered nodes, in this case the NodeManagers. The service periodically checks NodeManager heartbeat information to make sure that each NodeManager registered with the ResourceManager is currently alive and can accept resource allocations and run computing tasks. The abstract generic class AbstractLivelinessMonitor, which NMLivelinessMonitor extends, contains a Map, shown below:

private Map<O, Long> running = new HashMap<O, Long>();

Here O is replaced by NodeId, and the value type Long is a timestamp: the time at which a NodeManager last sent a heartbeat to the ResourceManager. Checking the timestamps in running determines whether a NodeManager is still usable.

In ResourceManager, we can see that an NMLivelinessMonitor instance is one of its members:

protected NMLivelinessMonitor nmLivelinessMonitor;

Now look at the implementation of the NMLivelinessMonitor class. It extends the abstract generic class AbstractLivelinessMonitor; its declaration is:

public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId>

The class overrides (@Override) the protected method expire, as shown below:

@Override
protected void expire(NodeId id) {
  dispatcher.handle(
      new RMNodeEvent(id, RMNodeEventType.EXPIRE));
}

The logic that calls expire can be found in the abstract base class of NMLivelinessMonitor, in an inner thread class named PingChecker:

private class PingChecker implements Runnable {

  @Override
  public void run() {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      synchronized (AbstractLivelinessMonitor.this) {
        Iterator<Map.Entry<O, Long>> iterator =
            running.entrySet().iterator();

        // avoid calculating current time everytime in loop
        long currentTime = clock.getTime();

        while (iterator.hasNext()) {
          Map.Entry<O, Long> entry = iterator.next();
          if (currentTime > entry.getValue() + expireInterval) {
            iterator.remove();
            expire(entry.getKey()); // call the abstract method expire, implemented in the subclass
            LOG.info("Expired:" + entry.getKey().toString() +
                " Timed out after " + expireInterval/1000 + " secs");
          }
        }
      }
      try {
        Thread.sleep(monitorInterval);
      } catch (InterruptedException e) {
        LOG.info(getName() + " thread interrupted");
        break;
      }
    }
  }
}
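
The timestamp comparison inside that loop can be tried in isolation. The sketch below is a simplified, single-threaded stand-in and not the Hadoop class: `LivenessSketch`, `heartbeat`, `checkOnce` and the injected `LongSupplier` clock are hypothetical names, and expired ids are collected in a list instead of being passed to an abstract expire method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a timestamp-based expiry check, without threads or Hadoop types. */
class LivenessSketch<O> {

    private final Map<O, Long> running = new HashMap<>();   // id -> time of last heartbeat (ms)
    private final List<O> expired = new ArrayList<>();
    private final LongSupplier clock;                        // injectable time source
    private final long expireIntervalMs;

    LivenessSketch(LongSupplier clock, long expireIntervalMs) {
        this.clock = clock;
        this.expireIntervalMs = expireIntervalMs;
    }

    /** Records a heartbeat by refreshing the entry's timestamp to the current time. */
    synchronized void heartbeat(O id) {
        running.put(id, clock.getAsLong());
    }

    /** One pass over the map; a monitor thread would call this once per monitor interval. */
    synchronized void checkOnce() {
        long now = clock.getAsLong();   // read the clock once per pass
        Iterator<Map.Entry<O, Long>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<O, Long> entry = it.next();
            if (now > entry.getValue() + expireIntervalMs) {
                O id = entry.getKey();
                it.remove();
                expired.add(id);        // stand-in for calling expire(id)
            }
        }
    }

    synchronized List<O> expired() {
        return new ArrayList<>(expired);
    }

    public static void main(String[] args) {
        long[] now = {0L};
        LivenessSketch<String> monitor = new LivenessSketch<>(() -> now[0], 600L);
        monitor.heartbeat("node-1");   // last seen at t=0
        now[0] = 500L;
        monitor.heartbeat("node-2");   // last seen at t=500
        now[0] = 601L;
        monitor.checkOnce();
        System.out.println(monitor.expired());   // prints [node-1]
    }
}
```

Injecting the clock is a design choice for testability: the expiry rule can be checked by moving time forward by hand instead of sleeping.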

The generic type O here is NodeId in NMLivelinessMonitor, so the logic we care about most is the implementation of the expire method in NMLivelinessMonitor mentioned earlier. The expire method calls the dispatcher's handle method, so dispatcher must be an EventHandler object. As we will see, it is obtained from the ResourceManager's dispatcher member, an AsyncDispatcher (which holds an EventHandler internally by composition).

Next, let us see how the NMLivelinessMonitor is created. In the serviceInit() method of the ResourceManager.RMActiveServices class, the code is:

nmLivelinessMonitor = createNMLivelinessMonitor();
addService(nmLivelinessMonitor);

Following the code into the createNMLivelinessMonitor method:

private NMLivelinessMonitor createNMLivelinessMonitor() {
  return new NMLivelinessMonitor(this.rmContext
      .getDispatcher());
}

Above, rmContext's getDispatcher returns a Dispatcher object, which is passed to the NMLivelinessMonitor constructor. We need to see how this Dispatcher is created, so look at the ResourceManager.serviceInit method:

rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);

Continuing to trace the code, the setupDispatcher() method is implemented as follows:

private Dispatcher setupDispatcher() {
  Dispatcher dispatcher = createDispatcher();
  dispatcher.register(RMFatalEventType.class,
      new ResourceManager.RMFatalEventDispatcher());
  return dispatcher;
}

Next, the implementation of createDispatcher():

protected Dispatcher createDispatcher() {
  return new AsyncDispatcher();
}

As we can see, an AsyncDispatcher object is created here, so the NMLivelinessMonitor instance is constructed from an AsyncDispatcher instance. Coming back to the earlier question, we need to know what the EventHandler returned by this AsyncDispatcher's getEventHandler() actually does. The NMLivelinessMonitor code is as follows:

public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId> {

  private EventHandler dispatcher;

  public NMLivelinessMonitor(Dispatcher d) {
    super("NMLivelinessMonitor", new SystemClock());
    this.dispatcher = d.getEventHandler(); // call AsyncDispatcher's getEventHandler() to obtain the EventHandler
  }

  public void serviceInit(Configuration conf) throws Exception {
    int expireIntvl = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
    setExpireInterval(expireIntvl);
    setMonitorInterval(expireIntvl/3);
    super.serviceInit(conf);
  }

  @Override
  protected void expire(NodeId id) {
    dispatcher.handle(
        new RMNodeEvent(id, RMNodeEventType.EXPIRE));
  }
}

Now look at the getEventHandler() method of the AsyncDispatcher class:

@Override
public EventHandler getEventHandler() {
  if (handlerInstance == null) {
    handlerInstance = new GenericEventHandler();
  }
  return handlerInstance;
}

So whether this is the first call or other objects have already called the method, there is ultimately only one GenericEventHandler instance serving as this dispatcher's internal EventHandler. Continuing to trace the code, the GenericEventHandler implementation is as follows:

class GenericEventHandler implements EventHandler<Event> {
  public void handle(Event event) {
    if (blockNewEvents) {
      return;
    }
    drained = false;

    /* all this method does is enqueue all the events onto the queue */
    int qSize = eventQueue.size();
    if (qSize !=0 && qSize %1000 == 0) {
      LOG.info("Size of event-queue is " + qSize);
    }
    int remCapacity = eventQueue.remainingCapacity();
    if (remCapacity < 1000) {
      LOG.warn("Very low remaining capacity in the event-queue: "
          + remCapacity);
    }
    try {
      eventQueue.put(event); // put the Event onto the eventQueue
    } catch (InterruptedException e) {
      if (!stopped) {
        LOG.warn("AsyncDispatcher thread interrupted", e);
      }
      throw new YarnRuntimeException(e);
    }
  };
}

The Event passed to handle is put on the eventQueue; in other words, GenericEventHandler is a producer for eventQueue. The consumer is another thread inside AsyncDispatcher, shown below:

@Override
protected void serviceStart() throws Exception {
  //start all the components
  super.serviceStart();
  eventHandlingThread = new Thread(createThread()); // create the thread that consumes events from eventQueue
  eventHandlingThread.setName("AsyncDispatcher event handler");
  eventHandlingThread.start();
}

The createThread() method is as follows:

Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // blockNewEvents is only set when dispatcher is draining to stop,
        // adding this check is to avoid the overhead of acquiring the lock
        // and calling notify every time in the normal run of the loop.
        if (blockNewEvents) {
          synchronized (waitForDrained) {
            if (drained) {
              waitForDrained.notify();
            }
          }
        }
        Event event;
        try {
          event = eventQueue.take(); // take an Event from the queue
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          dispatch(event); // dispatch the valid Event for handling
        }
      }
    }
  };
}

As we can see, an Event is taken from eventQueue and then dispatch(event) is called to handle it. The dispatch(event) method is as follows:

@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
  //all events go thru this loop
  if (LOG.isDebugEnabled()) {
    LOG.debug("Dispatching the event " + event.getClass().getName() + "."
        + event.toString());
  }

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try{
    EventHandler handler = eventDispatchers.get(type); // get the event type from the event, then look up the EventHandler registered for that type
    if(handler != null) {
      handler.handle(event); // handle the event with the matching EventHandler
    } else {
      throw new Exception("No handler for registered for " + type);
    }
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.fatal("Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && (ShutdownHookManager.get().isShutdownInProgress()) == false
        && stopped == false) {
      LOG.info("Exiting, bbye..");
      System.exit(-1);
    }
  }
}
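
The lookup in dispatch uses the enum class of the event's type as the key into the handler table. A minimal sketch of that idea follows. The `RegistrySketch`, `Event` and `Handler` types and both enums are hypothetical, and a missing handler simply throws here instead of being logged as in the code above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: route an event to the handler registered for its enum type. */
class RegistrySketch {

    enum NodeEventType { STARTED, EXPIRE }
    enum AppEventType { SUBMITTED }

    /** Minimal event: an enum-valued type plus the id of the object it concerns. */
    static final class Event {
        final Enum<?> type;
        final String targetId;

        Event(Enum<?> type, String targetId) {
            this.type = type;
            this.targetId = targetId;
        }
    }

    interface Handler {
        void handle(Event event);
    }

    private final Map<Class<?>, Handler> handlers = new HashMap<>();

    /** Registers one handler per enum class, not per enum constant. */
    void register(Class<? extends Enum<?>> eventTypeClass, Handler handler) {
        handlers.put(eventTypeClass, handler);
    }

    void dispatch(Event event) {
        // getDeclaringClass() yields the enum type even for constants with bodies,
        // where getClass() would return an anonymous subclass.
        Class<?> key = event.type.getDeclaringClass();
        Handler handler = handlers.get(key);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + key);
        }
        handler.handle(event);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        RegistrySketch registry = new RegistrySketch();
        registry.register(NodeEventType.class, e -> log.add(e.type + ":" + e.targetId));
        registry.dispatch(new Event(NodeEventType.EXPIRE, "node-1"));
        System.out.println(log);   // prints [EXPIRE:node-1]
    }
}
```

Keying the table by enum class means a single registration covers every constant of that enum, so one handler can receive all node events and decide internally what each one means.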

As we can see, the matching EventHandler is selected from the registered Map<Class, EventHandler> eventDispatchers table to run the actual handling logic. Now let us see where this EventHandler is registered. We saw earlier that the expire method of NMLivelinessMonitor passes in new RMNodeEvent(id, RMNodeEventType.EXPIRE), so look again at the ResourceManager.RMActiveServices.serviceInit() method:

// Register event handler for RmNodes
rmDispatcher.register(
    RMNodeEventType.class, new NodeEventDispatcher(rmContext)); // register: event type RMNodeEventType, EventHandler implementation NodeEventDispatcher

So events of type RMNodeEventType are handled by the EventHandler ResourceManager.NodeEventDispatcher, which also acts as a Dispatcher in its own right. Here is the NodeEventDispatcher implementation:

@Private
public static final class NodeEventDispatcher implements
    EventHandler<RMNodeEvent> {

  private final RMContext rmContext;

  public NodeEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMNodeEvent event) {
    NodeId nodeId = event.getNodeId();
    // getRMNodes() returns a ConcurrentMap<NodeId, RMNode> that tracks the state
    // of each NodeId (RMNode is a state machine object)
    RMNode node = this.rmContext.getRMNodes().get(nodeId);
    if (node != null) {
      try {
        ((EventHandler<RMNodeEvent>) node).handle(event); // RMNode is implemented by RMNodeImpl, which is also an EventHandler
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for node " + nodeId, t);
      }
    }
  }
}

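This handler still does not do the real work itself; it delegates to the RMNode state machine object to perform a transition. So we continue with RMNodeImpl, the implementation of RMNode. Since the event type here is RMNodeEventType.EXPIRE, let us see how the transitions for that event type are registered when the state machine is created: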

private static final StateMachineFactory<RMNodeImpl,
                                         NodeState,
                                         RMNodeEventType,
                                         RMNodeEvent> stateMachineFactory
    = new StateMachineFactory<RMNodeImpl,
                              NodeState,
                              RMNodeEventType,
                              RMNodeEvent>(NodeState.NEW)
    ...
    .addTransition(NodeState.RUNNING, NodeState.LOST,
        RMNodeEventType.EXPIRE,
        new DeactivateNodeTransition(NodeState.LOST))
    ...
    .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
        RMNodeEventType.EXPIRE,
        new DeactivateNodeTransition(NodeState.LOST))

On the ResourceManager side, information about each NodeManager is represented by an RMNodeImpl (kept in memory in a ConcurrentMap). So when expire is called, RMNodeImpl handles the event according to what is registered in the state machine: the pre-transition state, the post-transition state, the event type, and the transition hook. The current RMNodeImpl state is then updated from the pre-transition state to the post-transition state.

For the code above, if the current RMNodeImpl state is NodeState.RUNNING and the event is of type RMNodeEventType.EXPIRE, the hook DeactivateNodeTransition is invoked and the state becomes NodeState.LOST. Likewise, if the current state is NodeState.UNHEALTHY and the event is of type RMNodeEventType.EXPIRE, DeactivateNodeTransition is invoked and the state becomes NodeState.LOST. For the handling logic of each individual Transition, consult the corresponding Transition implementation.
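
The table-driven behaviour described here (pre-transition state, event type, post-transition state, hook) can be sketched without Hadoop's StateMachineFactory. The class below is a hypothetical, much reduced stand-in: the two EXPIRE arcs mirror the registrations quoted above, while the NEW to RUNNING arc on STARTED and the string-logging hooks are assumptions added so the example can run end to end.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of a table-driven state machine for one node. */
class NodeStateMachineSketch {

    enum State { NEW, RUNNING, UNHEALTHY, LOST }
    enum EventType { STARTED, EXPIRE }

    /** One registered arc: the post-transition state and the hook to run. */
    private static final class Arc {
        final State post;
        final Consumer<String> hook;

        Arc(State post, Consumer<String> hook) {
            this.post = post;
            this.hook = hook;
        }
    }

    private final Map<State, Map<EventType, Arc>> table = new EnumMap<>(State.class);
    private final String nodeId;
    private State current = State.NEW;

    NodeStateMachineSketch(String nodeId) {
        this.nodeId = nodeId;
    }

    /** Registers (pre-state, event type) -> (post-state, hook). */
    NodeStateMachineSketch addTransition(State pre, State post, EventType type,
                                         Consumer<String> hook) {
        table.computeIfAbsent(pre, s -> new EnumMap<>(EventType.class))
             .put(type, new Arc(post, hook));
        return this;
    }

    /** Looks up the arc for the current state and event, runs its hook, then moves on. */
    void handle(EventType type) {
        Map<EventType, Arc> arcs = table.get(current);
        Arc arc = (arcs == null) ? null : arcs.get(type);
        if (arc == null) {
            throw new IllegalStateException("No transition for " + type + " in state " + current);
        }
        arc.hook.accept(nodeId);
        current = arc.post;
    }

    State state() {
        return current;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Consumer<String> deactivate = id -> log.add("deactivate " + id);
        NodeStateMachineSketch node = new NodeStateMachineSketch("node-1")
            .addTransition(State.NEW, State.RUNNING, EventType.STARTED, id -> log.add("add " + id))
            .addTransition(State.RUNNING, State.LOST, EventType.EXPIRE, deactivate)
            .addTransition(State.UNHEALTHY, State.LOST, EventType.EXPIRE, deactivate);
        node.handle(EventType.STARTED);
        node.handle(EventType.EXPIRE);
        System.out.println(node.state() + " " + log);   // prints LOST [add node-1, deactivate node-1]
    }
}
```

Keeping the transitions in a table, rather than in nested conditionals, is what lets the same hook be reused for several pre-transition states, as the two EXPIRE registrations do.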

   