Since entering the Apache Incubator in June 2013, Spark has attracted contributions from more than 120 developers across 25 organizations. Not long ago it became a top-level project of the Apache Software Foundation, and it is now a favorite of the well-known Hadoop vendors Cloudera and MapR.
Spark is a cluster computing platform that originated at the AMPLab of UC Berkeley. Built around in-memory computation, its performance can exceed Hadoop's by up to a hundred times, and even when using disk, iterative workloads still see roughly a tenfold speedup. Starting from multi-pass batch processing, Spark has absorbed several other computing paradigms, including data warehousing, stream processing, and graph computation, making it a rare all-rounder. Spark is now a top-level Apache project with enormous community support; its number of active developers has surpassed that of Hadoop MapReduce. Here we share Xu Peng's blog series "Apache Spark Source Code Walkthrough," which explores this popular big-data computing framework at the source-code level.
About the author: Xu Peng, who blogs under the handle 徽沪一郎, graduated from Nanjing University of Posts and Telecommunications in 2000 and now works at Ericsson Shanghai, developing products in the UDM department. His personal interests include the Linux kernel and real-time computing frameworks such as Storm and Spark.
Prologue
Reading source code is both a very easy thing and a very hard thing. It is easy because the code is right there; open it and you can see it. It is hard because you must understand, through the code, why the author designed it this way and what main problems the design originally set out to solve.
Before walking through Spark's source code in detail, if you want a quick overall picture of Spark, reading Matei Zaharia's Spark paper is an excellent choice.
On top of the paper, combining it with the talk Introduction to Spark Internals, given by Spark's authors at the 2012 Developer Meetup, will give you a fairly good sense of Spark's internal implementation.
With the foundation laid by these two sources, reading the source code will make the key points and difficulties of the analysis clear.
»ù±¾¸ÅÄBasic Concepts£©
1. RDD: Resilient Distributed Dataset.
2. Operation: the operations applied to an RDD, divided into transformations and actions.
3. Job: a job contains multiple RDDs and the operations applied to them.
4. Stage: a job is divided into multiple stages.
5. Partition: a data partition; the data in one RDD can be split into multiple partitions.
6. DAG: Directed Acyclic Graph, reflecting the dependency relationships among RDDs.
7. Narrow dependency: the child RDD depends on a fixed set of data partitions of the parent RDD.
8. Wide dependency: the child RDD depends on all data partitions of the parent RDD.
9. Caching management: caching the intermediate results of RDD computations to speed up overall processing.
Programming Model
An RDD is a read-only collection of data partitions; note that it is a data set.
Operations on RDDs fall into transformations and actions. After a transformation, the contents of the data set change: data set A is transformed into data set B. After an action, the contents of the data set are reduced to a concrete value.
Only when an action is applied to an RDD are that RDD and all operations on its ancestor RDDs submitted to the cluster for actual execution.
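This lazy-evaluation behavior can be mimicked in plain Scala. Below is a minimal sketch using LazyList to stand in for an RDD lineage (illustrative only; this is not Spark's API, and the variable names are made up):

```scala
// A counter to observe when the "transformation" function actually runs.
var evaluated = 0
val data = LazyList(1, 2, 3, 4, 5)

// "Transformation": declared, but nothing is computed yet.
val doubled = data.map { x => evaluated += 1; x * 2 }
assert(evaluated == 0)           // no element has been touched so far

// "Action": forces evaluation of the whole chain.
val total = doubled.sum
assert(evaluated == 5)           // every element was processed exactly once
assert(total == 30)
```

Spark's transformations are deferred in the same spirit: filter and map only record lineage, while count or collect triggers the actual cluster execution.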
The components involved in going from code to dynamic execution are shown in the figure below.

Demo code:
val sc = new SparkContext("Spark://...", "MyJob", home, jars)
val file = sc.textFile("hdfs://...")
val errors = file.filter(_.contains("ERROR"))
errors.cache()
errors.count()
Runtime View
Whatever the static model looks like, at runtime it ultimately consists of nothing more than processes and threads.
In Spark's terminology, the static view is called the dataset view, while the dynamic view is called the partition view. Their relationship is shown in the figure.

ÔÚSparkÖеÄtask¿ÉÒÔ¶ÔÓ¦ÓÚỊ̈߳¬workerÊÇÒ»¸ö¸öµÄ½ø³Ì£¬workerÓÉdriverÀ´½øÐйÜÀí¡£
So the question arises: how do these tasks evolve out of RDDs? The next section answers this in detail.
Deployment View
When an action is applied to an RDD, that action is submitted as a job.
During submission, the DAGScheduler module steps in to compute the dependency relationships among the RDDs. These dependencies form the DAG.
Each job is divided into multiple stages. A major criterion for splitting stages is whether the input of the current computation factor is deterministic; if so, it is placed in the same stage, avoiding the message-passing overhead between multiple stages.
Once a stage is submitted, the taskscheduler computes the tasks required by the stage and submits them to the appropriate workers.
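The stage-splitting rule can be illustrated with a toy model: walk a linear chain of operations and open a new stage at every wide (shuffle) dependency. This is a deliberate simplification; the real DAGScheduler works over a general DAG, and Op and splitIntoStages are hypothetical names:

```scala
// Each operation is marked as introducing a narrow or a wide dependency.
case class Op(name: String, wide: Boolean)

// Fold over the lineage: a wide dependency opens a new stage,
// a narrow dependency stays in the current stage.
def splitIntoStages(lineage: List[Op]): List[List[Op]] =
  lineage.foldLeft(List(List.empty[Op])) { (stages, op) =>
    if (op.wide) List(op) :: stages
    else (op :: stages.head) :: stages.tail
  }.map(_.reverse).reverse

val wordCount = List(
  Op("textFile",    wide = false),
  Op("flatMap",     wide = false),
  Op("map",         wide = false),
  Op("reduceByKey", wide = true)   // shuffle boundary
)
val stages = splitIntoStages(wordCount).map(_.map(_.name))
assert(stages == List(List("textFile", "flatMap", "map"), List("reduceByKey")))
```

So a classic wordcount lineage yields two stages: the narrow chain up to the shuffle, and the shuffle stage itself.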
Spark supports several deployment modes: Standalone, Mesos, and YARN. The deployment mode is passed as an initialization parameter to the taskscheduler.

RDD Interface
RDDÓÉÒÔϼ¸¸öÖ÷Òª²¿·Ö×é³É
partitions: the set of partitions, i.e. how many data partitions an RDD has
dependencies: the RDD's dependency relationships
compute(partition): the computation to perform for a given partition
preferredLocations: location preferences for a data partition
partitioner: how the computed results are to be distributed
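The parts listed above can be condensed into a simplified Scala trait. This is only a sketch: the real org.apache.spark.rdd.RDD is considerably richer, partitioner is left out for brevity, and ArrayRDD is a made-up leaf RDD over an in-memory array:

```scala
trait Partition { def index: Int }

trait SimpleRDD[T] {
  def partitions: Seq[Partition]              // how the data set is split
  def dependencies: Seq[SimpleRDD[_]]         // parent RDDs
  def compute(p: Partition): Iterator[T]      // per-partition computation
  def preferredLocations(p: Partition): Seq[String] = Nil // locality hints
}

// Hypothetical leaf RDD: round-robin split of an array into n partitions.
class ArrayRDD[T](data: Array[T], numPartitions: Int) extends SimpleRDD[T] {
  private case class P(index: Int) extends Partition
  def partitions: Seq[Partition] = (0 until numPartitions).map(P(_))
  def dependencies: Seq[SimpleRDD[_]] = Nil
  def compute(p: Partition): Iterator[T] =
    data.iterator.zipWithIndex.collect {
      case (x, i) if i % numPartitions == p.index => x
    }
}

val rdd = new ArrayRDD(Array(1, 2, 3, 4), numPartitions = 2)
assert(rdd.partitions.size == 2)
assert(rdd.partitions.flatMap(rdd.compute).sorted == Seq(1, 2, 3, 4))
```

Iterating every partition and concatenating the compute results recovers the full data set, which is exactly the dataset-view/partition-view duality mentioned earlier.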
Caching
The intermediate results of RDD computations can be cached. The cache prefers memory; if memory is insufficient, data will be written to disk.
An LRU (least recently used) policy decides which contents stay in memory and which are moved to disk.
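The eviction policy can be sketched with Scala's mutable LinkedHashMap, whose iteration order is insertion order: accessing an entry refreshes it, and when the memory budget is exceeded the front (least recently used) entry is spilled to a stand-in "disk" map. LruCache is a hypothetical illustration, not Spark's actual block manager:

```scala
import scala.collection.mutable

class LruCache[K, V](capacity: Int) {
  private val memory = mutable.LinkedHashMap.empty[K, V]
  val disk = mutable.Map.empty[K, V]   // stands in for spilling to disk

  def put(key: K, value: V): Unit = {
    memory.remove(key)
    memory.put(key, value)             // most recently used goes to the back
    if (memory.size > capacity) {
      val (oldKey, oldVal) = memory.head   // front = least recently used
      memory.remove(oldKey)
      disk.put(oldKey, oldVal)
    }
  }

  def get(key: K): Option[V] =
    memory.remove(key).map { v => memory.put(key, v); v } // touch refreshes
      .orElse(disk.get(key))
}

val cache = new LruCache[String, Int](capacity = 2)
cache.put("a", 1); cache.put("b", 2)
cache.get("a")        // "a" is now the most recently used
cache.put("c", 3)     // exceeds capacity: "b" is spilled
assert(cache.disk.keySet == Set("b"))
assert(cache.get("a").contains(1) && cache.get("c").contains(3))
```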
Fault Tolerance
From the initial RDD to the last derived RDD, a series of processing steps takes place. So how are failures in the intermediate steps handled?
Spark's solution is to replay the events only for the failed data partitions, rather than for the entire data set, which greatly reduces the cost of recovery.
And how does an RDD know how many data partitions it should have? For an HDFS file, the file's blocks become an important basis for this calculation.
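As a back-of-the-envelope sketch of this rule, the partition count for an HDFS-backed file is driven by its block count. The 64 MB figure below is the old HDFS default block size and is an assumption for illustration; real Spark delegates the split computation to the Hadoop InputFormat:

```scala
// Ceiling division of the file size by the block size, with at least 1 partition.
def partitionsFor(fileSizeBytes: Long,
                  blockSizeBytes: Long = 64L * 1024 * 1024): Long =
  math.max(1L, (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes)

assert(partitionsFor(200L * 1024 * 1024) == 4)  // a 200 MB file spans 4 blocks
assert(partitionsFor(1) == 1)
```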
Cluster Management
Tasks run on a cluster. Besides Spark's own Standalone deployment mode, Spark also has built-in support for YARN and Mesos.
YARN is responsible for scheduling and monitoring compute resources; based on the monitoring results it restarts failed tasks, or redistributes tasks when new nodes join the cluster.
For this part, refer to the YARN documentation.
Summary
When reading the source code, keep the following two main threads in focus.
The static view, i.e. RDDs, transformations, and actions.
The dynamic view, i.e. the life of a job: each job is divided into multiple stages, each stage can contain multiple RDDs and their transformations, and these stages are in turn mapped into tasks that are distributed across the cluster.
Overview
This article uses wordcount as an example to explain in detail how Spark creates and runs a job, focusing on the creation of processes and threads.
Setting Up the Experiment Environment
ÔÚ½øÐкóÐø²Ù×÷ǰ£¬È·±£ÏÂÁÐÌõ¼þÒÑÂú×ã¡£
ÏÂÔØspark binary 0.9.1
°²×°scala
°²×°sbt
°²×°java
Starting spark-shell
Single-machine mode, i.e. local mode
Running in local mode is very simple; just run the following command, assuming the current directory is $SPARK_HOME:
MASTER=local bin/spark-shell
"MASTER=local"¾ÍÊDZíÃ÷µ±Ç°ÔËÐÐÔÚµ¥»úģʽ
Running in local cluster mode
Local cluster mode is a pseudo-cluster mode that simulates a Standalone cluster on a single machine. The startup order is:
Start the master
Start a worker
Start spark-shell
master
$SPARK_HOME/sbin/start-master.sh
Note the output at startup; logs are saved by default in the $SPARK_HOME/logs directory.
The master mainly runs the class org.apache.spark.deploy.master.Master, listening on port 8080. The log is shown in the figure below.
ÐÞ¸ÄÅäÖÃ
½øÈë$SPARK_HOME/confĿ¼
½«spark-env.sh.templateÖØÃüÃûΪspark-env.sh
ÐÞ¸Äspark-env.sh£¬Ìí¼ÓÈçÏÂÄÚÈÝ
export SPARK_MASTER_IP=localhost
export SPARK_LOCAL_IP=localhost
Running a worker
bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1 -c 1 -m 512M
Once the worker has started, it connects to the master. Open the master's web UI and you can see the connected worker.
The master web UI listens at http://localhost:8080
Starting spark-shell
MASTER=spark://localhost:7077 bin/spark-shell
If everything goes well, you will see the following message.
Created spark context.. Spark context available as sc.
You can open localhost:4040 in a browser to view the following:
stages storage environment executors
wordcount
With the environment ready, let's run the simplest possible example in spark-shell. Enter the following code:
scala> sc.textFile("README.md").filter(_.contains("Spark")).count
The code above counts how many lines in README.md contain the word Spark.
The Deployment Process in Detail
Spark²¼Öû·¾³ÖÐ×é¼þ¹¹³ÉÈçÏÂͼËùʾ¡£

Spark cluster components
Driver Program: briefly, the wordcount statement entered in spark-shell corresponds to the Driver Program in the figure above.
Cluster Manager: corresponds to the master mentioned above, mainly playing the role of deployment management.
Worker Node: the slave node, as opposed to the Master. The executors run on it; an executor can be mapped to a thread. An executor handles two basic kinds of logic: one is the driver program, the other is the stages into which a job is split after submission, where each stage can run one or more tasks.
Notes: in cluster mode, the Cluster Manager runs in one JVM process while each worker runs in another JVM process. In local cluster mode, these JVM processes are all on the same machine; in a real Standalone, Mesos, or YARN cluster, workers and the master may be distributed across different hosts.
Job Creation and Execution
The simplified flow of job creation is as follows.
First, the application creates an instance of SparkContext, say sc.
Using the SparkContext instance, RDDs are created.
Through a chain of transformation operations, the original RDD is converted into RDDs of other types.
When an action is applied to the transformed RDD, SparkContext's runJob method is invoked.
The call to sc.runJob is the starting point of the chain reaction that follows; the crucial leap happens here.
The call path is roughly:
sc.runJob->dagScheduler.runJob->submitJob
DAGScheduler::submitJob creates a JobSubmitted event and sends it to the embedded eventProcessActor
On receiving JobSubmitted, eventProcessActor calls the processEvent handler function
The job is converted into stages, the finalStage is generated and submitted for execution; the key call is submitStage
submitStage computes the dependencies between stages, which come in two kinds: wide dependencies and narrow dependencies
If the current stage is found to have no dependencies, or all its dependencies are already prepared, the tasks are submitted
Submitting the tasks is done by calling the function submitMissingTasks
Which worker a task actually runs on is managed by the TaskScheduler; that is, submitMissingTasks above calls TaskScheduler::submitTasks
TaskSchedulerImpl creates the appropriate backend for Spark's current run mode; for single-machine runs it creates a LocalBackend
LocalBackend receives the ReceiveOffers event passed in by TaskSchedulerImpl
receiveOffers->executor.launchTask->TaskRunner.run
Code snippet: executor.launchTask
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  // wrap the serialized task in a TaskRunner (a Runnable)
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr)
  // hand it to the executor's thread pool for execution
  threadPool.execute(tr)
}
After all this, the point is simply that the final processing logic genuinely takes place inside the TaskRunner within an executor.
The computation result is wrapped as a MapStatus and fed back to the DAGScheduler through a series of internal messages. The message path is not overly complex; interested readers can trace it themselves.
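The recursive submitStage behavior described in the steps above can be modeled with a toy sketch (heavily simplified: the real DAGScheduler tracks waiting, running, and failed stage sets and resubmits via events when parents finish, rather than recursing synchronously as here):

```scala
case class Stage(id: Int, parents: List[Stage])

val submitted = scala.collection.mutable.ListBuffer.empty[Int]

def submitStage(stage: Stage): Unit = {
  val missing = stage.parents.filterNot(p => submitted.contains(p.id))
  if (missing.isEmpty) {
    if (!submitted.contains(stage.id))
      submitted += stage.id          // stands in for submitMissingTasks
  } else {
    missing.foreach(submitStage)     // submit missing parent stages first
    submitStage(stage)               // then retry the current stage
  }
}

// finalStage depends on a shuffle stage, which depends on the input stage.
val s0 = Stage(0, Nil)
val s1 = Stage(1, List(s0))
val finalStage = Stage(2, List(s1))
submitStage(finalStage)
assert(submitted.toList == List(0, 1, 2))  // parents always run before children
```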