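YARN is the resource management system of the open-source Hadoop project. It was originally designed to solve the resource management problem of the MapReduce computing framework in Hadoop, but it has since become a more general-purpose resource management system: the MapReduce framework can run as an application on top of YARN, with YARN managing its resources. If your application also needs YARN's resource management, you can implement the programming API that YARN provides and run the application on YARN, handing resource allocation and reclamation over to YARN, which greatly simplifies the development of resource management functionality. Many applications can already be built on YARN today, including computing frameworks such as Storm and Spark.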
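YARN Overall Architecture
YARN is a distributed architecture based on the Master/Slave model. Let us first look at the architecture design of YARN, as shown in the figure (from the official documentation):

The figure above defines, at a logical level, the core components of the YARN system and the main interaction flows. The components are described below:
YARN Client
The YARN Client submits an Application to the RM. It first creates an Application context object, sets the resource request information required by the AM, and then submits it to the RM. The YARN Client can also communicate with the RM to obtain the status of an Application that has been submitted and is running; for details, see the discussion of the ApplicationClientProtocol protocol later in this article.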
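ResourceManager (RM)
The RM is the Master of the YARN cluster and is responsible for managing and allocating the resources of the whole cluster. Because the RM is in charge of managing and scheduling cluster resources, if it is a single point of failure then none of the cluster's resources can be used when it fails. The RM HA feature was only added in version 2.4.0, which improves the availability of the RM.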
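NodeManager (NM)
The NM is the Slave of the YARN cluster: a worker node that actually owns the physical resources. After we submit a Job, the Tasks that make up the Job are scheduled onto NMs for execution. In a Hadoop cluster, to obtain data locality in distributed computation, the DN (DataNode) and the NM are run on the same node, so that the corresponding HDFS Block may already be local and no data needs to be transferred over the network.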
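Container
A Container is the abstraction of resources in a YARN cluster. The resources on an NM are quantified and assembled into Containers as needed, which then serve the computing tasks that have been granted those resources. When a task finishes, the system reclaims the resources so that later tasks can request them. A Container covers two kinds of resources, memory and CPU; later Hadoop versions may add others such as disk and network.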
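To make the allocate-and-reclaim cycle concrete, here is a minimal Java sketch of a single node whose memory and CPU are quantified and handed out as containers. NodeCapacitySketch and all of its methods are hypothetical names invented for illustration; they are not YARN classes, and in real YARN the allocation decision is made by the scheduler inside the RM rather than on the node.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of one node's quantified resources; not a YARN class. */
class NodeCapacitySketch {
    private int freeMemoryMb;
    private int freeVcores;
    private int nextId = 1;
    // container id -> {memoryMb, vcores} currently granted
    private final Map<Integer, int[]> granted = new HashMap<>();

    NodeCapacitySketch(int memoryMb, int vcores) {
        this.freeMemoryMb = memoryMb;
        this.freeVcores = vcores;
    }

    /** Carves a container out of the free resources; returns its id, or -1 if it does not fit. */
    int allocate(int memoryMb, int vcores) {
        if (memoryMb > freeMemoryMb || vcores > freeVcores) {
            return -1;
        }
        freeMemoryMb -= memoryMb;
        freeVcores -= vcores;
        int id = nextId++;
        granted.put(id, new int[] {memoryMb, vcores});
        return id;
    }

    /** Reclaims a finished container's resources; returns false for an unknown id. */
    boolean release(int containerId) {
        int[] size = granted.remove(containerId);
        if (size == null) {
            return false;
        }
        freeMemoryMb += size[0];
        freeVcores += size[1];
        return true;
    }

    int freeMemoryMb() { return freeMemoryMb; }
    int freeVcores() { return freeVcores; }

    public static void main(String[] args) {
        NodeCapacitySketch node = new NodeCapacitySketch(8192, 4);
        int first = node.allocate(4096, 2);
        node.allocate(4096, 2);
        System.out.println("third request fits: " + (node.allocate(1024, 1) != -1));
        node.release(first);
        System.out.println("free after reclaim: " + node.freeMemoryMb() + " MB, "
            + node.freeVcores() + " vcores");
    }
}
```

A request is rejected as soon as either dimension (memory or CPU) is exhausted, which is why the sketch tracks the two resources separately instead of collapsing them into one number.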
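ApplicationMaster (AM)
The AM manages and monitors an Application deployed on the YARN cluster. Take MapReduce as an example: the MapReduce Application is a service framework program for handling MapReduce computation, providing runtime support for the MapReduce programs users write. A MapReduce program we write usually consists of multiple Map Tasks or Reduce Tasks, and running, managing, and monitoring each Task is the responsibility of this MapReduce Application. For example, the resources needed to run a Task are requested by the AM from the RM, and starting or stopping the Container for a Task on an NM is done by the AM sending a request to that NM.
Below, we explore the internal implementation of YARN based on the YARN source code of Hadoop 2.6.0.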
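YARN Protocols
YARN is a distributed resource management system made up of multiple distributed components. We can describe it through the interaction protocols designed between these components, as shown in the figure:

Let us look in detail at what each protocol does:
ApplicationClientProtocol (Client -> RM)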

ResourceTracker (NM -> RM)

ApplicationMasterProtocol (AM -> RM)

ContainerManagementProtocol (AM -> NM)

ResourceManagerAdministrationProtocol (RM Admin -> RM)

HAServiceProtocol (Active RM HA Framework Standby RM)

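YARN RPC Implementation
Hadoop 1.X used its own default Writable protocol as the RPC protocol. In 2.X the RPC framework was rewritten, and Protobuf became Hadoop's default RPC communication protocol.
The implementation of YARN RPC is shown in the class diagram below:

As the diagram shows, RpcEngine has two implementations, WritableRpcEngine and ProtobufRpcEngine, with ProtobufRpcEngine used by default. We can choose to use the 1.X default RPC protocol instead, or even provide a custom implementation.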
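ResourceManager Internals
The RM is the master node of the YARN distributed system. Inside the ResourceManager service process, many components provide further services, including external-facing RPC services and services that maintain the state of internal objects. The internal structure of the RM is shown in the figure:

For the function of each component inside the RM in the figure above (Dispatcher/EventHandler/Service), refer to the source code.
Here we single out the ResourceScheduler component, the most important component inside the RM. It implements the allocation and reclamation of resources: it provides scheduling algorithms, and at runtime resources are scheduled according to the policy the algorithm defines. YARN ships three implementations of resource scheduling policy, FifoScheduler, FairScheduler, and CapacityScheduler, of which CapacityScheduler is the default. CapacityScheduler allocates resources at a finer granularity: multi-level queues can be configured, each queue has a certain capacity (that is, upper and lower resource limits are set per queue), and within each level of queue a suitable scheduling policy (such as FIFO) is then applied.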
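The idea of per-queue limits combined with FIFO ordering inside each queue can be sketched in a few lines of Java. This is only a toy model under stated assumptions, not CapacityScheduler's actual algorithm: LeafQueueSketch and CapacitySchedulingSketch are hypothetical names, only memory is modelled, queues are flat rather than multi-level, and the rule "serve the queue furthest below its guaranteed share first" is an assumption chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

/** Hypothetical queue with a guaranteed (lower) and maximum (upper) memory limit. */
class LeafQueueSketch {
    final String name;
    final int guaranteedMb;  // must be > 0
    final int maxMb;
    int usedMb = 0;
    final Deque<Integer> pendingMb = new ArrayDeque<>(); // FIFO of request sizes

    LeafQueueSketch(String name, int guaranteedMb, int maxMb) {
        this.name = name;
        this.guaranteedMb = guaranteedMb;
        this.maxMb = maxMb;
    }

    LeafQueueSketch submit(int requestMb) {
        pendingMb.addLast(requestMb);
        return this;
    }

    double usageRatio() { return (double) usedMb / guaranteedMb; }
}

class CapacitySchedulingSketch {
    /** Grants requests until nothing more fits; returns the grants as "queue:mb" in order. */
    static List<String> schedule(List<LeafQueueSketch> queues, int clusterFreeMb) {
        List<String> grants = new ArrayList<>();
        boolean progress = true;
        while (progress) {
            progress = false;
            // Assumption: the queue furthest below its guaranteed share goes first.
            List<LeafQueueSketch> order = new ArrayList<>(queues);
            order.sort(Comparator.comparingDouble(LeafQueueSketch::usageRatio));
            for (LeafQueueSketch q : order) {
                Integer head = q.pendingMb.peekFirst(); // FIFO inside the queue
                if (head == null) {
                    continue;
                }
                boolean underCap = q.usedMb + head <= q.maxMb;
                if (underCap && head <= clusterFreeMb) {
                    q.pendingMb.pollFirst();
                    q.usedMb += head;
                    clusterFreeMb -= head;
                    grants.add(q.name + ":" + head);
                    progress = true;
                    break; // re-rank the queues after every grant
                }
            }
        }
        return grants;
    }

    public static void main(String[] args) {
        List<LeafQueueSketch> queues = new ArrayList<>();
        queues.add(new LeafQueueSketch("prod", 6000, 8000).submit(3000).submit(3000).submit(3000));
        queues.add(new LeafQueueSketch("dev", 2000, 4000).submit(2000).submit(3000));
        // prints [prod:3000, dev:2000, prod:3000]
        System.out.println(schedule(queues, 8000));
    }
}
```

In the usage example the third prod request and the second dev request stay pending: the cluster has no free memory left, and each would also push its queue past its upper limit.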
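If we want to implement our own resource scheduling policy, we can implement YARN's resource scheduling interface ResourceScheduler directly and then change the configuration property yarn.resourcemanager.scheduler.class in yarn-site.xml.
NodeManager Internals
The NM is the slave node in the YARN system that actually holds the resources, and it is also the host node on which user programs actually run. Its internal structure is shown in the figure: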

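For the function of each component inside the NM in the figure above (Dispatcher/EventHandler/Service), refer to the source code; they are not described further here.
Event Handling Mechanism
Event handling falls into two broad categories. The first is synchronous handling: processing the event blocks the caller, so the handling logic is usually very simple and does not block for long. The second is asynchronous handling: after an event is received, a Dispatcher places it on the corresponding event queue. This follows the producer-consumer pattern: a consumer watches the queue, takes events off it, and processes them asynchronously.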
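The difference between the two categories can be shown with a small, self-contained Java sketch built on java.util.concurrent.BlockingQueue. AsyncHandlingSketch is a hypothetical class written for this article, not part of YARN; events are plain strings and shutdown uses a sentinel object, both of which are simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical illustration of synchronous vs. asynchronous event handling. */
class AsyncHandlingSketch {
    private static final Object STOP = new Object(); // sentinel that ends the consumer
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Consumer<String> handler;
    private final Thread consumer;

    AsyncHandlingSketch(Consumer<String> handler) {
        this.handler = handler;
        this.consumer = new Thread(this::drain, "event-consumer");
        this.consumer.start();
    }

    /** Synchronous path: the caller's own thread runs the handler and waits for it. */
    void handleSync(String event) {
        handler.accept(event);
    }

    /** Asynchronous path: the caller only enqueues (producer) and returns at once. */
    void handleAsync(String event) {
        queue.add(event);
    }

    /** Consumer loop: blocks on the queue and handles events one by one. */
    private void drain() {
        try {
            while (true) {
                Object item = queue.take();
                if (item == STOP) {
                    return;
                }
                handler.accept((String) item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Lets already queued events finish, then waits for the consumer to exit. */
    void stop() {
        queue.add(STOP);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        AsyncHandlingSketch sketch = new AsyncHandlingSketch(
            e -> seen.add(e + " handled on " + Thread.currentThread().getName()));
        sketch.handleSync("sync-event");
        sketch.handleAsync("async-event");
        sketch.stop();
        seen.forEach(System.out::println);
    }
}
```

The handler is identical on both paths; only the thread that runs it changes, which is why a slow handler is harmless on the asynchronous path but stalls the caller on the synchronous one.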
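Event handling appears throughout YARN. One of its more distinctive uses is treating a state machine (StateMachine) as an event handler, so that events trigger state transitions of a particular object; object state is managed in this way. Let us first look at the event handling mechanism in YARN, taking the ResourceManager side as an example, as shown in the figure below:

Generated events are dispatched by the Dispatcher and then handled. If an EventHandler's logic is simple, the event is handled synchronously; otherwise it may be handled asynchronously. While an EventHandler is processing an event it may also generate new Events, which are again dispatched through the RM's Dispatcher and then handled.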
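The flow just described, where a handler is chosen by event type and may itself emit follow-up events, can be sketched as follows. All names here (TypeDispatcherSketch, SketchEvent, NodeEventKind, SchedulerEventKind) are hypothetical and exist only for this illustration. The sketch is deliberately single-threaded so that the order of handling is easy to follow; it does not reproduce YARN's threading.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

enum NodeEventKind { EXPIRE }
enum SchedulerEventKind { NODE_REMOVED }

/** Hypothetical event: a type taken from some enum plus a small payload. */
final class SketchEvent {
    final Enum<?> type;
    final String nodeId;

    SketchEvent(Enum<?> type, String nodeId) {
        this.type = type;
        this.nodeId = nodeId;
    }
}

/** Hypothetical dispatcher that routes events by the enum class of their type. */
class TypeDispatcherSketch {
    private final Map<Class<?>, Consumer<SketchEvent>> handlers = new HashMap<>();
    private final Queue<SketchEvent> pending = new ArrayDeque<>();

    void register(Class<?> eventTypeClass, Consumer<SketchEvent> handler) {
        handlers.put(eventTypeClass, handler);
    }

    void post(SketchEvent event) {
        pending.add(event);
    }

    /** Handles queued events until none remain, including ones posted by handlers. */
    void drain() {
        SketchEvent event;
        while ((event = pending.poll()) != null) {
            Class<?> typeClass = event.type.getDeclaringClass();
            Consumer<SketchEvent> handler = handlers.get(typeClass);
            if (handler == null) {
                throw new IllegalStateException("No handler registered for " + typeClass);
            }
            handler.accept(event);
        }
    }

    public static void main(String[] args) {
        TypeDispatcherSketch dispatcher = new TypeDispatcherSketch();
        dispatcher.register(NodeEventKind.class, e -> {
            System.out.println("node handler: " + e.type + " for " + e.nodeId);
            // Handling one event produces another, which goes back through the dispatcher.
            dispatcher.post(new SketchEvent(SchedulerEventKind.NODE_REMOVED, e.nodeId));
        });
        dispatcher.register(SchedulerEventKind.class,
            e -> System.out.println("scheduler handler: " + e.type + " for " + e.nodeId));
        dispatcher.post(new SketchEvent(NodeEventKind.EXPIRE, "nm-1"));
        dispatcher.drain();
    }
}
```

Keying the handler table by the enum's declaring class means one handler serves every constant of that enum, so adding a new constant does not require a new registration.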
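State Machine
Take the RMAppImpl object managed on the RM side as an example. It represents the state of an Application that the RM maintains while the Application runs. All of the states of this object and the transition paths between them are shown in the figure below:

Adding the events that trigger each transition, and their types, to the figure above would make it very cluttered, so I drew a separate, more detailed diagram showing which type of event triggers each state change. It makes reading the source code easier, as shown in the figure below: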

NMLivelinessMonitor Source Code Walkthrough
YARN is built mainly on the abstraction Dispatcher + EventHandler + Service, and all internal and external components are implemented with this mechanism. Because there are many Services and EventHandlers, and some components are both a Service and an EventHandler, reading the code can be confusing. Here I walk through the NMLivelinessMonitor service as an example, as a reference for anyone who wants to study the source.
NMLivelinessMonitor is a monitoring service on the ResourceManager side. It monitors the liveliness of registered nodes, in this case NodeManagers. The service periodically checks NodeManager heartbeat information to make sure that the NodeManagers registered with the ResourceManager are currently alive and can be allocated resources and run computing tasks. AbstractLivelinessMonitor, the abstract generic class that NMLivelinessMonitor extends, contains a Map, shown below:
private Map<O, Long> running = new HashMap<O, Long>();
Here O is replaced by NodeId, and the value type Long is a timestamp: the time at which a NodeManager last sent a heartbeat to the ResourceManager. Whether a NodeManager is usable is determined by checking the timestamps in running.
In ResourceManager we can see that an NMLivelinessMonitor instance is one of its members:
protected NMLivelinessMonitor nmLivelinessMonitor;
Now look at the implementation of the NMLivelinessMonitor class, which extends the abstract generic class AbstractLivelinessMonitor. Its declaration is:
public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId>
The class overrides (@Override) the protected method expire, as shown below:
@Override
protected void expire(NodeId id) {
  dispatcher.handle(
      new RMNodeEvent(id, RMNodeEventType.EXPIRE));
}
The logic that calls expire can be found in AbstractLivelinessMonitor, the abstract base class of NMLivelinessMonitor, inside an inner thread class PingChecker. The code is shown below:
private class PingChecker implements Runnable {

  @Override
  public void run() {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      synchronized (AbstractLivelinessMonitor.this) {
        Iterator<Map.Entry<O, Long>> iterator =
            running.entrySet().iterator();

        //avoid calculating current time everytime in loop
        long currentTime = clock.getTime();

        while (iterator.hasNext()) {
          Map.Entry<O, Long> entry = iterator.next();
          if (currentTime > entry.getValue() + expireInterval) {
            iterator.remove();
            expire(entry.getKey()); // call the abstract method expire, implemented in the subclass
            LOG.info("Expired:" + entry.getKey().toString() +
                " Timed out after " + expireInterval/1000 + " secs");
          }
        }
      }
      try {
        Thread.sleep(monitorInterval);
      } catch (InterruptedException e) {
        LOG.info(getName() + " thread interrupted");
        break;
      }
    }
  }
}
The generic type O here is NodeId in NMLivelinessMonitor, so the logic we care most about is the implementation of expire in NMLivelinessMonitor mentioned earlier. expire calls the handle method of dispatcher, so dispatcher must be an EventHandler object. As we will see later, it is obtained from the dispatcher member of ResourceManager, which is an AsyncDispatcher (AsyncDispatcher holds an EventHandler internally by composition).
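The expiry check at the heart of PingChecker is easy to experiment with in isolation. The following is a simplified sketch, not the Hadoop class: LivelinessSketch, heartbeat, checkOnce, and isTracked are hypothetical names, the background thread is replaced by an explicit checkOnce() call, and the clock is injected as a LongSupplier so that time can be controlled by the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical, thread-free reduction of the liveliness check. */
class LivelinessSketch<O> {
    private final Map<O, Long> running = new HashMap<>(); // id -> last heartbeat time
    private final long expireIntervalMs;
    private final LongSupplier clock;
    private final Consumer<O> onExpire;

    LivelinessSketch(long expireIntervalMs, LongSupplier clock, Consumer<O> onExpire) {
        this.expireIntervalMs = expireIntervalMs;
        this.clock = clock;
        this.onExpire = onExpire;
    }

    /** Records the current time as the latest heartbeat of the given id. */
    synchronized void heartbeat(O id) {
        running.put(id, clock.getAsLong());
    }

    /** One pass of the check: drops and reports every id whose heartbeat is too old. */
    synchronized void checkOnce() {
        long now = clock.getAsLong();
        Iterator<Map.Entry<O, Long>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<O, Long> entry = it.next();
            if (now > entry.getValue() + expireIntervalMs) {
                O id = entry.getKey();
                it.remove();
                onExpire.accept(id); // plays the role of the abstract expire() method
            }
        }
    }

    synchronized boolean isTracked(O id) {
        return running.containsKey(id);
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock controlled by the example
        List<String> expired = new ArrayList<>();
        LivelinessSketch<String> monitor =
            new LivelinessSketch<>(600L, () -> now[0], expired::add);
        monitor.heartbeat("nm-1");
        monitor.heartbeat("nm-2");
        now[0] = 500L;
        monitor.heartbeat("nm-2"); // only nm-2 keeps reporting
        now[0] = 601L;
        monitor.checkOnce();
        System.out.println("expired: " + expired); // prints expired: [nm-1]
    }
}
```

Passing the expiry action in as a callback mirrors the way the real base class leaves expire abstract: the monitor decides when a node is overdue, and the caller decides what that means.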
Next, let us see how NMLivelinessMonitor is created. In the serviceInit() method of the ResourceManager.RMActiveServices class, the code is:
nmLivelinessMonitor = createNMLivelinessMonitor();
addService(nmLivelinessMonitor);
Following the code into createNMLivelinessMonitor, we find:
private NMLivelinessMonitor createNMLivelinessMonitor() {
  return new NMLivelinessMonitor(this.rmContext
      .getDispatcher());
}
Above, a Dispatcher object is obtained from rmContext via getDispatcher and passed to the NMLivelinessMonitor constructor. We need to see how this Dispatcher is created, so look at the ResourceManager.serviceInit method:
rmDispatcher = setupDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
Continuing to trace the code, setupDispatcher() is implemented as follows:
private Dispatcher setupDispatcher() {
  Dispatcher dispatcher = createDispatcher();
  dispatcher.register(RMFatalEventType.class,
      new ResourceManager.RMFatalEventDispatcher());
  return dispatcher;
}
Now look at the implementation of createDispatcher():
protected Dispatcher createDispatcher() {
  return new AsyncDispatcher();
}
We can see that an AsyncDispatcher object is created here, so the NMLivelinessMonitor instance that is created is given an AsyncDispatcher instance. Going back, we need to know how the EventHandler returned by this AsyncDispatcher's getEventHandler() processes events. The code of NMLivelinessMonitor is shown below:
public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId> {

  private EventHandler dispatcher;

  public NMLivelinessMonitor(Dispatcher d) {
    super("NMLivelinessMonitor", new SystemClock());
    this.dispatcher = d.getEventHandler(); // call AsyncDispatcher's getEventHandler() to obtain the EventHandler
  }

  public void serviceInit(Configuration conf) throws Exception {
    int expireIntvl = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
        YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
    setExpireInterval(expireIntvl);
    setMonitorInterval(expireIntvl/3);
    super.serviceInit(conf);
  }

  @Override
  protected void expire(NodeId id) {
    dispatcher.handle(
        new RMNodeEvent(id, RMNodeEventType.EXPIRE));
  }
}
Look at the getEventHandler() method of the AsyncDispatcher class:
@Override
public EventHandler getEventHandler() {
  if (handlerInstance == null) {
    handlerInstance = new GenericEventHandler();
  }
  return handlerInstance;
}
So whether this is the first call or other objects have already called the method, there is ultimately only one GenericEventHandler instance serving as the dispatcher's internal EventHandler. Continuing to trace the code, the GenericEventHandler implementation is:
class GenericEventHandler implements EventHandler<Event> {
  public void handle(Event event) {
    if (blockNewEvents) {
      return;
    }
    drained = false;

    /* all this method does is enqueue all the events onto the queue */
    int qSize = eventQueue.size();
    if (qSize !=0 && qSize %1000 == 0) {
      LOG.info("Size of event-queue is " + qSize);
    }
    int remCapacity = eventQueue.remainingCapacity();
    if (remCapacity < 1000) {
      LOG.warn("Very low remaining capacity in the event-queue: "
          + remCapacity);
    }
    try {
      eventQueue.put(event); // put the Event onto the eventQueue queue
    } catch (InterruptedException e) {
      if (!stopped) {
        LOG.warn("AsyncDispatcher thread interrupted", e);
      }
      throw new YarnRuntimeException(e);
    }
  };
}
The Event passed to handle is put onto the eventQueue queue. In other words, GenericEventHandler is a producer for eventQueue, and the consumer is another thread inside AsyncDispatcher, shown below:
@Override
protected void serviceStart() throws Exception {
  //start all the components
  super.serviceStart();
  eventHandlingThread = new Thread(createThread()); // create the thread that consumes events from eventQueue
  eventHandlingThread.setName("AsyncDispatcher event handler");
  eventHandlingThread.start();
}
The createThread() method is shown below:
Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // blockNewEvents is only set when dispatcher is draining to stop,
        // adding this check is to avoid the overhead of acquiring the lock
        // and calling notify every time in the normal run of the loop.
        if (blockNewEvents) {
          synchronized (waitForDrained) {
            if (drained) {
              waitForDrained.notify();
            }
          }
        }
        Event event;
        try {
          event = eventQueue.take(); // take an Event from the queue
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          dispatch(event); // dispatch this valid Event for handling
        }
      }
    }
  };
}
We can see that an Event is taken from eventQueue and then dispatch(event) is called to handle it. The dispatch(event) method is shown below:
@SuppressWarnings("unchecked")
protected void dispatch(Event event) {
  //all events go thru this loop
  if (LOG.isDebugEnabled()) {
    LOG.debug("Dispatching the event " + event.getClass().getName() + "."
        + event.toString());
  }

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try{
    // get the event type from the event, then look up the EventHandler registered for that type
    EventHandler handler = eventDispatchers.get(type);
    if(handler != null) {
      handler.handle(event); // handle the event with the matching EventHandler
    } else {
      throw new Exception("No handler for registered for " + type);
    }
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.fatal("Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && (ShutdownHookManager.get().isShutdownInProgress()) == false
        && stopped == false) {
      LOG.info("Exiting, bbye..");
      System.exit(-1);
    }
  }
}
We can see that the matching EventHandler is selected from the registered table Map<Class, EventHandler> eventDispatchers to run the actual event handling logic. Now let us see where this EventHandler is registered. As we saw earlier, the expire method of NMLivelinessMonitor passes in new RMNodeEvent(id, RMNodeEventType.EXPIRE), so look again at the ResourceManager.RMActiveServices.serviceInit() method:
// Register event handler for RmNodes
// (event type RMNodeEventType, EventHandler implementation NodeEventDispatcher)
rmDispatcher.register(
    RMNodeEventType.class, new NodeEventDispatcher(rmContext));
So events of type RMNodeEventType are handled by the EventHandler ResourceManager.NodeEventDispatcher, which in turn acts as a dispatcher. Now look at the implementation of NodeEventDispatcher:
@Private
public static final class NodeEventDispatcher implements
    EventHandler<RMNodeEvent> {

  private final RMContext rmContext;

  public NodeEventDispatcher(RMContext rmContext) {
    this.rmContext = rmContext;
  }

  @Override
  public void handle(RMNodeEvent event) {
    NodeId nodeId = event.getNodeId();
    // getRMNodes() returns a ConcurrentMap<NodeId, RMNode> that tracks the
    // state of each NodeId (RMNode is a state machine object)
    RMNode node = this.rmContext.getRMNodes().get(nodeId);
    if (node != null) {
      try {
        // the implementation of RMNode is RMNodeImpl, which is also an EventHandler
        ((EventHandler<RMNodeEvent>) node).handle(event);
      } catch (Throwable t) {
        LOG.error("Error in handling event type " + event.getType()
            + " for node " + nodeId, t);
      }
    }
  }
}
This handler does not do the real processing itself; it delegates to the RMNode state machine object to perform the transition. So we continue with RMNodeImpl, the implementation of RMNode. Since the event type above is RMNodeEventType.EXPIRE, let us see how the transition for this event type is registered when the state machine is created:
private static final StateMachineFactory<RMNodeImpl,
                                         NodeState,
                                         RMNodeEventType,
                                         RMNodeEvent> stateMachineFactory
    = new StateMachineFactory<RMNodeImpl,
                              NodeState,
                              RMNodeEventType,
                              RMNodeEvent>(NodeState.NEW)
    ...
    .addTransition(NodeState.RUNNING, NodeState.LOST,
        RMNodeEventType.EXPIRE,
        new DeactivateNodeTransition(NodeState.LOST))
    ...
    .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
        RMNodeEventType.EXPIRE,
        new DeactivateNodeTransition(NodeState.LOST))
On the ResourceManager side, information about each NodeManager is represented by RMNodeImpl (kept in memory in a ConcurrentMap). So when expire is called, RMNodeImpl handles the event according to what has been registered in the state machine object: the pre-transition state, the post-transition state, the event type, and the transition hook. It then updates the current state of the RMNodeImpl from the pre-transition state to the post-transition state.
For the code above: if the current state of the RMNodeImpl is NodeState.RUNNING and the event is of type RMNodeEventType.EXPIRE, the hook implementation DeactivateNodeTransition is invoked and the state is updated to NodeState.LOST. If the current state is NodeState.UNHEALTHY and the event is of type RMNodeEventType.EXPIRE, the hook implementation DeactivateNodeTransition is likewise invoked and the state is updated to NodeState.LOST. For the detailed processing logic of each Transition, refer to the corresponding Transition implementation.
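The registration pattern (pre-transition state, post-transition state, event type, hook) can be reproduced in a small stand-alone sketch. StateMachineSketch, SketchNodeState, and SketchNodeEventType are hypothetical names that only imitate the shape of the addTransition calls above; the STARTED event and the use of IllegalStateException for an unregistered transition are assumptions made for this example, not a description of YARN's StateMachineFactory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum SketchNodeState { NEW, RUNNING, UNHEALTHY, LOST }
enum SketchNodeEventType { STARTED, EXPIRE }

/** Hypothetical table-driven state machine in the style of the addTransition calls. */
class StateMachineSketch {
    /** One registered arc: the post-transition state plus the hook to run. */
    private static final class Arc {
        final SketchNodeState post;
        final Runnable hook;

        Arc(SketchNodeState post, Runnable hook) {
            this.post = post;
            this.hook = hook;
        }
    }

    // pre-transition state -> event type -> arc
    private final Map<SketchNodeState, Map<SketchNodeEventType, Arc>> table = new HashMap<>();
    private SketchNodeState current;

    StateMachineSketch(SketchNodeState initial) {
        this.current = initial;
    }

    StateMachineSketch addTransition(SketchNodeState pre, SketchNodeState post,
                                     SketchNodeEventType type, Runnable hook) {
        table.computeIfAbsent(pre, k -> new HashMap<>()).put(type, new Arc(post, hook));
        return this;
    }

    /** Runs the hook registered for (current state, event type) and moves to the post state. */
    SketchNodeState handle(SketchNodeEventType type) {
        Map<SketchNodeEventType, Arc> arcs = table.get(current);
        Arc arc = (arcs == null) ? null : arcs.get(type);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + type + " at " + current);
        }
        arc.hook.run();
        current = arc.post;
        return current;
    }

    SketchNodeState state() {
        return current;
    }

    public static void main(String[] args) {
        List<String> hooks = new ArrayList<>();
        StateMachineSketch node = new StateMachineSketch(SketchNodeState.NEW)
            .addTransition(SketchNodeState.NEW, SketchNodeState.RUNNING,
                SketchNodeEventType.STARTED, () -> hooks.add("activate"))
            .addTransition(SketchNodeState.RUNNING, SketchNodeState.LOST,
                SketchNodeEventType.EXPIRE, () -> hooks.add("deactivate"))
            .addTransition(SketchNodeState.UNHEALTHY, SketchNodeState.LOST,
                SketchNodeEventType.EXPIRE, () -> hooks.add("deactivate"));
        node.handle(SketchNodeEventType.STARTED);
        node.handle(SketchNodeEventType.EXPIRE);
        System.out.println(node.state() + " after hooks " + hooks); // prints LOST after hooks [activate, deactivate]
    }
}
```

Because the table is keyed by the current state first, the same EXPIRE event can be legal in RUNNING and UNHEALTHY and rejected everywhere else without any conditional logic in the handler.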