Spark SQL Source Code Analysis: From Physical Plan to RDD, the Concrete Implementation
 
By OopsOutOfMemory, 火龙果软件, published 2014-08-26

Following the previous article, Spark SQL Catalyst Source Code Analysis: Physical Plan, this article covers the concrete implementation details of turning a Physical Plan into an RDD via toRdd:

We all know that a SQL statement is only really executed when you call collect() on its result: that is what triggers the Spark job and finally computes the RDD.

lazy val toRdd: RDD[Row] = executedPlan.execute()
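To make the laziness concrete, here is a minimal sketch against the Spark 1.0-era API (the existing SparkContext sc, the Person case class and the table name are illustrative, not from the source): sql(...) only builds the plan; the Spark job runs when collect() forces executedPlan.execute().

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._                                  // brings in createSchemaRDD and sql

case class Person(name: String, age: Int)
val people = sc.parallelize(Seq(Person("Andy", 30), Person("Justin", 19)))
people.registerAsTable("people")                     // Spark 1.0.x API (later renamed registerTempTable)

val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // only builds the plan
teenagers.collect()                                  // forces executedPlan.execute(): the Spark job runs here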

SparkPlan covers roughly four categories of operators: the basic operators (BasicOperator), plus the somewhat more complex Join, Aggregate and Sort.

As shown in the figure:

1. BasicOperator

1.1 Project

The rough meaning of Project: given a list of expressions Seq[NamedExpression] and an input Row, it applies a convert step (the eval computation of each Expression) and produces a new Row.

Project is implemented by calling its child.execute() and then mapPartitions to process every partition.

The function passed to mapPartitions simply news up a MutableProjection and applies it to every row of the partition.

case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  override def execute() = child.execute().mapPartitions { iter => // map the projection over each partition
    @transient val reusableProjection = new MutableProjection(projectList)
    iter.map(reusableProjection)
  }
}

ͨ¹ý¹Û²ìMutableProjectionµÄ¶¨Ò壬¿ÉÒÔ·¢ÏÖ£¬¾ÍÊÇbind references to a schema ºÍ evalµÄ¹ý³Ì£º

½«Ò»¸öRowת»»ÎªÁíÒ»¸öÒѾ­¶¨ÒåºÃschema columnµÄRow¡£

Èç¹ûÊäÈëµÄRowÒѾ­ÓÐSchemaÁË£¬Ôò´«ÈëµÄSeq[Expression]Ò²»áboundµ½µ±Ç°µÄSchema¡£

case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema))) // bind references to the input schema

  private[this] val exprArray = expressions.toArray
  private[this] val mutableRow = new GenericMutableRow(exprArray.size) // the (reused) output Row

  def currentValue: Row = mutableRow

  def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input) // evaluate each expression against the input Row
      i += 1
    }
    mutableRow // return the new Row
  }
}
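As a rough standalone model of this pattern (plain Scala, not the Catalyst API: a row is modeled as Array[Any] and an "expression" as a simple function), the projection reuses a single mutable output row and fills it by evaluating each expression against the input:

// Simplified standalone model (not the Catalyst API).
class SimpleMutableProjection(exprs: Seq[Array[Any] => Any])
    extends (Array[Any] => Array[Any]) {
  private val out = new Array[Any](exprs.size)          // reused output row, like GenericMutableRow
  def apply(input: Array[Any]): Array[Any] = {
    var i = 0
    while (i < exprs.length) { out(i) = exprs(i)(input); i += 1 }
    out
  }
}

// e.g. project (col0, col1 * 2) out of a three-column input row:
val proj = new SimpleMutableProjection(Seq(r => r(0), r => r(1).asInstanceOf[Int] * 2))
proj(Array("a", 10, true))   // => Array("a", 20)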

1.2 Filter

Filter evaluates the passed-in condition against each input row; the result is a Boolean. If the expression evaluates to true, the row of that partition is kept, otherwise it is filtered out.

case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def execute() = child.execute().mapPartitions { iter =>
    iter.filter(condition.eval(_).asInstanceOf[Boolean]) // evaluate the condition: eval(input row) => Boolean
  }
}

1.3 Sample

The Sample operation simply calls child.execute(), which returns an RDD, and then invokes the RDD's native sample function on it.

case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)  
extends UnaryNode
{
override def output = child.output

// TODO: How to pick seed?
override def execute() = child.execute().sample(withReplacement, fraction, seed)
}

1.4 Union

The Union operation supports a union of multiple subqueries, so the child passed in is a Seq[SparkPlan].

execute() calls execute() on every child, i.e. it obtains the result RDD of each select query.

It then calls SparkContext's union method to merge the results of all the subqueries.

case class Union(children: Seq[SparkPlan])(@transient sqlContext: SQLContext) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sqlContext.sparkContext.union(children.map(_.execute())) // union the result RDDs of all child queries

  override def otherCopyArgs = sqlContext :: Nil
}

1.5 Limit

The Limit operation also exists in the native RDD API as take().

The implementation of Limit, however, distinguishes two cases:

First, limit is the final operator, i.e. select xxx from yyy limit zzz, and it is reached through executeCollect; in that case take is called directly on the driver.

Second, limit is not the final operator, i.e. more of the query follows the limit; then the limit is applied within each partition, and the data is finally repartitioned into a single partition to compute the global limit.

case class Limit(limit: Int, child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again

  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  override def executeCollect() = child.execute().map(_.copy()).take(limit) // case 1: take directly on the driver

  override def execute() = {
    val rdd = child.execute().mapPartitions { iter =>
      val mutablePair = new MutablePair[Boolean, Row]()
      iter.take(limit).map(row => mutablePair.update(false, row)) // case 2: apply the limit within each partition first
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row, MutablePair[Boolean, Row]](rdd, part) // shuffle to repartition into a single partition
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2)) // finally take the limit within that single partition
  }
}
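The two-phase shape can be sketched with the public RDD API (the data set and n are illustrative): take n rows per partition, shuffle the survivors into a single partition, then take n again for the global result.

val n = 10
val data = sc.parallelize(1 to 1000, numSlices = 8)

val perPartition = data.mapPartitions(_.take(n))           // at most n rows survive from each partition
val onePartition = perPartition.repartition(1)             // shuffle the (at most 8 * n) survivors together
val globalLimit  = onePartition.mapPartitions(_.take(n))   // final, global limit
globalLimit.collect()                                       // exactly n rows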

1.6 TakeOrdered

TakeOrdered is a limit N applied after sorting; it is typically the limit that follows a sort by operator.

It can simply be thought of as a TopN operator.

case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
                      (@transient sqlContext: SQLContext) extends UnaryNode {
  override def otherCopyArgs = sqlContext :: Nil

  override def output = child.output

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the ordering is implemented via RowOrdering

  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ordering)

  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sqlContext.sparkContext.makeRDD(executeCollect(), 1)
}
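The driver-side path maps directly onto the RDD's takeOrdered, which accepts an explicit Ordering; a tiny illustrative example:

val nums = sc.parallelize(Seq(5, 1, 4, 2, 3))
nums.takeOrdered(3)                         // Array(1, 2, 3): the 3 smallest under the natural ordering
nums.takeOrdered(3)(Ordering[Int].reverse)  // Array(5, 4, 3): a "top N"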

1.7 Sort

Sort also relies on the RowOrdering class: child.execute() maps over each partition, and every partition is sorted according to that RowOrdering, producing a new ordered collection.

The sorting itself is Scala's sorted method applied to the materialized rows of each partition inside mapPartitions. When global is true, the requiredChildDistribution of OrderedDistribution makes the planner range-partition the data beforehand, so sorting each partition independently still yields a globally ordered result.

case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {

  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  @transient
  lazy val ordering = new RowOrdering(sortOrder) // the sort order

  override def execute() = attachTree(this, "sort") {
    // TODO: Optimize sorting operation?
    child.execute()
      .mapPartitions(
        iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator, // each partition calls sorted with the ordering rule
        preservesPartitioning = true)
  }

  override def output = child.output
}
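A purely partition-local sort of this kind can also be written against the public RDD API (rows and its (String, Int) schema are illustrative):

// sort every partition by the second column, without moving data between partitions
val locallySorted = rows.mapPartitions(
  iter => iter.toArray.sortBy(_._2).iterator,
  preservesPartitioning = true)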

1.8 ExistingRdd

ExistingRdd wraps an already existing RDD of Scala Product objects (typically case classes) as an RDD[Row]: convertToCatalyst converts Scala values into Catalyst values, and productToRowRdd maps each Product onto a reused GenericMutableRow.

object ExistingRdd {
  def convertToCatalyst(a: Any): Any = a match {
    case o: Option[_] => o.orNull
    case s: Seq[Any] => s.map(convertToCatalyst)
    case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray)
    case other => other
  }

  def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    data.mapPartitions { iterator =>
      if (iterator.isEmpty) {
        Iterator.empty
      } else {
        val bufferedIterator = iterator.buffered
        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)

        bufferedIterator.map { r =>
          var i = 0
          while (i < mutableRow.length) {
            mutableRow(i) = convertToCatalyst(r.productElement(i))
            i += 1
          }

          mutableRow
        }
      }
    }
  }

  def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
}
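For intuition, this is roughly how a case-class RDD gets flattened into rows of plain values using standard Scala Product reflection (the Record class is illustrative; the real productToRowRdd additionally reuses one mutable row per partition and applies convertToCatalyst to every field):

case class Record(key: String, value: Int)
val records = sc.parallelize(Seq(Record("a", 1), Record("b", 2)))

// every Product (case class instance) is flattened into its field values
val rows = records.map(r => r.productIterator.toArray)   // Array("a", 1), Array("b", 2)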

2. Join Related Operators

HashJoin:

Before going through the join-related operators, it is worth looking at the HashJoin trait, which lives in joins.scala under the execution package.

The join operators BroadcastHashJoin, LeftSemiJoinHash and ShuffledHashJoin all implement the HashJoin trait.

The main class diagram is as follows:

 

The main members of the HashJoin trait are:

buildSide indicates whether the left or the right side is used as the build (base) side.

leftKeys are the expressions of the left child; rightKeys are the expressions of the right child.

left is the physical plan of the left child; right is the physical plan of the right child.

buildSideKeyGenerator is a Projection that evaluates the build-side key expressions for a given Row.

streamSideKeyGenerator is a MutableProjection that evaluates the stream-side key expressions for a given Row.

If buildSide is left, the build side can be thought of as the left table, and the right table joined against it is the streamSide.

The key operation of HashJoin is joinIterators; in short, it joins two tables, treating each one as an Iterator[Row].

The approach (a plain Scala sketch follows the list):

1. First iterate over the build side, compute the buildKeys, and use a HashMap to form (buildKey, rows) entries.

2. Iterate over the streamed side, compute the streamedKey, and look it up in the HashMap to perform the join.

3. Finally produce a joinRow that concatenates the two matching rows.
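A standalone sketch of these three steps in plain Scala (rows are modeled as (key, value) pairs; details such as null-key handling are omitted):

import scala.collection.mutable

def hashJoin[K, A, B](buildIter: Iterator[(K, A)],
                      streamIter: Iterator[(K, B)]): Iterator[(B, A)] = {
  val table = mutable.HashMap.empty[K, mutable.ArrayBuffer[A]]
  for ((k, a) <- buildIter)                                   // step 1: build the hash table
    table.getOrElseUpdate(k, mutable.ArrayBuffer.empty[A]) += a
  for {
    (k, b)  <- streamIter                                     // step 2: probe with each streamed row
    matches <- table.get(k).iterator
    a       <- matches.iterator
  } yield (b, a)                                              // step 3: concatenate the two rows
}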

See the annotated source:

trait HashJoin {
  val leftKeys: Seq[Expression]
  val rightKeys: Seq[Expression]
  val buildSide: BuildSide
  val left: SparkPlan
  val right: SparkPlan

  lazy val (buildPlan, streamedPlan) = buildSide match { // pattern match: wrap the two physical plans in a Tuple2; (left, right) for BuildLeft, (right, left) for BuildRight
    case BuildLeft => (left, right)
    case BuildRight => (right, left)
  }

  lazy val (buildKeys, streamedKeys) = buildSide match { // pattern match: wrap the key expressions in a Tuple2 the same way
    case BuildLeft => (leftKeys, rightKeys)
    case BuildRight => (rightKeys, leftKeys)
  }

  def output = left.output ++ right.output

  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) // evaluates the build-side key expressions against a Row
  @transient lazy val streamSideKeyGenerator = // evaluates the stream-side key expressions against a Row
    () => new MutableProjection(streamedKeys, streamedPlan.output)

  def joinIterators(buildIter: Iterator[Row], streamIter: Iterator[Row]): Iterator[Row] = { // join the build side's Iterator[Row] with the streamed side's Iterator[Row], returning the joined Iterator[Row]
    // TODO: Use Spark's HashMap implementation.

    val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]() // the matching is driven by a HashMap
    var currentRow: Row = null

    // Create a mapping of buildKeys -> rows
    while (buildIter.hasNext) { // iterate over the build iterator only, building rowKey -> rows; similar to wordCount, except that rows are collected instead of counts accumulated
      currentRow = buildIter.next()
      val rowKey = buildSideKeyGenerator(currentRow) // compute the rowKey used as the HashMap key
      if (!rowKey.anyNull) {
        val existingMatchList = hashTable.get(rowKey)
        val matchList = if (existingMatchList == null) {
          val newMatchList = new ArrayBuffer[Row]()
          hashTable.put(rowKey, newMatchList) // (rowKey, matchedRowList)
          newMatchList
        } else {
          existingMatchList
        }
        matchList += currentRow.copy() // add the current row to its match list
      }
    }

    new Iterator[Row] { // finally, probe the build-side HashMap with each streamed row's key
      private[this] var currentStreamedRow: Row = _
      private[this] var currentHashMatches: ArrayBuffer[Row] = _
      private[this] var currentMatchPosition: Int = -1

      // Mutable per row objects.
      private[this] val joinRow = new JoinedRow

      private[this] val joinKeys = streamSideKeyGenerator()

      override final def hasNext: Boolean =
        (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
          (streamIter.hasNext && fetchNext())

      override final def next() = {
        val ret = buildSide match {
          case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) // build side on the right: streamed row on the left, matched build row on the right
          case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) // build side on the left: the opposite
        }
        currentMatchPosition += 1
        ret
      }

      /**
       * Searches the streamed iterator for the next row that has at least one match in hashtable.
       *
       * @return true if the search is successful, and false if the streamed iterator runs out of
       *         tuples.
       */
      private final def fetchNext(): Boolean = {
        currentHashMatches = null
        currentMatchPosition = -1

        while (currentHashMatches == null && streamIter.hasNext) {
          currentStreamedRow = streamIter.next()
          if (!joinKeys(currentStreamedRow).anyNull) {
            currentHashMatches = hashTable.get(joinKeys.currentValue) // look up the streamed row's key in the build-side hash table
          }
        }

        if (currentHashMatches == null) {
          false
        } else {
          currentMatchPosition = 0
          true
        }
      }
    }
  }
}

The implementation of JoinedRow, which concatenates two Rows:

copy() simply allocates a new Array and merges the values of the two rows into it.

class JoinedRow extends Row {
  private[this] var row1: Row = _
  private[this] var row2: Row = _

  // ... other members elided ...

  def copy() = {
    val totalSize = row1.size + row2.size
    val copiedValues = new Array[Any](totalSize)
    var i = 0
    while (i < totalSize) {
      copiedValues(i) = apply(i)
      i += 1
    }
    new GenericRow(copiedValues) // return a new, merged Row
  }
}

2.1 LeftSemiJoinHash

left semi join: not much to say; in early Hive versions it was the substitute for IN and EXISTS.

Put the right table's join keys into a HashSet, then iterate over the left table and check whether each left-table join key can be matched.

case class LeftSemiJoinHash(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  val buildSide = BuildRight // the build side is the right table

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  override def output = left.output

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => // execute the right table's physical plan into an RDD, pair up its partitions with the left's via zipPartitions, then apply the approach described above
      val hashSet = new java.util.HashSet[Row]()
      var currentRow: Row = null

      // Create a Hash set of buildKeys
      while (buildIter.hasNext) {
        currentRow = buildIter.next()
        val rowKey = buildSideKeyGenerator(currentRow)
        if (!rowKey.anyNull) {
          val keyExists = hashSet.contains(rowKey)
          if (!keyExists) {
            hashSet.add(rowKey)
          }
        }
      }

      val joinKeys = streamSideKeyGenerator()
      streamIter.filter(current => {
        !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
      })
    }
  }
}
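Stripped of the Catalyst machinery, the semi-join is just a keyed existence test; a minimal sketch over plain iterators (null keys ignored for simplicity):

import scala.collection.mutable

def leftSemiJoin[K, A](streamIter: Iterator[(K, A)],
                       buildIter: Iterator[K]): Iterator[(K, A)] = {
  val keys = mutable.HashSet.empty[K]
  keys ++= buildIter                                        // collect the right table's join keys
  streamIter.filter { case (k, _) => keys.contains(k) }     // keep left rows whose key exists on the right
}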

2.2 BroadcastHashJoin

As the name suggests: a broadcast hash join.

It is an implementation of the inner hash join. A concurrent future is used to asynchronously broadcast the collected result of executing the buildPlan.

Once the broadcast table is available, the streamedPlan is probed against it.

The implementation combines the RDD's mapPartitions with HashJoin's joinIterators to produce the join result.

case class BroadcastHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

  override def otherCopyArgs = sqlContext :: Nil

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

  @transient
  lazy val broadcastFuture = future { // broadcast the collected build-side table via SparkContext
    sqlContext.sparkContext.broadcast(buildPlan.executeCollect())
  }

  def execute() = {
    val broadcastRelation = Await.result(broadcastFuture, 5.minute)

    streamedPlan.execute().mapPartitions { streamedIter =>
      joinIterators(broadcastRelation.value.iterator, streamedIter) // call joinIterators on every streamed partition
    }
  }
}
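The same pattern expressed with the public RDD API is a classic broadcast (map-side) join: collect the small build side, broadcast it, and probe it inside mapPartitions of the streamed side. The RDDs small and large and their schemas are illustrative:

// small: RDD[(Int, String)] (build side), large: RDD[(Int, Double)] (streamed side)
val smallMap  = small.collectAsMap()                 // collect the build side on the driver
val broadcast = sc.broadcast(smallMap)               // ship it once to every executor

val joined = large.mapPartitions { iter =>
  val lookup = broadcast.value
  iter.flatMap { case (k, v) => lookup.get(k).map(s => (k, v, s)) } // inner join, one partition at a time
}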

2.3 ShuffledHashJoin

ShuffledHashJoin, as the name suggests, needs to shuffle data; its outputPartitioning is the Partitioning of the left child.

The data is shuffled according to that Partitioning, and then the RDD's zipPartitions method zips up each pair of co-located partitions.

The requiredChildDistribution here is ClusteredDistribution, which is matched against HashPartitioning.

Partitioning is not covered further here; see partitioning under org.apache.spark.sql.catalyst.plans.physical.

case class ShuffledHashJoin(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    buildSide: BuildSide,
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode with HashJoin {

  override def outputPartitioning: Partitioning = left.outputPartitioning

  override def requiredChildDistribution =
    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

  def execute() = {
    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    }
  }
}
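With the public RDD API, the equivalent shape is to hash-partition both sides identically and then join matching partitions pairwise with zipPartitions (leftRdd and rightRdd and their schemas are illustrative; hashJoin is the sketch from the HashJoin section above):

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(8)
val buildSide  = rightRdd.partitionBy(part)   // RDD[(Int, String)], hash-partitioned on the join key
val streamSide = leftRdd.partitionBy(part)    // RDD[(Int, Double)], partitioned the same way

// partition i of one side can only ever match partition i of the other
val joined = buildSide.zipPartitions(streamSide) { (buildIter, streamIter) =>
  hashJoin(buildIter, streamIter)             // local hash join per co-partitioned pair
}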
   