1. What is an RDD?
The last chapter covered how Spark submits a job; this chapter is about RDDs. Put simply, an RDD is Spark's input — and you know what input is: the data fed in.
An RDD's full name is Resilient Distributed Dataset, that is, a fault-tolerant distributed dataset. Every RDD has five characteristics:
1. A list of partitions. The data can be split up, just as in Hadoop — only data that can be split can be computed in parallel.
2. A function that computes each partition; this is the compute function we will get to below.
3. A list of dependencies on other RDDs. Dependencies are further divided into wide and narrow dependencies, but not every RDD has dependencies.
4. Optional: key-value RDDs are partitioned by hash, similar to MapReduce's Partitioner interface, which controls which reducer a key goes to.
5. Optional: a list of preferred locations for computing each partition; for example, the locations of an HDFS block should be the preferred places to compute on it.
Matching the points above, we can find these four methods and one property in the RDD class. Don't worry — we will slowly unpack all five of them below.
// Computed only once: the list of partitions
protected def getPartitions: Array[Partition]

// Compute one partition, producing an iterable result
def compute(split: Partition, context: TaskContext): Iterator[T]

// Computed only once: this RDD's dependencies on its parent RDDs
protected def getDependencies: Seq[Dependency[_]] = deps

// Optional: the partitioning scheme (point 4 above), similar to MapReduce's
// Partitioner interface, which controls which reducer a key goes to
@transient val partitioner: Option[Partitioner] = None

// Optional: the preferred locations; input is a split, output is a list of preferred nodes
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
2. Transformations between RDDs
Let's walk through a concrete example — a snippet we use all the time — and along the way cover the RDDs we encounter most often.
val hdfsFile = sc.textFile(args(1))
val flatMapRdd = hdfsFile.flatMap(s => s.split(" "))
val filterRdd = flatMapRdd.filter(_.length == 2)
val mapRdd = filterRdd.map(word => (word, 1))
val reduce = mapRdd.reduceByKey(_ + _)
Quite a few RDDs are involved here: textFile gives a HadoopRDD that has already been mapped into a MappedRDD; flatMap produces a FlatMappedRDD; filter generates a FilteredRDD; map turns that into a MappedRDD, which an implicit conversion lets us treat as a pair RDD; and finally we call reduceByKey.
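If you want to see this chain for yourself, RDD's toDebugString prints the whole lineage. A quick sketch (the exact output format varies by Spark version):

// Prints one line per RDD in the chain, e.g.
// MapPartitionsRDD <- ShuffledRDD <- ... <- FlatMappedRDD <- MappedRDD <- HadoopRDD
println(reduce.toDebugString)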
Let's look at the textFile method first. Go into SparkContext and find it:
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}
Look at its parameters: path, TextInputFormat, LongWritable, Text. What do they remind you of, comrades? Anyone who has written MapReduce should know:
1. The HDFS path
2. The InputFormat type
3. The Mapper's first (key) type
4. The Mapper's second (value) type
That makes it easy to see why a map is immediately tacked onto hadoopFile to pull out the second element of each pair — and why, in the shell, we see the result as a MappedRDD.
So now, if what you need isn't textFile but some other Hadoop file type, would you use hadoopFile to get the type you want? Don't tell me you wouldn't — if not, hurry back and review MapReduce.
Back to business: the default defaultMinPartitions value of 2 is too small; when we use it we should really set it higher, as in the sketch below.
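A sketch of both points, with a hypothetical HDFS path (SequenceFileInputFormat here is the old-API class from org.apache.hadoop.mapred):

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileInputFormat

// Read a SequenceFile[Text, IntWritable] directly through hadoopFile
val seqRdd = sc.hadoopFile("hdfs:///data/counts.seq",
  classOf[SequenceFileInputFormat[Text, IntWritable]],
  classOf[Text], classOf[IntWritable])

// Give textFile a larger minimum partition count than the default 2
val lines = sc.textFile("hdfs:///data/input", 64)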
2.1 HadoopRDD
Let's keep hunting and look at the hadoopFile method. Inside, we can see it does three things:
1. Saves the Hadoop configuration into a broadcast variable.
2. Sets the input path on the job configuration.
3. Returns a new HadoopRDD.
ºÃ£¬ÎÒÃǽÓÏÂÈ¥¿´¿´HadoopRDDÕâ¸öÀà°É£¬ÎÒÃÇÖØµã¿´¿´ËüµÄgetPartitions¡¢compute¡¢getPreferredLocations¡£
First getPartitions; its core code is as follows:
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
  array(i) = new HadoopPartition(id, i, inputSplits(i))
}
Ëüµ÷ÓõÄÊÇinputFormat×Ô´øµÄgetSplits·½·¨À´¼ÆËã·ÖƬ£¬È»ºó°Ñ·ÖƬHadoopPartition°ü×°µ½µ½arrayÀïÃæ·µ»Ø¡£
A quick aside here: 1.0 also introduces a NewHadoopRDD, which uses the new MapReduce API's InputFormat. Its getSplits no longer takes minPartitions, but the rest of the logic is the same; only the classes used differ.
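From user code, the new-API path is reached through newAPIHadoopFile instead of hadoopFile. A sketch (note the TextInputFormat now comes from org.apache.hadoop.mapreduce, and the path is hypothetical):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Same effect as textFile, but built on NewHadoopRDD and the new-API TextInputFormat
val newApiRdd = sc
  .newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input")
  .map(pair => pair._2.toString)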
Next we look at the compute method. Its input is a Partition and it returns data as an Iterator[(K, V)]. There are only two points we need to pay attention to here:
1. It casts the Partition to a HadoopPartition, then creates a RecordReader from its InputSplit.
2. It overrides the iterator's getNext method, reading the next value by calling next on the reader it created.
// Cast to HadoopPartition
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
  context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
// Create the Reader for this InputSplit via the InputFormat's getRecordReader
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Call the Reader's next method
val key: K = reader.createKey()
val value: V = reader.createValue()
override def getNext() = {
  try {
    finished = !reader.next(key, value)
  } catch {
    case eof: EOFException =>
      finished = true
  }
  (key, value)
}
From this we can see that compute obtains, from a split, an Iterator interface for traversing that partition's data.
The getPreferredLocations method is even simpler: it just calls the InputSplit's getLocations method to get where the data lives.
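A sketch of that method, close to the 1.0 source (the localhost filter guards against file:// URLs being reported as a preferred host):

override def getPreferredLocations(split: Partition): Seq[String] = {
  // Ask the wrapped Hadoop InputSplit which hosts hold its data
  val hadoopSplit = split.asInstanceOf[HadoopPartition]
  hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
}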
2.2 Dependencies
Now let's look at the map method inside RDD:
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
It just news up a MappedRDD, cleaning the anonymous function f before passing it in. Let's chase it into MappedRDD.
private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f)
}
MappedRDD overrides getPartitions and compute, and both use firstParent[T]. Who on earth is this firstParent? We can start by clicking into the RDD[U](prev) constructor:
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
And there you'll find that it hands the parent RDD over to deps: HadoopRDD has become MappedRDD's parent dependency. This OneToOneDependency is a narrow dependency, in which the child RDD depends directly on its parent. Now on to firstParent:
protected[spark] def firstParent[U: ClassTag] = {
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}
From this we can draw two conclusions:
1. getPartitions simply reuses the parent RDD's partition information.
2. compute wraps the anonymous function f around each row as the parent RDD's data is traversed.
OK, now we can understand what the compute function is really doing.
Its two notable roles:
1. When there are no dependencies, it generates an Iterable interface over the data from the partition information alone.
2. When there is an upstream dependency, it wraps one more function around each element of the parent RDD's Iterable interface, as the sketch below shows.
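A minimal, Spark-free sketch of that second case — composing iterators lazily the way MappedRDD stacks on top of HadoopRDD (all names here are made up for illustration):

// Pretend "parentIterator" is a HadoopRDD-style iterator over one partition's raw lines
def parentIterator(): Iterator[String] = Iterator("hello world", "spark rdd")

// A MappedRDD-style compute just wraps f around the parent's iterator;
// nothing is evaluated until someone actually pulls elements through it
def childCompute(f: String => Int): Iterator[Int] = parentIterator().map(f)

// Only this final traversal drives the whole pipeline
childCompute(_.length).foreach(println)  // prints 11 and 9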
Let's click into the map(f) method and take a look:
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
  def hasNext = self.hasNext
  def next() = f(self.next())
}
Look at its next function — I have to say, that is genuinely beautifully written.
Next let's look at RDD's flatMap method. You'll find it is almost indistinguishable from map, except that the result is a FlatMappedRDD; yet the effects of flatMap and map differ quite a bit.
Take ((1,2),(3,4)) for example: calling flatMap, we visit the four elements (1,2,3,4); with map, we visit the two elements (1,2) and (3,4).
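The same difference in runnable form (a sketch using parallelize on a local collection):

val pairs = sc.parallelize(Seq(Seq(1, 2), Seq(3, 4)))

// map keeps one output element per input element: two Seqs
pairs.map(identity).collect()      // Array(Seq(1, 2), Seq(3, 4))

// flatMap flattens each returned collection: four Ints
pairs.flatMap(identity).collect()  // Array(1, 2, 3, 4)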
Those interested can go look at FlatMappedRDD and FilteredRDD themselves; they're similar to MappedRDD, so I won't cover them here.
2.3 reduceByKey
The RDD transformations so far have all been simple, but reduceByKey is not so simple, because it involves a step that aggregates the values belonging to the same key — which puts it in the most complicated category.
So where does this reduceByKey method live? In PairRDDFunctions, reached via an implicit conversion, so it's rather well hidden — you won't find it inside RDD itself.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}
Ëüµ÷ÓõÄÊÇcombineByKey·½·¨£¬¹ý³Ì¹ý³ÌÂù¸´Ôӵģ¬ÕÛµþÆðÀ´£¬Ï²»¶¿´µÄÈË¿´¿´°É¡£
def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = {
  val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  if (self.partitioner == Some(partitioner)) {
    // An ordinary RDD's partitioner is None, so this branch normally isn't taken; even
    // when it is, one pass merging values by key over the data is all that's needed
    self.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else if (mapSideCombine) {
    // The default path: a map-side combiner is wanted
    val combined = self.mapPartitionsWithContext((context, iter) => {
      aggregator.combineValuesByKey(iter, context)
    }, preservesPartitioning = true)
    val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
      .setSerializer(serializer)
    partitioned.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // No map-side combine: go straight to the shuffle
    val values = new ShuffledRDD[K, V, (K, V)](self, partitioner)
      .setSerializer(serializer)
    values.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  }
}
Looking at it as a fairly standard flow, the middle branch is the one taken, and it does three things (a sketch of the equivalent direct combineByKey call follows the list):
1. Wraps each partition's data in a MapPartitionsRDD that applies combineValuesByKey.
2. Uses that MapPartitionsRDD to new up a ShuffledRDD.
3. Applies combineCombinersByKey on top of the ShuffledRDD.
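In other words, reduceByKey(_ + _) is just combineByKey with the same function playing all three roles. Calling combineByKey directly makes those roles visible (a sketch, on the mapRdd from earlier):

val reduced = mapRdd.combineByKey[Int](
  (v: Int) => v,                  // createCombiner: the first value of a key becomes the combiner
  (c: Int, v: Int) => c + v,      // mergeValue: fold another value into the map-side combiner
  (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners: merge combiners from different map tasks after the shuffle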
Let's look at MapPartitionsRDD first. I've picked out the two lines that set it apart from the other RDDs. The difference is obvious: the function f is wrapped around the outside of the iterator, which is what makes it possible to aggregate over all of the iterator's data.
override val partitioner =
  if (preservesPartitioning) firstParent[T].partitioner else None

override def compute(split: Partition, context: TaskContext) =
  f(context, split.index, firstParent[T].iterator(split, context))
Next, let's look at the Aggregator's combineValuesByKey method:
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
    context: TaskContext): Iterator[(K, C)] = {
  // Whether to use external sorting is controlled by spark.shuffle.spill (default: true)
  if (!externalSorting) {
    val combiners = new AppendOnlyMap[K, C]
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // The map deduplicates keys and the update function maintains values: when a key
    // has no value yet, create one; when it does, merge via mergeValue.
    // mergeValue is the anonymous function we wrote in reduceByKey, here (_ + _)
    while (iter.hasNext) {
      kv = iter.next()
      combiners.changeValue(kv._1, update)
    }
    combiners.iterator
  } else {
    // An external-sorting map is used for the dedup instead; just keep inserting values.
    // Basically the same idea as above, except it can sort and spill externally
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    while (iter.hasNext) {
      val (k, v) = iter.next()
      combiners.insert(k, v)
    }
    combiners.iterator
  }
}
This is a very typical merge-by-key routine; stripped to its essence it is the sketch below. After that, let's continue with ShuffledRDD.
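A plain-Scala sketch of the same idea using a mutable HashMap (AppendOnlyMap itself is an open-addressing map optimized for exactly this update pattern):

import scala.collection.mutable

// Merge an iterator of (key, value) pairs, combining values of equal keys with mergeValue
def combineValues[K, V](iter: Iterator[(K, V)], mergeValue: (V, V) => V): Iterator[(K, V)] = {
  val combiners = mutable.HashMap.empty[K, V]
  for ((k, v) <- iter) {
    combiners(k) = combiners.get(k) match {
      case Some(old) => mergeValue(old, v)  // key seen before: merge the new value in
      case None      => v                   // first value for this key: create the combiner
    }
  }
  combiners.iterator
}

combineValues(Iterator("a" -> 1, "b" -> 1, "a" -> 2), (x: Int, y: Int) => x + y).toList
// List((a, 3), (b, 1)) — order may vary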
Compared with the RDDs before it, ShuffledRDD's distinctive features are:
1. It passes Nil (an empty list) in for its dependencies, indicating that it has none.
2. Its compute works in a rather special way; the process is complex, and we'll cover it in a later article.
3. Its partitioning defaults to HashPartitioner, with the same number of partitions as the preceding RDD. It doesn't have to be the same, though: we can simply pass an extra partition count to reduceByKey, as the sketch below shows.
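A sketch of that extra argument, which becomes the HashPartitioner's partition count:

import org.apache.spark.HashPartitioner

// Shuffle the word counts into 8 partitions instead of inheriting the parent's count
val reduce8 = mapRdd.reduceByKey(_ + _, 8)

// Equivalent, with the partitioner spelled out
val reduceHp = mapRdd.reduceByKey(new HashPartitioner(8), _ + _)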
After the ShuffledRDD is created, there's another round of mapPartitionsWithContext, except the anonymous function invoked becomes combineCombinersByKey.
combineCombinersByKey follows basically the same logic as combineValuesByKey; only the input and output types differ. combineCombinersByKey does a pure merge without changing the input and output types, whereas combineValuesByKey turns the V values of an iter[K, V] into an iter[K, C].
case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {
  ......
}
It adapts automatically based on the parameter types of the anonymous functions we pass in.
Up to this point, no job has actually executed; all we've done is nest RDDs inside one another, layer by layer. We can observe this through the changing RDD ids and types: RDD[1] -> RDD[2] -> RDD[3]......
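You can watch those ids tick up yourself; each RDD is assigned a fresh id by the SparkContext when it is constructed (a sketch — the exact numbers depend on what else has run):

// Each transformation builds a new RDD with a new, increasing id;
// nothing executes until an action such as collect() or count() is called
println(hdfsFile.id)    // e.g. 1 (textFile itself built a HadoopRDD and a MappedRDD)
println(flatMapRdd.id)  // e.g. 2
println(filterRdd.id)   // e.g. 3
println(reduce.id)      // e.g. 7 (reduceByKey built intermediate RDDs too)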
3. Other RDDs
Usually, besides pulling data from HDFS, we may also want to pull data from a database. What then? No problem — there's a JdbcRDD!
val rdd = new JdbcRDD(
  sc,
  () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
  "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
  1, 100, 3,
  (r: ResultSet) => { r.getInt(1) }
).cache()
The first few parameters everyone understands; let's focus on what the 1, 100, 3 at the end are about.
JdbcRDD assumes the data is to be split on a long-typed column: (1, 100) are the minimum and maximum values, and 3 is the number of partitions.
For example, to query users with IDs from 1 to 1,000,000 in one go, split into 10 partitions, we would pass (1, 1000000, 10). The SQL statement must also contain the pattern "? <= ID AND ID <= ?" — don't try to compose your own phrasing!
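Under the hood, the (lower, upper, numPartitions) triple is carved into per-partition ID ranges that get bound to the two ? placeholders. The arithmetic is roughly this (a standalone sketch):

// Roughly how JdbcRDD derives each partition's [start, end] ID range
val lowerBound = 1L; val upperBound = 100L; val numPartitions = 3
val length = 1 + upperBound - lowerBound
for (i <- 0 until numPartitions) {
  val start = lowerBound + (i * length) / numPartitions
  val end = lowerBound + ((i + 1) * length) / numPartitions - 1
  println(s"partition $i: $start <= ID AND ID <= $end")
}
// partition 0: 1 <= ID AND ID <= 33
// partition 1: 34 <= ID AND ID <= 66
// partition 2: 67 <= ID AND ID <= 100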
Last comes the function for handling the ResultSet; process it however you like. That said, if you really find it inconvenient, you can write an RDD of your own — a bare-bones skeleton follows.
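A minimal custom-RDD sketch (all names here are invented) showing which of the five features from section 1 you actually have to supply — a partition list and a compute method; the dependency list is Nil, like HadoopRDD's:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class RangePartition(val idx: Int, val from: Int, val to: Int) extends Partition {
  override def index: Int = idx
}

// Serves the integers [lower, upper], split into `slices` partitions
class SimpleRangeRDD(sc: SparkContext, lower: Int, upper: Int, slices: Int)
  extends RDD[Int](sc, Nil) {  // Nil: no parent dependencies

  override def getPartitions: Array[Partition] = {
    val span = upper - lower + 1
    (0 until slices).map { i =>
      new RangePartition(i, lower + i * span / slices,
        lower + (i + 1) * span / slices - 1): Partition
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.from to p.to).iterator
  }
}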
Summary:
This chapter focused on the five characteristics shared by the various RDDs, along with the transformations between RDDs. I hope it has given you a deeper understanding of RDDs. In the next chapter we'll cover how a job actually runs — stay tuned!