Spark Pitfall Notes: Spark Streaming + Kafka
 
Author: blog | Published: 2017-11-16
 

ǰÑÔ

In the WeTest sentiment-analysis project, we need to run word-frequency statistics over tens of millions of game review messages every day. On the producer side, we write the data into Kafka keyed by each day's crawl time; on the consumer side, we use Spark Streaming to continuously pull data from Kafka and compute word frequencies. This article first summarizes the ways Spark Streaming can be wired to Kafka, then briefly describes how Spark Streaming + Kafka is applied in the sentiment project, and finally collects some lessons learned from tuning Spark Streaming + Kafka in practice. (If you spot any mistakes, corrections are welcome and I will fix them right away. ^v^)

Receiving Kafka Data in Spark Streaming

To process Kafka data with Spark Streaming, the first step is of course to receive the data and turn it into Spark Streaming's data abstraction, the DStream. There are two ways to receive it: 1. via a Receiver, or 2. by reading directly from Kafka.

The Receiver-based Approach

This approach uses a receiver (Receiver) to pull data out of Kafka and is fundamentally built on the Kafka high-level consumer API. For all receivers, the data received from Kafka is stored in Spark executors, and the jobs submitted by Spark Streaming then process that data, as shown in the figure below:

To use it, we need to add the corresponding dependency:

<dependency><!-- Spark Streaming Kafka -->
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.6.3</version>
</dependency>

The basic usage from Scala looks like this:

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
  [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

A few points worth noting:

  • In the Receiver approach, Spark partitions and Kafka partitions are unrelated, so increasing the number of partitions per topic only adds threads to the single Receiver consuming that topic; it does not increase Spark's parallelism in processing the data.
  • For different groups and topics, we can create multiple Receivers to build separate DStreams that receive data in parallel, then merge them into one DStream with union (see the sketch after this list).
  • If we enable Write Ahead Logs replicated to a file system such as HDFS, the storage level needs to be set to StorageLevel.MEMORY_AND_DISK_SER, i.e. KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER).
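
As a minimal sketch of the union pattern from the second bullet, assuming the same 1.6.x artifact as above (topic name, group id, and ZooKeeper quorum are placeholders):

import org.apache.spark.streaming.kafka.KafkaUtils

// Several receivers for the same group and topic, each running on an executor...
val numReceivers = 3
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(streamingContext, "zk1:2181,zk2:2181",
    "wordcount-group", Map("comments" -> 1))
}
// ...merged into a single DStream for downstream processing.
val unifiedStream = streamingContext.union(kafkaStreams)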

The Direct Approach

Spark 1.3 introduced the Direct approach. Unlike the Receiver approach, Direct has no receiver layer: it periodically queries Kafka for the latest offsets of each partition of each topic, then processes each batch according to the configured maxRatePerPartition. Its structure is shown in the figure below:
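
As a minimal sketch of this approach, again with the 1.6.x spark-streaming-kafka_2.10 artifact (the broker list and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092")

// No receiver layer: each Kafka partition becomes one RDD partition per batch.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, Set("comments"))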

Compared with the Receiver approach, this method has the following advantages:

  • Simplified parallelism: With Receivers, we raised data-transfer parallelism by creating multiple Receivers and merging them with union into one DStream. With Direct, Kafka partitions and RDD partitions correspond one-to-one and are read in parallel, a mapping that is also easier to understand and optimize.
  • Efficiency: With Receivers, achieving zero data loss requires writing data to a Write Ahead Log, so the data is kept twice, once in Kafka and once in the log, which is wasteful! Direct has no such problem: as long as Kafka's retention period is long enough, we can always recover the data from Kafka itself.
  • Exactly-once semantics: Receivers use the Kafka high-level API to track offsets in Zookeeper, the traditional way of reading from Kafka, but because the data Spark Streaming consumes and the offsets recorded in Zookeeper can fall out of sync, this occasionally causes duplicate consumption. Direct uses the simple low-level Kafka API and records offsets via Spark Streaming checkpoints, eliminating that inconsistency.

The above is largely a loose translation of the official documentation [1]; for details, read the official docs directly rather than have them repeated here.

Unlike the Receiver approach, which reads offsets from Zookeeper (so Zookeeper naturally holds the current consumption offset, and a restart simply resumes from it), the Direct approach reads straight from Kafka, so we must track offsets ourselves: via checkpoints, a database, a file, or by writing them back to Zookeeper. Here we provide a general-purpose class that uses the low-level Kafka API to keep offsets promptly synced to Zookeeper; I have put it on GitHub:

Spark Streaming + Kafka: https://github.com/xlturing/MySpark/tree/master/SparkStreamingKafka

In the example, KafkaManager is a general-purpose class, while KafkaCluster is a class from the Kafka source tree that I pulled out separately because of package-visibility restrictions. ComsumerMain briefly demonstrates how to use the general class: every time a KafkaStream is created, it first looks up the previous consumption offsets in Zookeeper, and after each batch completes, it syncs the offsets back to Zookeeper.
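
The general shape of the pattern that KafkaManager implements looks roughly like the sketch below, reusing the directStream from the earlier sketch; saveOffsetsToZk is a hypothetical helper standing in for the actual ZooKeeper write:

import org.apache.spark.streaming.kafka.HasOffsetRanges

directStream.foreachRDD { rdd =>
  // Capture each partition's offset range before transforming the RDD.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... per-batch processing (word segmentation, counting, etc.) ...

  // Once the batch succeeds, persist offsets so a restart resumes from here.
  offsetRanges.foreach { range =>
    saveOffsetsToZk(range.topic, range.partition, range.untilOffset) // hypothetical helper
  }
}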

Writing Data from Spark to Kafka

ÉÏÎIJûÊöÁËSparkÈçºÎ´ÓKafkaÖÐÁ÷ʽµÄ¶ÁÈ¡Êý¾Ý£¬ÏÂÃæÎÒÕûÀíÏòKafkaÖÐдÊý¾Ý¡£Óë¶ÁÊý¾Ý²»Í¬£¬Spark²¢Ã»ÓÐÌṩͳһµÄ½Ó¿ÚÓÃÓÚдÈëKafka£¬ËùÒÔÎÒÃÇÐèҪʹÓõײãKafka½Ó¿Ú½øÐаü×°¡£
×îÖ±½ÓµÄ×ö·¨ÎÒÃÇ¿ÉÒÔÏëµ½ÈçÏÂÕâÖÖ·½Ê½£º

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD(rdd =>
  // We cannot create the KafkaProducer here: it is not serializable,
  // so it cannot be shipped from the driver to the executors.
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String =>
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        // A new producer per record: correct, but very inefficient.
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
    }
  )
)

The drawback of this approach is obvious: for every record of every partition we create a KafkaProducer and then use it to send the output. Note that we cannot hoist the KafkaProducer construction outside foreachPartition, because KafkaProducer is not serializable. Clearly this is inflexible and inefficient, since every single record establishes a new connection. How do we fix it?

  1. First, wrap the KafkaProducer behind a lazy val, as follows:
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
  2. Then, using a broadcast variable, broadcast the KafkaSink to every executor, as follows:
import java.util.Properties
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast

// Broadcast the KafkaSink wrapper to all executors.
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}

Now we can happily push data into Kafka from every executor:

// Write the stream out to Kafka.
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})

Spark Streaming + Kafka in the Sentiment Project

WeTest sentiment monitoring computes word frequencies in real time over the tens of millions of game-player comments crawled every day. The crawled comments are produced into Kafka, and on the consumer side we use Spark Streaming for stream processing: first we pull batches from Kafka with the Direct approach described above, then run word segmentation, counting, and related processing, and write the results back to the DB (for how to write back to a DB from Spark, see my earlier post "Spark Pitfall Notes: Databases (HBase + MySQL)"). This completes the daily large-scale word-frequency task efficiently and in real time.

Tuning Spark Streaming + Kafka

When data volumes are small, the defaults of Spark Streaming + Kafka are often good enough; at larger volumes, some adjustment and optimization is needed, and different scenarios call for different configurations.

A Reasonable Batch Interval (batchDuration)

Almost every Spark Streaming tuning guide mentions adjusting the batch interval, one of the parameters set when the StreamingContext is initialized. If the value is too short, the job generated for each batchDuration cannot finish within that window, data keeps piling up, and Spark Streaming eventually stalls. In general batchDuration should not be set below 500 ms, because too small a value makes Spark Streaming submit jobs too frequently, putting extra load on the whole pipeline. In my own applications, depending on the scenario and hardware, I set it between 1 and 10 s, and we can adjust batchDuration by watching Total Delay in Spark Streaming's monitoring UI, as shown in the figure below:
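
As a minimal sketch, the interval is fixed when the StreamingContext is created; the 5 seconds here is only an illustrative starting point in the 1~10 s range:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// One micro-batch job is generated every 5 seconds.
val ssc = new StreamingContext(conf, Seconds(5))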

A Reasonable Kafka Pull Rate (maxRatePerPartition, Important)

For applications where Spark Streaming consumes Kafka, this setting is critical: spark.streaming.kafka.maxRatePerPartition. By default it has no upper bound, so Spark simply pulls out whatever data Kafka holds. It should be tuned against the producers' write rate into Kafka and the consumer's own processing speed, and in combination with the batchDuration above, so that the data pulled from each partition during each batchDuration can be processed in time while keeping throughput as high as possible. Adjust it by watching Input Rate and Processing Time in the monitoring UI, as shown in the figure below:
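
As a rough sketch of the arithmetic: with the cap below, 10 Kafka partitions, and a 5 s batchDuration, each batch is bounded at 100 x 10 x 5 = 5000 records (all numbers illustrative):

val conf = new SparkConf()
  .setAppName("WordCount") // placeholder app name
  .set("spark.streaming.kafka.maxRatePerPartition", "100") // records/sec per Kafka partition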

Cache Reused DStreams (RDDs)

SparkÖеÄRDDºÍSparkStreamingÖеÄDstream£¬Èç¹û±»·´¸´µÄʹÓã¬×îºÃÀûÓÃcache()£¬½«¸ÃÊý¾ÝÁ÷»º´æÆðÀ´£¬·ÀÖ¹¹ý¶ÈµÄµ÷¶È×ÊÔ´Ôì³ÉµÄÍøÂ翪Ïú¡£¿ÉÒԲο¼¹Û²ìScheduling Delay²ÎÊý£¬ÈçÏÂͼ£º

Set Up GC Sensibly

Anyone who has used Java for a while knows that the JVM's garbage collector frees us from worrying much about memory allocation and reclamation, letting us focus on business logic while the JVM takes care of the rest. Those with some JVM knowledge will know that the Java virtual machine divides memory into the young generation (containing the eden and survivor spaces), the old generation, and the permanent generation. Every GC pause costs time, especially old-generation collections, which must compact fragmented memory, typically with a mark-and-sweep approach. Likewise in a Spark program, the frequency and duration of JVM GC are key factors in overall Spark efficiency. For typical usage we recommend:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

Set a Reasonable Number of CPU Cores

Each executor can occupy one or more CPU cores, and watching how CPU utilization changes is a good way to understand how compute resources are being used. For example, a very common waste is an executor holding several cores while total CPU utilization stays low (one executor often cannot fully exploit multiple cores). In that case, consider giving each executor fewer cores while adding more executors under each worker, or adding more workers per host, to raise the number of executors running in parallel and with it CPU utilization. When adding executors, though, keep memory consumption in mind: the more executors a machine's memory is divided among, the less each executor gets, to the point of excessive data spill-over or even out-of-memory errors.
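
As a sketch of that trade-off expressed as configuration, assuming a standalone cluster (all values illustrative, not recommendations):

val conf = new SparkConf()
  .set("spark.executor.cores", "2")   // fewer cores per executor...
  .set("spark.cores.max", "24")       // ...so the same core budget yields more executors
  .set("spark.executor.memory", "4g") // watch per-executor memory as the executor count grows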

Set a Reasonable parallelism

On partition and parallelism: partition is the number of data slices, and each task can process only one partition's data. Too small a value makes each slice too large, stressing memory or leaving many executors' compute capacity under-used; too large, and the excessive number of slices drags down execution efficiency. For action-type operations (such as the various reduce operations), the partition count is taken from the largest parent RDD. parallelism, meanwhile, is the default number of partitions returned when reduce-type operations run on an RDD (for map-type operations the partition count is usually taken from the larger parent RDD, and no shuffle is involved, so the parallelism setting has no effect there). So the two concepts are closely related: both concern data slicing, and they act in a unified way. spark.default.parallelism sets the default number of partitions, and many RDD operations accept an explicit partition argument that controls the slice count directly.

In our Spark Streaming + Kafka usage we adopted the Direct connection, and as discussed earlier Spark partitions correspond one-to-one with Kafka partitions, so we generally default this to the number of Kafka partitions.
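
As a minimal sketch of both knobs (numbers illustrative):

// Cluster-wide default partition count for shuffle operations.
val conf = new SparkConf().set("spark.default.parallelism", "64")

// Or control the slice count explicitly on a single operation.
val wordCounts = words.map((_, 1)).reduceByKey(_ + _, 32) // 32 result partitions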

Use High-Performance Operators

This list comes from a blog post by the Meituan tech team; I have not run specific performance tests myself. Its suggestions are as follows (a small sketch follows the list):

  • Use reduceByKey/aggregateByKey instead of groupByKey
  • Use mapPartitions instead of plain map
  • Use foreachPartitions instead of foreach
  • Run coalesce after filter
  • Use repartitionAndSortWithinPartitions instead of repartition followed by a sort
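
As a small sketch of the first and third substitutions (wordPairs, createConnection, and writeRecord are hypothetical):

// reduceByKey combines values map-side before the shuffle; groupByKey ships every value.
val counts = wordPairs.reduceByKey(_ + _)

// foreachPartition amortizes per-connection setup over a whole partition,
// instead of opening one connection per record as foreach would.
counts.foreachPartition { records =>
  val conn = createConnection()              // hypothetical helper
  records.foreach(r => writeRecord(conn, r)) // hypothetical helper
  conn.close()
}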

Use Kryo to Optimize Serialization Performance

I have not tested this optimization myself either, but many tuning guides mention it, so I record it here as well.

ÔÚSparkÖУ¬Ö÷ÒªÓÐÈý¸öµØ·½Éæ¼°µ½ÁËÐòÁл¯£º

  • When an operator's function uses an external variable, that variable is serialized for network transfer (see the discussion under "Principle 7: broadcast large variables" in that post).
  • When a custom type is used as an RDD's generic type (e.g. JavaRDD<Student>, where Student is a custom type), all objects of the custom type are serialized. This case therefore also requires the custom class to implement the Serializable interface.
  • When a serialized persistence level is used (such as MEMORY_ONLY_SER), Spark serializes each partition of the RDD into one large byte array.

In all three of these places, we can use the Kryo serialization library to improve serialization and deserialization performance. By default Spark uses Java serialization, i.e. the ObjectOutputStream/ObjectInputStream API. But Spark also supports the Kryo library, whose performance is far higher than Java serialization's; officially, Kryo is about 10x faster. The reason Spark does not use Kryo by default is that Kryo works best when every custom type to be serialized is registered, which developers tend to find a hassle.

Below is a code example using Kryo. We only need to set the serializer class and register the custom types to serialize (e.g. external variable types used in operator functions, custom types used as RDD generic types, and so on):

// Create the SparkConf object.
val conf = new SparkConf().setMaster(...).setAppName(...)
// Set the serializer to KryoSerializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the custom types to be serialized.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

Results

After all this debugging and tuning, the goal we ultimately want to reach is that Spark Streaming pulls data from Kafka in real time and stays stable, as shown in the figure below:

Different applications will of course produce different charts. This is the monitoring chart of this article's word-frequency job after tuning stabilized: in the Processing Time histogram there is a dashed "Stable" line, and most batches complete below it, showing that Spark Streaming as a whole runs stably.

   