Editor's recommendation:
This article introduces the basic capabilities and architecture of Flink SQL. We hope it helps with your learning.
This article comes from InfoQ and was edited and recommended by Alice.
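Big data workloads are still dominated by offline (batch) computation, yet the more real-time the data, the more valuable it is. An effective way to maximize the value of data is to use real-time stream computing technologies (Flink, Spark, and so on) to feed results back to users quickly, raising conversion rates and keeping products running normally. SQL is a general-purpose language that is easy to pick up, so this article introduces the basic capabilities of Flink SQL.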
1. Get Started
Flink SQL is Flink's high-level API, and its syntax follows the ANSI SQL standard. For example:
SELECT car_id, MAX(speed), COUNT(speed)
FROM drive_data
WHERE speed > 90
GROUP BY TUMBLE(proctime, INTERVAL '30' SECOND), car_id
Flink SQL grew out of the Flink Table API. The Table API equivalent of the example above is:
table.where('speed > 90)
  .window(Tumble over 30.second on 'proctime as 'w)
  .groupBy('w, 'car_id)
  .select('car_id, 'speed.max, 'speed.count)
The example above is written in Scala. Thanks to Scala features such as implicit conversions and infix notation, the Table API code reads very much like the SQL statement.
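To make the semantics of the two examples above concrete, here is a minimal sketch in plain Scala collections, with no Flink dependency. It assumes a hypothetical `Drive` row type and a processing-time field in milliseconds, and it models a 30-second tumbling window by rounding each timestamp down to the start of its window. It only illustrates what the query computes over a finite batch of rows, not how Flink evaluates it on an unbounded stream.

```scala
// Hypothetical row type; the fields mirror the columns used in the query.
final case class Drive(carId: String, speed: Int, procTimeMs: Long)

object TumbleSketch {
  val WindowMs: Long = 30000L // INTERVAL '30' SECOND

  // Result: (window start in ms, car id) -> (MAX(speed), COUNT(speed)).
  def aggregate(rows: Seq[Drive]): Map[(Long, String), (Int, Int)] =
    rows
      .filter(_.speed > 90) // WHERE speed > 90
      .groupBy(r => (r.procTimeMs / WindowMs * WindowMs, r.carId)) // TUMBLE window + car_id
      .map { case (key, group) => (key, (group.map(_.speed).max, group.size)) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Drive("car1", 95, 1000L),
      Drive("car1", 120, 2000L),
      Drive("car1", 80, 3000L),   // dropped by the filter
      Drive("car1", 100, 31000L), // falls into the next window
      Drive("car2", 91, 5000L)
    )
    aggregate(rows).foreach(println)
  }
}
```

Each key in the result corresponds to one output row of the SQL query: one per car and per 30-second window that contains at least one reading above 90.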
2. Architecture
The old Table API used a chained-call style to build a tree of Table Operators, ran code generation on each tree node, and turned the result into calls to Flink's low-level APIs, namely the DataStream and DataSet APIs.
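By 2016, the open-source community already had many mature SQL-on-Hadoop solutions, including Apache Hive, Apache Impala, and Apache Drill. Several of them, Hive and Drill among them, rely on Apache Calcite for SQL parsing and optimization, and Calcite had by then become a very popular, de facto industry-standard framework for SQL parsing and optimization. Meanwhile, as Flink was adopted for more real-time analytics use cases, demand for a SQL API grew, so the community began building a new version of the Table API on top of Apache Calcite and added SQL API support.

The new Table & SQL API builds on the original Table API and uses Calcite for SQL parsing and optimization. Table API calls and SQL queries are both converted into the same Calcite logical plan (a tree of Calcite RelNodes), which is then optimized and passed through code generation. As before, the end result is a program expressed in calls to the Flink DataStream/DataSet API.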
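The pipeline described above (front end, logical plan tree, optimization, code generation) can be sketched with a toy plan type. The sketch below is not Calcite's RelNode API and not Flink's planner: `Scan`, `Filter`, `Project`, `optimize`, and `execute` are all hypothetical names, the only optimization rule merges two stacked filters, and "code generation" is replaced by a simple interpreter over in-memory rows.

```scala
// Toy logical plan for illustration only. This is NOT Calcite's RelNode API;
// every type and function name below is hypothetical.
object PlanSketch {
  type Row = Map[String, Any]

  sealed trait Plan
  final case class Scan(rows: Seq[Row]) extends Plan
  final case class Filter(input: Plan, pred: Row => Boolean) extends Plan
  final case class Project(input: Plan, fields: Seq[String]) extends Plan

  // One rewrite rule: two stacked filters collapse into a single filter.
  def optimize(plan: Plan): Plan = plan match {
    case Filter(Filter(in, p1), p2) => optimize(Filter(in, (r: Row) => p1(r) && p2(r)))
    case Filter(in, p)              => Filter(optimize(in), p)
    case Project(in, fields)        => Project(optimize(in), fields)
    case scan: Scan                 => scan
  }

  // Stand-in for code generation: interpret the plan over in-memory rows.
  def execute(plan: Plan): Seq[Row] = plan match {
    case Scan(rows)          => rows
    case Filter(in, p)       => execute(in).filter(p)
    case Project(in, fields) => execute(in).map(row => row.filter { case (k, _) => fields.contains(k) })
  }

  // Counts Filter nodes, to make the effect of optimize visible.
  def filterCount(plan: Plan): Int = plan match {
    case Scan(_)        => 0
    case Filter(in, _)  => 1 + filterCount(in)
    case Project(in, _) => filterCount(in)
  }

  def main(args: Array[String]): Unit = {
    val table = Scan(Seq[Row](
      Map("car_id" -> "car1", "speed" -> 95),
      Map("car_id" -> "car2", "speed" -> 120),
      Map("car_id" -> "car1", "speed" -> 60)
    ))
    val plan = Project(
      Filter(Filter(table, r => r("speed").asInstanceOf[Int] > 90), r => r("car_id") == "car1"),
      Seq("car_id")
    )
    println(filterCount(plan) + " filters before, " + filterCount(optimize(plan)) + " after")
    println(execute(optimize(plan)))
  }
}
```

The point of the shared tree is that it does not matter whether the plan was built by chained API calls or by parsing a SQL string: both front ends would produce the same `Plan` value, so a single optimizer and a single back end serve both.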
3. DDL & DML
A complete SQL dialect consists of two parts: DDL (data definition language) and DML (data manipulation language). Flink SQL currently supports only DML; the DDL part, which covers the definition of data streams, still has to be implemented in code.
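Among the major public cloud vendors in China, Huawei Cloud and Alibaba Cloud both offer real-time stream computing services based on Flink SQL. Each defines its own DDL syntax, and the two are broadly similar. Taking Huawei Cloud as an example, stream definitions are introduced by the CREATE STREAM keywords (CREATE SOURCE STREAM and CREATE SINK STREAM). An example of the DDL: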
CREATE SOURCE STREAM driver_behavior (car_id STRING, speed INT, collect_time LONG)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "10.10.10.10:3456,10.10.10.20:3456",
  kafka_group_id = "group1",
  kafka_topic = "topic1",
  encode = "csv",
  field_delimiter = ","
) TIMESTAMP BY collect_time.ROWTIME;

CREATE SINK STREAM over_speed_warning (message STRING)
WITH (
  type = "smn",
  region = "cn-north-1",
  topic_urn = "urn:smn:cn-north-1:38834633fd6f4bae813031b5985dbdea:warning",
  message_subject = "title",
  message_column = "message"
);
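The DDL contains the definitions of the input and output streams and describes the upstream and downstream ecosystem components of the streaming job. In the example above, the input stream (SOURCE STREAM) is of type Kafka, and the WITH clause carries the Kafka consumer configuration. The output stream (SINK STREAM) is of type SMN, short for Simple Message Notification, the Huawei Cloud message notification service used for SMS and email notifications.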
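As a rough illustration of what the `encode = "csv"` and `field_delimiter = ","` settings imply for the source stream, the sketch below decodes one text record into a typed row matching the declared schema (car_id STRING, speed INT, collect_time LONG). The `DriverBehavior` type and the `decode` function are hypothetical, and returning `None` for malformed records is an assumption made here; how the actual service treats bad records is not stated in this article.

```scala
// Hypothetical row type matching the driver_behavior schema in the DDL above.
final case class DriverBehavior(carId: String, speed: Int, collectTime: Long)

object CsvDecodeSketch {
  // Splits one record on the configured delimiter and converts each field to
  // the declared column type. Malformed records yield None (an assumption).
  def decode(line: String, delimiter: String = ","): Option[DriverBehavior] =
    line.split(java.util.regex.Pattern.quote(delimiter), -1) match {
      case Array(carId, speed, time) =>
        scala.util.Try(DriverBehavior(carId.trim, speed.trim.toInt, time.trim.toLong)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(decode("car1,95,1530000000000"))
    println(decode("not a valid record"))
  }
}
```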
Data flows in from Kafka and out to the SMN service, while the processing logic in between is expressed in DML. An example of the DML:
INSERT INTO over_speed_warning
SELECT "your car speed (" || CAST(speed AS CHAR(20)) || ") exceeds the maximum speed."
FROM (
  SELECT car_id, MAX(speed) AS speed, COUNT(speed) AS overspeed_count
  FROM driver_behavior
  WHERE speed > 90
  GROUP BY TUMBLE(collect_time, INTERVAL '30' SECOND), car_id
)
WHERE overspeed_count >= 3;
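The DML statement above emits an alert message to the downstream SMN component, which is the output stream, whenever a vehicle is recorded speeding three or more times within a 30-second window. In the statement, the INSERT INTO keyword is followed by the name of the output stream and the FROM keyword by the name of the input stream; the SELECT clause defines what is emitted, and the WHERE clause defines the filter the output must satisfy. The example uses a SQL subquery: the outer FROM is followed by an entire SELECT statement. To make this easier to follow, the subquery can be rewritten as an equivalent temporary stream definition, a feature supported by the DDL syntax of Huawei Cloud's real-time stream computing service. The equivalent of the DML above is: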
CREATE TEMP STREAM over_speed_info (car_id STRING, speed INT, overspeed_count INT);

INSERT INTO over_speed_info
SELECT car_id, MAX(speed) AS speed, COUNT(speed) AS overspeed_count
FROM driver_behavior
WHERE speed > 90
GROUP BY TUMBLE(collect_time, INTERVAL '30' SECOND), car_id;

INSERT INTO over_speed_warning
SELECT "your car speed (" || CAST(speed AS CHAR(20)) || ") exceeds the maximum speed."
FROM over_speed_info
WHERE overspeed_count >= 3;
Defining a temporary stream with the TEMP STREAM syntax flattens SQL that would otherwise need a subquery: the stages of the data flow are chained one after another, which makes the logic easier to follow.
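The equivalence between the subquery form and the temporary-stream form can be sketched in plain Scala over a finite batch of rows. The types `Behavior` and `OverSpeedInfo` and all function names are hypothetical stand-ins for the streams above; the window is modeled by integer division of a millisecond timestamp, and any padding implied by `CHAR(20)` is not modeled.

```scala
// Hypothetical in-memory stand-ins for the driver_behavior and over_speed_info streams.
final case class Behavior(carId: String, speed: Int, collectTimeMs: Long)
final case class OverSpeedInfo(carId: String, speed: Int, overspeedCount: Int)

object TempStreamSketch {
  val WindowMs: Long = 30000L

  // The inner SELECT: per 30-second window and car, max speed and overspeed count.
  def aggregate(input: Seq[Behavior]): Seq[OverSpeedInfo] =
    input
      .filter(_.speed > 90)
      .groupBy(b => (b.collectTimeMs / WindowMs, b.carId))
      .toSeq
      .sortBy { case (key, _) => key } // fixed order so the two forms can be compared
      .map { case ((_, carId), group) => OverSpeedInfo(carId, group.map(_.speed).max, group.size) }

  def message(info: OverSpeedInfo): String =
    "your car speed (" + info.speed + ") exceeds the maximum speed."

  // Subquery form: the aggregation is nested inside the outer query.
  def withSubquery(input: Seq[Behavior]): Seq[String] =
    aggregate(input).filter(_.overspeedCount >= 3).map(message)

  // Temp-stream form: the intermediate result gets a name, then is queried.
  def withTempStream(input: Seq[Behavior]): Seq[String] = {
    val overSpeedInfo = aggregate(input) // plays the role of over_speed_info
    overSpeedInfo.filter(_.overspeedCount >= 3).map(message)
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Behavior("car1", 95, 1000L), Behavior("car1", 120, 2000L), Behavior("car1", 100, 3000L),
      Behavior("car2", 99, 1000L), Behavior("car2", 130, 2000L)
    )
    withTempStream(input).foreach(println)
  }
}
```

Naming the intermediate result changes nothing about what is computed; it only gives the middle stage a name that later statements can refer to.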
4. Syntax
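The core of Flink SQL is its DML syntax. Basic DML consists of three data operations: Cartesian product (only a Scan in the single-table case), selection (Filter), and projection (Projection), which correspond to the FROM, WHERE, and SELECT clauses respectively. The order of these three operations is the logical execution order of a DML statement. More advanced syntax includes commonly used constructs such as aggregation, windows, and joins (JOIN), plus less common ones such as sorting, limits, and set operations. The tables below briefly list and explain the basic and commonly used advanced DML statement forms of Flink SQL; for other syntax elements, built-in functions, and further details, see the Flink SQL documentation.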
Basic syntax

Aggregation syntax

Join syntax
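As a rough illustration of the logical order described in this section (FROM, then WHERE, then SELECT), a two-table query can be read as a Cartesian product followed by a selection and a projection. The `Car` and `Reading` tables below are hypothetical and not taken from this article, and a real engine would not materialize the full product; the sketch only shows the logical meaning.

```scala
object LogicalOrderSketch {
  // Hypothetical tables for illustration.
  final case class Car(carId: String, owner: String)
  final case class Reading(carId: String, speed: Int)

  def query(cars: Seq[Car], readings: Seq[Reading]): Seq[(String, Int)] = {
    // FROM cars, readings: Cartesian product of the two inputs
    val product = for (c <- cars; r <- readings) yield (c, r)
    // WHERE cars.car_id = readings.car_id AND speed > 90: selection
    val selected = product.filter { case (c, r) => c.carId == r.carId && r.speed > 90 }
    // SELECT owner, speed: projection
    selected.map { case (c, r) => (c.owner, r.speed) }
  }

  def main(args: Array[String]): Unit = {
    val cars = Seq(Car("car1", "alice"), Car("car2", "bob"))
    val readings = Seq(Reading("car1", 95), Reading("car2", 80), Reading("car2", 110))
    query(cars, readings).foreach(println)
  }
}
```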

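5. Scenarios
Flink SQL is now widely used, in areas such as IoT, connected vehicles, smart cities, log analysis, ETL, real-time dashboards, real-time alerting, and real-time recommendation. Industries such as IoT and connected vehicles place higher demands on Flink, for example time and geospatial functions, CEP SQL, and StreamingML. Cloud vendors have implemented these to varying degrees, and Huawei Cloud's real-time stream computing service has the richest feature set in this area.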