您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  要资料 文章 文库 Lib 视频 Code iProcess 课程 认证 咨询 工具 讲座吧   成长之路  
会员   
 
   
 
  
每天15篇文章
不仅获得谋生技能
更可以追随信仰
 
 
     
   
 订阅
  捐助
Impala的信息仓库:解读TQueryExecRequest结构
 
作者:查锐   来源:CSDN   发布于 2016-4-1
来自于要资料   794 次浏览     评价:      
 

相信很多做海量数据处理和大数据技术研发的朋友对Impala这个基于Hadoop的交互式MPP引擎都不陌生,尤其对Impala出色的数据处理能力印象深刻。在查询执行的整个生命周期内,Impala主要通过Frontend生成优化的查询计划,Backend执行运行时代码生成来优化查询效率。在客户端的一个SQL查询下发到ImpalaServer后,Frontend会在生成查询计划的过程中,收集必要的统计信息,作为Backend分布式执行的依据。这些信息主要包括:表结构、分区统计、SQL语句的表达式集合,以及执行计划分片描述等。这些信息的集合在Impala中被称为TQueryExecRequest。通过名字可以看出,这是个Thrift结构,由Frontend封装,通过Impala服务开启的ThriftServer发送到后端的Coordinator。可以说,TQueryExecRequest结构就是Impala执行查询的信息仓库,熟悉这个结构有助于理解整个Impala的分布式架构实现。

TQueryExecRequest结构组成

TQyeryExecRequest结构中,主要包含如下成员:

TDescriptorTable desc_tbl
vector<tplanfragment> fragments
vector<int32_t> dest_fragment_idx
map<tplannodeid, vector<tscanrangelocations=""> > per_node_scan_ranges
TResultSetMetadata result_set_metadata
TFinalizeParams finalize_params
TQueryCtx query_ctx
string query_plan
TStmtType::type stmt_type
int64_t per_host_mem_req
int64_t per_host_vcore
vector<tnetworkaddress> host_list
string lineage_graph</tnetworkaddress></tplannodeid,></int32_t></tplanfragment>

描述符表

描述符表的类型为TDescriptorTable,包含和查询结果相关的table、tuple以及slot描述信息。

这里需要说明的是,无论提交的SQL查询有多复杂,包括数据过滤、聚合、JOIN,都是在对HDFS或HBase(所查询分区内的)全量数据扫描的基础上进行的。数据扫描结果集的每一行都是一个tuple,tuple中的每个字段都是一个slot。描述符表有如下成员:

vector<tslotdescriptor> slotDescriptors
vector<ttupledescriptor> tupleDescriptors
vector<ttabledescriptor> tableDescriptors

Tuple描述符

在Impala中,一个查询返回结果集中的每一行叫做一个tuple,对tuple的描述信息存放在tuple描述符结构中。tuple描述符有如下成员:

TTupleId id;
int32_t byteSize;
int32_t numNullBytes;
TTableId tableId;
  1. id是表中的tuple id,截止到Impala2.2版本,一个tupleRow中只会有一个tuple,因此id总是0。
  2. byteSize是一个tuple中所有slot大小的和,再加上nullIndicator占用的字节数。
  3. numNullBytes指的是,如果tuple中的一个slot可以为NULL,则需要占用tuple 1个bit大小的空间,numNullBytes等于(N + 7) / 8,N是可能为NULL的slot个数。例如在一个tuple中,可能为NULL的slot数为0,则numNullBytes=0,也就是不需要额外的空间去存储null indicator的信息;如果可以为NULL的slot数为1-8,则numNullBytes为1。
  4. tableId是tuple所属的table id。

Slot描述符

slot描述信息存放在TSlotDescriptor结构中,主要用来存放所查询表中某个字段的相关信息,成员包括:

TSlotId id
TTupleId parent
TColumnType slotType
vector<int32_t> columnPath
int32_t byteOffset
int32_t nullIndicatorByte
int32_t nullIndicatorBit
int32_t slotIdx
bool isMaterialized</int32_t>

id在一个tuple中唯一标识一个slot。

parent代表slot所在的tuple id。

slotType是slot所在的column类型。对于Parquet嵌套文件格式来说,一个TColumnType可能有多个TTypeNode。例如,一个STRUCT类型可能包含多个SCALAR类型。一个TTypeNode的类型可能是标量的(确定的内建类型)、数组(ARRAY)、映射(MAP)或者结构(STRUCT)。目前Impala的2.3版本已经支持ARRAY、MAP和STRUCT这三种复杂类型(仅限Parquet文件格式),这个新特性应该会使更多的人转向Impala阵营。TColumnType的结构如图1所示。

图1 TColumnType结构图

columnPath是一个整型集合,但是在前端只会填充一个元素,因为一个slot只对应一个columnPath。columnPath代表一个slot在表中的位置,如create table t (id int, name string),则id的columnPath为0,name的columnPath为1。对于分区表,partition key字段的columnPath会排在前面。如create table t(id int、name string、calling string)partitioned by (date string, phone int),则date的columnPath为0, phone的columnPath为1, id的columnPath为2,name的columnPath为3,calling的columnPath为4。在查询时,columnPath的顺序是按照column在提交的SQL语句中出现的顺序排列的,如select name from t where date = ‘2016-01-01’ and calling = ‘123’ and phone = abs(fnv_hash(‘123’)) % 10这个SQL,columnPath的顺序为3, 0, 4, 1。

byteOffset是slot在tuple中的偏移,单位为字节。

nullIndicatorByte表明当前slot为NULL时,在tuple的哪个字节中。

nullIndicatorBit表明当前slot为NULL时,在第nullIndicatorByte个字节的哪个bit上。

slotIdx是slot在tuple中的序号。

isMaterialized表明当前slot是否被物化。对于partition key,也就是clustering column来说,isMaterialized为false,也就是partition key不会被物化。

Table描述符

Table描述符包含和查询相关表的信息,包括表字段、类型、分区等信息。成员包括:

TSlotId id
TTupleId parent
TColumnType slotType
vector<int32_t> columnPath
int32_t byteOffset
int32_t nullIndicatorByte
int32_t nullIndicatorBit
int32_t slotIdx
bool isMaterialized</int32_t>

在一个查询中,id字段唯一标识一个表。

tableType表明表的类型,TTableType为枚举类型,包括HDFS_TABLE = 0,HBASE_TABLE = 1,VIEW = 2,DATA_SOURCE_TABLE = 3。

numCols代表table中column的个数。

numClusteringCols代表table中clustering column的个数,也就是parititon的个数。

colNames代表table中所有column的名称的集合。

THdfsTable中的分区信息

THdfsTable结构中包含当前table和HDFS相关的所有信息。关于hdfs table中的partition信息,有如下说明:

partition key的类型只能是标量的,如int、float、string、decimal、timestamp。

不同的partition可以有不同的的文件格式,用户可以在一个表中,增加、删除分区,为分区设置特定的文件格式:

[localhost:21000] > create table census (name string) partitioned by (year smallint);
[localhost:21000] > alter table census add partition (year=2012); -- text format
[localhost:21000] > alter table census add partition (year=2013); -- parquet format
[localhost:21000] > alter table census partition (year=2013) set fileformat parquet;

THdfsTable结构中的partitions类型为map,这个字段不能为空,即使没有为表指定分区,也会有一个默认的partition。

partitions的Size为表中partition的总数。例如,一个表按month、day以及postcode分区,有12个month,每个month中30个day,每个day中100个postcode,则partition数为12 * 30 * 100=36000(如上面的例子一个表按month、day以及postcode分区,month为string类型,day为int类型,postcode为bigint类型,则partitionKeyExprs中的三个成员(TExpr)中的nodes的唯一一个元素(TExprNode)的node_type分别为STRING_LITERAL(11)、INT_LITERAL(4)和INT_LITERAL(4))。

THdfsPartition结构中的partitionKeyExprs类型为TExpr的集合,每个parititon key的信息由一个TExpr描述。这里的TExpr的类型是标量类型(partition key只能是标量类型)的字面值,基类为LiteralExpr,根据不同的partition key类型,可能是StringLiteral、IntLiteral等。

TExpr的成员类型是TExprNode的集合,由于partition key的类型为LiteralExpr,所以这里的TExprNode集合中只会有一个成员,因为在Expr树中,LiteralExpr节点不会再有孩子Expr。

THdfsPartition结构中的file_desc成员类型为THdfsFileDesc的集合,这表明一个分区下可能会有多个文件。THdfsFileDesc结构中的file_blocks成员类型为THdfsFileBlock的集合,这表明一个文件可能由多个block组成,在THdfsFileBlock中指定了block的大小、在文件中的偏移量等。

THdfsTable结构如图2所示。

图2 THdfsTable结构图

执行计划分片

一个查询请求提交给ImpalaServer之后,会在后端调用JNI初始化一个前端的Frontend实例,由这个实例对提交的SQL语句做语法分析,找出和查询相关的table、scanList以及expr信息,通过这些信息构造执行节点(如HdfsScanNode、AggreagtionNode、HashJoinNode),根据节点类型评估节点为分布式执行还是本地执行,执行节点组成执行计划分片,最终构造出整个执行计划。

例如,客户端提交了一个SQL请求select date, count(user) from t group by date,通过前端语法分析,可以得到如下信息:

  1. scanList由date和user字段组成。
  2. groupingExpr是一个SLOT_REF类型的表达式(date字段)。
  3. aggregateExpr是一个AGGREGATE_EXPR类型的表达式(count(STRING)),它的孩子是一个SLOT_REF类型的表达式(user字段)。

根据这些信息可以得到如图3所示的执行计划。

图3 Frontend生成的执行计划

下面分析TQueryExecRequest中的执行计划分片(fragments)结构。fragments是一个TPlanFragment类型的集合,一个TPlanFragment中包含和一个执行计划分片相关的所有信息。TPlanFragment结构的成员如下:

string display_name
TPlan plan
vector<texpr> output_exprs
TDataSink output_sink
TDataPartition partition

执行计划分片中的执行节点信息

TPlanFragment结构中的plan成员的类型为TPlan,包含一个执行分片中的节点及其相关的表达式信息。TPlan结构如图4所示。

图4 TPlan结构中的执行节点信息

需要说明的是,TPlan的成员是TPlanNode的集合,每个TPlanNode包含一个执行节点的信息。

全局conjuncts

TPlanNode结构中的conjuncts成员包含where子句的过滤条件,是TExpr类型的集合,而TExpr的成员又是TExprNode类型的集合。也就是说,conjuncts包含了至少一棵表达式树,表达式树的信息由TExpr描述,树中的每个节点的信息由TExprNode描述。表达式树的叶子节点的表达式类型一般是SLOT_REF或者LITERAL,非叶子节点的表达式类型一般是FUNCTION_CALL或者PREDICATE。FUNCTION_CALL可能是内建的,也可能是Hive或者Impala的UDF;PREDICATE可能是COMPOUND_PRED(and和or)、LIKE_PRED(a like ‘%b%’)或者IN_PRED(a in (1, 2, 3))等。例如,where子句中的过滤条件为phone = ‘123’ OR imsi IS NOT NULL,则这个conjuncts树如图5所示。

图5 组成Conjuncts的表达式树

聚合操作节点TAggregationNode

如果TPlanNode是一个TAggregationNode,那么在TAggregationNode这个结构中有两个比较重要的字段,一个是grouping_exprs,另一个是aggregate_functions。grouping_exprs的类型是一个TExpr集合,存储了至少一棵TExpr树。grouping_exprs中的每棵TExpr树描述了一个分组(group),例如group by fnv_hash(date)分组中的fnv_hash(phone)就是在grouping_exprs的一棵TExpr树中描述的,树的叶子节点的表达式类型为SLOT_REF,父节点为FUNCTION_CALL。和grouping_exprs类似的是,aggregate_functions也是一个TExpr集合,只不过它描述的是聚合函数的信息,例如聚合函数count(user)在aggregate_functions的一棵TExpr数中描述,叶子节点的表达式类型为SLOT_REF,父节点表达式类型为AGGREGATE_EXPR。

对于一个聚合操作来说,执行计划的最底层两个分片都会包含AggregationNode,但是这两个AggregationNode的grouping_exprs和aggregate_functions中的TExprNode节点类型以及节点的字面值(scalar_type)类型不尽相同。第0个分片中的聚合操作是UNPARTITIONED的,也就是说当前聚合操作的结果要广播给下一个分片,分片1中的AggregationNode收到所有分片0的多个实例广播的本地聚合后的数据集,做最后的数据merge。这就比较好理解这两个分片中的AggregationNode的grouping_exprs和aggregate_functions中的TExprNode节点类型为何不同了。例如,group by a, fnv_hash(b)这个分组,分片0的TAggregationNode中的grouping_exprs中的第二个TExpr树描述了fnv_hash(b)操作,由两个TExprNode组成,根节点类型为FUNCTION_CALL;而分片1中TAggregationNode中的grouping_exprs中的第二个TExpr树只有一个TExprNode,类型为SLOT_REF。如图6所示。

图6 AggregationNode中grouping_exprs在不同执行计划分片中的表达式树实现

join操作节点THashJoinNode

如果TPlanNode是一个THashJoinNode,则有两个比较重要的字段,一个是eq_join_conjuncts,另一个是other_join_conjuncts。eq_join_conjuncts的类型是TEqJoinCondition,TEqJoinCondition的成员是名为left和right的两个TExpr。这个比较好理解,left和right分别代表join子句中等号两边的表达式。例如t1 join t2 on t1.a=t2.a,那么left这个TExpr中只有一个TExprNode,表达式类型为SLOT_REF,right这个TExpr中也只有一个TExprNode,表达式类型同为SLOT_REF。这里需要重点说一下other_join_conjuncts这个结构。大家可以看一下Impala前端的HashJoinNode代码,其中对other_join_conjuncts的解释是:join conjuncts from the JOIN clause that aren’t equi-join predicates,单看起来似乎说的很明确,就是在join子句中出现非equi-join的条件时会设置other_join_conjuncts。但其实这里有个前提,就是只有在outer join和semi join这两种操作中,other_join_conjuncts才会被设置,inner join的情况并不会设置other_join_conjuncts。这里通过一个例子来说明会比较容易理解。比如我有两张表,左表t1中的数据如下:

右表t2中的数据如下:

对于这两张表,我们先来找出两张表姓名相同,性别不同的记录的交集,这里我们使用inner join,SQL语句如下:

select t1.name, t1.gender, t2.gender from t1 inner join t2 on t1.name = t2.name and t1.gender != t2.gender

在这里,impala会自动将t1.gender != t2.gender转化为全局的conjuncts,转换后的SQL语句为:

select t1.name, t1.gender, t2.gender from t1 inner join t2 on t1.name = t2.name where t1.gender != t2.gender

返回的结果集如下:

这种变换是很容易理解的,先找出名字相同的记录,再全局过滤掉join返回的结果集中性别相同的记录。现在考虑另外一种情况,返回左表的所有记录,并以姓名相同,性别不同的条件join右表,这里我们使用left outer join,SQL语句如下:

select t1.name, t1.gender, t2.gender from t1 left outer join t2 on t1.name = t2.name and t1.gender != t2.gender

返回的结果集如下

可以看到left outer join返回了左表的所有记录,但是由于两张表中姓名为a的记录的性别相同,不符合left outer join子句中的“姓名相同但性别不同”的约束条件,因此对应的记录中t2.gender的值为NULL。那么考虑一下,可否像inner join那样,把outer join中的non equiv-conjuncts转化为全局的conjuncts呢?答案是否定的。先来看一下left outer join转化后的SQL语句和对应的结果。转化后的SQL语句如下:

select t1.name, t1.gender, t2.gender from t1 left outer join t2 on t1.name = t2.name where t1.gender != t2.gender

返回的结果集如下:

可以看到outer join子句中的non equiv-conjuncts转化为全局conjuncts之后,结果集中姓名b对应的记录被过滤掉了,这当然不是我们想要的结果。
之所以不能将outer join子句中的non equiv-conjuncts转化为全局conjunts,是因为无论join子句中是否存在non equiv-conjuncts,最终的结果集都应该包含左表(left outer join)或者右表(right outer join)的全部记录。因此en_join_conjuncts和other_join_conjuncts两部分信息能够决定带有non equiv-conjuncts的outer join或semi join的正确性,将non equiv-conjuncts转换成全局conjuncts并不是正确的做法。

和查询结果列相关的表达式信息

我们提交的查询结果集中的每一列都是一个表达式,在TPlanFragment结构中由output_exprs字段表示。output_exprs的类型为TExpr的集合。集合中每个TExpr都是一棵TExprNode树,包含了查询输出一列的表达式信息。例如,select abs(fnv_hash(a)), count(b) from t group by a查询的输出有两列,第一列的TExprNode树的根节点为表达式类型为FUNCTION_CALL(abs(BIGINT)),其孩子节点类型也是FUNCTION_CALL(fnv_hash(STRING)),叶子节点类型为SLOT_REF。

Output Sink

Output Sink,即数据流输出的目的地。这个目的地要么是下一个查询计划分片(select),要么是一个表(insert select或者 create table as select)。在TPlanFragment结构中的otput_sink字段的类型是TDataSink,TDataSink类型的成员如下:

TDataSinkType::type type
TDataStreamSink stream_sink
TTableSink table_sink

TDataSinkType

根据数据流输出目的地的不同,date sink有两种类型,一种是DATA_STREAM_SINK,位于data stream sender下游查询计划分片中;一种是TABLE_SINK,位于coordinator下游,是数据查询结果集和待插入的新表之间的媒介。

TDataStreamSink

TDataStreamSink结构中有两个成员,一个是dest_node_id,即目的节点的id。例如一个select a, count(b) from t group by a的聚合查询,执行计划最底层分片的AggregationNode是一个data stream sender,它的id为1;它所在分片的下游分片中的exchangeNode是一个data stream receiver,它的id为2,那么TDataStreamSink变量总的dest_node_id为2。

TDataStreamSink的另一个成员是TDataPartition类型的变量output_partition。从名字上来看,很容易让人误以为和是和表分区相关,然而并不是。TDataPartition结构描述了数据流的分发方式,有四种分发方式,UNPARTITIONED、RANDOM、HASH_PARTITIONED以及RANGE_PARTITIONED。截止到Impala2.2版本还不支持RANGE PARTITION的方式。那下面我们就对UNPARTITIONED、RANDOM和HASH_PARTITIONED做一下解释。

1. UNPARTITIONED——顾名思义,就是“不分片”,也就是所有数据位于同一个impalad节点。

2. RANDOM——数据并不按照某一列分片,而是随机分布在多个节点上。例如HdfsScanNode的数据分片就是RANDOM的。

3. HASH_PARTITIONED——数据按照某一列分片,不同分片的数据位于不同的impalad节点。

TTableSink

TTableSink结构中的结构相对简单,主要包括目的表id、tableSink类型(HDFS或HBASE)、hdfsTableSink的partition key表达式,以及是否覆盖原有数据(insert overwrite)等信息。

Impalad节点上的数据扫描范围

TQueryExecRequest结构中的per_node_scan_ranges成员定义了和查询相关的数据扫描范围,类型是map >,为Impalad节点到TScanRangeLocation集合的映射。这个结构主要描述了需要扫描的数据在集群上的分布,包括数据位于哪些节点、每个节点上数据所在block在文件中的偏移和大小,以及block的备份信息。这个结构的作用也很明显,就是根据TScanRangeLocations在每个Impalad节点上的数量(这里可以认为TScanRangeLocations的数量就是一个节点上需要扫描的block数),来决定在在一个ScanNode实例中,会有多少个并发的Scanner。对于text文件来说,每个Scanner负责扫描一个block;对于parquet文件来说,每个Scanner负责扫描一个文件。TScanRangeLocations的成员如下:

TScanRange scan_range
vector<tscanrangelocation> locations</tscanrangelocation>

TScanRange

TScanRange顾名思义,定义了一个数据分片的扫描范围。这个数据范围对于HDFS来说是一个block,对于HBASE则是一个key range。TScanRange中有两个成员,一个是THdfsFileSplit类型的hdfs_file_split变量,在THdfsFileSplit这个结构中,定义了一个Scanner所需的block的全部信息,包括block所在的文件名、block在文件中的偏移、block的大小、block所在的partition id(还记得在描述符表的Table描述符中THdfsTable中定义的partitions成员吗?类型为map,Scanner会从这里找到partition id对应的partition信息)、文件长度以及采用的压缩算法。另一个TScanRange的成员是THBaseKeyRange类型的hbase_key_range变量,在THBseKeyRange这个结构中,存储了当前需要扫描的数据分片的起始rowKey和结束rowKey。

TScanRangeLocation

数据分片(对HDFS来说是block)的replication信息保存在TScanRangeLocations中。众所周知。HDFS的数据默认是3备份,那么在locations这个集合中就存储了3个TScanRangeLocation,每个TScanRangeLocation都保存了其中一个备份的相关信息,包括这个replication所在的主机id、数据所在的volumn id以及数据是否被hdfs缓存。

Impala的查询上下文

和用户提交的查询相关的上下文信息保存在TQueryExecRequest的query_ctx成员中,类型为TQueryCtx。TQueryCtx的成员如下:

TClientRequest request
TUniqueId query_id
TSessionState session
string now_string
int32_t pid
TNetworkAddress coord_address
vector< ::impala::TTableName> tables_missing_stats
bool disable_spilling
TUniqueId parent_query_idx

TClientRequest

TClientRequest结构中保存了客户端提交的SQL语句以及Impala的启动参数,包括计算节点数、scanner一次扫描的batch大小、最大scanner线程数、最大io缓冲区大小、mem_limit、parquet文件大小,以及是否启用运行时代码生成等信息。

TSessionState

TSessionState结构中保存了客户端连接信息,包括客户端连接的方式(BEESWAX或者HIVESERVER2)、连接的数据库、用户名、以及提交查询所在节点的主机名和端口。

TNetworkAddress

TNetworkAddress结构中保存了coordinator的主机名和端口。

总结

通过对TQueryExecRequest结构的分析,我们不仅能够了解Impala在一个查询的生命周期内收集了哪些有用的信息,更加重要的是,对照这些信息,能够帮助我们更好的理解Impala的查询执行逻辑,使得对Impala代码的理解更加深刻,在实际的使用场景中,根据不同的查询需求和数据量级,做出更有针对性的查询优化调整。

 

 

   
 订阅
  捐助
相关文章 相关文档 相关课程



我们该如何设计数据库
数据库设计经验谈
数据库设计过程
数据库编程总结
数据库性能调优技巧
数据库性能调整
数据库性能优化讲座
数据库系统性能调优系列
高性能数据库设计与优化
高级数据库架构师
数据仓库和数据挖掘技术
Hadoop原理、部署与性能调优
 

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


领先IT公司 android开发平台最佳实践
北京 Android开发技术进阶
某新能源领域企业 Android开发技术
某航天公司 Android、IOS应用软件开发
阿尔卡特 Linux内核驱动
艾默生 嵌入式软件架构设计
西门子 嵌入式架构设计
更多...   
 
 
 
 
 
每天2个文档/视频
扫描微信二维码订阅
订阅技术月刊
获得每月300个技术资源
 
 

关于我们 | 联系我们 | 京ICP备10020922号 京公海网安备110108001071号