A Quick Start to Flink
 
2558 views · 2019-5-9
 
Editor's note:
This article, originally from mamicode, introduces the concept of Flink, the features Flink supports, Flink's three deployment modes, and the standard Flink development workflow.

1. Introducing Flink

With the rapid growth of big data in recent years, many popular open-source communities appeared, famously Hadoop and Storm, and later Spark, each focused on its own application scenarios. Spark opened the chapter of in-memory computing, bet everything on memory, and won rapid growth as a result. Spark's popularity more or less overshadowed the other distributed computing systems, and Flink, like them, was quietly developing through that period.

In some overseas communities, many people divide big-data compute engines into four generations; of course, many others disagree. Let us provisionally adopt this framing for the discussion.

The first-generation compute engine is without doubt MapReduce, carried by Hadoop. Everyone here should be familiar with MapReduce: it divides a computation into two phases, Map and Reduce. Upper-layer applications therefore had to find ways to split their algorithms into these two phases, and even chain multiple Jobs together at the application level to complete a full algorithm, for example iterative computation.
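The two-phase model described above can be sketched in plain Java. This is an illustration only, not Hadoop's actual API; the class and method names (MapReduceSketch, map, reduce) are made up. A map step emits (word, 1) pairs and a reduce step sums them per key; anything more complex, such as an iterative algorithm, would need several such jobs chained together.

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the two-phase MapReduce model (hypothetical, not Hadoop's API).
public class MapReduceSketch {
    // Map phase: split each input line into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                .filter(w -> !w.isEmpty())
                .map(w -> Map.entry(w, 1))
                .collect(Collectors.toList());
    }

    // Shuffle + reduce phase: group the pairs by key and sum the values.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream().collect(
                Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.summingInt(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = reduce(map(List.of("a b a", "b c")));
        System.out.println(counts); // e.g. {a=2, b=2, c=1} (map order may vary)
    }
}
```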

These drawbacks spurred the creation of frameworks that support DAGs, and DAG-capable frameworks are therefore classed as the second generation of compute engines, e.g. Tez and, at a higher layer, Oozie. We won't dig into the differences between the various DAG implementations here, but for the Tez and Oozie of that time, the workloads were still mostly batch jobs.

Next came the third generation of compute engines, represented by Spark. Its main traits are DAG support inside a Job (not across Jobs) and an emphasis on real-time computation. Many would add that third-generation engines also run batch Jobs well.

The arrival of the third generation spurred rapid growth in upper-layer applications, for example better performance for iterative algorithms and support for stream computation and SQL. Flink's birth is placed in the fourth generation. This shows mainly in Flink's support for stream computation and its further step in real-time processing; of course, Flink can also run Batch jobs and DAG computations.

First, the performance test below gives an initial feel for the difference between the two frameworks. Both can do real-time computation on an in-memory computing framework, so both perform very well; in this test, Flink came out slightly ahead.

Test environment:

1. CPU: 7,000 cores

2. Memory: 128 GB per machine

3. Versions: Hadoop 2.3.0, Spark 1.4, Flink 0.9

4. Data: 800 MB, 8 GB, 8 TB

5. Algorithm: K-means, which clusters objects around K centers in space, assigning each object to its nearest center; the centers are updated iteratively until the best clustering is reached

6. Iterations: K = 10, 3 datasets
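To make the benchmark workload concrete, here is a minimal 1-D Lloyd-iteration sketch of K-means in plain Java (an illustration, not the benchmark code; the class name KMeansSketch is made up). Every iteration makes a full pass over the data, which is why the engine's per-iteration overhead dominates as the iteration count grows.

```java
import java.util.*;

// Minimal 1-D K-means sketch: each iteration assigns every point to its
// nearest center, then moves each center to the mean of its assigned points.
public class KMeansSketch {
    // One Lloyd iteration: returns the updated centers.
    static double[] step(double[] points, double[] centers) {
        double[] sum = new double[centers.length];
        int[] count = new int[centers.length];
        for (double p : points) {
            int best = 0; // index of the nearest center
            for (int c = 1; c < centers.length; c++)
                if (Math.abs(p - centers[c]) < Math.abs(p - centers[best])) best = c;
            sum[best] += p;
            count[best]++;
        }
        double[] next = centers.clone();
        for (int c = 0; c < centers.length; c++)
            if (count[c] > 0) next[c] = sum[c] / count[c];
        return next;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 3, 10, 11, 12};
        double[] centers = {0, 5};
        for (int i = 0; i < 10; i++) centers = step(points, centers); // iterate
        System.out.println(Arrays.toString(centers)); // [2.0, 11.0]
    }
}
```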

(Figure: runtime versus iteration count; the y-axis is seconds, the x-axis is the number of iterations)

Spark and Flink both ran on Hadoop YARN. Performance ranked Flink > Spark > Hadoop (MR), and the gap widened as the number of iterations grew. Flink outperforms Spark and Hadoop chiefly because it supports incremental iteration and can optimize iterative jobs automatically.
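To make the idea of incremental iteration concrete, here is a hypothetical plain-Java sketch (not Flink's delta-iteration API; the class name DeltaIterationSketch is made up): instead of reprocessing the whole dataset every round, only a "workset" of elements that changed in the previous round is processed, illustrated here with minimum-label propagation (connected components).

```java
import java.util.*;

// Sketch of the delta-iteration idea: propagate the minimum label through a
// graph; each round only re-examines vertices whose label changed.
public class DeltaIterationSketch {
    static Map<Integer, Integer> components(Map<Integer, List<Integer>> edges) {
        Map<Integer, Integer> label = new HashMap<>();
        for (int v : edges.keySet()) label.put(v, v);   // start: own id as label
        Set<Integer> workset = new HashSet<>(edges.keySet());
        while (!workset.isEmpty()) {
            Set<Integer> next = new HashSet<>();
            for (int v : workset)                       // only changed vertices
                for (int n : edges.get(v))
                    if (label.get(v) < label.get(n)) {
                        label.put(n, label.get(v));     // smaller label wins
                        next.add(n);                    // n changed: revisit it
                    }
            workset = next;                             // shrinks as labels settle
        }
        return label;
    }

    public static void main(String[] args) {
        // Two components: {1,2,3} and {4,5}.
        Map<Integer, List<Integer>> edges = Map.of(
                1, List.of(2), 2, List.of(1, 3), 3, List.of(2),
                4, List.of(5), 5, List.of(4));
        System.out.println(components(edges)); // vertices 1-3 get label 1, 4-5 get 4
    }
}
```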

2. Flink Overview

ºÜ¶àÈË¿ÉÄܶ¼ÊÇÔÚ 2015 Äê²ÅÌýµ½ Flink Õâ¸ö´Ê£¬ÆäʵÔçÔÚ 2008 Ä꣬Flink µÄǰÉíÒѾ­ÊǰØÁÖÀí¹¤´óѧһ¸öÑо¿ÐÔÏîÄ¿£¬ ÔÚ 2014 ±» Apache ·õ»¯Æ÷Ëù½ÓÊÜ£¬È»ºóѸËٵسÉΪÁË ASF£¨Apache Software Foundation£©µÄ¶¥¼¶ÏîĿ֮һ¡£Flink µÄ×îа汾ĿǰÒѾ­¸üе½ÁË 0.10.0 ÁË£¬ÔںܶàÈ˸п® Spark µÄ¿ìËÙ·¢Õ¹µÄͬʱ£¬»òÐíÎÒÃÇÒ²¸ÃΪ Flink µÄ·¢Õ¹Ëٶȵã¸öÔÞ¡£

Flink is a distributed processing engine for both streaming and batch data. It is implemented mainly in Java and currently develops chiefly through open-source community contributions. For Flink, the primary scenario is streaming data; batch data is merely a limiting special case of a stream. In other words, Flink treats every workload as a stream, which is its most distinctive feature.

Flink supports fast local iteration as well as some cyclic iterative tasks, and it has customizable memory management. On this point, if we compare Flink with Spark: Flink does not hand memory entirely over to the application layer, which is why Spark is more prone to OOM (out of memory) errors than Flink. In terms of the framework itself and its target scenarios, Flink is more similar to Storm; readers who already know Storm or Flume will find Flink's architecture and many of its concepts easier to grasp. Let us first look at Flink's architecture diagram.

ÎÒÃÇ¿ÉÒÔÁ˽⵽ Flink ¼¸¸ö×î»ù´¡µÄ¸ÅÄClient¡¢JobManager ºÍ TaskManager¡£Client ÓÃÀ´Ìá½»ÈÎÎñ¸ø JobManager£¬JobManager ·Ö·¢ÈÎÎñ¸ø TaskManager È¥Ö´ÐУ¬È»ºó TaskManager »áÐÄÌøµÄ»ã±¨ÈÎÎñ״̬¡£¿´µ½ÕâÀÓеÄÈËÓ¦¸ÃÒѾ­ÓÐÖֻص½ Hadoop Ò»´úµÄ´í¾õ¡£È·Êµ£¬´Ó¼Ü¹¹Í¼È¥¿´£¬JobManager ºÜÏñµ±ÄêµÄ JobTracker£¬TaskManager Ò²ºÜÏñµ±ÄêµÄ TaskTracker¡£È»¶øÓÐÒ»¸ö×îÖØÒªµÄÇø±ð¾ÍÊÇ TaskManager Ö®¼äÊÇÊÇÁ÷£¨Stream£©¡£Æä´Î£¬Hadoop Ò»´úÖУ¬Ö»ÓÐ Map ºÍ Reduce Ö®¼äµÄ Shuffle£¬¶ø¶Ô Flink ¶øÑÔ£¬¿ÉÄÜÊǺܶ༶£¬²¢ÇÒÔÚ TaskManager ÄÚ²¿ºÍ TaskManager Ö®¼ä¶¼»áÓÐÊý¾Ý´«µÝ£¬¶ø²»Ïñ Hadoop£¬Êǹ̶¨µÄ Map µ½ Reduce¡£

3. Technical Features (optional)

This section only organizes the features Flink supports into simple categories; the specific concepts involved and their principles are explained in detail in later sections.

3.1. Stream-Processing Features

High-throughput, low-latency, high-performance stream processing

Window operations based on event time

Exactly-once semantics for stateful computation

Highly flexible windowing: windows based on time, count, and session, as well as data-driven windows

A continuous-flow streaming model with Backpressure support

Fault tolerance based on lightweight distributed snapshots

A single runtime that supports both Batch-on-Streaming and Streaming processing

Flink's own memory management, implemented inside the JVM

Support for iterative computation

Automatic program optimization: expensive operations such as Shuffles and sorts are avoided where possible, and intermediate results are cached when necessary
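As a toy illustration of one of the window types mentioned above, the following plain-Java sketch (not Flink's DataStream API; the class name CountWindowSketch is made up) aggregates a count-based tumbling window over a single unkeyed stream: every N elements form one window whose sum is emitted.

```java
import java.util.*;

// Sketch of a count-based tumbling window: every `size` elements are
// aggregated and emitted as one window result.
public class CountWindowSketch {
    // Sum the values in tumbling windows of `size` elements each.
    static List<Integer> countWindowSums(List<Integer> values, int size) {
        List<Integer> out = new ArrayList<>();
        int sum = 0, n = 0;
        for (int v : values) {
            sum += v;
            if (++n == size) {   // window full: emit its sum and reset
                out.add(sum);
                sum = 0;
                n = 0;
            }
        }
        return out;              // a partially filled last window is not emitted
    }

    public static void main(String[] args) {
        System.out.println(countWindowSums(List.of(1, 2, 3, 4, 5, 6, 7), 3));
        // [6, 15] -- the trailing element 7 has not yet filled a window
    }
}
```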

3.2. API Support

DataStream API for streaming applications

DataSet API for batch applications (Java/Scala)

3.3. Library Support

Machine learning (FlinkML)

Graph analytics (Gelly)

Relational data processing (Table)

Complex event processing (CEP)

3.4. Integration Support

Flink on YARN

HDFS

Input data from Kafka

Apache HBase

Hadoop programs

Tachyon

ElasticSearch

RabbitMQ

Apache Storm

S3

XtreemFS

3.5. The Flink Ecosystem

For a compute framework to have a future, it must build a complete Stack; otherwise it is just armchair theorizing, of no real use. Only when concrete applications are built on top and play to the framework's strengths will the framework attract more resources and progress faster. So Flink is also working hard on building its own Stack.

Flink first supported Scala and Java APIs, and Python support is under test. Flink supports graph operations through Gelly, and there is FlinkML for machine learning. Table is an API-level SQL support, i.e. SQL expressed through API calls rather than textual SQL parsing and execution. The figure below shows the complete Stack.

To support the big-data ecosystem more broadly, Flink also ships many Connector sub-projects. The most familiar one, of course, is the integration with Hadoop HDFS. Flink has also announced support for Tachyon, S3, and MapRFS. Support for Tachyon and S3, however, is implemented through the Hadoop HDFS wrapper layer, which means that to use Tachyon or S3 you must have Hadoop and must modify its configuration (core-site.xml). Browsing Flink's source tree reveals more Connector projects, for example Flume and Kafka.
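As an illustration of that wrapper layer, the following core-site.xml fragment sketches how S3 access is typically configured through Hadoop's s3a connector. The property names come from Hadoop's filesystem configuration; the key values are placeholders, and your Hadoop version may require a different connector.

```xml
<!-- Sketch: enable S3 through Hadoop's filesystem layer (s3a connector).
     The access/secret key values below are placeholders. -->
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_SECRET_KEY</value>
</property>
```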

4. Installation

Flink has three deployment modes: Local, Standalone Cluster, and Yarn Cluster.

4.1. Local Mode

In Local mode, the JobManager and TaskManager share a single JVM to complete the Workload. Local mode is the most convenient way to verify a simple application. Real deployments mostly use Standalone or Yarn Cluster, while Local mode only requires extracting the package and starting it (./bin/start-local.sh), so it is not demonstrated here.

4.2. Standalone Mode

4.2.1. Download

Package download page: http://flink.apache.org/downloads.html

Quick-start tutorial:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html

4.2.2. Upload the Package to the Linux System

Use the rz command.

4.2.3. Extract

tar -zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz

4.2.4. Rename

mv flink-1.3.2 flink

4.2.5. Set Environment Variables

Switch to the root user and add:
export FLINK_HOME=/home/hadoop/flink
export PATH=$PATH:$FLINK_HOME/bin
When done, switch back to the normal user and reload the profile:
source /etc/profile

4.2.6. Edit the Configuration Files

Edit flink/conf/masters:
master1:8081
Edit flink/conf/slaves:
master1ha
master2
master2ha
Edit flink/conf/flink-conf.yaml:
taskmanager.numberOfTaskSlots: 2
jobmanager.rpc.address: master1

4.2.7. Start Flink

/home/Hadoop/flink/bin/start-cluster.sh

4.2.8. Flink's REST API

Like most open-source frameworks, Flink provides many useful REST APIs. Flink's REST API is not yet very powerful and only supports some Monitor functions; the Flink Dashboard itself queries its result data through this REST interface. On top of the Flink REST API, Flink's Monitor functions can fairly easily be integrated with third-party tools, which is what it was designed for.

Among Flink's processes, it is the JobManager that serves the REST API, so before calling it, make sure the JobManager is in a healthy state. Normally, after sending a REST request to the JobManager, the Client receives a JSON-formatted response. Since the REST interface still offers few functions, readers who want to extend it can find the corresponding code in the flink-runtime-web sub-project. The key class, WebRuntimeMonitor, dispatches all REST requests; to add a new request type, add the corresponding handler code there. Below are a few common REST calls.

1. Query basic information about the Flink cluster: /overview. Example command line and response:

$ curl http://localhost:8081/overview
{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

2. Query the Job information in the current Flink cluster: /jobs. Example command line and response:

$ curl http://localhost:8081/jobs
{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

3. Query a specific Job: /jobs/jobid. This query returns a very detailed result; the screenshot below shows a test I ran in a browser:

Readers who want to explore more REST endpoints can look them up on the Apache Flink site.

4.2.9. ÔËÐвâÊÔÈÎÎñ

./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout

4.3. Flink HA

First, recall that Flink has two cluster deployment modes: Standalone and Yarn Cluster. In Standalone mode, Flink must rely on Zookeeper for JobManager HA (Zookeeper has become an indispensable component for HA in most open-source frameworks). With Zookeeper's help, a Standalone Flink cluster runs several live JobManagers at once, of which only one is active while the others are in Standby. When the active JobManager loses its connection (e.g. machine failure or Crash), Zookeeper elects a new JobManager from the Standbys to take over the Flink cluster.

In Yarn Cluster mode, Flink relies on Yarn itself for JobManager HA; this is entirely Yarn's mechanism. In Yarn Cluster mode, both the JobManager and the TaskManagers are started by Yarn inside Yarn Containers, and the JobManager should really be called the Flink Application Master. Its failure recovery therefore depends entirely on Yarn's ResourceManager (just like MapReduce's AppMaster). Since it fully depends on Yarn, details may differ slightly across Yarn versions; we won't go deeper here.

4.3.1. Edit the Configuration Files

Edit flink-conf.yaml:

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://master1:9000/flink/ha/
high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181
high-availability.zookeeper.client.acl: open

Edit the ZooKeeper configuration (conf/zoo.cfg):

server.1=master1ha:2888:3888
server.2=master2:2888:3888
server.3=master2ha:2888:3888

Edit masters:

master1:8082
master1ha:8082

Edit slaves:

master1ha
master2
master2ha

4.3.2. Start

/home/Hadoop/flink/bin/start-cluster.sh

4.4. Yarn Cluster Mode

4.4.1. Background

In an enterprise, to make the most of cluster resources, several types of Workload usually run on one cluster, so Flink also supports running on Yarn. First, the figure below shows how Yarn and Flink relate.

As the figure shows, Flink's relationship with Yarn is the same as MapReduce's relationship with Yarn. Flink implements its own App Master through Yarn's interfaces. When Flink is deployed on Yarn, Yarn uses its own Containers to start Flink's JobManager (i.e. the App Master) and TaskManagers.

4.4.2. Set Environment Variables

export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop

4.4.3. Deploy and Start

yarn-session.sh -d -s 2 -tm 800 -n 2

The command above asks Yarn for 3 containers in total: 2 Containers to start TaskManagers (-n 2), each TaskManager owning two Task Slots (-s 2) and each TaskManager Container requesting 800 MB of memory (-tm 800), plus one container for the ApplicationMaster (Job Manager).

After Flink is deployed to the Yarn Cluster, it prints the Job Manager's connection details.

Flink on Yarn overrides the parameters below. If you do not want to change the values in the configuration file, you can set them dynamically with -D options, e.g. -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

jobmanager.rpc.address: because the JobManager may be allocated to a different machine each run

taskmanager.tmp.dirs: use the tmp directories provided by Yarn

parallelism.default: when a slot count has been specified

yarn-session.sh keeps the process attached, so you can stop the yarn-session in the terminal with CTRL+C or by typing stop.

If you do not want the Flink Yarn client to keep running, Flink offers a detached YARN session: add the -d or --detached flag at startup.

Once the command above succeeds, the Flink record appears on the Yarn Application page, as shown below.

If you test in a virtual machine, you may run into errors. Pay attention to memory sizes here: Flink asks Yarn for several Containers, but Yarn's configuration may cap the memory a Container can request, or Yarn itself may manage very little memory. In that case the TaskManagers may well fail to start, especially when several TaskManagers are requested. So after starting Flink, check Flink's status in its web UI; you can jump there directly from the RM page (click Tracking UI). The Flink page then looks like the figure.

The yarn-session.sh startup parameters are as follows:

Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-st,--streaming Start Flink in streaming mode
-tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]

4.4.4. Submit a Job

A job can then be submitted like this:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

In the command above the parameters carry a y prefix; -yn sets the number of TaskManagers.

In this mode you can likewise use -m yarn-cluster to submit a fire-and-forget detached yarn (-yd) job to the yarn cluster.

4.4.5. Stop the Yarn Cluster

yarn application -kill application_1507603745315_0001

5. Using Flink

5.1. The Standard Flink Development Workflow

Obtain the execution environment:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Load/create the initial data:

DataStream<String> text = env.readTextFile("file:///path/to/file");

Specify transformations on the data:

val mapped = input.map { x => x.toInt }

Store the result set:

writeAsText(String path)

print()

Trigger program execution.

Run the program in local mode:

execute()

Or package the program into a jar and run it on the cluster:

./bin/flink run \
-m master1:8082 \
./examples/batch/WordCount.jar \
--input hdfs://master1:9000/words.txt \
--output hdfs://master1:9000/clinkout

5.2. WordCount

5.2.1. Scala Code

object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// the port to connect to
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text = env.socketTextStream("localhost", port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}

5.2.2. Java Code

public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}

5.2.3. Run

Start nc to send messages:

$ nc -l 9000

Start the Flink program:

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

5.2.4. Test

Input:

$ nc -l 9000

lorem ipsum
ipsum ipsum ipsum
bye

Output:

$ tail -f log/flink-*-jobmanager-*.out

lorem : 1
bye : 1
ipsum : 4

5.3. Developing an Offline Program in IDEA

DataSet programs are Flink's common batch programs. A dataset is initialized from a source (for example by reading a file or from a serialized collection), transformed with operations such as filtering, mapping, joining, and grouping, and finally written out through a sink, either to a distributed file system such as HDFS or printed to the console. Flink programs can run in many ways: locally, on a Flink cluster, on Yarn, and so on.

5.3.1. POM

Java

<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.2</scala.version>
<scala.compat.version>2.10</scala.compat.version>
<hadoop.version>2.6.2</hadoop.version>
<flink.version>1.3.2</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.spark.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Scala

<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.2</scala.version>
<scala.compat.version>2.10</scala.compat.version>
<hadoop.version>2.6.2</hadoop.version>
<flink.version>1.3.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.spark.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

5.3.2. Program

Java

public class WordCountExample {
public static void main(String[] args) throws Exception {
//set up the execution environment
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//build the dataset from strings
DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
//split the strings, group by key, and count occurrences of each key
DataSet<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
//print
wordCounts.print();
}
//the string-splitting function
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

Scala

import org.apache.flink.api.scala._

object WordCount {
def main(args: Array[String]) {
//initialize the environment
val env = ExecutionEnvironment.getExecutionEnvironment
//load data from strings
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
//split the strings, build tuples, group by key, and count the words per group
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
//print
counts.print()
}
}

5.3.3. Run

Local

Just use Run As in the IDE.

Online

1. Package the jar

2. Upload it

3. Run: flink run -m master1:8082 -c org.apache.flink.WordCount original-Flink-1.0-SNAPSHOT.jar

   