Editor's note:
This article, from mamicode, introduces the concept of Flink, the features it supports, its three deployment modes, and the standard Flink development workflow.
|
1. The Emergence of Flink
Big data has developed at breakneck speed in recent years, giving rise to many popular open-source communities, among them Hadoop, Storm, and later Spark, each focused on its own application scenarios. Spark opened the era of in-memory computing and, by betting on memory, rode the rapid rise of in-memory processing. Spark's popularity more or less overshadowed other distributed computing systems, and Flink, among others, kept developing quietly during that time.
In some overseas communities, big data compute engines are divided into four generations. Of course, not everyone agrees, but let us adopt this view for the sake of discussion.
The first-generation engine is, without question, the MapReduce carried by Hadoop. MapReduce should be familiar to everyone: it splits computation into two phases, Map and Reduce. Upper-layer applications had to find ways to decompose their algorithms accordingly, and even to chain multiple Jobs in the application layer in order to complete a single algorithm, such as an iterative computation.
These drawbacks spurred the birth of frameworks supporting DAGs, so DAG-capable frameworks are classed as the second-generation engines, for example Tez and the higher-level Oozie. We will not dwell here on the differences between the various DAG implementations; for the Tez and Oozie of that time, the workloads were still mostly batch jobs.
Next came the third-generation engines, represented by Spark. Their main characteristics are DAG support inside a Job (not across Jobs) and an emphasis on real-time computation. Many would also say that third-generation engines run batch Jobs perfectly well.
The arrival of the third generation accelerated the development of upper-layer applications, for example the performance of iterative computation and support for stream processing and SQL. Flink's birth is placed in the fourth generation, which shows mainly in Flink's support for stream computation and its further push toward real-time processing. Of course, Flink also supports Batch jobs and DAG execution.
First, the following performance test gives a preliminary picture of the performance gap between the two frameworks. Both perform real-time computation on an in-memory computing framework, so both deliver very good performance; in this test, Flink comes out slightly ahead.
Test environment:
1. CPU: 7,000 cores;
2. Memory: 128 GB per machine;
3. Versions: Hadoop 2.3.0, Spark 1.4, Flink 0.9;
4. Data: 800 MB, 8 GB, 8 TB;
5. Algorithm: K-means: cluster around K centers in space, assigning each object to its nearest center; the centers are updated iteratively until the best clustering is reached (a plain-Java sketch of one iteration step follows this list);
6. Iteration: K=10, 3 data sets
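To make the benchmark's algorithm concrete, here is a plain-Java sketch of a single K-means step over one-dimensional points (an illustration only, not the code used in the test): each step assigns every point to its nearest center, then recomputes each center as the mean of its assigned points.

import java.util.Arrays;

// Illustrative K-means for 1-D points: each iteration assigns points to
// the nearest center, then recomputes each center as the mean of its points.
public class KMeansStep {
    static double[] step(double[] points, double[] centers) {
        double[] sum = new double[centers.length];
        int[] count = new int[centers.length];
        for (double p : points) {
            int nearest = 0;
            for (int c = 1; c < centers.length; c++) {
                if (Math.abs(p - centers[c]) < Math.abs(p - centers[nearest])) {
                    nearest = c;
                }
            }
            sum[nearest] += p;
            count[nearest]++;
        }
        double[] next = new double[centers.length];
        for (int c = 0; c < centers.length; c++) {
            next[c] = count[c] == 0 ? centers[c] : sum[c] / count[c];
        }
        return next;
    }

    public static void main(String[] args) {
        double[] points = {1.0, 1.5, 8.0, 9.0, 25.0};
        double[] centers = {0.0, 10.0};  // K = 2 here for brevity
        for (int i = 0; i < 10; i++) {   // fixed number of iterations
            centers = step(points, centers);
        }
        System.out.println(Arrays.toString(centers));
    }
}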

[Figure: runtime for different iteration counts; the y-axis is seconds, the x-axis is the number of iterations]
Both Spark and Flink ran on Hadoop YARN. Performance ranks Flink > Spark > Hadoop (MR), and the gap widens as the number of iterations grows. The main reason Flink outperforms Spark and Hadoop here is that Flink supports incremental iteration and can optimize iterative jobs automatically.
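To make the iteration support concrete, here is a minimal sketch of Flink's DataSet bulk-iteration API, modeled on the pi-estimation example from the Flink documentation (details may vary between versions):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

// Bulk iteration: the loop is declared once and run natively by the engine,
// instead of being driven by the client as a chain of separate jobs.
public class BulkIterationPi {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // start an iteration of 10,000 supersteps over an initial count of 0
        IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10000);

        // each superstep adds 1 if a random point falls inside the unit circle
        DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer i) {
                double x = Math.random(), y = Math.random();
                return i + ((x * x + y * y < 1) ? 1 : 0);
            }
        });

        // close the loop and derive the pi estimate from the final count
        DataSet<Double> pi = initial.closeWith(iteration)
                .map(new MapFunction<Integer, Double>() {
                    @Override
                    public Double map(Integer count) {
                        return count / 10000.0 * 4;
                    }
                });

        pi.print();
    }
}

For the incremental case, the DataSet API additionally offers iterateDelta, which reprocesses only the changed part of the solution set in each superstep.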
2. Flink Overview
Many people may have first heard the word Flink in 2015, but as early as 2008 Flink's predecessor was already a research project at the Technical University of Berlin. It was accepted into the Apache incubator in 2014 and quickly became one of the top-level projects of the ASF (Apache Software Foundation). At the time of writing, Flink's latest release is 0.10.0; while admiring Spark's rapid development, we should perhaps also give Flink's pace of development a thumbs-up.
Flink is a distributed processing engine for both stream data and batch data, implemented mainly in Java and currently driven chiefly by open-source community contributions. For Flink, the primary scenario is stream data: batch data is merely a limiting special case of a stream. In other words, Flink treats every workload as a stream, and this is its most distinctive trait.
Flink supports fast local iteration as well as some cyclic iterative tasks, and it manages memory in its own customized way. On this point, if we compare Flink with Spark: Flink does not hand memory entirely over to the application layer, which is why Spark is more prone to OOM (out of memory) errors than Flink. In terms of the framework itself and its target scenarios, Flink is more similar to Storm. Readers who already know Storm or Flume will probably find Flink's architecture and many of its concepts easier to grasp. Let us first look at Flink's architecture diagram.

From the diagram we can pick out Flink's most basic concepts: Client, JobManager, and TaskManager. The Client submits a job to the JobManager, the JobManager distributes the job to the TaskManagers for execution, and the TaskManagers report job status back through heartbeats. Reading this, some may feel transported back to the first Hadoop generation. Indeed, judging from the architecture diagram, the JobManager looks much like the JobTracker of old and the TaskManager much like the TaskTracker. One crucial difference, however, is that what travels between TaskManagers is a stream (Stream). Furthermore, in first-generation Hadoop there was only the Shuffle between Map and Reduce, whereas in Flink there may be many stages, with data transferred both inside a TaskManager and between TaskManagers, rather than Hadoop's fixed Map-to-Reduce pattern.
3. Technical Features (Optional)
Here I only give a brief, categorized overview of the features Flink supports; the specific concepts involved and their principles are explained in detail in later sections.
3.1. Stream processing features
Supports high-throughput, low-latency, high-performance stream processing
Supports window (Window) operations based on event time
Supports exactly-once semantics for stateful computation
Supports highly flexible window (Window) operations: windows driven by time, count, session, or data (a short sketch follows this list)
Supports a continuous streaming model with backpressure
Supports fault tolerance based on lightweight distributed snapshots (Snapshot)
One runtime supports both Batch-on-Streaming and pure Streaming processing
Flink implements its own memory management inside the JVM
Supports iterative computation
Supports automatic program optimization: avoiding expensive operations such as shuffles and sorts in specific cases, and caching intermediate results when necessary
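As a quick sketch of the event-time windows and snapshot-based fault tolerance listed above (a hypothetical example against the DataStream API of the Flink 1.3 era; the sample events and their timestamp field are made up for illustration):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Count events per key in tumbling 10-second event-time windows; the
// timestamp carried in the record, not the wall clock, drives the windows.
public class EventTimeWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000); // lightweight distributed snapshot every 5 s

        env.fromElements(
                Tuple2.of("a", 1000L), Tuple2.of("b", 2000L),
                Tuple2.of("a", 9000L), Tuple2.of("a", 12000L))
            // use field f1 as the event timestamp (assumed ascending here)
            .assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> e) {
                        return e.f1;
                    }
                })
            // turn each event into (key, 1) so the window can sum counts
            .map(new MapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(Tuple2<String, Long> e) {
                    return Tuple2.of(e.f0, 1L);
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .sum(1)
            .print();

        env.execute("Event-time window sketch");
    }
}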
3.2. API support
For streaming applications, the DataStream API
For batch applications, the DataSet API (Java/Scala supported)
3.3. Library support
Supports machine learning (FlinkML)
Supports graph analysis (Gelly)
Supports relational data processing (Table; a short sketch follows this list)
Supports complex event processing (CEP)
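A small sketch of what relational processing as an API means in the Table library referenced above (assuming the Flink 1.3-era Table API; the data set and field names are made up for illustration):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

// Express a group-by/aggregate relationally: the query is built through
// API calls rather than parsed from SQL text.
public class TableApiSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<Tuple2<String, Integer>> words = env.fromElements(
                Tuple2.of("flink", 1), Tuple2.of("spark", 1), Tuple2.of("flink", 1));

        // expose the DataSet as a table with named fields, then query it
        Table table = tEnv.fromDataSet(words, "word, freq");
        Table result = table.groupBy("word").select("word, freq.sum as total");

        tEnv.toDataSet(result, Row.class).print();
    }
}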
3.4. Integration support
Supports Flink on YARN
Supports HDFS
Supports input data from Kafka
Supports Apache HBase
Supports Hadoop programs
Supports Tachyon
Supports ElasticSearch
Supports RabbitMQ
Supports Apache Storm
Supports S3
Supports XtreemFS
3.5. The Flink ecosystem
For a compute framework to develop over the long term, it must build a complete Stack; otherwise it is armchair strategy with no practical significance. Only when the upper layers have concrete applications that exploit the strengths of the framework itself will it attract more resources and progress faster. Flink is therefore also working hard to build out its own Stack.
Flink first shipped Scala and Java APIs, and Python support is under test. Flink supports graph processing through Gelly and machine learning through FlinkML. Table offers SQL functionality as an interface, that is, as API support rather than parsing and executing textual SQL. For the complete Stack, see the figure below.

Flink ΪÁ˸ü¹ã·ºµÄÖ§³Ö´óÊý¾ÝµÄÉú̬Ȧ£¬ÆäÏÂҲʵÏÖÁ˺ܶà Connector µÄ×ÓÏîÄ¿¡£×îÊìϤµÄ£¬µ±È»¾ÍÊÇÓë
Hadoop HDFS ¼¯³É¡£Æä´Î£¬Flink Ò²Ðû²¼Ö§³ÖÁË Tachyon¡¢S3 ÒÔ¼° MapRFS¡£²»¹ý¶ÔÓÚ
Tachyon ÒÔ¼° S3 µÄÖ§³Ö£¬¶¼ÊÇͨ¹ý Hadoop HDFS Õâ²ã°üװʵÏֵģ¬Ò²¾ÍÊÇ˵ҪʹÓÃ
Tachyon ºÍ S3£¬¾Í±ØÐëÓÐ Hadoop£¬¶øÇÒÒª¸ü¸Ä Hadoop µÄÅäÖã¨core-site.xml£©¡£Èç¹ûä¯ÀÀ
Flink µÄ´úÂëĿ¼£¬ÎÒÃǾͻῴµ½¸ü¶à Connector ÏîÄ¿£¬ÀýÈç Flume ºÍ Kafka¡£
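For example, routing S3 through the Hadoop layer looks roughly like the following core-site.xml fragment (a sketch based on the standard Hadoop s3a connector properties; verify the exact keys against your Hadoop version):

<!-- core-site.xml: route s3:// URIs through the Hadoop s3a filesystem -->
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_SECRET_KEY</value>
</property>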
4. Installation
Flink has three deployment modes: Local, Standalone Cluster, and Yarn Cluster.
4.1. Local mode
In Local mode, the JobManager and the TaskManager share a single JVM to run the workload. Local mode is the most convenient way to verify a simple application. Real deployments mostly use Standalone or Yarn Cluster; Local mode only requires extracting the package and starting it (./bin/start-local.sh), so it is not demonstrated here.
4.2. Standalone mode
4.2.1. Download
Package download: http://flink.apache.org/downloads.html
Quickstart guide:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html


4.2.2. Upload the package to the Linux system
Use the rz command.
4.2.3. Extract
tar -zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz
4.2.4. Rename
mv flink-1.3.2 flink
4.2.5. Set environment variables
Switch to the root user and configure:
export FLINK_HOME=/home/hadoop/flink
export PATH=$PATH:$FLINK_HOME/bin
After configuring, switch back to the normal user and run:
source /etc/profile
4.2.6. Edit the configuration files
Edit flink/conf/masters:
master1:8081
Edit flink/conf/slaves:
master1ha
master2
master2ha
Edit flink/conf/flink-conf.yaml:
taskmanager.numberOfTaskSlots: 2
jobmanager.rpc.address: master1
4.2.7. Start Flink
/home/hadoop/flink/bin/start-cluster.sh

4.2.8. Flink's REST API
Like most open-source frameworks, Flink provides a number of useful REST APIs. Flink's REST API is not yet very powerful, though, and currently only covers monitoring functions; the Flink Dashboard itself queries its result data through this same REST interface. Building on it, Flink monitoring can be integrated with third-party tools fairly easily, which is the design intent. Within the Flink processes, it is the JobManager that serves the REST API, so make sure the JobManager is healthy before issuing REST calls. Normally, after sending a REST request to the JobManager, the client receives a JSON-formatted response. Since the REST interface does not yet offer many functions, readers who need to extend it can find the corresponding code in the flink-runtime-web subproject. The key class there, WebRuntimeMonitor, routes all REST requests; to add a new request type, register the corresponding handler in it. Below are a few commonly used REST APIs.
1. Query basic information about the Flink cluster: /overview. Example command line and response:
$ curl http://localhost:8081/overview
{"taskmanagers":1,"slots-total":16,"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}
2. Query the Jobs in the current Flink cluster: /jobs. Example command line and response:
$ curl http://localhost:8081/jobs
{"jobs-running":[],"jobs-finished":["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}
3. Query a specific Job: /jobs/<jobid>. This query returns a particularly large amount of detail; the screenshot below shows a test run in a browser:

Readers who want to explore more REST endpoints can consult the Apache Flink documentation.
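As a small illustration of the third-party integration mentioned above, a monitor can poll /overview with nothing more than the JDK (a hypothetical sketch; the host and port must match your JobManager):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Fetch the cluster overview from the JobManager's REST endpoint and print
// the raw JSON; a real integration would parse it and forward the metrics.
public class FlinkOverviewPoller {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8081/overview"); // JobManager web port
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // e.g. {"taskmanagers":1,"slots-total":16,...}
            }
        } finally {
            conn.disconnect();
        }
    }
}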
4.2.9. Run a test job
./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout


4.3. Flink HA
First, recall that Flink has two cluster deployment modes, Standalone and Yarn Cluster. In Standalone mode, Flink must rely on Zookeeper to implement HA for the JobManager (Zookeeper has become an indispensable building block for HA in most open-source frameworks). With Zookeeper's help, a Standalone Flink cluster keeps several JobManagers alive at the same time, only one of which is active while the others stand by. When the active JobManager loses its connection (for example, the machine goes down or the process crashes), Zookeeper elects a new JobManager from the standbys to take over the cluster.
In Yarn Cluster mode, Flink relies on Yarn itself for JobManager HA; this is really Yarn's own mechanism. In Yarn Cluster mode, both the JobManager and the TaskManagers are launched by Yarn inside Yarn Containers. The JobManager in this case should really be called the Flink Application Master, which means its failure recovery depends entirely on Yarn's ResourceManager (just like MapReduce's AppMaster). Because this fully depends on Yarn, different Yarn versions may differ in small ways; we will not go deeper here.
4.3.1. Edit the configuration files
Edit flink-conf.yaml:
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://master1:9000/flink/ha/
high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181
high-availability.zookeeper.client.acl: open
Edit the Zookeeper configuration (conf/zoo.cfg):
server.1=master1ha:2888:3888
server.2=master2:2888:3888
server.3=master2ha:2888:3888
Edit masters:
master1:8082
master1ha:8082
Edit slaves:
master1ha
master2
master2ha
4.3.2. Start
/home/hadoop/flink/bin/start-cluster.sh



4.4. Yarn Cluster mode
4.4.1. Background
In an enterprise, multiple types of workload usually run in a single cluster to maximize resource utilization, so Flink also supports running on Yarn. First, let us look at the relationship between Yarn and Flink through the figure below.

As the figure shows, Flink relates to Yarn in the same way MapReduce does: Flink implements its own App Master through Yarn's interfaces. When Flink is deployed on Yarn, Yarn starts Flink's JobManager (which is the App Master) and the TaskManagers in its own Containers.
4.4.2. Set the environment variable
export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop
4.4.3. Deploy and start
yarn-session.sh -d -s 2 -tm 800 -n 2
The command above requests 3 containers from Yarn in total: 2 Containers to start TaskManagers (-n 2), with each TaskManager owning two Task Slots (-s 2) and each TaskManager Container receiving 800 MB of memory (-tm 800), plus one container for the ApplicationMaster (the JobManager).
Once Flink is deployed on the Yarn cluster, it prints the JobManager's connection details.
Flink on Yarn overrides the following parameters. If you do not want to change them in the configuration file, you can set them dynamically with the -D option, e.g. -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
jobmanager.rpc.address: because the JobManager may be allocated to a different machine each time
taskmanager.tmp.dirs: uses the tmp directories provided by Yarn
parallelism.default: if a slot count was specified
yarn-session.sh keeps a foreground process attached, so you can stop the yarn-session with CTRL+C or by typing stop in its terminal.
If you do not want the Flink Yarn client to stay running, Flink offers a detached YARN session: add the -d or --detached parameter at startup.
After the command above succeeds, we can see Flink's entry on the Yarn Application page, as shown below.

If you test inside virtual machines, you may run into errors. Pay attention to memory sizes here: Flink requests several Containers from Yarn, but Yarn's configuration may cap the memory a Container can request, and the total memory Yarn manages may itself be small. In that case the TaskManagers may fail to start, especially when several TaskManagers are requested. So after starting Flink, check Flink's state on the Flink web page; you can jump there straight from the RM page (click Tracking UI). The Flink page then looks as shown below.

yarn-session.sh accepts the following startup parameters:
Usage:
   Required
     -n,--container <arg>            Number of YARN containers to allocate (= number of TaskManagers)
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager container [in MB]
     -nm,--name                      Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue
     -s,--slots <arg>                Number of slots per TaskManager
     -st,--streaming                 Start Flink in streaming mode
     -tm,--taskManagerMemory <arg>   Memory per TaskManager container [in MB]
4.4.4. Submit a job
Afterwards, we can submit a job like this:
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
In the command above, the Yarn-related parameters carry a y prefix; -yn sets the number of TaskManagers.
In this mode you can likewise use -m yarn-cluster to submit a detached (-yd), fire-and-forget job to the yarn cluster.

4.4.5. Stop the yarn cluster
yarn application -kill application_1507603745315_0001
5. Using the Technology
5.1. The standard Flink development workflow
Obtain the execution environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Load/create the initial data:
DataStream<String> text = env.readTextFile("file:///path/to/file");
Apply transformations to the data:
val mapped = input.map { x => x.toInt }
Store the result set:
writeAsText(String path)
print()
Trigger program execution. In local mode, call:
execute()
Or package the program as a jar and run it on the cluster (a complete minimal program follows):
./bin/flink run \
-m master1:8082 \
./examples/batch/WordCount.jar \
--input hdfs://master1:9000/words.txt \
--output hdfs://master1:9000/clinkout
5.2. Wordcount
5.2.1. Scala code
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {
  def main(args: Array[String]): Unit = {
    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
      }
    }
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')
    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")
    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)
    env.execute("Socket Window WordCount")
  }
  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
5.2.2. Java code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
    // Data type for words with count
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
5.2.3. Run
Start nc to send messages:
$ nc -l 9000
Start the Flink program:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000



5.2.4. Test
Input:
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye
Output:
$ tail -f log/flink-*-jobmanager-*.out
lorem : 1
bye : 1
ipsum : 4
5.3. Developing offline programs with IDEA
DataSet programs are the common batch style in Flink. A data set is initialized from a source, for example by reading a file or from a serialized collection; it is then transformed through transformations (filtering, mapping, joining, grouping) and finally stored through a sink, which can write to a distributed file system such as HDFS or print to the console. Flink programs can run in many ways: local, on a Flink cluster, on Yarn, and so on.
5.3.1. Pom
Java:
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.2</scala.version>
<scala.compat.version>2.10</scala.compat.version>
<hadoop.version>2.6.2</hadoop.version>
<flink.version>1.3.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Scala:
<properties>
  <maven.compiler.source>1.7</maven.compiler.source>
  <maven.compiler.target>1.7</maven.compiler.target>
  <encoding>UTF-8</encoding>
  <scala.version>2.10.2</scala.version>
  <scala.compat.version>2.10</scala.compat.version>
  <hadoop.version>2.6.2</hadoop.version>
  <flink.version>1.3.2</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.22</version>
  </dependency>
</dependencies>
<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
  <testSourceDirectory>src/test/scala</testSourceDirectory>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
          <configuration>
            <args>
              <arg>-make:transitive</arg>
              <arg>-dependencyfile</arg>
              <arg>${project.build.directory}/.scala_dependencies</arg>
            </args>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.apache.flink.WordCount</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
5.3.2. Program
Java:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // set up the environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // build the data set from strings
        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        // split the strings, group by key, and count occurrences of each key
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
        // print
        wordCounts.print();
    }
    // the string-splitting function
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
Scala:
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {
    // initialize the environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // load data from strings
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    // split the strings, emit tuples, group by key, and count words per group
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    // print
    counts.print()
  }
}
5.3.3. Run
Locally:
Simply use Run As in IDEA.
On the cluster:
1. Package the jar
2. Upload it
3. Execute the command: flink run -m master1:8082 -c org.apache.flink.WordCount original-Flink-1.0-SNAPSHOT.jar