Preface
In the WeTest sentiment-monitoring project, we need to run word-frequency statistics over tens of millions of game comments every day. On the producer side, we write the data into Kafka according to each day's crawl time; on the consumer side, we use Spark Streaming to continuously pull data out of Kafka and compute the word frequencies. This post first summarizes the ways Spark Streaming can be hooked up to Kafka, then briefly describes how Spark Streaming + Kafka is used in the sentiment project, and finally collects some lessons learned from tuning Spark Streaming + Kafka in practice. (If you spot any mistakes, please point them out and I'll fix them right away ^v^)
Receiving Kafka data in Spark Streaming
To stream-process Kafka data with Spark Streaming, the first step is of course to receive the data and turn it into Spark Streaming's data structure, the DStream. There are two ways to receive data: 1. using a Receiver, or 2. reading directly from Kafka.
The Receiver-based approach
This approach uses a receiver (Receiver) to consume data from Kafka; at its core it relies on Kafka's high-level consumer API. For all receivers, the data received from Kafka is stored in Spark executors, and the jobs submitted by Spark Streaming then process that data, as shown below:

To use it, we need to add the corresponding dependency:
<dependency><!-- Spark Streaming Kafka -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
The basic usage from Scala looks like this:
import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
A few points worth noting:
- With the Receiver approach, Spark's partitions and Kafka's partitions are unrelated, so increasing the number of partitions per topic merely adds threads to the single Receiver consuming those topics. It does not increase Spark's parallelism in processing the data.
- For different consumer groups and topics, we can create multiple DStreams with multiple Receivers to ingest data in parallel, and then merge them into a single DStream with union.
- If we enable Write Ahead Logs replicated to a filesystem such as HDFS, the storage level must be set to StorageLevel.MEMORY_AND_DISK_SER, i.e. KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
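The multi-Receiver pattern from the second point can be sketched roughly as follows (a sketch against the old spark-streaming-kafka_2.10 API; the topic name, group id, ZooKeeper addresses, and receiver count are made up for illustration):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

val conf = new SparkConf().setAppName("MultiReceiverExample")
val ssc = new StreamingContext(conf, Seconds(5))

// Hypothetical values: one topic read by 3 receivers, 2 consumer threads each.
val topicMap = Map("game-comments" -> 2)
val streams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181", "comment-group", topicMap)
}
// Merge the parallel receiver streams back into a single DStream.
val unified = ssc.union(streams)
```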
The Direct approach
Spark 1.3 introduced the Direct approach. Unlike the Receiver approach, Direct has no receiver layer: it periodically queries Kafka for the latest offsets of every partition of every topic, and then processes each batch according to the configured maxRatePerPartition. It looks like this:

Compared with the Receiver approach, this method has the following advantages:
- Simplified parallelism: with Receivers, we mentioned raising ingestion parallelism by creating multiple Receivers and merging the resulting DStreams with union. With Direct, Kafka partitions map one-to-one onto RDD partitions that read Kafka in parallel, a mapping that is also much easier to reason about and tune.
- Efficiency: with Receivers, achieving zero data loss requires writing the data to a Write Ahead Log, so the same data is stored twice, once in Kafka and once in the log, which is wasteful. The second approach has no such problem: as long as Kafka's retention period is long enough, we can always recover the data from Kafka itself.
- Exactly-once: the Receiver approach uses Kafka's high-level API and reads offsets from ZooKeeper, the traditional way of consuming Kafka. But because the data Spark Streaming has actually consumed can get out of sync with the offsets recorded in ZooKeeper, this approach occasionally causes duplicate consumption. The second approach uses the simple low-level Kafka API directly and records offsets in Spark Streaming's checkpoints, eliminating that inconsistency.
The above is mainly a quick translation of the official documentation [1]; for the details, please read the official docs directly, so I won't repeat them here.
With the Receiver approach, offsets are read from (and therefore saved in) ZooKeeper, so a restarted job simply resumes from the last recorded offset. With Direct, however, we read straight from Kafka and have to track offsets ourselves: we can record them with checkpoints, in a database or a file, or by writing them back to ZooKeeper. Here I provide a general-purpose class that uses Kafka's low-level API to keep offsets synchronized to ZooKeeper; I've put it on GitHub:
Spark streaming+Kafka: https://github.com/xlturing/MySpark/tree/master/SparkStreamingKafka
In the example, KafkaManager is a general-purpose class, and KafkaCluster is a class from the Kafka source tree that I extracted separately because of package visibility; ComsumerMain briefly demonstrates how the utility class is used. Each time a KafkaStream is created, it first looks up the last consumed offsets in ZooKeeper, and after each batch finishes processing, the offsets are synced back to ZooKeeper.
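The pattern the utility class implements can be sketched roughly like this (the method names createDirectStream, updateZKOffsets, and doWordCount are illustrative; check the GitHub repository for the real signatures):

```scala
import kafka.serializer.StringDecoder

// Sketch of the ZooKeeper offset-tracking pattern around a Direct stream.
val km = new KafkaManager(kafkaParams)
val stream = km.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Run the batch's processing first (word counting, etc.) ...
    doWordCount(rdd) // hypothetical processing function
    // ... and only then commit this batch's offsets back to ZooKeeper,
    // so a crash mid-batch replays the batch instead of losing it.
    km.updateZKOffsets(rdd)
  }
}
```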
Writing data from Spark to Kafka
The sections above covered how Spark reads data from Kafka as a stream; now let me turn to writing data into Kafka. Unlike reading, Spark does not provide a unified interface for writing to Kafka, so we have to wrap the underlying Kafka API ourselves.
The most straightforward approach that comes to mind is this:
input.foreachRDD(rdd =>
  // Cannot create the KafkaProducer here: it is not serializable,
  // so it cannot be shipped from the driver to the executors.
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
But the drawback of this approach is obvious: for every record of every partition, we create a KafkaProducer and then use it to send the output. Note that we cannot hoist the KafkaProducer creation outside foreachPartition, because KafkaProducer is not serializable. This is clearly inflexible and inefficient, since every single record opens a new connection. How do we fix it?
- First, we wrap the KafkaProducer with a lazy val, like so:
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
- Then we broadcast the KafkaSink to every executor as a broadcast variable:
// Broadcast the KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
This way we can happily write data to Kafka from every executor:
// Write the output to Kafka
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})
Spark streaming+Kafka in our application
WeTest sentiment monitoring has to compute word frequencies in real time over the tens of millions of game-player comments crawled every day. The crawled comments are produced into Kafka, and on the consumer side, we use Spark Streaming for the stream processing: first we pull batches from Kafka with the Direct approach described above, then run word segmentation, counting, and related processing, and write the results back to a DB (for how to write back to a DB from Spark, see my earlier post: "Spark pitfalls: databases (HBase + MySQL)"). This lets us complete the daily word-frequency statistics over large volumes of data efficiently and in real time.
Tuning Spark streaming+Kafka
When data volumes are small, the default Spark streaming+Kafka configuration is often good enough; when they are large, some adjustment and optimization is needed, and different scenarios call for different configurations.
A reasonable batch interval (batchDuration)
¼¸ºõËùÓеÄSpark Streamingµ÷ÓÅÎĵµ¶¼»áÌá¼°Åú´¦Àíʱ¼äµÄµ÷Õû£¬ÔÚStreamingContext³õʼ»¯µÄʱºò£¬ÓÐÒ»¸ö²ÎÊý±ãÊÇÅú´¦Àíʱ¼äµÄÉ趨¡£Èç¹ûÕâ¸öÖµÉèÖõĹý¶Ì£¬¼´¸öbatchDurationËù²úÉúµÄJob²¢²»ÄÜÔÚÕâÆÚ¼äÍê³É´¦Àí£¬ÄÇô¾Í»áÔì³ÉÊý¾Ý²»¶Ï¶Ñ»ý£¬×îÖÕµ¼ÖÂSpark Streaming·¢Éú×èÈû¡£¶øÇÒ£¬Ò»°ã¶ÔÓÚbatchDurationµÄÉèÖò»»áСÓÚ500ms£¬ÒòΪ¹ýС»áµ¼ÖÂSparkStreamingƵ·±µÄÌá½»×÷Òµ£¬¶ÔÕû¸östreamingÔì³É¶îÍâµÄ¸ºµ£¡£ÔÚÆ½Ê±µÄÓ¦ÓÃÖУ¬¸ù¾Ý²»Í¬µÄÓ¦Óó¡¾°ºÍÓ²¼þÅäÖã¬ÎÒÉèÔÚ1~10sÖ®¼ä£¬ÎÒÃÇ¿ÉÒÔ¸ù¾ÝSparkStreamingµÄ¿ÉÊÓ»¯¼à¿Ø½çÃæ£¬¹Û²ìTotal DelayÀ´½øÐÐbatchDurationµÄµ÷Õû£¬ÈçÏÂͼ£º

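For reference, the batch interval is set when the StreamingContext is constructed (the 5-second value here is just one point in the 1-10 s range mentioned above):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WordFreqStreaming")
// batchDuration: tune this value against Total Delay on the monitoring UI.
val ssc = new StreamingContext(conf, Seconds(5))
```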
A reasonable Kafka pull rate (maxRatePerPartition, important)
For applications where Spark Streaming consumes Kafka, this setting is critical: spark.streaming.kafka.maxRatePerPartition. By default it is unbounded, meaning Spark will pull out whatever data Kafka has, all of it. It must be set according to the rate at which producers write to Kafka and the speed at which the consumer itself can process data, and in combination with the batchDuration above, so that the data pulled from each partition during each batchDuration can be processed in time, achieving as high a throughput as possible. To tune it, watch Input Rate and Processing Time on the monitoring UI, as shown below:


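The bound this parameter imposes is easy to compute: a batch contains at most partitions × maxRatePerPartition × batchDuration records, so the two settings have to be balanced together. A quick sanity check with made-up numbers:

```scala
// Illustrative numbers: 8 Kafka partitions, a cap of 1000 records/s per
// partition (spark.streaming.kafka.maxRatePerPartition = 1000), 5 s batches.
val partitions = 8
val maxRatePerPartition = 1000 // records per second per partition
val batchSeconds = 5

// Upper bound on the number of records Spark will pull for one batch.
val maxRecordsPerBatch = partitions * maxRatePerPartition * batchSeconds
println(maxRecordsPerBatch) // 40000
```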
Cache DStreams (RDDs) that are reused
If an RDD in Spark or a DStream in Spark Streaming is used repeatedly, it is best to cache() it, keeping the data stream cached and avoiding the recomputation and network overhead of scheduling the same work over and over. Watch the Scheduling Delay metric for reference, as shown below:

Configure GC sensibly
Anyone who has used Java for a while knows that the JVM's garbage collector lets us focus on business logic rather than on allocating and reclaiming memory; the JVM takes care of it. Those somewhat familiar with the JVM will also know that it divides memory into the young generation (which contains the eden space), the old generation, and the permanent generation. Every GC pass costs time, especially collections of the old generation, which must clean up memory fragmentation, usually with a mark-and-sweep approach. Likewise in a Spark program, the frequency and duration of JVM GC are key factors in overall Spark efficiency. For typical use I recommend:
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
Allocate CPU resources sensibly
Regarding CPU core counts: each executor can occupy one or more cores, and we can gauge how well compute resources are used by watching CPU utilization. A very common waste is an executor holding several cores while overall CPU utilization stays low (because a single executor cannot always exploit multiple cores fully). In that case, consider giving each executor fewer cores while adding more executors under each worker, or more workers per host, to increase the number of executors running in parallel and hence CPU utilization. When adding executors, though, keep memory consumption in mind: the more executors a machine's memory is divided across, the less memory each executor gets, leading to excessive data spill-over or even out-of-memory errors.
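On YARN, for instance, the trade-off above maps onto a few settings (the values here are purely illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

// Fewer cores per executor, more executors: often raises CPU utilization,
// but note that memory is then divided across more executor JVMs.
val conf = new SparkConf()
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "8")
  .set("spark.executor.memory", "4g")
```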
Set a reasonable parallelism
On partition and parallelism: partition is the number of data shards, and each task can process only one partition's worth of data. Too small a value makes each shard too large, causing memory pressure or leaving many executors' compute capacity underutilized; too large a value means too many shards and lower execution efficiency. When an action-type operation executes (such as the various reduce operations), the partition count is taken as the largest among the parent RDDs. parallelism, on the other hand, is the default number of partitions of the data returned by reduce-type operations on an RDD (for map-type operations the partition count is usually taken from the larger parent RDD, no shuffle is involved, and the parallelism setting therefore has no effect). So the two concepts are closely related, both concern data sharding, and they act in essentially the same way. spark.default.parallelism sets the default shard count, and many RDD operations accept an explicit partition argument to control the shard count directly.
In our Spark Streaming + Kafka setup we use the Direct connection, and as described earlier, Spark's partitions map one-to-one onto Kafka's partitions, so we generally default this value to the number of Kafka partitions.
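In code, the two knobs look like this (the counts are illustrative):

```scala
import org.apache.spark.SparkConf

// Default partition count used by reduce-side operations:
val conf = new SparkConf().set("spark.default.parallelism", "64")

// Many operations also accept an explicit partition count, e.g.:
// val counts = pairs.reduceByKey(_ + _, 64)
```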
ʹÓøßÐÔÄܵÄËã×Ó
This part follows a post by the Meituan tech team; I haven't run concrete performance tests myself, but their suggestions are:
- Use reduceByKey/aggregateByKey instead of groupByKey
- Use mapPartitions instead of plain map
- Use foreachPartitions instead of foreach
- Run coalesce after filter
- Use repartitionAndSortWithinPartitions instead of repartition plus sort operations
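For example, the first two substitutions can be sketched as follows (assuming pairs is an RDD[(String, Int)] of (word, 1) pairs, lines is an RDD[String], and makeTokenizer() is a hypothetical, expensive-to-construct helper):

```scala
// reduceByKey combines values on each partition before the shuffle,
// so far less data crosses the network than with groupByKey.
val slow = pairs.groupByKey().mapValues(_.sum)
val fast = pairs.reduceByKey(_ + _)

// mapPartitions pays per-partition setup cost once instead of per record.
val tokens = lines.mapPartitions { iter =>
  val tokenizer = makeTokenizer() // hypothetical helper, built once per partition
  iter.flatMap(line => tokenizer.tokenize(line))
}
```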
Use Kryo to optimize serialization performance
I haven't tested this optimization principle myself either, but many tuning documents mention it, so I record it here as well.
In Spark, serialization is involved in three main places:
- When an operator's closure uses an external variable, that variable is serialized for network transfer (see the discussion under "Principle 7: broadcast large variables").
- When a custom type is used as an RDD's element type (e.g. JavaRDD<Student>, where Student is a custom class), every object of the custom type is serialized. This also means such classes must implement the Serializable interface.
- When a serialized persistence level is used (such as MEMORY_ONLY_SER), Spark serializes each RDD partition into one large byte array.
In all three places we can use the Kryo serialization library to optimize serialization and deserialization performance. By default, Spark uses Java serialization, i.e. the ObjectOutputStream/ObjectInputStream API. But Spark also supports the Kryo library, whose performance is much higher than Java serialization's; the official docs put it at roughly 10x. The reason Spark doesn't use Kryo by default is that Kryo works best when every custom type to be serialized is registered, which is a hassle for developers.
Here is a code example of using Kryo: we just set the serializer class and register the custom types to serialize (e.g. types of external variables used in operator closures, custom types used as RDD element types, and so on):
// Create the SparkConf object.
val conf = new SparkConf().setMaster(...).setAppName(...)
// Set the serializer to KryoSerializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Register the custom types to serialize.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
Results
After all this tuning, the goal we ultimately want to reach is for Spark Streaming to pull data out of Kafka in real time and to stay stable, as shown below:

Of course, different applications will produce different charts; this is the monitoring view of this post's word-frequency job after it was tuned and stabilized. In the Processing Time bar chart there is a dashed "Stable" line, and most batches finish below it, showing that the Spark Streaming job as a whole runs stably.