Flink Development in Practice: Flink Principles Explained
 
 2018-11-6 
 
Editor's pick:
Sourced from the web. Flink is a distributed processing engine for both stream data and batch data, implemented mainly in Java. Its primary target is stream data; batch data is treated as a bounded special case of a stream.

Background: why Flink emerged

At the time of writing, the most popular stream-processing engines were Storm and Spark Streaming, and each had its own limitations. Storm achieved low latency but not high throughput, could not handle computation state accurately when a failure occurred (state here means data carried over from one event to the next), and could not provide exactly-once semantics. Spark Streaming achieved high throughput and fault tolerance through micro-batching, but gave up low latency and true real-time processing, and its windows could not be aligned with natural (event) time. Flink was created to address these problems: it offers real-time computation with both high throughput and exactly-once semantics, it can also process batch data, and, unlike the other engines, it distinguishes between different notions of time.

Introduction to Flink

Flink began as a research project at the Technical University of Berlin. It was accepted into the Apache Incubator in 2014 and soon became a top-level project of the ASF (Apache Software Foundation). Flink is a distributed processing engine for both stream data and batch data, implemented mainly in Java. Its primary target is stream data; batch data is treated as a bounded special case of a stream. Flink supports fast local iteration as well as some cyclic iterative jobs. Flink also manages memory itself. On this point, compared with Spark, Flink does not hand memory management entirely to the application layer, which is why Spark is more prone to OOM (out of memory) errors than Flink. In terms of the framework itself and its application scenarios, Flink most resembles Storm. Let us first look at Flink's architecture diagram.

As the diagram shows, Flink has a few basic concepts:

Client, JobManager, and TaskManager:

The Client submits jobs to the JobManager, the JobManager distributes tasks to TaskManagers for execution, and the TaskManagers report task status through heartbeats. Judging from the architecture diagram, the JobManager looks much like the old Hadoop JobTracker and the TaskManager like the old TaskTracker. The most important difference is that data flows between TaskManagers as streams. In addition, first-generation Hadoop only has the shuffle between Map and Reduce, whereas a Flink job may have many stages rather than a fixed Map-to-Reduce pipeline.

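The Flink ecosystem (technology stack)

Flink first supported Scala and Java APIs; a Python API was still being tested at the time of writing. Flink supports graph operations through Gelly and machine learning through FlinkML. The Table API is an interface-style form of SQL support, that is, SQL-like operations exposed as an API rather than the parsing and execution of textual SQL. It is worth noting that Flink provides separate interfaces for stream processing (the DataStream API) and batch processing (the DataSet API). Flink also ships extension libraries for machine learning (FlinkML), complex event processing (CEP), and graph computation, plus a Table API for both stream and batch processing.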

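Execution configuration

Flink's execution environments cover both batch and stream processing, so configuration is handled separately for the two cases.

Flink batch environment

val env = ExecutionEnvironment.getExecutionEnvironment

Flink streaming environment

val env = StreamExecutionEnvironment.getExecutionEnvironment

Settings can then be applied on env.

StreamExecutionEnvironment contains an ExecutionConfig, which lets you set job-specific configuration values for the runtime. Defaults that affect all jobs are changed in Flink's global configuration instead.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val executionConfig = env.getConfig

The following configuration options are available:

enableClosureCleaner() / disableClosureCleaner()

The closure cleaner is enabled by default. It removes unneeded references to the enclosing class from anonymous functions in Flink programs. With the closure cleaner disabled, an anonymous user function may reference its enclosing class, which is usually not Serializable, and serialization then fails with an exception.

getParallelism() / setParallelism(int parallelism)

Sets the default parallelism for the job.

getMaxParallelism() / setMaxParallelism(int parallelism)

Sets the default maximum parallelism for the job. This setting determines the maximum degree of parallelism and is the upper limit for dynamic scaling.

There are further options that are not listed here; see the official Flink website.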
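The serialization problem that the closure cleaner guards against can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink code: `SerializableMapper`, `ClosureCaptureSketch`, and `StandaloneMapper` are hypothetical names invented for the illustration. It shows that an anonymous function which touches a field of its non-serializable enclosing class cannot be serialized, while a static nested class that receives its inputs explicitly can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a user function; not a Flink interface.
interface SerializableMapper extends Serializable {
    String map(String value);
}

// The enclosing class is deliberately NOT Serializable.
class ClosureCaptureSketch {
    private final String prefix;

    ClosureCaptureSketch(String prefix) {
        this.prefix = prefix;
    }

    // Anonymous class: it reads the outer field, so it keeps a hidden
    // reference to the enclosing ClosureCaptureSketch instance.
    SerializableMapper capturingMapper() {
        return new SerializableMapper() {
            @Override
            public String map(String value) {
                return prefix + value;
            }
        };
    }

    // Static nested class: gets what it needs explicitly, no outer reference.
    static final class StandaloneMapper implements SerializableMapper {
        private final String prefix;

        StandaloneMapper(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClosureCaptureSketch outer = new ClosureCaptureSketch("word:");
        System.out.println(isSerializable(outer.capturingMapper()));       // false
        System.out.println(isSerializable(new StandaloneMapper("word:"))); // true
    }
}
```

The closure cleaner can only drop references that the function does not actually use; a function that genuinely reads outer state, as in this sketch, still needs to be restructured.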

 

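Setting parallelism

A Flink program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into several parallel instances for execution, and each parallel instance processes a subset of the task's input data. The number of parallel instances of a task is called its parallelism. If you want to use savepoints, you should also consider setting a maximum parallelism (max parallelism). When restoring from a savepoint you can change the parallelism of specific operators or of the whole program, and this setting specifies the upper bound on that parallelism. It is required because Flink internally partitions state into key groups, and there cannot be an unbounded (+Inf) number of key groups because that would hurt performance.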
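To make the role of key groups more concrete, here is a small plain-Java sketch of how a key could be mapped to a key group and a key group to a parallel instance. It is an illustration under stated assumptions, not Flink's implementation: the class and method names are invented, and plain `hashCode()` stands in for whatever hash function Flink applies internally. The point it shows is that the number of key groups is fixed by the maximum parallelism, so rescaling only reassigns whole key groups to instances.

```java
// Plain-Java sketch, no Flink dependency. All names are illustrative.
final class KeyGroupSketch {

    // Assumption: hashCode() stands in for Flink's internal hash function.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to one of `parallelism` instances so that every
    // instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism must be in [1, maxParallelism]");
        }
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("flink", maxParallelism);
        System.out.println("key group: " + keyGroup);
        // The key group never changes; only its owner does when rescaling.
        System.out.println("instance at parallelism 4: " + operatorIndexFor(keyGroup, maxParallelism, 4));
        System.out.println("instance at parallelism 8: " + operatorIndexFor(keyGroup, maxParallelism, 8));
    }
}
```

With 128 key groups and a parallelism of 4, key groups 0 to 31 belong to instance 0, 32 to 63 to instance 1, and so on. This is also why the parallelism can never exceed the maximum parallelism.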

Operator level

The parallelism of an individual operator, data source, or data sink can be defined by calling its setParallelism() method. For example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");

 

Ö´Ðл·¾³¼¶±ð

Flink³ÌÐòÔÚÖ´Ðл·¾³µÄÉÏÏÂÎÄÖÐÖ´ÐС£Ö´Ðл·¾³ÎªÆäÖ´ÐеÄËùÓвÙ×÷·û£¬Êý¾ÝÔ´ºÍÊý¾Ý½ÓÊÕÆ÷¶¨ÒåĬÈϲ¢ÐÐÐÔ¡£¿ÉÒÔͨ¹ýÏÔʽÅäÖÃÔËËã·ûµÄ²¢ÐÐÐÔÀ´¸²¸ÇÖ´Ðл·¾³²¢ÐÐÐÔ¡£¿ÉÒÔͨ¹ýµ÷ÓÃsetParallelism()·½·¨À´Ö¸¶¨Ö´Ðл·¾³µÄĬÈϲ¢ÐÐÐÔ¡£ÒªÒÔ²¢Ðз½Ê½Ö´ÐÐËùÓÐÔËËã·û£¬Êý¾ÝÔ´ºÍÊý¾Ý½ÓÊÕÆ÷£¬Çë3°´ÈçÏ·½Ê½ÉèÖÃÖ´Ðл·¾³µÄĬÈϲ¢Ðжȣº

final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");

Client level

Parallelism can be set on the client when a job is submitted to Flink. The client can be a Java or Scala program; one example of such a client is Flink's command-line interface (CLI).

For the CLI client, the parallelism is specified with the -p parameter. For example:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

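Basic API (stream and batch processing)

Batch processing is a very special case of stream processing. What sets Flink apart is that it can process data as an unbounded stream and can also treat data as a bounded stream for batch processing. Roughly:

DataSet API, used for batch processing: comparable to Spark Core

DataStream API, used for stream processing: comparable to Spark Streaming

DataSet and DataStream

Flink uses the special classes DataSet and DataStream to represent data in a program. You can think of them as immutable collections of data that may contain duplicates. A DataSet holds a finite amount of data, whereas the number of elements in a DataStream can be unbounded. These collections differ from regular Java collections in some key ways. First, they are immutable: once created, elements cannot be added or removed. You also cannot simply inspect the elements inside. A collection is initially created by adding a source in a Flink program, and new collections are derived from it by transforming it with API methods such as map, filter, and so on.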

Anatomy of a Flink program

Flink programs look like regular programs that transform collections of data. Each program consists of the same basic parts:

Obtain an execution environment,

Load/create the initial data,

Specify transformations on this data,

Specify where to put the results of the computation,

Trigger the program execution

Each step is outlined below; see the respective sections for more details. Note that all core classes of the Scala DataSet API are found in the package org.apache.flink.api.scala, while the classes of the Scala DataStream API are in org.apache.flink.streaming.api.scala.

The StreamExecutionEnvironment is the basis for all Flink programs. You can obtain one using these static methods on StreamExecutionEnvironment:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

Typically you only need getExecutionEnvironment(), since it does the right thing depending on the context: if you run the program inside an IDE or as a regular Java program, it creates a local environment that executes the program on your local machine. If you built a JAR file from your program and invoke it through the command line, the Flink cluster manager executes your main method, and getExecutionEnvironment() returns an execution environment for running the program on a cluster.

Reading data

For specifying data sources, the execution environment has several methods for reading from files: you can read them line by line, as CSV files, or with a completely custom data input format. To read a text file as a sequence of lines, you can use:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val text: DataStream[String] = env.readTextFile("file:///path/to/file")

This gives you a DataStream on which you can then apply transformations to create new derived DataStreams.

You apply transformations by calling methods on the DataStream with a transformation function.

For example, a map transformation looks like this:

val input: DataStream[String] = ...
val mapped = input.map { x => x.toInt }

This creates a new DataStream by converting every String in the original collection to an Integer.

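Writing output

Once you have a DataStream containing your final results, you can write it to an external system by creating a sink. These are just some example methods for creating a sink:

writeAsText(path: String)
print()

Once the complete program is specified, you need to trigger its execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of the ExecutionEnvironment, execution is triggered on the local machine or the program is submitted for execution on a cluster.

The execute() method returns a JobExecutionResult, which contains the execution time and accumulator results.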

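The Flink programming model

DataSet and DataStream have too many operators to list one by one; consult the official documentation when you need them. Two examples are given here to illustrate Flink's programming model.

Example 1: word count over a file (local or HDFS)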

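import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FunctionTest {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the data from a text file
        DataStreamSource<String> streamSource = env.readTextFile("C:/flink_data/1.txt");
        // Split lines into (word, 1) pairs, group by word, and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = streamSource
            .flatMap(new Splitter())
            .keyBy(0)
            .sum(1);
        dataStream.print();
        // Trigger execution; the argument is the job name
        env.execute("Window WordCount");
    }
}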

 

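Implementing the FlatMapFunction

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Emit one (word, 1) pair per space-separated word
        for (String word : sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}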
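It may help to see what a keyed running sum does to a stream of words before running the job. The sketch below is plain Java with no Flink dependency; the class name `RollingWordCountSketch` and the sample lines are invented for illustration. It mimics the flatMap, keyBy, sum chain under the assumption that a streaming sum emits an updated count for every incoming record rather than a single final total.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of flatMap -> keyBy -> sum on a stream of lines.
final class RollingWordCountSketch {

    // Emits one "word=count" entry per incoming word, like a running aggregate.
    static List<String> rollingCounts(List<String> lines) {
        Map<String, Integer> countPerWord = new HashMap<>(); // keyed state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {             // flatMap step
                if (word.isEmpty()) {
                    continue;
                }
                int count = countPerWord.merge(word, 1, Integer::sum); // keyed sum
                emitted.add(word + "=" + count);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [a=1, b=1, a=2, b=2, a=3]
        System.out.println(rollingCounts(Arrays.asList("a b a", "b a")));
    }
}
```

Because the input is conceptually unbounded, there is no point at which a "final" count exists; each output line is simply the latest value for that key.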

 

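Example 2: reading data from Kafka and saving it to HDFS

Add the Maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.1.3</version>
</dependency>

Note that the program below uses FlinkKafkaConsumer010, which comes from the Kafka 0.10 connector, so the artifact and version should be matched to the connector class and Flink release actually in use.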

 

Program code

import java.util.Properties
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
// The package of SimpleStringSchema differs between Flink releases; adjust as needed
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object DataFkafka {
  def main(args: Array[String]): Unit = {
    // Kafka connection properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.10.4.11:9092,10.10.49.183:9092,10.10.49.207:9092")
    properties.setProperty("zookeeper.connect", "10.10.4.11:2181,10.10.49.183:2181")
    properties.setProperty("group.id", "res")
    // Obtain the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Checkpoint every 1000 ms
    env.enableCheckpointing(1000)
    // Checkpointing mode
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Create the Kafka consumer and read from the "flink" topic
    val myConsumer: FlinkKafkaConsumer010[String] = new FlinkKafkaConsumer010[String]("flink", new SimpleStringSchema(), properties)
    val kafkaData: DataStream[String] = env.addSource(myConsumer)
    kafkaData.print()
    // Save the data to HDFS
    kafkaData.writeAsText("hdfs://10.10.4.11:9000/output/flink.txt")
    print("kafka")
    // Trigger execution; the argument is the job name
    env.execute("data_from_kafak_wangzh")
  }
}

 

Comparing the Java and Scala versions, the Scala code is noticeably more concise.

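Checkpoints

Checkpointing is one of Flink's distinctive features among stream processors. Checkpoints are generated automatically by Flink while the program runs.

They allow Flink to maintain state accurately, achieve data consistency (exactly-once), and reprocess data efficiently.

Introduction to checkpoints

Flink's checkpoint mechanism is based on a variant of the Chandy-Lamport algorithm and is used to take distributed snapshots. A core element of distributed snapshots is the barrier. Barriers are injected into the data stream and flow with the records as part of the stream. A barrier never overtakes regular records; it strictly keeps its position in the stream. A barrier separates the records in the stream into the set of records that goes into the current snapshot and the records that go into the next snapshot. Each barrier carries the ID of the snapshot whose records it pushed in front of it. Barriers do not interrupt the flow of the stream and are therefore very lightweight. Multiple barriers from different snapshots can be in the stream at the same time, which means that several snapshots may happen concurrently.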
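The way barriers partition a stream can be sketched in a few lines of plain Java. This is a toy model, not Flink's implementation: the stream is a list of strings, a token such as `ckpt-1` is a hypothetical encoding of a barrier carrying snapshot ID 1, and the operator state is just a running sum. Whenever the operator meets a barrier, it records its current state under that snapshot ID and keeps processing.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: records are integers, barriers are tokens like "ckpt-1".
final class BarrierSketch {
    private static final String BARRIER_PREFIX = "ckpt-";

    // Returns snapshot ID -> operator state (running sum) at that barrier.
    static Map<Integer, Integer> snapshots(List<String> stream) {
        Map<Integer, Integer> stateBySnapshotId = new LinkedHashMap<>();
        int runningSum = 0;
        for (String element : stream) {
            if (element.startsWith(BARRIER_PREFIX)) {
                // Everything seen so far belongs to this snapshot.
                int snapshotId = Integer.parseInt(element.substring(BARRIER_PREFIX.length()));
                stateBySnapshotId.put(snapshotId, runningSum);
            } else {
                runningSum += Integer.parseInt(element);
            }
        }
        return stateBySnapshotId;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("3", "2", "ckpt-1", "5", "7", "ckpt-2", "1");
        // prints {1=5, 2=17}
        System.out.println(snapshots(stream));
    }
}
```

The record `1` after the second barrier is in neither snapshot; it would belong to the next one.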

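An analogy: several people are counting the beads on a necklace together while chatting, and at some point they may forget how far they have counted. Suppose instead we tie a differently colored string after every five beads and agree on the rule in advance, for example red means five beads counted, yellow means ten, and so on. Then when we lose count, we can look at the color of the most recent string, know how many beads it stands for, and resume counting from that string.

The figure below shows the overall logic of a checkpoint, where ckpt is a checkpoint barrier. In the data stream, every record is processed strictly as belonging either before or after the checkpoint. Checkpoint barriers flow between operators just like data. When a Flink operator encounters a checkpoint barrier, it records the position of the checkpoint in the data stream; if the data comes from Kafka, that position is the offset.

When a checkpoint completes, the resulting state and position are backed up to stable storage, as shown in the figure below. Note that if a checkpoint operation fails, Flink discards that checkpoint and continues normal execution, because a later checkpoint will very likely succeed. Recovery may then take somewhat longer, but the guarantee on state remains strong. Flink reports an error only when a series of consecutive checkpoints fail.

When a failure closely follows a checkpoint

Suppose a checkpoint has completed and a failure follows right after. In this case Flink restarts the topology, rewinds the input stream to the last checkpoint, restores the state, and resumes computation from that point. This guarantees that once the remaining records have been processed, the state of the map operator is the same as if no failure had happened. Note that some records are computed again, so the output may contain partial duplicates. This can be addressed by writing the stream to particular systems (for example file systems or databases).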
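The recovery behavior described above, identical final state but possibly duplicated output, can be checked with a small plain-Java simulation. It is a sketch under simplifying assumptions: one operator that keeps a running sum, a replayable source modeled as an array indexed by offset, and a sink modeled as a list. The names `CheckpointRecoverySketch`, `process`, and `runWithFailure` are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointRecoverySketch {

    // Sums input[from, to) on top of `state`, writing each intermediate sum to the sink.
    static int process(int[] input, int from, int to, int state, List<Integer> sink) {
        for (int offset = from; offset < to; offset++) {
            state += input[offset];
            sink.add(state);
        }
        return state;
    }

    // One checkpoint at `checkpointOffset`, one failure at `failureOffset`,
    // then restore and replay. Returns the final operator state.
    static int runWithFailure(int[] input, int checkpointOffset, int failureOffset, List<Integer> sink) {
        int state = process(input, 0, checkpointOffset, 0, sink);
        int checkpointedState = state; // the snapshot stores (offset, state)
        // Work done after the checkpoint is lost when the failure hits,
        // but its output has already reached the sink.
        process(input, checkpointOffset, failureOffset, state, sink);
        // Recovery: restore the state and rewind the source to the checkpointed offset.
        return process(input, checkpointOffset, input.length, checkpointedState, sink);
    }

    public static void main(String[] args) {
        int[] input = {3, 2, 5, 7, 1};
        List<Integer> sink = new ArrayList<>();
        int finalState = runWithFailure(input, 2, 3, sink);
        System.out.println(finalState); // 18, same as a failure-free run
        System.out.println(sink);       // [3, 5, 10, 10, 17, 18], 10 appears twice
    }
}
```

The state is exactly-once, while the sink sees the value 10 twice; avoiding that duplicate is the job of the output system, as the paragraph above notes.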

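Enabling and configuring checkpoints

Checkpointing is disabled by default. To enable it, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Other checkpoint parameters include:

exactly-once vs. at-least-once: you can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain ultra-low-latency applications (consistently a few milliseconds).

checkpoint timeout: the time after which a checkpoint in progress is aborted if it has not completed.

minimum time between checkpoints: to make sure the streaming application makes some progress between checkpoints, you can define how much time must pass between checkpoints. If this value is set to, for example, 5000, the next checkpoint starts no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. Note that this means the checkpoint interval is never effectively smaller than this parameter.

It is often easier to configure an application by defining the "time between checkpoints" than the checkpoint interval, because the "time between checkpoints" is not affected by the fact that checkpoints sometimes take longer than average (for example, if the target storage system is temporarily slow).

Note that this value also implies that the number of concurrent checkpoints is one.

number of concurrent checkpoints: by default, the system does not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend so much time on checkpoints that it stops making progress on the streams. Multiple overlapping checkpoints can be allowed, which is useful for pipelines that have a certain processing delay (for example because functions call external services that take some time to respond) but that still want very frequent checkpoints (on the order of 100 milliseconds) so that little has to be reprocessed after a failure.

This option cannot be used when a minimum time between checkpoints is defined.

externalized checkpoints: you can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their metadata to persistent storage and are not automatically cleaned up when the job fails. This way, if your job fails, you have a checkpoint to resume from. The deployment notes on externalized checkpoints have more details.

fail/continue task on checkpoint errors: this determines whether a task fails if an error occurs while executing the task's checkpoint procedure. Failing is the default behavior. Alternatively, when this option is disabled, the task simply declines the checkpoint to the checkpoint coordinator and continues running.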


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
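The interplay between the checkpoint interval and the minimum pause between checkpoints can be made concrete with a small calculation. The following plain-Java sketch uses a deliberately simplified model that is an assumption of this example, not Flink's scheduler: at most one checkpoint runs at a time, and the next one starts at whichever is later, the previous start plus the interval or the previous completion plus the minimum pause. The class name and the sample durations are invented.

```java
import java.util.Arrays;

// Simplified scheduling model; not Flink's actual checkpoint coordinator.
final class CheckpointScheduleSketch {

    // durationsMs[i] is how long checkpoint i takes; the first starts at time 0.
    static long[] startTimes(long intervalMs, long minPauseMs, long[] durationsMs) {
        long[] starts = new long[durationsMs.length];
        for (int i = 1; i < starts.length; i++) {
            long byInterval = starts[i - 1] + intervalMs;
            long byMinPause = starts[i - 1] + durationsMs[i - 1] + minPauseMs;
            starts[i] = Math.max(byInterval, byMinPause);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Interval 1000 ms, minimum pause 500 ms; the second checkpoint is slow (900 ms).
        long[] starts = startTimes(1000, 500, new long[] {200, 900, 100});
        System.out.println(Arrays.toString(starts)); // [0, 1000, 2400]
    }
}
```

In this model the slow second checkpoint pushes the third start from 2000 ms to 2400 ms, which is the kind of situation the minimum pause is meant to handle.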

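Windows

A window is a mechanism that groups many events by time or by some other characteristic so that each group can be analyzed as a whole. The main window types in Flink are time windows, count windows, and session windows. At the time of writing, Flink was reportedly the only open-source stream processor that supported session windows. This section mainly covers time windows, the most widely used kind.

Time windows

Time windows are the simplest and most useful kind of window. They can be tumbling or sliding. A few simple examples follow, summing the values emitted by a sensor.

A one-minute tumbling window collects the values of the last minute and emits their sum at the end of the minute, as in the figure below.

A one-minute sliding window computes the sum of the values of the last minute, but slides and emits a result every half minute, as in the figure below.

The first sliding window sums 3, 2, 5, 7 to get 17; half a minute later the window slides and sums 2, 5, 7, 1 to get 15, and so on.

Time window code

A one-minute tumbling window:

stream.timeWindow(Time.minutes(1))

A one-minute sliding window that slides every half minute (30 seconds):

stream.timeWindow(Time.minutes(1), Time.seconds(30))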
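Which windows a given event falls into follows from simple arithmetic on its timestamp. The plain-Java sketch below illustrates that arithmetic for tumbling and sliding time windows; it assumes non-negative timestamps in milliseconds, windows aligned to time 0, and half-open windows [start, start + size). The class and method names are invented for this example and are not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Window-assignment arithmetic, assuming windows aligned to time 0.
final class TimeWindowAssignSketch {

    // Start of the single tumbling window that contains the timestamp.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Starts of all sliding windows [start, start + size) that contain
    // the timestamp, newest window first.
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long event = 75_000L; // 1 minute 15 seconds
        System.out.println(tumblingStart(event, 60_000L));          // 60000
        System.out.println(slidingStarts(event, 60_000L, 30_000L)); // [60000, 30000]
    }
}
```

With a one-minute window sliding every 30 seconds, every event belongs to two windows, which is why sliding windows cost more state than tumbling ones.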

Count windows

Count windows group elements by count rather than by time. For example, Figure 2 above can also be read as a count window of 4 elements that slides every two elements. Tumbling and sliding count windows are defined as follows:

stream.countWindow(4)

stream.countWindow(4, 2)
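The difference between the two definitions can be tried out on a plain array. The following Java sketch is a simplified model, not Flink's implementation: it emits a sum only for complete windows of `size` elements, whereas Flink's own count windows are built from triggers and evictors and may differ in detail, for example in how a not yet full first window is treated. The class name and sample values are invented.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified count windows over an in-memory array; full windows only.
final class CountWindowSketch {

    // slide == size gives tumbling windows, slide < size gives sliding windows.
    static List<Integer> countWindowSums(int[] values, int size, int slide) {
        List<Integer> sums = new ArrayList<>();
        for (int start = 0; start + size <= values.length; start += slide) {
            int sum = 0;
            for (int i = start; i < start + size; i++) {
                sum += values[i];
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] values = {3, 2, 5, 7, 1, 4, 6, 8};
        System.out.println(countWindowSums(values, 4, 4)); // [17, 19]
        System.out.println(countWindowSums(values, 4, 2)); // [17, 17, 19]
        // Fewer than `size` elements: no window ever completes.
        System.out.println(countWindowSums(new int[] {1, 2, 3}, 4, 4)); // []
    }
}
```

The last call shows the failure mode of count windows: a key that never reaches the required number of elements never produces a result.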

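Note:

Count windows are less well-defined than time windows and should be used with care. For example, if a window is defined to hold 100 elements but some key never receives 100 elements, the window never closes and the memory it occupies is wasted. One remedy is to add a time-based trigger that fires a timeout.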

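Session windows

A session is a period of activity preceded and followed by periods of inactivity, for example a user who interacts with a website for a while and then closes the browser or stops interacting (the inactive period). Sessions need their own handling because they usually have neither a fixed duration nor a fixed number of interactions (one user may buy after 3 clicks, another after 40).

In Flink, session windows are defined by a time gap, that is, how long to wait before considering the session over. For example, the following code treats a session as finished once the user has been inactive for more than five minutes:

stream.window(EventTimeSessionWindows.withGap(Time.minutes(5)))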
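Gap-based session grouping is easy to try on a list of timestamps. The sketch below is plain Java and makes simplifying assumptions that Flink itself does not need: events belong to a single user and arrive sorted by timestamp. A new session starts whenever the time since the previous event exceeds the gap. The class name and the sample click times (in minutes) are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Splits sorted event timestamps into sessions separated by inactivity gaps.
final class SessionWindowSketch {

    static List<List<Long>> sessions(long[] sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long timestamp : sortedTimestamps) {
            boolean gapExceeded = !current.isEmpty()
                    && timestamp - current.get(current.size() - 1) > gap;
            if (gapExceeded) {
                result.add(current);          // close the previous session
                current = new ArrayList<>();
            }
            current.add(timestamp);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] clickMinutes = {0, 1, 3, 10, 12, 30};
        // prints [[0, 1, 3], [10, 12], [30]]
        System.out.println(sessions(clickMinutes, 5));
    }
}
```

Unlike time and count windows, the number and length of the resulting windows depend entirely on the data.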

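Watermarks

This raises a question: how do we decide whether all events have arrived, and when to compute and emit the result of a window? In other words, how do we track event time and know that the input has progressed to a certain event time? Tracking event time requires a clock driven by the data rather than by system time.

Flink advances event time through watermarks. Watermarks are regular records embedded in the stream, and operators use them to learn that a certain point in time has been reached. For a one-minute tumbling window, for example, suppose a watermark carries the time 10:01:00. A window that receives it knows that no earlier records will appear, because all events with timestamps less than or equal to that time have already arrived. The window can then safely compute and emit its result. Watermarks decouple event time from processing time: a watermark that arrives late does not affect the correctness of the result, only how quickly the result is produced.

How watermarks are generated

In Flink, watermarks are generated by the developer, which usually requires some knowledge of the domain. With a perfect watermark, no event with a timestamp earlier than the watermark ever appears afterwards. In special cases (for example a stream with no out-of-order events), the timestamp of the most recent event can serve as a perfect watermark. A heuristic watermark, by contrast, only estimates time and can therefore be wrong, in which case late events arrive after the watermark. If events are known to be at most 5 seconds late, the watermark can be set to the largest timestamp seen so far minus 5 seconds. Another approach is to have a Flink job monitor the event stream, learn the lateness pattern of events, and build the watermark generation model from that.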
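The "largest timestamp minus 5 seconds" heuristic can be written down in a few lines. The following plain-Java sketch is an illustration, not Flink's watermark API: the class `BoundedLatenessWatermarkSketch` and its methods are invented names, and an event is treated as late when its timestamp is less than or equal to the watermark that was current when it arrived.

```java
// Heuristic watermark: largest timestamp seen so far minus a fixed bound.
final class BoundedLatenessWatermarkSketch {
    private final long maxLatenessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessWatermarkSketch(long maxLatenessMs) {
        this.maxLatenessMs = maxLatenessMs;
    }

    // Current watermark, or Long.MIN_VALUE before the first event.
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxLatenessMs;
    }

    // Feeds one event; returns true if the event is late with respect
    // to the watermark that was current before the event arrived.
    boolean onEvent(long timestampMs) {
        boolean late = maxTimestampSeen != Long.MIN_VALUE
                && timestampMs <= currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        return late;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermarkSketch watermarks = new BoundedLatenessWatermarkSketch(5_000L);
        long[] events = {10_000L, 12_000L, 8_000L, 20_000L, 9_000L};
        for (long event : events) {
            System.out.println(event + " late=" + watermarks.onEvent(event)
                    + " watermark=" + watermarks.currentWatermark());
        }
    }
}
```

In this run the event at 8000 is out of order but still within the 5-second bound, while the event at 9000 arrives after the watermark has advanced to 15000 and is therefore late.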

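Stateful computation

Stream computation is either stateful or stateless. Stateless computation looks at each event in isolation and produces its output from that event alone; stateful computation produces output based on multiple events.

For example:

Computing the average temperature over the past hour is stateful, because the result depends on multiple events.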
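The contrast can be shown side by side in plain Java. In the sketch below, which uses invented names and is not Flink code, converting a reading from Celsius to Fahrenheit is stateless because it needs only the current event, while the one-hour average is stateful because the operator has to remember the readings of the last hour. The sketch assumes readings arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class TemperatureSketch {

    // Stateless: the output depends only on the current event.
    static double toFahrenheit(double celsius) {
        return celsius * 9.0 / 5.0 + 32.0;
    }

    // Stateful: keeps the readings of the last hour between events.
    static final class HourlyAverage {
        private static final long HOUR_MS = 60L * 60L * 1000L;
        private final Deque<Long> timestamps = new ArrayDeque<>();
        private final Deque<Double> readings = new ArrayDeque<>();
        private double sum = 0.0;

        // Adds a reading and returns the average over the last hour.
        double add(long timestampMs, double celsius) {
            timestamps.addLast(timestampMs);
            readings.addLast(celsius);
            sum += celsius;
            // Drop readings that are an hour old or older.
            while (timestampMs - timestamps.peekFirst() >= HOUR_MS) {
                timestamps.removeFirst();
                sum -= readings.removeFirst();
            }
            return sum / timestamps.size();
        }
    }

    public static void main(String[] args) {
        System.out.println(toFahrenheit(100.0)); // 212.0

        HourlyAverage average = new HourlyAverage();
        long minute = 60_000L;
        System.out.println(average.add(0, 20.0));           // 20.0
        System.out.println(average.add(30 * minute, 22.0)); // 21.0
        System.out.println(average.add(70 * minute, 24.0)); // 23.0, first reading expired
    }
}
```

The deques and the running sum are exactly the kind of state that a checkpoint has to capture so that the average survives a failure.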

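Broadcast variables

Broadcast variables let you make a data set available to all parallel instances of an operation. This is useful for auxiliary data sets or data-dependent parameterization. The operator can then access the data set as a collection.

Broadcast: broadcast sets are registered by name via withBroadcastSet(DataSet, String)

Access: accessible via getRuntimeContext().getBroadcastVariable(String) at the target operator.

// 1. The DataSet to be broadcast (illustrative values; not given in the original)
val toBroadcast = env.fromElements("x", "y", "z")
val data = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
  var broadcastSet: Traversable[String] = null
  override def open(config: Configuration): Unit = {
    // 3. Access the broadcast DataSet as a Collection
    broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
  }
  def map(in: String): String = {
    in // placeholder body; use broadcastSet here as needed
  }
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

Note: because the content of broadcast variables is kept in memory on each node, it should not become too large. For simple things like scalar values you can simply make the parameter part of the function's closure, or use the withParameters(...) method to pass in a configuration.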

   