This article describes how to download and deploy Spark and how to run its example code. It also takes an in-depth look at the execution process and the scripts involved, with the aim of getting readers up and running with Spark quickly. Most of the Chinese-language Spark books currently on the market are fairly hard for beginners to follow, so the author intends to publish a series of Spark articles that approach the framework from a hands-on perspective. Beyond application-oriented pieces, later installments will also cover Spark-based system architecture, source-code walkthroughs, and related topics.
Deploying Spark
Since readers may log in to Linux with a tool such as SSH Secure Shell, here is a brief note on configuring the tool to support Chinese.
How to configure SSH Secure Shell to support Chinese
Simply set the contents of the /etc/sysconfig/i18n file as shown in Listing 1.
Listing 1. File contents
LANG="zh_CN.GB18030"
SUPPORTED="zh_CN.GB18030:zh_CN:zh:en_US.UTF-8:en_US:en"
SYSFONT="latarcyrheb-sun16"
After saving the file, log in again over SSH or run the command source ./i18n, and Chinese characters will display correctly.
If you want Chinese to be supported automatically after login, add the line source /etc/sysconfig/i18n to the /etc/profile file; this ensures the Chinese settings are loaded automatically when the root account logs in.
Note: the Linux distribution used throughout this article is CentOS 6.5, the JDK is JDK 7, and the Spark version is v1.2.1.
Downloading and installing Spark
Download the Spark source code or the pre-built binaries from the official Apache Spark website; the official download address is http://spark.apache.org/downloads.html.

Figure 1. Screenshot of the download page
As shown in Figure 1, we choose the v1.2.1 release. Because the pre-built binaries are compiled against Hadoop 2.4, you also need to download the Hadoop 2.4 package. In addition, Spark depends on Java and Python, so make sure both are installed on the Linux server. The commands in Listing 2 show how to check their versions.
Listing 2. Checking the versions
[root@localhost:3 spark]# java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (rhel-2.5.1.2.el6_5-x86_64 u65-b17)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
[root@localhost:3 spark]# python -v
# installing zipimport hook
import zipimport # builtin
# installed zipimport hook
# /usr/lib64/python2.6/site.pyc matches /usr/lib64/python2.6/site.py
import site # precompiled from /usr/lib64/python2.6/site.pyc
# /usr/lib64/python2.6/os.pyc matches /usr/lib64/python2.6/os.py
import os # precompiled from /usr/lib64/python2.6/os.pyc
import errno # builtin
import posix # builtin
As Listing 2 shows, the Java version is 1.7.0_65 and the Python version is 2.6. Running python -v drops you into the interactive Python shell; press Ctrl+D to exit it.
Once the Spark archive has been downloaded, extract it with the commands shown in Listing 3.
Listing 3. Extracting the archive
gunzip spark-1.2.1-bin-hadoop2.4.tgz
tar xvf spark-1.2.1-bin-hadoop2.4.tar
With that, the deployment is essentially complete; yes, it is that simple.
Spark run modes
Spark offers a variety of flexible run modes. When deployed on a single machine it can run either in local mode or in pseudo-distributed mode; when deployed as a distributed cluster there are also many modes to choose from, depending on the actual situation of the cluster: the underlying resource scheduling can rely on an external resource-scheduling framework or use Spark's built-in Standalone mode. As for external frameworks, the current implementations include the relatively stable Mesos mode and the Hadoop YARN mode, which is still under active development.
In practice, the run mode of a Spark application is determined by the value of the MASTER environment variable passed to SparkContext; a few modes additionally rely on auxiliary program interfaces. The MASTER values currently supported are specific strings or URLs, for example (a minimal sketch of setting the master programmatically follows this list):
local[N]: local mode, using N threads.
local-cluster[Worker,core,Memory]: pseudo-distributed mode; you can configure the number of virtual worker nodes to start, as well as the number of CPU cores and the amount of memory managed by each worker node.
spark://hostname:port: Standalone mode; Spark must be deployed on the relevant nodes, and the URL is the address and port of the Spark master host.
mesos://hostname:port: Mesos mode; both Spark and Mesos must be deployed on the relevant nodes, and the URL is the address and port of the Mesos master host.
YARN standalone / YARN cluster: the first YARN mode, in which both the driver logic and the tasks run inside the YARN cluster.
YARN client: the second YARN mode, in which the driver logic runs locally while the actual tasks run in the YARN cluster.
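As an illustration of how the master value reaches the application, the following minimal Java sketch sets it programmatically on the SparkConf instead of passing it on the command line. The class name and the local[2] / spark://... strings are arbitrary choices made for illustration only:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class MasterExample {
  public static void main(String[] args) {
    // Run locally with 2 threads; on a Standalone cluster this could instead be
    // something like "spark://master-host:7077" (hypothetical host name).
    SparkConf conf = new SparkConf()
        .setAppName("MasterExample")
        .setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    System.out.println("Running with master: " + conf.get("spark.master"));
    jsc.stop();
  }
}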
Running the Spark examples
±¾ÎÄËùÓеÄÀý×Ó¶¼ÊÇÔÚµ¥»ú»·¾³ÏÂÔËÐеģ¬Ñ¡ÔñµÄ¶¼ÊDZ¾µØÄ£Ê½¡£Ëæ Spark °²×°°üÏÂÔØµÄʾÀý´úÂë¶¼ÔÚ examples/src/main
Ŀ¼ÏÂÃæ£¬¿ÉÒÔͨ¹ýÔËÐÐ bin/run-example<class>[params] ÃüÁʽÔËÐÐʾÀý³ÌÐò¡£ÀýÈ磬ÔËÐÐ
SparkPI µÄ³ÌÐò£¬¸Ã³ÌÐò»á¼ÆËã³öÒ»¸ö PI Öµ£¬²¢´òÓ¡½á¹ûÔÚ¿ØÖÆÌ¨ÉÏ¡£
ÎÒÃÇÕâÀï°ÑÊä³öÈÕÖ¾ÖØ¶¨Ïòµ½µ±Ç°Ä¿Â¼Ï嵀 Sparkpilong.txt ÈÕÖ¾Îļþ¡£
Listing 4. Running the example
[root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example SparkPi 10 > Sparkpilog.txt
The output log consists of two parts: one is the common log output, produced by the series of scripts and programs introduced later; the other is the result of the program itself, in this case the computed value of PI. Listing 5 shows the common log output, and Listing 6 shows the result of the SparkPi program.
Listing 5. Common log output
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/05/19 09:58:38 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.10.19.186 instead (on interface eth0)
15/05/19 09:58:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/19 09:58:38 INFO SecurityManager: Changing view acls to: root
15/05/19 09:58:38 INFO SecurityManager: Changing modify acls to: root
15/05/19 09:58:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/05/19 09:58:43 INFO DAGScheduler: Stopping DAGScheduler
15/05/19 09:58:44 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/05/19 09:58:44 INFO MemoryStore: MemoryStore cleared
15/05/19 09:58:44 INFO BlockManager: BlockManager stopped
15/05/19 09:58:44 INFO BlockManagerMaster: BlockManagerMaster stopped
15/05/19 09:58:44 INFO SparkContext: Successfully stopped SparkContext
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Listing 6. Computation result
Pi is roughly 3.142888
For the input parameter 10 used above, the computed value of PI is 3.142888.
The code in Listing 7 is the source of the SparkPi class (JavaSparkPi) that ships with the Spark distribution.
Listing 7. SparkPi program source code
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public final class JavaSparkPi {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Number of slices (partitions); defaults to 2 if not given on the command line
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    int n = 100000 * slices;
    List<Integer> l = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }

    // Distribute the collection across 'slices' partitions
    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

    // For each element, draw a random point in [-1,1] x [-1,1] and
    // count it if it falls inside the unit circle
    int count = dataSet.map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer integer) {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        return (x * x + y * y < 1) ? 1 : 0;
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer integer, Integer integer2) {
        return integer + integer2;
      }
    });

    System.out.println("Pi is roughly " + 4.0 * count / n);
    jsc.stop();
  }
}
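For readers checking the arithmetic behind Listing 7: each map call draws a point (x, y) uniformly from the square [-1, 1] x [-1, 1], whose area is 4, while the unit circle inscribed in it has area PI, so the probability that a point lands inside the circle is PI/4. The reduce step sums the 0/1 indicators into count, which gives the estimate

PI ≈ 4 * count / n

With the argument 10 used earlier, n = 100000 * 10 = 1,000,000 samples, which is why the printed value (3.142888 in the run above) is close to, but not exactly, PI.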
A Spark task corresponds to an RDD. An RDD, or Resilient Distributed Dataset, represents a partitioned, read-only dataset. An RDD can be created in only two ways: from an in-memory collection or an external storage system, or by transforming another RDD with operations such as map, filter, join, and so on. The program in Listing 7 defines an RDD named dataSet.
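To make the two creation paths concrete, here is a minimal Java sketch (not taken from the Spark distribution); the file name wordcountdata.txt is borrowed from the later WordCount experiment and the local master is an assumption for running on a single machine:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class RddCreation {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("RddCreation").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Path 1: from an in-memory collection
    JavaRDD<Integer> fromCollection = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // Path 2: from an external storage system (a local file here)
    JavaRDD<String> fromFile = jsc.textFile("wordcountdata.txt");

    // New RDDs derived through transformations such as map and filter
    JavaRDD<Integer> doubled = fromCollection.map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer x) { return x * 2; }
    });
    JavaRDD<Integer> multiplesOfFour = doubled.filter(new Function<Integer, Boolean>() {
      @Override
      public Boolean call(Integer x) { return x % 4 == 0; }
    });

    System.out.println(multiplesOfFour.collect());
    System.out.println(fromFile.count());
    jsc.stop();
  }
}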
Most of the output in Listing 5 is machine and Spark housekeeping information, produced by several scripts that are invoked internally. Let us look at the scripts that actually run the example code. The first script to run is run-example, whose core code is shown in Listings 8 through 10.
Listing 8. run-example script source code
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
export SPARK_HOME="$FWDIR"
EXAMPLES_DIR="$FWDIR"/examples
The code in Listing 8 sets the example-code directory, here the examples folder under the current directory. Next, the first argument is taken as the name of the class to run, as shown in Listing 9.
Listing 9. run-example script source code
if [ -n "$1" ]; then
  EXAMPLE_CLASS="$1"
  shift
½Å±¾µ÷Óà spark-submit ½Å±¾½øÈëÏÂÒ»Ö´Ðв㼶¡£
Listing 10. run-example script source code
"$FWDIR"/bin/spark-submit \ --master $EXAMPLE_MASTER \ --class $EXAMPLE_CLASS \ "$SPARK_EXAMPLES_JAR" \ "$@" |
Next, let us look at the contents of the spark-submit script.
Listing 11. spark-submit script source code
while (($#)); do
  if [ "$1" = "--deploy-mode" ]; then
    SPARK_SUBMIT_DEPLOY_MODE=$2
  elif [ "$1" = "--properties-file" ]; then
    SPARK_SUBMIT_PROPERTIES_FILE=$2
  elif [ "$1" = "--driver-memory" ]; then
    export SPARK_SUBMIT_DRIVER_MEMORY=$2
  elif [ "$1" = "--driver-library-path" ]; then
    export SPARK_SUBMIT_LIBRARY_PATH=$2
  elif [ "$1" = "--driver-class-path" ]; then
    export SPARK_SUBMIT_CLASSPATH=$2
  elif [ "$1" = "--driver-java-options" ]; then
    export SPARK_SUBMIT_OPTS=$2
  elif [ "$1" = "--master" ]; then
    export MASTER=$2
  fi
  shift
done
The code above determines the run mode from the arguments passed in from the run-example script, here the master value, and then calls the spark-class script, as shown in Listing 12.
Listing 12. Invoking spark-class from spark-submit
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}" |
The code in Listing 12 invokes spark-class to run the SparkSubmit program.
Before looking at the spark-class code in detail, let us try the WordCount experiment. Assume a text file named wordcountdata.txt has been created and placed in the directory /home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4; then run the program as shown in Listing 13.
Listing 13. Running the WordCount program
[root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example JavaWordCount ./wordcountdata.txt
The output is shown in Listing 14. The content that duplicates Listing 5 is omitted, as is most of the result of the command in Listing 13, so only a small sample of the per-word counts is listed.
Listing 14. Output
For: 4
SQLMLlib: 1
subfolder: 1
OS).: 1
Streaming,: 1
APIs: 1
full: 1
--master: 3
through: 1
Provisioning3rd-Party: 1
applications: 4
graph: 3
over: 1
Explaining the example execution process

Figure 2. Execution flow of the Spark example code
From the discussion above we can see roughly that the example runs through the following chain, from left to right: Run-example.sh -> load-spark-env.sh -> the jar files under the lib directory -> spark-submit.sh -> spark-class.
Listing 15 shows the files under the lib directory.
Listing 15. File contents
[root@localhost:3 bin]# ls -lrt ../lib
total 236232
-rw-rw-r--. 1 1000 1000  87065934 Feb  3 11:45 spark-examples-1.2.1-hadoop2.4.0.jar
-rw-rw-r--. 1 1000 1000 148865850 Feb  3 11:45 spark-assembly-1.2.1-hadoop2.4.0.jar
-rw-rw-r--. 1 1000 1000   1916671 Feb  3 11:45 spark-1.2.1-yarn-shuffle.jar
-rw-rw-r--. 1 1000 1000   1809447 Feb  3 11:45 datanucleus-rdbms-3.2.9.jar
-rw-rw-r--. 1 1000 1000   1890075 Feb  3 11:45 datanucleus-core-3.2.10.jar
-rw-rw-r--. 1 1000 1000    339666 Feb  3 11:45 datanucleus-api-jdo-3.2.6.jar
The script also contains the code shown in Listing 16, which is mainly used to locate the spark-examples jar so that the example classes can be loaded.
Listing 16. spark-examples code, part 1
for f in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
  if [[ ! -e "$f" ]]; then
    echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
    echo "You need to build Spark before running this program" 1>&2
    exit 1
  fi
  SPARK_EXAMPLES_JAR="$f"
  JAR_COUNT=$((JAR_COUNT+1))
done
The code in Listing 17 adds a safeguard: if more than one spark-examples jar is found, an error is raised.
Listing 17. spark-examples code, part 2
if [ "$JAR_COUNT" -gt "1" ]; then echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2 ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2 echo "Please remove all but one jar." 1>&2 exit 1 fi |
As already covered in Listings 11 and 12, the program is ultimately executed by spark-class. In client mode the Spark driver runs in the same JVM, which then invokes spark-class to run the program. The output of running the command in Listing 12 is shown in Listing 18.
Listing 18. Run output
--master local[*] --class org.apache.spark.examples.SparkPi
/home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4/lib/spark-examples-1.2.1-hadoop2.4.0.jar 10
The spark-class script first determines the run mode, as shown in Listing 19.
Listing 19. spark-class script source code
case "$1" in # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. 'org.apache.spark.deploy.master.Master') OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS" OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM} ;; |
There is a special case for JDK 8: from JDK 8 onward the JVM no longer supports settings such as MaxPermSize.
Listing 20. JDK8
# Set JAVA_OPTS to be able to load native libraries and to set heap size
if [ "$JAVA_VERSION" -ge 18 ]; then
  JAVA_OPTS="$OUR_JAVA_OPTS"
else
  JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
fi
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
After all this indirection, the final launch code is shown in Listing 21.
Listing 21. Launch code
if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then # This is used only if the properties file actually contains these special configs # Export the environment variables needed by SparkSubmitDriverBootstrapper export RUNNER export CLASSPATH export JAVA_OPTS export OUR_JAVA_MEM export SPARK_CLASS=1 shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@" else # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then echo -n "Spark Command: " 1>&2 echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 echo -e "========================================\n" 1>&2 fi exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" fi |
The container that ultimately executes the example is spawned by the org.apache.spark.deploy.SparkSubmitDriverBootstrapper class, with the arguments shown in Listing 22.
Listing 22. Runtime arguments
org.apache.spark.deploy.SparkSubmit --master local[*] --class org.apache.spark.examples.SparkPi
/home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4/lib/spark-examples-1.2.1-hadoop2.4.0.jar 10
Listing 23. SparkSubmitDriverBootstrapper code
private[spark] object SparkSubmitDriverBootstrapper {

  def main(args: Array[String]): Unit = {

    // ... (setup of command, submitLibraryPath and confLibraryPath omitted)

    // Start the driver JVM
    val filteredCommand = command.filter(_.nonEmpty)
    val builder = new ProcessBuilder(filteredCommand)
    val env = builder.environment()

    if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
      val libraryPaths = confLibraryPath ++ sys.env.get(Utils.libraryPathEnvName)
      env.put(Utils.libraryPathEnvName, libraryPaths.mkString(sys.props("path.separator")))
    }

    val process = builder.start()

    // If we kill an app while it's running, its sub-process should be killed too.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        if (process != null) {
          process.destroy()
          process.waitFor()
        }
      }
    })

    // Redirect stdout and stderr from the child JVM
    val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
    val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
    stdoutThread.start()
    stderrThread.start()

    // Redirect stdin to child JVM only if we're not running Windows. This is because the
    // subprocess there already reads directly from our stdin, so we should avoid spawning a
    // thread that contends with the subprocess in reading from System.in.
    val isWindows = Utils.isWindows
    val isSubprocess = sys.env.contains("IS_SUBPROCESS")
    if (!isWindows) {
      val stdinThread = new RedirectThread(System.in, process.getOutputStream,
        "redirect stdin", propagateEof = true)
      stdinThread.start()
      // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
      // broken pipe, signaling that the parent process has exited.
      // This is the case if the application is launched directly from python,
      // as in the PySpark shell. In Windows, the termination logic is handled in java_gateway.py
      if (isSubprocess) {
        stdinThread.join()
        process.destroy()
      }
    }

    val returnCode = process.waitFor()
    sys.exit(returnCode)
  }
}
As the Scala code above shows, what Scala ultimately starts is a JVM process and its threads, so it has full access to the Java class library, for example java.io.File. The main function launches a child JVM process and then manages a series of thread-level operations around that process, namely the shutdown hook and the redirect threads.
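To make the bootstrapper's pattern easier to follow outside of Spark, here is a minimal Java sketch of the same idea: launch a child process, install a shutdown hook that destroys it, and forward its output. The java -version command is an arbitrary stand-in for the real driver command:

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ChildProcessSketch {
  public static void main(String[] args) throws Exception {
    // Launch a child process (simply "java -version" here for illustration)
    ProcessBuilder builder = new ProcessBuilder("java", "-version");
    builder.redirectErrorStream(true);
    final Process process = builder.start();

    // If this JVM is killed, also destroy the child process,
    // mirroring the shutdown hook in SparkSubmitDriverBootstrapper
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        process.destroy();
      }
    });

    // Forward the child's output to our stdout, similar to RedirectThread
    BufferedReader reader =
        new BufferedReader(new InputStreamReader(process.getInputStream()));
    String line;
    while ((line = reader.readLine()) != null) {
      System.out.println(line);
    }
    System.exit(process.waitFor());
  }
}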
Java and Scala implementations of WordCount
The Java implementation of WordCount is shown in Listing 24.
Listing 24. Java implementation of WordCount
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }

    // Every Spark program must first create a Spark context before doing anything else;
    // while the context is created, the program requests resources from the cluster
    // and builds the corresponding runtime environment.
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    // Use the textFile interface to read the given file from the file system and
    // return an RDD instance. Initial RDD creation is always the responsibility of
    // the SparkContext, taking an in-memory collection or an external file system as input.
    JavaRDD<String> lines = ctx.textFile(args[0], 1);

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String s) {
        return Arrays.asList(SPACE.split(s));
      }
    });

    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?, ?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    ctx.stop();
  }
}
A few of the functions used here deserve a brief introduction. The map function returns a new distributed dataset formed by passing each element of the source dataset through a function func; this kind of operation is generally called a transformation. The flatMap function is similar to map, except that each input element may be mapped to zero or more output elements, so func returns a Seq rather than a single element, as the code above shows. The reduceByKey function operates on a dataset of (K, V) pairs and returns a dataset of (K, V) pairs in which the values of each key are aggregated with the given reduce function. The corresponding Scala version is shown in Listing 25.
Listing 25. Scala implementation of WordCount
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Count the number of occurrences of each word
object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: <file>")
      System.exit(1)
    }
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val line = sc.textFile(args(0))
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}
Comparing Listings 24 and 25 shows how concise and lightweight Scala is; compared with Java, it is very well suited to writing a parallel computing framework, which is one reason the Spark framework is written in the functional language Scala rather than in an object-oriented language such as Java.
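To make the map versus flatMap distinction described above concrete, the following minimal Java sketch (not part of the Spark distribution; the two-line input and the local master are arbitrary assumptions) shows that map produces exactly one output element per input line, while flatMap expands each line into its individual words:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

public class MapVsFlatMap {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("MapVsFlatMap").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    JavaRDD<String> lines = jsc.parallelize(Arrays.asList("hello spark", "hello world"));

    // map: one output element per input line (here the line's word count)
    JavaRDD<Integer> wordsPerLine = lines.map(new Function<String, Integer>() {
      @Override
      public Integer call(String s) { return s.split(" ").length; }
    });

    // flatMap: each line expands into zero or more words
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
    });

    System.out.println(wordsPerLine.collect()); // [2, 2]
    System.out.println(words.collect());        // [hello, spark, hello, world]
    jsc.stop();
  }
}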
Broadly speaking, all of the run modes follow a similar workflow: fundamentally, a Spark application is split into a task-scheduling part and a task-execution part. Whether in local mode or in a distributed mode, the internal program logic is similar; only some modules are simplified. In local mode, for example, the cluster-management module is reduced to a thread pool inside the process.
No Spark application can do without the two parts SparkContext and Executor. The Executor is responsible for executing tasks, and a machine that runs an Executor is called a Worker node; the SparkContext is started by the user program and communicates with the Executors through the resource-scheduling module. The core implementation of these two parts is shared by all run modes; on top of them, different scheduling modules and the related adapter code are wrapped depending on the deployment mode. Specifically, the SparkContext is the overall entry point of program execution. During SparkContext initialization, Spark creates two levels of scheduling modules: DAGScheduler for job scheduling and TaskScheduler for task scheduling. The job-scheduling module is a high-level, stage-oriented scheduler: for each Spark job it computes multiple scheduling stages with dependencies between them (usually divided at shuffle boundaries), builds a set of concrete tasks for each stage (usually taking data locality into account), and submits them to the task-scheduling module as TaskSets for actual execution. The task-scheduling module is then responsible for launching the tasks, monitoring them, and reporting on how they run.
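As a small illustration of the stage split described above, the following sketch (again a toy example under stated assumptions, with an arbitrary in-memory input and a local master) builds a word-count job whose reduceByKey introduces a shuffle; printing the RDD lineage with toDebugString shows the ShuffledRDD that marks the boundary between the two stages:

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

public class StageSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("StageSketch").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    JavaRDD<String> words =
        jsc.parallelize(Arrays.asList("a", "b", "a", "c", "b", "a"), 2);

    // The map side of the job (first stage)
    JavaPairRDD<String, Integer> ones = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });

    // reduceByKey requires a shuffle, so the reduce side runs as a second stage
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) { return a + b; }
        });

    // The lineage shows the ShuffledRDD that marks the stage boundary
    System.out.println(counts.toDebugString());
    System.out.println(counts.collect());
    jsc.stop();
  }
}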
This article is the first installment of the series, covering deployment and example-code walkthroughs. The second installment will explain the Scala language, using Java and Scala implementations of the same functionality to help readers pick up Scala quickly.
Conclusion
ͨ¹ý±¾ÎĵÄѧϰ£¬¶ÁÕßÁ˽âÁËÈçºÎÏÂÔØ¡¢²¿Êð Spark¡¢ÔËÐÐʾÀý´úÂë¡£´ËÍ⣬ÉîÈë½éÉÜÁËÔËÐдúÂëµÄ¹ý³Ì¡¢½Å±¾ÄÚÈÝ£¬Í¨¹ýÕâЩ½éÉÜÁ¦ÇóÈöÁÕß¿ÉÒÔ¿ìËÙµØÉÏÊÖ
Spark¡£Ä¿Ç°ÊÐÃæÉÏ·¢²¼µÄ Spark ÖÐÎÄÊé¼®¶ÔÓÚ³õѧÕßÀ´Ëµ´ó¶à½ÏΪÄѶÁ¶®£¬×÷ÕßÁ¦ÇóÍÆ³öһϵÁÐ Spark
ÎÄÕ£¬ÈöÁÕßÄܹ»´Óʵ¼ÊÈëÊֵĽǶÈÀ´Á˽â Spark¡£ºóÐø³ýÁËÓ¦ÓÃÖ®ÍâµÄÎÄÕ£¬»¹»áÖÂÁ¦ÓÚ»ùÓÚ Spark
µÄϵͳ¼Ü¹¹¡¢Ô´´úÂë½âÊ͵ȷ½ÃæµÄÎÄÕ·¢²¼¡£