Flink's Fault-Tolerance Mechanism
 
Author: 大数据面壁者
 
2021-7-8 
 
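Editor's recommendation:
This article covers Flink's fault-tolerance mechanism: state consistency, how checkpoints work, how savepoints work, the differences between checkpoints and savepoints, and end-to-end exactly-once with Kafka + Flink + Kafka.
This article comes from CSDN and was edited and recommended by Anna of 火龙果软件.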

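1. State Consistency

Introducing state into a distributed system naturally introduces the problem of consistency.

Consistency is really another way of saying "level of correctness": after a failure has been handled and the system has recovered, how correct is the result compared with the result that would have been produced had no failure occurred? Suppose, for example, that we are counting the users who logged in during the last hour. What is the count after the system has gone through a failure? If it is off, were some logins missed or were some counted twice?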

Consistency levels

In stream processing, consistency falls into three levels:

at-most-once:

This is really a euphemism for no correctness guarantee at all: after a failure, counts may be lost.

at-least-once:

The count may be larger than the correct value but never smaller. In other words, the counting program may over-count after a failure, but it never under-counts.

exactly-once:

The system guarantees that the count obtained after a failure equals the correct value, neither over-counting nor under-counting.
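To make the three levels concrete, here is a minimal plain-Java sketch (not Flink code). The class `ConsistencyLevels`, its method and its parameters are hypothetical names chosen for illustration. It counts events from a replayable input, crashes once, and recovers under each policy, on the assumption that a checkpoint stores both the count and the read position.

```java
class ConsistencyLevels {
    enum Guarantee { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /**
     * Counts totalEvents events, checkpointing every checkpointEvery events
     * and failing once right after event number crashAfter.
     */
    static long countWithOneFailure(int totalEvents, int checkpointEvery,
                                    int crashAfter, Guarantee guarantee) {
        long count = 0;       // operator state: events counted so far
        int offset = 0;       // read position in the input
        long savedCount = 0;  // state stored by the last checkpoint
        int savedOffset = 0;  // read position stored by the last checkpoint
        boolean crashed = false;

        while (offset < totalEvents) {
            offset++;
            count++;
            if (offset % checkpointEvery == 0) {
                savedCount = count;
                savedOffset = offset;
            }
            if (!crashed && offset == crashAfter) {
                crashed = true;
                switch (guarantee) {
                    case AT_MOST_ONCE:
                        // State is rolled back but the input is not rewound:
                        // events since the checkpoint are lost.
                        count = savedCount;
                        break;
                    case AT_LEAST_ONCE:
                        // Input is rewound but the count is not rolled back:
                        // events since the checkpoint are counted twice.
                        offset = savedOffset;
                        break;
                    case EXACTLY_ONCE:
                        // State and input position are restored together.
                        count = savedCount;
                        offset = savedOffset;
                        break;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        for (Guarantee g : Guarantee.values()) {
            System.out.println(g + " -> " + countWithOneFailure(10, 4, 6, g));
        }
    }
}
```

With 10 events, a checkpoint every 4 events and a crash after the 6th, the three policies yield 8, 12 and 10 respectively; only the last matches the true count.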

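At-least-once used to be very popular. The first generation of stream processors (such as Storm and Samza) guaranteed only at-least-once when they first appeared, for two reasons:

Systems that guarantee exactly-once are more complex to implement. This is challenging both at the infrastructure level (deciding what counts as correct and what the scope of exactly-once is) and at the implementation level.

Early users of stream processing systems were willing to accept the framework's limitations and work around them in the application layer (for example by making the application idempotent, or by recomputing the results with a batch layer).

The first systems to guarantee exactly-once (Storm Trident and Spark Streaming) paid a heavy price in performance and expressiveness. To guarantee exactly-once, these systems cannot apply the application logic to each record individually. Instead they process several records at once (a batch) and guarantee that each batch either succeeds entirely or fails entirely. As a result, no output is available until the whole batch has been processed. Users therefore often had to run two stream processing frameworks (one to guarantee exactly-once, another for low-latency per-element processing), which made the infrastructure more complex. Users once had to trade exactly-once guarantees against low latency and efficiency. Flink avoids this trade-off.

A major value of Flink is that it guarantees exactly-once while also delivering low latency and high throughput.

Fundamentally, Flink avoids the trade-off by meeting all of these requirements itself, which is a significant technical leap for the industry.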

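End-to-end state consistency

The consistency guarantees discussed so far are all provided by the stream processor, that is, inside Flink itself. In a real application, however, the pipeline also includes a data source (for example Kafka) and a sink that writes to a persistent system.

An end-to-end consistency guarantee means that correctness holds across the entire streaming application. Each component provides its own consistency guarantee, and the end-to-end level is determined by the weakest component.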

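The requirements break down as follows:

Source side

The external source must be able to reset its read position. The Kafka source used here has this property: the offset can be specified when reading.

Inside Flink

Relies on the checkpoint mechanism.

Sink side

When recovering from a failure, data must not be written to the external system twice. There are two ways to achieve this:

a) Idempotent writes

An idempotent operation can be executed many times but changes the result only once; repeating it afterwards has no further effect.

b) Transactional writes

Writes to the external system are wrapped in a transaction that corresponds to a checkpoint, and the results are written to the sink system only once that checkpoint has actually completed. There are two ways to implement transactional writes: write-ahead log (WAL) and two-phase commit (2PC).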
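The two sink styles above can be illustrated with a small plain-Java sketch. This is not Flink's sink API: `SinkWriteStyles`, `IdempotentSink` and `TransactionalSink` are hypothetical names, and in-memory collections stand in for the external system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SinkWriteStyles {

    /** Idempotent write: a keyed upsert, so replaying a record changes nothing. */
    static class IdempotentSink {
        private final Map<String, Long> store = new HashMap<>();

        void write(String key, long value) {
            store.put(key, value);
        }

        Map<String, Long> contents() {
            return new HashMap<>(store);
        }
    }

    /** Transactional write: records become visible only when the checkpoint completes. */
    static class TransactionalSink {
        private final List<String> committed = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();

        void write(String record) {
            pending.add(record);          // staged, not yet visible
        }

        void onCheckpointComplete() {
            committed.addAll(pending);    // commit the staged records
            pending.clear();
        }

        void onFailure() {
            pending.clear();              // abort: staged records are discarded
        }

        List<String> contents() {
            return new ArrayList<>(committed);
        }
    }
}
```

In both cases a record that is replayed after recovery ends up in the external system once: the idempotent sink overwrites the same key with the same value, and the transactional sink never exposed the uncommitted copy in the first place.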

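2. How Checkpoints Work

How exactly does Flink guarantee exactly-once? It uses a feature called a "checkpoint" to reset the system to a correct state when a failure occurs. A simple analogy explains what checkpoints do.

Suppose you and two friends are counting the beads on a necklace, as shown in the figure below. You pinch each bead and slide it along as you count, adding one to the total for every bead. Your friends count the beads in their hands the same way. What happens when you get distracted and lose your place? If the necklace has many beads, you certainly do not want to start over, especially when the three of you count at different speeds yet are trying to cooperate (say you want to record how many beads the three of you counted in the last minute; think of a one-minute tumbling window).

So you come up with a better approach: tie colored rubber bands loosely around the necklace at intervals to separate the beads. The bands can be slid along just as the beads are. You then appoint an assistant who records the totals whenever you and your friends reach a band. With this method, nobody has to start from the beginning when someone miscounts. Instead, you alert the others, everyone goes back to the last band and counts again from there, and the assistant tells each person the value to start from, for example the value at the pink band.

Flink checkpoints play the role of the rubber bands. The key point of the analogy is that the position of the beads relative to a given band is fixed, which makes the band a reference point for recounting. The overall state (the total number of beads) is updated after every bead, and the assistant stores the checkpoint state for each band, such as how many beads had been counted at the pink band and how many at the orange band. When something goes wrong, this makes recounting simple.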

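Flink's checkpoint algorithm

The checkpoint mechanism is the cornerstone of Flink's reliability. When an operator fails for some reason (such as an abnormal exit), it allows the state of the whole application dataflow graph to be restored to a state from before the failure, keeping the graph's state consistent.

Snapshot algorithms:

a) The simple algorithm: pause the application, take the checkpoint, then resume the application.

b) Flink's improved checkpoint algorithm. Flink's checkpoint mechanism is based on a variant of the Chandy-Lamport algorithm (a distributed snapshot algorithm): asynchronous barrier snapshotting.

When an application that needs checkpoints starts, Flink's JobManager creates a CheckpointCoordinator for it, which is solely responsible for taking that application's snapshots.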

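Understanding barriers

Stream barriers are a core concept in Flink checkpoints. Barriers are injected into the data stream and flow along with the records as part of the stream (somewhat like watermarks). Barriers never overtake records in the stream.

Each barrier splits the stream into two parts: the records before it belong to the current snapshot, and the records after it belong to the next one. Each barrier carries the ID of its snapshot. Barriers do not interrupt the flow of data, so they are very lightweight. Several barriers from different snapshots can be in the stream at the same time, which means that multiple snapshots may be in progress concurrently.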
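As a rough illustration of how barriers partition a stream, the following plain-Java sketch groups records by the snapshot they fall into. `BarrierStream` and `Element` are hypothetical names for this example, not Flink classes, and the stream is modeled as a simple list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BarrierStream {

    /** A stream element: either a data record or a barrier carrying a snapshot ID. */
    static final class Element {
        final String record;   // null for barriers
        final long snapshotId; // -1 for records

        private Element(String record, long snapshotId) {
            this.record = record;
            this.snapshotId = snapshotId;
        }

        static Element record(String value) {
            return new Element(value, -1);
        }

        static Element barrier(long snapshotId) {
            return new Element(null, snapshotId);
        }

        boolean isBarrier() {
            return record == null;
        }
    }

    /** Maps each snapshot ID to the records that precede its barrier. */
    static Map<Long, List<String>> recordsPerSnapshot(List<Element> stream) {
        Map<Long, List<String>> result = new LinkedHashMap<>();
        List<String> current = new ArrayList<>();
        for (Element e : stream) {
            if (e.isBarrier()) {
                result.put(e.snapshotId, current); // barrier closes the current part
                current = new ArrayList<>();       // later records go to the next snapshot
            } else {
                current.add(e.record);
            }
        }
        // Records after the last barrier belong to a snapshot that has not been triggered yet.
        return result;
    }
}
```

For the stream a, b, barrier 1, c, barrier 2, d, this yields snapshot 1 = [a, b] and snapshot 2 = [c], while d waits for a later barrier.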

Flink's checkpoint process

Step 1: The Checkpoint Coordinator triggers a checkpoint on all source nodes. Each source task then injects a checkpoint barrier into its data stream.

Step 2: The source nodes broadcast the barrier downstream. The barrier is the core of the Chandy-Lamport distributed snapshot algorithm: a downstream task performs its checkpoint only after it has received the barrier from all of its inputs.

Step 3: When a task has finished backing up its state, it reports the location of the backup (the state handle) to the Checkpoint Coordinator.

Step 4: Once the downstream sink node has collected the barriers from both upstream inputs, it takes its local snapshot. The figure for this step illustrates the RocksDB incremental checkpoint flow: RocksDB first flushes all of its data to disk (the large red triangle), and the Flink framework then selects the files that have not yet been uploaded and persists them (the small purple triangles).

Step 5: Likewise, after the sink node finishes its own checkpoint, it returns its state handle to the Coordinator.

Step 6: Finally, when the Checkpoint Coordinator has collected the state handles of all tasks, it considers the checkpoint globally complete and writes a checkpoint metadata file to persistent storage.
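The bookkeeping in steps 3 to 6 can be sketched in plain Java as follows. This is a toy model under simplifying assumptions, not Flink's coordinator: the class name `ToyPendingCheckpoint`, the string task names and the string state handles are all hypothetical.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class ToyPendingCheckpoint {
    private final long checkpointId;
    private final Set<String> awaiting;                                    // tasks that have not reported yet
    private final Map<String, String> stateHandles = new LinkedHashMap<>(); // task -> backup location

    ToyPendingCheckpoint(long checkpointId, Collection<String> tasks) {
        this.checkpointId = checkpointId;
        this.awaiting = new HashSet<>(tasks);
    }

    /** Records a task's state handle; returns true once every task has reported. */
    boolean acknowledge(String task, String stateHandle) {
        if (awaiting.remove(task)) {
            stateHandles.put(task, stateHandle);
        }
        return isComplete();
    }

    boolean isComplete() {
        return awaiting.isEmpty();
    }

    /** The metadata written at the end; only available for a complete checkpoint. */
    String metadata() {
        if (!isComplete()) {
            throw new IllegalStateException("checkpoint " + checkpointId + " is still pending");
        }
        return "checkpoint-" + checkpointId + " " + stateHandles;
    }
}
```

The checkpoint counts as globally complete only when the last acknowledgement arrives; until then no metadata is produced, so a failure in between leaves the previous completed checkpoint as the recovery point.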

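Exactly-once semantics: barrier alignment

With a parallelism greater than one, exactly-once requires barrier alignment.

Each operator in the job graph records its state when it receives barriers. Operators with two input streams (for example a CoProcessFunction) perform barrier alignment so that the current snapshot contains the state produced by consuming all events up to (but not beyond) the barrier on both input streams. In the steps below, the operator has two inputs: a stream of digits and a stream of letters.

1. Once the operator receives barrier n from the digit stream, it can no longer process (although it can still receive) any records from that stream until it has received barrier n from the letter stream as well, that is, from all of its inputs. Otherwise it would mix records that belong to snapshot n with records that belong to snapshot n + 1.

2. Streams that have already delivered barrier n (the digit stream) are temporarily set aside. Records received from them go into the input buffer and are not processed.

3. In the first figure, records 1, 2 and 3, which follow checkpoint barrier n, have already reached the operator. They are held in the input buffer unprocessed and are processed only after checkpoint barrier n of the letter stream arrives.

4. Once barrier n has been received from all input streams, the operator emits all pending output records and then forwards checkpoint barrier n downstream. It also takes a snapshot of its own state at this point.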
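The alignment steps can be sketched as a toy two-input operator in plain Java. `AligningSumOperator` is a hypothetical class, not a Flink operator; it keeps a running sum as its state and assumes at most one checkpoint is in flight at a time.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class AligningSumOperator {
    private final boolean[] blocked;                      // inputs that already delivered barrier n
    private final Deque<Integer> inputBuffer = new ArrayDeque<>();
    private int barriersReceived = 0;
    private long sum = 0;                                 // operator state
    private long lastSnapshot = -1;                       // state captured by the last checkpoint

    AligningSumOperator(int numInputs) {
        this.blocked = new boolean[numInputs];
    }

    void onRecord(int input, int value) {
        if (blocked[input]) {
            inputBuffer.add(value);   // received but not processed during alignment
        } else {
            sum += value;
        }
    }

    void onBarrier(int input) {
        blocked[input] = true;
        barriersReceived++;
        if (barriersReceived == blocked.length) {
            lastSnapshot = sum;       // state reflects only pre-barrier records
            Arrays.fill(blocked, false);
            barriersReceived = 0;
            while (!inputBuffer.isEmpty()) {
                sum += inputBuffer.poll(); // now process what was held back
            }
        }
    }

    long sum() {
        return sum;
    }

    long lastSnapshot() {
        return lastSnapshot;
    }
}
```

Feeding 10 and then the barrier on input 0, followed by 1, 2, 3 on input 0 and 20 plus the barrier on input 1, produces a snapshot of 30: the records 1, 2 and 3 are applied to the state only after the snapshot has been taken.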

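At-least-once semantics: no barrier alignment

The previous section described barrier alignment. What happens without it? Records get consumed more than once, which is at-least-once semantics.

Suppose the barriers are not aligned, and records 1, 2 and 3 have already been processed before checkpoint barrier n of the letter stream arrives. When that barrier arrives, checkpoint n is taken, and the saved state already reflects 1, 2 and 3. If the program then fails, the restart recomputes the data that follows checkpoint n. Records 1, 2 and 3 are computed again, so they are counted twice.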
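The effect can be reproduced with a short plain-Java simulation. Everything here is hypothetical scaffolding (`AlignmentRecovery`, `Arrival`, the sample values): it assumes a two-input summing operator that crashes right after checkpoint n and then has the records behind barrier n replayed by the sources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AlignmentRecovery {

    /** One arriving element on an input; a null value marks barrier n. */
    static final class Arrival {
        final int input;
        final Integer value;

        Arrival(int input, Integer value) {
            this.input = input;
            this.value = value;
        }
    }

    /** Sum after: take checkpoint n, crash, restore checkpoint n, replay post-barrier records. */
    static long sumAfterRecovery(List<Arrival> arrivals, int numInputs, boolean align) {
        boolean[] barrierSeen = new boolean[numInputs];
        int barriers = 0;
        long sum = 0;
        List<Integer> afterBarrier = new ArrayList<>(); // records the sources will replay

        for (Arrival a : arrivals) {
            if (a.value == null) {
                barrierSeen[a.input] = true;
                if (++barriers == numInputs) {
                    long restored = sum;          // checkpoint n, then crash and restore
                    for (int v : afterBarrier) {
                        restored += v;            // replay of records behind barrier n
                    }
                    return restored;
                }
            } else if (barrierSeen[a.input]) {
                afterBarrier.add(a.value);
                if (!align) {
                    sum += a.value;               // unaligned: processed before the snapshot
                }
            } else {
                sum += a.value;
            }
        }
        throw new IllegalArgumentException("barrier n never arrived on every input");
    }

    public static void main(String[] args) {
        List<Arrival> arrivals = Arrays.asList(
                new Arrival(0, 10), new Arrival(0, null),
                new Arrival(0, 1), new Arrival(0, 2), new Arrival(0, 3),
                new Arrival(1, 20), new Arrival(1, null));
        System.out.println("aligned:   " + sumAfterRecovery(arrivals, 2, true));
        System.out.println("unaligned: " + sumAfterRecovery(arrivals, 2, false));
    }
}
```

The records add up to 36. With alignment the recovered sum is 36; without it the sum is 42, because 1, 2 and 3 are contained in the snapshot and then replayed again.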

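3. How Savepoints Work

Flink also provides a user-controlled snapshot feature: savepoints.

In principle, savepoints are created with exactly the same algorithm as checkpoints, so a savepoint can be thought of as a checkpoint with some extra metadata.

Flink does not create savepoints automatically; the user (or an external scheduler) must trigger their creation explicitly.

Savepoints are a powerful feature. Besides failure recovery, they can be used for planned manual backups, application updates, version migration, pausing and restarting an application, and so on.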

4. Differences Between Checkpoints and Savepoints

5. End-to-End Exactly-Once with Kafka + Flink + Kafka

As we have seen, end-to-end state consistency requires every component to do its part. For a Flink + Kafka pipeline (Kafka in, Kafka out), how does each component guarantee exactly-once semantics?

Internal: the checkpoint mechanism persists state so that it can be restored after a failure, which keeps Flink's internal state consistent.

Source: the Kafka consumer acts as the source and saves its offsets. If a later task fails, the connector resets the offsets on recovery and re-consumes the data, which preserves consistency.

Sink: the Kafka producer acts as the sink and uses a two-phase commit sink, which requires implementing a TwoPhaseCommitSinkFunction.
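The source-side requirement in the list above, saving an offset and seeking back to it on recovery, can be sketched with a hypothetical in-memory `ReplayableSource`. This is plain Java standing in for a Kafka partition and is not the Kafka consumer API.

```java
import java.util.List;

class ReplayableSource {
    private final List<String> log; // stands in for a partition's record log
    private int offset = 0;         // next position to read

    ReplayableSource(List<String> log) {
        this.log = log;
    }

    /** Returns the next record, or null when the log is exhausted. */
    String poll() {
        return offset < log.size() ? log.get(offset++) : null;
    }

    /** The value a checkpoint would store for this source. */
    int currentOffset() {
        return offset;
    }

    /** On recovery, rewind to the offset stored in the last completed checkpoint. */
    void seek(int savedOffset) {
        this.offset = savedOffset;
    }
}
```

If the offset is saved after two records have been read and the job fails after reading a third, seeking back to the saved offset makes the next `poll()` return that third record again, so nothing behind the checkpoint is lost.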

We have already covered the internal checkpoint mechanism, so how exactly do the source and sink work? Let's go through it step by step.

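The two-phase commit steps can be summarized as follows:

When the first record arrives, the sink opens a Kafka transaction and writes records to the Kafka partition log as usual, but marked as uncommitted. This is the "pre-commit".

The JobManager triggers a checkpoint. The barrier is passed downstream from the source; each operator that encounters the barrier stores its state in the state backend and notifies the JobManager.

When the sink connector receives the barrier, it saves its current state into the checkpoint, notifies the JobManager, and opens the transaction for the next phase, which will carry the data for the next checkpoint.

The JobManager receives the notifications from all tasks and sends a confirmation that the checkpoint is complete.

The sink task receives the JobManager's confirmation and formally commits the data from that period.

The transaction is closed in the external Kafka, and the committed data can now be consumed normally.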
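The steps above can be sketched as a toy sink in plain Java. `ToyTwoPhaseCommitSink` is a hypothetical stand-in and does not extend Flink's TwoPhaseCommitSinkFunction; in-memory lists play the role of Kafka transactions. As a simplification, a failure aborts everything that has not been committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class ToyTwoPhaseCommitSink {
    private final List<String> visible = new ArrayList<>();                   // readable by consumers
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>(); // checkpoint ID -> transaction
    private List<String> open = new ArrayList<>();                            // current transaction

    /** Writes into the open transaction; the record is not yet visible. */
    void write(String record) {
        open.add(record);
    }

    /** Barrier reached the sink: pre-commit the open transaction and start the next one. */
    void onBarrier(long checkpointId) {
        preCommitted.put(checkpointId, open);
        open = new ArrayList<>();
    }

    /** Confirmation from the JobManager: commit every transaction up to this checkpoint. */
    void onCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = preCommitted.headMap(checkpointId, true);
        for (List<String> transaction : done.values()) {
            visible.addAll(transaction);
        }
        done.clear(); // clearing the view removes the entries from preCommitted
    }

    /** Failure: abort the open transaction and any unconfirmed pre-commits. */
    void onFailure() {
        open = new ArrayList<>();
        preCommitted.clear();
    }

    List<String> visible() {
        return new ArrayList<>(visible);
    }
}
```

Records written before a barrier become visible only after the matching confirmation arrives. A record written after the last confirmed checkpoint is discarded on failure and reappears exactly once when the source replays it.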

6. Testing Checkpoints in Code

package com.flink.charpter07.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.util.Properties;

public class S04_CheckPoint {
    public static void main(String[] args) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Kafka consumer settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop162:9092,hadoop163:9092");
        properties.setProperty("group.id", "S04_CheckPoint");
        properties.setProperty("auto.offset.reset", "latest");

        // Store checkpoint data on HDFS
        env.setStateBackend(new FsStateBackend("hdfs://hadoop162:8020/flink/checkpoints/fs"));
        // Trigger a checkpoint every 2000 ms
        env.enableCheckpointing(2000);
        // Use exactly-once mode (the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must complete within one minute, otherwise it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.addSource(new FlinkKafkaConsumer<String>("test10", new SimpleStringSchema(), properties))
                // Split each line into (word, 1) pairs
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        for (String word : value.split(" ")) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                .keyBy(t -> t.f0)
                .sum(1)
                //.map(t->"("+t.f0+":"+t.f1+")")
                // Format each running count as a string for the Kafka sink
                .map(new MapFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public String map(Tuple2<String, Long> value) throws Exception {
                        StringBuilder sb = new StringBuilder();
                        sb.append("(").append(value.f0).append(" :").append(value.f1).append(")");
                        return sb.toString();
                    }
                })
                .addSink(new FlinkKafkaProducer<String>("hadoop162:9092", "test11", new SimpleStringSchema()));

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 
   