Kafka's Architecture and Principles: Do You Really Understand Them?
 
 2019-12-16
 
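Editor's recommendation:
This article covers Kafka's architecture and principles: the architecture diagram, the consumption model, the network model, the highly reliable distributed storage model, the replication mechanism, and more. We hope you find it helpful.
This article comes from 51CTO and was edited and recommended by Delores.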

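Apache Kafka was originally a distributed messaging system open-sourced by LinkedIn. It is now a project under the Apache umbrella and has become one of the most widely used messaging systems in the open-source world.

The Kafka community is very active. Starting with version 0.9, Kafka's tagline changed from "a high-throughput, distributed messaging system" to "a distributed streaming platform".

Kafka differs from traditional messaging systems in the following ways:

Kafka is a distributed system that is easy to scale out.

It provides high throughput for both publishing and subscribing.

It supports multiple subscribers and automatically rebalances consumers on failure.

Messages are persisted.

Comparison of Kafka with other message queues: [comparison table not preserved]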

Getting-started example

Producer

The code is as follows:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserKafkaProducer extends Thread {
    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public UserKafkaProducer(String topic) {
        // Broker used for the initial connection.
        props.put("bootstrap.servers", "master2:6667");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // Both serializers are StringSerializer, so keys and values are typed as String.
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (!isInterrupted()) {
            String messageStr = "Message_" + messageNo;
            System.out.println("Send:" + messageStr);
            // send() is asynchronous and returns a Future<RecordMetadata>.
            producer.send(new ProducerRecord<String, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop ends.
                interrupt();
            }
        }
        producer.close();
    }
}

Consumer

The code is as follows:

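import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class UserKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        /* Address of the Kafka service; not every broker has to be listed */
        props.put("bootstrap.servers", "localhost:9092");
        /* Consumer group */
        props.put("group.id", "test");
        /* Whether offsets are committed automatically */
        props.put("enable.auto.commit", "true");
        /* Interval between automatic offset commits */
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        /* Deserializer class for keys */
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        /* Deserializer class for values */
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        /* Create the consumer; try-with-resources closes it on exit */
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            /* Topics to subscribe to; several can be subscribed at once */
            consumer.subscribe(Arrays.asList("foo", "bar"));
            /* Read data with a poll timeout of 100 ms.
               poll(Duration) needs client 2.0 or later; older clients use poll(100). */
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
                }
            }
        }
    }
}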

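Kafka architecture and principles

Before going into Kafka's architecture, let us raise a few questions:

How are Kafka topics and partitions stored internally, and what are their characteristics?

Compared with traditional messaging systems, what are the advantages of Kafka's consumption model?

How does Kafka implement distributed data storage and reads?

Kafka architecture diagram

Kafka terminology

A Kafka deployment has multiple Producers, multiple Brokers, and multiple Consumers. Each Producer can write to multiple Topics, and each Consumer belongs to exactly one Consumer Group.

The whole Kafka deployment is backed by one ZK (ZooKeeper) cluster, which manages cluster configuration, elects Leaders, and triggers a Rebalance when a Consumer Group changes.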

Topic and Partition

Every message in Kafka belongs to a Topic. In general, each type of data an application produces can be given its own topic.

A topic usually has multiple subscribers. When a producer publishes a message to a topic, every consumer subscribed to that topic can receive the new message.

Kafka maintains a distributed set of partition (Partition) log files for each topic. At the storage layer, each Partition is an append-only log (Append Log).

Any message published to a Partition is appended to the tail of its log file. Each message in the partition is assigned, in order of arrival, a monotonically increasing sequence number, which is its Offset. The Offset is a number of type Long.

An Offset uniquely identifies a message within its Partition. Ordering is guaranteed within a Partition but not across a Topic.

The producer decides which Partition a message is sent to:

If the message has no Key, partitions are chosen round-robin.

If the message has a Key, the Key is hashed and the hash is taken modulo the number of partitions, so messages with the same Key are always routed to the same partition. If you need strict ordering for the whole queue, give every message the same Key.
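
The routing rule can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not Kafka's actual partitioner: the class name SimplePartitioner is hypothetical, and String.hashCode() stands in for whatever hash function the real client applies to the key.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the routing rule; not Kafka's real partitioner. */
final class SimplePartitioner {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger(0);

    SimplePartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Returns the partition index for a message with the given key (may be null). */
    int partitionFor(String key) {
        if (key == null) {
            // No key: rotate through the partitions in turn.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // Key present: hash it and take the remainder by the partition count.
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Calling partitionFor with the same key always yields the same index, which is what keeps messages for one key in order. Note that the mapping changes if the number of partitions changes.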

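Consumption model

After a producer sends messages to the Kafka cluster, consumers consume them. Broadly, there are two consumption models:

Push model (Push)

Pull model (Pull)

In a push-based messaging system, the message broker records consumption state. After pushing a message to a consumer, the broker marks it as consumed, but this approach cannot properly guarantee processing semantics.

For example, suppose the broker has sent a message to a consumer, but the consumer process crashed or a network problem kept the message from arriving. If the broker has already marked it as consumed, the message is permanently lost.

Having the consumer reply after it receives each message would avoid this, but then the broker has to track the consumption state of every message, which is not a practical approach.

With Push, the consumption rate is entirely controlled by the broker, so problems arise as soon as a consumer blocks.

Kafka uses the pull model (Poll): the consumer controls its own consumption rate and progress, and can consume from any offset.

For example, a consumer can re-consume messages it has already consumed in order to reprocess them, or consume only the most recent messages.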
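
To make the pull model concrete, here is a small in-memory sketch. It assumes nothing about Kafka's real classes: PartitionLog and PullConsumer are hypothetical names, and the log is just a list. The point it illustrates is that the consumer, not the broker, holds the position and can move it to any offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for one partition's append-only log. */
final class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    /** Appends a message to the tail and returns its offset. */
    long append(String message) {
        messages.add(message);
        return messages.size() - 1L;
    }

    /** Returns up to maxCount messages starting at the given offset. */
    List<String> read(long fromOffset, int maxCount) {
        int start = (int) Math.min(Math.max(fromOffset, 0L), messages.size());
        int end = Math.min(start + maxCount, messages.size());
        return new ArrayList<>(messages.subList(start, end));
    }
}

/** Hypothetical pull-style consumer: it, not the broker, remembers its position. */
final class PullConsumer {
    private final PartitionLog log;
    private long position = 0L;

    PullConsumer(PartitionLog log) {
        this.log = log;
    }

    /** Pulls the next batch at the consumer's own pace and advances the position. */
    List<String> poll(int maxCount) {
        List<String> batch = log.read(position, maxCount);
        position += batch.size();
        return batch;
    }

    /** Jumps to any offset, for example to reprocess old messages. */
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative");
        }
        position = offset;
    }

    long position() {
        return position;
    }
}
```

Because the log never changes when a consumer reads it, several consumers can read the same partition at different positions, and a consumer that falls behind simply polls less often.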

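Network model

Kafka Client: single-threaded Selector

The single-threaded model suits cases with few concurrent connections, simple logic, and small data volumes. In Kafka, both the Consumer and the Producer use this single-threaded model.

This model does not suit the Kafka server. Request processing on the server is relatively complex and can block the thread; once that happens, subsequent requests cannot be processed, large numbers of requests time out, and the failure cascades. The server should instead make full use of multiple threads to execute the processing logic.

Kafka Server: multi-threaded Selector

The Kafka server uses a multi-threaded Selector model. The Acceptor runs in a thread of its own, and each thread in the read thread pool registers Read events with a Selector and is responsible for reading requests on the server side.

Once a request has been read successfully, it is placed on a shared Message Queue. A thread in the write thread pool then takes the request off the queue and processes it.

This way, even if one request thread blocks, other threads keep taking requests from the queue and processing them. When a write thread finishes processing, it still has to send the response, because an OP_WRITE event has been registered.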
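
The hand-off through a shared queue can be sketched without any sockets. The following is a simplified model, not Kafka's server code: RequestPipeline and its method names are hypothetical, requests are plain strings, and the Selector and Acceptor parts are left out entirely. It only shows why a pool of handlers draining one queue keeps working when a single handler is slow.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the hand-off between reader threads and handler threads. */
final class RequestPipeline {
    private final BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();
    private final ExecutorService handlers;

    RequestPipeline(int handlerThreads) {
        handlers = Executors.newFixedThreadPool(handlerThreads);
        for (int i = 0; i < handlerThreads; i++) {
            handlers.execute(this::handleLoop);
        }
    }

    /** Called by a reader thread once a complete request has been read. */
    void enqueue(String request) {
        requestQueue.add(request);
    }

    /** Each handler takes requests off the shared queue independently of the others. */
    private void handleLoop() {
        try {
            while (true) {
                String request = requestQueue.take();
                responseQueue.add("response-to-" + request);
            }
        } catch (InterruptedException e) {
            // shutdown() interrupts the handlers; restore the flag and exit.
            Thread.currentThread().interrupt();
        }
    }

    /** Waits up to the timeout for the next response; returns null if none arrives. */
    String nextResponse(long timeoutMs) throws InterruptedException {
        return responseQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        handlers.shutdownNow();
    }
}
```

Responses can come back in a different order from the requests, since handlers run concurrently; a real server has to match each response to the connection it came from.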

Highly reliable distributed storage model

Kafka relies on its replication mechanism for high reliability. With replication in place, data is not lost even if a machine goes down.

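High-performance log storage

All messages under a Kafka Topic are stored across multiple nodes in the form of Partitions.

On each Kafka machine, every Partition corresponds to a log directory, and that directory contains multiple log segments (LogSegment).

A LogSegment consists of two files, a ".index" file and a ".log" file, which are the segment's index file and data file respectively.

The two files are named as follows: the first Segment of a Partition starts at 0, and each later Segment is named after the Offset at which it begins, which is the Offset that follows the last message of the previous Segment. The value is a 64-bit number written as 20 digits, left-padded with 0.

For example, suppose there are 1000 messages and each LogSegment holds 100 of them. The index and log for messages 900-1000 look like this: [figure not preserved]

Because the volume of Kafka message data is so large, indexing every message would cost both space and time. Kafka therefore uses a sparse index, which lets the index be loaded directly into memory and speeds up offset lookups.

Here is a brief look at how data is read. Suppose we want to read message 911. The first step is to find which segment it belongs to.

A binary search over the segment files finds 00000000000000000900.index and 00000000000000000900.log. We then search the index for the entry (911-900) = 11, or the closest entry below 11.

Here the binary search finds the index entry [10,1367]. Starting from that entry's physical position, 1367, we scan forward in the log until we reach message 911.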
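
The two lookup steps can be written down as a short sketch. It is a simplified model, not Kafka's storage code: SegmentLookup is a hypothetical class, the index lives in memory as nested sorted maps, and TreeMap.floorEntry stands in for the binary search over files and index entries. The final forward scan through the .log file is not modelled.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of segment selection plus a sparse-index lookup. */
final class SegmentLookup {
    /** Segment base offset -> sparse index (relative offset -> byte position in the .log file). */
    private final TreeMap<Long, TreeMap<Integer, Integer>> segments = new TreeMap<>();

    void addIndexEntry(long baseOffset, int relativeOffset, int position) {
        segments.computeIfAbsent(baseOffset, k -> new TreeMap<>()).put(relativeOffset, position);
    }

    /** File name stem of a segment: the base offset left-padded with zeros to 20 digits. */
    static String fileStem(long baseOffset) {
        return String.format(Locale.ROOT, "%020d", baseOffset);
    }

    /**
     * Returns {baseOffset, bytePosition}: the segment that holds targetOffset and the
     * position from which a forward scan of the .log file should start.
     */
    long[] locate(long targetOffset) {
        // Step 1: the segment with the greatest base offset <= target.
        Map.Entry<Long, TreeMap<Integer, Integer>> segment = segments.floorEntry(targetOffset);
        if (segment == null) {
            throw new IllegalArgumentException("offset below first segment: " + targetOffset);
        }
        long base = segment.getKey();
        int relative = (int) (targetOffset - base);
        // Step 2: the index entry at, or closest below, the relative offset.
        Map.Entry<Integer, Integer> entry = segment.getValue().floorEntry(relative);
        int position = (entry == null) ? 0 : entry.getValue();
        return new long[] {base, position};
    }
}
```

With index entries [0,0], [10,1367] and [20,2740] registered for the segment based at 900, locate(911) selects that segment and the entry [10,1367], matching the walk-through in the text.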

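The above describes how to find a specific Offset. Most of the time, though, we do not need to look up a particular Offset; we simply read sequentially.

For sequential reads, the operating system places a Page Cache between memory and disk and performs read-ahead, so sequential reads are very fast.

Kafka does have one problem, though: with too many partitions there are also many log segments, and because writes are batched across them, they effectively become random writes. Random I/O has a large impact on performance at that point, so in general Kafka should not be given too many Partitions.

RocketMQ addresses this by writing all logs into a single file, which turns writes into sequential writes; with some optimization, reads can also come close to sequential.

Some questions to think about:

Why are partitions needed? Would a topic with only one partition not work?

Why does the log need to be segmented?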

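Replication mechanism

Kafka's replication mechanism has multiple broker nodes replicate the logs of topic partitions hosted on other nodes.

When a node in the cluster fails, requests directed at the failed node are transferred to other healthy nodes (a process usually called Rebalance).

Every partition of every topic has one leader replica and zero or more follower replicas. Followers keep their data in sync with the leader, and one of them takes over when the leader fails.

Not every replica is eligible to replace the leader, so the Leader node maintains an ISR (In-Sync Replicas) set.

A replica must satisfy two conditions to be in this set:

The node must stay connected to ZK.

While replicating, the replica must not fall too far behind the leader.

There is also AR (Assigned Replicas), which denotes the full set of replicas, and OSR, which denotes the replicas removed for falling behind.

So the formulas are: ISR = Leader + replicas that have not fallen too far behind; AR = OSR + ISR.

Two terms need introducing here: HW (high watermark) is the position in the Partition up to which Consumers can see messages, and LEO is the position of the last Message in each Partition's Log.

HW guarantees that if the Broker hosting the Leader fails, messages can still be fetched from the newly elected Leader, so no messages are lost.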
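
The text does not spell out how HW is derived. A common way to describe it, and the assumption behind the sketch below, is that HW is the smallest LEO among the replicas currently in the ISR, so that everything below HW exists on every in-sync replica. The class HighWatermark and its method are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: derives the high watermark from replica log end offsets. */
final class HighWatermark {
    /**
     * @param leoByReplica log end offset of each assigned replica, keyed by replica id
     * @param isr          ids of the replicas currently in sync (includes the leader)
     * @return the smallest LEO among ISR members (assumed definition of HW)
     */
    static long compute(Map<Integer, Long> leoByReplica, Set<Integer> isr) {
        if (isr.isEmpty()) {
            throw new IllegalArgumentException("ISR must contain at least the leader");
        }
        long hw = Long.MAX_VALUE;
        for (int replicaId : isr) {
            Long leo = leoByReplica.get(replicaId);
            if (leo == null) {
                throw new IllegalArgumentException("no LEO for replica " + replicaId);
            }
            hw = Math.min(hw, leo);
        }
        return hw;
    }
}
```

A replica that has dropped out of the ISR does not hold HW back, which is why the calculation only looks at ISR members and ignores the rest of the AR set.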

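When a Producer sends data to the Leader, the request.required.acks parameter (named acks in the newer Java producer) sets the level of data reliability:

1 (default): the Producer sends the next Message once the Leader in the ISR has successfully received the data and acknowledged it. If the Leader goes down, data is lost.

0: the Producer does not wait for any acknowledgement from the Broker before sending the next batch of messages. This gives the highest transfer efficiency but the lowest reliability.

-1: the Producer waits until all Followers in the ISR have confirmed receipt before a send counts as complete. This gives the highest reliability. Even so, it does not guarantee that no data is lost: when the ISR contains only the Leader (the other nodes have all disconnected from ZK or have not caught up), this degrades to the acks = 1 case.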

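High-availability model and idempotence

Distributed systems generally offer three kinds of processing semantics:

at-least-once

At least once, and possibly more than once. If the Producer receives an Ack from the Broker, the message has been written to Kafka; in that case delivery happens exactly once, which is the exactly-once case discussed below.

However, if the Producer times out or receives an error and retries are enabled (the retries setting is greater than 0), the client assumes the message was not written to Kafka and sends it again.

If the Broker failed before sending the Ack but after the message was successfully written to Kafka, that retry causes the message to be written twice.

The message is then delivered to the final Consumer more than once, and if the Consumer's processing logic is not idempotent, the result will be incorrect.

These semantics can also produce out-of-order messages: when the first send fails its Ack and is about to be retried, the second message may already have been sent, so messages end up out of order within a single partition.

To prevent this, set the Producer parameter max.in.flight.requests.per.connection to 1. The in-flight requests are the Producer-side queue of requests that have been sent but not yet answered, and this setting limits the number of unanswered requests to 1.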
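
As a configuration sketch, the settings discussed in this section could be collected as follows. It assumes the newer Java producer, where the acknowledgement level is set through the acks property; the helper class OrderedProducerConfig and the retry count of 3 are illustrative choices, not values taken from the article.

```java
import java.util.Properties;

/** Hypothetical helper that collects the ordering-related producer settings. */
final class OrderedProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait for the in-sync replicas to acknowledge each write ("all" is equivalent to -1).
        props.put("acks", "all");
        // Allow retries, which is what makes delivery at-least-once (3 is an arbitrary example).
        props.put("retries", "3");
        // At most one unanswered request per connection, so a retry cannot be overtaken.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

The resulting Properties object would be passed to the KafkaProducer constructor together with the serializer settings. Limiting in-flight requests to 1 trades throughput for ordering, since the producer waits for each response before sending the next request.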

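at-most-once

If the Producer does not retry when an Ack times out or returns an error, that is, with retries = 0, the message may never be written to Kafka, and the Consumer will not receive it.

exactly-once

Exactly once: even if the Producer retries sending a message, the message is guaranteed to be delivered to the Consumer exactly once. These semantics are the most desirable and also the hardest to implement.

Up to version 0.10, Kafka could not guarantee exactly-once, and the Consumer had to provide idempotence itself. Since 0.11.0, Kafka guarantees it with transactions.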

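How exactly-once is implemented

Kafka 0.11.0 offers two official strategies for achieving exactly-once:

Single Producer, single Topic

Every Producer is assigned a unique PID when it is initialized. For each PID, the messages the Producer sends to a particular Partition of a given Topic each carry a Sequence Number that increases monotonically from 0.

The Broker also maintains a sequence number per <PID, Topic, Partition>, and checks it every time a message is committed:

If the message's sequence number is more than one greater than the one the Broker holds, some data in between has not been written yet, meaning the messages are out of order. The Broker rejects the message and the Producer throws InvalidSequenceNumber.

If the message's sequence number is less than or equal to the one the Broker holds, the message has already been stored and is a duplicate. The Broker discards it and the Producer throws DuplicateSequenceNumber.

If the message's sequence number is exactly one greater, the message is valid.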
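
The three cases of the sequence check translate almost directly into code. The sketch below is an in-memory illustration only: SequenceChecker and its Result enum are hypothetical names, the key is a simple string built from PID, topic, and partition, and "nothing accepted yet" is modelled as -1 so that the first valid sequence number is 0.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical broker-side check of producer sequence numbers. */
final class SequenceChecker {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    /** Last accepted sequence number per "pid/topic/partition" key. */
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result check(long pid, String topic, int partition, int seq) {
        String key = pid + "/" + topic + "/" + partition;
        // Sequence numbers start at 0, so "nothing accepted yet" is modelled as -1.
        int last = lastSeq.getOrDefault(key, -1);
        if (seq <= last) {
            return Result.DUPLICATE;     // already stored: discard
        }
        if (seq > last + 1) {
            return Result.OUT_OF_ORDER;  // a gap: earlier messages are missing
        }
        lastSeq.put(key, seq);           // exactly last + 1: accept and advance
        return Result.ACCEPTED;
    }
}
```

A retried message arrives with a sequence number the broker has already recorded and is reported as a duplicate, while a message that overtakes a failed one leaves a gap and is rejected.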

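This mechanism solves two problems:

A Producer sends a message that fails and is not stored by the Broker, but the second message succeeds, leaving the data out of order.

A Producer sends a message that the Broker stores successfully, but the Ack is lost on the way back, so the Producer delivers the same message again.

All of this applies within a single PID, which means it only holds within one Session of one Producer. If the Producer crashes and is assigned a new PID, the guarantee no longer holds, which is why Kafka also provides a transaction mechanism.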

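Transactions

In Kafka, transactions serve to:

Implement exactly-once semantics.

Make operations atomic: either all succeed or all fail.

Recover stateful operations.

Transactions guarantee that, even across multiple <Topic, Partition> pairs, all operations on the queues within one transaction are treated atomically: either all succeed or all fail.

In addition, a stateful application can resume from its breakpoint after a restart, which is transaction recovery.

To use Kafka transactions, the application must supply a unique transaction ID (Transaction ID) that does not change across a crash and restart.

A Transaction ID may correspond one-to-one with a PID. The difference is that the Transaction ID is supplied by the user, whereas the PID is an internal implementation detail hidden from the user.

To make sure that, after a Producer restarts, an old Producer with the same Transaction ID is invalidated, each time a Producer obtains its PID through the Transaction ID it also obtains a monotonically increasing Epoch.

Because the old Producer's Epoch is smaller than the new Producer's, Kafka can easily tell that the Producer is stale and reject its requests.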
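
The Epoch comparison can be illustrated with a few lines of bookkeeping. This is a sketch under simplifying assumptions, not the coordinator's real logic: EpochFence is a hypothetical class, epochs start at 0, and each registration of the same Transaction ID bumps the epoch by one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator-side bookkeeping for producer epochs. */
final class EpochFence {
    /** Current epoch per Transaction ID. */
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    /** Called when a producer (re)starts with this Transaction ID; returns its new epoch. */
    int register(String transactionId) {
        // First registration stores 0; later registrations increment the stored epoch.
        return currentEpoch.merge(transactionId, 0, (old, unused) -> old + 1);
    }

    /** A request is accepted only if it carries the latest epoch for its Transaction ID. */
    boolean accept(String transactionId, int epoch) {
        Integer latest = currentEpoch.get(transactionId);
        return latest != null && epoch == latest;
    }
}
```

Once a restarted producer registers and receives a higher epoch, any request still carrying the previous epoch is refused, which is how a lingering old instance is kept from writing.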

To support this, Kafka 0.11.0.0 introduced a server-side module called the Transaction Coordinator, which manages the transactional properties of the messages Producers send.

The Transaction Coordinator maintains a Transaction Log, which is stored in an internal Topic.

Because Topic data is durable, transaction state is durable as well. The Producer does not read or write the Transaction Log directly; it communicates with the Transaction Coordinator, which then writes the transaction's state to the corresponding Transaction Log.

The design of the Transaction Log is similar to that of the Offset Log, which stores Consumer Offsets.

 
   