Getting Started with Spark (Python Version)
 
×÷ÕߣºKenshinCui À´×ÔÓÚ£º²©¿ÍÔ° ·¢²¼ÓÚ 2015-12-1
 

HadoopÊǶԴóÊý¾Ý¼¯½øÐзֲ¼Ê½¼ÆËãµÄ±ê×¼¹¤¾ß£¬ÕâÒ²ÊÇΪʲôµ±Äã´©¹ý»ú³¡Ê±ÄÜ¿´µ½¡±´óÊý¾Ý(Big Data)¡±¹ã¸æµÄÔ­Òò¡£ËüÒѾ­³ÉΪ´óÊý¾ÝµÄ²Ù×÷ϵͳ£¬ÌṩÁ˰üÀ¨¹¤¾ßºÍ¼¼ÇÉÔÚÄڵķḻÉú̬ϵͳ£¬ÔÊÐíʹÓÃÏà¶Ô±ãÒ˵ÄÉÌÒµÓ²¼þ¼¯Èº½øÐг¬¼¶¼ÆËã»ú¼¶±ðµÄ¼ÆËã¡£2003ºÍ2004Ä꣬Á½¸öÀ´×ÔGoogleµÄ¹ÛµãʹHadoop³ÉΪ¿ÉÄÜ£ºÒ»¸ö·Ö²¼Ê½´æ´¢¿ò¼Ü(GoogleÎļþϵͳ)£¬ÔÚHadoopÖб»ÊµÏÖΪHDFS£»Ò»¸ö·Ö²¼Ê½¼ÆËã¿ò¼Ü(MapReduce)¡£

ÕâÁ½¸ö¹Ûµã³ÉΪ¹ýȥʮÄê¹æÄ£·ÖÎö£¨scaling analytics£©¡¢´ó¹æÄ£»úÆ÷ѧϰ£¨machine learning£©£¬ÒÔ¼°ÆäËû´óÊý¾ÝÓ¦ÓóöÏÖµÄÖ÷ÒªÍÆ¶¯Á¦£¡µ«ÊÇ£¬´Ó¼¼Êõ½Ç¶ÈÉϽ²£¬Ê®ÄêÊÇÒ»¶Î·Ç³£³¤µÄʱ¼ä£¬¶øÇÒHadoop»¹´æÔںܶàÒÑÖªÏÞÖÆ£¬ÓÈÆäÊÇMapReduce¡£¶ÔMapReduce±à³ÌÃ÷ÏÔÊÇÀ§Äѵġ£¶Ô´ó¶àÊý·ÖÎö£¬Äã¶¼±ØÐëÓúܶಽÖ轫MapºÍReduceÈÎÎñ´®½ÓÆðÀ´¡£ÕâÔì³ÉÀàSQLµÄ¼ÆËã»ò»úÆ÷ѧϰÐèҪרÃŵÄϵͳÀ´½øÐС£¸üÔãµÄÊÇ£¬MapReduceÒªÇóÿ¸ö²½Öè¼äµÄÊý¾ÝÒªÐòÁл¯µ½´ÅÅÌ£¬ÕâÒâζ×ÅMapReduce×÷ÒµµÄI/O³É±¾ºÜ¸ß£¬µ¼Ö½»»¥·ÖÎöºÍµü´úËã·¨£¨iterative algorithms£©¿ªÏúºÜ´ó£»¶øÊÂʵÊÇ£¬¼¸ºõËùÓеÄ×îÓÅ»¯ºÍ»úÆ÷ѧϰ¶¼Êǵü´úµÄ¡£

ΪÁ˽â¾öÕâЩÎÊÌ⣬HadoopÒ»Ö±ÔÚÏòÒ»ÖÖ¸üΪͨÓõÄ×ÊÔ´¹ÜÀí¿ò¼Üת±ä£¬¼´YARN£¨Yet Another Resource Negotiator, ÓÖÒ»¸ö×ÊԴЭµ÷Õߣ©¡£YARNʵÏÖÁËÏÂÒ»´úµÄMapReduce£¬µ«Í¬Ê±Ò²ÔÊÐíÓ¦ÓÃÀûÓ÷ֲ¼Ê½×ÊÔ´¶ø²»±Ø²ÉÓÃMapReduce½øÐмÆË㡣ͨ¹ý½«¼¯Èº¹ÜÀíÒ»°ã»¯£¬Ñо¿×ªµ½·Ö²¼Ê½¼ÆËãµÄÒ»°ã»¯ÉÏ£¬À´À©Õ¹ÁËMapReduceµÄ³õÖÔ¡£

SparkÊǵÚÒ»¸öÍÑÌ¥ÓÚ¸Ãת±äµÄ¿ìËÙ¡¢Í¨Ó÷ֲ¼Ê½¼ÆË㷶ʽ£¬²¢ÇҺܿìÁ÷ÐÐÆðÀ´¡£SparkʹÓú¯Êýʽ±à³Ì·¶Ê½À©Õ¹ÁËMapReduceÄ£ÐÍÒÔÖ§³Ö¸ü¶à¼ÆËãÀàÐÍ£¬¿ÉÒÔº­¸Ç¹ã·ºµÄ¹¤×÷Á÷£¬ÕâЩ¹¤×÷Á÷֮ǰ±»ÊµÏÖΪHadoopÖ®ÉϵÄÌØÊâϵͳ¡£SparkʹÓÃÄڴ滺´æÀ´ÌáÉýÐÔÄÜ£¬Òò´Ë½øÐн»»¥Ê½·ÖÎöÒ²×ã¹»¿ìËÙ(¾ÍÈçͬʹÓÃPython½âÊÍÆ÷£¬Ó뼯Ⱥ½øÐн»»¥Ò»Ñù)¡£»º´æÍ¬Ê±ÌáÉýÁ˵ü´úËã·¨µÄÐÔÄÜ£¬ÕâʹµÃSpark·Ç³£ÊʺÏÊý¾ÝÀíÂÛÈÎÎñ£¬ÌرðÊÇ»úÆ÷ѧϰ¡£

±¾ÎÄÖУ¬ÎÒÃǽ«Ê×ÏÈÌÖÂÛÈçºÎÔÚ±¾µØ»úÆ÷ÉÏ»òÕßEC2µÄ¼¯ÈºÉÏÉèÖÃSpark½øÐмòµ¥·ÖÎö¡£È»ºó£¬ÎÒÃÇÔÚÈëÃż¶Ë®Æ½Ì½Ë÷Spark£¬Á˽âSparkÊÇʲôÒÔ¼°ËüÈçºÎ¹¤×÷£¨Ï£Íû¿ÉÒÔ¼¤·¢¸ü¶à̽Ë÷£©¡£×îºóÁ½½ÚÎÒÃÇ¿ªÊ¼Í¨¹ýÃüÁîÐÐÓëSpark½øÐн»»¥£¬È»ºóÑÝʾÈçºÎÓÃPythonдSparkÓ¦Ó㬲¢×÷ΪSpark×÷ÒµÌá½»µ½¼¯ÈºÉÏ¡£

Setting Up Spark

Setting up and running Spark on your own machine is very simple. You just download a pre-built package; as long as you have Java 6+ and Python 2.6+ installed, Spark will run on Windows, Mac OS X, and Linux. Make sure the java executable is on your PATH, or that the JAVA_HOME environment variable is set; likewise, python must be on your PATH.

Assuming you already have Java and Python installed:

1. Visit the Spark download page.

2. Select the latest Spark release (1.2.0 at the time of writing), the package pre-built for Hadoop 2.4, and download it directly.

How to proceed from here depends on your operating system and is left for you to explore; Windows users are welcome to leave tips on setting things up in the comments.

In general, my recommendation is to follow the steps below (on a POSIX operating system):

1. Unpack Spark:

~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz

2. Move the unzipped directory to an applications directory (for example /srv, as used below; Windows users can pick an equivalent location):

~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

3. Create a symbolic link pointing to this Spark version from a spark directory. That way you can simply download newer (or older) versions of Spark and manage which one is in use by changing the link, without touching your paths or environment variables:

~$ ln -s /srv/spark-1.2.0 /srv/spark

4. Edit your BASH configuration to add Spark to your PATH and set the SPARK_HOME environment variable; these little tricks will help you on the command line. On Ubuntu, just edit the ~/.bash_profile or ~/.profile file and add the following lines:

export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH

5. After you source these settings (or restart your terminal), you can run a pyspark interpreter locally. Run the pyspark command and you should see something like the following:

~$ pyspark
Python 2.7.8 (default, Dec 2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
[... snip ...]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.8 (default, Dec 2 2014 12:45:58)
SparkContext available as sc.
>>>

Spark is now installed and can be used locally in "standalone mode". You can develop applications on your own machine and submit Spark jobs that will run in a multi-process/multi-threaded mode, or you can configure the machine as a client to a cluster (not recommended, because in a Spark job the driver plays an important role and should sit on the same network as the rest of the cluster). Beyond development, probably the most common thing you will do with Spark on your local machine is use the spark-ec2 script to provision an EC2 Spark cluster on the Amazon cloud.

Simplifying Spark Output

Spark (and PySpark) execution can be extremely verbose, with many INFO log messages printed to the screen. This is particularly annoying during development, because it can drown out Python stack traces or print output. To reduce Spark's output, you can configure the log4j settings in $SPARK_HOME/conf. First, make a copy of the $SPARK_HOME/conf/log4j.properties.template file, dropping the ".template" extension:

~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

Edit the new file and replace every occurrence of INFO with WARN. Your log4j.properties file should look something like this:

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

Now when you run PySpark, the output messages will be far less verbose! Thanks to @genomegeek for pointing this out at a District Data Labs workshop.

Using IPython Notebook with Spark

While searching for useful Spark tips, I came across several posts about configuring IPython notebook with PySpark. IPython notebook is an essential tool for data scientists to present their scientific and theoretical work interactively, integrating text and Python code. For many data scientists, IPython notebook is their introduction to Python, and it is used so widely that I thought it was worth mentioning here.

Most of the instructions here are adapted from IPython notebook: Setting up IPython with PySpark. However, we will focus on connecting the IPython shell to PySpark in standalone mode on your local machine rather than on an EC2 cluster. If you want to use PySpark/IPython on a cluster, check out those instructions and leave a comment there!

1. Create an iPython notebook profile for Spark:

~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

Remember where the profile files are located, and substitute the appropriate path in the steps below.

2. Create the file $HOME/.ipython/profile_spark/startup/00-pyspark-setup.py and add the following code:

import os
import sys

# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/srv/spark'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

3. Start the IPython notebook using the profile we just created:

~$ ipython notebook --profile spark

4. In the notebook, you should be able to see the variable we just created:

print SPARK_HOME

5. At the top of your IPython notebook, make sure you add the Spark context:

from pyspark import  SparkContext
sc = SparkContext( 'local', 'pyspark')

6. Test the Spark context by doing a simple computation with IPython:

def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only needs to go up the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True

# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))

# Compute the number of primes in the RDD
print nums.filter(isprime).count()

If you get a number back without any errors, your context is working correctly!

Editor's note: the above sets up an IPython context that calls the IPython notebook directly with PySpark. However, you can also launch a notebook directly from PySpark as follows:

$ IPYTHON_OPTS="notebook --pylab inline" pyspark

Which approach works better depends on how you are using PySpark and IPython. The former makes it easier to connect an IPython notebook to a cluster, so it is the approach I prefer.

Using Spark on EC2

When teaching distributed computing with Hadoop, I have found that a lot can be taught on a local pseudo-distributed node or in single-node mode. But to understand what is really going on, you need a cluster; once data gets big, there is often a gap between these classroom skills and the demands of real computation. If you are willing to spend some money to learn Spark in detail, I recommend setting up a quick Spark cluster to experiment with. A cluster of 5 slaves (plus 1 master) used for roughly 10 hours a week costs about $45.18 per month.

A full discussion can be found in the Spark documentation: Running Spark on EC2. Be sure to read that document thoroughly before deciding to pay for an EC2 cluster! I have listed a few key points here:

1. Obtain an AWS EC2 key pair (access key and secret key) via the AWS Console.

2. Export the key pair into your environment, either by typing the following commands in your shell or by adding them to your profile:

export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey

×¢ÒⲻͬµÄ¹¤¾ßʹÓò»Í¬µÄ»·¾³Ãû³Æ£¬È·±£ÄãÓõÄÊÇSpark½Å±¾ËùʹÓõÄÃû³Æ¡£

3.Æô¶¯¼¯Èº£º

~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>

4. SSH into the cluster to run Spark jobs:

ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>

5. Destroy the cluster:

ec2$ ./spark-ec2 destroy <cluster-name>

These scripts automatically set up a local HDFS cluster on EC2 for you to add data to, and the copy-dir command will synchronize code and data to the cluster. Your best bet, however, is to store your data in S3 and create RDDs that load it with an s3:// URI.
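
For example, loading S3 data into an RDD from the pyspark shell might look like the sketch below. The bucket and key are placeholders (not from this article), and depending on your Hadoop build you may need the s3n:// scheme rather than s3://; the AWS credentials exported above must be visible to Spark.

# Hypothetical bucket and key -- substitute your own S3 path.
lines = sc.textFile("s3n://my-example-bucket/ontime/flights.csv")

# Any normal RDD operation works on it; an action triggers the remote read.
print lines.count()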

What is Spark?

Now that Spark is set up, let's talk about what Spark actually is. Spark is a general-purpose cluster computing framework that provides efficient in-memory computation by distributing the work of large data set computations across multiple machines. If you are familiar with Hadoop, you know that any distributed computing framework has to solve two problems: how to distribute the data and how to distribute the computation. Hadoop uses HDFS to solve the distributed data problem and the MapReduce computing paradigm to provide effective distributed computation. Similarly, Spark has a functional programming API in multiple languages that provides many more operators than just map and reduce, and these operations work on a distributed data framework called resilient distributed datasets (RDDs).

Essentially, an RDD is a programming abstraction that represents a read-only collection of objects partitioned across machines. An RDD can be rebuilt from its lineage (and is therefore fault tolerant), is accessed via parallel operations, can read from and write to distributed storage such as HDFS or S3, and, most importantly, can be cached in the memory of worker nodes for immediate reuse. Because RDDs can be cached in memory, Spark is extremely effective for iterative applications, where the data is reused throughout the run of an algorithm. Most machine learning and optimization algorithms are iterative, which makes Spark an extremely effective tool for data science. In addition, because Spark is so fast, it can be accessed interactively through a command-line prompt similar to the Python REPL.
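
In practice, caching is a one-line affair. The following minimal sketch assumes the pyspark shell (so sc already exists) and a local shakespeare.txt like the one used later in this article:

text  = sc.textFile("shakespeare.txt")
words = text.flatMap(lambda line: line.split())

# Mark the RDD for in-memory caching; nothing runs yet (evaluation is lazy).
words.cache()

# The first action materializes the partitions and caches them on the workers...
print words.count()

# ...so a second pass over the same data can be served from memory.
print words.distinct().count()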

The Spark library itself contains many application components that apply to most big data applications, including support for SQL-like queries over big data, machine learning and graph algorithms, and even support for real-time streaming data.

The core components are:

Spark Core: contains Spark's basic functionality, most notably the API that defines RDDs, the operations on them, and the actions over both. The rest of Spark's libraries are built on top of RDDs and Spark Core.

Spark SQL: provides an API for interacting with Spark via the Hive Query Language (HiveQL), Apache Hive's variant of SQL. Every database table is treated as an RDD, and Spark SQL queries are translated into Spark operations. For anyone familiar with Hive and HiveQL, Spark can be used right away.

Spark Streaming: enables the processing and control of live streams of data. Many real-time systems (such as Apache Storm) can process live data; Spark Streaming lets programs work with real-time data much as they would with ordinary RDDs.

MLlib: a library of common machine learning algorithms implemented as Spark operations on RDDs. It contains scalable learning algorithms, such as classification and regression, that require iterative operations over large data sets. Mahout, formerly the go-to big data machine learning library, will move to Spark for its future implementations.

GraphX: a collection of algorithms and tools for manipulating graphs and performing parallel graph operations and computations. GraphX extends the RDD API with operations for manipulating graphs, creating subgraphs, and accessing all the vertices along a path.

Because these components meet many big data requirements, as well as the algorithmic and computational needs of many data science tasks, Spark has grown in popularity very quickly. Not only that, Spark provides APIs written in Scala, Java, and Python, meeting the needs of different communities and allowing more data scientists to easily adopt Spark as their big data solution.

Programming Spark

Writing Spark applications is similar to working with other data flow languages that were previously implemented on Hadoop. Code is written in a lazily evaluated driver program, and an action triggers the driver code to be distributed across the cluster and executed by workers on their partitions of the RDD. The results are then sent back to the driver to be aggregated or compiled. Essentially, the driver program creates one or more RDDs, applies operations to transform them, and then invokes actions on the transformed RDDs.

These steps are roughly as follows (see the sketch after this list):

Define one or more RDDs, either by loading data stored on disk (HDFS, Cassandra, HBase, local disk), parallelizing a collection in memory, transforming an existing RDD, or by caching or saving one.

Invoke operations on the RDD by passing a closure (a function) that is applied to every element of the RDD. Spark offers more than 80 high-level operators beyond Map and Reduce.

Use the resulting RDDs with actions (such as count, collect, or save). Actions are what kick off the computation on the cluster.
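
Put together, these three steps look like the minimal sketch below (run in the pyspark shell, so sc already exists):

# 1. Define an RDD, here by parallelizing an in-memory collection.
nums = sc.parallelize(range(1, 1001))

# 2. Transform it by passing closures to operators such as map and filter.
squares = nums.map(lambda x: x * x)
evens   = squares.filter(lambda x: x % 2 == 0)

# 3. Apply an action; only now does Spark actually schedule work.
print evens.count()
print evens.take(5)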

When Spark runs a closure on a worker, every variable used in the closure is copied to that node, but maintained within the local scope of that closure. Spark provides two kinds of shared variables that can be accessed by all workers in a restricted fashion: broadcast variables are distributed to all workers but are read-only, while accumulators are variables that workers can only "add" to using an associative operation, and are typically used as counters.
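
A minimal sketch of both kinds of shared variables, again assuming the pyspark shell; the lookup table and carrier codes are made-up illustration data:

lookup = sc.broadcast({"AA": "American Airlines", "DL": "Delta"})   # read-only on the workers
errors = sc.accumulator(0)                                          # workers may only add to it

codes = sc.parallelize(["AA", "DL", "XX", "AA"])

# Use the broadcast table inside a transformation...
names = codes.map(lambda code: lookup.value.get(code))

# ...and count missing codes with the accumulator in an action.
names.foreach(lambda name: errors.add(1) if name is None else None)

print names.collect()
print errors.value    # the accumulated count is only readable back on the driver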

SparkÓ¦Óñ¾ÖÊÉÏͨ¹ýת»»ºÍ¶¯×÷À´¿ØÖÆRDD¡£ºóÐøÎÄÕ½«»áÉîÈëÌÖÂÛ£¬µ«ÊÇÀí½âÁËÕâ¸ö¾Í×ãÒÔÖ´ÐÐÏÂÃæµÄÀý×ÓÁË¡£

SparkµÄÖ´ÐÐ

¼òÂÔÃèÊöÏÂSparkµÄÖ´ÐС£±¾ÖÊÉÏ£¬SparkÓ¦ÓÃ×÷Ϊ¶ÀÁ¢µÄ½ø³ÌÔËÐУ¬ÓÉÇý¶¯³ÌÐòÖеÄSparkContextЭµ÷¡£Õâ¸öcontext½«»áÁ¬½Óµ½Ò»Ð©¼¯Èº¹ÜÀíÕߣ¨ÈçYARN£©£¬ÕâЩ¹ÜÀíÕß·ÖÅäϵͳ×ÊÔ´¡£¼¯ÈºÉϵÄÿ¸öworkerÓÉÖ´ÐÐÕߣ¨executor£©¹ÜÀí£¬Ö´ÐÐÕß·´¹ýÀ´ÓÉSparkContext¹ÜÀí¡£Ö´ÐÐÕß¹ÜÀí¼ÆËã¡¢´æ´¢£¬»¹ÓÐÿ̨»úÆ÷ÉϵĻº´æ¡£

ÖØµãÒª¼ÇסµÄÊÇÓ¦ÓôúÂëÓÉÇý¶¯³ÌÐò·¢Ë͸øÖ´ÐÐÕߣ¬Ö´ÐÐÕßÖ¸¶¨contextºÍÒªÔËÐеÄÈÎÎñ¡£Ö´ÐÐÕßÓëÇý¶¯³ÌÐòͨÐŽøÐÐÊý¾Ý·ÖÏí»òÕß½»»¥¡£Çý¶¯³ÌÐòÊÇSpark×÷ÒµµÄÖ÷Òª²ÎÓëÕߣ¬Òò´ËÐèÒªÓ뼯Ⱥ´¦ÓÚÏàͬµÄÍøÂç¡£ÕâÓëHadoop´úÂ벻ͬ£¬HadoopÖÐÄã¿ÉÒÔÔÚÈÎÒâλÖÃÌá½»×÷Òµ¸øJobTracker£¬JobTracker´¦Àí¼¯ÈºÉϵÄÖ´ÐС£
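
In practice this means you normally run spark-submit from a machine that can reach the cluster; in the default client deploy mode the driver is the process started on that machine. The host and port below are placeholders:

# Submit to a (hypothetical) standalone cluster master; the driver runs where you type this.
~$ spark-submit --master spark://master.example.com:7077 app.py

# Or run everything locally with as many worker threads as cores.
~$ spark-submit --master "local[*]" app.py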

Interacting with Spark

The easiest way to use Spark is the interactive command-line prompt. Open the PySpark terminal by typing pyspark on the command line:

~$ pyspark
[... snip ...]
>>>

PySpark automatically creates a SparkContext using your local Spark configuration; you can access it through the sc variable. Let's create our first RDD:

>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

The textFile method loads the complete works of Shakespeare into an RDD named text. Inspecting the RDD, you can see that it is a MappedRDD and that the file path is relative to the current working directory (remember to pass the correct path to the shakespeare.txt file on your disk). Let's transform this RDD to perform the "hello world" of distributed computing: word count.

>>> from operator import add
>>> def tokenize(text):
... return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43

ÎÒÃÇÊ×Ïȵ¼ÈëÁËadd²Ù×÷·û£¬ËüÊǸöÃüÃûº¯Êý£¬¿ÉÒÔ×÷Ϊ¼Ó·¨µÄ±Õ°üÀ´Ê¹Óá£ÎÒÃÇÉÔºóÔÙʹÓÃÕâ¸öº¯Êý¡£Ê×ÏÈÎÒÃÇÒª×öµÄÊǰÑÎı¾²ð·ÖΪµ¥´Ê¡£ÎÒÃÇ´´½¨ÁËÒ»¸ötokenizeº¯Êý£¬²ÎÊýÊÇÎı¾Æ¬¶Î£¬·µ»Ø¸ù¾Ý¿Õ¸ñ²ð·ÖµÄµ¥´ÊÁÐ±í¡£È»ºóÎÒÃÇͨ¹ý¸øflatMap²Ù×÷·û´«µÝtokenize±Õ°ü¶ÔtextRDD½øÐб任´´½¨ÁËÒ»¸öwordsRDD¡£Äã»á·¢ÏÖ£¬wordsÊǸöPythonRDD£¬µ«ÊÇÖ´Ðб¾Ó¦¸ÃÁ¢¼´½øÐС£ÏÔÈ»£¬ÎÒÃÇ»¹Ã»ÓаÑÕû¸öɯʿ±ÈÑÇÊý¾Ý¼¯²ð·ÖΪµ¥´ÊÁÐ±í¡£

Èç¹ûÄãÔøÊ¹ÓÃMapReduce×ö¹ýHadoop°æµÄ¡°×ÖÊýͳ¼Æ¡±£¬ÄãÓ¦¸ÃÖªµÀÏÂÒ»²½Êǽ«Ã¿¸öµ¥´ÊÓ³Éäµ½Ò»¸ö¼üÖµ¶Ô£¬ÆäÖмüÊǵ¥´Ê£¬ÖµÊÇ1£¬È»ºóʹÓÃreducer¼ÆËãÿ¸ö¼üµÄ1×ÜÊý¡£

Ê×ÏÈ£¬ÎÒÃÇmapһϡ£

>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
| shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
| shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2

Instead of a named function I used an anonymous function (with Python's lambda keyword). This line maps the lambda over every word, so each x is a word, and each word is transformed by the anonymous closure into the tuple (word, 1). To inspect the lineage, we use the toDebugString method to see how our PipelinedRDD is being transformed. We can then use the reduceByKey operator to get the word counts and write those counts to disk:

>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")

Once we finally invoke the saveAsTextFile action, the distributed job kicks off, and you should see a lot of INFO statements as the job runs "across the cluster" (or across many processes on your local machine). If you exit the interpreter, you will find a "wc" directory in your current working directory:

$ ls wc/
_SUCCESS part-00000 part-00001

Each part file represents a partition of the final RDD, computed by one of the processes on your machine and persisted to disk. If you run head on one of the part files, you should see word-count tuples:

$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(u'kinghenryviii@7731', 1)
(u'othello@36737', 1)
(u'loveslabourslost@51678', 1)
(u'1kinghenryiv@54228', 1)
(u'troilusandcressida@83747', 1)
(u'fleeces', 1)
(u'midsummersnightsdream@71681', 1)

Note that the keys are not sorted as they would be in Hadoop (because Hadoop's Map and Reduce tasks include a mandatory shuffle and sort phase). However, you are guaranteed that each word appears exactly once across all the files, because you used the reduceByKey operator. You could also use a sort operator to make sure all the keys are ordered before writing them to disk.
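
A minimal sketch of that sort, continuing from the counts RDD above; the output path is a placeholder:

# Sort the (word, count) pairs by key before writing them out.
counts.sortByKey().saveAsTextFile("wc_sorted")

# Or flip the pairs and sort by count, descending, to see the most frequent words first.
print counts.map(lambda (w, c): (c, w)).sortByKey(False).take(10)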

Writing a Spark Application

Writing a Spark application is similar to using Spark through the interactive console; the API is the same. First, you need access to the SparkContext, which pyspark loaded for you automatically in the shell.

A basic template for writing a Spark application in Python is as follows:

## Spark Application - execute with spark-submit

## Imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality

def main(sc):
    pass

if __name__ == "__main__":

    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)

This template gives you everything a Spark application needs: Python library imports, module constants, an identifiable application name for debugging and for the Spark UI, and the main analytical methodology that runs as the driver. In the __main__ block (the "ifmain") we create the SparkContext and execute main with the configured context; this also lets us import the driver code into pyspark without executing it. Note that here the Spark configuration is hard-coded into the SparkConf via the setMaster method; in general you should let this value be set from the command line, which is why the line carries a placeholder comment.
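
One hedged way to make the master configurable (not part of the original template) is to read it as an application argument, as in the sketch below, which assumes the APP_NAME constant and main function from the template above. Alternatively, you can drop setMaster entirely and let spark-submit's own --master flag decide.

import argparse
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=APP_NAME)
    parser.add_argument("--master", default="local[*]",
                        help="Spark master URL, e.g. local[*] or spark://host:7077")
    args = parser.parse_args()

    # Configure Spark from the command-line argument instead of a hard-coded value
    conf = SparkConf().setAppName(APP_NAME).setMaster(args.master)
    sc   = SparkContext(conf=conf)
    main(sc)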

Use sc.stop() or sys.exit(0) to close or exit the program.

## Spark Application - execute with spark-submit

## Imports
import csv
import matplotlib.pyplot as plt

from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
          'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight = namedtuple('Flight', fields)

## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])

def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()

def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')

    # Set the ticks
    ticks = plt.yticks([idx + 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # minimize chart junk
    plt.grid(axis='x', color='white', linestyle='-')

    plt.title('Total Minutes Delayed per Airline')
    plt.show()

## Main functionality
def main(sc):

    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays = flights.map(lambda f: (airline_lookup.value[f.airline],
                                    add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline
    delays = delays.reduceByKey(add).collect()
    delays = sorted(delays, key=itemgetter(1))

    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])

    # Show a bar chart of the delays
    plot(delays)

if __name__ == "__main__":

    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)

Run this code with the spark-submit command (assuming you already have an ontime directory containing the two CSV files):

~$ spark-submit app.py

This Spark job uses the local machine as the master and looks for the two CSV files in the ontime directory relative to app.py. The final result shows the total delay, in minutes, for the month of April; some routes actually run early (if you fly from the continental US to Hawaii or Alaska), but for most of the large airlines the total is a delay. Note that we visualize the result directly from app.py using matplotlib:

So what did this code do? Let's look in particular at the main function, the part most directly related to Spark. First, we load a CSV file into an RDD and map the split function to it; split parses each line of text with the csv module and returns a tuple representing the row. Then we pass the collect action to the RDD, which brings the data back from the RDD to the driver as a Python list. In this case, airlines.csv is a small jump table that maps airline codes to their full names. We store that lookup table as a Python dictionary and then broadcast it to every node on the cluster with sc.broadcast.

Next, the main function loads the much larger flights.csv ([Translator's note] the author mistyped this as fights.csv; corrected here). After splitting the CSV rows, we map the parse function to them; parse converts the dates and times into Python dates and times and casts the floating-point numbers appropriately. Each row is stored as a NamedTuple called Flight for efficient, convenient access.

With an RDD of Flight objects in hand, we map an anonymous function that transforms the RDD into a series of key-value pairs, where the key is the airline name and the value is the sum of the arrival and departure delays. Each airline's total delay is computed with the reduceByKey operator and the add operator, and the resulting RDD is collected back to the driver (the number of airlines in the data is relatively small). Finally, the delays are sorted in ascending order, the output is printed to the console, and the result is visualized with matplotlib.

This example is a bit long, but hopefully it demonstrates the interplay between the cluster and the driver program (sending data out for analysis and bringing results back to the driver), as well as the role of Python code in a Spark application.

Conclusion

Although this is far from a complete introduction to Spark, we hope you now have a better feel for what Spark is and how to use it for fast, in-memory distributed computing. At the very least, you should be able to get Spark up and running and start exploring data on your local machine or on Amazon EC2. You should also be able to get iPython notebook configured to run with Spark.

Spark does not solve the distributed storage problem (Spark usually gets its data from HDFS), but it does provide a rich functional programming API for distributed computation. The framework is built on resilient distributed datasets (RDDs). An RDD is a programming abstraction that represents a partitioned collection of objects on which distributed operations can be performed. RDDs are fault tolerant (the "resilient" part) and, most importantly, can be stored in memory on worker nodes for immediate reuse. In-memory storage enables fast, simply expressed iterative algorithms as well as real-time interactive analysis.

Because the Spark library offers APIs in Python, Scala, and Java, along with built-in modules for machine learning, streaming data, graph algorithms, and SQL-like queries, Spark has rapidly become one of the most important distributed computing frameworks around. Combined with YARN, Spark augments rather than replaces existing Hadoop clusters, and it will be an important part of big data going forward, paving a broad road for data science exploration.

   