Spark Source Code Series (6): Analyzing the Shuffle Process
 
Author: Cen Yuhai (blog), via Huolongguo Software    Published on 2014-11-07
 

At the Spark Summit, every speaker agreed that shuffle is the part that hurts performance the most, yet there is little to be done about it. When I interviewed at Baidu for a Hadoop position I was asked about it too, and I flatly answered that I didn't know. This article is organized around a few questions: how is the shuffle process divided into stages? How are shuffle's intermediate results stored? How is the shuffle data fetched?

Dividing the Shuffle Process

Spark's operation model is based on RDDs. Whenever you call operations like reduceByKey or groupByKey on an RDD, a shuffle is required. Let's take reduceByKey as the example again.

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
  reduceByKey(new HashPartitioner(numPartitions), func)
}
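
As a quick usage sketch (the input path and the reduce count 8 are made up for illustration, and sc is assumed to be an existing SparkContext with import org.apache.spark.SparkContext._ in scope for the pair-RDD functions):

// Word count with an explicit reduce count.
val counts = sc.textFile("hdfs://namenode:8020/tmp/words.txt")  // hypothetical path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 8)   // 8 reduce tasks, i.e. 8 output partitions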

When calling reduceByKey, we can set the number of reducers by hand; if we don't specify it, the number may be out of our control.

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}

If the number of reducers is not specified, the defaults apply:

1. If a custom partitioner is defined, that partitioner is used.

2. If not, but spark.default.parallelism is set, hash partitioning is used and the number of reducers is that value.

3. If that is not set either, the number of partitions of the input data is used. With Hadoop input data this can be a lot... so be careful.
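
A small sketch of rule 2 (everything here is illustrative; only the property name comes from the defaultPartitioner code above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// With no partitioner and no explicit numPartitions, rule 2 kicks in:
// spark.default.parallelism decides the reduce count via HashPartitioner.
val conf = new SparkConf().setAppName("default-partitioner-demo").setMaster("local[2]")
conf.set("spark.default.parallelism", "16")
val sc = new SparkContext(conf)

val sums = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).reduceByKey(_ + _)
println(sums.partitions.size)   // 16, from the setting above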

Once the partitioner is settled, it does three things, namely the three RDD transformations mentioned earlier:

// First merge by key on the map side
val combined = self.mapPartitionsWithContext((context, iter) => {
  aggregator.combineValuesByKey(iter, context)
}, preservesPartitioning = true)
// The reduce side fetches the data
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
  .setSerializer(serializer)
// Merge the data and run the reduce computation
partitioned.mapPartitionsWithContext((context, iter) => {
  new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)

1. The first MapPartitionsRDD performs the map-side aggregation.

2. The ShuffledRDD does the work of fetching that data.

3. The second MapPartitionsRDD aggregates the fetched data once more.

4. Steps 1 and 3 both involve spilling to disk.

For how the aggregation itself is done, go back to the RDD chapter.
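
In essence, combineValuesByKey builds per-key combiners in a hash map within each partition. A simplified sketch of the idea for reduceByKey(_ + _), where createCombiner is the identity and mergeValue is the reduce function (the real Aggregator also spills to disk when the map grows too large):

import scala.collection.mutable

// Simplified map-side combine: one pass over the partition, merging values
// into a per-key combiner, then handing back an iterator over the combiners.
def combineValuesByKey[K, V](iter: Iterator[(K, V)], mergeValue: (V, V) => V): Iterator[(K, V)] = {
  val combiners = mutable.HashMap.empty[K, V]
  for ((k, v) <- iter) {
    combiners(k) = combiners.get(k) match {
      case Some(c) => mergeValue(c, v)
      case None    => v
    }
  }
  combiners.iterator
}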

ShuffleµÄÖмä½á¹ûÈçºÎ´æ´¢

When a job is submitted, the DAGScheduler cuts the shuffle process into two stages, map and reduce (which up to now I had been calling "before the shuffle" and "after the shuffle"); the cut falls exactly at the shuffle dependency between them.

The map-side work is submitted as a ShuffleMapTask, and in the end the TaskRunner calls its runTask method.

override def runTask(context: TaskContext): MapStatus = {
  val numOutputSplits = dep.partitioner.numPartitions
  metrics = Some(context.taskMetrics)

  val blockManager = SparkEnv.get.blockManager
  val shuffleBlockManager = blockManager.shuffleBlockManager
  var shuffle: ShuffleWriterGroup = null
  var success = false

  try {
    // If no serializer is set, the default JavaSerializer is used;
    // it can be changed through spark.serializer
    val ser = Serializer.getSerializer(dep.serializer)
    // Instantiate the writers; the number of writers = numOutputSplits =
    // the reduce count we discussed above
    shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

    // Iterate over the RDD's elements, compute the bucketId from each key,
    // then write the pair through the corresponding writer
    for (elem <- rdd.iterator(split, context)) {
      val pair = elem.asInstanceOf[Product2[Any, Any]]
      val bucketId = dep.partitioner.getPartition(pair._1)
      shuffle.writers(bucketId).write(pair)
    }

    // Commit the writes and compute the size of each bucket block
    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
      writer.commit()
      writer.close()
      val size = writer.fileSegment().length
      totalBytes += size
      totalTime += writer.timeWriting()
      MapOutputTracker.compressSize(size)
    }

    // Update the shuffle write metrics
    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

    success = true
    new MapStatus(blockManager.blockManagerId, compressedSizes)
  } catch { case e: Exception =>
    // Something went wrong: revert the partial writes and close the writers
    if (shuffle != null && shuffle.writers != null) {
      for (writer <- shuffle.writers) {
        writer.revertPartialWrites()
        writer.close()
      }
    }
    throw e
  } finally {
    // Release the writers
    if (shuffle != null && shuffle.writers != null) {
      try {
        shuffle.releaseWriters(success)
      } catch {
        case e: Exception => logError("Failed to release shuffle writers", e)
      }
    }
    // Run the registered completion callbacks, usually cleanup work
    context.executeOnCompleteCallbacks()
  }
}

It iterates over every record, uses the record's key to determine its bucketId, and then writes the data through that bucket's writer.
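
With the default HashPartitioner, the bucketId is just a non-negative modulus of the key's hash code; roughly (a sketch, not the actual class, which also handles null keys and implements equals/hashCode):

// How a key maps to a bucket under hash partitioning.
class SimpleHashPartitioner(numPartitions: Int) {
  def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod   // keep the bucket id non-negative
  }
}

// e.g. new SimpleHashPartitioner(8).getPartition("spark") always lands in [0, 8)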

Next, let's look at ShuffleBlockManager's forMapTask method.


def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
  new ShuffleWriterGroup {
    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
    private val shuffleState = shuffleStates(shuffleId)
    private var fileGroup: ShuffleFileGroup = null

    val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
      fileGroup = getUnusedFileGroup()
      Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        // Pick a file from an existing file group, one file per bucket:
        // data headed for the same reduce is appended to the same file
        blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
      }
    } else {
      Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
        // One file per blockId, so the file count is map count * reduce count
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        val blockFile = blockManager.diskBlockManager.getFile(blockId)
        if (blockFile.exists) {
          if (blockFile.delete()) {
            logInfo(s"Removed existing shuffle file $blockFile")
          } else {
            logWarning(s"Failed to remove existing shuffle file $blockFile")
          }
        }
        blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
      }
    }

    // releaseWriters and the remaining members are omitted here
  }
}

1. The map-side intermediate results are written to local disk, not kept in memory.

2. By default one Executor produces M*R intermediate files (M = number of maps, R = number of reduces); with spark.shuffle.consolidateFiles set to true there are only R files, because results headed for the same reduce are written into one file according to their bucketId.

3. With consolidateFiles there is one file per reduce, and the start position of each map's output is also recorded. A lookup therefore first finds the file by reduceId, then looks up the start offset in the index by mapId; the length is offset(mapId + 1) - offset(mapId), which pins down a FileSegment(file, offset, length), as the sketch after this list shows.

4. Finally, once the writes are done, a new MapStatus(blockManager.blockManagerId, compressedSizes) is returned, carrying the blockManagerId together with the block sizes.
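
A sketch of the index lookup described in point 3 (the names here are illustrative, not the actual ShuffleFileGroup fields):

import java.io.File

case class FileSegment(file: File, offset: Long, length: Long)

// Illustrative index for one consolidated reduce file: mapOffsets(i) is the
// position where map i started writing; one extra entry marks the file end.
def segmentFor(file: File, mapOffsets: Array[Long], mapId: Int): FileSegment = {
  val offset = mapOffsets(mapId)
  val length = mapOffsets(mapId + 1) - offset   // (mapId + 1).offset - (mapId).offset
  FileSegment(file, offset, length)
}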

My personal take: this shuffle machinery is not that different from Hadoop's. Will an engine like Tez catch up with Spark's speed? Let's wait and see!

How Shuffle Data Is Fetched

After a ShuffleMapTask finishes, control eventually reaches the DAGScheduler's handleTaskCompletion method (for the steps in between, see the earlier "job life cycle, illustrated" article in this series).

case smt: ShuffleMapTask =>
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
  } else {
    stage.addOutputLoc(smt.partitionId, status)
  }
  if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
    markStageAsFinished(stage)
    if (stage.shuffleDep.isDefined) {
      // Only a real map stage has this dependency; for a reduce stage it is None
      mapOutputTracker.registerMapOutputs(
        stage.shuffleDep.get.shuffleId,
        stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
        changeEpoch = true)
    }
    clearCacheLocs()
    if (stage.outputLocs.exists(_ == Nil)) {
      // Some tasks failed; the stage needs to be resubmitted
      submitStage(stage)
    } else {
      // Submit the next batch of tasks (omitted)
    }
  }

1. The result is added to the Stage's outputLocs array, which keeps the partitionId -> MapStatus mapping indexed by the data's partition id.

2. Once the stage finishes, the shuffle's outputLocs are recorded in the mapOutputTracker through its registerMapOutputs method.
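
As an aside, the compressedSizes carried by each MapStatus are one byte per block (that is what MapOutputTracker.compressSize did in the map task above). The idea, sketched from memory and so the exact constants may differ, is a log-scale encoding:

// One-byte size encoding: store roughly log_1.1(size), so a single byte
// covers sizes up to ~35 GB at the cost of ~10% accuracy.
def compressSize(size: Long): Byte = {
  if (size == 0) 0
  else if (size <= 1L) 1
  else math.min(255, math.ceil(math.log(size) / math.log(1.1)).toInt).toByte
}

def decompressSize(compressedSize: Byte): Long = {
  if (compressedSize == 0) 0 else math.pow(1.1, compressedSize & 0xFF).toLong
}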

After this stage ends, it is the ShuffledRDD's turn to run; let's look at its compute function.

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)

It fetches data through the ShuffleFetcher's fetch method; the concrete implementation lives in BlockStoreShuffleFetcher.

override def fetch[T](
    shuffleId: Int,
    reduceId: Int,
    context: TaskContext,
    serializer: Serializer): Iterator[T] = {
  val blockManager = SparkEnv.get.blockManager
  val startTime = System.currentTimeMillis
  // The mapOutputTracker also has a Master and a Worker side; the Worker asks
  // the Master for this reduce's MapStatuses, essentially (BlockManagerId, size)
  val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
  // One BlockManagerId corresponds to the sizes of several files
  val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
  for (((address, size), index) <- statuses.zipWithIndex) {
    splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
  }
  // Build the BlockManagerId -> BlockId mapping; note that ShuffleBlockId's
  // mapId is simply the sequential index of the map output
  val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
    case (address, splits) =>
      (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
  }
  // unpackBlock is really a validation function: every block comes back as an
  // Iterator, and an empty Option means the fetch failed and must raise
  def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])): Iterator[T] = {
    val blockId = blockPair._1
    val blockOption = blockPair._2
    blockOption match {
      case Some(block) => {
        block.asInstanceOf[Iterator[T]]
      }
      case None => {
        blockId match {
          case ShuffleBlockId(shufId, mapId, _) =>
            val address = statuses(mapId.toInt)._1
            throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
          case _ =>
            throw new SparkException("Failed to get block " + blockId + ", which is not a shuffle block")
        }
      }
    }
  }
  // Fetch all the blocks this reduce needs from the blockManager,
  // attaching the validation function above
  val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
  val itr = blockFetcherItr.flatMap(unpackBlock)

  val completionIter = CompletionIterator[T, Iterator[T]](itr, {
    // Once the CompletionIterator is exhausted, this block runs and
    // reports the metrics recorded along the way
    val shuffleMetrics = new ShuffleReadMetrics
    shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
    shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
    shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
    shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
    shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
    shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
    context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
  })

  new InterruptibleIterator[T](context, completionIter)
}

1. The MapOutputTrackerWorker asks the MapOutputTrackerMaster for the map output information of this shuffle.

2. That information is reorganized into a BlockManagerId --> Array(BlockId, size) mapping.

3. The blocks are fetched in bulk through BlockManager's getMultiple.

4. A traversable Iterator is returned and the relevant monitoring metrics are updated.
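
For reference, the block ids being requested follow the shuffle block naming convention; a sketch of the scheme (the "shuffle_<shuffleId>_<mapId>_<reduceId>" format matches the BlockId of this era, to the best of my knowledge):

// One block per map output, all for the same reduce.
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
  def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

// Reduce 3 of shuffle 0 asks every map (here maps 0..3) for its slice:
val wanted = (0 until 4).map(mapId => ShuffleBlockId(0, mapId, 3).name)
// => shuffle_0_0_3, shuffle_0_1_3, shuffle_0_2_3, shuffle_0_3_3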

Let's continue with the getMultiple method.

def getMultiple(
    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
    serializer: Serializer): BlockFetcherIterator = {
  val iter =
    if (conf.getBoolean("spark.shuffle.use.netty", false)) {
      new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
    } else {
      new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
    }

  iter.initialize()
  iter
}

Two cases are handled, Netty and Basic. I will skip the Basic one: it simply goes to the designated BlockManager through the ConnectionManager to get the data, which the previous chapter just covered.

Let's talk about the Netty one instead. It has to be switched on explicitly; I wonder whether its performance is actually better.
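
Switching it on is just a matter of configuration; both keys below appear in the code in this article:

import org.apache.spark.SparkConf

// Enable the Netty-based fetch path (the flag checked in getMultiple above).
val conf = new SparkConf()
  .set("spark.shuffle.use.netty", "true")
  .set("spark.shuffle.copier.threads", "6")   // fetch thread count, default already 6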

Reading NettyBlockFetcherIterator's initialize method and then BasicBlockFetcherIterator's, we find that the Basic one cannot have more than 48 MB of data in flight at once.

override def initialize() {
  // Split local and remote requests; only the remote FetchRequests are returned
  val remoteRequests = splitLocalRemoteBlocks()
  // Fetch in random order
  for (request <- Utils.randomize(remoteRequests)) {
    fetchRequestsSync.put(request)
  }
  // By default 6 threads are started to do the fetching
  copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
  // Read the local blocks
  getLocalBlocks()
}
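
The 48 MB ceiling mentioned above comes from the spark.reducer.maxMbInFlight setting. A sketch, from memory, of how BasicBlockFetcherIterator sizes its requests (the 1/5 split factor is my recollection and may be off):

import org.apache.spark.SparkConf

// Cap on outstanding fetch bytes, plus a per-request target size so that
// fetches from roughly five machines can proceed in parallel under the cap.
def requestSizing(conf: SparkConf): (Long, Long) = {
  val maxBytesInFlight = conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  (maxBytesInFlight, targetRequestSize)
}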

In NettyBlockFetcherIterator's sendRequest method we find that the actual fetching goes through a ShuffleCopier.

val cpier = new ShuffleCopier(blockManager.conf)
cpier.getBlocks(cmId, req.blocks, putResult)

What follows from here is the Netty client call path, which I am not familiar with. On the server side, a ShuffleSender service is started inside the DiskBlockManager, and the final request handling logic lives in FileServerHandler.

It returns a FileSegment via getBlockLocation; the code below is ShuffleBlockManager's getBlockLocation method.

def getBlockLocation(id: ShuffleBlockId): FileSegment = {
  // Search all file groups associated with this shuffle.
  val shuffleState = shuffleStates(id.shuffleId)
  for (fileGroup <- shuffleState.allFileGroups) {
    val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
    if (segment.isDefined) { return segment.get }
  }
  throw new IllegalStateException("Failed to find shuffle block: " + id)
}

So the ShuffleState is found by shuffleId first, then the file by reduceId, and finally the position of the file segment by mapId. One question, though: when consolidateFiles is enabled, all the data one reduce needs lives in a single file, so couldn't the whole file be returned in one go instead of being read N times, once per map? Or is the worry that sending one large file at once is more likely to fail? I don't know.

That wraps up the whole process. As you can see, some optimizations have been made in the shuffle code, but they are not enabled by default; if you need them, switch them on and see how they work for you.

   