Flink Fault-Tolerance Mechanisms

Author: herokang
 2021-10-18
 
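Editor's recommendation:
This article covers the checkpoint algorithm, the impact of checkpoints on performance, and how to use checkpoints. We hope it helps with your learning.
This article comes from CSDN and was edited and recommended by Alice.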

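1. Checkpoints: consistent checkpoints

Checkpoints are the core of Flink's failure recovery mechanism.

A consistent checkpoint of a stateful streaming application is a snapshot of the state of all tasks at one point in time: the moment at which every task has finished processing exactly the same input.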

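As the figure above shows, the application has one source task that consumes a stream of increasing numbers: 1, 2, 3, and so on. The stream is partitioned into a stream of odd numbers and a stream of even numbers. The sum operator runs two tasks, one accumulating the odd numbers and one the even numbers. The source task stores the current offset of the input stream as its state, and each sum task stores its running total as its state. In the figure, Flink takes a checkpoint when the input offset is 5, at which point the two running totals are 6 and 9. If a failure occurs while the job is processing 6 or 7, the restarted job reads the state of all tasks at offset 5 from the checkpoint and resumes computing from there.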
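The scenario above can be replayed in a few lines of plain Java. This is only a toy model under stated assumptions, not Flink code: the class `OddEvenCheckpointDemo` and its methods are hypothetical names, and one object stands in for the source task and both sum tasks.

```java
// Toy model of the odd/even sum job: a source offset plus two running sums.
// This is NOT Flink code; all names here are hypothetical.
class OddEvenCheckpointDemo {
    long offset = 0;   // source state: last number consumed
    long evenSum = 0;  // state of the sum task for even numbers
    long oddSum = 0;   // state of the sum task for odd numbers

    // Consume the next number of the increasing stream 1, 2, 3, ...
    void processNext() {
        offset++;
        if (offset % 2 == 0) {
            evenSum += offset;
        } else {
            oddSum += offset;
        }
    }

    // A consistent checkpoint: every task has processed exactly the same input.
    long[] snapshot() {
        return new long[] {offset, evenSum, oddSum};
    }

    // Recovery: reset every task's state to the snapshot.
    void restore(long[] snap) {
        offset = snap[0];
        evenSum = snap[1];
        oddSum = snap[2];
    }

    // Runs the scenario from the text and returns {offset, evenSum, oddSum}.
    static long[] runScenario() {
        OddEvenCheckpointDemo job = new OddEvenCheckpointDemo();
        for (int i = 0; i < 5; i++) {
            job.processNext();                 // process 1..5
        }
        long[] checkpoint = job.snapshot();    // {5, 6, 9}
        job.processNext();                     // 6 is processed ...
        job.processNext();                     // ... then 7, then the job fails
        job.restore(checkpoint);               // restart from the checkpoint
        job.processNext();                     // replay 6
        job.processNext();                     // replay 7
        return job.snapshot();
    }

    public static void main(String[] args) {
        long[] s = runScenario();
        // prints: offset=7 evenSum=12 oddSum=16
        System.out.println("offset=" + s[0] + " evenSum=" + s[1] + " oddSum=" + s[2]);
    }
}
```

Because the source offset is restored together with the sums, 6 and 7 are each counted exactly once in the final totals.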

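Flink's exactly-once guarantee relies on replaying data from the most recent snapshot, so it also depends on the capabilities of the data source: not every source can support exactly-once semantics. The Apache Flink website lists the data sources and the guarantee each one provides.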

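2. The checkpoint algorithm

It is a distributed snapshot scheme based on the Chandy-Lamport algorithm.

Saving a checkpoint is decoupled from data processing, so the application as a whole is never paused.

Flink's checkpoint algorithm uses a special type of record called a checkpoint barrier. Like watermarks, checkpoint barriers are injected into the regular record stream by the source operators, and they cannot be overtaken by other records. Each barrier carries a checkpoint ID that identifies the checkpoint it belongs to, and it logically splits the stream into two parts: all state changes caused by records before the barrier belong to that barrier's checkpoint, and all state changes caused by records after the barrier belong to the next checkpoint.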

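We again use the odd/even sum example to illustrate:

The JobManager initiates a checkpoint by sending every source task a message that contains a new checkpoint ID, as shown in the figure below:

When a source task receives this message, it pauses emitting records, saves its offset to the checkpoint, notifies the JobManager, and broadcasts a checkpoint barrier to all downstream tasks. Once the barrier has been sent, the source resumes normal operation; the barrier is now part of its output stream, ahead of all later records.

The checkpoint barrier is broadcast to all connected parallel tasks. When a task receives a barrier for a new checkpoint, it waits until the barrier has arrived from all of its input partitions (in the figure, until both the yellow and the blue triangle 2 have arrived). While it waits, newly arriving records on partitions that have already delivered the barrier, such as the blue 4, are not processed immediately but are buffered.

As soon as a task has received barriers from all of its input partitions, it initiates a checkpoint at the state backend and broadcasts the barrier to all of its downstream tasks. In the figure below, the sums 8 and 8 are saved to the checkpoint.

After all barriers have been emitted, the task processes the buffered records. Once the buffered records have been emitted, the task continues with its input streams. The figure below shows the application at this point:

Finally, the checkpoint barriers reach a sink task. When a sink task receives a barrier, it performs barrier alignment, checkpoints its own state, and acknowledges receipt of the barrier to the JobManager. Once the JobManager has received checkpoint acknowledgements from all tasks of an application, it records the application's checkpoint as completed. The figure below shows this last step of the algorithm; a completed checkpoint can be used to recover the application from a failure.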
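The alignment step is the part of the algorithm that is easiest to get wrong, so here is a minimal single-task sketch of it in plain Java. It is an illustration under assumptions, not Flink's implementation: `BarrierAlignmentDemo`, `onRecord`, and `onBarrier` are hypothetical names, the task's state is a single running sum, and forwarding the barrier downstream is only indicated in a comment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of barrier alignment in one task with several input channels.
// Not Flink code: class and method names are hypothetical.
class BarrierAlignmentDemo {
    private final boolean[] barrierSeen;                   // channels that already delivered the barrier
    private final Deque<Integer> buffered = new ArrayDeque<>();
    private int seenCount = 0;
    long sum = 0;                                          // the task's state
    final List<Long> snapshots = new ArrayList<>();        // state saved per completed alignment

    BarrierAlignmentDemo(int channels) {
        barrierSeen = new boolean[channels];
    }

    // A normal record arrives on the given channel.
    void onRecord(int channel, int value) {
        if (barrierSeen[channel]) {
            buffered.add(value);   // belongs to the next checkpoint: hold it back
        } else {
            sum += value;          // belongs to the current checkpoint: process now
        }
    }

    // A checkpoint barrier arrives on the given channel.
    void onBarrier(int channel) {
        if (!barrierSeen[channel]) {
            barrierSeen[channel] = true;
            seenCount++;
        }
        if (seenCount == barrierSeen.length) {
            snapshots.add(sum);                 // snapshot; a real task would now forward the barrier
            Arrays.fill(barrierSeen, false);    // unblock all channels
            seenCount = 0;
            while (!buffered.isEmpty()) {       // then drain the buffered records
                sum += buffered.poll();
            }
        }
    }

    public static void main(String[] args) {
        BarrierAlignmentDemo task = new BarrierAlignmentDemo(2);
        task.onRecord(0, 2);
        task.onBarrier(0);      // channel 0 is now blocked
        task.onRecord(0, 4);    // buffered, not added yet
        task.onRecord(1, 6);    // channel 1 has no barrier yet: processed
        task.onBarrier(1);      // alignment complete: snapshot, then drain
        // prints: snapshot=8 sum=12
        System.out.println("snapshot=" + task.snapshots.get(0) + " sum=" + task.sum);
    }
}
```

The snapshot contains only the effect of records that arrived before the barrier on their channel (2 and 6); the buffered record 4 is applied afterwards and therefore belongs to the next checkpoint.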

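3. Impact of checkpoints on performance

Flink's checkpoint algorithm produces consistent distributed checkpoints of a streaming application without stopping the whole application. It does, however, increase the application's processing latency. Flink implements a few tweaks that alleviate the performance impact under certain conditions.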

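While a task checkpoints its state, it is blocked and its input is buffered. Because state can grow very large and checkpointing involves writing data over the network to a remote storage system, taking a checkpoint can easily take several seconds to minutes, which is far too long for latency-sensitive applications. In Flink's design, performing a checkpoint is the responsibility of the state backend, and how exactly a task's state is copied depends on the backend's implementation. The FileSystem state backend and the RocksDB state backend, for example, support asynchronous checkpoints. When a checkpoint is triggered, the state backend creates a local copy of the state. As soon as the local copy is complete, the task continues its regular processing. A background thread asynchronously copies the local snapshot to remote storage and notifies the task once the checkpoint is complete. Asynchronous checkpointing significantly shortens the time between a task pausing and resuming data processing. In addition, the RocksDB state backend supports incremental checkpoints, which reduce the amount of data transferred.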
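The split between a short synchronous copy and a background upload can be sketched as follows. This is a simplified illustration in plain Java, not how any Flink state backend is implemented: `AsyncSnapshotDemo` is a hypothetical class, an in-memory map named `remoteStore` stands in for the remote storage system, and the "local copy" is a plain `HashMap` copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of an asynchronous checkpoint: copy locally, upload in the background.
// Not Flink code; "remoteStore" stands in for a remote file system.
class AsyncSnapshotDemo {
    private final Map<String, Long> state = new HashMap<>();
    final Map<Long, Map<String, Long>> remoteStore = new ConcurrentHashMap<>();

    // Regular processing: update the task's state.
    void update(String key, long delta) {
        state.merge(key, delta, Long::sum);
    }

    // Synchronous part: take a local copy (the only step that pauses processing).
    // Asynchronous part: a background thread writes the copy to the remote store.
    CompletableFuture<Void> checkpoint(long checkpointId) {
        Map<String, Long> localCopy = new HashMap<>(state);
        return CompletableFuture.runAsync(() -> remoteStore.put(checkpointId, localCopy));
    }

    public static void main(String[] args) {
        AsyncSnapshotDemo task = new AsyncSnapshotDemo();
        task.update("odd", 9);
        CompletableFuture<Void> upload = task.checkpoint(1L);
        task.update("odd", 7);   // processing continues while the upload runs
        upload.join();           // wait for the completion notification
        // prints: {odd=9}
        System.out.println(task.remoteStore.get(1L));
    }
}
```

The update made after `checkpoint(1L)` returns does not leak into checkpoint 1, because the background thread only ever sees the local copy.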

Another technique for reducing the checkpoint algorithm's impact on processing latency is to tweak the barrier alignment step. If an application needs very low latency and can tolerate at-least-once state guarantees, Flink can be configured to process all arriving records during alignment instead of buffering the records that arrive on channels whose barrier has already been received. Once all barriers of a checkpoint have arrived, the operator checkpoints its state, which may now also include changes made by records that should belong to the next checkpoint. After a failure these records are processed again, so the checkpoint provides at-least-once rather than exactly-once consistency guarantees.
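To make the difference concrete, the sketch below runs one hand-written event sequence through both modes and then simulates a failure and recovery. It is a deliberately linear toy in plain Java, not Flink code: `AlignmentModeDemo.run` is a hypothetical helper, the task has two input channels, and the only record that arrives between the two barriers is the value 4.

```java
// Toy comparison of exactly-once (aligned) and at-least-once (no buffering) snapshots.
// Not Flink code; names and the event sequence are made up for illustration.
class AlignmentModeDemo {
    // Returns the final sum after: process, checkpoint, fail, restore, replay.
    static long run(boolean align) {
        long sum = 0;
        long held = 0;           // total of records buffered during alignment

        sum += 2;                // channel 0: record 2, before the barrier
        // channel 0: barrier arrives; channel 1's barrier is still in flight
        if (align) {
            held += 4;           // channel 0: record 4 after the barrier is buffered ...
        } else {
            sum += 4;            // ... or processed immediately in at-least-once mode
        }
        sum += 6;                // channel 1: record 6, before its barrier
        // channel 1: barrier arrives, so the task snapshots its state
        long snapshot = sum;
        sum += held;             // buffered records are processed after the snapshot

        // Failure: restore the snapshot. The sources replay everything after
        // the barrier, which in this sequence is only record 4.
        sum = snapshot;
        sum += 4;
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("aligned:   " + run(true));   // prints 12 = 2 + 4 + 6
        System.out.println("unaligned: " + run(false));  // prints 16, record 4 counted twice
    }
}
```

In the at-least-once run the snapshot already contains the effect of record 4, so replaying it after recovery adds it a second time.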

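4. Savepoints

In principle, savepoints are created with the same algorithm as checkpoints, so a savepoint is essentially a checkpoint plus some additional metadata. Flink does not take savepoints automatically; a user (or an external scheduler) has to trigger them explicitly. Flink does not clean up savepoints automatically either.

Savepoints serve many purposes, such as periodic manual backups, updating an application (for example, to fix a bug), upgrading the Flink version, migrating to another cluster, and pausing and resuming an application.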

5. Using checkpoints

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckPointTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Enable checkpointing, one checkpoint per minute (interval in ms)
    env.enableCheckpointing(60000)
    // Choose the state backend for checkpoints
    env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"))
    // Restart strategy (can also be set in the configuration file):
    // at most 3 restarts, 10 s apart
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
    // Alternative restart strategy (can also be set in the configuration file):
    // at most 3 failures per 10 s interval, 1 s between restarts.
    // Only the last setRestartStrategy call takes effect, so keep just one of the two.
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(10), Time.seconds(1)))
    // EXACTLY_ONCE is the default mode
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout in ms; a checkpoint that exceeds it is discarded
    env.getCheckpointConfig.setCheckpointTimeout(600000)
    // Whether a checkpoint error should fail the job.
    // The default is true: if the checkpoint fails, the job fails too.
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // Maximum number of concurrent checkpoints; checkpointing too often hurts performance
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Minimum pause between two checkpoints, in ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000)
    // Enable externalized checkpoints. By default, checkpoints are only used to
    // recover from failures and are deleted when the job is cancelled.
    // RETAIN_ON_CANCELLATION: keep the checkpoint even if the job is cancelled manually
    // DELETE_ON_CANCELLATION: delete the checkpoint when the job is cancelled manually
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }
}
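The failure-rate strategy in the listing, `failureRateRestart(3, 10 s, 1 s)`, is easier to reason about with a small model of its rule: restart after a failure unless the configured number of failures has already occurred within the interval. The plain-Java sketch below is written from that description only and is an assumption-laden simplification, not Flink's implementation; `FailureRateModel` and `onFailure` are hypothetical names, time is passed in as milliseconds, and the 1 s restart delay is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of a failure-rate restart rule.
// Not Flink code; written from the rule's description only.
class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    private final Deque<Long> recentFailures = new ArrayDeque<>();  // timestamps of tolerated failures

    FailureRateModel(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    // Reports a failure at time nowMs and returns whether the job may restart.
    boolean onFailure(long nowMs) {
        if (recentFailures.size() == maxFailuresPerInterval) {
            long oldest = recentFailures.peekFirst();
            if (nowMs - oldest <= intervalMs) {
                return false;               // too many failures inside the interval: give up
            }
            recentFailures.pollFirst();     // the oldest failure has aged out of the window
        }
        recentFailures.addLast(nowMs);
        return true;
    }

    public static void main(String[] args) {
        FailureRateModel model = new FailureRateModel(3, 10_000L);
        System.out.println(model.onFailure(0L));      // true
        System.out.println(model.onFailure(2_000L));  // true
        System.out.println(model.onFailure(4_000L));  // true
        System.out.println(model.onFailure(6_000L));  // false: fourth failure within 10 s
    }
}
```

Had the fourth failure arrived at 15 000 ms instead, the first one would have fallen outside the 10 s window and the model would allow another restart.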

6. State backends

A Flink state backend is responsible for two things: local state management, that is, storing, accessing, and maintaining state; and writing checkpoint state to remote storage.

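It can be set with the following code (in practice, it is usually configured once for the whole company cluster):

// Choose the state backend for checkpoints
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"))

Choosing a state backend:

MemoryStateBackend: an in-memory state backend. It keeps keyed state on the JVM heap of the TaskManager and stores checkpoints on the JVM heap of the JobManager.

Characteristics: fast and low latency, but unstable; not used in production.

FsStateBackend: local state is handled as in the MemoryStateBackend, while checkpoints are written to a remote persistent file system.

Characteristics: fast, low latency, and more stable; usually sufficient unless the state is very large.

RocksDBStateBackend: serializes all state and stores it in a local RocksDB instance.

It is heavier-weight, but it is not affected by the server's memory size or by GC.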

Configuring the state backend:

Option 1: per-job setting

Modify the code of the job:

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

or new MemoryStateBackend()

or new RocksDBStateBackend(filebackend, true); (requires an additional dependency)

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.7.0</version>
</dependency>

Option 2: global setting

Edit flink-conf.yaml:

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

Note: state.backend accepts the following values: jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend).

7. Retaining multiple checkpoints

By default, when checkpointing is enabled, Flink retains only the most recent successfully completed checkpoint, and a failed program can be recovered from it. Retaining several checkpoints and choosing which one to recover from is more flexible. For example, if we discover that the records of the last four hours were processed incorrectly, we may want to roll the whole state back to where it was four hours ago.

Flink supports retaining multiple checkpoints. To enable this, add the following setting to the Flink configuration file conf/flink-conf.yaml to specify the maximum number of checkpoints to keep:

state.checkpoints.num-retained: 20

Multiple checkpoint files then appear under the corresponding HDFS folder.
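The effect of `state.checkpoints.num-retained` can be pictured as a bounded queue: each completed checkpoint is appended, and the oldest one is dropped once the limit is exceeded. The plain-Java sketch below models only that bookkeeping and is not Flink code; `RetainedCheckpointsDemo` is a hypothetical class, and the limit of 3 and the checkpoint numbers are chosen only to keep the output short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of retaining only the N most recent completed checkpoints.
// Not Flink code; names and numbers are made up for illustration.
class RetainedCheckpointsDemo {
    private final int maxRetained;
    private final Deque<String> retained = new ArrayDeque<>();

    RetainedCheckpointsDemo(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    // Registers a completed checkpoint; returns the evicted one, or null if none was evicted.
    String addCompleted(long id) {
        retained.addLast("chk-" + id);
        return retained.size() > maxRetained ? retained.pollFirst() : null;
    }

    // The checkpoints currently available for recovery, oldest first.
    List<String> retained() {
        return new ArrayList<>(retained);
    }

    public static void main(String[] args) {
        RetainedCheckpointsDemo store = new RetainedCheckpointsDemo(3);
        for (long id = 858; id <= 862; id++) {
            store.addCompleted(id);
        }
        // prints: [chk-860, chk-861, chk-862]
        System.out.println(store.retained());
    }
}
```

With a larger limit, such as the 20 configured above, more history stays available for recovery at the cost of more storage.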

8. Restoring from a checkpoint

If a Flink program fails unexpectedly, or data was processed incorrectly over some recent period, we can replay the program from a particular checkpoint such as chk-860 by running:

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar

As the path shows, the ID of the previous Flink job is 582e17d2cc343e6c56255d111bae0191, and all of its checkpoint files live in a directory named after the job ID. When the job is stopped and then restored from a checkpoint (chk-860), a new job ID is generated (here 11bbc5d9933e4ff7e25198a760e9792e), and checkpoint numbering continues from the checkpoint the run was restored from: chk-861, chk-862, chk-863, and so on.

9. Flink savepoints

A savepoint stores a self-contained checkpoint outside of the Flink job. It uses Flink's checkpointing mechanism to create a non-incremental snapshot that contains the state of the streaming program, and writes the checkpoint data to an external storage system.

A Flink program contains two kinds of state. User-defined state is created or modified by Flink transformation functions. System state refers to data such as buffers that are part of an operator's computation, for example the records buffered inside a window when a window function is used. To identify each operator's state uniquely when a savepoint is created, Flink provides an API for assigning an ID to every operator in the program. When the program is later updated or upgraded, the state in the savepoint can be matched to the operators by operator ID and restored. If we do not specify operator IDs, Flink generates them for us automatically.

Assigning an ID to every operator manually is nevertheless strongly recommended. Even if the application changes considerably in the future, for instance by replacing an operator's implementation, adding operators, or removing operators, there is at least a chance of matching the operator state stored in the savepoint. Keep in mind that savepoint state is generated from the program and its in-memory data structures as they were at the time. If the program changes substantially, and especially if the data structures it operates on change, it may be impossible to restore correctly from the old savepoint.

The following example from the official Flink documentation shows how to set operator IDs:

env
    // Stateful source (e.g. Kafka) with ID
    .addSource(new StatefulSource())
    .uid("source-id") // ID for the source operator
    .shuffle()
    // Stateful mapper with ID
    .map(new StatefulMapper())
    .uid("mapper-id") // ID for the mapper
    // Stateless printing sink
    .print(); // Auto-generated ID
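Why stable IDs matter can be shown with a small lookup model: if savepoint state is keyed by operator ID, a changed job can still find the state of every operator whose ID it kept. The plain-Java sketch below illustrates only that idea and is not how Flink stores or restores savepoints; `UidRestoreDemo` is a hypothetical class, state is represented by placeholder strings such as "offset=5", and the extra operator "filter-id" is invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of matching savepoint state to operators by their uid.
// Not Flink code; state values are placeholder strings.
class UidRestoreDemo {
    // For each operator of the new job, look up its state in the savepoint by uid.
    static Map<String, String> restore(Map<String, String> savepointStateByUid,
                                       List<String> newJobUids) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (String uid : newJobUids) {
            // Operators that are not in the savepoint start without state (null here).
            restored.put(uid, savepointStateByUid.get(uid));
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = Map.of(
                "source-id", "offset=5",
                "mapper-id", "count=42");
        // The updated job inserts a new operator between source and mapper.
        List<String> newJob = List.of("source-id", "filter-id", "mapper-id");
        // prints: {source-id=offset=5, filter-id=null, mapper-id=count=42}
        System.out.println(restore(savepoint, newJob));
    }
}
```

Because the lookup uses the ID rather than the operator's position in the pipeline, inserting "filter-id" does not disturb the state of the source or the mapper.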

10. Creating a savepoint

Creating a savepoint requires a savepoint directory, which can be specified in two ways.

The first is to configure a default savepoint path by adding the following setting to the Flink configuration file conf/flink-conf.yaml, for example:

state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints

The second is to specify the savepoint directory when running the savepoint command manually. The command format is:

bin/flink savepoint :jobId [:targetDirectory]

For example, for a running Flink job with ID 40dcc6d2ba90f13930abce295de8d038, the following command uses the default directory configured by state.savepoints.dir:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

The savepoint data of job 40dcc6d2ba90f13930abce295de8d038 is generated under hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0.

To store the savepoint data of a running Flink job in a specific directory, run:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

The savepoint data of job 40dcc6d2ba90f13930abce295de8d038 is generated under hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f.

11. Restoring from a savepoint

Now we can stop job 40dcc6d2ba90f13930abce295de8d038 and then resume it from the savepoint. The command format is:

bin/flink run -s :savepointPath [:runArgs]

Using the savepoint saved above, resume the job by running:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

A new Flink job is started, with ID cdbae3af1b7441839e7c03bab0d0eefd.

 

   