Editor's recommendation:
This article introduces ByteDance's optimizations for bucketing (Bucket).
It comes from the WeChat public account 过往记忆大数据 and was edited and recommended by Linda of 火龙果软件.
This article covers four topics:
Spark SQL at ByteDance
What is bucketing
Limitations of Spark bucketing
ByteDance's optimizations for bucketing

Spark SQL has been adopted at ByteDance as follows:
2016: mainly small-scale testing;
2017: used for ad-hoc workloads;
2018: handled a small number of ETL pipelines in production;
2019: fully deployed in production;
2020: became the main compute engine for the data warehouse (DW).

What is bucketing

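The example above shows how to create a bucketed table. The key clause is clustered by (xxx) sorted by (xxx) into N buckets: each row is assigned to one of N buckets by hashing the clustered by column(s), and rows inside each bucket are sorted by the sorted by column(s).
To insert data into a bucketed table, we can write: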
INSERT INTO order
SELECT order_id, user_id, product, amount
FROM order_staging
As you can see, this is no different from inserting into an ordinary table.
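As a rough mental model of what a bucketed write does, the Python sketch below puts each row into bucket pmod(hash(user_id), N) and then sorts every bucket. It is a simplification under stated assumptions. toy_hash is a hypothetical identity hash standing in for Spark's Murmur3. order_id is assumed to be the sorted by column, because the original CREATE TABLE statement is not reproduced in the text.

```python
from collections import defaultdict

def toy_hash(key: int) -> int:
    # Hypothetical stand-in for Spark's Murmur3 hash: identity on integers.
    return key

def bucket_rows(rows, num_buckets):
    """Model a bucketed write for rows (order_id, user_id, product, amount):
    clustered by (user_id) into num_buckets buckets, sorted by (order_id).
    The sort column is an assumption for illustration."""
    buckets = defaultdict(list)
    for row in rows:
        user_id = row[1]
        # Python's % is non-negative for a positive divisor, like Spark's pmod.
        buckets[toy_hash(user_id) % num_buckets].append(row)
    # Sort rows inside each bucket by order_id; return buckets in id order.
    return {b: sorted(rs, key=lambda r: r[0]) for b, rs in sorted(buckets.items())}

rows = [(3, 10, "pen", 2.0), (1, 11, "ink", 5.0), (2, 10, "pad", 1.5)]
print(bucket_rows(rows, 4))
# {2: [(2, 10, 'pad', 1.5), (3, 10, 'pen', 2.0)], 3: [(1, 11, 'ink', 5.0)]}
```

Both rows for user 10 land in the same bucket and come out ordered by order_id. That per-bucket co-location and ordering is what the joins below rely on.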

Èç¹ûÎÒÃǽøÐÐÒ»¸ö ShuffleHashJoin µÄʱºò£¬Ê×ÏÈÐèÒª½«±íµÄÊý¾Ý°´ÕÕ on µÄÌõ¼þ½øÐзÖÇø£¬È»ºó²ÅÊǽøÐÐ
Join ²Ù×÷¡£ 
µ«ÊÇÈç¹û²ÎÓë Join µÄ±íÒѾʵÏÖ·ÖͰÁË£¬ÄÇôÔÚÖ´ÐÐ ShuffleHashJoin µÄʱºòʡȥ
Shuffle µÄ²Ù×÷¡£±ÈÈçÉÏÃæµÄÀý×ÓÈç¹ûÎÒÃÇ¶Ô order ºÍ user ±í°´ÕÕ user_id
×ֶνøÐзÖͰ£¬ÄÇôÔÚ ShuffleHashJoin µÄʱºò¾Í²»ÐèÒª½øÐÐ Exchange ²Ù×÷ÁË¡£
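The reason no shuffle is needed can be sketched in a few lines. When both sides are bucketed on user_id with the same bucket count, matching rows always sit in the same bucket id, so joining bucket i with bucket i gives the same result as a global join. The sketch below is a simplified model, not Spark's ShuffledHashJoinExec. It uses a hypothetical identity hash on integer keys, and the table contents are made up for illustration.

```python
from collections import defaultdict

NUM_BUCKETS = 4

def bucketize(rows, key_index):
    # Hypothetical identity hash on integer keys stands in for Spark's hash.
    buckets = defaultdict(list)
    for row in rows:
        buckets[row[key_index] % NUM_BUCKETS].append(row)
    return buckets

def hash_join(left, right, left_key, right_key):
    # Build a hash table on the right input, then probe it with the left input.
    table = defaultdict(list)
    for rrow in right:
        table[rrow[right_key]].append(rrow)
    return [lrow + rrow for lrow in left for rrow in table.get(lrow[left_key], [])]

def bucketed_hash_join(orders, users):
    # Both sides are bucketed on user_id with the same bucket count, so
    # bucket i of orders only ever meets bucket i of users: no shuffle.
    order_buckets = bucketize(orders, key_index=1)
    user_buckets = bucketize(users, key_index=0)
    result = []
    for i in range(NUM_BUCKETS):
        result.extend(hash_join(order_buckets.get(i, []), user_buckets.get(i, []), 1, 0))
    return result

orders = [(1, 10, "pen"), (2, 11, "ink"), (3, 13, "pad")]  # (order_id, user_id, product)
users = [(10, "ann"), (11, "bob"), (12, "cy")]             # (user_id, name)
print(bucketed_hash_join(orders, users))
```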

For SortMergeJoin, the join-key columns in the ON condition must first go through an Exchange, then a Sort, and only then does the SortMergeJoin run (for more on join strategies, see the 过往记忆大数据 article "Five Join Strategies Every Spark Engineer Should Know").
If the tables taking part in the join are already bucketed and sorted on the join key, neither the Exchange nor the Sort is needed.
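Once both inputs of a bucket pair are already sorted on the join key, the join itself is a single merge pass with no Sort step. The sketch below is a generic sort-merge join over (key, value) pairs, not Spark's SortMergeJoinExec. The inputs are assumed to be pre-sorted, as they would be in matching sorted buckets.

```python
def merge_join(left, right):
    """Sort-merge inner join of two lists of (key, value), both sorted by key."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == lk:
                for rrow in right[j:j_end]:
                    out.append((lk, left[i][1], rrow[1]))
                i += 1
            j = j_end
    return out

left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z"), (4, "w")]
print(merge_join(left, right))
# [(2, 'b', 'x'), (2, 'b', 'y'), (2, 'c', 'x'), (2, 'c', 'y'), (4, 'd', 'w')]
```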
Limitations of Spark bucketing
Small-file problem
When the SQL above runs, each task may write up to 1024 files, where 1024 is the number of buckets. With M tasks, the job can therefore produce up to M * 1024 files. For example, the attempt_20200519145628_0014_m_000014_0 directory shown above contains 988 files.
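The M * 1024 bound follows from each write task opening one file per distinct bucket it sees. The sketch below counts files with smaller, made-up numbers: 8 tasks and 16 buckets. It uses a hypothetical identity hash on integer keys and assumes that each task receives a contiguous input split, since nothing is shuffled before the write.

```python
def count_bucket_files(task_keys, num_buckets):
    """Each write task opens one file per distinct bucket among its rows,
    so a job can produce up to len(task_keys) * num_buckets files."""
    # Identity hash on integer keys is a stand-in for Spark's Murmur3.
    return sum(len({key % num_buckets for key in keys}) for keys in task_keys)

num_tasks, num_buckets = 8, 16
# Without a shuffle before the write, each task gets an arbitrary input
# split; here task t receives the contiguous key range [t*200, (t+1)*200).
task_keys = [range(t * 200, (t + 1) * 200) for t in range(num_tasks)]
print(count_bucket_files(task_keys, num_buckets), "files; bound", num_tasks * num_buckets)
# 128 files; bound 128
```

Every task sees keys from all 16 buckets, so the bound is reached exactly. This is the same effect as the 988 files in a single attempt directory.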
The small-file problem can be solved by adding DISTRIBUTE BY, as follows:
INSERT INTO order
SELECT order_id, user_id, product, amount
FROM order_staging
DISTRIBUTE BY user_id
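If 1024 is a multiple of M, where M = spark.sql.shuffle.partitions, at most 1024 files are produced;
if M is a multiple of 1024, at most M files are produced.
This works because the shuffle partition and the bucket id are both derived from the same hash of user_id. When one count divides the other, each bucket falls into a single shuffle partition, or each shuffle partition holds a single bucket.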
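The divisibility rule can be checked with a little arithmetic. The sketch below assumes, as in Spark, that both the shuffle partition and the bucket id are pmod of the same hash value h. A hypothetical identity hash on integer keys stands in for it. Each shuffle partition writes one file per distinct bucket it holds, so the file count is the number of distinct (partition, bucket) pairs.

```python
def files_after_distribute_by(keys, num_partitions, num_buckets):
    """Model INSERT ... DISTRIBUTE BY user_id into a bucketed table.

    Assumption: the shuffle partition (h % M) and the bucket id (h % 1024)
    come from the same hash h; identity on integer keys stands in for it."""
    return len({(h % num_partitions, h % num_buckets) for h in keys})

keys = range(100_000)
print(files_after_distribute_by(keys, 256, 1024))   # M divides 1024  -> 1024
print(files_after_distribute_by(keys, 2048, 1024))  # 1024 divides M  -> 2048
print(files_after_distribute_by(keys, 1000, 1024))  # neither divides -> 100000
```

The last line shows why the alignment matters. With M = 1000 the pair (h % 1000, h % 1024) only repeats every lcm(1000, 1024) = 128000 values, so in this toy run every key gets its own file.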
Spark bucketing is incompatible with other SQL engines
Spark bucketing is incompatible with Hive bucketing, and also with Presto; Presto, however, is compatible with Hive bucketing.
The incompatibility between Spark and Hive bucketing has two main causes:
When writing bucketed data, Hive performs an extra Reduce step to guarantee that all data for a bucket ends up in a single file. Spark SQL does not shuffle when writing bucketed files, so each bucket can end up with up to M files, which leads to the small-file problem described above;
Spark and Hive bucketing use different hash algorithms: Hive uses HiveHash, while Spark uses Murmur3, so the same data is distributed across buckets differently.
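To see why the two engines disagree, the sketch below computes the bucket of an integer key both ways. It makes two assumptions. The Hive side follows Hive's classic bucketing (bucketing version 1): the HiveHash of an int is the value itself, and the bucket is (hash & Integer.MAX_VALUE) % n. The Spark side uses 32-bit Murmur3 (x86_32) with seed 42, followed by pmod. This is a re-implementation for illustration only, so check it against your Spark and Hive versions before relying on specific values.

```python
MASK32 = 0xFFFFFFFF

def _rotl32(x: int, r: int) -> int:
    return ((x << r) | (x >> (32 - r))) & MASK32

def _to_int32(x: int) -> int:
    # Reinterpret an unsigned 32-bit value as a signed Java int.
    return x - (1 << 32) if x & 0x80000000 else x

def murmur3_hash_int(value: int, seed: int = 42) -> int:
    """Murmur3 x86_32 of a single 4-byte int (mix, then finalize)."""
    k1 = (value & MASK32) * 0xCC9E2D51 & MASK32
    k1 = _rotl32(k1, 15) * 0x1B873593 & MASK32
    h1 = _rotl32((seed & MASK32) ^ k1, 13)
    h1 = (h1 * 5 + 0xE6546B64) & MASK32
    h1 ^= 4  # input length in bytes
    h1 ^= h1 >> 16
    h1 = h1 * 0x85EBCA6B & MASK32
    h1 ^= h1 >> 13
    h1 = h1 * 0xC2B2AE35 & MASK32
    h1 ^= h1 >> 16
    return _to_int32(h1)

def spark_bucket(value: int, num_buckets: int) -> int:
    # pmod(hash, n); Python's % is already non-negative for n > 0.
    return murmur3_hash_int(value) % num_buckets

def hive_bucket(value: int, num_buckets: int) -> int:
    # HiveHash of an int is the int itself; Hive masks off the sign bit.
    return (value & 0x7FFFFFFF) % num_buckets

for user_id in range(5):
    print(user_id, "spark:", spark_bucket(user_id, 4), "hive:", hive_bucket(user_id, 4))
```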

Because Spark and Hive bucketing are incompatible, a SortMergeJoin between a Spark bucketed table and a Hive bucketed table still requires Sort and Exchange operations.
Extra sort operation

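Each bucket in a Spark SQL table contains at least one file and possibly several, and each file is sorted only on its own. The bucket as a whole is therefore not guaranteed to be sorted, so an extra sort is needed before the join.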
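A two-file example shows why per-file sorting is not enough. Reading a bucket's files one after another does not yield sorted data. The k-way merge below is just one way to restore the order and is shown for illustration. As described above, Spark instead inserts a Sort.

```python
import heapq

# Two files that ended up in the same bucket, each sorted on its own
# (for example, written by two different tasks).
file_a = [1, 4, 9]
file_b = [2, 3, 10]

concatenated = file_a + file_b
is_sorted = all(x <= y for x, y in zip(concatenated, concatenated[1:]))
print(is_sorted)  # False: the bucket as a whole is not sorted

# Restoring bucket order needs extra work, e.g. a k-way merge or a full sort.
merged = list(heapq.merge(file_a, file_b))
print(merged)  # [1, 2, 3, 4, 9, 10]
```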
Mismatched bucket counts
If the tables taking part in the join have different numbers of buckets, one of them needs an extra Exchange.
Extra operations when the join keys differ from the bucket columns

When the join keys differ from the bucket columns, an extra Exchange is needed.

In the example above, both tables are bucketed on user_id with the same number of buckets, yet an extra Exchange is still needed.
ByteDance's optimizations for bucketing
Aligning Spark bucketing with Hive bucketing

As described earlier, Spark and Hive bucketing are incompatible. To address this, ByteDance aligned Spark's handling of bucketed tables with Hive's. The main changes are:

On the write side, Spark SQL writes Hive bucketed tables with the same logic as Hive: InsertIntoHiveTable#requiredOrdering and InsertIntoHiveTable#requiredDistribution were overridden, and the HiveHash algorithm is used.

On the read side, HiveTableScanExec#outputPartitioning and HiveTableScanExec#outputOrdering were overridden to use the HiveHash algorithm and Hive's bucketing metadata.
Above is the difference in how Spark reads a Hive bucketed table before and after the change. After the change, outputPartitioning is HashPartitioning and outputOrdering is a SortOrder. This satisfies SortMergeJoin's requirements that requiredChildDistribution be a HashClusteredDistribution and that requiredChildOrdering be a SortOrder. As a result, the SortMergeJoin no longer needs the Exchange and Sort operations.
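The effect of reporting outputPartitioning and outputOrdering can be modelled as a small decision procedure. The sketch below is a toy planner, not Spark's EnsureRequirements rule. The HashPartitioning class here is a hypothetical stand-in with the same name as Spark's. The distribution check is simplified to exact equality on keys and partition count, and the ordering check requires the join keys to be a prefix of the child's output ordering.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass(frozen=True)
class HashPartitioning:
    """Toy stand-in for Spark's HashPartitioning(expressions, numPartitions)."""
    keys: Tuple[str, ...]
    num_partitions: int

def operators_needed(output_partitioning: Optional[HashPartitioning],
                     output_ordering: List[str],
                     join_keys: Tuple[str, ...],
                     num_buckets: int) -> List[str]:
    """Operators a simplified planner inserts below one SortMergeJoin input.

    Required distribution: hash-clustered on exactly join_keys into num_buckets.
    Required ordering: join_keys must be a prefix of the child's output ordering."""
    added = []
    if output_partitioning != HashPartitioning(join_keys, num_buckets):
        added.append("Exchange")
        output_ordering = []  # a shuffle does not preserve the input ordering
    if output_ordering[:len(join_keys)] != list(join_keys):
        added.append("Sort")
    return added

# Before the change: the Hive table scan reports no partitioning or ordering.
print(operators_needed(None, [], ("user_id",), 1024))  # ['Exchange', 'Sort']
# After the change: HashPartitioning on user_id and ordering on user_id.
print(operators_needed(HashPartitioning(("user_id",), 1024), ["user_id"], ("user_id",), 1024))  # []
```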
One to Many Bucket Join
Another improvement is the One to Many Bucket Join. In the example below, table A has three buckets and table B has six.

If we join these two tables in Spark, table B needs an extra Sort because the two tables have different numbers of buckets. At ByteDance, however, performance requirements mean this Sort has to be avoided.
One approach is to join bucket 0 of A with buckets 0 and 3 of B, bucket 1 of A with buckets 1 and 4 of B, and bucket 2 of A with buckets 2 and 5 of B. This works because a key that falls into bucket b of B (hash % 6 = b) falls into bucket b % 3 of A. We then only need to duplicate table A so that it also presents 6 buckets: unioning A with itself yields a new table with 6 buckets. Spark's built-in Union, however, drops outputPartitioning and outputOrdering. ByteDance therefore developed its own bucket union that preserves them, which avoids the Sort and Exchange operations.
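The bucket pairing above comes from modular arithmetic. When A's bucket count divides B's, hash % 3 == (hash % 6) % 3. The sketch below assumes both tables hash the same column with the same function and that 3 divides 6. It derives the pairing and checks it on a range of hash values.

```python
A_BUCKETS, B_BUCKETS = 3, 6

def a_bucket_for(b_bucket: int) -> int:
    # Assumes both tables use the same hash on the same column and that
    # A_BUCKETS divides B_BUCKETS, so hash % 3 == (hash % 6) % 3.
    return b_bucket % A_BUCKETS

pairing = {b: a_bucket_for(b) for b in range(B_BUCKETS)}
print(pairing)  # {0: 0, 1: 1, 2: 2, 3: 0, 4: 1, 5: 2}

# Every hash value in B's bucket b also lands in A's bucket a_bucket_for(b).
consistent = all(h % A_BUCKETS == a_bucket_for(h % B_BUCKETS) for h in range(10_000))
print(consistent)  # True
```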

However, this approach only works for B LEFT JOIN A, B LEFT SEMI JOIN A, B ANTI JOIN A and B INNER JOIN A. For B RIGHT JOIN A, B FULL OUTER JOIN A and B CROSS JOIN A, the results contain duplicates, because A's data is scanned twice.
To fix this, a filter of the form hash(key) % buckets = bucket id, where key is the bucket column, is added after the TableScan. For example, the copy of A's bucket 0 that serves bucket 0 filters out 3, 9 and 15, which belong to bucket 3. This removes the duplicate data.
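The duplicate problem and the filter can be reproduced in a small simulation of B RIGHT JOIN A. It uses a hypothetical identity hash on integer keys and made-up data (A holds keys 0 to 17, B holds 0, 4 and 9). Output bucket p pairs B's bucket p with A's bucket p % 3. Without the filter, every A row is read twice, and one of the two copies is emitted unmatched. With the filter, only A rows whose key % 6 equals p survive. For p = 0 that drops 3, 9 and 15, as in the text.

```python
from collections import Counter

A_BUCKETS, B_BUCKETS = 3, 6

def right_join(b_keys, a_keys):
    """B RIGHT JOIN A on key: every A row, with each matching B row or None."""
    out = []
    for a in a_keys:
        matches = [b for b in b_keys if b == a]
        if matches:
            out.extend((b, a) for b in matches)
        else:
            out.append((None, a))
    return out

def bucket_union_right_join(b_keys, a_keys, add_filter):
    """Join B's bucket p with A's bucket p % 3 for p in 0..5 (A is read twice).
    Identity hash on integer keys; add_filter models hash(key) % 6 == p."""
    out = []
    for p in range(B_BUCKETS):
        b_part = [k for k in b_keys if k % B_BUCKETS == p]
        a_part = [k for k in a_keys if k % A_BUCKETS == p % A_BUCKETS]
        if add_filter:
            a_part = [k for k in a_part if k % B_BUCKETS == p]
        out.extend(right_join(b_part, a_part))
    return out

a_keys, b_keys = list(range(18)), [0, 4, 9]
expected = Counter(right_join(b_keys, a_keys))
print(len(right_join(b_keys, a_keys)))                                  # 18
print(len(bucket_union_right_join(b_keys, a_keys, add_filter=False)))   # 36
print(Counter(bucket_union_right_join(b_keys, a_keys, add_filter=True)) == expected)  # True
```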
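Another ByteDance optimization targets joins whose keys include more than just the bucket keys, for example tables bucketed on user_id but joined on user_id plus another column. In that case native Spark adds extra Exchange and Sort operations.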

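After the optimization, the Exchange is eliminated: rows with equal join keys necessarily have equal bucket keys, so they already sit in the same bucket on both sides.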
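The co-location argument can be checked directly. If both tables are bucketed on user_id with the same count, a join on (user_id, date) can still be done bucket by bucket. The sketch below is an illustration with hypothetical tables (orders, visits) and a hypothetical date column, using an identity hash on user_id. It only addresses the Exchange: an ordering on user_id alone is not an ordering on (user_id, date).

```python
from collections import Counter, defaultdict

NUM_BUCKETS = 4

def join_key(row):
    # Composite join key (user_id, date); the tables are bucketed on user_id only.
    return row[0], row[1]

def naive_join(left, right):
    return [(lrow, rrow) for lrow in left for rrow in right if join_key(lrow) == join_key(rrow)]

def bucketed_join(left, right):
    """Join bucket i with bucket i only. Rows with equal (user_id, date) have
    equal user_id, hence the same bucket, so no extra Exchange is needed."""
    left_buckets, right_buckets = defaultdict(list), defaultdict(list)
    for row in left:
        left_buckets[row[0] % NUM_BUCKETS].append(row)   # identity hash stand-in
    for row in right:
        right_buckets[row[0] % NUM_BUCKETS].append(row)
    result = []
    for i in range(NUM_BUCKETS):
        result.extend(naive_join(left_buckets[i], right_buckets[i]))
    return result

orders = [(10, "2020-05-01", "pen"), (11, "2020-05-01", "ink"), (10, "2020-05-02", "pad")]
visits = [(10, "2020-05-01", "web"), (10, "2020-05-02", "app"), (13, "2020-05-01", "web")]
print(Counter(bucketed_join(orders, visits)) == Counter(naive_join(orders, visits)))  # True
```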
