Spark Source Code Series (2): RDDs in Detail
 
Author: Cen Yuhai's blog, via Huolongguo Software    Published 2014-11-04
 

1. What is an RDD?

The previous chapter covered how Spark submits a job; this chapter is about RDDs. Simply put, an RDD is Spark's input, and you know what input is: it's the data being fed in.

RDD stands for Resilient Distributed Dataset, i.e. a fault-tolerant distributed dataset. Every RDD has five characteristics:

1. A list of partitions. The data can be split up, just as in Hadoop; only data that can be split can be computed in parallel.

2. A function to compute each partition; this is the compute function discussed below.

3. A list of dependencies on other RDDs. Dependencies are further divided into wide and narrow dependencies, but not every RDD has them.

4. Optional: key-value RDDs are partitioned by hash, similar to the Partitioner interface in MapReduce, which controls which reduce a key is sent to.

5. Optional: a list of preferred locations for computing each partition; for example, the nodes holding an HDFS block should be the preferred locations for computing that block.

Matching the points above, we can find these four methods and one property in the RDD class. Don't worry, we will walk through all five of them below.

// computed only once
protected def getPartitions: Array[Partition]
// compute one partition, producing an iterable result
def compute(split: Partition, context: TaskContext): Iterator[T]
// computed only once; the dependencies of this RDD on its parent RDDs
protected def getDependencies: Seq[Dependency[_]] = deps
// optional: the partitioning scheme (point 4), similar to MapReduce's Partitioner interface, which controls which reduce a key goes to
@transient val partitioner: Option[Partitioner] = None
// optional: the preferred locations; the input is a split (partition), the output is a list of preferred node locations
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
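To make those five pieces concrete, here is a minimal toy RDD, a sketch that is not from the article (the names RangeIdRDD and IndexPartition are made up). It implements only the two mandatory members, getPartitions and compute, and passes Nil as its dependency list because it reads from "nowhere" rather than from a parent RDD.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A partition that only carries its own index.
case class IndexPartition(index: Int) extends Partition

// A toy RDD with numSlices partitions; each partition yields just its own index.
class RangeIdRDD(sc: SparkContext, numSlices: Int) extends RDD[Int](sc, Nil) {
  override def getPartitions: Array[Partition] =
    (0 until numSlices).map(i => IndexPartition(i): Partition).toArray

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.single(split.asInstanceOf[IndexPartition].index)
}

// Usage, assuming an existing SparkContext sc:
// new RangeIdRDD(sc, 4).collect() returns Array(0, 1, 2, 3)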

2. Transformations between RDDs

Let's walk through an example using a piece of code we use all the time; along the way it will touch all the RDDs we commonly see.

val hdfsFile = sc.textFile(args(1))
val flatMapRdd = hdfsFile.flatMap(s => s.split(" "))
val filterRdd = flatMapRdd.filter(_.length == 2)
val mapRdd = filterRdd.map(word => (word, 1))
val reduce = mapRdd.reduceByKey(_ + _)

Quite a few RDDs are involved here. textFile gives a HadoopRDD that, after a map, becomes a MappedRDD; flatMap produces a FlatMappedRDD; filter then produces a FilteredRDD; map turns it into another MappedRDD, which is implicitly converted into a pair RDD; and finally reduceByKey is applied.
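If you run the snippet above in spark-shell, you can watch this nesting directly with toDebugString; the shape below is only illustrative (ids, indentation and partition counts vary by Spark version):

// scala> reduce.toDebugString
// ShuffledRDD[5] at reduceByKey
//   MappedRDD[4] at map
//   FilteredRDD[3] at filter
//   FlatMappedRDD[2] at flatMap
//   MappedRDD[1] at textFile
//   HadoopRDD[0] at textFile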

Let's first look at the textFile method: go into SparkContext and find it there.

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString)
}

Look at its parameters: path, TextInputFormat, LongWritable, Text. What do they remind you of, comrades? Anyone who has written MapReduce should recognize them:

1. The HDFS path

2. The InputFormat type

3. The Mapper's first (input key) type

4. The Mapper's second (input value) type

That makes it easy to see why a map is applied right after hadoopFile to take the second element of each pair, and why in the shell we end up seeing a MappedRDD.

Now, if what you need is not textFile but some other Hadoop file type, would you know how to use hadoopFile to get the type you want? Don't tell me you wouldn't; if not, go back and review MapReduce.

Back to the topic: the default defaultMinPartitions of 2 is too small, so when we use it we should set it higher.
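As a sketch of both points (the path and the partition count 64 are made up; SequenceFileInputFormat here is the old "mapred" API class, matching the hadoopFile signature used above):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileInputFormat

// Read a SequenceFile[Text, IntWritable] and ask for more partitions than the default of 2.
val seqRdd = sc.hadoopFile(
  "hdfs:///data/counts.seq",                           // hypothetical path
  classOf[SequenceFileInputFormat[Text, IntWritable]],
  classOf[Text],
  classOf[IntWritable],
  64                                                   // minPartitions
).map { case (k, v) => (k.toString, v.get) }           // copy out of the reused Writables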

2.1 HadoopRDD

Let's keep digging and look at the hadoopFile method; inside it does three things:

1. Save the Hadoop configuration into a broadcast variable.

2. Set up the input path.

3. New up a HadoopRDD and return it.

Good. Next let's look at the HadoopRDD class itself, focusing on its getPartitions, compute and getPreferredLocations.

Look at getPartitions first; its core code is:

val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}

It calls the InputFormat's own getSplits method to compute the splits, then wraps each split in a HadoopPartition, puts them in an array and returns it.

A quick aside: Spark 1.0 also introduced NewHadoopRDD, which uses the new MapReduce API's InputFormat; its getSplits no longer takes minPartitions. The rest of the logic is the same, only the classes involved differ a little.

Next we look at the compute method. Its input is a Partition and it returns an Iterator[(K, V)]. We only need to pay attention to two points:

1. It converts the Partition into a HadoopPartition, then creates a RecordReader from the InputSplit.

2. It overrides the iterator's getNext method, which uses that reader's next method to read the next value.

// Convert to a HadoopPartition
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
  context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
// Create the reader for this InputSplit via the InputFormat's getRecordReader
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// getNext calls the reader's next method to fetch the next value
val key: K = reader.createKey()
val value: V = reader.createValue()
override def getNext() = {
  try {
    finished = !reader.next(key, value)
  } catch {
    case eof: EOFException =>
      finished = true
  }
  (key, value)
}

From this we can see that compute uses the split to obtain an Iterator interface for traversing the split's data.

getPreferredLocations is even simpler: it directly calls the InputSplit's getLocations method to get where the data lives.
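For reference, it looks roughly like this (paraphrased from the Spark 1.0 source, not quoted verbatim):

override def getPreferredLocations(split: Partition): Seq[String] = {
  // Ask the wrapped InputSplit where its data lives (e.g. the DataNodes holding the HDFS block),
  // dropping the "localhost" entries that show up for file:// URLs.
  val hadoopSplit = split.asInstanceOf[HadoopPartition]
  hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
}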

2.2 Dependencies

Next let's look at the map method in RDD:

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

It directly news a MappedRDD, first running the anonymous function f through sc.clean before passing it in. Let's chase on into MappedRDD.

private[spark]
class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext) =
firstParent[T].iterator(split, context).map(f)
}

MappedRDD overrides getPartitions and compute, and both use firstParent[T]. So what exactly is firstParent? Let's first step into the RDD[U](prev) constructor.

def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent)))

Here you can see that it puts the parent RDD into deps: HadoopRDD becomes MappedRDD's parent dependency. This OneToOneDependency is a narrow dependency, meaning the child RDD depends directly on its parent RDD. Now continue on to firstParent.

protected[spark] def firstParent[U: ClassTag] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

From this we can draw two conclusions:

1. getPartitions simply reuses the parent RDD's partition information.

2. compute wraps the anonymous function f around each row as the parent RDD's data is traversed.

Good, now we can understand what the compute function is really doing.

Its two notable roles are:

1. When there is no dependency, it produces the iterator for traversing the data directly from the partition's information.

2. When there is an upstream dependency, it wraps an extra function around each element as the parent RDD's iterator is traversed.

Let's click into that map(f) and take a look:

def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
def next() = f(self.next())
}

Look at its next function: I have to say, it is written beautifully!

Next, look at RDD's flatMap method. You will find it is almost identical to map, except that the result is a FlatMappedRDD; the effects of flatMap and map, however, differ quite a bit.

For example, given ((1,2),(3,4)): with flatMap we end up visiting the four elements (1,2,3,4); with map we visit the two elements (1,2) and (3,4).
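A tiny spark-shell illustration of the same difference, using made-up data; the comments show the expected results:

val lines = sc.parallelize(Seq("a b", "c d"))
lines.map(_.split(" ")).collect()      // Array(Array(a, b), Array(c, d)): 2 elements, each an array
lines.flatMap(_.split(" ")).collect()  // Array(a, b, c, d): 4 flattened elements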

If you're interested, go look at FlatMappedRDD and FilteredRDD yourself; we won't cover them here since they are similar to MappedRDD.

2.3 reduceByKey

The RDD transformations so far have all been simple, but reduceByKey is not, because it involves aggregating the values of identical keys, which puts it in the more complex category.

So where does reduceByKey live? It is in PairRDDFunctions, reached through an implicit conversion, so it is rather well hidden: you will not find it in RDD itself.

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}

It calls the combineByKey method. The process is fairly involved; here it is in full for those who want to read through it.

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = {

  val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  if (self.partitioner == Some(partitioner)) {
    // An ordinary RDD's partitioner is None, so this branch is usually not taken; even when it is,
    // only a single pass merging values by key over the data is needed, with no shuffle.
    self.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else if (mapSideCombine) {
    // The default path: a map-side combiner is applied first.
    val combined = self.mapPartitionsWithContext((context, iter) => {
      aggregator.combineValuesByKey(iter, context)
    }, preservesPartitioning = true)
    val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
      .setSerializer(serializer)
    partitioned.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // No map-side combine: go straight to the shuffle.
    val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
    values.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  }
}

Following the standard flow, the middle branch is the one taken, and it does three things:

1. Wrap each partition's data in a MapPartitionsRDD that applies combineValuesByKey.

2. Use that MapPartitionsRDD to new up a ShuffledRDD.

3. Apply combineCombinersByKey on top of the ShuffledRDD.

Let's look at MapPartitionsRDD first. I have pulled out the two lines that distinguish it from the other RDDs; the obvious difference is that the function f is wrapped around the whole iterator rather than around each element, which is what lets it merge across all of the iterator's data.

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

override def compute(split: Partition, context: TaskContext) =
  f(context, split.index, firstParent[T].iterator(split, context))

Next, let's look at Aggregator's combineValuesByKey method.

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
    context: TaskContext): Iterator[(K, C)] = {
  // Whether external sorting is used is controlled by spark.shuffle.spill (default: true)
  if (!externalSorting) {
    val combiners = new AppendOnlyMap[K, C]
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // The map deduplicates keys; update computes the new value: if the key has no value yet,
    // createCombiner produces one, otherwise mergeValue folds the new value in.
    // mergeValue is the anonymous function we passed to reduceByKey, here (_ + _).
    while (iter.hasNext) {
      kv = iter.next()
      combiners.changeValue(kv._1, update)
    }
    combiners.iterator
  } else {
    // An external (spillable) append-only map deduplicates keys instead; we just keep inserting values.
    // The basic principle is the same as above; the difference is that it can sort and spill externally.
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    while (iter.hasNext) {
      val (k, v) = iter.next()
      combiners.insert(k, v)
    }
    combiners.iterator
  }
}

That is a very typical merge-by-key routine. Now let's move on to ShuffledRDD.

Compared with the RDDs we have seen so far, ShuffledRDD's distinguishing features are:

1. It passes Nil (an empty list) for its dependencies, indicating that it declares no parent dependency in the usual way.

2. Its compute works in a rather special way; the process is fairly complex, and we will cover it in a later article.

3. Its partitioning defaults to HashPartitioner, with the same number of partitions as the upstream RDD. It can also differ: we just pass an extra partition count to reduceByKey (see the short sketch after this list).
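For example, building on the word-count code above (the partition count 8 is arbitrary):

val reduce8 = mapRdd.reduceByKey(_ + _, 8)  // shuffles into a HashPartitioner(8), regardless of the parent's partition count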

After the ShuffledRDD is created, mapPartitionsWithContext is applied once more, but the anonymous function called this time is combineCombinersByKey.

combineCombinersByKey has essentially the same logic as combineValuesByKey; only the input and output types differ. combineCombinersByKey just merges values without changing the type, whereas combineValuesByKey turns the V values of an Iterator[(K, V)] into an Iterator[(K, C)].

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {
  ......
}

This class adapts automatically to the parameter types of the anonymous functions we pass in.
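Concretely, for the reduceByKey(_ + _) in our word-count example (an RDD[(String, Int)]), the Aggregator built by combineByKey boils down to this sketch:

val agg = new Aggregator[String, Int, Int](
  createCombiner = (v: Int) => v,                  // the first value seen for a key becomes its combiner
  mergeValue     = (c: Int, v: Int) => c + v,      // map side: fold each further value into the combiner
  mergeCombiners = (c1: Int, c2: Int) => c1 + c2)  // reduce side: merge partial sums from different partitions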

Up to this point no job has actually executed; we have merely nested RDDs inside one another. We can observe this through the changes in RDD id and type: RDD[1]->RDD[2]->RDD[3]......

3. Other RDDs

Besides pulling data from HDFS, we may also need to pull data from a database. What then? No problem, there is JdbcRDD!

val rdd = new JdbcRDD(
sc,
() => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
1, 100, 3,
(r: ResultSet) => { r.getInt(1) }
).cache()

ǰ¼¸¸ö²ÎÊý´ó¼Ò¶¼¶®£¬ÎÒÃÇÖØµã˵һϺóÃæ1, 100, 3ÊÇÕ¦»ØÊ£¿

ÔÚÕâ¸öJdbcRDDÀïÃæËüĬÈÏÎÒÃÇÊǻᰴÕÕÒ»¸ölongÀàÐ͵Ä×ֶζÔÊý¾Ý½øÐÐÇз֣¬£¨1,100£©·Ö±ðÊÇ×îСֵºÍ×î´óÖµ£¬3ÊÇ·ÖÆ¬µÄÊýÁ¿¡£

±ÈÈçÎÒÃÇÒªÒ»´Î²éIDΪ1-1000,000µÄµÄÓû§£¬·Ö³É10¸ö·ÖƬ£¬ÎÒÃǾÍÌ1, 1000,000£¬ 10£©¼´¿É£¬ÔÚsqlÓï¾äÀïÃæ»¹±ØÐëÓÐ"? <= ID AND ID <= ?"µÄ¾äʽ£¬±ð³¢ÊÔ×Å×Ô¼ºÔì¾äŶ£¡
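The way JdbcRDD computes those partitions is roughly the following standalone sketch (paraphrased; the real getPartitions wraps each range in a JdbcPartition):

// How (lowerBound, upperBound, numPartitions) = (1, 1000000, 10) gets split; bounds are inclusive.
val (lowerBound, upperBound, numPartitions) = (1L, 1000000L, 10)
val length = 1 + upperBound - lowerBound
val ranges = (0 until numPartitions).map { i =>
  val start = lowerBound + (i * length) / numPartitions             // first id of partition i
  val end   = lowerBound + ((i + 1) * length) / numPartitions - 1   // last id of partition i
  (start, end)                                                      // these two values fill the "?" placeholders
}
// ranges: Vector((1,100000), (100001,200000), ..., (900001,1000000))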

The last parameter is the function that processes the ResultSet; handle it however you like. If you really find it inconvenient, you can also write an RDD of your own.

Summary:

This chapter focused on the five characteristics shared by the various RDDs and on the transformations between them; I hope it gives you a deeper understanding of RDDs. The next chapter will cover how a job actually runs. Stay tuned!

   