Editor's pick:
This article comes from the Yunqi Community. Apache Flink is an open-source computing platform for distributed stream processing and batch processing.
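Apache Flink supports both stream processing and batch processing applications on top of a single Flink runtime.
Existing open-source computing solutions treat stream processing and batch processing as two different application types, because the SLAs (Service-Level Agreements) they must meet are entirely different: stream processing generally needs low latency and exactly-once guarantees, while batch processing needs high throughput and efficient processing.
Flink looks at the two from a different angle and unifies them. Flink is a stream processor at its core: when a job is treated as stream processing, its input data stream is unbounded; batch processing is treated as a special case of stream processing whose input data stream is defined as bounded.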
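Flink stream processing features:
High-throughput, low-latency, high-performance stream processing
Window operations based on event time
Exactly-once semantics for stateful computation
Highly flexible window operations: time-based, count-based, session-based, and data-driven windows
A continuous streaming model with backpressure
Fault tolerance based on lightweight distributed snapshots
A single runtime for both batch-on-streaming processing and streaming processing
Flink implements its own memory management inside the JVM
Support for iterative computation
Automatic program optimization: expensive operations such as shuffles and sorts are avoided in certain cases, and intermediate results are cached where necessary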
I. Architecture
Flink organizes its software stack as a layered system. Each layer builds on the layer below it, and each layer accepts a different level of abstraction of the program.

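1. The runtime layer receives a program in the form of a JobGraph. A JobGraph is a generic parallel data flow graph with an arbitrary number of tasks that consume and produce data streams.
2. The DataStream API and the DataSet API each generate JobGraphs through their own separate compilation process. The DataSet API uses an optimizer to decide how to optimize the program, while the DataStream API uses a stream builder for this task.
3. To execute a JobGraph, Flink offers several deployment options (such as local, remote, and YARN).
4. Flink ships with libraries and APIs that generate DataSet or DataStream API programs: Table for logical table queries, FlinkML for machine learning, Gelly for graph processing, and CEP for complex event processing.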

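II. Principles
1. Streams, Transformations, and Operators
A Flink program is built from two basic building blocks, Streams and Transformations. A Stream is an intermediate result, and a Transformation is an operation that takes one or more input Streams, computes on them, and produces one or more result Streams.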

When a Flink program is executed, it is mapped to a Streaming Dataflow. A Streaming Dataflow consists of a set of Streams and Transformation Operators. It resembles a DAG that starts at one or more Source Operators and ends at one or more Sink Operators.

2. Parallel Dataflows
A Stream can be split into multiple Stream Partitions, and an Operator can be split into multiple Operator Subtasks, each of which runs independently in its own thread. The parallelism of an Operator equals the number of its Operator Subtasks, and the parallelism of a Stream always equals that of the Operator that produces it.

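One-to-one pattern
For example, from Source[1] to map()[1]. This pattern preserves the partitioning of the Source and the order of elements within each partition: the map()[1] Subtask sees records in the same order in which Source[1] produced them.
Redistribution pattern
This pattern changes the partitioning of the input stream, for example from map()[1] and map()[2] to keyBy()/window()/apply()[1] and keyBy()/window()/apply()[2]. Each upstream Subtask sends data to several different downstream Subtasks, which changes the partitioning of the stream. How the data is redistributed depends on the Operator the application chooses.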
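The redistribution pattern can be illustrated with a small plain-Java sketch (it does not use the Flink API). It routes each record to a downstream subtask chosen from the record's key, so equal keys always land in the same subtask. The class name KeyPartitioner and the hash-modulo rule are assumptions made for illustration only; Flink's real key-to-subtask assignment is more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeyPartitioner {
    /** Picks the downstream subtask for a key (illustrative rule: hash modulo parallelism). */
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Routes every record into the bucket of its downstream subtask, keeping arrival order. */
    static List<List<String>> redistribute(List<String> records, int parallelism) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String record : records) {
            // The record itself serves as the key in this sketch.
            buckets.get(subtaskFor(record, parallelism)).add(record);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");
        // Every occurrence of the same word ends up in the same bucket.
        System.out.println(redistribute(words, 2));
    }
}
```

In a one-to-one connection, by contrast, subtask i simply forwards to subtask i and no such routing step is needed.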
3. Tasks and Operator Chains
In Flink's distributed execution environment, multiple Operator Subtasks are chained together into an Operator Chain, which is effectively a single execution chain. Each chain runs in its own thread on a TaskManager.

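4. Time
When the records of a Stream are processed, they usually carry several typical time fields:
Event Time: the time at which the event was created
Ingestion Time: the time at which the event entered the Flink Dataflow
Processing Time: the local system time at which an Operator processes the event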

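Flink uses Watermarks to measure the progress of event time. A Watermark carries a timestamp t and is inserted into the stream.
1. A Watermark with timestamp t declares that all events with a timestamp t' < t have already occurred.
2. Watermarks are essential for out-of-order streams: they allow some events to arrive late without unduly affecting window computations.
3. In a parallel dataflow, when an Operator has several input streams, the Operator's event time is the minimum of the event times of its input streams.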
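Rule 3 above can be sketched in plain Java (again without the Flink API). The class MinWatermarkTracker and its method names are hypothetical. It remembers the latest watermark seen on each input and reports the minimum as the operator's event time.

```java
import java.util.Arrays;

class MinWatermarkTracker {
    private final long[] inputWatermarks;

    MinWatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        // No watermark has been seen on any input yet.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's current event time. */
    long onWatermark(int input, long timestamp) {
        // A watermark never moves backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        return currentEventTime();
    }

    /** The operator's event time is the minimum watermark over all inputs. */
    long currentEventTime() {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        tracker.onWatermark(0, 10);                      // input 1 has sent nothing yet
        System.out.println(tracker.onWatermark(1, 7));   // 7: the slower input decides
        System.out.println(tracker.onWatermark(1, 12));  // 10: now input 0 is the slower one
    }
}
```

The slowest input therefore holds back the event time of the whole operator, which is why a stalled input delays window results downstream.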

5. Windows
Flink supports time-based window operations as well as data-based window operations:

Window categories:
By the criterion used to split the stream: timeWindow, countWindow
By window behavior: Tumbling Window, Sliding Window, custom windows
Tumbling/Sliding Time Window
// Imports for the Scala DataStream API
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
  // compute sum over carCnt
  .sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding time window of 1 minute length and 30 secs trigger interval
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)
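To make the difference between tumbling and sliding time windows concrete, here is a plain-Java sketch of how a timestamp could be assigned to windows. It is not Flink code; the class WindowAssigner and its methods are hypothetical, timestamps are assumed to be non-negative milliseconds, and windows are taken as half-open intervals [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssigner {
    /** Start of the single tumbling window that contains the timestamp. */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Starts of all sliding windows [start, start + size) that contain the timestamp. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // The latest window that can contain the timestamp starts at a multiple of the slide.
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 75_000L; // an event 75 seconds after time zero
        System.out.println(tumblingStart(ts, 60_000L));          // 60000
        System.out.println(slidingStarts(ts, 60_000L, 30_000L)); // [60000, 30000]
    }
}
```

With a 1-minute size and a 30-second slide, every element belongs to two overlapping windows, so the sliding sum counts each element twice across consecutive results.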
Tumbling/Sliding Count Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the carCnt sum
  .sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)
Custom windows
Basic operations:
window: create a custom window
trigger: define a custom trigger
evictor: define a custom evictor
apply: define a custom window function
6. Fault Tolerance
Barrier mechanism:
1. When a Barrier appears in the stream, all records before it belong to the Snapshot of that Barrier, and all records after it belong to the next Snapshot.
2. Barriers from different Snapshots can be in the stream at the same time, which means that several Snapshots may be in progress concurrently.
3. When an intermediate Operator receives a Barrier, it forwards the Barrier into its outgoing streams for that Snapshot. When a Sink Operator receives the Barrier, it acknowledges the Snapshot to the Checkpoint Coordinator. The Snapshot is considered complete only after all Sink Operators have acknowledged it.
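Alignment:
When an Operator receives more than one input stream, it has to align the streams on the Snapshot Barriers:
1. As soon as the Operator receives Snapshot Barrier n from one incoming Stream, it stops processing records from that Stream until Barrier n has also arrived from the other incoming Streams (otherwise records belonging to two different Snapshots would be mixed).
2. Streams that have delivered Barrier n are set aside temporarily. Records arriving from them are not processed but placed in a Buffer.
3. Once the last Stream has delivered Barrier n, the Operator emits all pending outgoing records and then reports Snapshot n to the Checkpoint Coordinator.
4. It then resumes processing records from all Streams, starting with the records held in the Buffer.
Stream aligning makes exactly-once semantics possible, but it also adds latency to a streaming application, because part of the input has to be buffered while the Barriers are being aligned. The effect can be more pronounced when the parallelism of the dataflow is high, since the buffered records can only be processed once the slowest Stream has delivered its Barrier. Flink provides a switch to turn stream aligning off; with aligning disabled, exactly-once degrades to at-least-once.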
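The alignment steps can be sketched as a small plain-Java simulation. It is not Flink's implementation: the class BarrierAligner is hypothetical, it assumes a single checkpoint in flight, and it assumes each channel delivers barrier n exactly once. It only shows which records are processed immediately and which are held back until the snapshot is taken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class BarrierAligner {
    private final boolean[] blocked;                     // channels that already delivered the barrier
    private int barriersSeen = 0;
    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();  // what the operator did, in order

    BarrierAligner(int numChannels) {
        blocked = new boolean[numChannels];
    }

    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            buffer.add(record);               // belongs to the next snapshot: hold it back
        } else {
            log.add("process " + record);
        }
    }

    void onBarrier(int channel, long n) {
        blocked[channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            log.add("snapshot " + n);         // all inputs aligned: take the snapshot
            while (!buffer.isEmpty()) {
                log.add("process " + buffer.poll());  // then drain the held-back records
            }
            Arrays.fill(blocked, false);
            barriersSeen = 0;
        }
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        BarrierAligner op = new BarrierAligner(2);
        op.onRecord(0, "a1");
        op.onBarrier(0, 1);
        op.onRecord(0, "a2");   // buffered: channel 0 is already past barrier 1
        op.onRecord(1, "b1");   // processed: channel 1 has not delivered barrier 1 yet
        op.onBarrier(1, 1);
        System.out.println(op.log());
    }
}
```

Skipping the buffering step would let a2 be processed before the snapshot, which is exactly the mixing of two snapshots that turns exactly-once into at-least-once.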
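Checkpoint:
A Snapshot is more than a checkpoint of the state of the data stream. It also contains the state held inside each Operator, which is what makes it possible to restore stream processing correctly after a failure. There are two kinds of state:
1. System state: an Operator needs to buffer data while it computes, so the state of these data buffers is associated with the Operator. Take the buffer of a window operation as an example: Flink collects or aggregates records into the buffer until the data in the buffer has been processed.
2. User-defined state (state that transformation functions create and modify): this can be a simple variable such as a Java object inside a function, or Key/Value state associated with a function.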
7. Scheduling
The JobManager receives a Flink Job submitted by the Client in the form of a JobGraph and maps it to an ExecutionGraph. The ExecutionGraph is the parallel representation of the JobGraph, that is, the logical view the JobManager actually uses to schedule a Job onto the TaskManagers.
An example of physical scheduling based on resource allocation and usage:
1. Upper-left diagram: there are 2 TaskManagers, each with 3 Task Slots
2. Lower-left diagram: a Flink Job that logically consists of 1 data source, 1 MapFunction, and 1 ReduceFunction, corresponding to a JobGraph
3. Lower-left diagram: the configuration the submitted Flink Job applies to each Operator: the data source has parallelism 4, the MapFunction also has parallelism 4, and the ReduceFunction has parallelism 3. On the JobManager side this corresponds to the ExecutionGraph
4. Upper-right diagram: on TaskManager 1 there are 2 parallel DAGs made of ExecutionVertex instances, each occupying one Task Slot
5. Lower-right diagram: on TaskManager 2 there are also 2 parallel DAGs made of ExecutionVertex instances, each occupying one Task Slot
6. The 4 Executions running on the 2 TaskManagers run in parallel
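8. Iterations
Machine learning and graph computation applications both rely on iterative computation. Flink implements iterative algorithms by defining a Step function inside an iteration Operator. There are two kinds of iteration Operators: Iterate and Delta Iterate.
Iterate
The Iterate Operator is the simple form of iteration. In each round, the input of the Step function is either the entire input data set or the result of the previous round. The round computes the input needed by the next round (also called the Next Partial Solution). Once the termination condition is met, the final result of the iteration is emitted.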
Pseudocode of the flow:
IterationState state = getInitialState();
while (!terminationCriterion()) {
    state = step(state);
}
setFinalState(state);
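The pseudocode can be turned into a runnable plain-Java sketch. This is not the Flink Iterate Operator itself; the helper BulkIteration.iterate and its maxIterations guard are assumptions added for illustration. It shows the same loop: the whole state is fed through the step function until the termination test holds.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

class BulkIteration {
    /** Applies the step function until the termination test holds or maxIterations is reached. */
    static <S> S iterate(S initial, UnaryOperator<S> step, Predicate<S> done, int maxIterations) {
        S state = initial;
        for (int i = 0; i < maxIterations && !done.test(state); i++) {
            // The result of this round is the input of the next round.
            state = step.apply(state);
        }
        return state;
    }

    public static void main(String[] args) {
        // Double a number until it reaches at least 100: 1, 2, 4, ..., 128.
        int result = BulkIteration.<Integer>iterate(1, x -> x * 2, x -> x >= 100, 50);
        System.out.println(result); // 128
    }
}
```

The iteration cap is a common safeguard so that a termination test that never becomes true cannot loop forever.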
Delta Iterate
The Delta Iterate Operator implements incremental iteration.
Example: minimum-value propagation.
Pseudocode of the flow:
IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
    (delta, workset) = step(workset, solution);
    solution.update(delta);
}
setFinalState(solution);
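The minimum-value propagation example named in this section can be sketched in plain Java, following the workset/solution structure of the pseudocode. It is not Flink code; the class MinPropagation, the undirected edge-list input, and the use of vertex ids as initial values are assumptions for illustration. Only vertices whose value changed in the previous round (the workset) send their value to their neighbors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MinPropagation {
    /** Returns, for every vertex, the smallest vertex id reachable over the undirected edges. */
    static int[] run(int numVertices, int[][] edges) {
        List<List<Integer>> neighbors = new ArrayList<>();
        for (int v = 0; v < numVertices; v++) {
            neighbors.add(new ArrayList<>());
        }
        for (int[] e : edges) {
            neighbors.get(e[0]).add(e[1]);
            neighbors.get(e[1]).add(e[0]);
        }
        int[] solution = new int[numVertices];   // solution set: current minimum per vertex
        Set<Integer> workset = new HashSet<>();  // vertices whose value changed last round
        for (int v = 0; v < numVertices; v++) {
            solution[v] = v;
            workset.add(v);
        }
        while (!workset.isEmpty()) {
            // Step: changed vertices offer their value; keep only real improvements.
            Map<Integer, Integer> delta = new HashMap<>();
            for (int v : workset) {
                for (int n : neighbors.get(v)) {
                    int candidate = solution[v];
                    if (candidate < solution[n]
                            && candidate < delta.getOrDefault(n, Integer.MAX_VALUE)) {
                        delta.put(n, candidate);
                    }
                }
            }
            // Apply the delta to the solution; the updated vertices form the next workset.
            for (Map.Entry<Integer, Integer> entry : delta.entrySet()) {
                solution[entry.getKey()] = entry.getValue();
            }
            workset = new HashSet<>(delta.keySet());
        }
        return solution;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {1, 2}, {3, 4}};
        System.out.println(Arrays.toString(run(5, edges))); // [0, 0, 0, 3, 3]
    }
}
```

Because the workset shrinks as values settle, later rounds touch only the part of the graph that is still changing, which is the point of incremental iteration.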
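9. Back Pressure Monitoring
In a stream processing system, when a downstream Operator cannot keep up, the problem is eased if that Operator can propagate its processing status to the upstream Operators so that they slow down. An alert, for example, can also be used to signal that the running system has a problem.
The Flink web UI provides monitoring of the backpressure behavior of running Jobs. It is implemented with a sampling thread that takes stack trace samples of the running Tasks.
By default, the JobManager triggers 100 stack trace samples, one every 50 ms, for each Task of a Job and computes a ratio from them. For example, ratio = 0.01 means that only 1 of the 100 samples was blocked in a method call. Flink currently defines the following backpressure states: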
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
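The thresholds above translate directly into a small plain-Java sketch. The class Backpressure and its methods are hypothetical names, not part of the Flink API; only the three ranges come from the list above.

```java
class Backpressure {
    enum Level { OK, LOW, HIGH }

    /** Fraction of stack trace samples in which the task was blocked. */
    static double ratio(int blockedSamples, int totalSamples) {
        return (double) blockedSamples / totalSamples;
    }

    /** Maps a ratio in [0, 1] to the backpressure state listed above. */
    static Level classify(double ratio) {
        if (ratio < 0.0 || ratio > 1.0) {
            throw new IllegalArgumentException("ratio must be within [0, 1]: " + ratio);
        }
        if (ratio <= 0.10) {
            return Level.OK;
        }
        if (ratio <= 0.5) {
            return Level.LOW;
        }
        return Level.HIGH;
    }

    public static void main(String[] args) {
        // 1 blocked sample out of 100 gives ratio 0.01.
        System.out.println(classify(ratio(1, 100)));  // OK
        System.out.println(classify(ratio(60, 100))); // HIGH
    }
}
```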
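III. Libraries
1. Table
Flink's Table API provides SQL-like queries for both stream and batch processing.
2. CEP
Flink's CEP (Complex Event Processing) library detects complex event patterns in a stream, so that the data users care about can be filtered out quickly.
3. Gelly
Gelly is Flink's graph computation API. It provides interfaces that simplify developing and building graph analysis applications.
4. FlinkML
FlinkML is Flink's machine learning library. It provides scalable machine learning algorithms, a concise API, and tools that simplify the development of machine learning systems.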
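IV. Deployment
When a Flink system starts, it first brings up a JobManager and one or more TaskManagers. The JobManager coordinates the Flink system, and the TaskManagers are the workers that execute the parallel program. When the system is started in local mode, one JobManager and one TaskManager are started inside the same JVM.
When a program is submitted, the system creates a Client that preprocesses it, turns it into a parallel dataflow, and hands it to the JobManager and TaskManagers for execution.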
1. Startup Test
Build Flink and start it locally.
$ java -version
java version "1.8.0_111"
$ git clone https://github.com/apache/flink.git
$ cd flink
$ git checkout release-1.1.4 -b release-1.1.4
$ mvn clean package -DskipTests
$ cd flink-dist/target/flink-1.1.4-bin/flink-1.1.4
$ ./bin/start-local.sh
Write a local stream processing demo.
SocketWindowWordCount.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    // split each line on whitespace and emit (word, 1)
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            // sliding window: 5 seconds long, evaluated every second
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
pom.xml
<!-- Use this dependency if you are using the DataStream API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
Run the Maven build.
$ mvn clean install
$ ls target/flink-demo-1.0-SNAPSHOT.jar
Open port 9000 for entering input data:
$ nc -l 9000
Submit the Flink job:
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000
After entering data in nc, check the results:
$ tail -f log/flink-*-jobmanager-*.out
Open the Flink web UI: localhost:8081
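2. Code Structure
The core of the Flink system is divided into several subprojects. The split is meant to reduce the number of dependencies needed to develop a Flink program and to make it easier to test and develop small components.
Flink currently also includes the following subprojects:
flink-dist: the distribution project. It defines how the compiled code, scripts, and other resources are assembled into the final usable directory layout.
flink-quickstart: scripts, Maven archetypes, and example programs for the quickstarts and tutorials
flink-contrib: a collection of early-stage, user-developed projects and useful tools. The code is later maintained mainly by external contributors, and the requirements for code accepted into flink-contrib are lower than for the other projects.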
3. Flink On YARN
When Flink runs on a YARN cluster, the Flink YARN Client talks to the YARN RM to negotiate resource requests, and the Flink JobManager and the Flink TaskManagers each obtain a Container in which to run their process.
The YARN AM runs in the same Container as the Flink JobManager, so the AM knows the address of the Flink JobManager and can request Containers to start the Flink TaskManagers. Once Flink is running on the YARN cluster, the Flink YARN Client can submit Flink Jobs to the Flink JobManager, which then carries out the mapping, scheduling, and computation.
Set the Hadoop environment variable
$ export HADOOP_CONF_DIR=/etc/hadoop/conf
Submit a job in cluster mode; a new Flink cluster is created for every submission
$ ./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar
Start a shared Flink cluster, then submit the job
$ ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -d
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar