Following up on the previous article, Spark SQL Catalyst Source Code Analysis: Physical Plan, this article covers the concrete implementation details behind a Physical Plan's toRdd:
ÎÒÃǶ¼ÖªµÀÒ»¶Îsql£¬ÕæÕýµÄÖ´ÐÐÊǵ±Äãµ÷ÓÃËüµÄcollect()·½·¨²Å»áÖ´ÐÐSpark
Job£¬×îºó¼ÆËãµÃµ½RDD¡£
lazy val toRdd: RDD[Row] = executedPlan.execute()
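As a quick illustration of this laziness, here is a minimal usage sketch against the Spark 1.0-era API; the SQLContext `sqlContext` and the table name "people" are assumed for the example and are not from the source:

val schemaRdd = sqlContext.sql("SELECT name FROM people WHERE age > 21")
// Nothing has executed yet: toRdd is a lazy val, so the physical plan has not run.
val rows = schemaRdd.collect() // collect() forces executedPlan.execute() and launches the Spark job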
A Spark Plan basically covers four kinds of operations: the BasicOperator basic type, plus the somewhat more complex Join, Aggregate and Sort.
As shown in the figure:

1. BasicOperator
1.1 Project
The rough meaning of Project is: given a list of expressions Seq[NamedExpression] and an input Row, a Convert operation (the eval computation of each Expression) produces a new Row.
Project's implementation calls its child.execute() method and then calls mapPartitions to process each Partition.
The mapping function f here simply news up a MutableProjection and then iterates over the partition, converting each row.
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  override def execute() = child.execute().mapPartitions { iter => // apply the mapping f to each partition
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }
}
Looking at the definition of MutableProjection, it is essentially the process of binding references to a schema and then evaluating:
it converts a Row into another Row whose schema columns are already defined.
If the input Row already has a schema, the passed-in Seq[Expression] is also bound to that schema.
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) // bind to the input schema

  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) // the new Row
  def currentValue: Row = mutableRow

  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input) // evaluate each expression against the input Row
      i += 1
    }
    mutableRow // return the new Row
  }
}
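Note the design choice: a single mutable row is reused for every input record in a partition instead of allocating a new Row each time. Below is a minimal standalone sketch of the same idea, using plain Scala arrays rather than Catalyst's Row classes; all names are illustrative, not Spark's:

class SimpleMutableProjection(exprs: Array[Array[Any] => Any]) extends (Array[Any] => Array[Any]) {
  private[this] val mutableRow = new Array[Any](exprs.length) // one buffer reused for every input row
  def apply(input: Array[Any]): Array[Any] = {
    var i = 0
    while (i < exprs.length) {
      mutableRow(i) = exprs(i)(input) // "eval" expression i against the input row
      i += 1
    }
    mutableRow // caller must copy the result if it needs to keep it
  }
}

// Usage: project (column 0, column 1 * 2) out of a three-column input row.
val project = new SimpleMutableProjection(Array(_(0), r => r(1).asInstanceOf[Int] * 2))
val out = project(Array("a", 21, true)) // Array("a", 42)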
1.2 Filter
Filter's implementation evaluates the passed-in condition against each input row; the result of eval is a Boolean. If the expression evaluates to true, the row in that partition is kept, otherwise it is filtered out.
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) // evaluate the expression: eval(input row)
  }
}
1.3 Sample
The Sample operation simply calls child.execute(), which returns an RDD, and then invokes that RDD's native sample method.
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode {
  override def output = child.output

  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}
1.4 Union
The Union operation supports unioning multiple subqueries, so its children are passed in as a Seq[SparkPlan].
The execute() method calls execute() on each of its children, i.e. it obtains the RDD result set of each select query.
It then calls SparkContext's union method to merge the results of all the subqueries.
case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) // union the subquery result RDDs

  override def otherCopyArgs = sqlContext :: Nil
}
1.5 Limit
The Limit operation also exists in the native RDD API, as take().
However, Limit's implementation distinguishes two cases:
The first case is when limit is the terminal operator, i.e. select xxx from yyy limit zzz, and it is driven by executeCollect; then take is called directly on the driver.
The second case is when limit is not the terminal operator, i.e. further operations follow the limit. Then limit is applied inside each partition, and the results are finally repartitioned into a single partition to compute the global limit.
case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext) extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again

  override def otherCopyArgs = sqlContext :: Nil
  override def output = child.output

  override def executeCollect() = child.execute().map(_.copy()).take(limit) // call take directly on the driver

  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row)) // local limit inside each partition first
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) // shuffle needed to repartition
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2)) // finally take the limit on a single partition
  }
}
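The same two-phase idea can be sketched on a plain RDD with core Spark API only. This is a simplification under my own assumptions: it skips the MutablePair and ShuffledRDD machinery used above and just coalesces into one partition:

import org.apache.spark.rdd.RDD

// Local limit inside each partition, then a global limit on a single partition.
def twoPhaseLimit(data: RDD[Int], limit: Int): RDD[Int] =
  data
    .mapPartitions(_.take(limit)) // local limit: at most `limit` rows survive per partition
    .coalesce(1)                  // gather the survivors into one partition
    .mapPartitions(_.take(limit)) // global limit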
1.6 TakeOrdered
TakeOrdered is a limit N applied after sorting; it is typically used for a limit that follows a sort by operator.
It can simply be thought of as a TopN operator.
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
                      (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil
  override def output = child.output

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the ordering is implemented via RowOrdering

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)

  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}
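Under the hood this relies on RDD.takeOrdered from the core RDD API. A small usage sketch with an explicit Ordering follows; the SparkContext `sc` and the data are assumptions made for the example:

// Smallest 3 elements by the second tuple field, returned to the driver.
val rdd = sc.parallelize(Seq(("a", 5), ("b", 1), ("c", 3), ("d", 2)))
val byValue = Ordering.by[(String, Int), Int](_._2)
val top3 = rdd.takeOrdered(3)(byValue) // Array((b,1), (d,2), (c,3))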
1.7 Sort
Sort is also implemented through the RowOrdering class: child.execute() runs mapPartitions over each partition, and each partition is sorted according to the RowOrdering, producing a new ordered collection.
The actual sorting collects each partition's rows into an array and calls Scala's sorted method with this ordering; it is a per-partition sort, not a native RDD-wide sort.
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the sort order

  override def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    child.execute()
      .mapPartitions(
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, // each partition calls sorted with the ordering
        preservesPartitioning = true)
  }

  override def output = child.output
}
1.8 ExistingRdd
ExistingRdd turns an ordinary RDD of Scala Products (case classes or tuples) into an RDD[Row], converting each field into a Catalyst value along the way:
object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }

  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)

        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }
          mutableRow
        }
      }
    }
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}
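A standalone sketch of what productToRowRdd does, using Array[Any] in place of Catalyst's GenericMutableRow. The Person case class and helper name are made up; unlike the real code, this version allocates a fresh array per record rather than reusing one mutable row, and it omits the Option/Seq/nested-Product handling of convertToCatalyst:

import org.apache.spark.rdd.RDD

case class Person(name: String, age: Int)

// Turn each Product into a generic "row" by walking its fields.
def productToRows(data: RDD[Person]): RDD[Array[Any]] =
  data.mapPartitions { iter =>
    iter.map { p =>
      val row = new Array[Any](p.productArity)
      var i = 0
      while (i < row.length) {
        row(i) = p.productElement(i) // each field becomes one column value
        i += 1
      }
      row
    }
  }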
2. Join Related Operators
HashJoin:
Before discussing the Join related operators, it is worth looking at HashJoin, a trait defined in the joins.scala file under the execution package.
The Join operators mainly include BroadcastHashJoin, LeftSemiJoinHash and ShuffledHashJoin, all of which implement the HashJoin trait.
The main class diagram is as follows:
The main members of the HashJoin trait are:
buildSide indicates which side (BuildLeft or BuildRight) the hash table is built from; it serves as the reference side of the join.
leftKeys are the join-key expressions of the left child, rightKeys those of the right child.
left is the left child's physical plan, right is the right child's physical plan.
buildSideKeyGenerator is a Projection that computes the build-side key expressions for a given input Row.
streamSideKeyGenerator produces a MutableProjection that computes the stream-side key expressions for a given input Row.
If buildSide is left, the build side is the left table, and the right table that is joined against it is the streamSide.

HashJoin's key operation is joinIterators: simply put, it joins two tables, treating each table as an Iterator[Row].
The approach (a minimal standalone sketch of these three steps follows the list):
1. First iterate over the buildSide, compute the buildKeys and fill a HashMap, forming (buildKey, rows) entries.
2. Then iterate over the streamedSide, compute each streamedKey and look it up in the HashMap to perform the join.
3. Finally produce a JoinedRow, which stitches the two matching rows together.
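Here is that build/stream idea as a minimal standalone sketch on in-memory iterators, an inner join on one key column per side. Rows are Array[Any] and all names are illustrative, not Catalyst's:

import scala.collection.mutable

// Rows are Array[Any]; buildKeyIdx / streamKeyIdx pick the join column on each side.
def hashJoin(
    buildIter: Iterator[Array[Any]],
    streamIter: Iterator[Array[Any]],
    buildKeyIdx: Int,
    streamKeyIdx: Int): Iterator[Array[Any]] = {
  // Step 1: build phase -- key -> all build-side rows carrying that key.
  val hashTable = mutable.HashMap.empty[Any, mutable.ArrayBuffer[Array[Any]]]
  buildIter.foreach { row =>
    val key = row(buildKeyIdx)
    if (key != null) hashTable.getOrElseUpdate(key, mutable.ArrayBuffer.empty) += row
  }
  // Steps 2 and 3: stream phase -- probe the table and emit one concatenated row per match.
  streamIter.flatMap { srow =>
    val matches = hashTable.getOrElse(srow(streamKeyIdx), mutable.ArrayBuffer.empty[Array[Any]])
    matches.iterator.map(brow => brow ++ srow) // the JoinedRow concatenation, simplified
  }
}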
Now the actual Catalyst code, with the comments translated:
trait HashJoin {
  val leftKeys: Seq[Expression]
  val rightKeys: Seq[Expression]
  val buildSide: BuildSide
  val left: SparkPlan
  val right: SparkPlan

  lazy val (buildPlan, streamedPlan) = buildSide match { // pattern match: wrap the physical plans into a Tuple2 -- (left, right) for BuildLeft, (right, left) for BuildRight
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }

  lazy val (buildKeys, streamedKeys) = buildSide match { // pattern match: wrap the key expressions into a Tuple2
    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }

  def output = left.output ++ right.output

  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) // computes the build-side key for a given Row
  @transient lazy val streamSideKeyGenerator = // builds the streamSideKeyGenerator, which computes the stream-side key for a given Row
    () => new MutableProjection(streamedKeys, streamedPlan.output)

  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = { // join the build-side Iterator[Row] with the stream-side Iterator[Row] and return the joined Iterator[Row]
    // TODO: Use Spark's HashMap implementation.
    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() // matching is implemented with a HashMap
    var currentRow: Row = null

    // Create a mapping of buildKeys -> rows
    while (buildIter.hasNext) { // iterate over the build iterator only, forming (rowKey, rows); similar to a word count, except the values are collections of Rows rather than accumulated counts
      currentRow = buildIter.next()
      val rowKey = buildSideKeyGenerator(currentRow) // compute the rowKey used as the HashMap key
      if (!rowKey.anyNull) {
        val existingMatchList = hashTable.get(rowKey)
        val matchList = if (existingMatchList == null) {
          val newMatchList = new ArrayBuffer[Row]()
          hashTable.put(rowKey, newMatchList) // (rowKey, matchedRowList)
          newMatchList
        } else {
          existingMatchList
        }
        matchList += currentRow.copy() // append the current row to matchList
      }
    }

    new Iterator[Row] { // finally, probe the build-side HashMap with each streamed row's key
      private[this] var currentStreamedRow: Row = _
      private[this] var currentHashMatches: ArrayBuffer[Row] = _
      private[this] var currentMatchPosition: Int = -1

      // Mutable per row objects.
      private[this] val joinRow = new JoinedRow

      private[this] val joinKeys = streamSideKeyGenerator()

      override final def hasNext: Boolean =
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
          (streamIter.hasNext && fetchNext())

      override final def next() = {
        val ret = buildSide match {
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) // build side is the right table: the streamed row goes on the left, the matched build row on the right
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) // build side is the left table: the other way around
        }
        currentMatchPosition += 1
        ret
      }

      /**
       * Searches the streamed iterator for the next row that has at least one match in hashtable.
       *
       * @return true if the search is successful, and false if the streamed iterator runs out of
       *         tuples.
       */
      private final def fetchNext(): Boolean = {
        currentHashMatches = null
        currentMatchPosition = -1

        while (currentHashMatches == null && streamIter.hasNext) {
          currentStreamedRow = streamIter.next()
          if (!joinKeys(currentStreamedRow).anyNull) {
            currentHashMatches = hashTable.get(joinKeys.currentValue) // look up the streamed row's key in the build-side hash table
          }
        }

        if (currentHashMatches == null) {
          false
        } else {
          currentMatchPosition = 0
          true
        }
      }
    }
  }
}
The implementation of JoinedRow, which stitches two Rows together:
in effect it creates a new Array that merges the values of the two rows.
class JoinedRow extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _
  .........

  def copy() = {
    val totalSize = row1.size + row2.size
    val copiedValues = new Array[Any](totalSize)
    var i = 0
    while (i < totalSize) {
      copiedValues(i) = apply(i)
      i += 1
    }
    new GenericRow(copiedValues) // return a new, merged Row
  }
2.1 LeftSemiJoinHash
Left semi join: not much needs to be said; in early Hive versions it was the substitute for IN and EXISTS.
The right table's join keys are put into a HashSet, and the left table is then scanned, checking whether each left-side join key has a match.
case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight // the build side is the right table

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => // execute both child physical plans into RDDs, use zipPartitions to pair up their partitions, then apply the approach described above
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      // Create a Hash set of buildKeys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if (!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}
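The essence can be sketched on plain Scala collections, leaving out the Catalyst Row machinery; the types here are illustrative only:

// Keep a left row iff its key appears in a HashSet built from the right side's keys.
def leftSemiJoin(left: Seq[(Int, String)], rightKeys: Seq[Int]): Seq[(Int, String)] = {
  val keySet = rightKeys.toSet                      // build side: the right table's join keys
  left.filter { case (k, _) => keySet.contains(k) } // stream side: keep only matching rows
}
// leftSemiJoin(Seq(1 -> "a", 2 -> "b"), Seq(2, 3)) == Seq(2 -> "b")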
2.2 BroadcastHashJoin
As the name says: a broadcast hash join.
It is the inner hash join implementation. A future from scala.concurrent is used to asynchronously broadcast the rows collected from executing the buildPlan (via executeCollect).
Once the broadcast table arrives, the streamedPlan is probed against it.
The implementation is simply mapPartitions on the streamed RDD plus HashJoin's joinIterators, which produces the final join result.
case class BroadcastHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future { // broadcast the build-side rows via the SparkContext
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter) // call joinIterators on each partition
    }
  }
}
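The same broadcast-then-probe pattern can be sketched with plain pair RDDs and real core APIs (sc.broadcast, collectAsMap, mapPartitions); the SparkContext `sc` and the data are assumptions for the example:

import org.apache.spark.SparkContext._ // PairRDDFunctions (needed explicitly on older Spark versions)

// Collect and broadcast the small side, then probe it inside mapPartitions of the large side.
val small = sc.parallelize(Seq(1 -> "one", 2 -> "two"))
val large = sc.parallelize(Seq((1, 10.0), (2, 20.0), (3, 30.0)))

val smallMap = sc.broadcast(small.collectAsMap()) // the driver collects the small table and broadcasts it
val joined = large.mapPartitions { iter =>
  iter.flatMap { case (k, v) =>
    smallMap.value.get(k).map(name => (k, name, v)) // emit only matching keys (inner join)
  }
}
// joined.collect() => Array((1,one,10.0), (2,two,20.0))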
2.3 ShuffledHashJoin
ShuffledHashJoin, as the name implies, needs to shuffle the data; its outputPartitioning is the left child's Partitioning.
The data is shuffled according to that Partitioning, and then the RDD zipPartitions method is used to zip the corresponding partitions of the two sides.
The requiredChildDistribution here is ClusteredDistribution, which is matched against HashPartitioning.
The partitioning details are not covered here; see partitioning under org.apache.spark.sql.catalyst.plans.physical.
case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    }
  }
}
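A small sketch of zipPartitions on two RDDs with the same number of partitions (a real RDD API; `sc` and the data are assumptions for the example):

// zipPartitions pairs up partitions with the same index from two RDDs,
// which is why ShuffledHashJoin first shuffles both sides into the same partitioning.
val a = sc.parallelize(1 to 6, 3)
val b = sc.parallelize(Seq("a", "b", "c", "d", "e", "f"), 3)

val zipped = a.zipPartitions(b) { (leftIter, rightIter) =>
  leftIter.zip(rightIter) // combine the two co-located iterators
}
// zipped.collect() => Array((1,a), (2,b), (3,c), (4,d), (5,e), (6,f))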