This article begins with a comparative introduction to MapReduce and Spark, then covers the fundamentals of each, installs Spark on a single Linux server, and uses real code to demonstrate what to watch for when converting MapReduce code to Spark. It is written for readers with no prior exposure to Spark; follow-up articles will start from practical applications and give more hands-on tutorials from the installation and application-development perspectives.
MapReduce vs. Spark
ĿǰµÄ´óÊý¾Ý´¦Àí¿ÉÒÔ·ÖΪÒÔÏÂÈý¸öÀàÐÍ£º
1.¸´ÔÓµÄÅúÁ¿Êý¾Ý´¦Àí£¨batch data processing£©£¬Í¨³£µÄʱ¼ä¿ç¶ÈÔÚÊýÊ®·ÖÖÓµ½ÊýСʱ֮¼ä£»
2.»ùÓÚÀúÊ·Êý¾ÝµÄ½»»¥Ê½²éѯ£¨interactive query£©£¬Í¨³£µÄʱ¼ä¿ç¶ÈÔÚÊýÊ®Ãëµ½Êý·ÖÖÓÖ®¼ä£»
3.»ùÓÚʵʱÊý¾ÝÁ÷µÄÊý¾Ý´¦Àí£¨streaming data processing£©£¬Í¨³£µÄʱ¼ä¿ç¶ÈÔÚÊý°ÙºÁÃëµ½ÊýÃëÖ®¼ä¡£
Big data processing inevitably depends on a cluster environment, and clusters pose three major challenges: parallelization, handling single-point failures, and resource sharing. These can be met, respectively, by rewriting applications in a parallelized form, by building in failure handling, and by allocating compute resources dynamically.
A large number of big data programming frameworks have emerged for cluster environments. The first was Google's MapReduce, which showed us a simple, general, and automatically fault-tolerant batch computing model. But for other kinds of computation, such as interactive and streaming workloads, MapReduce is a poor fit. This led to a proliferation of specialized data processing systems distinct from MapReduce, such as Storm and Impala. These specialized systems, however, have several shortcomings:
Duplicated work: many specialized systems solve the same underlying problems, such as distributed job execution and fault tolerance. For example, a distributed SQL engine and a machine learning system both need to implement parallel aggregation, and each specialized system solves such problems over again.
Composition problems: combining computations across different systems is troublesome. For a given big data application, the intermediate data sets are very large and expensive to move. In the current environment, data must be copied to a stable storage system such as HDFS so it can be shared between compute engines. This copying can cost more than the actual computation, so composing multiple systems into a pipeline is not efficient.
Limited applicability: if an application does not fit a given specialized system, the user can only switch to another system or write a new one.
Resource allocation: dynamically sharing resources between compute engines is difficult, because most engines assume they own the same set of machine resources until the program finishes.
Management problems: multiple specialized systems take more effort and time to manage and deploy, and end users in particular must learn multiple APIs and system models.
Spark ÊDz®¿ËÀû´óÑ§ÍÆ³öµÄ´óÊý¾Ý´¦Àí¿ò¼Ü£¬ËüÌá³öÁË RDD ¸ÅÄî (Resilient
Distributed Datasets)£¬¼´³éÏóµÄµ¯ÐÔÊý¾Ý¼¯¸ÅÄî¡£Spark ÊÇ¶Ô MapReduce
Ä£Ð͵ÄÒ»ÖÖÀ©Õ¹¡£ÒªÔÚ MapReduce ÉÏʵÏÖÆä²»Éó¤µÄ¼ÆË㹤×÷ (±ÈÈçµü´úʽ¡¢½»»¥Ê½ºÍÁ÷ʽ)£¬ÊDZȽÏÀ§Äѵģ¬ÒòΪ
MapReduce ȱÉÙÔÚ²¢ÐмÆËãµÄ¸÷¸ö½×¶Î½øÐÐÓÐЧµÄÊý¾Ý¹²ÏíµÄÄÜÁ¦£¬¶øÕâÖÖÄÜÁ¦ÊÇ RDD µÄ±¾ÖÊËùÔÚ¡£ÀûÓÃÕâÖÖÓÐЧµØÊý¾Ý¹²ÏíºÍÀàËÆ
MapReduce µÄ²Ù×÷½Ó¿Ú£¬ÉÏÊöµÄ¸÷ÖÖרÓÐÀàÐͼÆËã¶¼Äܹ»ÓÐЧµØ±í´ï£¬¶øÇÒÄܹ»»ñµÃÓëרÓÐϵͳͬµÈµÄÐÔÄÜ¡£
Introducing MapReduce and Spark
MapReduce
MapReduce was tailor-made for Apache Hadoop, and it fits Hadoop's classic use cases very well: large-scale log processing systems, batch extract-transform-load (ETL) tools, and similar jobs. But as Hadoop's territory kept expanding, its developers found that MapReduce is not the best choice in many scenarios, so Hadoop moved resource management into its own independent component, YARN. In addition, projects such as Impala have gradually entered our architectures. Impala provides SQL semantics and can query PB-scale data stored in Hadoop's HDFS and HBase. There were similar projects before it, such as Hive. Although Hive also provides SQL semantics, its underlying execution uses the MapReduce engine and is therefore still a batch process, which struggles to deliver interactive queries. By contrast, Impala's greatest strength, and its biggest selling point, is its efficiency.
First-generation Hadoop MapReduce is a software framework for distributed processing of massive data sets on compute clusters, consisting of one JobTracker and a number of TaskTrackers. Its execution flow is shown in Figure 1.

ͼ 1. MapReduce ÔËÐÐÁ÷³Ìͼ
ÔÚ×îÉϲãÓÐ 4 ¸ö¶ÀÁ¢µÄʵÌ壬¼´¿Í»§¶Ë¡¢jobtracker¡¢tasktracker
ºÍ·Ö²¼Ê½Îļþϵͳ¡£¿Í»§¶ËÌá½» MapReduce ×÷Òµ£»jobtracker е÷×÷ÒµµÄÔËÐУ»jobtracker
ÊÇÒ»¸ö Java Ó¦ÓóÌÐò£¬ËüµÄÖ÷ÀàÊÇ JobTracker£»tasktracker ÔËÐÐ×÷Òµ»®·ÖºóµÄÈÎÎñ£¬tasktracker
Ò²ÊÇÒ»¸ö Java Ó¦ÓóÌÐò£¬ËüµÄÖ÷ÀàÊÇ TaskTracker¡£Hadoop ÔËÐÐ MapReduce
×÷ÒµµÄ²½ÖèÖ÷Òª°üÀ¨Ìá½»×÷Òµ¡¢³õʼ»¯×÷Òµ¡¢·ÖÅäÈÎÎñ¡¢Ö´ÐÐÈÎÎñ¡¢¸üнø¶ÈºÍ״̬¡¢Íê³É×÷ÒµµÈ 6 ¸ö²½Öè¡£
Spark overview
The goal of the Spark ecosystem is to fuse batch processing, interactive processing, and stream processing within one software framework. Spark is an open source cluster computing system based on in-memory computation, built to make data analysis faster. Spark is remarkably compact: it was developed by a small team led by Matei at UC Berkeley's AMP Lab, it is written in Scala, and the project's core consists of only 63 Scala files. Spark introduced in-memory distributed datasets, which, besides supporting interactive queries, also optimize iterative workloads. Spark provides an in-memory compute cluster: data being analyzed is loaded into memory for fast querying, which is much faster than disk-based systems such as Hadoop. Spark was originally developed to handle iterative algorithms, such as machine learning and graph mining, as well as interactive data mining. In those two scenarios, Spark can run up to several hundred times faster than Hadoop.
Spark lets applications keep a working set in memory for efficient reuse. It supports many kinds of data processing applications while retaining the important properties of MapReduce, such as high fault tolerance, data locality, and large-scale data processing. On top of this it introduced the concept of Resilient Distributed Datasets (RDDs):
An RDD appears as a Scala object and can be created from a file;
it is an immutable, partitioned collection of objects distributed across a cluster;
new datasets are created by applying parallel operations (map, filter, groupBy, join) to a base dataset (a BaseRDD), producing a Transformed RDD;
on failure, an RDD can be rebuilt from its lineage information;
it can be cached in memory for reuse.
ͼ 2 ËùʾÊÇÒ»¸öÈÕÖ¾ÍÚ¾òµÄʾÀý´úÂ룬Ê×ÏȽ«ÈÕÖ¾Êý¾ÝÖÐµÄ error ÐÅÏ¢µ¼ÈëÄڴ棬Ȼºó½øÐн»»¥ËÑË÷¡£

ͼ 2. RDD ´úÂëʾÀý
When the data is loaded, the dataset exists as blocks on the workers. The driver distributes tasks to the workers, and after processing, each worker reports its results back to the driver. A high-speed cache of the dataset can also be built on a worker; processing against the cache works much like processing against blocks, again a distribute-and-report cycle.
Spark's RDD concept achieves the same performance as the specialized systems while also providing features they lack, such as fault handling and straggler (lagging node) handling.
Iterative algorithms: this is a very common application scenario for today's specialized systems; iterative computation is used, for example, in graph processing and machine learning. RDDs express these models well, including Pregel, HaLoop, and GraphLab.
¹ØÏµÐͲéѯ£º¶ÔÓÚ MapReduce À´Ëµ·Ç³£ÖØÒªµÄÐèÇó¾ÍÊÇÔËÐÐ SQL
²éѯ£¬°üÀ¨³¤ÆÚÔËÐС¢ÊýСʱµÄÅú´¦Àí×÷ÒµºÍ½»»¥Ê½µÄ²éѯ¡£È»¶ø¶ÔÓÚ MapReduce ¶øÑÔ£¬¶Ô±È²¢ÐÐÊý¾Ý¿â½øÐн»»¥Ê½²éѯ£¬ÓÐÆäÄÚÔÚµÄȱµã£¬±ÈÈçÓÉÓÚÆäÈÝ´íµÄÄ£ÐͶøµ¼ÖÂËٶȺÜÂý¡£ÀûÓÃ
RDD Ä£ÐÍ£¬¿ÉÒÔͨ¹ýʵÏÖÐí¶àͨÓõÄÊý¾Ý¿âÒýÇæÌØÐÔ£¬´Ó¶ø»ñµÃºÜºÃµÄÐÔÄÜ¡£
MapReduce batch processing: the interface RDDs provide is a superset of MapReduce, so RDDs can efficiently run applications written for MapReduce, and RDDs also suit more abstract DAG-based applications.
Á÷ʽ´¦Àí£ºÄ¿Ç°µÄÁ÷ʽϵͳҲֻÌṩÁËÓÐÏÞµÄÈÝ´í´¦Àí£¬ÐèÒªÏûºÄϵͳ·Ç³£´óµÄ¿½±´´úÂë»òÕ߷dz£³¤µÄÈÝ´íʱ¼ä¡£ÌرðÊÇÔÚĿǰµÄϵͳÖУ¬»ù±¾¶¼ÊÇ»ùÓÚÁ¬Ðø¼ÆËãµÄÄ£ÐÍ£¬³£×¡µÄÓÐ״̬µÄ²Ù×÷»á´¦Àíµ½´ïµÄÿһÌõ¼Ç¼¡£ÎªÁ˻ָ´Ê§°ÜµÄ½Úµã£¬ËüÃÇÐèҪΪÿһ¸ö²Ù×÷¸´ÖÆÁ½·Ý²Ù×÷£¬»òÕß½«ÉÏÓεÄÊý¾Ý½øÐдú¼Û½Ï´óµÄ²Ù×÷ÖØ·Å£¬ÀûÓÃ
RDD ʵÏÖÀëÉ¢Êý¾ÝÁ÷£¬¿ÉÒÔ¿Ë·þÉÏÊöÎÊÌâ¡£ÀëÉ¢Êý¾ÝÁ÷½«Á÷ʽ¼ÆËãµ±×÷һϵÁеĶÌС¶øÈ·¶¨µÄÅú´¦Àí²Ù×÷£¬¶ø²»Êdz£×¤µÄÓÐ״̬µÄ²Ù×÷£¬½«Á½¸öÀëÉ¢Á÷Ö®¼äµÄ״̬±£´æÔÚ
RDD ÖС£ÀëÉ¢Á÷Ä£ÐÍÄܹ»ÔÊÐíͨ¹ý RDD µÄ¼Ì³Ð¹ØÏµÍ¼½øÐв¢ÐÐÐԵĻָ´¶ø²»ÐèÒª½øÐÐÊý¾Ý¿½±´¡£
Spark terminology
Application: a user program built on Spark, consisting of a driver program and executors on the cluster;
Driver Program: the program that runs the main function and creates the SparkContext;
Cluster Manager: an external service that acquires resources on the cluster (for example standalone, Mesos, or YARN);
Worker Node: any node in the cluster that can run application code;
Executor: a process launched on a worker node for a given application; it runs tasks and keeps data in memory or on disk. Each application has its own independent executors;
Task: a unit of work sent to an executor;
Job: a parallel computation consisting of many tasks, corresponding to a Spark action;
Stage: a job is split into groups of tasks, each of which is called a stage (much as MapReduce is split into map tasks and reduce tasks).
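As a rough illustration of how these terms map onto code (a sketch; the application name and file name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// The driver program runs main() and creates the SparkContext; together with its
// executors on the cluster, this constitutes the Application.
val sc = new SparkContext(new SparkConf().setAppName("TermsDemo"))
val lengths = sc.textFile("data.txt").map(_.length)  // transformations run as tasks inside executors on worker nodes
val total = lengths.reduce(_ + _)                    // an action: triggers a job, which is split into stages of tasks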
Running the Spark demo
Spark's source code can be downloaded from http://spark-project.org/download, or the Spark project can be cloned directly from GitHub. Spark ships both a plain source archive and pre-built binary archives. You interact with Spark through the Scala shell.
To install, the recommended approach is to deploy and start the master on the first node, obtain the master's spark URL, and then edit conf/spark-env.sh before deploying to the remaining nodes.
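For example, a minimal conf/spark-env.sh might pin the master host and per-worker memory; the values below are illustrative assumptions, not settings from the original article:

# conf/spark-env.sh (illustrative values)
export SPARK_MASTER_IP=facenode1
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=512m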
Start the master service on a single machine: ./sbin/start-master.sh
After downloading spark-0.9.1-bin-cdh4, upload it to the /home/zhoumingyao directory (any directory will do; this example runs on CentOS 6.5). The subdirectories are shown in Listing 1.
Listing 1. Directory listing
-rw-r--r-- 1 root root   3899 Mar 27 13:36 README.md
-rw-r--r-- 1 root root  25379 Mar 27 13:36 pom.xml
-rw-r--r-- 1 root root    162 Mar 27 13:36 NOTICE
-rw-r--r-- 1 root root   4719 Mar 27 13:36 make-distribution.sh
-rw-r--r-- 1 root root  21118 Mar 27 13:36 LICENSE
-rw-r--r-- 1 root root 127117 Mar 27 13:36 CHANGES.txt
drwxr-xr-x 4 root root   4096 May  6 13:35 assembly
drwxr-xr-x 4 root root   4096 May  6 13:36 bagel
drwxr-xr-x 2 root root   4096 May  6 13:36 bin
drwxr-xr-x 2 root root   4096 May  6 13:36 conf
drwxr-xr-x 4 root root   4096 May  6 13:37 core
drwxr-xr-x 2 root root   4096 May  6 13:37 data
drwxr-xr-x 4 root root   4096 May  6 13:37 dev
drwxr-xr-x 3 root root   4096 May  6 13:37 docker
drwxr-xr-x 7 root root   4096 May  6 13:37 docs
drwxr-xr-x 4 root root   4096 May  6 13:37 ec2
drwxr-xr-x 4 root root   4096 May  6 13:37 examples
drwxr-xr-x 7 root root   4096 May  6 13:38 external
drwxr-xr-x 3 root root   4096 May  6 13:38 extras
drwxr-xr-x 5 root root   4096 May  6 13:38 graphx
drwxr-xr-x 5 root root   4096 May  6 13:38 mllib
drwxr-xr-x 3 root root   4096 May  6 13:38 project
drwxr-xr-x 6 root root   4096 May  6 13:38 python
drwxr-xr-x 4 root root   4096 May  6 13:38 repl
drwxr-xr-x 2 root root   4096 May  6 13:38 sbin
drwxr-xr-x 2 root root   4096 May  6 13:38 sbt
drwxr-xr-x 4 root root   4096 May  6 13:39 streaming
drwxr-xr-x 3 root root   4096 May  6 13:39 target
drwxr-xr-x 4 root root   4096 May  6 13:39 tools
drwxr-xr-x 5 root root   4096 May  6 13:39 yarn
Enter the bin directory and run spark-shell to drop into the Scala shell, as shown in Listing 2.
Listing 2. Running a command in the shell
scala> val data = Array(1, 2, 3, 4, 5)   // create the data array
data: Array[Int] = Array(1, 2, 3, 4, 5)
Next, turn data into an RDD, as shown in Listing 3.
Listing 3. Turning data into an RDD
scala> val distData = sc.parallelize(data)   // turn data into an RDD
distData: spark.RDD[Int] = spark.ParallelCollection@7a0ec850   (the type shown is an RDD)
Listing 4. Computing on the RDD
scala> distData.reduce(_ + _)   // compute on the RDD: sum the elements of data
Listing 5. Starting Spark
[root@facenode1 sbin]# ./start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-facenode1.out
localhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts.
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-facenode1.out
Listing 6. Checking the services
[root@facenode1 sbin]# ps -ef | grep spark
root 29295 1 11 16:45 pts/1 00:00:03 /usr/java/jdk1.6.0_31/bin/java -cp :/home/zhoumingyao/spark-0.9.1-bin-cdh4/conf:/home/zhoumingyao/spark-0.9.1-bin-cdh4/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar:/etc/alternatives/hadoopconf -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip facenode1 --port 7077 --webui-port 8080
root 29440 1 12 16:45 ? 00:00:03 java -cp :/home/zhoumingyao/spark-0.9.1-bin-cdh4/conf:/home/zhoumingyao/spark-0.9.1-bin-cdh4/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://facenode1:7077
Multiple workers can be started; connect each one to the master server with the command shown in Listing 7.
Listing 7. Connecting to the master server
./spark-class org.apache.spark.deploy.worker.Worker spark://facenode1:7077

Output:

14/05/06 16:49:06 INFO ui.WorkerWebUI: Started Worker web UI at http://facenode1:8082
14/05/06 16:49:06 INFO worker.Worker: Connecting to master spark://facenode1:7077...
14/05/06 16:49:06 INFO worker.Worker: Successfully registered with master spark://facenode1:7077
The master server's web UI shows the working state of the master and slave nodes; the address is shown in Listing 8.
Listing 8. Accessing the web UI
http://10.10.19.171:8080/
×¢Ò⣬Èç¹ûÊǼ¯Èº·½Ê½£¬ÇëÔÚ conf Îļþ¼ÐÏÂÃæµÄ slaves ÎļþÀïÃæÖðÐмÓÈëÐèÒª¼ÓÈ뼯ȺµÄ master¡¢works
·þÎñÆ÷µÄ ip µØÖ·»òÕß hostname¡£
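For instance, a two-worker cluster might use a conf/slaves file like the following (the hostnames are illustrative):

# conf/slaves -- one worker host per line
facenode2
facenode3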
MapReduce ת»»µ½ Spark
Spark is a compute engine in the same family as MapReduce. Its in-memory approach removes MapReduce's difficulty with slow disk reads, and its Scala-based functional programming style and API make parallel computation very efficient to express.
Because Spark computes over data using RDDs (resilient distributed datasets), a model quite different from MapReduce's Map()/Reduce() pattern, code written against the Mapper and Reducer APIs rarely carries over directly. This is the main stumbling block when converting MapReduce code to Spark.
Compared with the map() and reduce() methods in Scala or Spark, Hadoop MapReduce's API is more flexible and more complex. Some characteristics of Hadoop MapReduce:
Mappers and Reducers consume and produce key-value pairs;
all the values for a given key go to a single Reducer's reduce call;
for each input, a Mapper or Reducer may emit zero, one, or many key-value pairs;
Mappers and Reducers may emit keys and values of arbitrary types, not a fixed dataset shape;
Mapper and Reducer objects have a lifecycle that spans many map() or reduce() calls: they support setup() and cleanup() methods, which can be used to run code before or after a batch of records is processed.
Consider a scenario where we need to count the lines of a text file by their length. In Hadoop MapReduce, the input to map() is a key-value pair whose key is the line number and whose value is the line itself; the Mapper then emits the line's length as the key and 1 as the value.
Listing 9. Map function, MapReduce style
public class LineLengthCountMapper
    extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
}
In the code in Listing 9, because Mappers and Reducers deal only in key-value pairs, the input to LineLengthCountMapper is supplied by a TextInputFormat, whose key is the line number and whose value is the text of the line. The converted Spark code is shown in Listing 10.
Listing 10. Map function, Spark style
lines.map(line => (line.length, 1))
ÔÚ Spark ÀÊäÈëÊǵ¯ÐÔ·Ö²¼Ê½Êý¾Ý¼¯ (Resilient Distributed Dataset)£¬Spark
²»ÐèÒª key-value ¼üÖµ¶Ô£¬´úÖ®µÄÊÇ Scala Ôª×æ (tuple)£¬ËüÊÇͨ¹ý (line.length,
1) ÕâÑùµÄ (a,b) Óï·¨´´½¨µÄ¡£ÒÔÉÏ´úÂëÖÐ map() ²Ù×÷ÊÇÒ»¸ö RDD£¬(line.length,
1) Ôª×æ¡£µ±Ò»¸ö RDD °üº¬Ôª×æÊ±£¬ËüÒÀÀµÓÚÆäËû·½·¨£¬ÀýÈç reduceByKey()£¬¸Ã·½·¨¶ÔÓÚÖØÐÂÉú³É
MapReduce ÌØÐÔ¾ßÓÐÖØÒªÒâÒå¡£
Listing 11 shows the Hadoop MapReduce reducer that totals, for each line length, how many lines have that length, and emits the result.
Listing 11. Reduce function, MapReduce style
public class LineLengthReducer
    extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}
The corresponding Spark code is shown in Listing 12.
Listing 12. Reduce function, Spark style
val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)
Spark's RDD API also has a reduce() method, but it reduces the entire set of values to one single value, not one value per key; reduceByKey() is the counterpart of MapReduce's per-key reduce.
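The difference is easy to see side by side (a sketch, assuming lines is an RDD[String] as in the listings above):

val totalChars   = lines.map(_.length).reduce(_ + _)                       // reduce(): one value for the whole RDD
val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)  // reduceByKey(): one summed value per key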
Now suppose we need to count the words that begin with an uppercase letter. For each line of text, a Mapper may emit many key-value pairs, as shown in Listing 13.
Listing 13. Counting uppercase-initial words, MapReduce style
public class CountUppercaseMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    for (String word : line.toString().split(" ")) {
      if (Character.isUpperCase(word.charAt(0))) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}
In Spark, the corresponding code is shown in Listing 14.
Listing 14. Counting uppercase-initial words, Spark style
lines.flatMap(
  _.split(" ")
   .filter(word => Character.isUpperCase(word(0)))
   .map(word => (word, 1))
)
The map() method that MapReduce relies on does not suffice here, because map() must produce exactly one output per input, whereas a single line may yield many outputs, or none. The Spark approach is simple: each line is turned into an array of output values, which may be empty or may hold many entries, and the flattened collection of those values becomes the resulting RDD. That is exactly what flatMap() does: it filters each line's words and converts the survivors into tuples inside the function.
In Spark, reduceByKey() can then be used to count the occurrences of each word. Suppose we want to count the occurrences of uppercase-initial words, reporting each word in upper case; in MapReduce the program looks like Listing 15.
Listing 15. MapReduce style
public class CountUppercaseReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
}
ÔÚ Spark À´úÂëÈçÇåµ¥ 16 Ëùʾ¡£
Listing 16. Spark style
groupByKey().map { case (word, ones) => (word.toUpperCase, ones.sum) }
groupByKey() merely collects all of a key's values; it does not apply a reduce function. In this example, the key is converted to upper case and the values are summed directly. Note, however, that if one key is associated with a great many values, an Out Of Memory error can occur.
Spark provides a simple alternative that transforms the values for each key while handing the reduce process over to Spark itself, avoiding the OOM risk:
reduceByKey(_ + _).map { case (word, total) => (word.toUpperCase, total) }
In MapReduce, the setup() method runs before the map() calls begin and is typically used to acquire resources such as a database connection; cleanup() then releases whatever setup() acquired.
Listing 17. MapReduce style
public class SetupCleanupMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  private Connection dbConnection;
  @Override
  protected void setup(Context context) {
    dbConnection = ...;
  }
  ...
  @Override
  protected void cleanup(Context context) {
    dbConnection.close();
  }
}
Spark has no such lifecycle methods.
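A commonly used substitute is mapPartitions(), which lets per-partition code run before and after each batch of records, covering the usual connect-then-close pattern. A minimal sketch, assuming hypothetical createConnection() and lookup() helpers:

val results = lines.mapPartitions { records =>
  val conn = createConnection()                        // "setup": runs once per partition (hypothetical helper)
  val out = records.map(r => lookup(conn, r)).toList   // materialize results before closing the connection
  conn.close()                                         // "cleanup": runs once per partition
  out.iterator
}

Note that the results are materialized with toList before conn.close(); returning the lazy iterator directly would try to use the connection after it was closed.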
Conclusion
ͨ¹ý±¾ÎĵÄѧϰ£¬¶ÁÕßÁ˽âÁË MapReduce ºÍ Spark Ö®¼äµÄ²îÒì¼°Çл»³É±¾¡£±¾ÎÄÕë¶ÔµÄÊǶÔ
Spark ÍêȫûÓÐÁ˽âµÄÓû§£¬ºóÐøÎÄÕ»á´Óʵ¼ÊÓ¦Óóö·¢£¬´Ó°²×°¡¢Ó¦ÓóÌÐòµÄ½Ç¶È¸ø³ö¸ü¼Óʵ¼ÊµÄ½Ì³Ì¡£