Summary: This article walks through a stream data simulator and demonstrations of reading files, network data processing, sales data statistics, and related Spark Streaming examples.
1. Example Demonstrations
1.1 Stream Data Simulator
1.1.1 Stream Data Description
To simulate real conditions in these demonstrations, a continuous feed of streaming data is required. To bring the demos closer to a real environment, we define a stream data simulator. Its main function: it listens on a specified port via a socket, and when an external program connects to that port and requests data, the simulator periodically picks a random line from a specified file and sends it to that program.
1.1.2 Simulator Code
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

object StreamingSimulation {
  // Return a random integer in [0, length)
  def index(length: Int) = {
    import java.util.Random
    val rdm = new Random
    rdm.nextInt(length)
  }

  def main(args: Array[String]) {
    // The simulator takes three arguments: the file path, the port number and the send interval (in milliseconds)
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }

    // Read the specified file and record its total number of lines
    val filename = args(0)
    val lines = Source.fromFile(filename).getLines.toList
    val filerow = lines.length

    // Listen on the given port and establish a connection when an external program requests one
    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)
            // Fetch a random line from the file and send it to the connected client
            val content = lines(index(filerow))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

1.1.3 Generating the Packaged File
[Note] See Lesson 3, "Spark Programming Model (Part 2) -- IDEA Setup and Practice", for how to build the package.

In the packaging configuration dialog, add the following jars to the Class Path, separated by spaces: /app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar. Then choose Build->Build Artifacts from the menu, select the Build or Rebuild action in the popup, and use the following commands to copy the packaged file into the Spark root directory:
cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-1.1.0/
ll /app/hadoop/spark-1.1.0/

1.2 Example 1: Reading Files Demo
1.2.1 Demo Description
In this example, Spark Streaming monitors the files in a directory, picks up the data that changed within each batch interval, and then computes the word counts for that interval.
1.2.2 Demo Code
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object FileWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")

    // Create the StreamingContext with the Spark configuration and the batch interval, here 20 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Specify the directory to monitor, here /home/hadoop/temp/
    val lines = ssc.textFileStream("/home/hadoop/temp/")

    // Count the words in the data that changed in the monitored directory and print the result
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}

1.2.3 Running the Code
Step 1: Create the directory monitored by Streaming
Create /home/hadoop/temp as the directory monitored by Spark Streaming; by periodically adding files to this directory, we give Spark Streaming words to count. The directory can be created as shown below.
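A minimal sketch of this step (assuming the hadoop user's home directory from the earlier lessons):
$mkdir -p /home/hadoop/temp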

Step 2: Start the Spark cluster with the following commands
$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh
Step 3: Run the Streaming program in IDEA
Run this example in IDEA. Since it takes no input arguments, no run configuration is needed. The run log prints a timestamp at every batch interval; once file content is added to the monitored directory, the word counts are printed along with the timestamps.

1.2.4 Adding Text Files and Content
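As a sketch of this step (the file names and words here are hypothetical), new files with space-separated words can be dropped into the monitored directory while the program is running:
$echo "hello spark hello streaming" > /home/hadoop/temp/test1.txt
$echo "hello hadoop spark" > /home/hadoop/temp/test2.txt
Note that textFileStream only processes files created in the directory after the application has started, so add new files rather than appending to existing ones.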

1.2.5 Viewing the Results
Step 1: Check the run in IDEA
In IDEA's run log window, you can observe the word counts being printed along with the timestamps.

Step 2: Monitor the run through the web UI
Monitor Spark Streaming at http://hadoop1:4040; you can observe that a job runs every 20 seconds.

Compared with other jobs, a "Streaming" tab is added to the monitoring menu; click it to see the streaming metrics:

1.3 Example 2: Network Data Demo
1.3.1 Demo Description
In this example, the stream data simulator from section 1.1 sends simulated data at a frequency of once per second. Spark Streaming receives the stream over a socket and runs a batch every 20 seconds to process the received data, printing the frequencies of the data seen in that interval when processing finishes; in other words, no state is shared between the processing intervals.
1.3.2 Demo Code
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20))

    // Receive data over a socket; the hostname and port are required, and the received data is stored in memory and on disk
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    // Split and count the incoming data
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

1.3.3 Running the Code
Step 1: Start the stream data simulator
Start the stream data simulator packaged in section 1.1. In this example it periodically sends the people.txt data file from the /home/hadoop/upload/class7 directory (the file can be found in /data/class7 of this series' companion resources). The contents of people.txt are as follows:

The simulator's socket port is 9999 and the send frequency is 1 second:
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

ÔÚûÓгÌÐòÁ¬½Óʱ£¬¸Ã³ÌÐò´¦ÓÚ×èÈû״̬
Step 2: Run the Streaming program in IDEA
Run this example in IDEA. It requires the socket hostname and port as run arguments; here the hostname is hadoop1 and the port is 9999.

1.3.4 Viewing the Results
Step 1: Observe the simulator's sending behavior
When the Spark Streaming program running in IDEA establishes a connection, the simulator detects the external connection and starts sending test data, fetching a random line from the specified file and sending it at 1-second intervals.

Step 2: Observe the run on the monitoring page
Monitor the job on the web UI; you can observe that a job runs every 20 seconds.

Step 3: Check the run in IDEA
In IDEA's run window, you can observe the statistics: the word count in each Spark Streaming interval is 20, which is exactly one word per second accumulated over the 20-second interval.

1.4 Example 3: Sales Data Statistics Demo
1.4.1 Demo Description
In this example, the stream data simulator from section 1.1 sends simulated sales data at a frequency of once per second. Spark Streaming receives the stream over a socket and runs a batch every 5 seconds to process the received data, printing the total sales amount for that interval when processing finishes. Note that, again, no state is shared between the processing intervals.
1.4.2 Demo Code
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object SaleAmount {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("Usage: SaleAmount <hostname> <port>")
      System.exit(1)
    }

    // Reduce log noise so the streaming output is easier to read
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // Receive data over a socket; the hostname and port are required, and the received data is stored in memory and on disk
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    // Keep only well-formed records (6 fields), then sum the amount field (the last column) under a single key
    val words = lines.map(_.split(",")).filter(_.length == 6)
    val wordCounts = words.map(x => (1, x(5).toDouble)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

1.4.3 Running the Code
Step 1: Start the stream data simulator
Start the stream data simulator packaged in section 1.1. In this example it periodically sends the tbStockDetail.txt data file from the Lesson 5 directory /home/hadoop/upload/class5/saledata (see the data description in section 2.1.2 of Lesson 5, "Hive (Part 2) -- Hive in Practice"; the file can be found in /data/class5/saledata of this series' companion resources). The fields of the tbStockDetail table are order number, line number, item, quantity, and amount (the code expects six comma-separated fields per record, with the amount in the last column), and the data looks as follows:

The simulator's socket port is 9999 and the send frequency is 1 second:
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class5/saledata/tbStockDetail.txt 9999 1000

Run this example in IDEA. It requires the socket hostname and port as run arguments; here the hostname is hadoop1 and the port is 9999.

1.4.4 Viewing the Results
Step 1: Observe the simulator's sending behavior
When the Spark Streaming program running in IDEA establishes a connection, the simulator detects the external connection and starts sending sales data at 1-second intervals.

Step 2: Check the run in IDEA
In IDEA's run window, you can observe that a job runs every 5 seconds (two consecutive runs are 5000 milliseconds apart), and that the total sales amount for the interval is printed when each run finishes.

Step 3: Observe the run on the monitoring page
Monitor the job on the web UI; you can observe that a job runs every 5 seconds.

1.5 Example 4: Stateful Demo
1.5.1 Demo Description
This example demonstrates Spark Streaming's stateful operations. Simulated data is sent at a frequency of once per second by the stream data simulator from section 1.1. Spark Streaming receives the stream over a socket and runs a batch every 5 seconds to process the received data, printing the cumulative word frequencies since the program started when processing finishes. Unlike example 1.3 above, here the state is carried across the processing intervals.
1.5.2 Demo Code
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("Usage: StatefulWordCount <hostname> <port>")
      System.exit(1)
    }

    // Reduce log noise so the streaming output is easier to read
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Define the state update function: values holds the word counts of the current batch, state the counts from previous batches
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))

    // Use the current directory as the checkpoint directory (required by updateStateByKey)
    ssc.checkpoint(".")

    // Receive the data sent over the socket
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1))

    // Use updateStateByKey to update the state, counting the total occurrences of each word since the program started
    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

1.5.3 Running the Code
Step 1: Start the stream data simulator
Start the stream data simulator packaged in section 1.1. In this example it periodically sends the people.txt data file from the /home/hadoop/upload/class7 directory (the file can be found in /data/class7 of this series' companion resources). The contents of people.txt are as follows:

The simulator's socket port is 9999 and the send frequency is 1 second:
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

When no program is connected, the simulator blocks.
Step 2: Run the Streaming program in IDEA
Run this example in IDEA. It requires the socket hostname and port as run arguments; here the hostname is hadoop1 and the port is 9999.

1.5.4 Viewing the Results
Step 1: Check the run in IDEA
In IDEA's run window, you can observe that the first run reports a total word count of 1, the second 6, and the Nth run 5(N-1)+1; that is, the reported total is the cumulative number of words received since the program started.
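As a quick worked check of this pattern (assuming the socket connection is established about one second before the end of the first 5-second batch): batch 1 catches 1 word, and every later batch adds 5 more (1 word per second for 5 seconds), so the totals run 1, 6, 11, ..., i.e. 5(N-1)+1 after the Nth batch.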

Step 2: Observe the run on the monitoring page
Monitor the job on the web UI; you can observe that a job runs every 5 seconds.

Step 3: Check the checkpoint files
The checkpoint files can be seen under the project root directory.

1.6 Example 5: Window Demo
1.6.1 Demo Description
This example demonstrates Spark Streaming's window operations. Simulated data is sent at a frequency of once per second by the stream data simulator from section 1.1. Spark Streaming receives the stream over a socket and produces a result every 10 seconds, printing the word frequencies observed within the current window. Unlike the previous examples, the windowed counting here is implemented with the reduceByKeyAndWindow() method, which takes both a window duration and a slide interval.
1.6.2 Demo Code
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }

    // Reduce log noise so the streaming output is easier to read
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))

    // Use the current directory as the checkpoint directory
    ssc.checkpoint(".")

    // Receive data over a socket; the hostname and port are required, and the received data is stored in memory
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(","))

    // Window operation: the first form recomputes the whole window each time; the second (commented out) form updates incrementally, adding the new slice and subtracting the expired one
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(args(2).toInt), Seconds(args(3).toInt))
    // val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(args(2).toInt), Seconds(args(3).toInt))
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

1.6.3 Running the Code
Step 1: Start the stream data simulator
Start the stream data simulator packaged in section 1.1. In this example it periodically sends the people.txt data file from the /home/hadoop/upload/class7 directory (the file can be found in /data/class7 of this series' companion resources). The contents of people.txt are as follows:

The simulator's socket port is 9999 and the send frequency is 1 second:
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

When no program is connected, the simulator blocks.
Step 2: Run the Streaming program in IDEA
Run this example in IDEA. It requires four run arguments: the socket hostname, the port, the window duration, and the slide interval; here they are hadoop1, 9999, a 30-second window, and a 10-second slide interval.

1.6.4 Viewing the Results
Step 1: Check the run in IDEA
In IDEA's run window, you can observe that the first run reports a total word count of 4, the second 14, and the Nth run 10(N-1)+4 while the 30-second window is still filling; once the window is full, the count levels off near 30 rather than growing without bound, since a window only ever covers the last 30 seconds of data.
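As a rough worked check of these numbers (assuming the connection was established about 4 seconds before the end of the first 10-second slide): run 1 sees 4 words; runs 2 and 3 each add 10 more (1 word per second for a 10-second slide), giving 14 and then 24; from run 4 onward the window covers a full 30 seconds of data, so the count holds at about 30 seconds × 1 word/second = 30.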

Step 2: Observe the run on the monitoring page
Monitor the job on the web UI; you can observe that a job runs every 10 seconds.
