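Editor's recommendation:
This article shows how to get started with Flink SQL programming quickly using Docker Compose. It compares Window Aggregate with Group Aggregate and explains how jobs of each type write their results to external systems.
This article comes from the personal blog Jark's Blog and was edited and recommended by Alice.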
What will you learn in this lesson?
This article uses five examples to walk through Flink SQL programming in practice. It covers the following topics:
1. How to use the SQL CLI client
2. How to run SQL queries on streams
3. How to run a window aggregate and a non-window aggregate, and how they differ
4. How to consume Kafka data with SQL
5. How to write results to Kafka and ElasticSearch with SQL
This article assumes you already have basic SQL knowledge.
Environment Setup
This tutorial runs entirely on Docker, so Docker is the only thing you need to install. You do not need a Java or Scala environment, or an IDE.
Note: Docker's default resource allocation may be too small and can cause Flink jobs to hang. We recommend giving Docker 3-4 GB of memory and 3-4 CPUs.

The environment for this tutorial is set up with Docker Compose and includes containers for all the required services:
1. Flink SQL Client: submits queries and visualizes the results.
2. Flink JobManager and TaskManager: run the Flink SQL jobs.
3. Apache Kafka: produces the input stream and receives the result stream.
4. Apache Zookeeper: a dependency of Kafka.
5. ElasticSearch: receives results.
We have already prepared a Docker Compose configuration file; you can download the docker-compose.yml file directly.
Then open a terminal, change to the directory that contains docker-compose.yml, and run the following commands:
Linux & MacOS:
docker-compose up -d
Windows:
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d
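The docker-compose command starts all the required containers. On the first run, Docker automatically downloads the images from Docker Hub, which can take a while (nearly 2.3 GB in total). After that, the environment starts within a few seconds. If everything starts successfully, you will see the following output on the command line, and you can open the Flink Web UI at http://localhost:8081.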

Running the Flink SQL CLI Client
Run the following command to enter the Flink SQL CLI:
docker-compose exec sql-client ./sql-client.sh
This command starts the Flink SQL CLI client inside the container. You will then see the following welcome screen.

About the Data
The Docker Compose environment already registers some tables and data; run SHOW TABLES; to list them. This article uses the Rides table, a stream of taxi ride records that includes time and location information. Run DESCRIBE Rides; to view the table schema:
Flink SQL> DESCRIBE Rides;
root
|-- rideId: Long // ride ID (each ride has two records: one for the start, one for the end)
|-- taxiId: Long // taxi ID
|-- isStart: Boolean // start or end event
|-- lon: Float // longitude
|-- lat: Float // latitude
|-- rideTime: TimeIndicatorTypeInfo(rowtime) // event time
|-- psgCnt: Integer // number of passengers
The full definition of the Rides table is in training-config.yaml.
Example 1: Filtering
Suppose we only want to see the ride records that occur in New York City.
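Note: the Docker environment predefines some built-in functions. For example, isInNYC(lon, lat) determines whether a longitude/latitude pair lies in New York City, and toAreaId(lon, lat) converts a longitude/latitude pair into an area (grid cell) ID.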
So we can use isInNYC to keep only the ride records in New York. Run the following query in the SQL CLI:
SELECT * FROM Rides WHERE isInNYC(lon, lat);
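The SQL CLI then submits a SQL job to the Docker cluster. The job continuously pulls data from the source (the Rides stream is stored in Kafka) and uses isInNYC to keep only the rows we need. The SQL CLI also switches into visualization mode and keeps refreshing the filtered results: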
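A streaming filter is stateless: each row is kept or dropped the moment it arrives. The plain-Python model below illustrates that behavior; it is not Flink code. The Ride dataclass mirrors the DESCRIBE Rides schema, while the bounding box inside is_in_nyc is a hypothetical stand-in, because the real isInNYC UDF in the Docker image may use different bounds or a polygon.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass(frozen=True)
class Ride:
    # Mirrors the Rides schema shown by DESCRIBE Rides.
    ride_id: int
    taxi_id: int
    is_start: bool
    lon: float
    lat: float
    ride_time: int  # event time in epoch milliseconds (assumption)
    psg_cnt: int


# Hypothetical bounding box; the real isInNYC UDF may be defined differently.
NYC_LON_MIN, NYC_LON_MAX = -74.05, -73.70
NYC_LAT_MIN, NYC_LAT_MAX = 40.50, 41.00


def is_in_nyc(lon: float, lat: float) -> bool:
    return NYC_LON_MIN <= lon <= NYC_LON_MAX and NYC_LAT_MIN <= lat <= NYC_LAT_MAX


def filter_nyc(rides: Iterable[Ride]) -> Iterator[Ride]:
    """Model of WHERE isInNYC(lon, lat): no state, one decision per row."""
    for ride in rides:
        if is_in_nyc(ride.lon, ride.lat):
            yield ride
```

Because filter_nyc is a generator, it also works on an endless iterator of rides, just as the query keeps running on the unbounded Kafka stream.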

You can also check the status of the Flink job at http://localhost:8081.
Example 2: Group Aggregate
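Another requirement is to count ride events by the number of passengers carried: the number of rides with 1 passenger, the number of rides with 2 passengers, and so on. As before, we only care about ride events in New York.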
We can therefore group by the passenger count psgCnt and use COUNT(*) to compute the number of events in each group. Note that the isInNYC filter must be applied before grouping. Run the following query in the SQL CLI:
SELECT psgCnt,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;
The SQL CLI visualizes the results as shown below; they change every second. However, the passenger count never exceeds 6.
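The constantly changing result can be read as a changelog. The sketch below is a simplified Python model of the Example 2 query (not the Flink runtime) that takes passenger counts which have already passed the isInNYC filter. The markers "+I", "-U" and "+U" are modeled on Flink's RowKind naming (insert, update-before, update-after); the function name group_count is hypothetical.

```python
from collections import defaultdict
from typing import Iterable, List, Tuple

# (kind, psgCnt, cnt): "+I" is the first result for a key,
# "-U" retracts the previous result, "+U" carries the new one.
Change = Tuple[str, int, int]


def group_count(psg_counts: Iterable[int]) -> List[Change]:
    """Model of SELECT psgCnt, COUNT(*) ... GROUP BY psgCnt on a stream."""
    counts = defaultdict(int)  # keyed state: one counter per psgCnt value
    changelog: List[Change] = []
    for psg in psg_counts:
        old = counts[psg]
        new = old + 1
        counts[psg] = new
        if old == 0:
            changelog.append(("+I", psg, new))
        else:
            # Every input row updates an earlier result: an update stream.
            changelog.append(("-U", psg, old))
            changelog.append(("+U", psg, new))
    return changelog
```

The counts dictionary is never cleared: one entry per psgCnt value lives for as long as the job runs.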

Example 3: Window Aggregate
ΪÁ˳ÖÐøµØ¼à²âŦԼµÄ½»Í¨Á÷Á¿£¬ÐèÒª¼ÆËã³öÿ¸öÇø¿éÿ5·ÖÖӵĽøÈëµÄ³µÁ¾Êý¡£ÎÒÃÇÖ»¹ØÐÄÖÁÉÙÓÐ5Á¾³µ×Ó½øÈëµÄÇø¿é¡£
This involves a window computation (every 5 minutes), so we need the Tumbling Window syntax. "Each area" means we also group by toAreaId. "Vehicles entering" means we filter ride records on the isStart field before grouping and use COUNT(*) to count the vehicles. Finally, "areas with at least 5 vehicles" is a filter on an aggregated value, so it is expressed with a SQL HAVING clause.
The final query is as follows:
SELECT
toAreaId(lon, lat) AS area,
TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) and isStart
GROUP BY
toAreaId(lon, lat),
TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
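After you run it in the SQL CLI, the results are visualized as shown below. Once the result for an area + window_end pair has been emitted, it never changes, and a new batch of window results is emitted every 5 minutes of event time. Because the source in the Docker environment replays the data 10 times faster than the original speed, a new batch of windows appears roughly every 30 seconds during the demo.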
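The window behavior can be modeled in a few lines of Python. This is a simplified single-threaded sketch under stated assumptions, not Flink's implementation: the watermark is taken to be the largest event time seen minus a fixed out-of-orderness bound, late events are simply dropped, and window_counts is a hypothetical helper name. It shows why each (area, window_end) row is emitted exactly once and why the state of a finished window can be discarded.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

WINDOW_MS = 5 * 60 * 1000  # TUMBLE(rideTime, INTERVAL '5' MINUTE)
MIN_COUNT = 5              # HAVING COUNT(*) >= 5


def window_counts(
    events: Iterable[Tuple[int, int]], out_of_orderness_ms: int = 0
) -> List[Tuple[int, int, int]]:
    """events: (area_id, ride_time_ms) of rows that already passed
    isInNYC(lon, lat) AND isStart. Returns append-only (area, window_end, cnt)."""
    state: Dict[Tuple[int, int], int] = defaultdict(int)  # (area, window_end) -> count
    output: List[Tuple[int, int, int]] = []
    watermark: Optional[int] = None
    max_ts: Optional[int] = None

    for area, ts in events:
        end = ts - ts % WINDOW_MS + WINDOW_MS  # exclusive end of the 5-minute window
        if watermark is not None and end - 1 <= watermark:
            continue  # late event: its window has already fired
        state[(area, end)] += 1
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - out_of_orderness_ms
        # Fire every window whose last millisecond (end - 1) the watermark has reached.
        for key in sorted(k for k in state if k[1] - 1 <= watermark):
            cnt = state.pop(key)  # a closed window's state is freed immediately
            if cnt >= MIN_COUNT:
                output.append((key[0], key[1], cnt))
    return output
```

Popping a fired window keeps the state limited to windows that are still open, and because a result is only produced after its window is complete, the output never needs to be corrected.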

The Difference Between Window Aggregate and Group Aggregate
The results of Examples 2 and 3 show clear differences between Window Aggregate and Group Aggregate. The main difference is that a Window Aggregate emits results only when a window ends; each output is final and is never modified, so its output stream is an Append stream. A Group Aggregate, by contrast, emits the latest result every time it processes a record; its results keep being updated, much like rows in a database, so its output stream is an Update stream.
Another difference is that windows are driven by watermarks, so Flink knows exactly which windows have expired and can clean up their state promptly, keeping the state at a stable size. A Group Aggregate cannot tell which data has expired, so its state grows without bound. This makes production jobs less stable, so we recommend configuring State TTL for Group Aggregate jobs.

For example, to compute the real-time PV (page views) of each shop per day, you can set the TTL to 24+ hours, because state from previous days is generally no longer needed.
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) AS pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id;
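The effect of State TTL on this query can be illustrated with a toy keyed counter. The class below is a hypothetical sketch, not Flink's StateTtlConfig or its cleanup strategies: each (day, shop_id) key stores its count and the processing time of its last write, and an entry idle for at least the TTL is treated as gone, both when it is read and when purge() runs.

```python
from typing import Dict, Hashable, Tuple


class TtlCountState:
    """Toy model of keyed COUNT(*) state with a time-to-live (processing time, ms)."""

    def __init__(self, ttl_ms: int) -> None:
        self.ttl_ms = ttl_ms
        self._state: Dict[Hashable, Tuple[int, int]] = {}  # key -> (count, last_write_ms)

    def _expired(self, last_write_ms: int, now_ms: int) -> bool:
        return now_ms - last_write_ms >= self.ttl_ms

    def add(self, key: Hashable, now_ms: int) -> int:
        """Count one row for key (like COUNT(*)) and return the new value."""
        entry = self._state.get(key)
        # An expired entry is invisible: counting restarts from zero.
        count = 0 if entry is None or self._expired(entry[1], now_ms) else entry[0]
        self._state[key] = (count + 1, now_ms)  # every write refreshes the TTL
        return count + 1

    def purge(self, now_ms: int) -> None:
        """Drop all expired entries so the state stops growing."""
        expired = [k for k, (_, ts) in self._state.items() if self._expired(ts, now_ms)]
        for key in expired:
            del self._state[key]

    def __len__(self) -> int:
        return len(self._state)
```

With a TTL slightly above 24 hours, yesterday's (day, shop_id) keys disappear shortly after the day ends, while today's keys keep receiving writes and stay alive.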
Of course, if the TTL is set too small, useful state may be cleared, which hurts the accuracy of the results. Choosing the TTL is therefore a trade-off that users have to make.
Example 4: Writing an Append Stream to Kafka
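The previous section described the difference between Window Aggregate and Group Aggregate, and between Append streams and Update streams. In Flink, an Update stream can currently be written only to external stores that support updates, such as MySQL, HBase and ElasticSearch. An Append stream can be written to any store, but it is usually written to a log-style system such as Kafka.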
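To see why the type of sink matters, the following Python sketch applies changelog rows to two kinds of targets. It is illustrative only; neither function is a Flink or Kafka API, and the "+I"/"-U"/"+U" markers are modeled on Flink's RowKind naming. An append-only log can only accept inserts, so it rejects the retraction rows an update stream produces, while a keyed upsert table simply overwrites the row for each key.

```python
from typing import Dict, List, Tuple

# A changelog row: (kind, key, value).
Change = Tuple[str, int, int]


def write_append_only(changes: List[Change]) -> List[Tuple[int, int]]:
    """Kafka-like log: rows are only ever appended, never overwritten."""
    log: List[Tuple[int, int]] = []
    for kind, key, value in changes:
        if kind != "+I":
            # An append-only target has no way to express "replace the old row".
            raise ValueError(f"append-only sink cannot consume {kind} rows")
        log.append((key, value))
    return log


def write_upsert(changes: List[Change]) -> Dict[int, int]:
    """MySQL/HBase/Elasticsearch-like table keyed by the group key."""
    table: Dict[int, int] = {}
    for kind, key, value in changes:
        if kind in ("+I", "+U"):
            table[key] = value  # insert or overwrite the row for this key
        # "-U" needs no action here: the following "+U" replaces the same key.
    return table
```

A window aggregate produces only "+I" rows, so both targets accept it; a group aggregate produces "-U"/"+U" pairs, which only the upsert target can absorb.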
Here we want to write "the number of passengers per 10 minutes" to Kafka.
We have predefined a Kafka result table, Sink_TenMinPsgCnts (the full table definition is in training-config.yaml).
Before running the query, run the following command to monitor the data written to the TenMinPsgCnts topic:
docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
ÿ10·ÖÖӵĴî³ËµÄ³Ë¿ÍÊý¿ÉÒÔʹÓà Tumbling Window À´ÃèÊö£¬ÎÒÃÇʹÓà INSERT INTO
Sink_TenMinPsgCnts À´Ö±½Ó½« Query ½á¹ûдÈëµ½½á¹û±í¡£
INSERT INTO Sink_TenMinPsgCnts
SELECT
TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
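TUMBLE_START and TUMBLE_END are simple arithmetic on the event time. Assuming windows aligned to the Unix epoch, which matches Flink's default of no window offset, the Python sketch below computes the bounds of the 10-minute window containing a timestamp and sums psgCnt per window. The helper names are hypothetical, and the CAST to BIGINT has no counterpart here because Python integers do not overflow.

```python
from typing import Dict, List, Tuple

SIZE_MS = 10 * 60 * 1000  # INTERVAL '10' MINUTE


def tumble_bounds(ts_ms: int) -> Tuple[int, int]:
    """(TUMBLE_START, TUMBLE_END) in epoch ms; the end is exclusive."""
    start = ts_ms - ts_ms % SIZE_MS
    return start, start + SIZE_MS


def ten_min_psg_cnts(rides: List[Tuple[int, int]]) -> List[Tuple[int, int, int]]:
    """rides: (rideTime_ms, psgCnt). Returns (cntStart, cntEnd, cnt) per window."""
    sums: Dict[Tuple[int, int], int] = {}
    for ts, psg in rides:
        bounds = tumble_bounds(ts)
        sums[bounds] = sums.get(bounds, 0) + psg  # SUM(psgCnt) per window
    return [(start, end, cnt) for (start, end), cnt in sorted(sums.items())]
```

A timestamp one millisecond before a boundary still belongs to the earlier window, and a timestamp exactly on the boundary starts the next one.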
The monitoring command shows the data written to the TenMinPsgCnts topic in Kafka as JSON:

Example 5: Writing an Update Stream to ElasticSearch
Finally, let's write a continuously updated Update stream to ElasticSearch. We want to write "the number of rides departing from each area" to ES.
We have also predefined an ElasticSearch result table, Sink_AreaCnts (the full table definition is in training-config.yaml). The table has only two fields: areaId and cnt.
Likewise, we use INSERT INTO to write the query results directly into the Sink_AreaCnts table.
INSERT INTO Sink_AreaCnts
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
After you run the query above in the SQL CLI, Elasticsearch automatically creates the area-cnts index. Elasticsearch provides a REST API, so we can visit the following URLs:
View details of the area-cnts index: http://localhost:9200/area-cnts
View statistics of the area-cnts index: http://localhost:9200/area-cnts/_stats
Return the contents of the area-cnts index: http://localhost:9200/area-cnts/_search
Show the ride count for area 49791: http://localhost:9200/area-cnts/_search?q=areaId:49791
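The same REST endpoints can also be queried from a script. Below is a minimal Python sketch that uses only the standard library. It assumes Elasticsearch is reachable at localhost:9200 as in this tutorial, the helper names are hypothetical, and it relies only on the standard _search response layout, in which matching documents are listed under hits.hits with their fields in _source.

```python
import json
from typing import Any, Dict, List
from urllib.parse import urlencode
from urllib.request import urlopen

ES_URL = "http://localhost:9200"  # Elasticsearch address used in this tutorial


def search_url(index: str, query: str) -> str:
    """Build a URI search URL, e.g. for the query areaId:49791."""
    params = urlencode({"q": query})
    return f"{ES_URL}/{index}/_search?{params}"


def sources(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the stored documents (_source) from a _search response body."""
    return [hit["_source"] for hit in response["hits"]["hits"]]


def search(index: str, query: str) -> List[Dict[str, Any]]:
    """Run the search against a live cluster (needs the Docker environment)."""
    with urlopen(search_url(index, query), timeout=10) as resp:
        return sources(json.load(resp))
```

With the environment running, search("area-cnts", "areaId:49791") returns the current document for that area; calling it repeatedly should show cnt increasing while the query runs.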
As the query keeps running, you can also watch some statistics (_all.primaries.docs.count, _all.primaries.docs.deleted) keep growing: http://localhost:9200/area-cnts/_stats
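The growth of docs.deleted follows from how Elasticsearch handles updates: Lucene segments are immutable, so re-indexing a document with an existing id writes a new copy and marks the old one as deleted until a segment merge reclaims it. The toy model below illustrates this for the Sink_AreaCnts stream. It assumes, without confirming it from training-config.yaml, that the sink uses areaId as the document id, and it does not model segment merges.

```python
from typing import Dict


class UpsertIndexModel:
    """Toy model of docs.count / docs.deleted under keyed upserts."""

    def __init__(self) -> None:
        self.docs: Dict[str, int] = {}  # document id -> cnt
        self.deleted = 0                # old versions not yet merged away

    def upsert(self, area_id: int, cnt: int) -> None:
        doc_id = str(area_id)  # assumption: areaId is used as the document id
        if doc_id in self.docs:
            self.deleted += 1  # the previous version is only marked deleted
        self.docs[doc_id] = cnt

    def stats(self) -> Dict[str, int]:
        return {"docs.count": len(self.docs), "docs.deleted": self.deleted}
```

In this model docs.count grows only when a new area appears, while docs.deleted grows with every update to an existing area.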
Summary
This article showed how to get started with Flink SQL programming quickly using Docker Compose, compared Window Aggregate with Group Aggregate, and showed how jobs of each type write to external systems. If you are interested, you can use this Docker environment to go further, for example by running your own UDFs, UDTFs and UDAFs, or by querying the other built-in source tables.