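Editor's recommendation:
This article introduces recommended Flink SQL writing patterns, configurations, and functions for improving performance. We hope it helps with your learning.
This article comes from Alibaba Cloud and was edited and recommended by Alice.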
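Group Aggregate optimization techniques
Enable MicroBatch or MiniBatch (improve throughput)
MicroBatch and MiniBatch are both micro-batch processing modes; they differ only slightly in how a micro-batch is triggered. Both work on the same principle: cache a certain amount of data before triggering processing, which reduces access to State, improves throughput, and reduces the amount of output data.
MiniBatch relies mainly on Timer threads registered on each Task to trigger micro-batches, which costs some thread-scheduling overhead. MicroBatch is an upgraded version of MiniBatch that triggers micro-batches mainly through event messages, which are inserted at the source at the interval you specify. MicroBatch outperforms MiniBatch in element serialization efficiency, backpressure behavior, throughput, and latency.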
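The effect of buffering on State access can be sketched with a small simulation. This is only an illustrative model, not the engine's implementation: the state is a plain dictionary, each update is counted as one read plus one write, and batches are cut by record count rather than by a timer or an event message. The function names are hypothetical.

```python
from collections import defaultdict

def count_per_record(events):
    """Update state for every record: one read and one write per event."""
    state = {}
    accesses = 0
    for key in events:
        state[key] = state.get(key, 0) + 1
        accesses += 2  # one read + one write
    return state, accesses

def count_mini_batch(events, batch_size):
    """Buffer up to batch_size events, pre-aggregate them in memory,
    then touch state once per distinct key in the batch."""
    state = {}
    accesses = 0
    for start in range(0, len(events), batch_size):
        buffered = defaultdict(int)
        for key in events[start:start + batch_size]:
            buffered[key] += 1  # in-memory only, no state access
        for key, delta in buffered.items():
            state[key] = state.get(key, 0) + delta
            accesses += 2  # one read + one write per distinct key
    return state, accesses

events = ["a", "b", "a", "a", "c", "b", "a", "a"]
plain_state, plain_accesses = count_per_record(events)
batch_state, batch_accesses = count_mini_batch(events, batch_size=4)
print(plain_accesses, batch_accesses)
```

Both functions produce the same counts, but the batched version accesses state once per distinct key per batch instead of once per record, so the saving grows with the number of repeated keys in a batch.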
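Applicable scenarios
Micro-batch processing trades higher latency for higher throughput. If you require ultra-low latency, do not enable it. For aggregation scenarios, micro-batch processing usually improves system performance significantly, so enabling it is recommended.
Note: MicroBatch mode also resolves the data jitter problem in two-level aggregation.
How to enable
MicroBatch and MiniBatch are disabled by default. Enable them as follows.
# For version 3.2 and later: enable Window miniBatch (it is disabled by default in version 3.2 and later).
sql.exec.mini-batch.window.enabled=true
# Interval between batch outputs. This setting is required when the microBatch strategy is used, and should match blink.miniBatch.allowLatencyMs.
blink.microBatch.allowLatencyMs=5000
# When microBatch is used, keep the following two miniBatch settings.
blink.miniBatch.allowLatencyMs=5000
# Maximum number of records cached per batch, to prevent OOM.
blink.miniBatch.size=20000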
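Enable LocalGlobal (solve common data hotspot problems)
The LocalGlobal optimization splits the original Aggregate into two phases, Local and Global, which corresponds to the Combine + Reduce pattern in the MapReduce model. In the first phase, the upstream node accumulates a batch of data locally, aggregates it (LocalAgg), and outputs the incremental value of that micro-batch (the Accumulator). In the second phase, the received Accumulators are merged (Merge) to produce the final result (GlobalAgg).
In essence, LocalGlobal relies on the LocalAgg pre-aggregation to filter out part of the skewed data, which reduces the hotspot on GlobalAgg and improves performance. The figure below illustrates how LocalGlobal resolves data skew.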
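The two phases can be modeled in a few lines. This is a minimal sketch assuming a COUNT aggregate, with each upstream partition's micro-batch represented as a list of keys; `local_agg` and `global_agg` are hypothetical names, not engine APIs.

```python
from collections import Counter

def local_agg(partition):
    """Local phase: pre-aggregate one upstream partition's micro-batch
    into an accumulator (key -> partial count)."""
    return Counter(partition)

def global_agg(accumulators):
    """Global phase: merge the accumulators from all upstream partitions."""
    total = Counter()
    for acc in accumulators:
        total.update(acc)  # Counter.update adds counts
    return total

# Two upstream partitions; the key "hot" is heavily skewed.
partitions = [
    ["hot"] * 6 + ["x"],
    ["hot"] * 5 + ["y", "y"],
]
accumulators = [local_agg(p) for p in partitions]
result = global_agg(accumulators)

# Records that reach the global node without and with the local phase.
records_without_local = sum(len(p) for p in partitions)
records_with_local = sum(len(acc) for acc in accumulators)
print(result, records_without_local, records_with_local)
```

Without the local phase, every skewed record is shuffled to the node that owns the hot key. With it, each upstream partition sends at most one accumulator per key per micro-batch.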

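Applicable scenarios
LocalGlobal improves the performance of ordinary aggregates such as SUM, COUNT, MAX, MIN, and AVG, and solves data hotspot problems in these scenarios.
Note: To enable LocalGlobal, a UDAF must implement the Merge method.
How to enable
Starting from Realtime Compute 2.0, LocalGlobal is enabled by default (parameter: blink.localAgg.enabled=true), but it takes effect only when MicroBatch or MiniBatch is enabled.
How to check whether it takes effect
Check whether the node names in the final generated topology contain GlobalGroupAggregate or LocalGroupAggregate.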
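Enable PartialFinal (solve COUNT DISTINCT hotspot problems)
The LocalGlobal optimization works well for ordinary aggregates (for example, SUM, COUNT, MAX, MIN, and AVG), but has little effect on COUNT DISTINCT. During local aggregation, COUNT DISTINCT removes few duplicates of the DISTINCT KEY, so the hotspot remains on the Global node.
Previously, solving the COUNT DISTINCT hotspot usually required manually rewriting the query as a two-level aggregation (adding a scatter layer that takes the Distinct Key modulo a bucket count). Starting from version 2.2.0, Realtime Compute provides automatic scattering for COUNT DISTINCT, known as the PartialFinal optimization, so you no longer need to rewrite the query as a two-level aggregation yourself. The figure below compares the principles of PartialFinal and LocalGlobal.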
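The two-level rewrite described above can be sketched as follows. This is an illustrative model under stated assumptions: rows are (group key, distinct key) pairs, the bucket is the hash of the distinct key modulo a hypothetical bucket count, and the sample data is made up. It shows only why summing per-bucket distinct counts gives the same answer as a single COUNT DISTINCT.

```python
def count_distinct_two_level(rows, buckets):
    """rows: iterable of (group_key, distinct_key).
    Returns {group_key: number of distinct keys}."""
    # Partial layer: group by (group_key, bucket). A given distinct key
    # always hashes to the same bucket, so buckets never overlap.
    partial = {}
    for group_key, distinct_key in rows:
        bucket = hash(distinct_key) % buckets
        partial.setdefault((group_key, bucket), set()).add(distinct_key)
    # Final layer: sum the per-bucket distinct counts for each group key.
    final = {}
    for (group_key, _bucket), members in partial.items():
        final[group_key] = final.get(group_key, 0) + len(members)
    return final

def count_distinct_direct(rows):
    """Single-level COUNT DISTINCT, for comparison."""
    seen = {}
    for group_key, distinct_key in rows:
        seen.setdefault(group_key, set()).add(distinct_key)
    return {group_key: len(members) for group_key, members in seen.items()}

rows = [("day1", uid) for uid in [1, 2, 2, 3, 5, 8, 8, 13]] + [("day2", 1)]
two_level = count_distinct_two_level(rows, buckets=4)
direct = count_distinct_direct(rows)
print(two_level, direct)
```

Because the work for one hot group key is spread over several (group key, bucket) pairs, no single aggregation node has to hold every distinct key of that group.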

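Applicable scenarios
You use COUNT DISTINCT, but the aggregation node cannot meet the performance requirement.
Note:
The PartialFinal optimization cannot be used in Flink SQL that contains a UDAF.
The PartialFinal optimization is not recommended when the data volume is small. It automatically scatters the query into a two-level aggregation, which introduces an extra network Shuffle and wastes resources when there is little data.
How to enable
It is disabled by default. Enable it explicitly with the parameter blink.partialAgg.enabled=true.
How to check whether it takes effect
Check whether the node names in the final generated topology contain an Expand node, or whether the original single-level aggregation has become a two-level aggregation.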
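Rewrite as AGG WITH FILTER syntax (improve performance when there are many COUNT DISTINCT aggregates)
Note: Only Realtime Compute 2.2.2 and later support the AGG WITH FILTER syntax.
Statistics jobs often need to compute UV along several dimensions, for example the site-wide UV, the UV from mobile clients, and the UV from PCs. Use the standard AGG WITH FILTER syntax instead of CASE WHEN to implement such multi-dimensional statistics. The current Realtime Compute SQL optimizer can analyze the Filter argument, so COUNT DISTINCT aggregates computed on the same field under different conditions can share State, which reduces State reads and writes. In performance tests, replacing CASE WHEN with the AGG WITH FILTER syntax doubled performance.
Applicable scenarios
Replace all AGG WITH CASE WHEN syntax with AGG WITH FILTER syntax. The gain is especially large when COUNT DISTINCT results are computed on the same field under different conditions.
Original form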
COUNT(DISTINCT visitor_id) AS UV1,
COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS UV2
Optimized form
COUNT(DISTINCT visitor_id) AS UV1,
COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS UV2
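That the two forms return the same result can be checked with a small model. This sketch reproduces only the SQL semantics (COUNT ignores NULL, FILTER drops rows before aggregation) on made-up rows of (visitor_id, is_wireless); it does not model the State sharing that makes the FILTER form faster.

```python
def uv_case_when(rows):
    """COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END)."""
    projected = [vid if wireless == "y" else None for vid, wireless in rows]
    return len({v for v in projected if v is not None})  # COUNT ignores NULL

def uv_filter(rows):
    """COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y')."""
    return len({vid for vid, wireless in rows if wireless == "y"})

rows = [("u1", "y"), ("u1", "n"), ("u2", "n"), ("u3", "y"), ("u3", "y")]
uv1 = len({vid for vid, _ in rows})  # COUNT(DISTINCT visitor_id)
uv2_case_when = uv_case_when(rows)
uv2_filter = uv_filter(rows)
print(uv1, uv2_case_when, uv2_filter)
```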
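TopN optimization techniques
TopN algorithms
When the TopN input is a non-update stream (for example, a Source), TopN has only one algorithm, AppendRank. When the TopN input is an update stream (for example, after an AGG or JOIN computation), TopN has 3 algorithms. In descending order of performance they are UpdateFastRank, UnaryUpdateRank, and RetractRank. The algorithm name appears in the node name in the topology.
UpdateFastRank: the optimal algorithm.
It requires 2 conditions:
The input stream has PK (Primary Key) information, for example, ORDER BY AVG.
Updates to the sort field are monotonic, and the monotonic direction is opposite to the sort direction. For example, ORDER BY COUNT/COUNT_DISTINCT/SUM (of positive values) DESC (supported only in Realtime Compute 2.2.2 and later).
To obtain the optimized Plan when using ORDER BY SUM DESC, add a filter condition declaring that the SUM argument is positive, here by ensuring total_fee >= 0.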
INSERT INTO print_test
SELECT
  cate_id,
  seller_id,
  stat_date,
  pay_ord_amt -- Omitting the rownum field reduces the amount of data written to the result table.
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY cate_id, stat_date -- Include a time field; otherwise state expiration corrupts the data.
      ORDER BY pay_ord_amt DESC
    ) AS rownum -- Rank by the upstream SUM result.
  FROM (
    SELECT
      cate_id,
      seller_id,
      stat_date,
      -- Key point: declaring that every SUM argument satisfies total_fee >= 0 makes the SUM result
      -- monotonically increasing, so TopN can use the optimized algorithm and fetch only the top 100 rows.
      SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
    FROM random_test
    WHERE total_fee >= 0
    GROUP BY cate_id, seller_id, stat_date -- Group by the selected cate_id (the original grouped by cate_name).
  ) a
)
WHERE rownum <= 100; -- Filter on rownum outside the subquery that defines it.
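UnaryUpdateRank: the algorithm second only to UpdateFastRank. It requires 1 condition: the input stream has PK information.
RetractRank: the ordinary algorithm, with the worst performance. Do not use it in production. Check whether the input stream has PK information; if it does, the job can be optimized to UnaryUpdateRank or UpdateFastRank.
TopN optimization methods
No-rank optimization
The TopN output does not need to include the rownum value. Sorting once when the result is finally displayed on the front end is enough, which greatly reduces the amount of data written to the result table. For details about the no-rank optimization, see the TopN statement documentation.
Increase the TopN Cache size
To improve performance, TopN has a State Cache layer that makes State access more efficient. The TopN Cache hit rate is calculated as follows.
cache_hit = cache_size*parallelism/top_n/partition_key_num
For example, suppose Top100 is configured with a cache of 10,000 entries and a parallelism of 50. When the PartitionBy key has a large cardinality, for example on the order of 100,000, the Cache hit rate is only 10000*50/100/100000=5%. With such a low hit rate, a large number of requests hit State (disk) and performance drops sharply. Therefore, when the PartitionKey cardinality is very large, increase the TopN CacheSize appropriately, and increase the Heap Memory of the TopN node accordingly (see the manual configuration tuning documentation).
## The default is 10,000 entries. Raising the TopN cache to 200,000 gives a theoretical hit rate of 200000*50/100/100000 = 100%.
blink.topn.cache.size=200000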
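The hit-rate arithmetic above can be reproduced with a short helper. The function names are hypothetical, and capping the rate at 1.0 is an assumption added here so that oversized caches do not report more than 100%.

```python
def topn_cache_hit_rate(cache_size, parallelism, top_n, partition_key_num):
    """cache_hit = cache_size * parallelism / top_n / partition_key_num,
    capped at 1.0 (the cap is an assumption of this sketch)."""
    return min(1.0, cache_size * parallelism / top_n / partition_key_num)

def min_cache_size_for_full_hit(parallelism, top_n, partition_key_num):
    """Smallest cache_size for which the formula reaches 100%."""
    needed = top_n * partition_key_num
    return -(-needed // parallelism)  # ceiling division

default_rate = topn_cache_hit_rate(10000, 50, 100, 100000)
tuned_rate = topn_cache_hit_rate(200000, 50, 100, 100000)
required = min_cache_size_for_full_hit(50, 100, 100000)
print(default_rate, tuned_rate, required)
```

Solving the formula for cache_size gives top_n * partition_key_num / parallelism, which is where the 200,000 in the configuration above comes from.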
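The PartitionBy fields must include a time field
For example, a daily ranking must include the Day field. Otherwise the TopN result eventually becomes corrupted because of the State TTL.
Efficient deduplication
Note: Only Blink 3.2.1 supports the efficient deduplication solution.
In some scenarios the source data of Realtime Compute contains duplicates, and deduplication has become a frequently requested feature. Realtime Compute offers 2 deduplication strategies: keep the first row (Deduplicate Keep FirstRow) and keep the last row (Deduplicate Keep LastRow).
Syntax
SQL has no direct syntax for deduplication, and the choice between keeping the first row and keeping the last row must remain flexible. Deduplication is therefore expressed with the SQL ROW_NUMBER OVER WINDOW feature. In essence, deduplication is a special case of TopN.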
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
      ORDER BY timeAttributeCol [asc|desc]) AS rownum
  FROM table_name)
WHERE rownum = 1
The ROW_NUMBER() window function sorts the data by the time attribute column and assigns a rank.
Note:
When the sort field is a Proctime column, Flink deduplicates by system time, and the result of each run is non-deterministic.
When the sort field is a Rowtime column, Flink deduplicates by business time, and the result of each run is deterministic.
Filtering on the rank and keeping only the first row achieves deduplication.
Note: The sort direction can follow the time column in ascending or descending order:
Deduplicate Keep FirstRow: sort in ascending order and take the first row.
Deduplicate Keep LastRow: sort in descending order and take the first row.
Deduplicate Keep FirstRow
The keep-first-row strategy keeps the first row that appears for a KEY and discards any later rows with that KEY. Because STATE stores only the KEY data, performance is good. An example follows.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) AS rowNum
  FROM T
)
WHERE rowNum = 1
Note: The example above deduplicates table T by field b and keeps the first row according to system time. Here, proctime is a field of source table T that has the Processing Time attribute. If you deduplicate by system time, you can also replace the proctime field with a call to the PROCTIME() function and omit the declaration of the proctime field.
Deduplicate Keep LastRow
The keep-last-row strategy keeps the last row that appears for a KEY. It performs slightly better than the LAST_VALUE function. An example follows.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) AS rowNum
  FROM T
)
WHERE rowNum = 1
Note: The example above deduplicates table T by fields b and d and keeps the last row according to business time. Here, rowtime is a field of source table T that has the Event Time attribute.
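The difference between the two strategies, including why keep-first-row needs less state, can be sketched as follows. This is a batch-style model under simple assumptions: rows are dictionaries that arrive already ordered by time, and only the final result is returned (a streaming job would also emit updates as later rows replace earlier ones). The function names are hypothetical.

```python
def keep_first_row(rows, key):
    """Keep the first row per key. Only the keys seen so far are remembered."""
    seen = set()
    out = []
    for row in rows:
        k = key(row)
        if k not in seen:
            seen.add(k)
            out.append(row)
    return out

def keep_last_row(rows, key):
    """Keep the latest row per key. The full row is retained per key
    so that a later row can replace it."""
    latest = {}
    for row in rows:
        latest[key(row)] = row
    return list(latest.values())

rows = [
    {"b": "k1", "v": 1},
    {"b": "k2", "v": 2},
    {"b": "k1", "v": 3},
]
first = keep_first_row(rows, key=lambda r: r["b"])
last = keep_last_row(rows, key=lambda r: r["b"])
print(first, last)
```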
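Efficient built-in functions
Replace user-defined functions with built-in functions
The built-in functions of Realtime Compute are continuously being optimized, so use built-in functions instead of user-defined functions wherever possible. Realtime Compute 2.0 made the following main optimizations to built-in functions:
Reduced the time spent on data serialization and deserialization.
Added the ability to operate directly on bytes.
Use single-character separators with the KEY VALUE function
The signature of KEY VALUE is KEYVALUE(content, keyValueSplit, keySplit, keyName). When keyValueSplit and keySplit are single characters (for example, a colon (:) or a comma (,)), the system uses an optimized algorithm that looks up the value of keyName directly in the binary data without splitting the whole content. This improves performance by about 30%.
Use MULTI_KEYVALUE when extracting multiple KEY VALUE pairs
Note: Only Realtime Compute 2.2.2 and later support MULTI_KEYVALUE.
Running many KEY VALUE operations on the same Content in a Query has a large impact on performance. For example, if Content contains 10 Key-Value pairs and you want to extract all 10 Values as fields, you have to write 10 KEY VALUE function calls, and the system parses Content 10 times, which degrades performance.
In this case, use the MULTI_KEYVALUE table-valued function, which parses Content with a single Split pass. This improves performance by about 50% to 100%.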
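The cost difference between repeated single-key lookups and one multi-key lookup can be sketched in Python. These are stand-in functions that mirror only the argument order given in the signature above; they are not the engine's implementation (in particular, they split the whole content rather than searching the binary data), and the sample content is made up.

```python
def keyvalue(content, key_value_split, key_split, key_name):
    """Return the value for key_name, or None. Every call re-parses content."""
    for pair in content.split(key_value_split):
        key, sep, value = pair.partition(key_split)
        if sep and key == key_name:
            return value
    return None

def multi_keyvalue(content, key_value_split, key_split, key_names):
    """Parse content once and return the values for all key_names."""
    parsed = {}
    for pair in content.split(key_value_split):
        key, sep, value = pair.partition(key_split)
        if sep:
            parsed.setdefault(key, value)  # keep the first occurrence of a key
    return [parsed.get(name) for name in key_names]

content = "k1:v1,k2:v2,k3:v3"
# Three separate calls: content is parsed three times.
one_by_one = [keyvalue(content, ",", ":", name) for name in ["k1", "k3", "missing"]]
# One call: content is parsed once.
all_at_once = multi_keyvalue(content, ",", ":", ["k1", "k3", "missing"])
print(one_by_one, all_at_once)
```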
Notes on the LIKE operator
For a StartWith operation, use LIKE 'xxx%'.
For an EndWith operation, use LIKE '%xxx'.
For a Contains operation, use LIKE '%xxx%'.
For an Equals operation, use LIKE 'xxx', which is equivalent to str = 'xxx'.
To match the _ character, escape it: LIKE '%seller/_id%' ESCAPE '/'. In SQL, _ is a single-character wildcard that matches any character. If the pattern is declared as LIKE '%seller_id%', it matches not only seller_id but also seller#id, sellerxid, seller1id, and so on, which produces incorrect results.
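The wildcard behavior of _ and the effect of ESCAPE can be demonstrated with a small LIKE evaluator. This is a sketch of standard SQL LIKE semantics written for illustration only; the `like` function is hypothetical, and it translates the pattern to a Python regular expression purely to model the matching rules.

```python
import re

def like(value, pattern, escape=None):
    """Evaluate SQL LIKE: % matches any run of characters, _ matches
    exactly one character, and the escape character makes the next
    character literal."""
    parts = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if escape is not None and ch == escape and i + 1 < len(pattern):
            parts.append(re.escape(pattern[i + 1]))  # escaped char is literal
            i += 2
            continue
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
        i += 1
    return re.fullmatch("".join(parts), value, flags=re.DOTALL) is not None

# Unescaped: _ is a wildcard, so unintended values match too.
print(like("seller_id", "%seller_id%"), like("seller#id", "%seller_id%"))
# Escaped: only a literal underscore matches.
print(like("seller_id", "%seller/_id%", escape="/"),
      like("seller#id", "%seller/_id%", escape="/"))
```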
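Use regular expression functions (REGEXP) with caution
Regular expressions are very expensive operations, typically costing about a hundred times more than arithmetic such as addition, subtraction, multiplication, and division. In some extreme cases a regular expression can also enter an infinite loop and block the job. Use LIKE where possible. The regular expression functions include: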
REGEXP
REGEXP_EXTRACT
REGEXP_REPLACE
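Network transmission optimization
The common Partitioner strategies currently include:
KeyGroup/Hash: distributes records according to the specified Key.
Rebalance: distributes records to the Channels in round-robin order.
Dynamic-Rebalance: dynamically chooses a Channel with lower load, based on the downstream load.
Forward: when operators are not chained together, behaves the same as Rebalance; when they are chained, distributes one-to-one.
Rescale: one-to-many or many-to-one between upstream and downstream.
Use Dynamic-Rebalance instead of Rebalance
Dynamic-Rebalance chooses the Subpartition with the lighter load to write to, based on the number of Buffers currently backed up in each Subpartition, which achieves dynamic load balancing. Compared with the static Rebalance strategy, it balances the relative load across downstream tasks better when their computing capacity is uneven, which improves the performance of the whole job. For example, if you use Rebalance and find that the load across downstream parallel instances is uneven, consider Dynamic-Rebalance. Parameter: task.dynamic.rebalance.enabled=true, disabled by default.
Use Rescale instead of Rebalance
Note: Only Realtime Compute 2.2.2 and later support Rescale.
For example, suppose the upstream parallelism is 5 and the downstream parallelism is 10. With Rebalance, each upstream instance sends records in round-robin order to all 10 downstream instances. With Rescale, each upstream instance sends records in round-robin order to only 2 downstream instances. Because there are fewer Channels, the Buffers of each Subpartition fill faster, which improves network efficiency. When the upstream data is fairly even and the upstream and downstream parallelism are proportional, you can replace Rebalance with Rescale. Parameter: enable.rescale.shuffling=true, disabled by default.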
Recommended optimization configuration
In summary, the following configuration is recommended for jobs.
# EXACTLY_ONCE semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# Checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# 2.x uses niagara as the state backend; also sets the state data time-to-live, in milliseconds.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# 2.x: enable MicroBatch with a 5-second interval.
blink.microBatch.allowLatencyMs=5000
# Latency allowed for the whole Job.
blink.miniBatch.allowLatencyMs=5000
# Size of a single batch.
blink.miniBatch.size=20000
# Local optimization; enabled by default in 2.x, must be enabled manually in 1.6.4.
blink.localAgg.enabled=true
# 2.x: enable the PartialFinal optimization to solve COUNT DISTINCT hotspots.
blink.partialAgg.enabled=true
# UNION ALL optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Object reuse optimization; enabled by default.
#blink.object.reuse=true
# GC optimization (this parameter cannot be set when SLS is the source table).
blink.job.option=-yD heartbeat.timeout=180000
-yD env.java.opts='-verbose:gc -XX:NewRatio=3
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Time zone setting.
blink.job.timeZone=Asia/Shanghai