Flink SQL: Principles and Getting Started
 
×÷Õߣº»ªÎªÔƲúÆ·Óë½â¾ö·½°¸
 2021-3-8
 
Editor's recommendation:
This article introduces the basic capabilities and the architecture of Flink SQL. We hope it helps your learning.
Source: InfoQ; edited and recommended by Alice.

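Most big data processing is still offline (batch) computation, yet the closer to real time data is processed, the more valuable it becomes. An effective way to maximize the value of data is to use real-time stream computing (Flink, Spark, and so on) to feed results back to users quickly, which raises conversion rates and helps keep production services running normally. SQL is a general-purpose language that is easy to pick up, so this article introduces the basic capabilities of Flink SQL.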

1. Get Started

Flink SQL is Flink's high-level API, and its syntax follows the ANSI SQL standard. For example:

SELECT car_id, MAX(speed), COUNT(speed)
FROM drive_data
WHERE speed > 90
GROUP BY TUMBLE (proctime, INTERVAL '30' SECOND), car_id

Flink SQL grew out of the Flink Table API. The Table API equivalent of the query above is:

table.where('speed > 90)
.window(Tumble over 30.second on 'proctime as 'w)
.groupBy('w, 'car_id)
.select('car_id, 'speed.max, 'speed.count)

The Table API example is written in Scala. Thanks to Scala features such as implicit conversions and infix notation, Table API code reads very much like the SQL it corresponds to.
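To make the semantics of the query at the start of this section concrete, here is a minimal plain-Python sketch of a 30-second tumbling-window aggregation. It is not Flink code: it processes a finished list in one pass, whereas Flink evaluates windows incrementally on an unbounded stream. The function name `tumbling_max_count`, the tuple layout, and the sample events are assumptions made for illustration, and timestamps are assumed to be in seconds.

```python
from collections import defaultdict

WINDOW_SECONDS = 30
SPEED_LIMIT = 90

def tumbling_max_count(events):
    """events: iterable of (timestamp_seconds, car_id, speed) tuples."""
    groups = defaultdict(list)
    for ts, car_id, speed in events:
        if speed > SPEED_LIMIT:                       # WHERE speed > 90
            window_start = ts - ts % WINDOW_SECONDS   # TUMBLE(..., INTERVAL '30' SECOND)
            groups[(window_start, car_id)].append(speed)
    # SELECT car_id, MAX(speed), COUNT(speed), one row per (window, car_id) group
    return [
        (window_start, car_id, max(speeds), len(speeds))
        for (window_start, car_id), speeds in sorted(groups.items())
    ]

# Hypothetical sample data
sample = [(1, "A", 95), (12, "A", 120), (20, "B", 80), (31, "A", 101), (45, "B", 99)]
print(tumbling_max_count(sample))
# [(0, 'A', 120, 2), (30, 'A', 101, 1), (30, 'B', 99, 1)]
```

Each event falls into exactly one window because tumbling windows do not overlap, which is why a single integer (`window_start`) is enough to identify the window.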

2. Architecture

Older versions of the Table API used a chained-call style to build a tree of Table Operators, then ran code generation on each tree node to turn it into calls to Flink's low-level APIs, namely the DataStream/DataSet API.

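Around 2016, the open-source community already had many mature SQL-on-Hadoop solutions, including Apache Hive, Apache Impala, and Apache Drill. Several of them, notably Hive and Drill, rely on Apache Calcite for SQL parsing and optimization, and Calcite had by then become a very popular, de facto standard framework for SQL parsing and optimization. At the same time, as Flink was used in more and more real-time analytics scenarios, demand for a SQL API grew. The community therefore began building a new version of the Table API on top of Apache Calcite and added SQL API support.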

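The new Table & SQL API builds on the original Table API and uses Calcite for SQL parsing and optimization. Table API calls and SQL queries are both converted into the same Calcite logical plan (a tree of Calcite RelNode objects). That plan is optimized and passed through code generation, and is ultimately turned into Flink DataStream/DataSet API calls, just as before.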
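The flow described above (chained calls build a plan tree, the tree is optimized, and the result is turned into executable code) can be illustrated with a toy sketch. This is not Calcite or Flink code: the `Scan`, `Filter`, `Project`, and `Table` classes, the single filter-merging rule, and the generator-based `execute` are all hypothetical stand-ins chosen to keep the example small.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Scan:
    rows: list

@dataclass
class Filter:
    child: object
    predicate: Callable[[dict], bool]

@dataclass
class Project:
    child: object
    columns: list

class Table:
    """Chained-call front end: each call only adds a node to the plan tree."""
    def __init__(self, plan):
        self.plan = plan
    def where(self, predicate):
        return Table(Filter(self.plan, predicate))
    def select(self, *columns):
        return Table(Project(self.plan, list(columns)))

def optimize(node):
    """One rewrite rule: merge stacked Filter nodes into a single Filter."""
    if isinstance(node, Scan):
        return node
    child = optimize(node.child)
    if isinstance(node, Filter) and isinstance(child, Filter):
        outer, inner = node.predicate, child.predicate
        return Filter(child.child, lambda row: inner(row) and outer(row))
    if isinstance(node, Filter):
        return Filter(child, node.predicate)
    return Project(child, node.columns)

def execute(node):
    """Stand-in for code generation: walk the plan and produce rows lazily."""
    if isinstance(node, Scan):
        yield from node.rows
    elif isinstance(node, Filter):
        yield from (row for row in execute(node.child) if node.predicate(row))
    else:
        yield from ({c: row[c] for c in node.columns} for row in execute(node.child))

def depth(node):
    """Number of nodes on the path from the root down to the Scan."""
    return 1 if isinstance(node, Scan) else 1 + depth(node.child)

rows = [{"car_id": "A", "speed": 95}, {"car_id": "B", "speed": 80}, {"car_id": "A", "speed": 130}]
plan = (
    Table(Scan(rows))
    .where(lambda r: r["speed"] > 90)
    .where(lambda r: r["speed"] < 120)
    .select("car_id", "speed")
    .plan
)
optimized = optimize(plan)
print(depth(plan), depth(optimized))   # 4 3
print(list(execute(optimized)))        # [{'car_id': 'A', 'speed': 95}]
```

The point of the shared plan tree is that any front end, whether chained calls or a SQL parser, can target the same node types and then reuse one optimizer and one code generator.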

3. DDL & DML

A complete SQL syntax consists of two parts: DDL (data definition language) and DML (data manipulation language). At the time of writing, Flink SQL supports only DML; the DDL that defines data streams still has to be implemented in code.

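Among the major public cloud vendors in China, Huawei Cloud and Alibaba Cloud both offer real-time stream computing services based on Flink SQL. Each defines its own DDL syntax, and the two are broadly similar. Taking Huawei Cloud as an example, a data stream is defined with the CREATE STREAM keyword. A concrete DDL example: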

CREATE SOURCE STREAM driver_behavior (car_id STRING, speed INT, collect_time LONG)
WITH (
type = "kafka",
kafka_bootstrap_servers = "10.10.10.10:3456,10.10.10.20:3456",
kafka_group_id = "group1",
kafka_topic = "topic1",
encode = "csv",
field_delimiter = ","
) TIMESTAMP BY collect_time.ROWTIME;
CREATE SINK STREAM over_speed_warning (message STRING)
WITH (
type = "smn",
region = "cn-north-1",
topic_urn = "urn:smn:cn-north-1:38834633fd6f4bae813031b5985dbdea:warning",
message_subject = "title",
message_column = "message"
);

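The DDL contains the definitions of the input and output data streams and describes the upstream and downstream components that the streaming job connects to. In the example above, the input stream (SOURCE STREAM) is of type Kafka, and the WITH clause carries the Kafka consumer configuration. The output stream (SINK STREAM) is of type SMN, short for Huawei Cloud's Simple Message Notification service, which is used for SMS and email notifications.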
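As a rough illustration of what the `encode = "csv"` and `field_delimiter = ","` options in the source stream definition imply, the sketch below decodes one delimited record into the declared columns (car_id STRING, speed INT, collect_time LONG). How the service actually decodes records is not described in this article, so the `decode_record` helper, the whitespace stripping, the error handling, and the sample record are all assumptions.

```python
# Column names and Python types mirroring the declared schema
# (STRING -> str, INT -> int, LONG -> int).
SCHEMA = [("car_id", str), ("speed", int), ("collect_time", int)]

def decode_record(line, delimiter=","):
    """Split one delimited record and cast each field to its declared type."""
    fields = line.rstrip("\n").split(delimiter)
    if len(fields) != len(SCHEMA):
        raise ValueError(f"expected {len(SCHEMA)} fields, got {len(fields)}")
    return {name: cast(value.strip()) for (name, cast), value in zip(SCHEMA, fields)}

# Hypothetical message payload
record = decode_record("car-001,97,1520000000000")
print(record)
# {'car_id': 'car-001', 'speed': 97, 'collect_time': 1520000000000}
```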

Data flows in from Kafka and out to the SMN service, and the processing logic in between is implemented in DML. A concrete DML example:

INSERT INTO over_speed_warning
SELECT "your car speed (" || CAST(speed as CHAR(20)) || ") exceeds the maximum speed."
FROM (
SELECT car_id, MAX(speed) AS speed, COUNT(speed) AS overspeed_count
FROM driver_behavior
WHERE speed > 90
GROUP BY TUMBLE (collect_time, INTERVAL '30' SECOND), car_id
)
WHERE overspeed_count >= 3;

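The DML statement above emits an alert message to the downstream SMN component, which acts as the output stream, whenever a vehicle exceeds the speed limit three or more times within a 30-second window. In the statement, the INSERT INTO keyword is followed by the output stream name and the FROM keyword by the input stream name. The SELECT clause specifies what is output, and the WHERE clause specifies the filter condition the output must satisfy.

The example uses a SQL subquery: the outer FROM is followed by an entire SELECT statement. To make this easier to follow, the subquery can be rewritten as an equivalent temporary stream definition, a feature supported by the DDL syntax of Huawei Cloud's real-time stream computing service. The equivalent of the DML above is: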

CREATE TEMP STREAM over_speed_info (car_id STRING, speed INT, overspeed_count INT);
INSERT INTO over_speed_info
SELECT car_id, MAX(speed) AS speed, COUNT(speed) AS overspeed_count
FROM driver_behavior
WHERE speed > 90
GROUP BY TUMBLE (collect_time, INTERVAL '30' SECOND), car_id;
INSERT INTO over_speed_warning
SELECT "your car speed (" || CAST(speed as CHAR(20)) || ") exceeds the maximum speed."
FROM over_speed_info
WHERE overspeed_count >= 3;

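Defining a temporary stream with the TEMP STREAM syntax lets a SQL statement with subqueries be written in a flat form that chains the data stream logic step by step, which is easier to understand.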
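The equivalence between the subquery form and the temporary-stream form can be sketched in plain Python as two stages: one that computes per-window statistics and one that turns them into warnings. This is a batch simulation under stated assumptions, not Flink code: `window_stats`, `warnings`, and the sample events are hypothetical, timestamps are assumed to be in seconds, and the message formatting ignores the padding that `CAST(speed AS CHAR(20))` might apply.

```python
from collections import defaultdict

def window_stats(events, window_seconds=30, limit=90):
    """Stage 1 (the subquery, or over_speed_info): MAX and COUNT per window and car."""
    groups = defaultdict(list)
    for collect_time, car_id, speed in events:
        if speed > limit:
            groups[(collect_time // window_seconds, car_id)].append(speed)
    return [
        {"car_id": car_id, "speed": max(speeds), "overspeed_count": len(speeds)}
        for (_, car_id), speeds in sorted(groups.items())
    ]

def warnings(stats):
    """Stage 2 (the outer query): keep groups with at least 3 overspeed readings."""
    return [
        f"your car speed ({row['speed']}) exceeds the maximum speed."
        for row in stats
        if row["overspeed_count"] >= 3
    ]

events = [
    (0, "A", 95), (10, "A", 110), (20, "A", 102),  # three overspeed readings in window 0
    (5, "B", 99), (25, "B", 85),                   # only one overspeed reading
    (35, "A", 130),                                # window 1, a single reading
]

# Subquery form: the inner result is consumed directly by the outer query.
nested = warnings(window_stats(events))

# Temporary-stream form: the intermediate result gets a name first.
over_speed_info = window_stats(events)
flattened = warnings(over_speed_info)

print(nested == flattened, nested)
# True ['your car speed (110) exceeds the maximum speed.']
```

Naming the intermediate result changes nothing about what is computed; it only makes the second stage readable on its own.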

4. Syntax

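The core of Flink SQL is its DML syntax. Basic DML consists of three data operations: Cartesian product (with a single table this is just a Scan), selection (Filter), and projection (Projection), which correspond to the FROM, WHERE, and SELECT clauses respectively. The order of these three operations is the logical execution order of a DML statement. More advanced syntax covers commonly used features such as aggregation, windows, and joins (JOIN), as well as less commonly used ones such as sorting, limits, and set operations. The table below lists the basic and commonly used advanced DML forms in Flink SQL with brief notes; for other syntax elements, built-in functions, and further detail, see the Flink SQL documentation.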

Basic syntax

Aggregation syntax

Join syntax
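The logical execution order described in this section (FROM, then WHERE, then SELECT) can be traced step by step in plain Python. The two inputs `cars` and `readings`, the `owner` column, and the query in the comments are hypothetical. A real engine would not materialize the full Cartesian product; an optimizer normally rewrites it into a join.

```python
from itertools import product

cars = [{"car_id": "A", "owner": "alice"}, {"car_id": "B", "owner": "bob"}]
readings = [
    {"car_id": "A", "speed": 95},
    {"car_id": "B", "speed": 80},
    {"car_id": "A", "speed": 130},
]

# Step 1, FROM cars, readings: Cartesian product of the inputs
pairs = list(product(cars, readings))

# Step 2, WHERE cars.car_id = readings.car_id AND readings.speed > 90: selection
matched = [(c, r) for c, r in pairs if c["car_id"] == r["car_id"] and r["speed"] > 90]

# Step 3, SELECT owner, speed: projection
result = [{"owner": c["owner"], "speed": r["speed"]} for c, r in matched]

print(len(pairs), result)
# 6 [{'owner': 'alice', 'speed': 95}, {'owner': 'alice', 'speed': 130}]
```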

5. Use Cases

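Flink SQL is now widely used, in areas such as IoT, connected vehicles, smart cities, log analysis, ETL, real-time dashboards, real-time alerting, and real-time recommendation. Industries such as IoT and connected vehicles place higher demands on Flink, for example time and geography functions, CEP SQL, and StreamingML. Cloud vendors implement these to varying degrees; according to the authors, Huawei Cloud's real-time stream computing service has the richest feature set in this area.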

   