Editor's note:
This article comes from shiyanjun.cn. It is a step-by-step process analysis; the code and diagrams are fairly clear, and readers should come away with a fresh understanding.
The ShuffleMapTasks that Spark schedules and runs in the Map stage ultimately produce .data and .index files; see my article
Spark Shuffle Process Analysis: Map Stage Processing Flow for the details of that flow. When a ShuffleMapTask finishes running on an Executor, it returns a MapStatus object. The following snippet shows the code that produces the ShuffleMapTask's result:
var writer: ShuffleWriter[Any, Any] = null
try {
  val manager = SparkEnv.get.shuffleManager
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
} catch {
  case e: Exception =>
    try {
      if (writer != null) {
        writer.stop(success = false)
      }
    } catch {
      case e: Exception =>
        log.debug("Could not stop writer", e)
    }
    throw e
}
If the ShuffleMapTask executes without throwing an exception, the last call performed is:
writer.stop(success = true).get
This returns an object of type MapStatus, which is defined as follows:
private[spark] sealed trait MapStatus {
  def location: BlockManagerId
  def getSizeForBlock(reduceId: Int): Long
}
ÆäÖаüº¬ÁËÔËÐÐShuffleMapTaskËùÔÚµÄBlockManagerµÄµØÖ·£¬ÒÔ¼°ºóÐøReduce½×¶Îÿ¸öResultTask¼ÆËãÐèÒªMapÊä³öµÄ´óС£¨Size£©¡£ÎÒÃÇ¿ÉÒÔ¿´ÏÂMapStatusÈçºÎ´´½¨µÄ£¬ÔÚSortShuffleWriterµÄwrite()·½·¨ÖУ¬¿ÉÒÔ¿´µ½MapStatusµÄ´´½¨£¬ÈçÏ´úÂëËùʾ£º
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
Tracing further, we see that the apply() method of the MapStatus companion object is called:
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
  if (uncompressedSizes.length > 2000) {
    HighlyCompressedMapStatus(loc, uncompressedSizes)
  } else {
    new CompressedMapStatus(loc, uncompressedSizes)
  }
}
uncompressedSizes holds the uncompressed output size for each partition, so its length is the number of partitions. If there are more than 2000 partitions, a HighlyCompressedMapStatus is created; otherwise a CompressedMapStatus is created. Their concrete implementations can be found in the source code.
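To give an intuition for why the per-partition sizes can be reported so compactly, the sketch below shows the idea of encoding a block size into a single byte on a logarithmic scale, which is the approach CompressedMapStatus takes. The object name, helper names, and the constant 1.1 are illustrative, not copied from the Spark source; small relative error is acceptable because the sizes are only used to plan fetches, not to read exact byte ranges.

import scala.math

// Minimal sketch: encode a block size (bytes) into one byte on a log scale.
object SizeCodec {
  private val LogBase = 1.1

  def compress(size: Long): Byte = {
    if (size == 0L) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt).toByte
  }

  def decompress(compressed: Byte): Long = {
    if (compressed == 0) 0L
    else math.pow(LogBase, (compressed & 0xFF).toDouble).toLong
  }
}

// Example: a 1 MB block round-trips to roughly the same magnitude.
// println(SizeCodec.decompress(SizeCodec.compress(1024L * 1024)))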
A Spark Application Example That Involves a Shuffle
ÎÒÃÇÏȸø³öÒ»¸ö¼òµ¥µÄSpark Application³ÌÐò´úÂ룬ÈçÏÂËùʾ£º
val rdd = sc.textFile("/temp/*.h")
val finalRdd = rdd.flatMap(line => line.split("\\s+")).map(w => (w, 1)).reduceByKey(_ + _)
finalRdd.toDebugString
finalRdd.saveAsTextFile("/temp/output")
ͨ¹ýRDDµÄtoDebugString()·½·¨£¬´òÓ¡µ÷ÊÔÐÅÏ¢£º
scala> finalRdd.toDebugString
res0: String =
(133) ShuffledRDD[6] at reduceByKey at <console>:30 []
 +-(133) MapPartitionsRDD[5] at map at <console>:30 []
    |  MapPartitionsRDD[4] at flatMap at <console>:30 []
    |  /temp/*.h MapPartitionsRDD[3] at textFile at <console>:29 []
    |  /temp/*.h HadoopRDD[2] at textFile at <console>:29 []
We can see that calling reduceByKey() created a ShuffledRDD, which means a Shuffle operation is performed during computation.
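If you want to confirm programmatically that the reduceByKey() step introduces a shuffle, you can inspect the dependency types of the resulting RDD. The snippet below is a small usage sketch against the example above (the variable names match the earlier code):

import org.apache.spark.ShuffleDependency

// finalRdd is the ShuffledRDD produced by reduceByKey in the example above.
// Its dependency should be a ShuffleDependency on MapPartitionsRDD[5].
finalRdd.dependencies.foreach { dep =>
  val isShuffle = dep.isInstanceOf[ShuffleDependency[_, _, _]]
  println(s"dependency: ${dep.getClass.getSimpleName}, shuffle = $isShuffle")
}

// The number of partitions on the shuffled side (133 in the toDebugString output).
println(finalRdd.getNumPartitions)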
ShuffleMapTaskÖ´Ðнá¹ûÉϱ¨´¦ÀíÁ÷³Ì
After a Spark Application is submitted, ShuffleMapStages and/or ResultStages are generated. A ShuffleMapStage corresponds to a set of ShuffleMapTasks that actually need to run, and a ResultStage corresponds to a set of ResultTasks. Each set of Tasks is managed by a TaskSetManager, and a ResultStage is scheduled only after every ShuffleMapTask of the ShuffleMapStage it depends on has finished successfully.
So what we focus on here is how, once the last ShuffleMapTask of a ShuffleMapStage finishes successfully, the Map-stage information is reported to the schedulers (the TaskScheduler and DAGScheduler on the Driver). Understanding this flow is essential for understanding the subsequent Reduce-stage processing. The detailed flow is shown in the following diagram:

ÎÒÃǽ«Õû¸öÁ÷³Ì°´ÕÕ˳Ðò·ÖΪÈçϼ¸¸ö¹ý³ÌÀ´ÃèÊö£º
Processing the Result After a ShuffleMapTask Completes
The Executor starts a TaskRunner thread to run the ShuffleMapTask. When the ShuffleMapTask completes, its result is serialized, as shown below:
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
Based on the size resultSize of the serialized result serializedDirectResult, some optimizations are applied, as shown below:
val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
      s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId,
      new ChunkedByteBuffer(serializedDirectResult.duplicate()),
      StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
    serializedDirectResult
  }
}
If the result size does not exceed maxDirectResultSize, the upper limit for a DirectTaskResult, the serialized DirectTaskResult is sent to the Driver directly. If the result size exceeds maxResultSize, the overall limit on a Task result, the result is simply dropped. Otherwise, when the result size falls between maxDirectResultSize and maxResultSize, a TaskResultBlockId is created from the Task ID and the result is temporarily stored on the Executor via the BlockManager (in the DiskStore or MemoryStore), so that later computation can fetch it on demand.
Finally, the result is passed to CoarseGrainedExecutorBackend's statusUpdate() method, as shown below:
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
This sends the Task's run state and its result to the Driver.
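The status update travels to the Driver as a single message carrying the executor ID, task ID, task state, and the serialized result bytes. The sketch below only captures its shape; the real message lives in CoarseGrainedClusterMessages and uses Spark's TaskState enum and a SerializableBuffer, so the field types here are simplified stand-ins:

import java.nio.ByteBuffer

// Simplified stand-in for the status-update message handled by DriverEndpoint below.
final case class StatusUpdateSketch(
    executorId: String,
    taskId: Long,
    state: String,                  // e.g. "RUNNING", "FINISHED", "FAILED"
    serializedResult: ByteBuffer)   // DirectTaskResult or IndirectTaskResult bytes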
The Driver Retrieves the Task Result
In cluster mode, the component on the Driver side responsible for receiving Task results is CoarseGrainedSchedulerBackend. Internally it has a DriverEndpoint that handles the actual network communication and receives Task states and results, as shown in the following code:
case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }
If the message type is StatusUpdate, TaskSchedulerImpl's statusUpdate() method is called first to process the Task's run state and result, as shown below:
if (TaskState.isFinished(state)) {
  cleanupTaskState(tid)
  taskSet.removeRunningTask(tid)
  if (state == TaskState.FINISHED) {
    taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
  } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
  }
}
If the Task state is TaskState.FINISHED, the TaskResultGetter is used to fetch the result returned by the Task. There are two kinds of results, DirectTaskResult and IndirectTaskResult, and they are handled differently. A DirectTaskResult is handled as follows:
case directResult: DirectTaskResult[_] =>
  if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
    return
  }
  // deserialize "value" without holding any lock so that it won't block other threads.
  directResult.value(taskResultSerializer.get())
The result can be obtained directly from the DirectTaskResult by deserialization. Handling an IndirectTaskResult is somewhat more involved, as shown below:
case IndirectTaskResult(blockId, size) =>
  ... ...
  scheduler.handleTaskGettingResult(taskSetManager, tid)
  val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
  ... ...
  val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
    serializedTaskResult.get.toByteBuffer)
  // force deserialization of referenced value
  deserializedResult.value(taskResultSerializer.get())
  sparkEnv.blockManager.master.removeBlock(blockId)
When the result size exceeded the limit, the ShuffleMapTask stored it on the Executor's memory/disk via the BlockManager while running; here the corresponding Block data is fetched through the BlockManager using the result's Block ID.
Updating Task and Stage State on the Driver and Scheduling Stages
After the ShuffleMapTask's result data has been obtained, the corresponding state in the TaskSetManager needs to be updated so that it can inform subsequent Task scheduling decisions, as shown below:
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
The code above calls TaskSetManager's handleSuccessfulTask() method to update the relevant state, and then goes on to update the corresponding state in the DAGScheduler, as shown in the following snippet:
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
maybeFinishTaskSet()
This calls DAGScheduler's taskEnded() method to update the Stage information. If the ShuffleMapTask that just finished is the last one of its ShuffleMapStage, then that ShuffleMapStage is also complete, and all of the Map outputs produced by the stage are registered, as shown below:
mapOutputTracker.registerMapOutputs(
  shuffleStage.shuffleDep.shuffleId,
  shuffleStage.outputLocInMapOutputTrackerFormat(),
  changeEpoch = true)
The MapOutputTracker maintains an in-memory ConcurrentHashMap[Int, Array[MapStatus]] to manage the result data returned by each completed ShuffleMapTask, where the Key is the Shuffle ID and the Value is an array recording the output information for each Map ID.
The following code checks whether the ShuffleMapStage is available and handles it accordingly:
if (!shuffleStage.isAvailable) {
  // Some tasks had failed; let's resubmit this shuffleStage.
  // TODO: Lower-level scheduler should also deal with this
  logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
    ") because some of its tasks had failed: " +
    shuffleStage.findMissingPartitions().mkString(", "))
  submitStage(shuffleStage)
} else {
  // Mark any map-stage jobs waiting on this stage as finished
  if (shuffleStage.mapStageJobs.nonEmpty) {
    val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
    for (job <- shuffleStage.mapStageJobs) {
      markMapStageJobAsFinished(job, stats)
    }
  }
  submitWaitingChildStages(shuffleStage)
}
If the ShuffleMapStage is not available, some partitions still have no computed output (or some computations failed), and Spark resubmits the ShuffleMapStage. If it is available, the ShuffleMapStage has finished, so its state and results are updated: the stage is marked as complete, and the immediately downstream Stages in the dependency chain are submitted to run. If the next Stage is a ResultStage, that ResultStage is submitted.
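Conceptually, a ShuffleMapStage is "available" only when every one of its partitions has a registered map output. The sketch below captures that rule and the resulting decision; the names here are hypothetical stand-ins, not the DAGScheduler's actual fields or methods:

// Illustrative sketch of the availability rule for a shuffle map stage.
final case class ShuffleStageState(numPartitions: Int, numAvailableOutputs: Int) {
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}

def handleStageCompletion(stage: ShuffleStageState,
                          resubmit: () => Unit,
                          submitChildren: () => Unit): Unit =
  if (!stage.isAvailable) resubmit()   // some outputs missing or lost: run the stage again
  else submitChildren()                // all outputs registered: downstream stages may run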
Releasing Resources and Rescheduling Tasks
When a ShuffleMapTask finishes, the resources it occupied on its Executor must be released. On the Driver side the corresponding resources are added back to the pool, and another Task may be scheduled onto the freed Executor. The relevant handling logic in CoarseGrainedSchedulerBackend.DriverEndpoint is shown below:
if (TaskState.isFinished(state)) {
  executorDataMap.get(executorId) match {
    case Some(executorInfo) =>
      executorInfo.freeCores += scheduler.CPUS_PER_TASK
      makeOffers(executorId)
The makeOffers() method above schedules a Task to run on the Executor identified by executorId. If the ShuffleMapStage has already completed, this is where ResultTasks of the ResultStage may be scheduled to run.
Reduce Stage Processing Flow
ÉÏÃæÎÒÃǸø³öµÄÀý×ÓÖУ¬Ö´ÐÐreduceByKeyºó£¬ÓÉÓÚÉÏÓεÄRDDûÓа´ÕÕkeyÖ´ÐзÖÇø²Ù×÷£¬ËùÒԱض¨»á´´½¨Ò»¸öShuffledRDD£¬¿ÉÒÔÔÚPairRDDFunctionsÀàµÄÔ´ÂëÖп´µ½combineByKeyWithClassTag·½·¨£¬ÊµÏÖ´úÂëÈçÏÂËùʾ£º
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
Here, in the context of our example, self.partitioner == Some(partitioner) does not hold, so a ShuffledRDD object is created. For the Reduce-stage processing flow, therefore, we base our analysis on how a ShuffledRDD is computed.
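Conversely, if the upstream RDD is already partitioned with the same Partitioner, the self.partitioner == Some(partitioner) branch is taken and no new ShuffledRDD is created. A small usage sketch against the earlier example (the partition count 133 is only illustrative, matching the toDebugString output):

import org.apache.spark.HashPartitioner

// Pre-partition the pairs by key; reduceByKey can then combine values within
// the existing partitions instead of creating another ShuffledRDD.
val pairs = rdd.flatMap(_.split("\\s+")).map(w => (w, 1))
val prePartitioned = pairs.partitionBy(new HashPartitioner(133))  // one shuffle here
val counts = prePartitioned.reduceByKey(_ + _)                    // no additional shuffle

println(counts.toDebugString)  // no new ShuffledRDD on top of prePartitioned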
We start from the ResultTask class, which implements the runTask() method, as shown below:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  func(context, rdd.iterator(partition, context))
}
ÆäÖУ¬×îºËÐĵľÍÊÇÉÏÃæµÄrdd.iterator()µ÷Ó㬾ßÌå´¦Àí¹ý³Ì£¬ÈçÏÂͼËùʾ£º

Ultimately, it is used to compute an RDD, in our case the ShuffledRDD. The iterator() method is defined in the RDD class, as shown below:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
Following getOrCompute() (or computeOrReadCheckpoint()) further, the actual computation ends up in the compute() method defined by the ShuffledRDD class.
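Which branch iterator() takes depends only on whether the RDD has been persisted. As a small usage sketch against the earlier example, calling persist() before an action switches iterator() from the computeOrReadCheckpoint() branch to getOrCompute(), which consults the block store before recomputing:

import org.apache.spark.storage.StorageLevel

// Without persist(), storageLevel is StorageLevel.NONE and iterator() takes the
// computeOrReadCheckpoint() branch, i.e. goes straight to ShuffledRDD.compute().
finalRdd.persist(StorageLevel.MEMORY_AND_DISK)

// The first action computes the shuffle and caches the resulting blocks;
// the second action is served from the block store via getOrCompute().
finalRdd.count()
finalRdd.take(10).foreach(println)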
Computing the ShuffledRDD
The implementation of ShuffledRDD's compute() method is shown below:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
The computation of the ShuffledRDD is carried out mainly through BlockStoreShuffleReader's read() method. The following sequence diagram shows the detailed execution flow:

Tracking the Map outputs is implemented by the MapOutputTracker on the Executor side together with the MapOutputTrackerMaster on the Driver side, where MapOutputTrackerMaster acts as the server and MapOutputTracker as the client. The Driver manages the outputs of all ShuffleMapTasks of the ShuffleMapStages in a Spark Application, so during the Reduce stage an Executor obtains them by having its MapOutputTracker communicate with the Driver's MapOutputTrackerMaster.
Calling BlockStoreShuffleReader's read() method ultimately yields the input needed by the Reduce stage, namely the locations of the ShuffleMapTask outputs. Typically, so that computation can run local to the data, the Executor node where each ResultTask runs holds the corresponding Map outputs; these are managed by the BlockManager and identified by Block IDs. The diagram therefore ends with a BlockManager ID and the list of Block IDs it manages, from which the ResultTask on the Executor can fetch the corresponding Map output data and perform its computation.
When a ResultTask finishes, it ultimately returns an iterator over records. At this point the final result data lives on the Executors where the individual ResultTasks ran; the data is stored as Blocks and managed by the BlockManager.
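What happens to those per-Executor results then depends on the action. As a small usage sketch against the earlier example, collect() ships the results back to the Driver (and is subject to spark.driver.maxResultSize), while saveAsTextFile() writes them out from the Executors directly, which is the path analyzed in the next section:

// Pull the per-partition results back to the Driver (bounded by
// spark.driver.maxResultSize); suitable for small outputs only.
val wordCounts: Array[(String, Int)] = finalRdd.collect()

// Or keep the data distributed and let each ResultTask write its own
// partition out to HDFS, as in the example program.
finalRdd.saveAsTextFile("/temp/output")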
Saving the Result RDD
Following the earlier example, the program finally calls the RDD's saveAsTextFile(), which generates another ResultStage and hence another set of ResultTasks. The processing flow for saving the result RDD is shown in the following diagram:

The overall flow consists of a few core steps: setting up the Writer that outputs the RDD to HDFS (a function that writes the files), submitting the ResultStage, building the TaskSet that contains the ResultTasks, and scheduling the ResultTasks onto the chosen Executors. In fact, the core processing logic of the ResultTask running on each Executor is mainly the following function:
val writer = new SparkHadoopWriter(hadoopConf)
writer.preSetup()

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
  val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

  val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)

  writer.setup(context.stageId, context.partitionId, taskAttemptId)
  writer.open()
  var recordsWritten = 0L

  Utils.tryWithSafeFinallyAndFailureCallbacks {
    while (iter.hasNext) {
      val record = iter.next()
      writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

      // Update bytes written metric every few records
      SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
      recordsWritten += 1
    }
  }(finallyBlock = writer.close())
  writer.commit()
  outputMetrics.setBytesWritten(callback())
  outputMetrics.setRecordsWritten(recordsWritten)
}
Recall that in computing the ShuffledRDD, each ResultTask ended up producing an iterator over its results. When saveAsTextFile() is called, the set of ResultTasks of the ResultStage runs on the Executors, and the data behind each iterator is written out to HDFS.
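To check the output of the example end to end, you can read the saved files back with the same SparkContext; the path matches the one used in the example program:

// Read back what the ResultTasks wrote: one part-XXXXX file per partition
// under /temp/output, each line being the string form of a (word, count) pair.
val saved = sc.textFile("/temp/output")
saved.take(5).foreach(println)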