The previous few articles introduced the core execution flow of Spark SQL's Catalyst, the SqlParser, the Analyzer, and the core class TreeNode. This article explains in detail the optimization ideas behind Spark SQL's Optimizer and how the Optimizer manifests inside Catalyst, adding some hands-on experiments to build an intuitive picture of what it does.
The Optimizer's main responsibility is to take the Logical Plan that the Analyzer has resolved and optimize the syntax tree according to different optimization-strategy Batches, rewriting logical plan nodes (Logical Plan) and expressions (Expression) alike. It is also the step that precedes conversion into a physical execution plan, as the figure below shows:

1. Optimizer
The Optimizer class is the only class under Catalyst's optimizer package. It works much the same way as the Analyzer: both extend RuleExecutor[LogicalPlan], and both execute a series of Batch operations:

The batches in Optimizer cover three categories of optimization strategy: 1. Combine Limits (merge Limit operators); 2. ConstantFolding (fold constants); 3. Filter Pushdown (push filters down). The rule objects each Batch applies are all defined alongside Optimizer:
object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
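FixedPoint(100) means each Batch is rerun until one full pass leaves the plan unchanged, capped at 100 iterations. Below is a simplified, self-contained sketch of RuleExecutor's fixed-point loop, paraphrased from the Spark 1.0.0 source rather than quoted from it:

// Apply a batch's rules in order, repeatedly, until the plan reaches a
// fixed point (a full pass changes nothing) or maxIterations passes have run.
def executeBatch[Plan](plan: Plan, rules: Seq[Plan => Plan], maxIterations: Int): Plan = {
  var curPlan = plan
  var iteration = 1
  var continue = true
  while (continue) {
    val lastPlan = curPlan
    curPlan = rules.foldLeft(curPlan) { (p, rule) => rule(p) }   // one full pass
    continue = curPlan != lastPlan && iteration < maxIterations  // fixed point reached?
    iteration += 1
  }
  curPlan
}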
One more point: the Optimizer optimizes not only the Logical Plan itself but also the Expressions inside the Logical Plan, so it is worth knowing the Expression-related members involved. The two used most are references and outputSet: references is the set of attributes that a Logical Plan or Expression node depends on, while outputSet is the set of all Attributes the Logical Plan outputs.
For example, Aggregate is a Logical Plan; its references is the deduplicated union of the attributes referenced by its group-by expressions and its aggregate expressions.
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}
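To make the two sets concrete, here is a hypothetical illustration (the attribute ids are invented):

// For: SELECT key, SUM(value) AS total FROM t GROUP BY key
//   groupingExpressions  = [key#1]
//   aggregateExpressions = [key#1, SUM(value#2) AS total#3]
// so, by the definitions above:
//   references = Set(key#1, value#2)   // what the node needs from its child
//   outputSet  = Set(key#1, total#3)   // what the node exposes to its parent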

2. Optimization Strategies in Detail
The Optimizer's strategies include rules that transform the plan and rules that transform expressions. The underlying mechanism in both cases is to traverse the tree and apply the optimization Rules. Note one detail, though: a Logical Plan is transformed in pre-order (transform), while Expressions are transformed in post-order (transformExpressionsUp).
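To see why the traversal order matters, here is a self-contained toy sketch; the Node, Add, and Lit types are invented for illustration and are not Catalyst code:

sealed trait Node
case class Add(left: Node, right: Node) extends Node
case class Lit(value: Int) extends Node

// Pre-order (what transform/transformDown does to a LogicalPlan): apply the
// rule to the current node first, then recurse into the result's children.
def transformDown(n: Node)(rule: PartialFunction[Node, Node]): Node =
  rule.applyOrElse(n, identity[Node]) match {
    case Add(l, r) => Add(transformDown(l)(rule), transformDown(r)(rule))
    case leaf      => leaf
  }

// Post-order (what transformExpressionsUp does to Expressions): rewrite the
// children first, then apply the rule, so a rule always sees children that
// have already been rewritten.
def transformUp(n: Node)(rule: PartialFunction[Node, Node]): Node = {
  val rewritten = n match {
    case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
    case leaf      => leaf
  }
  rule.applyOrElse(rewritten, identity[Node])
}

val fold: PartialFunction[Node, Node] = { case Add(Lit(a), Lit(b)) => Lit(a + b) }
val tree = Add(Add(Lit(1), Lit(2)), Lit(4))
transformUp(tree)(fold)    // Lit(7): the inner Add folds first, enabling the outer fold
transformDown(tree)(fold)  // Add(Lit(3), Lit(4)): the root was inspected too early

This is one reason expression-level rules such as NullPropagation and ConstantFolding use the bottom-up variant: a parent's pattern match can rely on its children already having been collapsed into Literals.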
2.1 Batch: Combine Limits
When two Limit operators appear with one as the direct child of the other, they are merged into a single Limit; in the matched pattern below, grandChild is the child of the inner Limit, i.e. the outer Limit's grandchild.
/**
 * Combines two adjacent [[Limit]] operators into one, merging the
 * expressions into one single expression.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
      // ll is the outer Limit and le its expression; nl is ll's child Limit
      // and ne its expression; grandChild is nl's child
      Limit(If(LessThan(ne, le), ne, le), grandChild)
      // compare the two expressions: the merged limit is ne if ne < le, otherwise le
  }
}
Given the SQL: val query = sql("select * from (select * from temp_shengli limit 100)a limit 10")
scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Limit 10
 Project [key#13,value#14]
  Limit 100
   Project [key#13,value#14]
    MetastoreRelation default, temp_shengli, None
The subquery asks for limit 100 while the outer query asks for limit 10. There is no point in the subquery fetching that many rows when the outer layer only needs 10, so Limit 10 and Limit 100 are merged into Limit 10. When the rule fires, the merged limit expression is If(LessThan(100, 10), 100, 10), i.e. min(100, 10), which constant folding later collapses to the literal 10.
2.2 Batch: ConstantFolding
This Batch contains the Rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.
2.2.1 Rule: NullPropagation
First, a word about Literal: its companion object can wrap a value of any basic type into a literal expression. (This lays the groundwork for what follows.)
object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}
Note that Literal itself is a LeafExpression; its core method is eval, which, given a Row, evaluates the expression and returns its value:
case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty
  override def toString = if (value != null) value.toString else "null"
  type EvaluatedType = Any
  override def eval(input: Row): Any = value
}
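A quick usage sketch of the two pieces together, assuming the Spark 1.0.0 catalyst API:

import org.apache.spark.sql.catalyst.expressions.Literal

val one = Literal(1)          // the companion apply infers Literal(1, IntegerType)
val str = Literal("spark")    // likewise Literal("spark", StringType)
assert(one.eval(null) == 1)   // eval ignores the input Row entirely

This is why optimizer rules can conjure constants, such as the Literal(0L) used by NullPropagation below, without ever touching a physical execution path.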
Now let's see what NullPropagation does. NullPropagation is an optimization that replaces Expressions that can be statically evaluated with equivalent Literal values, and it keeps NULL values from propagating up the SQL syntax tree.
/**
 * Replaces [[Expression Expressions]] that can be statically evaluated with
 * equivalent [[Literal]] values. This rule is more specific with
 * Null value propagation from bottom to top of the expression tree.
 */
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      // count(null) is always 0, so replace it with the literal 0 cast to the count's type
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)
      // summing the constant 0 always yields 0
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)
      // IS NULL / IS NOT NULL on a non-nullable column is decidable statically
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
      // indexing into (or with) a null literal yields null
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
      // drop null literals from coalesce's children
      case e @ Coalesce(children) => {
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      // an If over a constant condition picks its branch statically
      case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
      // v IN (...) with v literally present in the list is statically true
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {
        case Literal(candidate, _) if candidate == v => true
        case _ => false
      })) => Literal(true, BooleanType)
      // Put exceptional cases above if any
      // arithmetic, comparison and regex expressions with a null operand are null
      case e: BinaryArithmetic => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}
Given the SQL: val query = sql("select count(null) from temp_shengli where key is not null")
scala> query.queryExecution.analyzed
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [COUNT(null) AS c0#5L]   // still counting null at this point
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
Applying NullPropagation:
scala> NullPropagation(query.queryExecution.analyzed)
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [CAST(0, LongType) AS c0#5L]   // optimized away to 0
 Filter IS NOT NULL key#7
  MetastoreRelation default, temp_shengli, None
2.2.2 Rule: ConstantFolding
Constant folding is an Expression-level optimization: a constant that can be computed right away is evaluated inside the plan itself, rather than being turned into objects that compute it during physical execution:
object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {    // transform the plan first
    case q: LogicalPlan => q transformExpressionsDown {           // then transform each node's expressions
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType)   // call eval to compute the result now
    }
  }
}
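The rule leans on Expression.foldable, and on the fact that a foldable expression never reads its input Row, which is what makes eval(null) safe. A minimal sketch, again assuming the Spark 1.0.0 catalyst expression API:

import org.apache.spark.sql.catalyst.expressions.{Add, Literal}

// foldable propagates bottom-up: a binary expression is foldable when both
// of its children are, and a Literal always is, so this whole tree folds
val e = Add(Add(Literal(1), Literal(2)), Literal(3))
assert(e.foldable)
assert(e.eval(null) == 6)   // no Row is consulted during evaluation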
Given the SQL: val query = sql("select 1+2+3+4 from temp_shengli")
scala> query.queryExecution.analyzed
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(((1 + 2) + 3) + 4) AS c0#21]   // still a constant expression at this point
 MetastoreRelation default, src, None
After optimization:
scala> query.queryExecution.optimizedPlan
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [10 AS c0#21]   // folded straight into 10
 MetastoreRelation default, src, None
2.2.3 Rule: BooleanSimplification
Õâ¸öÊǶԲ¼¶û±í´ïʽµÄÓÅ»¯£¬ÓеãÏñjava²¼¶û±í´ïʽÖеĶÌ·Åжϣ¬²»¹ýÕâ¸öдµÄµ¹ÊǺÜÓÅÑÅ¡£
¿´¿´²¼¶û±í´ïʽ2±ßÄܲ»ÄÜͨ¹ýÖ»¼ÆËã1±ß£¬¶øÊ¡È¥¼ÆËãÁíÒ»±ß¶øÌá¸ßЧÂÊ£¬³ÆÎª¼ò»¯²¼¶û±í´ïʽ¡£
½âÊÍÇë¿´ÎÒдµÄ×¢ÊÍ£º
/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>                       // an AND, i.e. exp1 and exp2
        (left, right) match {                              // (left expression, right expression)
          case (Literal(true, BooleanType), r) => r        // left is true: the result is whatever the right says
          case (l, Literal(true, BooleanType)) => l        // right is true: the result is whatever the left says
          case (Literal(false, BooleanType), _) => Literal(false)  // left is false: the AND is false regardless
          case (_, Literal(false, BooleanType)) => Literal(false)  // either side false makes the AND false
          case (_, _) => and
        }
      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)    // left is true: no need to look at the right
          case (_, Literal(true, BooleanType)) => Literal(true)    // either side true makes the OR true
          case (Literal(false, BooleanType), r) => r               // left is false: the result is the right side
          case (l, Literal(false, BooleanType)) => l               // right is false: the result is the left side
          case (_, _) => or
        }
    }
  }
}
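As a hypothetical walk-through (sketched by hand, not captured from a run):

// WHERE 1 = 1 AND key > 100
// ConstantFolding, which sits earlier in the same batch, first folds 1 = 1:
//   And(Literal(true), GreaterThan(key, Literal(100)))
// BooleanSimplification's (Literal(true, BooleanType), r) case then
// collapses the And to just:
//   GreaterThan(key, Literal(100))
// so the constant side never reaches physical execution.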
2.3 Batch: Filter Pushdown
The Filter Pushdown batch contains CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, and ColumnPruning.
PS: the name Filter Pushdown feels a bit too narrow to cover everything in the batch, e.g. ColumnPruning (column pruning).
2.3.1 CombineFilters
Merges two adjacent Filter operators, much like CombineLimits above. Merging the two nodes reduces the depth of the tree and avoids paying for the filtering pass twice.
/**
 * Combines two adjacent [[Filter]] operators into one, merging the
 * conditions into one conjunctive predicate.
 */
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}
Given the SQL: val query = sql("select key from (select key from temp_shengli where key > 100)a where key > 80")
Before optimization: one Filter sits below the other with a Project in between (it is the outer Filter's grandchild):
scala> query.queryExecution.analyzed
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter (key#27 > 80)    // filter > 80
  Project [key#27]
   Filter (key#27 > 100) // filter > 100
    MetastoreRelation default, src, None
After optimization: PushPredicateThroughProject first pushes the outer Filter below the Project so the two Filters become adjacent, and the merged filter is then expressed as a single compound boolean expression:
scala> query.queryExecution.optimizedPlan
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
 Filter ((key#27 > 100) && (key#27 > 80))  // merged into one Filter
  MetastoreRelation default, src, None
2.3.2 Filter Pushdown
Filter Pushdown pushes filters down the tree. The principle is to filter out unneeded rows as early as possible, so that every operator above handles less data; a sketch of the main rule involved follows below.
Given the SQL: val query = sql("select key from (select * from temp_shengli)a where key > 100")
The generated logical plan is:
scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)   // first select key and value, then Filter
  Project [key#31,value#32]
   MetastoreRelation default, src, None
The optimized plan is:
scala> query.queryExecution.optimizedPlan
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
 Filter (key#31 > 100)   // filter first, then select
  MetastoreRelation default, src, None
2.3.3 ColumnPruning
ÁвüôÓõıȽ϶࣬¾ÍÊǼõÉÙ²»±ØÒªselectµÄijЩÁС£
ÁвüôÔÚ3Öֵط½¿ÉÒÔÓãº
1¡¢ÔھۺϲÙ×÷ÖУ¬¿ÉÒÔ×öÁвüô
2¡¢ÔÚjoin²Ù×÷ÖУ¬×óÓÒº¢×Ó¿ÉÒÔ×öÁвüô
3¡¢ºÏ²¢ÏàÁÚµÄProjectµÄÁÐ
object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Eliminate attributes that are not needed to calculate the specified aggregates.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      // the child outputs attributes the Aggregate never references, so insert
      // a Project over exactly a.references between the Aggregate and its child
      a.copy(child = Project(a.references.toSeq, child))

    // Eliminate unneeded attributes from either side of a Join.
    case Project(projectList, Join(left, right, joinType, condition)) =>
      // prune the columns of the join's left and right subtrees
      // Collect the list of all references required either above or to evaluate the condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes */
      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Combine adjacent Projects.
    case Project(projectList1, Project(projectList2, child)) =>
      // Create a map of Aliases to their values from the child projection.
      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // Substitute any attributes that are produced by the child projection, so that we safely
      // eliminate it.
      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS c ...' produces 'SELECT a + b + 1 ...'
      // TODO: Fix TransformBase to avoid the cast below.
      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // Eliminate no-op Projects
    case Project(projectList, child) if child.output == projectList => child
  }
}
Here are three examples, one for each of the three cases:
1¡¢ÔھۺϲÙ×÷ÖУ¬¿ÉÒÔ×öÁвüô
Given the SQL: val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")
Before optimization:
res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51,value#52]   // before optimization, both key and value are selected
  MetastoreRelation default, temp_shengli, None
After optimization (ColumnPruning1, like ColumnPruning2 below, is a rule variant I set up for debugging, hence the extra trace line in the output):
scala> ColumnPruning1(query.queryExecution.analyzed)
MetastoreRelation default, temp_shengli, None
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
 Project [key#51]   // after pruning, value is dropped and only key is selected
  MetastoreRelation default, temp_shengli, None
2. Column pruning on the left and right children of a join
Given the SQL: val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b on a.key = b.key")
Before optimization:
scala> query.queryExecution.analyzed
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#42 AS qween#39]
 Join Inner, Some((key#41 = key#43))
  Project [key#41,value#42]   // an extra column, value, is selected here
   MetastoreRelation default, temp_shengli, None
  Project [key#43,value#44]   // an extra column, value, is selected here
   MetastoreRelation default, temp_shengli, None
After optimization (ColumnPruning2 is the variant I instrumented for debugging, which prints the allReferences set and the relations it visits):
scala> ColumnPruning2(query.queryExecution.analyzed)
allReferences is -> Set(key#35, key#37)
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#35 AS qween#33]
 Join Inner, Some((key#35 = key#37))
  Project [key#35]   // after pruning, the left child selects only the key column
   MetastoreRelation default, temp_shengli, None
  Project [key#37]   // after pruning, the right child selects only the key column
   MetastoreRelation default, temp_shengli, None
3. Merging the column lists of adjacent Projects
Given the SQL: val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli) a")
Before optimization:
scala> query.queryExecution.analyzed
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(c#56 + 1) AS c0#57]
 Project [(1 + 1) AS c#56]
  MetastoreRelation default, temp_shengli, None
After optimization:
scala> query.queryExecution.optimizedPlan
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(2 AS c#56 + 1) AS c0#57]   // the subquery's c is substituted into the outer select and computed directly
 MetastoreRelation default, temp_shengli, None
3. Summary
This article described the Optimizer's role in Catalyst: it takes the Analyzed Logical Plan and applies Rules that transform both the Logical Plan and its Expressions, merging and optimizing the nodes of the tree. The main strategies boil down to a few broad categories: operator merging, column pruning, and filter pushdown.
Catalyst keeps iterating; this article is based on Spark 1.0.0 only, and newly added optimization strategies will be covered in follow-up posts.
Discussion is welcome; let's improve together!