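Anyone who works on large-scale data processing or big-data engineering is probably familiar with Impala, the interactive MPP engine built on Hadoop, and with its strong data-processing performance. Over the life cycle of a query, Impala improves efficiency in two main ways: the Frontend generates an optimized query plan, and the Backend performs runtime code generation. After a client's SQL query reaches ImpalaServer, the Frontend, while building the query plan, collects the information that the Backend needs as the basis for distributed execution. This information mainly comprises the table schema, partition statistics, the set of expressions in the SQL statement, and the descriptions of the execution plan fragments. In Impala this collection of information is called TQueryExecRequest. As the name suggests, it is a Thrift structure: the Frontend packs it and sends it to the backend Coordinator through the ThriftServer started by the Impala service. TQueryExecRequest is in effect the information store for query execution in Impala, and being familiar with it helps in understanding how Impala's distributed architecture is implemented.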
Composition of the TQueryExecRequest structure
The TQueryExecRequest structure mainly contains the following members:
    TDescriptorTable desc_tbl
    vector<TPlanFragment> fragments
    vector<int32_t> dest_fragment_idx
    map<TPlanNodeId, vector<TScanRangeLocations> > per_node_scan_ranges
    TResultSetMetadata result_set_metadata
    TFinalizeParams finalize_params
    TQueryCtx query_ctx
    string query_plan
    TStmtType::type stmt_type
    int64_t per_host_mem_req
    int64_t per_host_vcore
    vector<TNetworkAddress> host_list
    string lineage_graph
Descriptor table
The descriptor table has type TDescriptorTable and contains the table, tuple and slot descriptions related to the query result.
Note that however complex the submitted SQL query is (data filtering, aggregation, JOIN), it is always carried out on top of a full scan of the data in HDFS or HBase (within the partitions being queried). Each row of the scan result set is a tuple, and each field of a tuple is a slot. The descriptor table has the following members:
    vector<TSlotDescriptor> slotDescriptors
    vector<TTupleDescriptor> tupleDescriptors
    vector<TTableDescriptor> tableDescriptors
Tuple descriptor
In Impala, each row of the result set returned by a query is called a tuple, and the information describing a tuple is stored in the tuple descriptor structure. The tuple descriptor has the following members:
    TTupleId id;
    int32_t byteSize;
    int32_t numNullBytes;
    TTableId tableId;
- id is the tuple id in the table. As of Impala 2.2, a tupleRow contains only one tuple, so id is always 0.
- byteSize is the sum of the sizes of all slots in a tuple, plus the number of bytes occupied by the null indicators.
- numNullBytes: each slot in a tuple that can be NULL takes up 1 bit of space in the tuple, and numNullBytes equals (N + 7) / 8 using integer division, where N is the number of slots that can be NULL. For example, if a tuple has 0 nullable slots, numNullBytes is 0, so no extra space is needed to store null indicator information; if the number of nullable slots is between 1 and 8, numNullBytes is 1.
- tableId is the id of the table the tuple belongs to.
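The (N + 7) / 8 rule and the byteSize definition above can be written out as a minimal sketch. The function names `NumNullBytes` and `TupleByteSize` are hypothetical and are not taken from the Impala code base, and the sketch assumes that no alignment padding is added to the tuple.

```cpp
#include <cstdint>
#include <vector>

// Bytes needed for null indicators: one bit per nullable slot, rounded up
// to whole bytes. This is the (N + 7) / 8 rule with integer division.
int32_t NumNullBytes(int32_t num_nullable_slots) {
  return (num_nullable_slots + 7) / 8;
}

// byteSize as described above: the sum of all slot sizes plus the
// null-indicator bytes. Alignment padding is ignored (assumption).
int32_t TupleByteSize(const std::vector<int32_t>& slot_sizes,
                      int32_t num_nullable_slots) {
  int32_t total = NumNullBytes(num_nullable_slots);
  for (int32_t size : slot_sizes) total += size;
  return total;
}
```

Under these assumptions, a tuple with a 4-byte slot and an 8-byte slot, both nullable, needs 1 null-indicator byte and 13 bytes in total.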
Slot descriptor
The slot description is stored in the TSlotDescriptor structure, which mainly holds information about one field of the queried table. Its members are:
    TSlotId id
    TTupleId parent
    TColumnType slotType
    vector<int32_t> columnPath
    int32_t byteOffset
    int32_t nullIndicatorByte
    int32_t nullIndicatorBit
    int32_t slotIdx
    bool isMaterialized
id uniquely identifies a slot within a tuple.
parent is the id of the tuple that contains the slot.
slotType is the type of the column the slot belongs to. For the nested Parquet file format, one TColumnType may have several TTypeNodes; for example, a STRUCT type may contain several SCALAR types. The type of a TTypeNode can be scalar (a fixed built-in type), ARRAY, MAP or STRUCT. Impala 2.3 already supports the three complex types ARRAY, MAP and STRUCT (Parquet file format only), and this new feature should draw more users to Impala. The structure of TColumnType is shown in Figure 1.

Figure 1. Structure of TColumnType
columnPath is a collection of integers, but the frontend fills in only one element, because a slot corresponds to only one columnPath. columnPath gives the position of a slot in the table. For create table t (id int, name string), the columnPath of id is 0 and the columnPath of name is 1. For a partitioned table, the partition key fields come first. For create table t (id int, name string, calling string) partitioned by (date string, phone int), the columnPath of date is 0, phone is 1, id is 2, name is 3 and calling is 4. At query time, the columnPath values are ordered by the order in which the columns appear in the submitted SQL statement. For the statement select name from t where date = '2016-01-01' and calling = '123' and phone = abs(fnv_hash('123')) % 10, the columnPath order is 3, 0, 4, 1.
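The numbering rule for partitioned tables can be illustrated with a small sketch. `ColumnOrder` and `ColumnPath` are hypothetical helper names used only for illustration; the only rule they encode is the one stated above, that partition keys are numbered before the regular columns.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Column order of a partitioned table: partition keys first, then the
// regular columns. The index in this vector is the columnPath value.
std::vector<std::string> ColumnOrder(
    const std::vector<std::string>& partition_keys,
    const std::vector<std::string>& columns) {
  std::vector<std::string> order(partition_keys);
  order.insert(order.end(), columns.begin(), columns.end());
  return order;
}

// Returns the columnPath of `name`, or -1 if the column does not exist.
int ColumnPath(const std::vector<std::string>& order,
               const std::string& name) {
  for (std::size_t i = 0; i < order.size(); ++i) {
    if (order[i] == name) return static_cast<int>(i);
  }
  return -1;
}
```

For the example table, `ColumnOrder({"date", "phone"}, {"id", "name", "calling"})` yields the positions used in the text, and looking up name, date, calling and phone in that order gives 3, 0, 4, 1.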
byteOffset is the offset of the slot within the tuple, in bytes.
nullIndicatorByte indicates which byte of the tuple holds the NULL flag of the current slot.
nullIndicatorBit indicates which bit of byte nullIndicatorByte holds the NULL flag of the current slot.
slotIdx is the ordinal of the slot within the tuple.
isMaterialized indicates whether the current slot is materialized. For a partition key, that is, a clustering column, isMaterialized is false: partition keys are not materialized.
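How nullIndicatorByte and nullIndicatorBit locate a slot's NULL flag can be sketched as follows. The struct `SlotNullInfo` and the helpers `IsNull` and `SetNull` are hypothetical, and the sketch assumes that bits are numbered from the least significant bit of the byte; the actual bit order used by Impala may differ.

```cpp
#include <cstdint>
#include <vector>

// Position of a slot's NULL flag inside the tuple (hypothetical struct).
struct SlotNullInfo {
  int32_t nullIndicatorByte;  // which byte of the tuple holds the flag
  int32_t nullIndicatorBit;   // which bit inside that byte
};

// True if the slot's NULL flag is set. Assumes bit 0 is the least
// significant bit of the byte.
bool IsNull(const std::vector<uint8_t>& tuple, const SlotNullInfo& slot) {
  return (tuple[slot.nullIndicatorByte] & (1u << slot.nullIndicatorBit)) != 0;
}

// Marks the slot as NULL by setting its flag bit.
void SetNull(std::vector<uint8_t>& tuple, const SlotNullInfo& slot) {
  tuple[slot.nullIndicatorByte] |=
      static_cast<uint8_t>(1u << slot.nullIndicatorBit);
}
```

Because each nullable slot needs only one bit, up to eight slots share a single indicator byte, which is consistent with the numNullBytes rule of the tuple descriptor.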
Table descriptor
The table descriptor contains information about the tables involved in the query, including columns, types and partitions. Its members include:
    TTableId id
    TTableType::type tableType
    int32_t numCols
    int32_t numClusteringCols
    vector<string> colNames
    THdfsTable hdfsTable
Within a query, the id field uniquely identifies a table.
tableType indicates the type of the table. TTableType is an enumeration with the values HDFS_TABLE = 0, HBASE_TABLE = 1, VIEW = 2 and DATA_SOURCE_TABLE = 3.
numCols is the number of columns in the table.
numClusteringCols is the number of clustering columns in the table, that is, the number of partition key columns.
colNames is the collection of the names of all columns in the table.
Partition information in THdfsTable
The THdfsTable structure contains all HDFS-related information about the current table. The following points apply to the partition information of an HDFS table:
The type of a partition key can only be scalar, such as int, float, string, decimal or timestamp.
Different partitions can have different file formats. Within one table, a user can add and drop partitions and set a specific file format for a partition:
    [localhost:21000] > create table census (name string) partitioned by (year smallint);
    [localhost:21000] > alter table census add partition (year=2012); -- text format
    [localhost:21000] > alter table census add partition (year=2013); -- parquet format
    [localhost:21000] > alter table census partition (year=2013) set fileformat parquet;
The partitions member of THdfsTable is a map. This field cannot be empty: even if no partitions are specified for the table, there is one default partition.
The size of partitions is the total number of partitions in the table. For example, if a table is partitioned by month, day and postcode, with 12 months, 30 days in each month and 100 postcodes in each day, the number of partitions is 12 * 30 * 100 = 36000. (In the same example, if month is a string, day is an int and postcode is a bigint, then in each of the three members (TExpr) of partitionKeyExprs the single element (TExprNode) of nodes has node_type STRING_LITERAL(11), INT_LITERAL(4) and INT_LITERAL(4) respectively.)
The partitionKeyExprs member of THdfsPartition is a collection of TExpr, and each partition key is described by one TExpr. Each TExpr here is a literal of a scalar type (a partition key can only be scalar). Its base class is LiteralExpr, and depending on the partition key type it may be a StringLiteral, an IntLiteral and so on.
The member of TExpr is a collection of TExprNode. Because a partition key is a LiteralExpr, this collection has exactly one element: in an Expr tree, a LiteralExpr node has no child Expr.
The file_desc member of THdfsPartition is a collection of THdfsFileDesc, which means a partition may contain several files. The file_blocks member of THdfsFileDesc is a collection of THdfsFileBlock, which means a file may consist of several blocks. THdfsFileBlock specifies the size of the block, its offset in the file and so on.
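The nesting described above (table, partitions, files, blocks) can be pictured with a simplified sketch. The structs `Partition`, `FileDesc` and `FileBlock` are hypothetical stand-ins for THdfsPartition, THdfsFileDesc and THdfsFileBlock that keep only the fields named in the text, and the numeric map key is an assumption, since the source does not show the key type of partitions.

```cpp
#include <cstdint>
#include <map>
#include <vector>

// Simplified stand-ins for the Thrift structures (assumptions, not the
// real generated types).
struct FileBlock { int64_t offset; int64_t length; };
struct FileDesc { std::vector<FileBlock> file_blocks; };
struct Partition { std::vector<FileDesc> file_desc; };

// Total number of blocks across all partitions and all of their files.
int64_t CountBlocks(const std::map<int64_t, Partition>& partitions) {
  int64_t blocks = 0;
  for (const auto& entry : partitions) {
    for (const FileDesc& file : entry.second.file_desc) {
      blocks += static_cast<int64_t>(file.file_blocks.size());
    }
  }
  return blocks;
}
```

The size of the map corresponds to the partition count discussed above (36000 in the month, day, postcode example), while the block count depends on how many files and blocks each partition holds.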
The structure of THdfsTable is shown in Figure 2.

Figure 2. Structure of THdfsTable
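Execution plan fragments
After a query request is submitted to ImpalaServer, the backend calls JNI to initialize a Frontend instance. This instance parses the submitted SQL statement and finds the table, scanList and expr information related to the query. From this information it constructs execution nodes (such as HdfsScanNode, AggregationNode and HashJoinNode), decides from the node type whether each node runs distributed or locally, groups the nodes into execution plan fragments, and finally builds the whole execution plan.
For example, suppose a client submits the SQL request select date, count(user) from t group by date. Parsing in the frontend yields the following information:
- scanList consists of the date and user fields.
- groupingExpr is an expression of type SLOT_REF (the date field).
- aggregateExpr is an expression of type AGGREGATE_EXPR (count(STRING)), and its child is an expression of type SLOT_REF (the user field).
From this information the execution plan shown in Figure 3 is obtained.

Figure 3. Execution plan generated by the Frontend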
Next we look at the structure of the execution plan fragments (fragments) in TQueryExecRequest. fragments is a collection of TPlanFragment, and one TPlanFragment contains all the information related to one execution plan fragment. The members of TPlanFragment are:
    string display_name
    TPlan plan
    vector<TExpr> output_exprs
    TDataSink output_sink
    TDataPartition partition
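Execution node information in a plan fragment
The plan member of TPlanFragment has type TPlan and contains the nodes of one execution fragment together with their expression information. The structure of TPlan is shown in Figure 4.

Figure 4. Execution node information in the TPlan structure
Note that the member of TPlan is a collection of TPlanNode, and each TPlanNode contains the information of one execution node.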
Global conjuncts
The conjuncts member of TPlanNode contains the filter conditions of the where clause. It is a collection of TExpr, and the member of TExpr is in turn a collection of TExprNode. In other words, conjuncts contains at least one expression tree; the tree is described by a TExpr, and each node in the tree is described by a TExprNode. The leaf nodes of an expression tree are usually of type SLOT_REF or LITERAL, and the non-leaf nodes are usually of type FUNCTION_CALL or PREDICATE. A FUNCTION_CALL may be a built-in function or a Hive or Impala UDF; a PREDICATE may be a COMPOUND_PRED (and, or), a LIKE_PRED (a like '%b%'), an IN_PRED (a in (1, 2, 3)) and so on. For example, if the filter condition in the where clause is phone = '123' OR imsi IS NOT NULL, the conjuncts tree is as shown in Figure 5.

Figure 5. Expression tree that makes up the conjuncts
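One way to hold a tree in a flat collection of nodes, as TExpr does with its TExprNode members, is to store the nodes in pre-order together with each node's child count. The sketch below assumes that layout; the struct `ExprNode`, its fields and the function `Render` are hypothetical and simplified for illustration.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// One node of a flattened expression tree (simplified, hypothetical).
struct ExprNode {
  std::string label;  // e.g. an operator, a column name or a literal
  int num_children;   // how many of the following subtrees belong to it
};

// Rebuilds a parenthesized string from nodes stored in pre-order.
// `pos` is advanced past the subtree that was rendered.
std::string Render(const std::vector<ExprNode>& nodes, std::size_t& pos) {
  const ExprNode& node = nodes[pos++];
  std::string out = node.label;
  if (node.num_children > 0) {
    out += "(";
    for (int i = 0; i < node.num_children; ++i) {
      if (i > 0) out += ", ";
      out += Render(nodes, pos);
    }
    out += ")";
  }
  return out;
}
```

Under this assumption, the filter phone = '123' OR imsi IS NOT NULL becomes six nodes: OR (2 children), = (2 children), phone, '123', IS NOT NULL (1 child) and imsi. The leaves are the SLOT_REF and LITERAL nodes, and the inner nodes are the predicates.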
Aggregation node TAggregationNode
If a TPlanNode is a TAggregationNode, two fields of the TAggregationNode structure are of particular importance: grouping_exprs and aggregate_functions. grouping_exprs is a collection of TExpr that stores at least one TExpr tree. Each TExpr tree in grouping_exprs describes one group. For example, the fnv_hash(date) in group by fnv_hash(date) is described by one TExpr tree in grouping_exprs; the leaf node of the tree has expression type SLOT_REF and its parent is a FUNCTION_CALL. Like grouping_exprs, aggregate_functions is a collection of TExpr, except that it describes the aggregate functions. For example, the aggregate function count(user) is described by one TExpr tree in aggregate_functions, whose leaf node has expression type SLOT_REF and whose parent has expression type AGGREGATE_EXPR.
For an aggregation, the two bottom fragments of the execution plan both contain an AggregationNode, but the TExprNode types and the literal (scalar_type) types in the grouping_exprs and aggregate_functions of these two nodes are not all the same. The aggregation in fragment 0 is UNPARTITIONED, which means the result of this aggregation is broadcast to the next fragment. The AggregationNode in fragment 1 receives the locally aggregated data sets broadcast by all instances of fragment 0 and performs the final merge. This explains why the TExprNode types in the two fragments differ. For example, for group by a, fnv_hash(b), the second TExpr tree in grouping_exprs of the TAggregationNode in fragment 0 describes the fnv_hash(b) operation and consists of two TExprNodes, with a root of type FUNCTION_CALL; the second TExpr tree in grouping_exprs of the TAggregationNode in fragment 1 has only one TExprNode, of type SLOT_REF. This is shown in Figure 6.

Figure 6. Expression trees of grouping_exprs in the AggregationNode of different plan fragments
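The split into a local aggregation and a final merge can be sketched with a count per group. This is a simplified illustration and not Impala's implementation; `LocalAggregate`, `Merge` and the `Counts` alias are hypothetical names.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

using Counts = std::map<std::string, int64_t>;

// Phase 1 (one call per fragment instance): count rows per grouping key.
// The grouping key is assumed to be already evaluated from the raw row,
// which is where a FUNCTION_CALL such as fnv_hash(b) would run.
Counts LocalAggregate(const std::vector<std::string>& group_keys) {
  Counts counts;
  for (const std::string& key : group_keys) ++counts[key];
  return counts;
}

// Phase 2: merge the partial results. The key is now just a value that is
// read back (comparable to a SLOT_REF), and partial counts are summed.
Counts Merge(const std::vector<Counts>& partials) {
  Counts total;
  for (const Counts& partial : partials) {
    for (const auto& entry : partial) total[entry.first] += entry.second;
  }
  return total;
}
```

The sketch shows why the expressions in the two fragments differ: the first phase evaluates the grouping function on raw rows, while the merge phase only reads the already computed key and combines partial results.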
Join node THashJoinNode
If a TPlanNode is a THashJoinNode, two fields are of particular importance: eq_join_conjuncts and other_join_conjuncts. eq_join_conjuncts has type TEqJoinCondition, whose members are two TExprs named left and right. This is easy to understand: left and right are the expressions on the two sides of the equals sign in the join clause. For example, for t1 join t2 on t1.a=t2.a, the left TExpr contains a single TExprNode of expression type SLOT_REF, and the right TExpr also contains a single TExprNode of type SLOT_REF.
The other_join_conjuncts field deserves a closer look. The HashJoinNode code in the Impala frontend explains other_join_conjuncts as "join conjuncts from the JOIN clause that aren't equi-join predicates". Taken alone this seems clear: other_join_conjuncts is set whenever the join clause contains a non-equi-join condition. In fact there is a precondition: other_join_conjuncts is set only for outer joins and semi joins, and is not set for an inner join. An example makes this easier to understand. Suppose there are two tables. The data in the left table t1 is as follows:

The data in the right table t2 is as follows:

For these two tables, we first find the intersection of the records that have the same name but a different gender. We use an inner join, with the following SQL statement:
    select t1.name, t1.gender, t2.gender
    from t1 inner join t2
    on t1.name = t2.name and t1.gender != t2.gender
Here Impala automatically turns t1.gender != t2.gender into a global conjunct. The transformed SQL statement is:
    select t1.name, t1.gender, t2.gender
    from t1 inner join t2
    on t1.name = t2.name
    where t1.gender != t2.gender
The returned result set is as follows:

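This transformation is easy to understand: first find the records with the same name, then globally filter out the records with the same gender from the result set returned by the join. Now consider another case: return all records of the left table, and join the right table on the condition of same name and different gender. Here we use a left outer join, with the following SQL statement:
    select t1.name, t1.gender, t2.gender
    from t1 left outer join t2
    on t1.name = t2.name and t1.gender != t2.gender
The returned result set is as follows: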

The left outer join returns all records of the left table. However, the records named a have the same gender in both tables, which does not satisfy the "same name but different gender" constraint of the left outer join clause, so the value of t2.gender in the corresponding record is NULL. Can the non-equi-join conjuncts of an outer join be turned into global conjuncts, as for an inner join? The answer is no. Consider the transformed SQL statement of the left outer join and its result. The transformed SQL statement is:
    select t1.name, t1.gender, t2.gender
    from t1 left outer join t2
    on t1.name = t2.name
    where t1.gender != t2.gender
The returned result set is as follows:

After the non-equi-join conjuncts of the outer join clause are turned into global conjuncts, the record for name b is filtered out of the result set, which is of course not the result we want.
The non-equi-join conjuncts of an outer join clause cannot be turned into global conjuncts because, whether or not the join clause contains non-equi-join conjuncts, the final result set must contain all records of the left table (left outer join) or of the right table (right outer join). The eq_join_conjuncts and other_join_conjuncts together therefore determine the correctness of an outer join or semi join that has non-equi-join conjuncts, and converting the non-equi-join conjuncts into global conjuncts is not correct.
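The difference between evaluating the non-equi-join condition inside the join and applying it as a global filter can be reproduced with a small nested-loop simulation. The sample rows used with it are hypothetical and are not the article's tables; `Person`, `JoinedRow`, `LeftOuterJoin` and `FilterGenderDiffers` are names invented for this sketch, and a nested loop stands in for the hash join.

```cpp
#include <string>
#include <vector>

struct Person { std::string name; std::string gender; };

struct JoinedRow {
  std::string name;
  std::string gender1;
  bool has_gender2;      // false means t2.gender is NULL
  std::string gender2;
};

// LEFT OUTER JOIN on t1.name = t2.name. If `other_conjunct_in_join` is
// true, t1.gender != t2.gender is also evaluated as part of the join.
std::vector<JoinedRow> LeftOuterJoin(const std::vector<Person>& t1,
                                     const std::vector<Person>& t2,
                                     bool other_conjunct_in_join) {
  std::vector<JoinedRow> out;
  for (const Person& left : t1) {
    bool matched = false;
    for (const Person& right : t2) {
      if (left.name != right.name) continue;  // equi-join condition
      if (other_conjunct_in_join && left.gender == right.gender) continue;
      out.push_back({left.name, left.gender, true, right.gender});
      matched = true;
    }
    // An unmatched left row is kept, with NULL for the right side.
    if (!matched) out.push_back({left.name, left.gender, false, ""});
  }
  return out;
}

// Global filter WHERE t1.gender != t2.gender. A comparison with NULL is
// not true, so rows with a NULL t2.gender are dropped.
std::vector<JoinedRow> FilterGenderDiffers(const std::vector<JoinedRow>& rows) {
  std::vector<JoinedRow> out;
  for (const JoinedRow& row : rows) {
    if (row.has_gender2 && row.gender1 != row.gender2) out.push_back(row);
  }
  return out;
}
```

With hypothetical rows t1 = {a male, b female, c male} and t2 = {a male, c female}, evaluating the condition inside the join keeps all three left rows (a and b with a NULL t2.gender), while joining on name only and filtering afterwards keeps only c. The second form no longer preserves every row of the left table.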
Expression information for the query result columns
Each column of the result set of a submitted query is an expression, represented in TPlanFragment by the output_exprs field. output_exprs is a collection of TExpr. Each TExpr in the collection is a tree of TExprNodes that holds the expression information of one output column. For example, the query select abs(fnv_hash(a)), count(b) from t group by a has two output columns. The root of the TExprNode tree of the first column has expression type FUNCTION_CALL (abs(BIGINT)), its child is also a FUNCTION_CALL (fnv_hash(STRING)), and the leaf node has type SLOT_REF.
Output Sink
The output sink is the destination of the output data stream. This destination is either the next query plan fragment (select) or a table (insert select or create table as select). The output_sink field of TPlanFragment has type TDataSink, whose members are:
    TDataSinkType::type type
    TDataStreamSink stream_sink
    TTableSink table_sink
TDataSinkType
Depending on the destination of the data stream, a data sink has one of two types. DATA_STREAM_SINK is located in the query plan fragment downstream of a data stream sender. TABLE_SINK is located downstream of the coordinator and is the medium between the query result set and the new table into which the data is inserted.
TDataStreamSink
TDataStreamSink has two members. One is dest_node_id, the id of the destination node. For example, in the aggregation query select a, count(b) from t group by a, the AggregationNode of the bottom fragment of the plan is a data stream sender with id 1, and the exchangeNode in the fragment downstream of it is a data stream receiver with id 2, so dest_node_id in the TDataStreamSink variable is 2.
The other member of TDataStreamSink is output_partition, of type TDataPartition. The name easily suggests that it is related to table partitions, but it is not. TDataPartition describes how the data stream is distributed. There are four distribution modes: UNPARTITIONED, RANDOM, HASH_PARTITIONED and RANGE_PARTITIONED. As of Impala 2.2, RANGE_PARTITIONED is not yet supported. The other three modes are explained below.
1. UNPARTITIONED: as the name suggests, the data is not partitioned, that is, all data is located on the same impalad node.
2. RANDOM: the data is not partitioned by a particular column but is distributed randomly over several nodes. The data partitioning of an HdfsScanNode, for example, is RANDOM.
3. HASH_PARTITIONED: the data is partitioned by a particular column, and the data of different partitions is located on different impalad nodes.
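The three supported distribution modes can be contrasted with a small routing sketch. This is only an illustration: the enum `PartitionType` and the function `Destination` are hypothetical, `std::hash` stands in for whatever hash function Impala actually uses, and round-robin stands in for random placement so that the example stays deterministic.

```cpp
#include <cstddef>
#include <functional>
#include <string>

enum class PartitionType { UNPARTITIONED, RANDOM, HASH_PARTITIONED };

// Picks the index of the destination instance for one row.
std::size_t Destination(PartitionType type, const std::string& key,
                        std::size_t num_dests, std::size_t row_index) {
  switch (type) {
    case PartitionType::UNPARTITIONED:
      return 0;  // everything ends up in one place
    case PartitionType::RANDOM:
      return row_index % num_dests;  // placement does not depend on the key
    case PartitionType::HASH_PARTITIONED:
      return std::hash<std::string>{}(key) % num_dests;  // same key, same place
  }
  return 0;
}
```

The property that matters for HASH_PARTITIONED is that rows with the same key value always reach the same destination, which is what allows each destination to finish a group or a join key on its own.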
TTableSink
The TTableSink structure is relatively simple. It mainly contains the id of the destination table, the tableSink type (HDFS or HBASE), the partition key expressions of the hdfsTableSink, and whether existing data is overwritten (insert overwrite).
Data scan ranges on Impalad nodes
The per_node_scan_ranges member of TQueryExecRequest defines the data scan ranges related to the query. Its type is map<TPlanNodeId, vector<TScanRangeLocations> >, a mapping from a node to a collection of TScanRangeLocations. This structure mainly describes how the data to be scanned is distributed over the cluster: which nodes hold the data, the offset and size within the file of each block on each node, and the replica information of the blocks. Its purpose is clear: the number of TScanRangeLocations on each Impalad node (which can be regarded as the number of blocks to be scanned on that node) determines how many concurrent Scanners a ScanNode instance will have. For text files, each Scanner scans one block; for Parquet files, each Scanner scans one file. The members of TScanRangeLocations are:
    TScanRange scan_range
    vector<TScanRangeLocation> locations
TScanRange
As the name suggests, TScanRange defines the scan range of one data split. For HDFS this range is a block; for HBase it is a key range. TScanRange has two members. One is hdfs_file_split, of type THdfsFileSplit. THdfsFileSplit defines everything a Scanner needs to know about a block: the name of the file that contains the block, the offset of the block in the file, the size of the block, the partition id of the block (recall the partitions member of THdfsTable in the table descriptor of the descriptor table, which is a map; the Scanner looks up the partition information for the partition id there), the file length and the compression algorithm used. The other member of TScanRange is hbase_key_range, of type THBaseKeyRange. THBaseKeyRange stores the start rowKey and the end rowKey of the data split to be scanned.
TScanRangeLocation
The replication information of a data split (a block, for HDFS) is stored in TScanRangeLocations. HDFS keeps 3 replicas of the data by default, so the locations collection stores 3 TScanRangeLocations. Each TScanRangeLocation holds the information of one replica: the id of the host where the replica resides, the id of the volume that holds the data, and whether the data is cached by HDFS.
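The relationship between scan ranges and replica locations can be sketched with simplified structs. `ScanRangeLocation`, `ScanRangeLocations` and their field names are hypothetical stand-ins that keep only the fields mentioned above, and `CountRangesWithReplicaOn` is an illustrative helper, not part of Impala's scheduler.

```cpp
#include <vector>

// One replica of a data split (simplified, hypothetical field names).
struct ScanRangeLocation {
  int host_id;     // host that stores this replica
  int volume_id;   // volume on that host
  bool is_cached;  // whether HDFS caches the data
};

// One data split together with all of its replicas.
struct ScanRangeLocations {
  std::vector<ScanRangeLocation> locations;
};

// Counts the splits that have at least one replica on the given host,
// that is, the splits this host could scan locally.
int CountRangesWithReplicaOn(const std::vector<ScanRangeLocations>& ranges,
                             int host_id) {
  int count = 0;
  for (const ScanRangeLocations& range : ranges) {
    for (const ScanRangeLocation& loc : range.locations) {
      if (loc.host_id == host_id) {
        ++count;
        break;  // count each split once, even with several local replicas
      }
    }
  }
  return count;
}
```

A count of this kind gives a rough idea of how many blocks a single node may have to scan, which is the quantity the text relates to the number of concurrent Scanners.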
Impala's query context
The context information related to the query submitted by the user is stored in the query_ctx member of TQueryExecRequest, of type TQueryCtx. The members of TQueryCtx are:
    TClientRequest request
    TUniqueId query_id
    TSessionState session
    string now_string
    int32_t pid
    TNetworkAddress coord_address
    vector< ::impala::TTableName> tables_missing_stats
    bool disable_spilling
    TUniqueId parent_query_idx
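TClientRequest
TClientRequest stores the SQL statement submitted by the client together with Impala's startup parameters, including the number of compute nodes, the batch size of one scanner pass, the maximum number of scanner threads, the maximum I/O buffer size, mem_limit, the Parquet file size, and whether runtime code generation is enabled.
TSessionState
TSessionState stores the client connection information, including the connection type (BEESWAX or HIVESERVER2), the connected database, the user name, and the host name and port of the node to which the query was submitted.
TNetworkAddress
TNetworkAddress stores the host name and port of the coordinator.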
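Summary
Analyzing the TQueryExecRequest structure shows which useful information Impala collects during the life cycle of a query. More importantly, checking the code against this information helps us understand Impala's query execution logic and the Impala code more deeply, so that in real usage scenarios we can tune queries in a more targeted way according to the query requirements and the data volume.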