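Hive is a data warehouse system built on Hadoop and is widely used at major companies. Meituan's data warehouse is also built on Hive: every day it runs nearly ten thousand Hive ETL workflows and handles the storage and analysis of hundreds of GB of data. Hive's stability and performance are therefore critical to our data analysis.
Over several Hive upgrades we ran into problems large and small. By consulting the community and through our own efforts we solved them and, along the way, gained a fairly deep understanding of how Hive compiles SQL into MapReduce. That understanding has helped us fix some Hive bugs. It also helps us optimize Hive SQL, improves our control over Hive, and lets us customize the features we need.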
How MapReduce implements basic SQL operations
Before explaining in detail how SQL is compiled into MapReduce, let's look at how the MapReduce framework implements the basic SQL operations.
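How Join is implemented
select u.name, o.orderid from order o join user u on o.uid = u.uid;
In the map output value, each row is tagged with the table it came from, and the reduce phase uses the tag to determine the data source. The MapReduce process is shown below (this illustrates only the most basic Join implementation; there are others).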
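The tagging idea can be sketched in a few lines of Python. This is only a single-process simulation under our own assumptions: the sample rows, the tag numbers (1 for order, 2 for user), and the helper names map_phase, shuffle, and reduce_phase are all hypothetical and do not come from Hive.

```python
from collections import defaultdict

# Hypothetical sample rows; table and column names follow the SQL above.
orders = [{"uid": 1, "orderid": 101}, {"uid": 1, "orderid": 102}, {"uid": 2, "orderid": 103}]
users = [{"uid": 1, "name": "apple"}, {"uid": 2, "name": "orange"}]

def map_phase(orders, users):
    """Emit (join key, (tag, value)); the tag records the source table."""
    for o in orders:
        yield o["uid"], (1, o["orderid"])   # tag 1 = order (assumed numbering)
    for u in users:
        yield u["uid"], (2, u["name"])      # tag 2 = user (assumed numbering)

def shuffle(pairs):
    """Group map output by key, standing in for the framework's shuffle."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

def reduce_phase(groups):
    """Split each key's values by tag, then emit their cross product."""
    for _, values in groups:
        order_ids = [v for tag, v in values if tag == 1]
        names = [v for tag, v in values if tag == 2]
        for name in names:
            for order_id in order_ids:
                yield name, order_id

joined = list(reduce_phase(shuffle(map_phase(orders, users))))
```

The reducer never needs to know which mapper produced a value; the tag alone tells it which side of the join the value belongs to.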

How Group By is implemented
select rank, isonline, count(*) from city group by rank, isonline;
The GroupBy fields are combined into the map output key, MapReduce's sort orders the keys, and the reduce phase keeps a LastKey to tell different keys apart. The MapReduce process is shown below (this illustrates only the non-hash aggregation on the reduce side).
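A minimal Python sketch of the LastKey technique follows. It assumes the rows arrive at the reducer already sorted by key, which Python's sorted() stands in for here; the sample data and function names are hypothetical.

```python
def map_phase(rows):
    """Emit ((rank, isonline), 1): the GROUP BY columns form the map output key."""
    for row in rows:
        yield (row["rank"], row["isonline"]), 1

def reduce_phase(sorted_pairs):
    """Scan key-sorted pairs once; a change from last_key closes the current group."""
    last_key, count = None, 0
    for key, value in sorted_pairs:
        if last_key is not None and key != last_key:
            yield last_key + (count,)
            count = 0
        last_key = key
        count += value
    if last_key is not None:
        yield last_key + (count,)   # flush the final group

city = [
    {"rank": "A", "isonline": True},
    {"rank": "B", "isonline": False},
    {"rank": "A", "isonline": True},
    {"rank": "A", "isonline": False},
]
result = list(reduce_phase(sorted(map_phase(city))))
```

Because the input is sorted, the reducer holds only one key and one counter in memory at any time, no matter how many groups there are.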

How Distinct is implemented
select dealid, count(distinct uid) num from order group by dealid;
With a single distinct field, and ignoring the hash GroupBy in the map phase, it is enough to combine the GroupBy field and the distinct field into the map output key, rely on MapReduce's sort, use the GroupBy field as the reduce key, and keep a LastKey in the reduce phase to deduplicate.
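As a rough illustration, the single-distinct case can be simulated as below. The function name count_distinct and the sample rows are our own assumptions, and sorted() again stands in for the shuffle sort.

```python
def count_distinct(rows):
    """count(distinct uid) per dealid using only sort order and a last-key check."""
    # The map output key is (dealid, uid); sorting stands in for the shuffle sort.
    keys = sorted((r["dealid"], r["uid"]) for r in rows)
    result = {}
    last_key = None
    for dealid, uid in keys:
        if (dealid, uid) != last_key:   # first time this uid appears in the group
            result[dealid] = result.get(dealid, 0) + 1
        last_key = (dealid, uid)
    return result

order = [
    {"dealid": 1, "uid": 10},
    {"dealid": 1, "uid": 10},
    {"dealid": 1, "uid": 11},
    {"dealid": 2, "uid": 10},
]
num = count_distinct(order)
```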

What if there are several distinct fields, as in the following SQL?
select dealid, count(distinct uid), count(distinct date) from order group by dealid;
There are two ways to implement this:
(1) Keep using the single-distinct approach above, as in the figure below. The rows cannot be sorted by uid and by date separately, so LastKey deduplication does not work, and the reduce phase still has to deduplicate in memory with a hash.

(2) Number all the distinct fields and emit n rows for each input row, so that the values of each field are sorted together. The reduce phase then only needs to record LastKey to deduplicate.
This approach makes good use of MapReduce's sort and saves the memory that reduce-side deduplication would consume, at the cost of more shuffle data.
Note that when generating the reduce value, only the row for the first distinct field needs to keep its value; the value of the rows for the other distinct fields can be empty.
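The second approach can be sketched as follows. The key layout (dealid, field index, value) and the helper names are our own simplification for illustration; the reduce value is left out entirely because only the keys matter for counting.

```python
def map_phase(rows, distinct_cols):
    """Emit one record per DISTINCT column, keyed by (dealid, column index, value)."""
    for row in rows:
        for idx, col in enumerate(distinct_cols):
            yield row["dealid"], idx, row[col]

def reduce_phase(sorted_keys, n_cols):
    """Count distinct values per (dealid, column index) with a last-key check."""
    counts = {}
    last_key = None
    for key in sorted_keys:
        dealid, idx, _ = key
        if key != last_key:
            counts.setdefault(dealid, [0] * n_cols)[idx] += 1
        last_key = key
    return counts

order = [
    {"dealid": 1, "uid": 10, "date": "2013-11-18"},
    {"dealid": 1, "uid": 10, "date": "2013-11-19"},
    {"dealid": 1, "uid": 11, "date": "2013-11-18"},
    {"dealid": 2, "uid": 10, "date": "2013-11-18"},
]
cols = ["uid", "date"]
counts = reduce_phase(sorted(map_phase(order, cols)), len(cols))
```

Four input rows become eight shuffled records here, which is the extra shuffle volume mentioned above, while the reducer still keeps only one previous key in memory.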

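How SQL is converted into MapReduce
Now that we know how MapReduce implements the basic SQL operations, let's see how Hive converts SQL into MapReduce jobs. The whole compilation process has six phases:
1. Antlr defines the SQL grammar and performs lexical and syntax analysis, converting the SQL into an abstract syntax tree (AST Tree)
2. Traverse the AST Tree and abstract out QueryBlock, the basic building block of a query
3. Traverse the QueryBlock and translate it into an OperatorTree
4. The logical optimizer transforms the OperatorTree, merging unnecessary ReduceSinkOperators to reduce the amount of shuffled data
5. Traverse the OperatorTree and translate it into MapReduce jobs
6. The physical optimizer transforms the MapReduce jobs and produces the final execution plan
Each of the six phases is described below.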
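Phase1 SQL lexical and syntax analysis
Antlr
Hive uses Antlr for lexical and syntax analysis of SQL. Antlr is a language recognition tool that can be used to build domain-specific languages.
We won't cover Antlr in detail here. It is enough to know that building a language with Antlr only requires writing a grammar file that defines the lexical rules and the syntax rewrite rules; Antlr then takes care of lexical analysis, syntax analysis, semantic analysis, and intermediate code generation.
Up to version 0.10, Hive's grammar was defined in a single file, Hive.g. As the grammar grew more complex, the Java parser class generated from it risked exceeding the maximum size of a Java class file, so version 0.11 split Hive.g into five files: the lexer rules in HiveLexer.g and the parser rules in four files, SelectClauseParser.g, FromClauseParser.g, IdentifiersParser.g, and HiveParser.g.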
Abstract syntax tree (AST Tree)
After lexical and syntax analysis, any further processing of the expressions uses Antlr's Abstract Syntax Tree grammar: the input statement is converted into an abstract syntax tree during parsing, and the further processing happens later while traversing that tree.
The grammar fragment below is the rule for SelectStatement in Hive SQL. It shows that a SelectStatement contains select, from, where, groupby, having, orderby, and other clauses.
(In the rule below, the arrow denotes a rewrite of the original statement. The rewrite adds special tokens that mark specific constructs; for example, TOK_QUERY marks a query block.)
selectStatement
   : selectClause fromClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause? distributeByClause? sortByClause? limitClause?
   -> ^(TOK_QUERY fromClause
         ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
            selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause? distributeByClause? sortByClause? limitClause?))
   ;
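To make the effect of the arrow concrete, here is a small Python sketch that rearranges already-parsed clauses the way the rewrite rule does. The nested-list tree representation and the function name rewrite_select_statement are purely illustrative assumptions; they are not Antlr or Hive APIs.

```python
# Optional clauses, in the order the rewrite rule lists them.
OPTIONAL_CLAUSES = ["where", "groupby", "having", "orderby",
                    "clusterby", "distributeby", "sortby", "limit"]

def rewrite_select_statement(clauses):
    """Build ^(TOK_QUERY from ^(TOK_INSERT ^(TOK_DESTINATION ...) select ...))."""
    insert = [
        "TOK_INSERT",
        # The rewrite always adds a temporary-file destination.
        ["TOK_DESTINATION", ["TOK_DIR", "TOK_TMP_FILE"]],
        clauses["select"],
    ]
    insert += [clauses[name] for name in OPTIONAL_CLAUSES if name in clauses]
    # The from clause moves to the front, ahead of the select clause.
    return ["TOK_QUERY", clauses["from"], insert]

tree = rewrite_select_statement(
    {"select": "selectClause", "from": "fromClause", "where": "whereClause"}
)
```

The sketch shows two things the rule does: it moves the from clause in front of the select clause, and it inserts a TOK_DESTINATION node that the original statement never mentioned.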
Sample SQL
To explain in detail how SQL is translated into MapReduce, we use a simple SQL statement as an example. It contains one subquery and writes the final result into a table.
FROM
(
  SELECT
    p.datekey datekey,
    p.userid userid,
    c.clienttype
  FROM
    detail.usersequence_client c
    JOIN fact.orderpayment p ON p.orderid = c.orderid
    JOIN default.user du ON du.userid = p.userid
  WHERE p.datekey = 20131118
) base
INSERT OVERWRITE TABLE `test`.`customer_kpi`
SELECT
  base.datekey,
  base.clienttype,
  count(distinct base.userid) buyer_count
GROUP BY base.datekey, base.clienttype
Generating the AST Tree from SQL
The code Hive uses to parse SQL with Antlr is shown below. HiveLexerX and HiveParser are the lexer and parser classes that Antlr generates automatically when it compiles the grammar file Hive.g; the complex parsing happens inside these two classes.
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command)); // lexical analysis, ignoring keyword case
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
if (ctx != null) {
  ctx.setTokenRewriteStream(tokens);
}
HiveParser parser = new HiveParser(tokens); // syntax analysis
parser.setTreeAdaptor(adaptor);
HiveParser.statement_return r = null;
try {
  r = parser.statement(); // convert to the AST Tree
} catch (RecognitionException e) {
  e.printStackTrace();
  throw new ParseException(parser.errors);
}
The resulting AST Tree is shown on the right of the figure below (generated with Antlr Works, the grammar-file editor provided by Antlr). Only a few skeleton nodes are expanded in the figure, not the whole tree.

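Note that the inner subquery also generates a TOK_DESTINATION node. As the SelectStatement rule above shows, this node is added deliberately by the grammar rewrite. The reason is that in Hive the data of every query is stored in temporary files on HDFS, whether it is an intermediate subquery or the final result of the query; an Insert statement ultimately writes the data into the HDFS directory of the target table.
In detail, expanding the from clause of the inner subquery gives the AST Tree below. Each table generates a TOK_TABREF node and the Join condition generates an "=" node. The other parts of the SQL are similar and are not described one by one.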

Phase2 QueryBlock, the basic building block of SQL
The AST Tree is still very complex and not structured enough to be translated directly into a MapReduce program. Converting the AST Tree into QueryBlocks abstracts and structures the SQL further.
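QueryBlock
A QueryBlock is the most basic building block of a SQL statement and has three parts: the input source, the computation, and the output. Put simply, a QueryBlock is a subquery.
The figure below is the class diagram of the QueryBlock-related objects in Hive. Some of the important attributes in the diagram:
QB#aliasToSubq (the aliasToSubq attribute of class QB) stores the QB objects of subqueries; its key is the subquery alias.
QB#qbp, i.e. QBParseInfo, stores the AST Tree structure of each operation in a basic SQL unit. The HashMap QBParseInfo#nameToDest stores the outputs of the query unit; the key has the form insclause-i (Hive supports Multi Insert statements, so there may be several outputs) and the value is the corresponding ASTNode, i.e. the TOK_DESTINATION node. The other HashMap attributes of QBParseInfo store the mapping between each output and the ASTNode of each operation.
QBParseInfo#JoinExpr stores the TOK_JOIN node. QB#QBJoinTree is the structured form of the Join syntax tree.
QB#qbm stores the metadata of each input table, such as the table's path on HDFS and the file format of its data.
QBExpr exists to represent the Union operation.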

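Generating QueryBlocks from the AST Tree
Generating QueryBlocks from the AST Tree is a recursive process: the AST Tree is traversed in pre-order, and each kind of Token node encountered is saved into the corresponding attribute. The main steps are:
TOK_QUERY => create a QB object and recurse over the child nodes
TOK_FROM => save the table-name part of the syntax into attributes of the QB object such as aliasToTabs
TOK_INSERT => recurse over the child nodes
TOK_DESTINATION => save the output-target part of the syntax into the nameToDest attribute of the QBParseInfo object
TOK_SELECT => save the query-expression parts of the syntax into the three attributes destToSelExpr, destToAggregationExprs, and destToDistinctFuncExprs
TOK_WHERE => save the Where part of the syntax into the destToWhereExpr attribute of the QBParseInfo object
The sample SQL ends up as two QB objects, related as follows: QB1 is the outer query and QB2 is the subquery.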
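The recursion can be sketched in Python as below. This is a heavily simplified model under our own assumptions: the AST is a nest of (token, child, ...) tuples, the QB class folds QB and QBParseInfo into one object with only a few of the attributes, joins are left out, and the TOK_SUBQUERY token and the snake_case names are illustrative rather than Hive's actual code.

```python
class QB:
    """Simplified stand-in for Hive's QB plus QBParseInfo (illustrative only)."""
    def __init__(self):
        self.alias_to_tabs = {}       # alias -> table name
        self.alias_to_subq = {}       # alias -> nested QB
        self.name_to_dest = {}        # destination name -> TOK_DESTINATION node
        self.dest_to_sel_expr = {}    # destination name -> TOK_SELECT node
        self.dest_to_where_expr = {}  # destination name -> TOK_WHERE node

def build_qb(node, qb=None, dest="insclause-0"):
    """Pre-order walk over an AST made of (token, child, ...) tuples."""
    token, children = node[0], node[1:]
    if token == "TOK_QUERY":
        qb = QB()                                 # every query block gets its own QB
        for child in children:
            build_qb(child, qb, dest)
    elif token in ("TOK_FROM", "TOK_INSERT"):
        for child in children:                    # just recurse into the children
            build_qb(child, qb, dest)
    elif token == "TOK_TABREF":
        table, alias = children
        qb.alias_to_tabs[alias] = table
    elif token == "TOK_SUBQUERY":
        subquery, alias = children
        qb.alias_to_subq[alias] = build_qb(subquery)
    elif token == "TOK_DESTINATION":
        qb.name_to_dest[dest] = node
    elif token == "TOK_SELECT":
        qb.dest_to_sel_expr[dest] = node
    elif token == "TOK_WHERE":
        qb.dest_to_where_expr[dest] = node
    return qb

inner = ("TOK_QUERY",
         ("TOK_FROM", ("TOK_TABREF", "fact.orderpayment", "p")),
         ("TOK_INSERT",
          ("TOK_DESTINATION", "TOK_TMP_FILE"),
          ("TOK_SELECT", "p.datekey", "p.userid"),
          ("TOK_WHERE", "p.datekey = 20131118")))
outer = ("TOK_QUERY",
         ("TOK_FROM", ("TOK_SUBQUERY", inner, "base")),
         ("TOK_INSERT",
          ("TOK_DESTINATION", "test.customer_kpi"),
          ("TOK_SELECT", "base.datekey", "count(distinct base.userid)")))

qb1 = build_qb(outer)                 # outer query
qb2 = qb1.alias_to_subq["base"]       # subquery
```

As in the sample SQL, the walk produces two objects: qb1 for the outer query and qb2 for the subquery, linked through the subquery alias.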
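Phase3 Logical operators (Operator)
Operator
In the MapReduce jobs that Hive finally generates, both the Map phase and the Reduce phase are made up of OperatorTrees. A logical operator performs one single, specific operation in the Map phase or the Reduce phase.
The basic operators include TableScanOperator, SelectOperator, FilterOperator, JoinOperator, GroupByOperator, and ReduceSinkOperator.
The names indicate what each operator does. TableScanOperator reads the raw input table data from the Map interface of the MapReduce framework, controls the number of rows scanned, and marks that the data comes from the source table. JoinOperator performs the Join. FilterOperator performs filtering.
ReduceSinkOperator serializes the combination of Map-side fields into the Reduce Key/value and Partition Key. It can only appear in the Map phase, and it also marks the end of the Map phase in the MapReduce program that Hive generates.
Data passes between Operators within the Map and Reduce phases as a stream: each Operator finishes its operation on one row and then passes the row on to its childOperator.
The main attributes and methods of the Operator class are:
RowSchema describes the Operator's output fields
InputObjInspector and outputObjInspector parse the input and output fields
processOp receives the data passed from the parent Operator, and forward passes the processed data on to the child Operators
After a row is processed by an Operator, Hive renumbers its fields. colExprMap records, for each expression, the mapping between its names before and after the current Operator; the next phase, logical optimization, uses it to trace field names back.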
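The processOp / forward pattern can be illustrated with a toy Python model. The classes below only borrow the operator names from Hive; their constructors, the CollectOperator sink, and the dict-based rows are our own assumptions for the sake of a runnable example.

```python
class Operator:
    """Minimal streaming operator: handle one row, then forward it to the children."""
    def __init__(self):
        self.children = []

    def add_child(self, child):
        self.children.append(child)
        return child                      # returned so that calls can be chained

    def process_op(self, row):
        self.forward(row)                 # default behaviour: pass the row through

    def forward(self, row):
        for child in self.children:
            child.process_op(row)

class FilterOperator(Operator):
    def __init__(self, predicate):
        super().__init__()
        self.predicate = predicate

    def process_op(self, row):
        if self.predicate(row):           # rows that fail the predicate are dropped
            self.forward(row)

class SelectOperator(Operator):
    def __init__(self, columns):
        super().__init__()
        self.columns = columns

    def process_op(self, row):
        self.forward({c: row[c] for c in self.columns})

class CollectOperator(Operator):
    """Hypothetical terminal operator that simply stores the rows it receives."""
    def __init__(self):
        super().__init__()
        self.rows = []

    def process_op(self, row):
        self.rows.append(row)

ts = Operator()                           # stands in for a TableScanOperator
sink = CollectOperator()
fil = ts.add_child(FilterOperator(lambda r: r["datekey"] == 20131118))
sel = fil.add_child(SelectOperator(["userid"]))
sel.add_child(sink)

for row in [{"datekey": 20131118, "userid": 7}, {"datekey": 20131117, "userid": 8}]:
    ts.process_op(row)
```

Each row flows through the whole chain before the next row is read, so no operator has to buffer its input.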
Because a Hive MapReduce program is dynamic, i.e. it is not known in advance what a MapReduce Job will compute (it may be a Join or a GroupBy), each Operator keeps all the parameters it needs at run time in an OperatorDesc. The OperatorDesc is serialized to HDFS before the job is submitted, then read from HDFS and deserialized before the MapReduce job runs. The Map-phase OperatorTree is stored on HDFS at Job.getConf("hive.exec.plan") + "/map.xml"

Generating the Operator Tree from QueryBlocks
Generating the Operator Tree from QueryBlocks means traversing the syntax-holding attributes of the QB and QBParseInfo objects produced in the previous step. The steps are:
QB#aliasToSubq => there is a subquery; recurse
QB#aliasToTabs => TableScanOperator
QBParseInfo#joinExpr => QBJoinTree => ReduceSinkOperator + JoinOperator
QBParseInfo#destToWhereExpr => FilterOperator
QBParseInfo#destToGroupby => ReduceSinkOperator + GroupByOperator
QBParseInfo#destToOrderby => ReduceSinkOperator + ExtractOperator
Because Join, GroupBy, and OrderBy all have to be completed in the Reduce phase, a ReduceSinkOperator is always generated before the Operator for the operation itself, to combine the fields and serialize them into the Reduce Key/value and Partition Key.
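Next we analyze in detail how the sample SQL becomes an OperatorTree.
The QB objects generated in the previous phase are traversed in pre-order.
First, TableScanOperators are generated from the sub-QueryBlock's QB2#aliasToTabs {du=dim.user, c=detail.usersequence_client, p=fact.orderpayment}:
TableScanOperator("dim.user") TS[0]
TableScanOperator("detail.usersequence_client") TS[1]
TableScanOperator("fact.orderpayment") TS[2]
QBParseInfo#joinExpr is traversed in pre-order to generate the QBJoinTree. QBJoinTree is itself a tree structure; it stores the ASTNodes of the left and right tables and the alias of the query. The resulting query tree is shown below.
Traversing the QBJoinTree in pre-order, the Join operator tree for detail.usersequence_client and fact.orderpayment is generated first.

In the figure: TS=TableScanOperator, RS=ReduceSinkOperator, JOIN=JoinOperator
Then the Join operator tree for the intermediate table and dim.user is generated.

A FilterOperator is generated from QB2's QBParseInfo#destToWhereExpr. At this point the traversal of QB2 is complete.
In the figure below, the SelectOperator decides in some scenarios, based on certain conditions, whether the fields need to be parsed.

In the figure: FIL=FilterOperator, SEL=SelectOperator
A ReduceSinkOperator + GroupByOperator are generated from QB1's QBParseInfo#destToGroupby.

In the figure: GBY=GroupByOperator
GBY[12] is a HASH aggregation, i.e. the aggregation is computed in memory with a hash.
Once everything has been parsed, a FileSinkOperator is generated to write the data to HDFS.

In the figure: FS=FileSinkOperator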
Phase4 Logical optimizer
Most logical optimizers transform the OperatorTree and merge operators in order to reduce the number of MapReduce Jobs and the amount of shuffled data.

The optimizers marked ① in the table make a single Job do as much as possible or merge Jobs. Those marked ② reduce the amount of shuffled data, or even skip the Reduce altogether.
The CorrelationOptimizer is very complex. It exploits correlations within a query to merge correlated Jobs; see Hive Correlation Optimizer.
Two optimizers apply to the sample SQL. Their effects are described below, along with one additional optimizer, ReduceSinkDeDuplication.
PredicatePushDown optimizer
The predicate pushdown optimizer moves the FilterOperator in the OperatorTree forward to directly after the TableScanOperator.
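A toy version of this transformation is sketched below. It assumes a much simpler plan shape than Hive's real OperatorTree: each table has a linear map-side branch stored in a dict, each filter names the single table alias it refers to, and operators are plain tuples. The function name push_down_filters is hypothetical.

```python
def push_down_filters(scan_branches, post_join_ops):
    """Move single-table filters from after the join to right after that table's scan.

    scan_branches: alias -> list of operators, starting with ("TS", alias)
    post_join_ops: operators that run after the join, in order
    """
    remaining = []
    for op in post_join_ops:
        if op[0] == "FIL" and op[1] in scan_branches:
            scan_branches[op[1]].insert(1, op)   # directly after the TS
        else:
            remaining.append(op)
    return scan_branches, remaining

branches = {
    "p": [("TS", "p"), ("RS",)],
    "c": [("TS", "c"), ("RS",)],
}
after_join = [("FIL", "p", "p.datekey = 20131118"), ("SEL",)]
branches, after_join = push_down_filters(branches, after_join)
```

After the rewrite the filter runs before the ReduceSink of table p, so rows that fail the predicate are never shuffled.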

NonBlockingOpDeDupProc optimizer
The NonBlockingOpDeDupProc optimizer merges SEL-SEL or FIL-FIL into a single Operator.
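The merge can be pictured with the following sketch over a linear chain of operators. It rests on two simplifying assumptions of ours: two adjacent filters are combined by AND-ing their predicate strings, and for two adjacent selects the later projection simply replaces the earlier one (a real implementation would have to rewrite the column expressions). The tuple representation and the name merge_adjacent are hypothetical.

```python
def merge_adjacent(chain):
    """Collapse neighbouring FIL-FIL and SEL-SEL pairs in a linear operator chain."""
    merged = []
    for op in chain:
        if merged and merged[-1][0] == op[0] == "FIL":
            # Two filters in a row: keep one filter with both predicates.
            merged[-1] = ("FIL", f"({merged[-1][1]}) AND ({op[1]})")
        elif merged and merged[-1][0] == op[0] == "SEL":
            # Simplification: the later projection wins.
            merged[-1] = op
        else:
            merged.append(op)
    return merged

chain = [
    ("TS", "src"),
    ("FIL", "a > 1"),
    ("FIL", "b < 5"),
    ("SEL", ("a", "b")),
    ("SEL", ("a",)),
    ("RS",),
]
optimized = merge_adjacent(chain)
```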

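ReduceSinkDeDuplication optimizer
ReduceSinkDeDuplication can merge two RS that are linearly connected. CorrelationOptimizer is in fact a superset of ReduceSinkDeDuplication and can merge RS that are connected linearly or non-linearly, but Hive implemented ReduceSinkDeDuplication first.
Take the following SQL statement:
from (select key, value from src group by key, value) s select s.key group by s.key;
After the previous phases it produces the OperatorTree below. The two trees are connected; they are just not drawn together here.
Traversing the OperatorTree shows that the Key and PartitionKey output by the two RS are as follows.

The ReduceSinkDeDuplication optimizer detects that: 1. the pRS Key fully contains the cRS Key, with the same sort order; 2. the pRS PartitionKey fully contains the cRS PartitionKey. The conditions for the optimization are met, so the execution plan is optimized.
ReduceSinkDeDuplication deletes the childRS and the Operators between the parentRS and the childRS. The RS that is kept has the fields key, value as its Key and the field key as its PartitionKey. The merged OperatorTree is as follows: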
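The two conditions and the result of the merge can be written down as a short Python check. This is our own reading of the description above, not Hive's code: "contains with the same sort order" is modeled as the child Key being a prefix of the parent Key, and the key values for the two RS are what we would expect from the sample SQL (inner group by on key, value; outer group by on key).

```python
def can_merge(parent_rs, child_rs):
    """Check the two ReduceSinkDeDuplication conditions described in the text."""
    p_keys, c_keys = parent_rs["keys"], child_rs["keys"]
    # 1. pRS Key contains cRS Key in the same order (modeled as a prefix check).
    keys_ok = p_keys[:len(c_keys)] == c_keys
    # 2. pRS PartitionKey contains cRS PartitionKey.
    parts_ok = set(child_rs["partition"]) <= set(parent_rs["partition"])
    return keys_ok and parts_ok

def merge(parent_rs, child_rs):
    """Keep one RS: sort on the parent's Key, partition on the child's PartitionKey."""
    return {"keys": parent_rs["keys"], "partition": child_rs["partition"]}

# Assumed values for the two RS of the sample statement.
p_rs = {"keys": ["key", "value"], "partition": ["key", "value"]}
c_rs = {"keys": ["key"], "partition": ["key"]}

merged = merge(p_rs, c_rs) if can_merge(p_rs, c_rs) else None
```

Partitioning on key alone still sends all rows with the same (key, value) to the same reducer, and sorting on key, value keeps them adjacent, so one shuffle can serve both group-bys.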

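Phase5 Generating MapReduce Jobs from the OperatorTree
Converting the OperatorTree into MapReduce Jobs involves the following steps:
Generate a MoveTask for the output table
Traverse depth-first downward from one of the root nodes of the OperatorTree
ReduceSinkOperator marks the Map/Reduce boundary and the boundary between Jobs
Traverse the other root nodes, merging MapReduceTasks when a JoinOperator is encountered
Generate a StatsTask to update the metadata
Cut the Operator links between Map and Reduce
Generating a MoveTask for the output table
The OperatorTree from the previous step contains only one FileSinkOperator, so a single MoveTask is generated directly. It moves the final HDFS temporary files into the target table's directory.
MoveTask[Stage-0] Move Operator
Starting the traversal
All root nodes of the OperatorTree are stored in an array named toWalk, and the elements are taken out of the array in a loop (QB1 is omitted and not drawn).
The last element, TS[p], is taken out and pushed onto the stack: opStack = {TS[p]}
Rule #1 TS%: generate a MapReduceTask object and determine the MapWork
The elements on the stack match rule R1 below (expressed here simply in Python code):
"".join([t + "%" for t in opStack]) == "TS%"
A MapReduceTask[Stage-1] object is generated, and its MapWork attribute holds a reference to the Operator root node. Because of the Parent/Child links between the Operators in the tree, MapReduceTask[Stage-1] at this point contains every Operator rooted at TS[p].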

Rule #2 TS%.*RS%: determine the ReduceWork
The traversal continues with the child Operators of TS[p], pushing each onto opStack.
When the first RS is pushed, i.e. when opStack = {TS[p], FIL[18], RS[4]}, rule R2 below is satisfied:
import re
re.fullmatch("TS%.*RS%", "".join([t + "%" for t in opStack])) is not None
At this point a reference to JOIN[5] is stored in the ReduceWork attribute of MapReduceTask[Stage-1].

Rule #3 RS%.*RS%: generate a new MapReduceTask object and split the MapReduceTask
The traversal continues with the child Operators of JOIN[5], pushing each onto opStack.
When the second RS is pushed, i.e. when opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6]}, rule R3 below is satisfied:
import re
any(re.fullmatch("RS%.*RS%", "".join([t + "%" for t in opStack[i:]])) for i in range(len(opStack)))  # checks every suffix of opStack
A new MapReduceTask[Stage-2] object is now created and the OperatorTree is cut between JOIN[5] and RS[6]. A child Operator FS[19] is generated for JOIN[5] and a TS[20] is generated for RS[6]; the MapWork attribute of MapReduceTask[Stage-2] holds a reference to TS[20].
The new FS[19] persists the intermediate data in HDFS temporary files.

The traversal continues with the child Operators of RS[6], pushing each onto opStack.
When opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13]}, rule R3 is satisfied again.
In the same way, a MapReduceTask[Stage-3] object is generated and the OperatorTrees of Stage-2 and Stage-3 are cut apart.

R4 FS%: connect the MapReduceTask and the MoveTask
Once all the child Operators have been pushed onto the stack, opStack = {TS[p], FIL[18], RS[4], JOIN[5], RS[6], JOIN[8], SEL[10], GBY[12], RS[13], GBY[14], SEL[15], FS[17]} satisfies rule R4 (the rule matches the top of the stack):
"".join([t + "%" for t in opStack[-1:]]) == "FS%"
The MoveTask is now connected to MapReduceTask[Stage-3], and a StatsTask is generated to update the table's metadata.
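The four rules can be exercised together in a small Python sketch. It assumes that a rule fires when its pattern matches some suffix of the stack, and that when several rules match, the one with the shortest matching suffix wins; that tie-break is our assumption about how the dispatcher chooses, and the names RULES, op_type, and select_rule are hypothetical.

```python
import re

# Rule name -> pattern over operator names joined with "%".
RULES = [
    ("R1", r"TS%"),
    ("R2", r"TS%.*RS%"),
    ("R3", r"RS%.*RS%"),
    ("R4", r"FS%"),
]

def op_type(op):
    """'FIL[18]' -> 'FIL'"""
    return op.split("[")[0]

def select_rule(op_stack):
    """Return the rule whose pattern matches the shortest suffix of the stack."""
    names = [op_type(op) + "%" for op in op_stack]
    best = None
    for rule, pattern in RULES:
        # Try suffixes from the shortest to the longest.
        for start in range(len(names) - 1, -1, -1):
            suffix = "".join(names[start:])
            if re.fullmatch(pattern, suffix):
                if best is None or len(suffix) < best[1]:
                    best = (rule, len(suffix))
                break
    return best[0] if best else None

walk = ["TS[p]", "FIL[18]", "RS[4]", "JOIN[5]", "RS[6]", "JOIN[8]",
        "SEL[10]", "GBY[12]", "RS[13]", "GBY[14]", "SEL[15]", "FS[17]"]

# Replay the depth-first walk, recording which rule fires after each push.
fired = [select_rule(walk[:i + 1]) for i in range(len(walk))]
```

Replaying the walk from TS[p] reproduces the sequence described above: R1 at TS[p], R2 at RS[4], R3 at RS[6] and RS[13], and R4 at FS[17], with no rule firing in between.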

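Merging Stages
This is not the end: two root nodes have not yet been traversed.
opStack is cleared and the second element of toWalk is pushed. Now opStack = {TS[du]} satisfies R1 TS% again, and MapReduceTask[Stage-5] is generated.

Continuing down from TS[du], when opStack = {TS[du], RS[7]} rule R2 TS%.*RS% is satisfied.
When JOIN[8] is about to be saved as the ReduceWork of MapReduceTask[Stage-5], a lookup in the Map<Operator, MapReduceWork> object, which records the relationship between Operators and MapReduceWork objects, shows that JOIN[8] already exists. MapReduceTask[Stage-2] and MapReduceTask[Stage-5] are therefore merged into one MapReduceTask.

Likewise, traversing from the last root node TS[c] also causes MapReduceTasks to be merged.

Splitting the Map and Reduce phases
In the last step, the OperatorTree in the MapWork and ReduceWork is cut apart at the RS boundary.

The complete picture of generating MapReduceTasks from the OperatorTree
In the end, three MapReduceTasks are generated, as shown in the figure below.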
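Phase6 Physical optimizer
We won't describe how every optimizer works here; we only introduce the MapJoin optimizers.

How MapJoin works

Put simply, MapJoin reads the small table into memory in the Map phase and scans the large table sequentially to complete the Join.
The figure above shows how Hive MapJoin works. It comes from slides on Join optimization by Facebook engineer Liyin Tang. It shows that MapJoin has two stages:
A MapReduce Local Task reads the small table into memory, generates HashTableFiles, and uploads them to the Distributed Cache; the HashTableFiles are compressed at this point.
In the Map phase of the MapReduce Job, each Mapper reads the HashTableFiles from the Distributed Cache into memory, scans the large table sequentially, performs the Join directly in the Map phase, and passes the data on to the next MapReduce job.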
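The two stages correspond to the build and probe steps of an in-memory hash join, sketched below in Python. The sketch ignores the Distributed Cache, file compression, and everything else about distribution; the function names and sample rows are hypothetical.

```python
def build_hash_table(small_table, key):
    """Stage 1 (local task): load the small table into an in-memory hash table."""
    table = {}
    for row in small_table:
        table.setdefault(row[key], []).append(row)
    return table

def map_join(big_table, hash_table, key):
    """Stage 2 (each mapper): scan the big table once and probe the hash table."""
    for row in big_table:
        for match in hash_table.get(row[key], []):
            yield {**match, **row}        # inner join: unmatched rows are dropped

users = [{"uid": 1, "name": "apple"}, {"uid": 2, "name": "orange"}]          # small
orders = [{"uid": 1, "orderid": 101}, {"uid": 2, "orderid": 102},
          {"uid": 3, "orderid": 103}]                                        # big

joined = list(map_join(orders, build_hash_table(users, "uid"), "uid"))
```

No shuffle or reduce step is involved, which is why the approach only works when one side is small enough to fit in each mapper's memory.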

If one of the two tables in the Join is a temporary table, a ConditionalTask is generated, and whether to use MapJoin is decided at run time.
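CommonJoinResolver optimizer
The CommonJoinResolver optimizer converts a CommonJoin into a MapJoin. The conversion works as follows:
Traverse the Task Tree depth-first
Find the JoinOperator and compare the data sizes of the left and right tables
Small table + large table => MapJoinTask; small/large table + intermediate table => ConditionalTask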
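The decision in the last step can be sketched as a small function. The size threshold, the use of None for an intermediate table whose size is unknown at compile time, and the fallback to a plain MapReduceTask when both tables are large are all our own assumptions for illustration; the text above only specifies the first two outcomes.

```python
SMALL_TABLE_BYTES = 25_000_000   # assumed threshold, for illustration only

def choose_join_task(left_size, right_size):
    """Pick a task type from table sizes in bytes (None = size unknown at compile time)."""
    sizes = (left_size, right_size)
    if None in sizes:
        # One side is an intermediate result: decide at run time.
        return "ConditionalTask"
    if min(sizes) <= SMALL_TABLE_BYTES:
        # Small table + large table: convert directly.
        return "MapJoinTask"
    # Assumption: two large tables keep the common reduce-side join.
    return "MapReduceTask"

decisions = [
    choose_join_task(1_000_000, 50_000_000_000),
    choose_join_task(None, 1_000_000),
    choose_join_task(None, 50_000_000_000),
]
```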
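Traversing the MapReduce tasks generated in the previous phase, the optimizer finds that one of the tables of JOIN[8] in MapReduceTask[Stage-2] is a temporary table. It first makes a deep copy of Stage-2 (the original execution plan has to be kept as the Backup Plan, so the plan is copied here), generates a MapJoinOperator to replace the JoinOperator, and then generates a MapReduceLocalWork that reads the small table, generates the HashTableFiles, and uploads them to the DistributedCache.
mapjoin transformation

The execution plan of the MapReduceTask after the transformation is shown in the figure below.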

MapJoinResolver optimizer
The MapJoinResolver optimizer traverses the Task Tree and splits every MapReduceTask that has local work into two Tasks.

MapJoinResolver

After MapJoinResolver has finished, the final execution plan is as shown in the figure below.

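The design of the Hive SQL compilation process
Looking at the whole SQL compilation process described above, several strengths of its design are worth learning from:
Using the open-source Antlr to define the grammar greatly simplifies lexical and syntax parsing; only the grammar files have to be maintained.
The overall approach is clear. The phased design makes the compiler code easy to maintain and lets later optimizers be switched on and off in a pluggable way; for example, Vectorization and support for the Tez engine, the newest features in Hive 0.13, are both pluggable.
Each Operator does exactly one thing, which simplifies the MapReduce program as a whole.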
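Where the community is heading
Hive is still evolving rapidly. To improve Hive's performance, the Stinger initiative led by Hortonworks has proposed a series of improvements to Hive. The more important ones are:
Vectorization - switches Hive from processing data row by row to processing it in batches, greatly improving instruction pipelining and cache utilization
Hive on Tez - replaces the MapReduce framework underneath Hive with the Tez framework. Tez supports jobs with multiple Reduce stages (MRR) and can submit the execution plan in one go, so it can allocate resources better.
Cost Based Optimizer - lets Hive choose the best Join order automatically, speeding up queries
Implement insert, update, and delete in Hive with full ACID support - supports incremental updates of a table by primary key
We will keep following the community's progress and, in light of our own business needs, improve the performance of our Hive-based ETL workflows.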