1. SparkR的安装配置
1.1. R与Rstudio的安装
1.1.1. R的安装
我们的工作环境都是在Ubuntu下操作的，所以只介绍Ubuntu下安装R的方法：
1） 在/etc/apt/sources.list添加源
deb http://mirror.bjtu.edu.cn/cran/bin/linux/ubuntu precise/，
然后更新源apt-get update；
2） 通过apt-get安装：
sudo apt-get install r-base
1.1.2. Rstudio的安装
官网有详细介绍：
http://www.rstudio.com/products/rstudio/download-server/
sudo apt-get install gdebi-core
sudo apt-get install libapparmor1 # Required only for Ubuntu, not Debian
wget http://download2.rstudio.org/rstudio-server-0.97.551-amd64.deb
sudo gdebi rstudio-server-0.97.551-amd64.deb
1.2. rJava安装
1.2.1. rJava介绍
rJava是一个R语言和Java语言的通信接口，通过底层JNI实现调用，允许在R中直接调用Java的对象和方法。
rJava还提供了Java调用R的功能，是通过JRI(Java/R Interface)实现的。JRI现在已经被嵌入到rJava的包中，我们也可以单独试用这个功能。现在rJava包已经成为很多基于Java开发R包的基础功能组件。
正是由于rJava是底层接口，并使用JNI作为接口调用，所以效率非常高。在JRI的方案中，JVM通过内存直接加载RVM，调用过程性能几乎无损耗，因此是非常高效的连接通道，是R和Java通信的首选开发包。
1.2.2. rJava安装
1） 配置rJava环境
执行R CMD javareconf
root@testnode4:/home/payton# R CMD javareconf
2） 启动R并安装rJava
root@testnode4:/home/payton# R
> install.packages("rJava")
1.3. SparkR的安装
1.3.1. SparkR的代码下载
从网页下载代码SparkR-pkg-master.zip：https://github.com/amplab-extras/SparkR-pkg
1.3.2. SparkR的代码编译
1） 解压SparkR-pkg-master.zip，然后cd SparkR-pkg-master/
2） 编译的时候需要指明Hadoop版本和Spark版本
SPARK_HADOOP_VERSION=2.4.1 SPARK_VERSION=1.2.0 ./install-dev.sh
至此，单机版的SparkR已经安装完成。
1.3.3. 分布式SparkR的部署配置
1） 编译成功后，会生成一个lib文件夹，进入lib文件夹，打包SparkR为SparkR.tar.gz，这个是分布式SparkR部署的关键。
2） 由打包好的SparkR.tar.gz在各集群节点上安装SparkR
R CMD INSTALL SparkR.tar.gz
至此分布式SparkR搭建完成。
2. SparkR的运行
2.1. SparkR的运行机制
SparkR是AMPLab发布的一个R开发包，为Apache Spark提供了轻量级的前端。SparkR提供了Spark中弹性分布式数据集（RDD）的API，用户可以在集群上通过R shell交互式地运行job。SparkR集合了Spark和R的优势，下面的这3幅图很好地阐释了SparkR的运行机制。



2.2. 用SparkR进行数据分析
2.2.1. SparkR基本操作
首先介绍下SparkR的基本操作：
第一步，加载SparkR包
library(SparkR)
第二步，初始化Spark context
sc <- sparkR.init(master="spark://localhost:7077",
    sparkEnvir=list(spark.executor.memory="1g", spark.cores.max="10"))
第三步，读入数据。spark的核心是Resilient Distributed Dataset (RDD)，RDD可以从Hadoop的InputFormats来创建（例如，HDFS文件），或通过转化其它RDD得到。例如直接从HDFS读取数据为RDD的示例如下：
lines <- textFile(sc, "hdfs://sparkR_test.txt")
另外，也可以通过parallelize函数从向量或列表创建RDD，如：
rdd <- parallelize(sc, 1:10, 2)
到了这里，我们就可以运用RDD的动作（actions）和转换（transformations）来对RDD进行操作并产生新的RDD；也可以很容易地调用R开发包，只需要在集群上执行操作前用includePackage读取R开发包就可以了（例：includePackage(sc, Matrix)）；当然还可以把RDD转换为R语言格式的数据形式来对它进行操作。
具体可参见如下两个链接：
http://amplab-extras.github.io/SparkR-pkg/
https://github.com/amplab-extras/SparkR-pkg/wiki/SparkR-Quick-Start
那么下面我们就通过两个示例来看下SparkR是如何运行的吧。
2.2.2. SparkR使用举例
1） Example1：word count
# Word count on a cluster with SparkR (amplab-extras SparkR-pkg API).
# Reads a text file from HDFS, splits each line on "|", counts words,
# and prints "word: count" for every distinct word.

# Load the SparkR package
library(SparkR)

# Initialize the Spark context ("集群ip" is a placeholder for the master host)
sc <- sparkR.init(master = "spark://集群ip:7077",
                  sparkEnvir = list(spark.executor.memory = "1g",
                                    spark.cores.max = "10"))

# Read the input file from HDFS as an RDD of lines
lines <- textFile(sc, "hdfs://集群ip:8020/tmp/sparkR_test.txt")

# Split every line on the "|" separator; flatMap flattens the per-line
# vectors into one RDD of words ("\\|" because "|" is a regex metachar)
words <- flatMap(lines, function(line) { strsplit(line, "\\|")[[1]] })

# Map each word to a (key, value) pair: (word, 1L)
wordCount <- lapply(words, function(word) { list(word, 1L) })

# Sum the counts per word across 2 partitions
counts <- reduceByKey(wordCount, "+", 2L)

# Bring the full result set back to the driver as a list
output <- collect(counts)

# Print each word with its count
for (wordcount in output) {
  cat(wordcount[[1]], ": ", wordcount[[2]], "\n")
}
2） Example2：logistic regression
# Logistic regression on the German credit dataset with SparkR, trained by
# batch gradient descent. Fixes vs. the original listing: gCost was missing
# its closing brace (parse error), and the final accuracy line referenced
# y_pred outside its closure scope (should be length(result)).

# Load the SparkR package
library(SparkR)

# Initialize the Spark context ("集群ip" is a placeholder for the master host)
sc <- sparkR.init(master = "集群ip:7077",
                  appName = 'sparkr_logistic_regression',
                  sparkEnvir = list(spark.executor.memory = '1g',
                                    spark.cores.max = "10"))

# Read the txt file from HDFS; the RDD is made of 4 partitions
input_rdd <- textFile(sc,
                      "hdfs://集群ip:8020/user/payton/german.data-numeric.txt",
                      minSplits = 4)

# Parse the text of each RDD element (in parallel on each partition):
# split on whitespace and convert the non-empty tokens to numeric
dataset_rdd <- lapplyPartition(input_rdd, function(part) {
  part <- lapply(part, function(x) unlist(strsplit(x, '\\s')))
  part <- lapply(part, function(x) as.numeric(x[x != '']))
  part
})

# Split dataset_rdd into a training set and a test set.
# ptest is the fraction of samples used for testing, e.g. ptest = 0.2
# puts 20% of the samples in the test set and 80% in the training set.
split_dataset <- function(rdd, ptest) {
  # Test-set RDD: the first ptest fraction of each partition
  data_test_rdd <- lapplyPartition(rdd, function(part) {
    part_test <- part[1:(length(part) * ptest)]
    part_test
  })
  # Training-set RDD: the remaining samples of each partition
  data_train_rdd <- lapplyPartition(rdd, function(part) {
    part_train <- part[((length(part) * ptest) + 1):length(part)]
    part_train
  })
  # Return list(test RDD, train RDD)
  list(data_test_rdd, data_train_rdd)
}

# Convert the dataset to R matrix form, prepend an intercept column of 1s,
# and normalize the output column y to 0/1 (original labels are 1/2)
get_matrix_rdd <- function(rdd) {
  matrix_rdd <- lapplyPartition(rdd, function(part) {
    m <- matrix(data = unlist(part, FALSE, FALSE), ncol = 25, byrow = TRUE)
    m <- cbind(1, m)                     # intercept column -> 26 columns
    m[, ncol(m)] <- m[, ncol(m)] - 1     # labels 1/2 -> 0/1
    m
  })
  matrix_rdd
}

# The training set has a 7:3 ratio of y==1 to y==0 samples, so balance the
# classes by down-sampling the majority class to the minority-class size
balance_matrix_rdd <- function(matrix_rdd) {
  balanced_matrix_rdd <- lapplyPartition(matrix_rdd, function(part) {
    y <- part[, 26]
    index <- sample(which(y == 0), length(which(y == 1)))
    index <- c(index, which(y == 1))
    part <- part[index, ]
    part
  })
  balanced_matrix_rdd
}

# Split the dataset into training and test sets
dataset <- split_dataset(dataset_rdd, 0.2)
# Build the test-set RDD
matrix_test_rdd <- get_matrix_rdd(dataset[[1]])
# Build the (class-balanced) training-set RDD
matrix_train_rdd <- balance_matrix_rdd(get_matrix_rdd(dataset[[2]]))

# Cache both RDDs in the Spark cluster's distributed memory
cache(matrix_test_rdd)
cache(matrix_train_rdd)

# Initialize the parameter vector theta (25 = intercept + 24 features)
theta <- runif(n = 25, min = -1, max = 1)

# Logistic (sigmoid) function
hypot <- function(z) {
  1 / (1 + exp(-z))
}

# Gradient of the logistic loss
# (the original listing was missing this closing brace)
gCost <- function(t, X, y) {
  1 / nrow(X) * (t(X) %*% (hypot(X %*% t) - y))
}

# One training step: compute the gradient on each partition in parallel,
# then aggregate the partial gradients with reduceByKey
train <- function(theta, rdd) {
  gradient_rdd <- lapplyPartition(rdd, function(part) {
    X <- part[, 1:25]
    y <- part[, 26]
    p_gradient <- gCost(theta, X, y)
    list(list(1, p_gradient))   # single shared key so "+" sums all parts
  })
  agg_gradient_rdd <- reduceByKey(gradient_rdd, '+', 1L)
  # Return the aggregated gradient of this iteration
  collect(agg_gradient_rdd)[[1]][[2]]
}

# Gradient descent:
# alpha : learning rate
# tol   : convergence tolerance
# step  : iteration counter
alpha <- 0.1
tol <- 1e-4
step <- 1
while (TRUE) {
  cat("step: ", step, "\n")
  p_gradient <- train(theta, matrix_train_rdd)
  theta <- theta - alpha * p_gradient
  gradient <- train(theta, matrix_train_rdd)
  # Stop when the Frobenius norm of the gradient stops changing
  if (abs(norm(gradient, type = "F") - norm(p_gradient, type = "F")) <= tol)
    break
  step <- step + 1
}

# Predict the credit rating ("good"/"bad") on the test set and compute
# accuracy; xor() marks mispredictions (TRUE where prediction != label)
test <- lapplyPartition(matrix_test_rdd, function(part) {
  X <- part[, 1:25]
  y <- part[, 26]
  y_pred <- hypot(X %*% theta)
  result <- xor(as.vector(round(y_pred)), as.vector(y))
})
result <- unlist(collect(test))
corrects <- length(result[result == FALSE])
wrongs <- length(result[result == TRUE])
cat("\ncorrects: ", corrects, "\n")
cat("wrongs: ", wrongs, "\n")
# Fixed: the original divided by length(y_pred), which is not in scope here
cat("accuracy: ", corrects / length(result), "\n")
数据和特征决定了效果上限，模型和算法决定了逼近这个上限的程度。