SparkÈëÃÅʵսϵÁУ¨Ï£©--ʵʱÁ÷¼ÆËãSpark Streamingʵս
 
Author: Shishanyuan (石山园)
 
2020-9-30 
 
Editor's note:

This article mainly covers the stream data simulator, the file-reading demo, the network data demo, the sales data statistics demo, and related content.

This article comes from 博客园, edited and recommended by Anna of 火龙果软件.

1. Example Demonstrations

1.1 Stream Data Simulator

1.1.1 Stream Data Overview

The demonstrations simulate a real deployment, which needs a continuous feed of streaming data; to bring the demos closer to a production environment, we define a stream data simulator. Its main function: it listens on a specified port via a socket, and when an external program connects to that port and requests data, the simulator periodically picks a random line from a specified file and sends it to that program.

1.1.2 Simulator Code

import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

object StreamingSimulation {

  // Return a random integer in [0, length)
  def index(length: Int) = {
    import java.util.Random
    val rdm = new Random
    rdm.nextInt(length)
  }

  def main(args: Array[String]) {
    // The simulator takes three arguments: the file path, the port number,
    // and the send interval (in milliseconds)
    if (args.length != 3) {
      System.err.println("Usage: <filename> <port> <millisecond>")
      System.exit(1)
    }

    // Read the specified file and record its total number of lines
    val filename = args(0)
    val lines = Source.fromFile(filename).getLines.toList
    val filerow = lines.length

    // Listen on the given port and build a connection whenever an
    // external program requests one
    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(2).toLong)
            // Pick a random line from the file and send it to the client
            val content = lines(index(filerow))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

 

1.1.3 Building and Packaging

[Note] See Lesson 3, "Spark Programming Model (Part 2): Setting Up IDEA and Hands-On Practice," for how to build the package.

In the packaging configuration dialog, add the following to the Class Path, separating the jar paths with spaces: /app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar

Click the menu Build -> Build Artifacts, choose Build or Rebuild in the pop-up, and then copy the packaged jar to the Spark root directory with the following commands:

cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar

cp LearnSpark.jar /app/hadoop/spark-1.1.0/

ll /app/hadoop/spark-1.1.0/
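To confirm the simulator class made it into the jar, you can list the jar's contents (a quick sanity check, assuming the JDK's jar tool is on the PATH; the class7 package name matches the run commands used later):

$jar tf /app/hadoop/spark-1.1.0/LearnSpark.jar | grep StreamingSimulation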

1.2 Example 1: Reading Files

1.2.1 Demo Description

In this example, Spark Streaming monitors a directory, picks up the file data added during each batch interval, and computes the word counts for that interval.

1.2.2 Demo Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object FileWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")

    // Create the StreamingContext from the Spark configuration,
    // with a batch interval of 20 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Monitor the directory /home/hadoop/temp/
    val lines = ssc.textFileStream("/home/hadoop/temp/")

    // Count the words in data newly added to the monitored directory and print the result
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    // Start Streaming
    ssc.start()
    ssc.awaitTermination()
  }
}

1.2.3 Running the Code

Step 1: Create the directory Streaming will monitor

Create /home/hadoop/temp as the directory for Spark Streaming to monitor; by periodically adding files to it, we let Spark Streaming count the words they contain.
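For example (a minimal sketch; -p simply makes the command succeed even if the directory already exists):

$mkdir -p /home/hadoop/temp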

Step 2: Start the Spark cluster with the following commands

$cd /app/hadoop/spark-1.1.0

$sbin/start-all.sh

Step 3: Run the Streaming program in IDEA

Run the example in IDEA; since it takes no input arguments, no run configuration is needed. The run log prints a timestamp at each batch interval; once file content is added to the monitored directory, the word counts are printed along with the timestamp.

1.2.4 Adding Text Files and Content
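Note that textFileStream only processes files created in the monitored directory after the job starts, so write each round of test data into a fresh file rather than appending to an existing one. A minimal sketch (the file name is arbitrary):

$echo "spark streaming spark" > /home/hadoop/temp/words1.txt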

1.2.5 Viewing the Results

Step 1: Check the run output in IDEA

In IDEA's run log window, you can observe the word counts printed along with each timestamp.

Step 2: Monitor the run through the web UI

Monitor Spark Streaming at http://hadoop1:4040, where you can observe a job running every 20 seconds.

Compared with other running jobs, a "Streaming" tab has been added to the monitoring menu; click it to see the streaming statistics:

1.3 Example 2: Network Data

1.3.1 Demo Description

In this example the stream data simulator from 1.1 sends simulated data once per second. Spark Streaming receives the stream over a socket and runs a job every 20 seconds to process the received data, printing the frequency of each word within that interval; no state is carried between processing intervals.

1.3.2 Demo Code

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(20))

    // Receive data over a socket; this call needs the socket's host name and
    // port number, and the received data is stored in memory and on disk
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    // Split the incoming lines and count the words
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

 

1.3.3 Running the Code

Step 1: Start the stream data simulator

Start the stream data simulator packaged in 1.1. In this example it periodically sends the people.txt data file from the /home/hadoop/upload/class7 directory (the file can be found in this series' companion resources under /data/class7); the contents of people.txt are as follows:

The simulator's socket port is 9999 and the send frequency is one second:

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

When no program is connected, the simulator blocks, waiting for a connection.

Step 2: Run the Streaming program in IDEA

Run the example in IDEA. It requires the socket host name and port number as arguments; here, configure the host as hadoop1 and the port as 9999.

1.3.4 Viewing the Results

Step 1: Observe the simulator's output

When the Spark Streaming program in IDEA connects to the simulator, the simulator detects the external connection and starts sending test data, picking a random line from the specified file once per second.

Step 2: Observe execution on the monitoring page

Monitor the job on the web UI; you can observe that a job runs every 20 seconds.

Step 3: Check the run output in IDEA

In IDEA's run window you can observe the word counts: each Spark Streaming interval totals 20 words, which is exactly one line per second accumulated over the 20-second interval.

1.4 Example 3: Sales Data Statistics

1.4.1 Demo Description

In this example the stream data simulator from 1.1 sends simulated sales data once per second. Spark Streaming receives the stream over a socket and runs a job every 5 seconds to process the received data, printing the total sales amount for that interval; note that, again, no state is carried between processing intervals.

1.4.2 Demo Code

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object SaleAmount {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("Usage: SaleAmount <hostname> <port>")
      System.exit(1)
    }

    // Silence noisy logging so the printed results are easy to read
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // Receive data over a socket; this call needs the socket's host name and
    // port number, and the received data is stored in memory and on disk
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    // Keep only well-formed rows (six comma-separated fields), then map every
    // row to the constant key 1 with the amount (last field) as the value, so
    // reduceByKey sums all amounts in the batch into a single total
    val words = lines.map(_.split(",")).filter(_.length == 6)
    val wordCounts = words.map(x => (1, x(5).toDouble)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
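The constant-key trick is the heart of this example: keying every row by 1 collapses the whole batch onto a single entry whose value reduceByKey accumulates into the batch total. A minimal plain-Scala sketch of the same logic, using two made-up sales rows (the sixth field being the line amount):

val rows = List(
  "order1,1,itemA,2,10.0,20.0",  // hypothetical rows in tbStockDetail format
  "order1,2,itemB,1,5.0,5.0")
val total = rows.map(_.split(","))
  .filter(_.length == 6)         // keep only well-formed six-field rows
  .map(x => x(5).toDouble)       // take the amount column
  .sum
println(total)                   // 25.0, the batch's sales total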

1.4.3 Running the Code

Step 1: Start the stream data simulator

Start the stream data simulator packaged in 1.1. In this example it periodically sends the tbStockDetail.txt data file from Lesson 5's /home/hadoop/upload/class5/saledata directory (see the data description in section 2.1.2 of Lesson 5, "Hive (Part 2): Hive in Practice"; the file can be found in this series' companion resources under /data/class5/saledata). The fields of the tbStockDetail table are order number, line number, item, quantity, and amount; the data contents are as follows:

The simulator's socket port is 9999 and the send frequency is one second:

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class5/saledata/tbStockDetail.txt 9999 1000

Run the example in IDEA. It requires the socket host name and port number as arguments; here, configure the host as hadoop1 and the port as 9999.

1.4.4 Viewing the Results

Step 1: Observe the simulator's output

When the Spark Streaming program in IDEA connects to the simulator, the simulator detects the external connection and starts sending sales data at one-second intervals.

Step 2: Check the run output in IDEA

In IDEA's run window you can observe a job running every 5 seconds (5000 milliseconds between runs); after each run, the total sales amount for that interval is printed.

Step 3: Observe execution on the monitoring page

Monitor the job on the web UI; you can observe that a job runs every 5 seconds.

1.5 Example 4: Stateful Operations

1.5.1 Demo Description

This example demonstrates Spark Streaming's stateful operations. Simulated data is sent by the 1.1 stream data simulator once per second; Spark Streaming receives the stream over a socket and runs a job every 5 seconds, printing the cumulative frequency of each word since the program started. In contrast to example 1.3, the state in each interval here depends on previous intervals.

1.5.2 Demo Code

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println("Usage: StatefulWordCount <hostname> <port>")
      System.exit(1)
    }

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // State-update function: values holds the word counts of the current
    // batch, state holds the accumulated count from previous batches
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))

    // Use the current directory as the checkpoint directory
    ssc.checkpoint(".")

    // Receive the data sent over the socket
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1))

    // Use updateStateByKey to update the state, counting the total
    // occurrences of each word since the program started
    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
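To see what updateFunc computes, here is a small stand-alone sketch that applies it by hand to made-up inputs: a batch in which a word appeared three times, on top of an accumulated count of 5:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
}
println(updateFunc(Seq(1, 1, 1), Some(5)))  // Some(8): 3 new + 5 accumulated
println(updateFunc(Seq(1), None))           // Some(1): first time the key is seen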

1.5.3 Running the Code

Step 1: Start the stream data simulator

Start the stream data simulator packaged in 1.1. In this example it periodically sends the people.txt data file from the /home/hadoop/upload/class7 directory (the file can be found in this series' companion resources under /data/class7); the contents of people.txt are as follows:

The simulator's socket port is 9999 and the send frequency is one second:

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

When no program is connected, the simulator blocks, waiting for a connection.

Step 2: Run the Streaming program in IDEA

Run the example in IDEA. It requires the socket host name and port number as arguments; here, configure the host as hadoop1 and the port as 9999.

1.5.4 Viewing the Results

Step 1: Check the run output in IDEA

In IDEA's run window you can observe that the first run reports a total word count of 1, the second 6, and the Nth 5(N-1)+1: at one line per second with 5-second batches, each batch adds roughly 5 words, so the reported total is the cumulative number of words received since the program started.

Step 2: Observe execution on the monitoring page

Monitor the job on the web UI; you can observe that a job runs every 5 seconds.

Step 3: Check the checkpoint files

The checkpoint files can be seen in the project's root directory.

1.6 Example 5: Window Operations

1.6.1 Demo Description

This example demonstrates Spark Streaming's window operations. Simulated data is sent by the 1.1 stream data simulator once per second; Spark Streaming receives the stream over a socket and runs a job every 10 seconds, printing the frequency of each word since the program started. Unlike the previous examples, the windowed counting is implemented with the reduceByKeyAndWindow() method, which takes a window length and a slide interval as parameters.

1.6.2 Demo Code

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Create the StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))

    // Use the current directory as the checkpoint directory
    ssc.checkpoint(".")

    // Receive data over a socket; this call needs the socket's host name and
    // port number, and the received data is stored in memory only
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(","))

    // Window operation: the first form recomputes the counts over the whole
    // window, the second (commented out) updates them incrementally
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(
      (a: Int, b: Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
    //val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _,
    //  Seconds(args(2).toInt), Seconds(args(3).toInt))
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
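The commented-out line is the incremental form of reduceByKeyAndWindow: instead of re-reducing every batch in the window, it adds the counts of the batch sliding in and subtracts, via the inverse function, the counts of the batch sliding out, which is cheaper for long windows; it requires checkpointing, which this program already enables. A sketch of that variant with the demo's window written out explicitly (it reuses the words DStream from the listing above and assumes the 30-second window and 10-second slide used later in this example):

val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(
  _ + _,        // add counts from the batch entering the window
  _ - _,        // subtract counts from the batch leaving the window
  Seconds(30),  // window length: the last 30 seconds of data
  Seconds(10))  // slide interval: recompute every 10 seconds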

1.6.3 Running the Code

Step 1: Start the stream data simulator

Start the stream data simulator packaged in 1.1. In this example it periodically sends the people.txt data file from the /home/hadoop/upload/class7 directory (the file can be found in this series' companion resources under /data/class7); the contents of people.txt are as follows:

The simulator's socket port is 9999 and the send frequency is one second:

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

When no program is connected, the simulator blocks, waiting for a connection.

Step 2: Run the Streaming program in IDEA

Run the example in IDEA. It requires the socket host name and port number plus the window parameters; here, configure the host as hadoop1, the port as 9999, the window length as 30 seconds, and the slide interval as 10 seconds.

1.6.4 Viewing the Results

Step 1: Check the run output in IDEA

In IDEA's run window you can observe that the first run reports a total word count of 4, the second 14, and the Nth 10(N-1)+4 while the 30-second window is still filling: each 10-second slide adds roughly 10 words at one line per second. Once the window is full, each count covers only the most recent 30 seconds of data.

Step 2: Observe execution on the monitoring page

Monitor the job on the web UI; you can observe that a job runs every 10 seconds.

 
   