Hadoop is the standard tool for distributed computing over large data sets, and is the reason you see "Big Data" ads every time you walk through an airport. It has become an operating system for big data, providing a rich ecosystem of tools and techniques that allow supercomputer-level computation on clusters of relatively inexpensive commodity hardware. Hadoop was made possible by two ideas that came out of Google in 2003 and 2004: a framework for distributed storage (the Google File System), implemented in Hadoop as HDFS, and a framework for distributed computing (MapReduce).
These two ideas have been the main driving force behind the emergence of scaling analytics, large-scale machine learning, and other big data applications over the past decade. From a technical standpoint, however, ten years is a very long time, and Hadoop has a number of well-known limitations, particularly around MapReduce. Programming in MapReduce is notoriously difficult: most analyses require chaining many Map and Reduce tasks together, which is why SQL-like computation and machine learning end up needing specialized systems of their own. Worse, MapReduce requires the data between every step to be serialized to disk, which makes the I/O cost of a MapReduce job very high and makes interactive analysis and iterative algorithms expensive; and the fact is, almost all optimization and machine learning is iterative.

To address these problems, Hadoop has been moving toward a more general resource-management framework: YARN (Yet Another Resource Negotiator). YARN implements the next generation of MapReduce, but it also lets applications use distributed resources without having to compute with MapReduce. By generalizing cluster management, it shifts the focus to generalized distributed computation, extending the original intent of MapReduce.
Spark is the first fast, general-purpose distributed computing paradigm to come out of this shift, and it has quickly become popular. Spark extends the MapReduce model with a functional programming paradigm that supports many more types of computation, covering a wide range of workflows that previously had to be implemented as specialized systems built on top of Hadoop. Spark uses in-memory caching to improve performance, which makes it fast enough for interactive analysis (much like working with a cluster through a Python interpreter). Caching also improves the performance of iterative algorithms, which makes Spark a great fit for data-theoretic tasks, especially machine learning.
In this post, we will first discuss how to set up Spark for simple analyses, either on your local machine or on an EC2 cluster. We then explore Spark at an introductory level, looking at what Spark is and how it works (and hopefully motivating further exploration). In the last two sections we start interacting with Spark on the command line, and then demonstrate how to write a Spark application in Python and submit it to the cluster as a Spark job.
Setting Up Spark

Setting up and running Spark on your own machine is very simple. You just download a pre-built package, and as long as you have Java 6+ and Python 2.6+ installed, Spark will run on Windows, Mac OS X, and Linux. Make sure the java program is on your PATH, or that the JAVA_HOME environment variable is set. Likewise, python must be on your PATH.
Assuming you already have Java and Python installed:
1. Go to the Spark downloads page.
2. Select the latest Spark release (1.2.0 at the time of this writing), pre-built for Hadoop 2.4, and download it directly.
From here, how to proceed depends on your operating system, and I'll leave that exploration to you. Windows users are welcome to leave tips on how to set things up in the comments.

In general, my recommendation is to follow the steps below (on a POSIX operating system):
1. Unpack Spark:
~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz
2. Move the unpacked directory into an applications directory (the commands here use /srv):
~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0
3. Create a symbolic link pointing at that Spark version from a spark directory. That way you can download newer (or older) versions of Spark and manage which one is in use simply by changing the link, without touching paths or environment variables:
~$ ln -s /srv/spark-1.2.0 /srv/spark
4. Edit your BASH configuration to add Spark to your PATH and to set the SPARK_HOME environment variable; these little tricks will help you on the command line. On Ubuntu, simply edit the ~/.bash_profile or ~/.profile file and add the following:
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH
5. After you source these settings (or simply restart your terminal), you can run a pyspark interpreter locally. Run the pyspark command and you should see something like this:
~$ pyspark
Python 2.7.8 (default, Dec  2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.8 (default, Dec  2 2014 12:45:58)
SparkContext available as sc.
>>>
Spark is now installed and can be used locally in "standalone mode." You can develop applications here and submit Spark jobs that will run in a multi-process/multi-threaded mode, or you can configure this machine as a client to a cluster (though this is not recommended, since the driver plays an important role in a Spark job and should live on the same network as the rest of the cluster). Aside from development, probably the most common thing you'll do with Spark on your local machine is use the spark-ec2 script to set up an EC2 Spark cluster on Amazon's cloud.
Minimizing the Verbosity of Spark
The execution of Spark (and PySpark) can be extremely verbose, with many INFO log messages printed to the screen. This is especially annoying during development, because a Python stack trace or the output of print statements can easily get lost. To reduce Spark's output, you can configure the log4j settings in $SPARK_HOME/conf. First, make a copy of the $SPARK_HOME/conf/log4j.properties.template file, dropping the ".template" extension:
~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

Edit the new file and replace every occurrence of INFO with WARN. Your log4j.properties file should look something like this:
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
Now when you run PySpark, the output will be much more concise. Thanks to @genomegeek for pointing this out at a District Data Labs workshop.
Using IPython Notebook with Spark
While searching for useful Spark tips, I came across several posts that mention configuring IPython notebook to work with PySpark. IPython notebook is an essential tool for data scientists to present scientific and theoretical work interactively, integrating text and Python code. For many data scientists, IPython notebook was their introduction to Python, and it is so widely used that I thought it worth mentioning here.
Most of the instructions here are adapted from IPython notebook: Setting up IPython with PySpark. However, we will focus on connecting the IPython shell to PySpark in standalone mode on your local machine rather than on an EC2 cluster. If you want to use PySpark/IPython on a cluster, take a look at those instructions and leave a comment!

1. Create an iPython notebook profile for Spark:
~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

Keep note of where the profile files were created, and substitute the appropriate path in the steps that follow:
2. Create the file $HOME/.ipython/profile_spark/startup/00-pyspark-setup.py and add the following code to it:
import os
import sys

# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/srv/spark'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

3. Start the IPython notebook using the profile we just created:
~$ ipython notebook --profile spark
4. In the notebook, you should be able to see the variables we just created.
5. At the top of your IPython notebook, make sure you add the Spark context:

from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')

6. Test the Spark context by doing a simple computation with IPython:
def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only needs to go up the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True

# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))

# Compute the number of primes in the RDD
print nums.filter(isprime).count()
If you get a number back without any errors, your context is working correctly!

Editor's note: the above configures an IPython context for directly invoking IPython notebook with PySpark. However, you can also launch a notebook directly with PySpark as follows:
$ IPYTHON_OPTS="notebook --pylab inline" pyspark

Which method works better depends on how you're using PySpark and IPython. The former lets you more easily connect an IPython notebook to a cluster, so it is the approach I prefer.
Using Spark on EC2

When I teach distributed computing with Hadoop, I've found that a lot can be taught locally on a pseudo-distributed node or in single-node mode. But to understand what is really going on, you need a cluster. There is often a gulf between the skills taught on paper and what real computation demands once the data gets big. If you're willing to spend a bit of money to learn Spark in detail, I recommend setting up a quick Spark cluster to experiment with.
A cluster of 5 slaves (plus 1 master), used roughly 10 hours a week, will cost about $45.18 per month.
A full discussion can be found in the Spark documentation: Running Spark on EC2. Be sure to read that documentation all the way through before you decide to pay for an EC2 cluster! I've highlighted some key points here:

1. Obtain an AWS EC2 key pair (access key and secret key) via the AWS Console.
2. Export the key pair into your environment, either by typing the following commands in your shell or by adding them to your configuration:
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey

Note that different tools use different environment variable names, so make sure you use the names expected by the Spark scripts.
3. Launch the cluster:
~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>
4. SSH into the cluster to run your Spark jobs:
ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>
5. Destroy the cluster:
ec2$ ./spark-ec2 destroy <cluster-name>
These scripts automatically set up a local HDFS cluster for adding data to, and the copy-dir command will sync your code and data out to the cluster. However, your best bet is to store your data in S3 and create RDDs that load the data using s3:// URIs.
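As a rough sketch of that last point (the bucket and path below are made up, and depending on the Hadoop build on your cluster the scheme may need to be s3n:// rather than s3://), loading such an RDD from the PySpark shell looks like this:

# Hypothetical bucket and key; AWS credentials must be available to the cluster's Hadoop configuration.
flights = sc.textFile("s3n://my-bucket/ontime/flights.csv")
print flights.count()   # an action forces the data to actually be read from S3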
What is Spark?
Now that we have Spark set up, let's talk a bit about what Spark is. Spark is a general-purpose cluster computing framework that provides efficient in-memory computation by distributing computations over large data sets across many machines. If you're familiar with Hadoop, you know that any distributed computing framework has to solve two problems: how to distribute the data, and how to distribute the computation. Hadoop uses HDFS to solve the distributed-data problem and the MapReduce computing paradigm to provide effective distributed computation. Similarly, Spark has a functional programming API in multiple languages that provides many more operators than just map and reduce, and it does this through a distributed data framework called resilient distributed datasets (RDDs).
Essentially, an RDD is a programming abstraction that represents a read-only collection of objects partitioned across machines. An RDD can be rebuilt from its lineage (which is what makes it fault tolerant), is accessed via parallel operations, can read from and write to distributed stores like HDFS or S3, and, most importantly, can be cached in the memory of worker nodes for immediate reuse. Because RDDs can be cached in memory, Spark is extremely effective for iterative applications, where the data is reused throughout the course of an algorithm. Most machine learning and optimization algorithms are iterative, which makes Spark an extremely effective tool for data science. In addition, because Spark is so fast, it can be accessed interactively through a command-line prompt similar to the Python REPL.
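As a minimal sketch of why that caching matters (the file name is hypothetical): an RDD remembers its lineage, and cache() asks Spark to keep the computed partitions in worker memory so that later passes reuse them instead of recomputing from disk:

# Build an RDD of numeric vectors and cache it for reuse across iterations.
points = sc.textFile("points.txt").map(lambda line: [float(x) for x in line.split()])
points.cache()
print points.count()   # first action: reads the file and fills the in-memory cache
print points.count()   # later actions: served from worker memory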
The Spark library itself contains many application elements that apply to most big data applications, including support for SQL-like queries over big data, machine learning and graph algorithms, and even support for real-time streaming data.
The core components are:
Spark Core: contains Spark's basic functionality, most notably the API that defines RDDs, the operations on them, and the actions over both. The rest of Spark's libraries are built on top of RDDs and Spark Core.
Spark SQL: provides an API for interacting with Spark via the Hive Query Language (HiveQL), Apache Hive's variant of SQL. Every database table is treated as an RDD, and Spark SQL queries are transformed into Spark operations. For those already familiar with Hive and HiveQL, Spark can be used right away.
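As a rough sketch of what that looks like from Python (using the Spark 1.2-era HiveContext; it assumes Spark was built with Hive support, as the pyspark banner earlier indicated, and the flights table is hypothetical):

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
# Run a HiveQL query; the result comes back as rows we can collect to the driver.
results = sqlContext.sql("SELECT airline, COUNT(*) FROM flights GROUP BY airline")
for row in results.collect():
    print row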
Spark Streaming: enables the processing and manipulation of live streams of data. Many real-time systems (such as Apache Storm) can process streaming data; Spark Streaming lets programs handle real-time data just like ordinary RDDs.
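A minimal sketch of the idea (Spark 1.2 added a Python API for Spark Streaming; the host and port here are placeholders), counting words over one-second micro-batches read from a socket:

from operator import add
from pyspark.streaming import StreamingContext

ssc    = StreamingContext(sc, 1)                    # 1-second micro-batches
lines  = ssc.socketTextStream("localhost", 9999)    # text arriving on a socket
counts = lines.flatMap(lambda line: line.split()) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(add)
counts.pprint()                                     # print a sample of each batch's counts
ssc.start()
ssc.awaitTermination()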
MLlib: a library of common machine learning algorithms implemented as Spark operations on RDDs. This library contains scalable learning algorithms such as classification and regression, which require iterating over large data sets. Mahout, formerly the go-to library for big data machine learning, will move to Spark for its implementations in the future.
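As a short sketch of one such iterative algorithm (k-means clustering from MLlib; the input file and the parameters are made up for illustration):

from pyspark.mllib.clustering import KMeans

# Parse a whitespace-separated file of numeric vectors and cluster them.
data  = sc.textFile("points.txt").map(lambda line: [float(x) for x in line.split()])
model = KMeans.train(data, k=3, maxIterations=10)
print model.clusterCenters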
GraphX: a collection of algorithms and tools for manipulating graphs and performing parallel graph operations and computations. GraphX extends the RDD API with operations for manipulating graphs, creating subgraphs, and accessing all of the vertices along a path.
Because these components meet many big data requirements as well as the algorithmic and computational needs of many data science tasks, Spark has been growing in popularity very quickly. Not only that, Spark provides APIs in Scala, Java, and Python, meeting the needs of different groups and allowing more data scientists to easily adopt Spark as their big data solution.
Programming Spark
Writing Spark applications is similar to working with other data flow languages that have been implemented on Hadoop. Code is written in a driver program that is lazily evaluated; upon an action, the driver code is distributed across the cluster to be executed by workers on their partitions of the RDD. Results are then sent back to the driver for aggregation or compilation. Essentially, the driver program creates one or more RDDs, applies operations to transform them, and then invokes actions on the transformed RDDs.
These steps are roughly as follows (a short sketch after the list puts them together):
Define one or more RDDs, either by obtaining data stored on disk (HDFS, Cassandra, HBase, local disk), parallelizing some collection in memory, transforming an existing RDD, or by caching or saving.

Invoke operations on the RDDs by passing closures (functions) to be applied to each element of the RDD. Spark offers more than 80 high-level operators beyond Map and Reduce.

Use actions (e.g. count, collect, save, etc.) on the resulting RDDs. Actions kick off the computation on the cluster.
When Spark runs a closure on a worker, any variables used in the closure are copied to that node, but are maintained within the local scope of that closure. Spark provides two types of shared variables that can be accessed by all workers in a restricted fashion: broadcast variables are distributed to all workers but are read-only, and accumulators are variables that workers can only "add" to using an associative operation, typically used as counters.
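A small sketch of both kinds of shared variables (the lookup values and codes are made up for illustration):

lookup = sc.broadcast({"AA": "American Airlines", "DL": "Delta Air Lines"})   # read-only on every worker
errors = sc.accumulator(0)                                                    # workers may only add to it

def full_name(code):
    if code not in lookup.value:
        errors.add(1)              # count codes we could not resolve
        return code
    return lookup.value[code]

codes = sc.parallelize(["AA", "DL", "ZZ"])
print codes.map(full_name).collect()
print errors.value                 # read the counter back on the driver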
Spark applications are essentially the manipulation of RDDs through transformations and actions. Future posts will go into this in more depth, but this understanding should be enough to execute the examples below.
Spark Execution
A brief note on Spark's execution. Essentially, Spark applications run as independent sets of processes, coordinated by the SparkContext in the driver program. The context connects to a cluster manager (e.g. YARN), which allocates system resources. Each worker in the cluster is managed by an executor, which is in turn managed by the SparkContext. The executor manages computation as well as storage and caching on each machine.

What is important to note is that application code is sent from the driver to the executors, and the executors specify the context and the various tasks to be run. The executors communicate back and forth with the driver for data sharing and interaction. The driver is a key participant in a Spark job, so it needs to be on the same network as the cluster. This is different from Hadoop, where you can submit a job from anywhere to the JobTracker, which handles the execution on the cluster.
Interacting with Spark

The easiest way to start working with Spark is via the interactive command prompt. To open the PySpark terminal, simply type pyspark on the command line:
~$ pyspark
[… snip …]
>>>
PySpark automatically creates a SparkContext using the local Spark configuration. You can access it through the sc variable. Let's create our first RDD:
>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
The textFile method loads the complete works of Shakespeare into an RDD named text. If you inspect the RDD, you can see that it is a MappedRDD and that the path to the file is a relative path from the current working directory (make sure to pass the correct path to the shakespeare.txt file on your disk). Let's start transforming this RDD to compute the "hello world" of distributed computing: word count.
>>> from operator import add
>>> def tokenize(text):
...     return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43
We first imported the operator add, which is a named function that can be used as a closure for addition; we'll use it a bit later. The first thing we have to do is split the text into words. We created a function called tokenize whose argument is a chunk of text and which returns a list of words, split on whitespace. We then created a new RDD called words by passing the tokenize closure to the flatMap operator on the text RDD. You'll notice that words is a PythonRDD, yet the command appeared to execute instantly; clearly we haven't actually split the entire Shakespeare data set into a list of words yet.
If you've ever done the Hadoop version of "word count" with MapReduce, you know the next step is to map each word to a key/value pair where the key is the word and the value is 1, and then use a reducer to sum the 1s for each key.

First, let's apply our map:
>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
 |  shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
 |  shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
I used an anonymous function (with Python's lambda keyword) rather than a named function. This line of code maps the lambda over every word, so each x is a word, and each word is transformed by the anonymous closure into the tuple (word, 1). In order to inspect the lineage, we used the toDebugString method to see how our PipelinedRDD is being transformed. We can then use reduceByKey to perform the word count and then write the counts to disk:
>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")

Once we finally invoke the saveAsTextFile action, the distributed job kicks off, and you should see a lot of INFO statements as the job runs "across the cluster" (or, on your local machine, across many processes). If you exit the interpreter, you should see a directory called "wc" in your current working directory:
$ ls wc/
_SUCCESS   part-00000   part-00001

Each part file represents a partition of the final RDD, computed by one of the processes on your local machine and saved to disk. If you run the head command on one of the part files, you should see tuples of word counts:
$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(u'kinghenryviii@7731', 1)
(u'othello@36737', 1)
(u'loveslabourslost@51678', 1)
(u'1kinghenryiv@54228', 1)
(u'troilusandcressida@83747', 1)
(u'fleeces', 1)
(u'midsummersnightsdream@71681', 1)

Note that the keys are not sorted as they would be in Hadoop (because Hadoop's Map and Reduce tasks include a mandatory shuffle-and-sort phase). You are, however, guaranteed that each word appears only once across all of the files, because you used the reduceByKey operator. If you want, you can use a sort operator to make sure all the keys are sorted before they are written to disk.
Writing a Spark Application

Writing Spark applications is similar to working with Spark through the interactive console; the API is the same. First, you need access to a SparkContext — in the shell, pyspark loaded one for you automatically; in an application, as in the template below, you create it yourself.

A basic template for writing a Spark application in Python follows:
## Spark Application - execute with spark-submit

## Imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality

def main(sc):
    pass

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)
This template gives you a sense of what is needed in a Spark application: imports of the Python libraries, module constants, an identifiable application name for debugging and for the Spark UI, and the main analytical methodology that runs as the driver. In the ifmain block we create the SparkContext and execute main with the configured context. This also lets us simply import the driver code into pyspark without executing it. Note that here the Spark configuration is hard-coded into the SparkConf via the setMaster method; generally you should allow this value to be set from the command line, which is why you'll see it marked with a placeholder comment.
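One way to do that (a sketch, not part of the original template) is to read the master from the application's command-line arguments and fall back to local mode:

import sys

if __name__ == "__main__":
    # e.g. `spark-submit app.py spark://master:7077`; defaults to local mode if no argument is given
    master = sys.argv[1] if len(sys.argv) > 1 else "local[*]"
    conf   = SparkConf().setAppName(APP_NAME).setMaster(master)
    sc     = SparkContext(conf=conf)
    main(sc)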
Use sc.stop() or sys.exit(0) to shut down or exit the program.
## Spark Application - execute with spark-submit

## Imports
import csv
import matplotlib.pyplot as plt

from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)

## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])

def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()

def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')

    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')

    plt.title('Total Minutes Delayed per Airline')
    plt.show()

## Main functionality
def main(sc):

    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))

    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])

    # Show a bar chart of the delays
    plot(delays)

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)
Run this code with the spark-submit command (assuming you already have an ontime directory containing the two CSV files):
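Assuming the application above is saved as app.py (the name used in the discussion below), the invocation is along these lines:

~$ spark-submit app.py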
This Spark job uses the local machine as the master and looks for the two CSV files in an ontime directory that sits alongside app.py. The final result shows the total delay, in minutes, for the month of April — some flights arrived early (if you were flying from the continental US to Hawaii or Alaska), but most of the large airlines were delayed. Note that we visualize the result directly from app.py using matplotlib:

[Figure: bar chart of the total minutes delayed per airline]
So what is this code doing? Let's look particularly at the main function, which is most directly related to Spark. First, we load a CSV file into an RDD, then map the split function to it. The split function parses each line of text with the csv module and returns a tuple representing the row. Finally, we pass the collect action to the RDD, which brings the data back from the RDD to the driver as a Python list. In this case, airlines.csv is a small jump table that lets us look up the full name of an airline from its code. We store that lookup table as a Python dictionary and then broadcast it to every node on the cluster with sc.broadcast.
Next, the main function loads the much larger flights.csv ([Translator's note] the original author wrote fights.csv by mistake; it is corrected here). After splitting the CSV rows, we map the parse function to each row, which converts dates and times into Python dates and times and casts the floating-point numbers appropriately. Each row is stored as a NamedTuple called Flight for efficient and easy use.
With an RDD of Flight objects in hand, we map an anonymous function that transforms the RDD into a series of key/value pairs where the key is the name of the airline and the value is the sum of the arrival and departure delays. Using reduceByKey with the add operator gives us the total delay per airline, and the RDD is then collected back to the driver (the number of airlines in the data set is relatively small). Finally, the delays are sorted in ascending order, the output is printed to the console, and the result is visualized with matplotlib.
This example is a bit longer, but hopefully it illustrates the interplay between the cluster and the driver program (sending data out for analysis and bringing the results back to the driver), as well as the role of Python code in a Spark application.
Conclusion
Although far from a complete introduction to Spark, we hope you now have a better feel for what Spark is and how to use it for fast, in-memory distributed computing. At the very least, you should be able to get Spark up and running and start exploring data, either on your local machine or on Amazon EC2. You should also be able to get an iPython notebook configured to run Spark.
Spark doesn't solve the distributed storage problem (usually Spark gets its data from HDFS), but it does provide a rich functional programming API for distributed computation. The framework is built on resilient distributed datasets (RDDs). An RDD is a programming abstraction that represents a partitioned collection of objects and allows distributed operations on them. RDDs are fault tolerant (the resilient part) and, most importantly, can be stored in memory on the worker nodes for immediate reuse. In-memory storage makes for fast and easily expressed iterative algorithms, as well as real-time interactive analysis.
Because the Spark library has APIs in Python, Scala, and Java, along with built-in modules for machine learning, streaming data, graph algorithms, and SQL-like queries, it has quickly become one of the most important distributed computing frameworks of our time. Coupled with YARN, Spark augments rather than replaces existing Hadoop clusters; it will be an important part of big data in the future, opening up new avenues of data science exploration.