Spark SQL Catalyst Source Code Analysis: Optimizer
 
By OopsOutOfMemory (火龙果软件), published 2014-08-25
 

The previous articles in this series covered the core execution flow of Spark SQL's Catalyst, the SqlParser, the Analyzer, and the core TreeNode library. This article explains in detail the optimization ideas behind Spark SQL's Optimizer and how the Optimizer is realized inside Catalyst, adding some hands-on experiments to build an intuitive picture of what it does.

The Optimizer's main job is to take the resolved Logical Plan produced by the Analyzer and optimize the syntax tree according to different optimization-strategy Batches, rewriting both logical plan nodes (Logical Plan) and expressions (Expression). It is the step that precedes conversion into a physical execution plan.

1. Optimizer

The Optimizer class is the only class under Catalyst's optimizer package. It works much like the Analyzer: both extend RuleExecutor[LogicalPlan], and both execute a series of Batch operations:

The batches field in Optimizer contains three categories of optimization strategy: 1. Combine Limits (merge Limit operators), 2. ConstantFolding (fold constants), 3. Filter Pushdown (push filters down). The companion objects implementing each Batch's rules are all defined in the Optimizer source file:

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      ConstantFolding,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Filter Pushdown", FixedPoint(100),
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      ColumnPruning) :: Nil
}
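
For intuition, here is a minimal, self-contained sketch of how a RuleExecutor-style driver applies these batches. It is a stand-in modeled after the Spark 1.0.0 behavior, not the actual source; the assumption illustrated is the FixedPoint(100) semantics, i.e. re-running a batch until the plan stops changing, at most 100 times:

// Sketch only: `P` stands in for LogicalPlan, and a rule is just a P => P function.
case class Batch[P](name: String, maxIterations: Int, rules: (P => P)*)

def execute[P](plan: P, batches: Seq[Batch[P]]): P =
  batches.foldLeft(plan) { (start, batch) =>
    def runOnce(p: P): P = batch.rules.foldLeft(p)((cur, rule) => rule(cur))
    var iteration = 1
    var before = start
    var after = runOnce(start)
    // FixedPoint semantics: stop when a whole pass changes nothing, or at maxIterations.
    while (after != before && iteration < batch.maxIterations) {
      before = after
      after = runOnce(after)
      iteration += 1
    }
    after
  }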

One more point: the Optimizer optimizes not only the Logical Plan itself but also the Expressions inside it, so it helps to know the relevant Expression members. The two used most here are references and outputSet: references is the set of Expressions a Logical Plan or Expression node depends on, while outputSet is the set of Attributes the Logical Plan outputs:

For example, Aggregate is a Logical Plan; its references is the deduplicated union of the references of its group-by expressions and its aggregate expressions.

case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode {

  override def output = aggregateExpressions.map(_.toAttribute)
  override def references =
    (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet
}

2. Optimization Strategies in Detail

The Optimizer's strategies include rules that transform the plan as well as rules that transform expressions. The underlying mechanism is the same: traverse the tree, then apply the optimization Rules. Note one detail, though: Logical Plan transforms traverse the tree pre-order, while Expression transforms traverse it post-order:
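
To make the two traversal orders concrete, here is a self-contained toy (made-up Node types, not Catalyst's) mimicking transformDown (pre-order, as used for plans) and transformUp (post-order, as used for expressions):

// Pre-order applies the rule to a node before recursing; post-order recurses first.
sealed trait Node
case class Leaf(value: Int) extends Node
case class Branch(children: Seq[Node]) extends Node

def transformDown(n: Node)(rule: PartialFunction[Node, Node]): Node = {
  val applied = rule.applyOrElse(n, identity[Node]) // rewrite this node first...
  applied match {                                   // ...then recurse into its (possibly new) children
    case Branch(cs) => Branch(cs.map(transformDown(_)(rule)))
    case leaf       => leaf
  }
}

def transformUp(n: Node)(rule: PartialFunction[Node, Node]): Node = {
  val recursed = n match {                          // rewrite the children first...
    case Branch(cs) => Branch(cs.map(transformUp(_)(rule)))
    case leaf       => leaf
  }
  rule.applyOrElse(recursed, identity[Node])        // ...then this node
}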

2.1 Batch: Combine Limits

If two adjacent Limits appear, they are merged into one. In the pattern below, the inner Limit is the direct child of the outer one, and grandChild is the inner Limit's own child.

/**
 * Combines two adjacent [[Limit]] operators into one, merging the
 * expressions into one single expression.
 */
object CombineLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ll @ Limit(le, nl @ Limit(ne, grandChild)) => // ll is the outer Limit and le its expression; nl is the inner Limit and ne its expression
      Limit(If(LessThan(ne, le), ne, le), grandChild)  // compare the two expressions: if ne is less than le, take ne, otherwise le
  }
}

Given the SQL: val query = sql("select * from (select * from temp_shengli limit 100)a limit 10 ")

scala> query.queryExecution.analyzed
res12: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Limit 10
Project [key#13,value#14]
Limit 100
Project [key#13,value#14]
MetastoreRelation default, temp_shengli, None

The subquery has limit 100 and the outer query limit 10. There is clearly no need for the subquery to fetch that many rows, since the outer query only wants 10, so Limit 10 and Limit 100 are merged into Limit 10.
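
Reading the merged expression If(LessThan(ne, le), ne, le) in plain Scala terms (a trivial sketch using this query's constants):

val le = 10                          // outer limit expression
val ne = 100                         // inner limit expression
val merged = if (ne < le) ne else le // mirrors If(LessThan(ne, le), ne, le)
assert(merged == 10)                 // so the plan effectively becomes Limit 10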

2.2 Batch: ConstantFolding

This Batch contains the Rules NullPropagation, ConstantFolding, BooleanSimplification, SimplifyFilters, SimplifyCasts, and SimplifyCaseConversionExpressions.

2.2.1 Rule: NullPropagation

First, a word about the Literal class, which can wrap a value of any primitive type. (This lays the groundwork for what follows.)

object Literal {
  def apply(v: Any): Literal = v match {
    case i: Int => Literal(i, IntegerType)
    case l: Long => Literal(l, LongType)
    case d: Double => Literal(d, DoubleType)
    case f: Float => Literal(f, FloatType)
    case b: Byte => Literal(b, ByteType)
    case s: Short => Literal(s, ShortType)
    case s: String => Literal(s, StringType)
    case b: Boolean => Literal(b, BooleanType)
    case d: BigDecimal => Literal(d, DecimalType)
    case t: Timestamp => Literal(t, TimestampType)
    case a: Array[Byte] => Literal(a, BinaryType)
    case null => Literal(null, NullType)
  }
}

Note that Literal is a LeafExpression; its core method is eval, which, given a Row, computes the expression's return value:

case class Literal(value: Any, dataType: DataType) extends LeafExpression {
  override def foldable = true
  def nullable = value == null
  def references = Set.empty
  override def toString = if (value != null) value.toString else "null"
  type EvaluatedType = Any
  override def eval(input: Row): Any = value
}
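
Putting the two definitions together, a quick hypothetical spark-shell check (assuming the Spark 1.0 catalyst expression classes are importable; the assertions follow directly from the code above):

import org.apache.spark.sql.catalyst.expressions._

val one = Literal(1)         // the companion object infers IntegerType
val s   = Literal("spark")   // ...and StringType here
assert(one.eval(null) == 1)  // eval ignores the input Row and just returns the value
assert(one.foldable)         // Literals are always foldable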

Now let's look at what NullPropagation actually does.

NullPropagation is an optimization that replaces Expressions which can be statically evaluated with equivalent Literal values, and it keeps NULL values from propagating up through the SQL syntax tree.

/**
 * Replaces [[Expression Expressions]] that can be statically evaluated with
 * equivalent [[Literal]] values. This rule is more specific with
 * Null value propagation from bottom to top of the expression tree.
 */
object NullPropagation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)      // count(null) is rewritten to the constant 0
      case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType) // sum over a literal 0 is the constant 0
      case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType)  // average over a literal 0 is 0.0
      case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)       // IS NULL on a non-nullable child is always false
      case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)     // IS NOT NULL on a non-nullable child is always true
      case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)     // indexing a null collection yields null
      case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)     // indexing with a null key yields null
      case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)    // accessing a field of a null struct yields null
      case e @ Coalesce(children) => {
        // Drop null literals from coalesce and keep whatever remains.
        val newChildren = children.filter(c => c match {
          case Literal(null, _) => false
          case _ => true
        })
        if (newChildren.length == 0) {
          Literal(null, e.dataType)
        } else if (newChildren.length == 1) {
          newChildren(0)
        } else {
          Coalesce(newChildren)
        }
      }
      case e @ If(Literal(v, _), trueValue, falseValue) => if (v == true) trueValue else falseValue
      case e @ In(Literal(v, _), list) if (list.exists(c => c match {        // IN over a literal that provably matches an element is true
        case Literal(candidate, _) if candidate == v => true
        case _ => false
      })) => Literal(true, BooleanType)
      // Put exceptional cases above if any
      case e: BinaryArithmetic => e.children match {                         // arithmetic with a null operand is null
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: BinaryComparison => e.children match {                         // comparison with a null operand is null
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
      case e: StringRegexExpression => e.children match {                    // LIKE/RLIKE with a null operand is null
        case Literal(null, _) :: right :: Nil => Literal(null, e.dataType)
        case left :: Literal(null, _) :: Nil => Literal(null, e.dataType)
        case _ => e
      }
    }
  }
}

Given the SQL: val query = sql("select count(null) from temp_shengli where key is not null")

scala> query.queryExecution.analyzed  
res6: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [COUNT(null) AS c0#5L] // the count here is over null
Filter IS NOT NULL key#7
MetastoreRelation default, temp_shengli, None

Now apply NullPropagation directly:

scala> NullPropagation(query.queryExecution.analyzed)  
res7: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [], [CAST(0, LongType) AS c0#5L] // after optimization it is the constant 0
Filter IS NOT NULL key#7
MetastoreRelation default, temp_shengli, None

2.2.2 Rule: ConstantFolding

Constant folding is a kind of Expression optimization: constants that can be computed up front are evaluated directly in the plan, rather than being instantiated and computed over and over during physical execution:

object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {  // first transform the plan...
    case q: LogicalPlan => q transformExpressionsDown {         // ...then transform each plan node's expressions
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.eval(null), e.dataType) // statically evaluate via eval
    }
  }
}

Given the SQL: val query = sql("select 1+2+3+4 from temp_shengli")

scala> query.queryExecution.analyzed  
res23: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(((1 + 2) + 3) + 4) AS c0#21] // still an unevaluated constant expression here
MetastoreRelation default, src, None

After optimization:

scala> query.queryExecution.optimizedPlan  
res24: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [10 AS c0#21] // after optimization, folded straight to 10
MetastoreRelation default, src, None
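
The pivot of this rule is the foldable flag: a Literal is foldable, and, broadly speaking, a composite expression is foldable when all of its children are. A self-contained toy model of that propagation (made-up Expr types, not Spark's):

// Toy constant folder: fold any subtree whose leaves are all literals.
sealed trait Expr { def foldable: Boolean }
case class Lit(v: Int) extends Expr { val foldable = true }
case class Col(name: String) extends Expr { val foldable = false }
case class Plus(l: Expr, r: Expr) extends Expr { def foldable = l.foldable && r.foldable }

def eval(e: Expr): Int = e match {
  case Lit(v)     => v
  case Plus(l, r) => eval(l) + eval(r)
  case Col(_)     => sys.error("a column cannot be evaluated statically")
}

def fold(e: Expr): Expr = e match {
  case l: Lit          => l              // skip redundant folding of literals
  case e if e.foldable => Lit(eval(e))   // like Literal(e.eval(null), e.dataType)
  case Plus(l, r)      => Plus(fold(l), fold(r))
  case other           => other
}

// fold(Plus(Plus(Plus(Lit(1), Lit(2)), Lit(3)), Lit(4))) == Lit(10), matching the demo above.
// fold(Plus(Col("key"), Plus(Lit(1), Lit(2)))) == Plus(Col("key"), Lit(3)): only the constant subtree folds.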

2.2.3 Rule: BooleanSimplification

Õâ¸öÊǶԲ¼¶û±í´ïʽµÄÓÅ»¯£¬ÓеãÏñjava²¼¶û±í´ïʽÖеĶÌ·Åжϣ¬²»¹ýÕâ¸öдµÄµ¹ÊǺÜÓÅÑÅ¡£

It checks whether one side of a Boolean expression alone can determine the result, saving the cost of evaluating the other side; this is what simplifying a Boolean expression means here.

See the inline comments for the details:

/**
 * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
 * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
 * is only safe when evaluations of expressions does not result in side effects.
 */
object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>    // the expression is an AND, i.e. exp1 AND exp2
        (left, right) match {           // (left expression, right expression)
          case (Literal(true, BooleanType), r) => r                // left is true: the result is the right side's Boolean value
          case (l, Literal(true, BooleanType)) => l                // right is true: the result is the left side's Boolean value
          case (Literal(false, BooleanType), _) => Literal(false)  // left is false: false no matter what the right side is
          case (_, Literal(false, BooleanType)) => Literal(false)  // either side false makes the whole AND false
          case (_, _) => and
        }

      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)    // left is true: no need to evaluate the right side
          case (_, Literal(true, BooleanType)) => Literal(true)    // either side true makes the whole OR true
          case (Literal(false, BooleanType), r) => r               // left is false: the result is the right side's value
          case (l, Literal(false, BooleanType)) => l               // right is false: the result is the left side's value
          case (_, _) => or
        }
    }
  }
}
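
The AND cases can be distilled into a few lines of self-contained Scala (toy types, not Spark's), handy for convincing yourself the case table is complete:

sealed trait BExpr
case class BLit(value: Boolean) extends BExpr
case class Pred(sql: String) extends BExpr // an opaque predicate only evaluable at runtime
case class BAnd(left: BExpr, right: BExpr) extends BExpr

def simplifyAnd(e: BExpr): BExpr = e match {
  case BAnd(BLit(true), r)  => r            // true AND r   == r
  case BAnd(l, BLit(true))  => l            // l AND true   == l
  case BAnd(BLit(false), _) => BLit(false)  // false AND _  == false
  case BAnd(_, BLit(false)) => BLit(false)  // _ AND false  == false
  case other                => other        // neither side is a known constant
}

// simplifyAnd(BAnd(BLit(true), Pred("key > 100"))) == Pred("key > 100")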

2.3 Batch: Filter Pushdown

Filter Pushdownϰüº¬ÁËCombineFilters¡¢PushPredicateThroughProject¡¢PushPredicateThroughJoin¡¢ColumnPruning

P.S.: the name Filter Pushdown feels a little too narrow for everything in this batch, e.g. ColumnPruning (column pruning).

2.3.1 Combine Filters

This merges two adjacent Filters, much like Combine Limits above. Merging the two nodes reduces the depth of the tree and thus the cost of filtering twice.

/**
 * Combines two adjacent [[Filter]] operators into one, merging the
 * conditions into one conjunctive predicate.
 */
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}

Given the SQL: val query = sql("select key from (select key from temp_shengli where key >100)a where key > 80 ")

Before optimization, one Filter is the grandchild of the other, separated by a Project. PushPredicateThroughProject first pushes the outer Filter below that Project; that is what makes the two Filters adjacent so CombineFilters can merge them:

scala> query.queryExecution.analyzed  
res25: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
Filter (key#27 > 80) //filter>80
Project [key#27]
Filter (key#27 > 100) //filter>100
MetastoreRelation default, src, None

After optimization: the filter is now expressed as a single compound Boolean expression:

scala> query.queryExecution.optimizedPlan  
res26: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#27]
Filter ((key#27 > 100) && (key#27 > 80)) // merged into a single Filter
MetastoreRelation default, src, None

2.3.2 Filter Pushdown

Filter Pushdown means pushing the filter down, closer to the data source.

The principle is simply to filter out unneeded rows as early as possible, reducing the amount of data the rest of the plan has to process.
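
The rule that does this in the batch is PushPredicateThroughProject (source not quoted here). Conceptually it swaps a Filter with the Project beneath it whenever the predicate only needs columns that exist below the Project; a toy sketch of that swap, with hypothetical plan types:

// Hypothetical mini-plans; the real rule also rewrites predicates through column aliases.
sealed trait Plan
case class Relation(table: String) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan
case class Filter(predicate: String, references: Set[String], child: Plan) extends Plan

def pushDown(p: Plan): Plan = p match {
  // Swap Filter and Project when every referenced column survives the projection.
  case Filter(pred, refs, Project(cols, child)) if refs.subsetOf(cols.toSet) =>
    Project(cols, pushDown(Filter(pred, refs, child)))
  case Project(cols, child) => Project(cols, pushDown(child))
  case other                => other
}

// pushDown(Filter("key > 100", Set("key"),
//          Project(Seq("key", "value"), Relation("temp_shengli"))))
// == Project(Seq("key", "value"),
//            Filter("key > 100", Set("key"), Relation("temp_shengli")))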

Given the SQL: val query = sql("select key from (select * from temp_shengli)a where key>100")

The generated logical plan is:

scala> query.queryExecution.analyzed
res29: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
Filter (key#31 > 100) // select key, value first, then Filter
Project [key#31,value#32]
MetastoreRelation default, src, None

The optimized plan is:

query.queryExecution.optimizedPlan  
res30: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#31]
Filter (key#31 > 100) // filter first, then select
MetastoreRelation default, src, None

2.3.3 ColumnPruning

ÁвüôÓõıȽ϶࣬¾ÍÊǼõÉÙ²»±ØÒªselectµÄijЩÁС£

Column pruning applies in three places:

1¡¢ÔھۺϲÙ×÷ÖУ¬¿ÉÒÔ×öÁвüô

2¡¢ÔÚjoin²Ù×÷ÖУ¬×óÓÒº¢×Ó¿ÉÒÔ×öÁвüô

3¡¢ºÏ²¢ÏàÁÚµÄProjectµÄÁÐ

object ColumnPruning extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Eliminate attributes that are not needed to calculate the specified aggregates.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => // if the child outputs attributes the Aggregate never references, keep only a.references by inserting a Project below it
      a.copy(child = Project(a.references.toSeq, child))

    // Eliminate unneeded attributes from either side of a Join.
    case Project(projectList, Join(left, right, joinType, condition)) => // prune unneeded attributes from the Join's left and right subtrees
      // Collect the list of all references required either above or to evaluate the condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes */
      def prunedChild(c: LogicalPlan) =
        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
          Project(allReferences.filter(c.outputSet.contains).toSeq, c)
        } else {
          c
        }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Combine adjacent Projects.
    case Project(projectList1, Project(projectList2, child)) => // merge the columns of adjacent Projects
      // Create a map of Aliases to their values from the child projection.
      // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
      val aliasMap = projectList2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // Substitute any attributes that are produced by the child projection, so that we safely
      // eliminate it.
      // e.g., 'SELECT c + 1 FROM (SELECT a + b AS C ...' produces 'SELECT a + b + 1 ...'
      // TODO: Fix TransformBase to avoid the cast below.
      val substitutedProjection = projectList1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)

    // Eliminate no-op Projects
    case Project(projectList, child) if child.output == projectList => child
  }
}

Here are three examples, one for each of the three cases:

1¡¢ÔھۺϲÙ×÷ÖУ¬¿ÉÒÔ×öÁвüô

Given the SQL: val query = sql("SELECT 1+1 as shengli, key from (select key, value from temp_shengli)a group by key")

Before optimization:

res57: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
Project [key#51,value#52] // before optimization, both key and value are selected by default
MetastoreRelation default, temp_shengli, None

After optimization (ColumnPruning1, like ColumnPruning2 below, is a debugging copy of the rule used by the author):

scala> ColumnPruning1(query.queryExecution.analyzed)  
MetastoreRelation default, temp_shengli, None
res59: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Aggregate [key#51], [(1 + 1) AS shengli#49,key#51]
Project [key#51] // after pruning, value is dropped and only key is selected
MetastoreRelation default, temp_shengli, None

2. Column pruning on the children of a join

Given the SQL: val query = sql("select a.value qween from (select * from temp_shengli) a join (select * from temp_shengli)b on a.key =b.key ")

Before optimization:

scala> query.queryExecution.analyzed  
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#42 AS qween#39]
Join Inner, Some((key#41 = key#43))
Project [key#41,value#42] // an extra column, value, is selected here
MetastoreRelation default, temp_shengli, None
Project [key#43,value#44] // an extra column, value, is selected here
MetastoreRelation default, temp_shengli, None

After optimization (ColumnPruning2 is the author's own debugging copy of the rule):

scala> ColumnPruning2(query.queryExecution.analyzed)  
allReferences is -> Set(key#35, key#37)
MetastoreRelation default, temp_shengli, None
MetastoreRelation default, temp_shengli, None
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#35 AS qween#33]
Join Inner, Some((key#35 = key#37))
Project [key#35] // after pruning, the left child selects only the key column
MetastoreRelation default, temp_shengli, None
Project [key#37] // after pruning, the right child selects only the key column
MetastoreRelation default, temp_shengli, None

3. Merging the columns of adjacent Projects

Given the SQL: val query = sql("SELECT c + 1 FROM (SELECT 1 + 1 as c from temp_shengli ) a ")

Before optimization:

scala> query.queryExecution.analyzed  
res61: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(c#56 + 1) AS c0#57]
Project [(1 + 1) AS c#56]
MetastoreRelation default, temp_shengli, None

After optimization:

scala> query.queryExecution.optimizedPlan  
res62: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [(2 AS c#56 + 1) AS c0#57] // the subquery's c (already folded to 2 by ConstantFolding) is substituted into the outer select's expression
MetastoreRelation default, temp_shengli, None
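
Mechanically, this is the aliasMap substitution from the "combine adjacent Projects" case above: the attribute c in the outer projection is replaced by its definition from the inner one. A toy version with hypothetical expression types:

sealed trait E
case class Attr(name: String) extends E
case class Num(v: Int) extends E
case class Plus(l: E, r: E) extends E
case class Alias(child: E, name: String) extends E

// Replace attributes defined by the inner projection with their defining expressions.
def substitute(e: E, aliasMap: Map[String, E]): E = e match {
  case Attr(n) if aliasMap.contains(n) => aliasMap(n)
  case Plus(l, r)  => Plus(substitute(l, aliasMap), substitute(r, aliasMap))
  case Alias(c, n) => Alias(substitute(c, aliasMap), n)
  case other       => other
}

val aliasMap = Map("c" -> Alias(Num(2), "c"))                // inner project: 2 AS c
val merged   = substitute(Plus(Attr("c"), Num(1)), aliasMap)
// merged == Plus(Alias(Num(2), "c"), Num(1)), i.e. (2 AS c + 1), matching the plan above.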

Èý¡¢×ܽ᣺

This article described the Optimizer's role in Catalyst: it applies Rules that transform the Analyzed Logical Plan and its Expressions, merging and optimizing the nodes of the tree. The main strategies boil down to a few broad categories: operator merging, column pruning, and filter pushdown.

Catalyst keeps evolving; this article studied Spark 1.0.0 only, and optimization strategies added later will be covered in follow-up posts.

Discussion is welcome!

   