High-Performance Flink SQL Optimization Techniques
 
 2021-3-1
 
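Editor's recommendation:
This article introduces recommended Flink SQL query patterns, configuration settings, and functions for improving performance. We hope it helps your learning.
This article comes from Alibaba Cloud and was edited and recommended by Alice.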

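Group Aggregate optimization techniques

Enable MicroBatch or MiniBatch (to improve throughput)

MicroBatch and MiniBatch are both micro-batch processing modes; they differ only in how a batch is triggered. Both buffer a certain amount of data before triggering processing, which reduces access to State, improves throughput, and reduces the volume of output data.

MiniBatch relies on a Timer thread registered on each Task to trigger a micro-batch, which costs some thread-scheduling overhead. MicroBatch is an upgraded version of MiniBatch that triggers micro-batches with event messages, which are inserted at the source at the interval you specify. MicroBatch outperforms MiniBatch in element serialization efficiency, backpressure behavior, throughput, and latency.

Applicable scenarios

Micro-batching trades higher latency for higher throughput. If you require ultra-low latency, enabling micro-batching is not recommended. For aggregation scenarios in general, micro-batching can significantly improve system performance, and enabling it is recommended.

Note: MicroBatch mode can also solve the data jitter problem in two-level aggregation.

How to enable

MicroBatch and MiniBatch are disabled by default. Enable them as follows.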

# Enable window miniBatch in version 3.2 and later (window miniBatch is disabled by default in version 3.2 and later).
sql.exec.mini-batch.window.enabled=true
# Interval between batched outputs. Add this setting when using the microBatch strategy; keeping it equal to blink.miniBatch.allowLatencyMs is recommended.
blink.microBatch.allowLatencyMs=5000
# When using microBatch, keep the following two miniBatch settings.
blink.miniBatch.allowLatencyMs=5000
# Maximum number of records buffered per batch, to prevent OOM.
blink.miniBatch.size=20000
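
To make the effect of micro-batching on State access concrete, here is a minimal Python sketch. It is not Blink code: the function names are hypothetical, the "state" is a plain dictionary, and batches are cut by record count only (the real engine also triggers on the configured latency).

```python
from collections import defaultdict

def aggregate_per_record(events):
    """Update the state once for every incoming record."""
    state, state_accesses = defaultdict(int), 0
    for key, value in events:
        state[key] += value          # one read-modify-write per record
        state_accesses += 1
    return dict(state), state_accesses

def aggregate_mini_batch(events, batch_size):
    """Buffer up to batch_size records, pre-aggregate them in memory,
    then touch the state once per distinct key in the batch."""
    state, state_accesses = defaultdict(int), 0
    for start in range(0, len(events), batch_size):
        buffered = defaultdict(int)
        for key, value in events[start:start + batch_size]:
            buffered[key] += value   # in-memory only, no state access
        for key, partial in buffered.items():
            state[key] += partial    # one state access per key per batch
            state_accesses += 1
    return dict(state), state_accesses

events = [("a", 1), ("b", 2), ("a", 3), ("a", 4), ("b", 5), ("c", 6)]
per_record_result, per_record_accesses = aggregate_per_record(events)
batched_result, batched_accesses = aggregate_mini_batch(events, batch_size=3)
print(per_record_accesses, batched_accesses)
```

Both variants produce the same sums; the batched one simply reaches the state less often, and the saving grows with the number of repeated keys per batch.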

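Enable LocalGlobal (to solve common data hotspot problems)

The LocalGlobal optimization splits the original Aggregate into two stages, Local and Global, which corresponds to the Combine + Reduce pattern in the MapReduce model. The first stage buffers a batch of data locally on the upstream node, aggregates it (LocalAgg), and emits the incremental value of that micro-batch (the Accumulator). The second stage merges the received Accumulators (Merge) to produce the final result (GlobalAgg).

In essence, LocalGlobal relies on LocalAgg to pre-aggregate away part of the skewed data, which reduces the hotspot at GlobalAgg and improves performance. The accompanying figure illustrates how LocalGlobal resolves data skew.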
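
The two-stage idea can be sketched in a few lines of Python. This is an illustration under simplifying assumptions, not the engine's implementation: each inner list stands for one upstream partition's micro-batch, the aggregate is a SUM, and the function names are made up for the example.

```python
from collections import Counter

def global_only(partitions):
    """Every record is shuffled to the global aggregator for its key."""
    received, totals = Counter(), Counter()
    for records in partitions:
        for key, value in records:
            received[key] += 1       # records arriving at the global node
            totals[key] += value
    return dict(totals), dict(received)

def local_global(partitions):
    """Each upstream partition pre-aggregates (LocalAgg) and sends one
    accumulator per key; the global side merges them (GlobalAgg)."""
    received, totals = Counter(), Counter()
    for records in partitions:
        accumulators = Counter()
        for key, value in records:
            accumulators[key] += value   # LocalAgg
        for key, acc in accumulators.items():
            received[key] += 1           # one accumulator per key
            totals[key] += acc           # Merge in GlobalAgg
    return dict(totals), dict(received)

# Two upstream partitions with a skewed key "hot".
partitions = [
    [("hot", 1)] * 4 + [("cold", 1)],
    [("hot", 1)] * 3 + [("cold", 1)],
]
plain_totals, plain_received = global_only(partitions)
lg_totals, lg_received = local_global(partitions)
print(plain_received, lg_received)
```

The totals are identical, but the global aggregator for the hot key receives one accumulator per upstream partition instead of one message per record.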

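Applicable scenarios

LocalGlobal is suited to improving the performance of ordinary aggregations such as SUM, COUNT, MAX, MIN, and AVG, and to solving data hotspot problems in those scenarios.

Note: Enabling LocalGlobal requires UDAFs to implement the Merge method.

How to enable

Starting from Realtime Compute 2.0, LocalGlobal is enabled by default through the parameter blink.localAgg.enabled=true, but it takes effect only when microbatch or minibatch is enabled.

How to verify that it takes effect

Check whether the node names in the final generated topology contain GlobalGroupAggregate or LocalGroupAggregate.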

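Enable PartialFinal (to solve COUNT DISTINCT hotspot problems)

The LocalGlobal optimization works well for ordinary aggregations (for example SUM, COUNT, MAX, MIN, and AVG) but has little effect on COUNT DISTINCT. During local aggregation, COUNT DISTINCT deduplicates few of the distinct keys, so the hotspot remains at the Global node.

Previously, solving a COUNT DISTINCT hotspot usually required manually rewriting the query as a two-level aggregation (adding a scatter layer that takes the distinct key modulo a bucket count). Starting from version 2.2.0, Realtime Compute provides automatic scattering for COUNT DISTINCT, namely the PartialFinal optimization, so you no longer need to rewrite the query yourself. The accompanying figure compares how PartialFinal and LocalGlobal work.

Applicable scenarios

You use COUNT DISTINCT, but the aggregation node cannot meet performance requirements.

Notes

The PartialFinal optimization cannot be used in Flink SQL that contains UDAFs.

PartialFinal is not recommended when the data volume is small. It automatically scatters the aggregation into two levels and introduces an extra network shuffle, which wastes resources when the data volume is small.

How to enable

Disabled by default. Enable it explicitly with the parameter blink.partialAgg.enabled=true.

How to verify that it takes effect

Check whether the node names in the final generated topology contain an Expand node, or whether the original single-level aggregation has become a two-level aggregation.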
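
The scatter-by-modulo idea behind the two-level rewrite can be illustrated with a small Python sketch. The bucket count, the column names (day, visitor_id), and the function names are assumptions chosen for the example; integer IDs are used so the modulo is deterministic.

```python
from collections import defaultdict

BUCKETS = 4  # assumed number of scatter buckets, for illustration only

def count_distinct_one_level(rows):
    """COUNT(DISTINCT visitor_id) GROUP BY day: one set per day."""
    seen = defaultdict(set)
    for day, visitor_id in rows:
        seen[day].add(visitor_id)
    return {day: len(ids) for day, ids in seen.items()}

def count_distinct_two_level(rows, buckets=BUCKETS):
    """Partial layer groups by (day, visitor_id mod buckets); the final
    layer sums the partial distinct counts per day."""
    partial = defaultdict(set)
    for day, visitor_id in rows:
        partial[(day, visitor_id % buckets)].add(visitor_id)
    final = defaultdict(int)
    for (day, _bucket), ids in partial.items():
        final[day] += len(ids)
    largest_group = max(len(ids) for ids in partial.values())
    return dict(final), largest_group

rows = [("2021-03-01", v) for v in [1, 2, 3, 4, 5, 6, 7, 8, 1, 2]]
rows += [("2021-03-02", v) for v in [1, 1, 9]]

one_level = count_distinct_one_level(rows)
two_level, largest_group = count_distinct_two_level(rows)
print(one_level, two_level, largest_group)
```

Because a given visitor_id always lands in the same bucket, the partial counts never overlap and their sum is exact. The work for a hot day is spread over several smaller groups instead of one large one, at the cost of an extra shuffle.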

Rewrite as AGG WITH FILTER syntax (to improve performance with many COUNT DISTINCT aggregations)

Note: Only Realtime Compute 2.2.2 and later support the AGG WITH FILTER syntax.

Statistics jobs often need to compute UV along several dimensions, for example site-wide UV, UV from mobile clients, and UV from PC. Use the standard AGG WITH FILTER syntax instead of CASE WHEN to implement multi-dimensional statistics. The current Realtime Compute SQL optimizer can analyze the Filter argument, so COUNT DISTINCT aggregations computed on the same field under different conditions can share State, which reduces State reads and writes. In performance tests, replacing CASE WHEN with AGG WITH FILTER doubled performance.

Applicable scenarios

Replace all AGG WITH CASE WHEN syntax with AGG WITH FILTER syntax. The gain is especially large when computing COUNT DISTINCT on the same field under different conditions.

Original form

COUNT(DISTINCT visitor_id) AS UV1, COUNT(DISTINCT CASE WHEN is_wireless='y' THEN visitor_id ELSE NULL END) AS UV2

Optimized form

COUNT(DISTINCT visitor_id) AS UV1, COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless='y') AS UV2
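
The following Python sketch shows why the two forms return the same numbers while the FILTER form can get by with a single shared structure. The shared dictionary is only an assumed model of "shared State" for illustration; the article does not describe the engine's actual state layout.

```python
def uv_with_case_when(rows):
    """Two independent distinct sets, one per COUNT(DISTINCT ...) expression.
    Rows where the CASE yields NULL are not counted."""
    all_visitors, wireless_visitors = set(), set()
    for visitor_id, is_wireless in rows:
        all_visitors.add(visitor_id)
        candidate = visitor_id if is_wireless == "y" else None
        if candidate is not None:
            wireless_visitors.add(candidate)
    return len(all_visitors), len(wireless_visitors)

def uv_with_filter(rows):
    """One shared map: visitor_id -> whether the filter ever matched."""
    matched = {}
    for visitor_id, is_wireless in rows:
        matched[visitor_id] = matched.get(visitor_id, False) or is_wireless == "y"
    return len(matched), sum(matched.values())

# (visitor_id, is_wireless)
rows = [(1, "y"), (2, "n"), (1, "n"), (3, "y"), (3, "y")]
case_when_result = uv_with_case_when(rows)
filter_result = uv_with_filter(rows)
print(case_when_result, filter_result)
```

In the first function every visitor_id may be stored twice; in the second it is stored once with a flag per condition, which is the intuition behind the reduced State reads and writes.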

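TopN optimization techniques

TopN algorithms

When the TopN input is a non-update stream (for example, a Source), TopN has only one algorithm, AppendRank. When the TopN input is an update stream (for example, after an AGG or JOIN), TopN has three algorithms, listed from highest to lowest performance: UpdateFastRank, UnaryUpdateRank, and RetractRank. The algorithm name appears in the node name in the topology.

UpdateFastRank: the optimal algorithm.

It requires two conditions:

The input stream has PK (Primary Key) information, for example ORDER BY AVG.

Updates to the sort field are monotonic, and the monotonic direction is opposite to the sort direction. For example, ORDER BY COUNT/COUNT_DISTINCT/SUM (positive values) DESC (supported only in Realtime Compute 2.2.2 and later).

To obtain the optimized plan when using ORDER BY SUM DESC, add a filter condition that restricts the SUM argument to positive values, ensuring that total_fee is positive.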

INSERT INTO print_test
SELECT
  cate_id,
  seller_id,
  stat_date,
  pay_ord_amt  -- Omitting the rownum field reduces the output volume to the result table.
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY cate_id, stat_date  -- Include a time field; otherwise State expiration leads to incorrect data.
      ORDER BY pay_ord_amt DESC
    ) AS rownum  -- Sort by the upstream SUM result.
  FROM (
    SELECT
      cate_id,
      seller_id,
      stat_date,
      -- Key point: declaring that the SUM arguments are never negative makes the SUM result monotonically increasing,
      -- so TopN can use the optimized algorithm and fetch only the top 100 rows.
      SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
    FROM random_test
    WHERE total_fee >= 0
    GROUP BY cate_id, seller_id, stat_date
  ) a
)
WHERE rownum <= 100;

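UnaryUpdateRank: the algorithm second only to UpdateFastRank. It requires one condition: the input stream has PK information.

RetractRank: the ordinary algorithm with the worst performance; using it in production is not recommended. Check whether the input stream has PK information. If it does, the job can be optimized to use UnaryUpdateRank or UpdateFastRank.

TopN optimization methods

No-rank optimization

If the TopN output does not need to include the rownum value and the rows are sorted only once when they are finally displayed on the front end, the amount of data written to the result table is greatly reduced. For details on the no-rank optimization, see the TopN statement documentation.

Increase the TopN cache size

To improve performance, TopN has a State Cache layer that makes State access more efficient. The TopN cache hit ratio is calculated as follows.

cache_hit = cache_size*parallelism/top_n/partition_key_num

For example, with Top100, a cache of 10,000 entries, and a parallelism of 50, when the PARTITION BY key has a high cardinality, say on the order of 100,000, the cache hit ratio is only 10000*50/100/100000 = 5%. With such a low hit ratio, most requests hit State (disk) and performance drops sharply. When the partition key cardinality is very large, increase the TopN cache size accordingly, and also consider increasing the Heap Memory of the TopN node (see the manual configuration tuning documentation).

## The default is 10,000 entries. Raising the TopN cache to 200,000 gives a theoretical hit ratio of 200000*50/100/100000 = 100%.
blink.topn.cache.size=200000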
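
The hit ratio formula is easy to turn into a small calculator. The sketch below uses hypothetical helper names; capping the ratio at 100% and the inverse helper for sizing the cache are additions for convenience, not part of the original formula.

```python
import math

def topn_cache_hit_ratio(cache_size, parallelism, top_n, partition_key_num):
    """cache_hit = cache_size * parallelism / top_n / partition_key_num,
    capped at 1.0 because a hit ratio cannot exceed 100%."""
    ratio = cache_size * parallelism / top_n / partition_key_num
    return min(ratio, 1.0)

def cache_size_for_target(target_ratio, parallelism, top_n, partition_key_num):
    """Smallest cache size that reaches the target ratio under the same formula."""
    return math.ceil(target_ratio * top_n * partition_key_num / parallelism)

default_ratio = topn_cache_hit_ratio(10_000, 50, 100, 100_000)
tuned_ratio = topn_cache_hit_ratio(200_000, 50, 100, 100_000)
needed = cache_size_for_target(1.0, 50, 100, 100_000)
print(f"{default_ratio:.0%} {tuned_ratio:.0%} {needed}")
```

With the numbers from the example above, the default cache gives 5%, the enlarged cache gives 100%, and 200,000 entries is the size at which the formula first reaches 100%.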

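Include a time field in the PARTITION BY fields

For example, a daily ranking must include the Day field. Otherwise the TopN results eventually become incorrect because of the State TTL.

Efficient deduplication

Note: Only Blink 3.2.1 supports the efficient deduplication solution.

In some scenarios the source data of Realtime Compute contains duplicates, and deduplication is a frequently requested feature. Realtime Compute offers two deduplication strategies: keep the first row (Deduplicate Keep FirstRow) and keep the last row (Deduplicate Keep LastRow).

Syntax

SQL has no dedicated deduplication syntax, and the choice between keeping the first row and keeping the last row must remain flexible, so deduplication is expressed with the SQL ROW_NUMBER OVER WINDOW feature. Deduplication is essentially a special case of TopN.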

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
      ORDER BY timeAttributeCol [asc|desc]) AS rownum
  FROM table_name)
WHERE rownum = 1

The ROW_NUMBER() window function sorts the data by the time attribute column and assigns a rank.

Notes

When the sort field is a Proctime column, Flink deduplicates by system time, and the result of each run is nondeterministic.

When the sort field is a Rowtime column, Flink deduplicates by business (event) time, and the result of each run is deterministic.

Filtering on the rank and keeping only the first row achieves deduplication.

Note: The sort direction can be ascending or descending on the time column:

Deduplicate Keep FirstRow: sort ascending and take the first row.

Deduplicate Keep LastRow: sort descending and take the first row.
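
The semantics of the two strategies can be checked with a small Python sketch. It is a batch-style illustration with hypothetical function and field names (b as the key, ts as the time column), and it assumes timestamps are unique per key; it does not model streaming state.

```python
def keep_first_row(rows, key, order_by):
    """ORDER BY time ASC, keep rownum = 1: the earliest row per key."""
    result = {}
    for row in sorted(rows, key=lambda r: r[order_by]):
        result.setdefault(row[key], row)   # later rows for the key are dropped
    return result

def keep_last_row(rows, key, order_by):
    """ORDER BY time DESC, keep rownum = 1: the latest row per key."""
    result = {}
    for row in sorted(rows, key=lambda r: r[order_by]):
        result[row[key]] = row             # later rows overwrite earlier ones
    return result

rows = [
    {"b": "x", "ts": 3, "v": "x3"},
    {"b": "y", "ts": 1, "v": "y1"},
    {"b": "x", "ts": 2, "v": "x2"},
    {"b": "y", "ts": 4, "v": "y4"},
]
first = {k: r["v"] for k, r in keep_first_row(rows, "b", "ts").items()}
last = {k: r["v"] for k, r in keep_last_row(rows, "b", "ts").items()}
print(first, last)
```

Keep-first only needs to remember that a key has been seen, while keep-last must hold the current row for each key so that it can be replaced.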

Deduplicate Keep FirstRow

The keep-first-row strategy retains the first row that appears for a KEY and discards all later rows for that KEY. Because State stores only the KEY data, performance is good. An example follows.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) AS rowNum
  FROM T
)
WHERE rowNum = 1

Note: The example above deduplicates table T on field b and keeps the first row by system time. Here, proctime is a field of source table T that has the Processing Time attribute. When deduplicating by system time, you can also replace the proctime field with a call to the PROCTIME() function, which lets you omit the declaration of the proctime field.

Deduplicate Keep LastRow

The keep-last-row strategy retains the last row that appears for a KEY. It performs slightly better than the LAST_VALUE function. An example follows.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) AS rowNum
  FROM T
)
WHERE rowNum = 1

Note: The example above deduplicates table T on fields b and d and keeps the last row by business time. Here, rowtime is a field of source table T that has the Event Time attribute.

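Efficient built-in functions

Use built-in functions instead of user-defined functions

The built-in functions of Realtime Compute are continuously optimized, so use built-in functions in place of user-defined functions wherever possible. Realtime Compute 2.0 made the following main optimizations to built-in functions:

Reduced the time spent on data serialization and deserialization.

Added the ability to operate directly on bytes.

Use single-character delimiters with the KEYVALUE function

The signature of KEYVALUE is KEYVALUE(content, keyValueSplit, keySplit, keyName). When keyValueSplit and keySplit are single characters (for example, a colon (:) or a comma (,)), the system uses an optimized algorithm that searches the binary data directly for the value of keyName instead of splitting the entire content. This improves performance by about 30%.

Use MULTI_KEYVALUE when extracting multiple keys

Note: Only Realtime Compute 2.2.2 and later support MULTI_KEYVALUE.

Running many KEYVALUE operations on the same content in a query has a large performance impact. For example, if the content contains 10 key-value pairs and you want to extract all 10 values as fields, you need to write 10 KEYVALUE calls, and the system parses the content 10 times, which degrades performance.

In this case, use the MULTI_KEYVALUE table-valued function, which parses the content with a single split pass. This improves performance by roughly 50% to 100%.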
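
The difference between parsing once and parsing once per key can be sketched in Python. The two functions below are hypothetical stand-ins, not the built-in functions themselves; in particular, the argument order and the reading of keyValueSplit as the pair separator and keySplit as the separator between key and value are assumptions made for the example.

```python
def keyvalue(content, key_value_split, key_split, key_name):
    """Stand-in for one KEYVALUE call: scans the content on every call."""
    for pair in content.split(key_value_split):
        key, sep, value = pair.partition(key_split)
        if sep and key == key_name:
            return value
    return None

def multi_keyvalue(content, key_value_split, key_split, *key_names):
    """Stand-in for MULTI_KEYVALUE: one split pass, then plain lookups."""
    parsed = {}
    for pair in content.split(key_value_split):
        key, sep, value = pair.partition(key_split)
        if sep:
            parsed.setdefault(key, value)   # keep the first occurrence of a key
    return tuple(parsed.get(name) for name in key_names)

content = "k1:v1,k2:v2,k3:v3"
single = keyvalue(content, ",", ":", "k2")
several = multi_keyvalue(content, ",", ":", "k1", "k3", "missing")
print(single, several)
```

Extracting N keys with the first function walks the content N times, whereas the second walks it once regardless of N.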

Notes on the LIKE operator

For a StartWith operation, use LIKE 'xxx%'.

For an EndWith operation, use LIKE '%xxx'.

For a Contains operation, use LIKE '%xxx%'.

For an Equals operation, use LIKE 'xxx', which is equivalent to str = 'xxx'.

To match the _ character itself, escape it: LIKE '%seller/_id%' ESCAPE '/'. In SQL, _ is the single-character wildcard and matches any character. If the pattern is declared as LIKE '%seller_id%', it matches not only seller_id but also seller#id, sellerxid, seller1id, and so on, which produces incorrect results.
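
The wildcard pitfall can be reproduced with any engine that implements standard LIKE ... ESCAPE. The sketch below uses Python's built-in sqlite3 module purely as a stand-in for checking the pattern semantics; the table name t and column name are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (name TEXT)")
conn.executemany(
    "INSERT INTO t VALUES (?)",
    [("seller_id",), ("seller#id",), ("sellerxid",), ("seller1id",)],
)

# Unescaped: _ is a single-character wildcard, so every row matches.
unescaped = [row[0] for row in conn.execute(
    "SELECT name FROM t WHERE name LIKE '%seller_id%' ORDER BY name")]

# Escaped: /_ matches only a literal underscore.
escaped = [row[0] for row in conn.execute(
    "SELECT name FROM t WHERE name LIKE '%seller/_id%' ESCAPE '/' ORDER BY name")]

print(unescaped)
print(escaped)
conn.close()
```

Only the escaped pattern narrows the result to the single row that actually contains seller_id.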

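Use regular expression functions (REGEXP) with caution

Regular expressions are very expensive operations, typically costing on the order of a hundred times more than arithmetic such as addition, subtraction, multiplication, and division. In some extreme cases a regular expression may also enter an infinite loop and block the job. Use LIKE where possible. The regular expression functions include:

REGEXP

REGEXP_EXTRACT

REGEXP_REPLACE

Network transfer optimization

Common Partitioner strategies currently include:

KeyGroup/Hash: distributes records according to the specified key.

Rebalance: distributes records to the channels in round-robin order.

Dynamic-Rebalance: dynamically chooses a channel with lower load based on downstream load.

Forward: when operators are not chained together, the same as Rebalance. When they are chained together, distribution is one-to-one.

Rescale: upstream and downstream are connected one-to-many or many-to-one.

Use Dynamic-Rebalance instead of Rebalance

Dynamic-Rebalance chooses a lightly loaded Subpartition to write to, based on the number of Buffers currently queued in each Subpartition, and thereby achieves dynamic load balancing. Compared with the static Rebalance strategy, it keeps the relative load of downstream tasks more even when their processing capacity is unbalanced, which improves the performance of the whole job. For example, if you use Rebalance and find that the load across downstream subtasks is uneven, consider using Dynamic-Rebalance. Parameter: task.dynamic.rebalance.enabled=true, disabled by default.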
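
A toy simulation can show the difference between static round-robin and picking the least-loaded channel. Everything here is an assumption made for illustration: one record is sent per step, each channel drains one record every drain_every[c] steps, and the function names are hypothetical. It does not model real Subpartition Buffers.

```python
def simulate(choose_channel, drain_every, num_records):
    """Return the largest backlog any channel reaches during the run."""
    backlog = [0] * len(drain_every)
    max_backlog = 0
    for step in range(num_records):
        target = choose_channel(step, backlog)
        backlog[target] += 1
        max_backlog = max(max_backlog, max(backlog))
        # Each downstream channel consumes one record every `every` steps.
        for channel, every in enumerate(drain_every):
            if backlog[channel] > 0 and (step + 1) % every == 0:
                backlog[channel] -= 1
    return max_backlog

def round_robin(step, backlog):
    """Static Rebalance: ignores how busy each channel is."""
    return step % len(backlog)

def least_loaded(step, backlog):
    """Dynamic choice: write to the channel with the smallest backlog."""
    return backlog.index(min(backlog))

drain_every = [1, 4]   # channel 0 is fast, channel 1 is four times slower
static_max = simulate(round_robin, drain_every, num_records=12)
dynamic_max = simulate(least_loaded, drain_every, num_records=12)
print(static_max, dynamic_max)
```

Under round-robin the slow channel keeps accumulating records, while the least-loaded choice routes traffic to the channel that can keep up.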

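Use Rescale instead of Rebalance

Note: Only Realtime Compute 2.2.2 and later support Rescale.

For example, suppose the upstream parallelism is 5 and the downstream parallelism is 10. With Rebalance, each upstream subtask sends round-robin to all 10 downstream subtasks. With Rescale, each upstream subtask sends round-robin to only 2 downstream subtasks. Because there are fewer channels, the Subpartition Buffers fill faster, which improves network efficiency. When the upstream data is fairly even and the upstream and downstream parallelism are proportional, you can replace Rebalance with Rescale. Parameter: enable.rescale.shuffling=true, disabled by default.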
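
The channel counts in this example can be worked out with a short Python sketch. The function names are hypothetical, and the Rescale mapping shown (contiguous blocks of downstream subtasks) is an assumed layout that covers only the one-to-many case where the downstream parallelism is an integer multiple of the upstream parallelism.

```python
def rebalance_targets(upstream, downstream):
    """Every upstream subtask round-robins over all downstream subtasks."""
    return {u: list(range(downstream)) for u in range(upstream)}

def rescale_targets(upstream, downstream):
    """Every upstream subtask round-robins over its own block of downstream subtasks."""
    if downstream % upstream != 0:
        raise ValueError("this sketch needs downstream to be a multiple of upstream")
    per_upstream = downstream // upstream
    return {
        u: list(range(u * per_upstream, (u + 1) * per_upstream))
        for u in range(upstream)
    }

rebalance = rebalance_targets(5, 10)
rescale = rescale_targets(5, 10)

rebalance_channels = sum(len(targets) for targets in rebalance.values())
rescale_channels = sum(len(targets) for targets in rescale.values())
print(rebalance_channels, rescale_channels, rescale[0], rescale[4])
```

With 5 upstream and 10 downstream subtasks, Rebalance needs 50 channels in total while Rescale needs 10, and every downstream subtask is still fed by exactly one upstream subtask.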

Recommended optimization configuration

To summarize, the following configuration is recommended for jobs.

# EXACTLY_ONCE semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# Checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# 2.x uses niagara as the state backend; also sets the State data lifetime, in milliseconds.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# 2.x: enable a 5-second microbatch.
blink.microBatch.allowLatencyMs=5000
# Latency allowed for the whole job.
blink.miniBatch.allowLatencyMs=5000
# Size of a single batch.
blink.miniBatch.size=20000
# Local optimization; enabled by default in 2.x, must be enabled manually in 1.6.4.
blink.localAgg.enabled=true
# 2.x: enable the PartialFinal optimization to solve COUNT DISTINCT hotspots.
blink.partialAgg.enabled=true
# UNION ALL optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Object reuse optimization; enabled by default.
#blink.object.reuse=true
# GC optimization (this parameter cannot be set when SLS is the source table).
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Time zone setting.
blink.job.timeZone=Asia/Shanghai
   