Getting Started with Spark: Best Hands-On Examples
 
 2019-6-20
 
±à¼­ÍƼö:

±¾ÎÄÀ´×ÔÓÚcsdn£¬±¾ÎÄÖ÷Òª½éÉÜÈçºÎʹÓà Scala ±àд Spark Ó¦ÓóÌÐò´¦Àí´óÊý¾Ý£¬Ï£Íû¶ÔÄúµÄѧϰÓÐËù°ïÖú¡£

Setting Up the Development Environment

Installing the Scala IDE

Setting up a Scala development environment is easy: download a suitable version from the Scala IDE website and unzip it to complete the installation. This article uses version 4.1.0.

Installing the Scala language pack

If the Scala language pack bundled with your Scala IDE download does not match the Scala version used by Spark 1.3.1 (2.10.x), you need to download the version matching the Spark release used in this article, so that your Scala programs do not fail at runtime because of a version mismatch.

Please download and install Scala 2.10.5.

Installing the JDK

If no JDK is installed on your machine, download and install JDK 1.6 or later.

Creating and configuring the Spark project

Open the Scala IDE and create a Scala project named spark-exercise.

Figure 1. Creating the Scala project

Create a lib folder under the project directory and copy the spark-assembly jar from your Spark installation into it.

Figure 2. The Spark development jar

Then add that jar to the project classpath and configure the project to use the freshly installed Scala 2.10.5. The resulting project directory structure is shown below.

Figure 3. Adding the jar to the classpath
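If you prefer building outside the IDE, a minimal sbt build definition could replace the manual lib-folder setup. This sketch is not part of the original article; the artifact coordinates are assumptions matching the Spark 1.3.1 / Scala 2.10.5 setup above.

// build.sbt -- hypothetical alternative to the manual lib-folder setup
name := "spark-exercise"

version := "1.0"

scalaVersion := "2.10.5"

// "provided" because the cluster supplies Spark at run time
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"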

ÔËÐл·¾³½éÉÜ

ΪÁ˱ÜÃâ¶ÁÕß¶Ô±¾ÎݸÀýÔËÐл·¾³²úÉúÀ§»ó£¬±¾½Ú»á¶Ô±¾ÎÄÓõ½µÄ¼¯Èº»·¾³µÄ»ù±¾Çé¿ö×ö¸ö¼òµ¥½éÉÜ

±¾ÎÄËùÓÐʵÀýÊý¾Ý´æ´¢µÄ»·¾³ÊÇÒ»¸ö 8 ¸ö»úÆ÷µÄ Hadoop ¼¯Èº£¬Îļþϵͳ×ÜÈÝÁ¿ÊÇ 1.12T£¬NameNode ½Ð hadoop036166, ·þÎñ¶Ë¿ÚÊÇ 9000¡£¶ÁÕß¿ÉÒÔ²»¹ØÐľßÌåµÄ½Úµã·Ö²¼£¬ÒòΪÕâ¸ö²»»áÓ°Ïìµ½ÄúÔĶÁºóÃæµÄÎÄÕ¡£

±¾ÎÄÔËÐÐʵÀý³ÌÐòʹÓÃµÄ Spark ¼¯ÈºÊÇÒ»¸ö°üº¬Ëĸö½ÚµãµÄ Standalone ģʽµÄ¼¯Èº, ÆäÖаüº¬Ò»¸ö Master ½Úµã (¼àÌý¶Ë¿Ú 7077) ºÍÈý¸ö Worker ½Úµã£¬¾ßÌå·Ö²¼ÈçÏ£º

Spark Ìṩһ¸ö Web UI È¥²é¿´¼¯ÈºÐÅÏ¢²¢ÇÒ¼à¿ØÖ´Ðнá¹û£¬Ä¬ÈϵØÖ·ÊÇ:http://<spark_master_ip>:8080 £¬¶ÔÓÚ¸ÃʵÀýÌá½»ºóÎÒÃÇÒ²¿ÉÒÔµ½ web Ò³ÃæÉÏÈ¥²é¿´Ö´Ðнá¹û£¬µ±È»Ò²¿ÉÒÔͨ¹ý²é¿´ÈÕ־ȥÕÒµ½Ö´Ðнá¹û¡£

ͼ 4. Spark µÄ web console

Case Analysis and Implementation

Case One

a. Case description

Word Count (word frequency counting) should be familiar to everyone: it counts how many times each word appears in one or more files. This article uses it as an entry-level case to open the door, step by step, to writing Spark big-data programs in Scala.

b. Case analysis

To implement word counting with Spark's operators, we first turn each line of the text file into individual words, then record a count of one for each occurrence of a word, and finally add up the counts of identical words to obtain the result.

For the first step we naturally use the flatMap operator to split each line of text into words; for the second step we use the map operator to turn each word into a key-value pair carrying a count, i.e. word -> (word, 1); for the last step we use the reduceByKey operator to add up the counts of identical words.

c. Implementation

Listing 1. SparkWordCount class source code

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkWordCount {
  val FILE_NAME: String = "word_count_results_"

  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:SparkWordCount FileName")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(args(0))
    // split each line into words, pair each word with a count of 1,
    // then sum the counts of identical words
    val wordCounts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    // print the results, for debug use:
    // wordCounts.collect().foreach { case (k, v) => println(k + "=" + v) }
    wordCounts.saveAsTextFile(FILE_NAME + System.currentTimeMillis())
    println("Word Count program running results are successfully saved.")
  }
}

d. Submitting to the cluster

In this example we count word frequencies across all txt files under the /user/fams directory of the HDFS file system. Here spark-exercise.jar is the jar produced by packaging the Spark project; at execution time it is uploaded to the /home/fams directory of the target server. The command to run this example is as follows.

Listing 2. SparkWordCount execution command

./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt

e. Monitoring execution status

This example stores its final result on HDFS, so if the program runs normally we can find the generated files there.
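To inspect the result from the command line, the following is a hedged example only: the exact directory name contains the timestamp appended by the program, and relative output paths resolve against the submitting user's HDFS home directory, here assumed to be /user/fams.

hdfs dfs -ls /user/fams/word_count_results_*
hdfs dfs -cat /user/fams/word_count_results_*/part-00000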

Figure 5. Case One output

Open the Spark cluster's Web UI to see the execution result of the job we just submitted.

Figure 6. Case One completion status

If the program has not finished yet, we can find it in the Running Applications list instead.

Case Two

a. Case description

In this case, assume we need to compute the average age of a population of 10 million people. If you want to stress Spark's big-data processing ability you can make the population larger, say 100 million, depending on the storage capacity of your test cluster. Assume the age information is stored in a single file with the following format: the first column is an ID and the second column is an age.

Figure 7. Case Two test data format preview

We first need a Scala program that generates the age data for 10 million people; the source is as follows.

Listing 3. Age information file generator source code

import java.io.FileWriter
import java.io.File
import scala.util.Random

object SampleDataFileGenerator {
  def main(args: Array[String]) {
    // write one "ID age" record per line (note: no space in the path)
    val writer = new FileWriter(new File("C:\\sample_age_data.txt"), false)
    val rand = new Random()
    for (i <- 1 to 10000000) {
      writer.write(i + " " + rand.nextInt(100))
      writer.write(System.getProperty("line.separator"))
    }
    writer.flush()
    writer.close()
  }
}

b. Case analysis

To compute the average age, we first process the RDD backed by the source file, transforming it into an RDD that contains only age values; next we count the elements, which gives the total number of people; then we add up all the ages; finally, average age = total age / number of people.

For the first step we use the map operator to project the source RDD into a new RDD containing only the age data: inside the function passed to map we split each line and keep only the second element, the age. For the second step we call the count operator on that mapped RDD to get the total. For the third step we use the reduce operator to sum all elements of the age-only RDD. Finally a division yields the average age.

Since the output of this example is small, we simply print it to the console.

c. Implementation

Listing 4. AvgAgeCalculator class source code

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AvgAgeCalculator {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:AvgAgeCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5)
    val count = dataFile.count()
    // keep only the second column (the age) of each line
    val ageData = dataFile.map(line => line.split(" ")(1))
    // sum the ages with the reduce operator on the RDD itself,
    // rather than collect()-ing every age to the driver first
    val totalAge = ageData.map(age => age.trim.toInt).reduce((a, b) => a + b)
    println("Total Age:" + totalAge + ";Number of People:" + count)
    val avgAge: Double = totalAge.toDouble / count.toDouble
    println("Average Age is " + avgAge)
  }
}

d. Submitting to the cluster

To run this example, the age information file generated above must first be uploaded to HDFS. Assuming you have already run the generator class on the target machine and the file was written to the /home/fams directory, run an HDFS command to copy it into the /user/fams directory on HDFS.

Listing 5. Command for copying the age information file to HDFS

hdfs dfs -copyFromLocal /home/fams /user/fams
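Note that this copies the entire /home/fams directory. If you only want the data file, and to match the input path used by the execution command below, a hedged variant (assuming the inputfiles directory layout used later) would be:

hdfs dfs -mkdir -p /user/fams/inputfiles
hdfs dfs -copyFromLocal /home/fams/sample_age_data.txt /user/fams/inputfiles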

Listing 6. AvgAgeCalculator execution command

./spark-submit \
--class com.ibm.spark.exercise.basic.AvgAgeCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt

e. Monitoring execution status

On the console you can see output like the following.

Figure 8. Case Two output

We can also check the job's execution status in the Spark Web Console.

Figure 9. Case Two completion status

Case Three

a. Case description

This case assumes we need to produce gender and height statistics for the population of a province (100 million people): the number of males and females, the maximum and minimum height among males, and the maximum and minimum height among females. The source file has the following format, with three columns: ID, gender, height (cm).

Figure 10. Case Three test data format preview

We generate this file with the following Scala program.

Listing 7. People information generator source code

import java.io.FileWriter
import java.io.File
import scala.util.Random

object PeopleInfoFileGenerator {
  def main(args: Array[String]) {
    val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"), false)
    val rand = new Random()
    for (i <- 1 to 100000000) {
      var height = rand.nextInt(220)
      if (height < 50) {
        height = height + 50
      }
      val gender = getRandomGender
      if (height < 100 && gender == "M")
        height = height + 100
      if (height < 100 && gender == "F")
        height = height + 50
      // write the same gender that was used to adjust the height,
      // rather than drawing a fresh random gender here
      writer.write(i + " " + gender + " " + height)
      writer.write(System.getProperty("line.separator"))
    }
    writer.flush()
    writer.close()
    println("People Information File generated successfully.")
  }

  def getRandomGender(): String = {
    val rand = new Random()
    val randNum = rand.nextInt(2) + 1
    if (randNum % 2 == 0) "M" else "F"
  }
}

b. Case analysis

In this case we must compute statistics for males and females separately, so the natural first step is to split the male and female records out of the source RDD, producing two new RDDs; next, each of those RDDs is further mapped so that it contains only height values, giving two more RDDs, one of male heights and one of female heights; finally we sort these two RDDs to obtain the maximum and minimum male and female heights.

For the first step, separating the records by gender, we use the filter operator: lines containing "M" are male and lines containing "F" are female. For the second step we use the map operator to extract the height values into their own RDDs. For the third step we use the sortBy operator to sort the male and female height data.

c. Implementation

One point to watch in the implementation: the height data must be converted to integers during the RDD transformations; otherwise sortBy treats the values as strings and the ordering is wrong. For example, if the heights are 123, 110, 84, 72, 100, sorting them as strings in ascending order yields 100, 110, 123, 72, 84, which is clearly incorrect. A small illustration follows.
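As a minimal local sketch of this pitfall (plain Scala collections, not part of the original article's code):

object SortTypeDemo {
  def main(args: Array[String]) {
    val heights = Seq("123", "110", "84", "72", "100")
    // lexicographic order: 100, 110, 123, 72, 84
    println(heights.sorted)
    // numeric order after converting to Int: 72, 84, 100, 110, 123
    println(heights.map(_.toInt).sorted)
  }
}

RDD.sortBy behaves the same way: sortBy(x => x) on String elements sorts lexicographically, while sorting after toInt sorts numerically.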

Listing 8. PeopleInfoCalculator class source code

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object PeopleInfoCalculator {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:PeopleInfoCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5)
    // keep the "gender height" part of each male/female line
    val maleData = dataFile.filter(line => line.contains("M")).map(
      line => (line.split(" ")(1) + " " + line.split(" ")(2)))
    val femaleData = dataFile.filter(line => line.contains("F")).map(
      line => (line.split(" ")(1) + " " + line.split(" ")(2)))
    // for debug use:
    // maleData.collect().foreach(println)
    // femaleData.collect().foreach(println)
    // extract heights as integers so they sort numerically
    val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
    val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
    val lowestMale = maleHeightData.sortBy(x => x, true).first()
    val lowestFemale = femaleHeightData.sortBy(x => x, true).first()
    val highestMale = maleHeightData.sortBy(x => x, false).first()
    val highestFemale = femaleHeightData.sortBy(x => x, false).first()
    println("Number of Male People:" + maleData.count())
    println("Number of Female People:" + femaleData.count())
    println("Lowest Male:" + lowestMale)
    println("Lowest Female:" + lowestFemale)
    println("Highest Male:" + highestMale)
    println("Highest Female:" + highestFemale)
  }
}

d. Submitting to the cluster

Before submitting this program to the cluster, we need to upload the generated people information data file to HDFS; the command is analogous to the one shown earlier.

Listing 9. PeopleInfoCalculator execution command

./spark-submit \
--class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 3g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt

e. Monitoring execution status

For this example, the console displays the information printed by the program, as shown below.

Figure 11. Case Three output

The detailed execution status can be seen in the Spark Web Console.

Figure 12. Case Three completion status

Case Four

a. Case description

In this case, assume a search engine company wants to find the K technology keywords or phrases with the highest search frequency over the past year. To simplify the problem, assume the keywords have already been collected into one or more text files with the following format.

Figure 13. Case Four test data format preview

Note that a keyword or phrase may appear multiple times and its capitalization may vary.

b. Case analysis

To solve this problem, we first count the occurrences of each keyword, recognizing differently capitalized forms of the same word or phrase as one: "Spark" and "spark", for example, must be counted as the same word. The counting itself is similar to the word count case. Next we sort the keywords or phrases by occurrence count in descending order; before sorting, the RDD elements have to be converted from (k, v) to (v, k). Finally we take the first K words or phrases.

For the first step we use the map operator to lowercase the source RDD's data and pair each phrase with a count of one, then call reduceByKey to count identical phrases. For the second step we sort the resulting RDD's elements in descending order with the sortByKey operator. For the third step we call the take operator on the sorted RDD to fetch the first K elements.

c. Implementation

Listing 10. TopKSearchKeyWords class source code

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TopKSearchKeyWords {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage:TopKSearchKeyWords KeyWordsFile K")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
    val sc = new SparkContext(conf)
    val srcData = sc.textFile(args(0))
    // lowercase each phrase and count identical phrases
    val countedData = srcData.map(line => (line.toLowerCase(), 1)).reduceByKey((a, b) => a + b)
    // for debug use:
    // countedData.foreach(x => println(x))
    // swap to (count, phrase) so sortByKey orders by count, descending
    val sortedData = countedData.map { case (k, v) => (v, k) }.sortByKey(false)
    // take the first K elements and swap back to (phrase, count)
    val topKData = sortedData.take(args(1).toInt).map { case (v, k) => (k, v) }
    topKData.foreach(println)
  }
}

d. Submitting to the cluster

Listing 11. TopKSearchKeyWords execution command

./spark-submit \
--class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt

e. Monitoring execution status

If the program executes successfully, we will see output like the following on the console. Readers can also follow cases two and three and write a small Scala program to generate the source data file this case needs; depending on the capacity of your HDFS cluster, generate as large a file as possible to test the program provided here. A possible generator sketch follows.
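As a sketch only (this generator is not part of the original article; the keyword pool and output path are invented for illustration), such a data file could be produced like this:

import java.io.FileWriter
import java.io.File
import scala.util.Random

object SearchKeyWordsFileGenerator {
  def main(args: Array[String]) {
    // hypothetical keyword pool; mixed capitalization on purpose
    val words = Array("Spark", "spark", "Hadoop", "hadoop", "HDFS", "Scala", "HBase", "Hive", "hive", "storm")
    val writer = new FileWriter(new File("C:\\search_key_words.txt"), false)
    val rand = new Random()
    for (i <- 1 to 1000000) {
      writer.write(words(rand.nextInt(words.length)))
      writer.write(System.getProperty("line.separator"))
    }
    writer.flush()
    writer.close()
  }
}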

Figure 14. Case Four output

Figure 15. Case Four completion status

A Brief Look at How a Spark Job Executes

After a Spark application is submitted, the console prints a great deal of log output. It looks chaotic, but to a degree it reflects how the submitted Spark job is scheduled and executed on the cluster. In this section we walk through how a typical Spark job is scheduled and executed.

Let's first go over a few concepts; a small lineage example follows the list.

DAG: Directed Acyclic Graph, a concept from graph theory. A directed graph is acyclic if no vertex can be reached again by following some sequence of edges starting from it.

Job: Spark's computations execute lazily; real computation is triggered only when an action operator is encountered. A Job is the computation, consisting of one or more Stages, produced by an action operator.

Stage: Once a Job is determined, Spark's scheduler (the DAGScheduler) divides the job into one or more Stages according to its computation steps. Stages come in two kinds, ShuffleMapStage and ResultStage: the former has a shuffle as its output boundary, while the latter directly emits results; its input boundary can be external data or the output of a ShuffleMapStage. Each Stage contains one TaskSet.

TaskSet: a set of related tasks with no shuffle dependencies among them. A TaskSet is submitted as a group to the lower-level TaskScheduler.

Task: the smallest processing unit, operating on a single data partition. Tasks are either ShuffleMapTasks or ResultTasks: a ShuffleMapTask runs its task and partitions its output into multiple buckets (ArrayBuffers) based on the task's corresponding data partition, while a ResultTask runs its task and sends its output to the driver program.
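To make these concepts concrete, here is a minimal sketch (not from the original article; the local master and sample data are assumptions) that prints the lineage of the word count computation from Case One. The shuffle introduced by reduceByKey is what splits the job into two stages:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object LineageDemo {
  def main(args: Array[String]) {
    // local[2] keeps the sketch self-contained; any master works
    val conf = new SparkConf().setAppName("Lineage Demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val wordCounts = sc.parallelize(Seq("hello spark", "hello scala"))
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b) // shuffle: stage boundary
    // toDebugString prints the RDD lineage; indentation marks stage boundaries
    println(wordCounts.toDebugString)
    // count() is an action: it triggers one job with two stages,
    // a ShuffleMapStage (flatMap + map) and a ResultStage (reduceByKey + count)
    println(wordCounts.count())
  }
}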

Spark's job and task scheduling is complex; a thorough analysis would require reading the source code and is beyond the scope of this article, so this section only covers the general flow.

After a Spark application is submitted, when an action operator triggers a computation, the SparkContext submits a job to the DAGScheduler. The DAGScheduler divides the job into Stages according to the dependencies among the RDDs involved and determines the dependencies among the Stages; those inter-Stage dependencies form the DAG. Stages are split along ShuffleDependency boundaries: when some RDD's computation requires shuffling its data, the RDD carrying the shuffle dependency is taken as the input from which a new Stage is built. Dividing Stages this way guarantees that data with dependencies is computed in the correct order. Based on how the partitions of each Stage's underlying RDD data are distributed, as many Tasks as there are partitions are created and placed according to partition locations. A finalStage generates different Tasks than a mapStage (ResultTasks versus ShuffleMapTasks); finally all Tasks are wrapped into a TaskSet and submitted to the TaskScheduler for execution. Interested readers can read the DAGScheduler and TaskScheduler source code for a more detailed picture of the flow.

Conclusion

Through this article, you should now have a deeper understanding of how to write Spark applications in Scala to process big data. Real problems may of course be far more complicated than the examples given here, but the basic approach to solving them is the same: first analyze the structure and format of the source data, then decide how to use the operators Spark provides to transform the data, and finally choose the operators appropriate to the actual requirements to operate on the data and compute the result.

 

   