This article, originally published on cnblogs, walks through a small example to cover the programming model, creating streaming DataFrames and streaming Datasets, and managing streaming queries. Hopefully it is helpful for your study.
Overview
Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL execution engine. You can express a streaming computation in the same way you would express a batch computation on static data. As streaming data keeps arriving, the Spark SQL engine incrementally and continuously processes it and updates the result in the final table. You can use the Dataset/DataFrame API on the Spark SQL engine to express streaming aggregations, event-time windows, stream-to-batch joins, and so on. Finally, the system ensures fast, stable, end-to-end exactly-once, fault-tolerant processing.
A Quick Example
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended to.
Basic Concepts
Consider the input data stream as an "Input Table". Every data item arriving on the stream is treated as a new row being appended to the Input Table.
The "Output" is what gets written out to external storage. The output can be written in one of several modes (a short sketch of selecting a mode per query follows the list):
1. Complete mode: the entire updated Result Table is written to external storage. It is up to the storage connector to decide how to handle writing the entire table.
2. Append mode: only the new rows appended to the Result Table since the last trigger are written to external storage. This is applicable only to queries where existing rows in the Result Table are not expected to change.
3. Update mode: only the rows that were updated in the Result Table since the last trigger are written to external storage (not yet available in Spark 2.0). Note that this is different from Complete mode in that it does not output rows that were not changed.
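As a rough sketch (reusing the lines and wordCounts DataFrames from the quick example above; each start() call launches an independent query), the mode is chosen per query through DataStreamWriter.outputMode:

// Append mode: only rows newly added to the Result Table are emitted.
// Suitable for projections/filters whose earlier output never changes.
val appendQuery = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

// Complete mode: the whole aggregated Result Table is re-emitted on every trigger.
val completeQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()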
Handling Event Time and Late Data
Event time is the time embedded in the data itself. For many applications, you may want to operate on this event time. For example, if you want to get the number of events generated by IoT devices every minute, you probably want to use the time at which the data was generated (that is, the event time in the data), rather than the time at which Spark receives it. This event time is expressed very naturally in this model: each event from a device is a row in the table, and the event time is a column value in that row. This allows window-based aggregations (e.g. the number of events every minute) to be just a special kind of grouping and aggregation on the event-time column; each time window is a group, and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. collected device event logs) and on a data stream, making life much easier for the user.
Furthermore, this model naturally handles data that arrives later than expected based on its event time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state. Since Spark 2.1, watermarking is supported, which lets the user specify a threshold for late data and allows the engine to clean up old state accordingly. This is explained in more detail later in the section on window operations.
Fault Tolerance Semantics
Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, the Structured Streaming sources, sinks, and execution engine are designed to reliably track the exact progress of processing, so that any kind of failure can be handled by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.
Using the DataFrame and Dataset APIs
Since Spark 2.0, DataFrames and Datasets can represent static, bounded data as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala / Java / Python docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as on static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the DataFrame/Dataset Programming Guide.
Creating Streaming DataFrames and Streaming Datasets
Streaming DataFrames can be created through the DataStreamReader interface (Scala / Java / Python docs) returned by SparkSession.readStream(). Similar to the read interface used for creating static DataFrames, you can specify the details of the source: data format, schema, options, and so on.
Data Sources
In Spark 2.0, there are a few built-in data sources:
1. File source: reads files written to a directory as a stream of data. Supported file formats are text, csv, json, and parquet. See the docs of the DataStreamReader interface for an up-to-date list, and for the options supported by each file format. Note that the files must be atomically placed in the given directory, which in most file systems can be achieved by a file move operation.
2. Kafka source: polls data from Kafka. It is compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.
3. Socket source (for testing): reads UTF-8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing, as it does not provide end-to-end fault-tolerance guarantees.
These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, but only at runtime when the query is submitted. Some operations, such as map and flatMap, need the type to be known at compile time. To use them, you can convert these untyped streaming DataFrames into typed streaming Datasets with the same methods used for static DataFrames; see the SQL Programming Guide for more details. In addition, more details on the supported streaming sources are discussed later in the document.
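As a minimal sketch of the file source and the untyped-to-typed conversion (the schema, the Device case class, and the directory path are illustrative assumptions; spark is the SparkSession from the quick example):

import org.apache.spark.sql.types.{DoubleType, StringType, StructType}
import spark.implicits._

// Hypothetical record type for the typed view of the stream.
case class Device(device: String, signal: Double)

// File-based streaming sources require an explicit schema by default.
val deviceSchema = new StructType()
  .add("device", StringType)
  .add("signal", DoubleType)

// Untyped streaming DataFrame: each new JSON file in the directory becomes new rows.
val deviceDf = spark.readStream
  .schema(deviceSchema)
  .json("path/to/input/dir")   // hypothetical input directory

// Typed streaming Dataset: map/filter/flatMap are now checked at compile time.
val deviceDs = deviceDf.as[Device]
deviceDs.filter(_.signal > 10).map(_.device)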
Schema Inference and Partitioning of Streaming DataFrames/Datasets
By default, Structured Streaming from file-based sources requires you to specify the schema rather than relying on Spark to infer it automatically. This restriction ensures that a consistent schema is used for the streaming query even in the case of failures. For ad-hoc use cases, you can re-enable schema inference by setting spark.sql.streaming.schemaInference to true.
Partition discovery occurs when subdirectories named /key=value/ are present, and listing will automatically recurse into these directories. If these columns appear in the user-provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ already exists, but changing the partitioning column is invalid (i.e. by creating the directory /data/date=2016-04-17/).
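A minimal sketch of both points (the /data layout, column names, and schema are illustrative assumptions; spark is the SparkSession from the earlier examples):

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// For ad-hoc use cases, re-enable schema inference for file sources (off by default).
spark.conf.set("spark.sql.streaming.schemaInference", "true")

// With subdirectories such as /data/year=2015/ and /data/year=2016/,
// listing the "year" column in the schema lets Spark fill it in from the file path.
val eventSchema = new StructType()
  .add("event", StringType)
  .add("year", IntegerType)      // partition column, populated from /year=.../

val events = spark.readStream
  .schema(eventSchema)
  .json("/data")                 // hypothetical root of the partitioned layout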
Operations on Streaming DataFrames/Datasets
You can apply all kinds of operations on streaming DataFrames/Datasets, from untyped, SQL-like operations (e.g. select, where, groupBy) to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL Programming Guide for more details. Let's take a look at a few example operations that you can use.
Basic Operations: Selection, Projection, Aggregation
case class DeviceData(device: String, deviceType: String, signal: Double, time: java.sql.Timestamp)

val df: DataFrame = ... // streaming DataFrame with IoT device data with schema { device: string, deviceType: string, signal: double, time: timestamp }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IoT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device)   // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count() // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
Window Operations on Event Time
Aggregations over a sliding event-time window are straightforward with Structured Streaming. The key idea behind window-based aggregations is very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In window-based aggregations, aggregate values are maintained for each window that a row's event time falls into. Let's understand this with an example.
Imagine that our quick example is modified so that the stream now contains lines along with the time each line was generated. Instead of running word counts, we want to count words within 10-minute windows, updating every 5 minutes; that is, word counts among the words received in the 10-minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, and so on. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now consider a word received at 12:07. This word should increment the counts corresponding to two windows, 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both the grouping key (i.e. the word) and the window (which can be calculated from the event time).
The Result Table would then contain one running count per (word, window) pair.
Since this windowing is similar to grouping, in code you can use the groupBy() and window() operations to express windowed aggregations. The full code for this example is available in Scala / Java / Python; a sketch follows.
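A minimal sketch of that windowed count, reusing lines and spark from the quick example and assuming (as an illustration) that each socket line carries an epoch-millisecond timestamp followed by a word:

import java.sql.Timestamp
import org.apache.spark.sql.functions.window
import spark.implicits._

// Parse "<epoch-millis> <word>" lines into (timestamp, word) columns.
val words = lines.as[String].map { line =>
  val Array(ts, word) = line.split(" ", 2)
  (new Timestamp(ts.toLong), word)
}.toDF("timestamp", "word")

// 10-minute windows sliding every 5 minutes, with a running count per (window, word) pair.
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()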
Handling Late Data and Watermarking
Now consider what happens if one of the events arrives late to the application. For example, a word generated at 12:04 (i.e. its event time) could be received by the application at 12:11. The application should use the time 12:04 rather than 12:11 to update the older counts for the window 12:00 - 12:10. This happens naturally in our window-based grouping: Structured Streaming can keep the intermediate state of partial aggregates for a long period of time, so that late data can correctly update the aggregates of old windows.
However, to run this query for days, the system must bound the amount of intermediate in-memory state it accumulates. This means the system needs to know when an old aggregate can be dropped from in-memory state, because the application will no longer receive late data for that aggregate. To enable this, Spark 2.1 introduced watermarking, which lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly. You define the watermark of a query by specifying the event-time column and a threshold on how late the data is expected to be in terms of event time. For a specific window starting at time T, the engine maintains state and allows late data to update that state until (maximum event time seen by the engine - late threshold) > T. In other words, late data within the threshold will be aggregated, but data later than the threshold will be dropped. Let's understand this with an example. We can easily define watermarking on the previous example using withWatermark(), as shown below.
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()
In this example, we define the watermark of the query on the value of the column "timestamp", and also define "10 minutes" as the threshold for how late the data is allowed to be. If this query is run in Append output mode (discussed later in the "Output Modes" section), the engine will track the current event time from the column "timestamp" and wait an additional "10 minutes" of event time before finalizing the windowed counts and appending them to the Result Table.
The engine tracks the maximum event time it has seen, and at the start of every trigger sets the watermark to (maximum event time - "10 minutes"). For example, when the engine observes the data (12:14, dog), it sets the watermark for the next trigger to 12:04. For the window 12:00 - 12:10, the partial counts are maintained as internal state while the system waits for late data. After the system sees data (i.e. (12:21, owl)) that pushes the watermark beyond 12:10, the partial counts are finalized and appended to the table. This count will not change any further, because all "too late" data older than 12:10 will be ignored.
Note that in Append output mode, the system has to wait for the "late threshold" time before it can output the aggregation of a window. This may not be desirable if data can be very late (say, by a day) and you would rather have partial counts than wait for a day. In the future, we will add an Update output mode, which will allow every update to the aggregates to be written to the sink on every trigger.
Conditions for watermarking to clean aggregation state
It is important to note that a watermark should satisfy the following conditions for it to clean up the state of an aggregation query (as of Spark 2.1; this is subject to change in the future). A short sketch illustrating conditions 3 and 4 follows the list.
1. The output mode must be Append. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state. See the "Output Modes" section for a detailed explanation of the semantics of each output mode.
2. The aggregation must have either the event-time column, or a window on the event-time column.
3. withWatermark must be called on the same column as the timestamp column used in the aggregation. For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, because the watermark is defined on a different column from the aggregation column.
4. withWatermark must be called before the aggregation for the watermark details to be used. For example, df.groupBy("time").count().withWatermark("time", "1 min") is invalid in Append output mode.
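A minimal sketch of conditions 3 and 4 (df here stands for a hypothetical streaming DataFrame with event-time columns time and time2):

import org.apache.spark.sql.functions.window
import spark.implicits._

// Valid: the watermark is declared on the aggregation's event-time column, before the aggregation.
val valid = df
  .withWatermark("time", "1 minute")
  .groupBy(window($"time", "5 minutes"))
  .count()

// Invalid in Append mode: the watermark column ("time") differs from the grouping column ("time2").
val invalidColumn = df
  .withWatermark("time", "1 minute")
  .groupBy("time2")
  .count()

// Invalid in Append mode: withWatermark is called after the aggregation.
val invalidOrder = df
  .groupBy("time")
  .count()
  .withWatermark("time", "1 minute")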
Join Operations
Streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. Here are a few examples.
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")                // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_outer") // right outer join with a static DF
Unsupported Operations
Note, however, that not all operations applicable to static DataFrames/Datasets are supported on streaming DataFrames/Datasets. While some of these unsupported operations will be supported in future releases of Spark, others are fundamentally hard to implement efficiently on streaming data. For example, sorting the input streaming Dataset is not supported, since it would require keeping track of all the data received in the stream, which is fundamentally hard to do efficiently. As of Spark 2.0, the unsupported operations are as follows:
1. Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DataFrame) are not yet supported on streaming Datasets.
2. Limit and taking the first N rows are not supported on streaming Datasets.
3. Distinct operations are not supported on streaming Datasets.
4. Sorting operations are supported on streaming Datasets only after an aggregation and in Complete output mode.
5. Outer joins between a streaming and a static Dataset are conditionally supported, as follows:
6. Full outer join with a streaming Dataset is not supported.
7. Left outer join with a streaming Dataset on the right is not supported.
8. Right outer join with a streaming Dataset on the left is not supported.
9. Any kind of join between two streaming Datasets is not yet supported.
´ËÍ⣬»¹ÓÐһЩDataset·½·¨²»ÄÜÓÃÓÚÁ÷Êý¾Ý¼¯¡£ËüÃÇÊǽ«Á¢¼´ÔËÐвéѯ²¢·µ»Ø½á¹ûµÄ²Ù×÷£¬Õâ¶ÔÁ÷Êý¾Ý¼¯Ã»ÓÐÒâÒå¡£Ïà·´£¬ÕâЩ¹¦ÄÜ¿ÉÒÔͨ¹ýÏÔʽµØÆô¶¯Á÷²éѯÀ´Íê³É£¨²Î¼ûÏÂÒ»²¿·Ö£©¡£
count£¨£© - ÎÞ·¨´ÓÁ÷Êý¾Ý¼¯·µ»Øµ¥¸ö¼ÆÊý¡£
Ïà·´£¬Ê¹ÓÃds.groupBy.count£¨£©·µ»Ø°üº¬ÔËÐмÆÊýµÄÁ÷Êý¾Ý¼¯¡£
foreach£¨£© - ¶øÊÇʹÓÃds.writeStream.foreach£¨...£©£¨²Î¼ûÏÂÒ»½Ú£©¡£
show£¨£© - ¶øÊÇʹÓÿØÖÆÌ¨½ÓÊÕÆ÷£¨Çë²ÎÔÄÏÂÒ»½Ú£©¡£
Èç¹ûÄú³¢ÊÔÈκÎÕâЩ²Ù×÷£¬Äú½«¿´µ½Ò»¸öAnalysisExceptionÈç¡°²Ù×÷XYZ²»Ö§³ÖÓëÁ÷DataFrames
/Êý¾Ý¼¯¡±¡£
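A minimal sketch of those alternatives, assuming a streaming Dataset ds built from one of the sources above:

// Instead of ds.count(): a streaming DataFrame holding a single running count.
val runningCount = ds.groupBy().count()

// Instead of ds.show(): print each trigger's output through the console sink.
val consoleQuery = ds.writeStream
  .format("console")
  .start()

// Instead of ds.foreach(...): push rows through writeStream.foreach
// (see the "Using Foreach" section below for the ForeachWriter interface).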
Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is to start the streaming computation. To do that, you use the DataStreamWriter (Scala / Java / Python docs) returned by Dataset.writeStream(). You have to specify one or more of the following in this interface (a combined sketch follows the list):
Details of the output sink: data format, location, and so on.
Output mode: specifies what gets written to the output sink.
Query name: optionally, a unique name for the query, used for identification.
Trigger interval: optionally, a trigger interval. If it is not specified, the system will check for the availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, the system will attempt to trigger at the next trigger point, not immediately after the processing completes.
Checkpoint location: for some output sinks where end-to-end fault tolerance can be guaranteed, specify the location where the system will write all checkpoint information. This should be a directory in an HDFS-compatible, fault-tolerant file system. The semantics of checkpointing are discussed in more detail in the next section.
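Pulling these options together, a rough sketch (the paths, query name, and trigger interval are illustrative; windowedCounts is the aggregation from the watermark example above):

import org.apache.spark.sql.streaming.ProcessingTime

val query = windowedCounts.writeStream
  .format("parquet")                                        // output sink: Parquet files
  .option("path", "path/to/destination/dir")                // sink location
  .option("checkpointLocation", "path/to/checkpoint/dir")   // HDFS-compatible checkpoint directory
  .outputMode("append")                                     // what gets written on each trigger
  .queryName("windowed_word_counts")                        // unique name for identification
  .trigger(ProcessingTime("10 seconds"))                    // optional trigger interval
  .start()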
Output Modes
There are several types of output modes:
1. Append mode (default) - This is the default mode, in which only the new rows added to the Result Table since the last trigger are output to the sink. It is supported only for queries where rows added to the Result Table will never change. Hence, this mode guarantees that each row is output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. support Append mode.
2. Complete mode - The whole Result Table is output to the sink after every trigger. This is supported for aggregation queries.
3. Update mode - (not available in Spark 2.1) Only the rows in the Result Table that were updated since the last trigger are output to the sink. More information will be added in future releases.
Different types of streaming queries support different output modes. (Figure omitted: output-mode compatibility matrix by query type.)
Output Sinks
There are several types of built-in output sinks:
1. File sink - stores the output to a directory.
2. Foreach sink - runs arbitrary computation on the records in the output. See the later section for details.
3. Console sink (for debugging) - prints the output to the console/stdout every time there is a trigger. This should only be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory after every trigger.
4. Memory sink (for debugging) - the output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should only be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory after every trigger.
(Table omitted: the sinks with their supported output modes and options.)
Finally, you have to call start() to actually start the execution of the query. This returns a StreamingQuery object, which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let's understand all of this with a few examples.
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/directory")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
Using Foreach
The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use it, you have to implement the interface ForeachWriter (Scala / Java docs), which has methods that get called whenever a sequence of rows is generated as output after a trigger. Note the following important points; a minimal writer sketch follows.
The writer must be serializable, because it will be serialized and sent to the executors for execution.
All three methods - open, process and close - will be called on the executors.
The writer must do all of its initialization (e.g. opening connections, starting a transaction) only when the open method is called. Be aware that if there is any initialization in the class as soon as the object is created, that initialization will happen on the driver (because that is where the instance is created), which may not be what you intend.
version and partition are two parameters of open that uniquely represent a set of rows that needs to be pushed out. version is a monotonically increasing id that increases with every trigger. partition is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors.
open can use the version and partition to choose whether it needs to write the sequence of rows. Accordingly, it can return true (proceed with writing) or false (no need to write). If false is returned, process will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may already have been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and return false to skip committing them again.
Whenever open is called, close will also be called (unless the JVM exits due to some error). This is true even if open returns false. If there is any error while processing and writing the data, close will be called with that error. It is your responsibility to clean up state created in open (e.g. connections, transactions) so that there are no resource leaks.
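A minimal ForeachWriter sketch in Scala (the println "sink" is purely illustrative; a real writer would open a connection in open, write in process, and commit or clean up in close; someStreamingDf stands for any streaming DataFrame):

import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  // Called once per (version, partition) per trigger; return false to skip a batch
  // that has already been committed to the external store.
  def open(partitionId: Long, version: Long): Boolean = true

  // Called for every row when open returned true.
  def process(record: Row): Unit = println(record)

  // Always called after open (even if open returned false or an error occurred),
  // so resources created in open can be released here.
  def close(errorOrNull: Throwable): Unit = ()
}

val foreachQuery = someStreamingDf.writeStream
  .foreach(writer)
  .start()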
Managing Streaming Queries
The StreamingQuery object created when a query is started can be used to monitor and manage the query.
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();            // get the unique identifier of the running query

query.name();          // get the auto-generated or user-specified name of the query

query.explain();       // print detailed explanations of the query

query.stop();          // stop the query

query.awaitTermination();   // block until the query is terminated, with stop() or with an error

query.exception();     // the exception if the query has been terminated with an error

query.sourceStatus();  // progress information about data that has been read from the input sources

query.sinkStatus();    // progress information about data written to the output sink
You can start any number of queries in a single SparkSession. They will all run concurrently, sharing the cluster resources. You can use sparkSession.streams() to get the StreamingQueryManager (Scala / Java / Python docs), which can be used to manage the currently active queries.
SparkSession spark = ...

spark.streams().active();               // get the list of currently active streaming queries

spark.streams().get(id);                // get a query object by its unique id

spark.streams().awaitAnyTermination();  // block until any one of them terminates
Monitoring Streaming Queries
There are two APIs for monitoring and debugging active queries: interactively and asynchronously.
Interactive APIs
You can directly get the current status and metrics of an active query using streamingQuery.lastProgress() and streamingQuery.status(). lastProgress() returns a StreamingQueryProgress object in Scala and Java, and a dictionary with the same fields in Python. It has all the information about the progress made in the last trigger of the stream: what data was processed, what the processing rates and latencies were, and so on. There is also streamingQuery.recentProgress, which returns an array of the last few progress updates.
In addition, streamingQuery.status() returns a StreamingQueryStatus object in Scala and Java, and a dictionary with the same fields in Python. It gives information about what the query is doing right now: whether a trigger is active, whether data is being processed, and so on. Here are a few examples.
StreamingQuery query = ...
System.out.println(query.lastProgress());
/* Will print something like the following.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
System.out.println(query.status());
/* Will print something like the following.
{
  "message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
Asynchronous API
You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener (Scala / Java docs). Once you attach your custom StreamingQueryListener object with sparkSession.streams().addListener(), you will get callbacks when a query is started and stopped, and when progress is made in an active query. Here is an example.
SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
  @Override
  public void onQueryStarted(QueryStartedEvent queryStarted) {
    System.out.println("Query started: " + queryStarted.id());
  }
  @Override
  public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
    System.out.println("Query terminated: " + queryTerminated.id());
  }
  @Override
  public void onQueryProgress(QueryProgressEvent queryProgress) {
    System.out.println("Query made progress: " + queryProgress.progress());
  }
});
Recovering from Failures with Checkpointing
In case of a failure or an intentional shutdown, you can recover the previous progress and state of a query and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. the range of offsets processed in each trigger) and the running aggregates (e.g. the word counts in the quick example) to the checkpoint location. This checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when starting the query.
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();