±à¼ÍƼö: |
±¾ÎĽéÉÜKafka
¼Ü¹¹ÔÀí,Kafka ¼Ü¹¹Í¼£¬Ïû·ÑÄ£ÐÍ£¬ÍøÂçÄ£ÐÍ£¬¸ß¿É¿¿·Ö²¼Ê½´æ´¢Ä£ÐÍ£¬¸±±¾»úÖÆµÈµÈ£¬Ï£Íû¶ÔÄúÓÐËù°ïÖú
±¾ÎÄÀ´×ÔÓÚ51CTO£¬ÓÉ»ðÁú¹ûÈí¼þDelores±à¼¡¢ÍƼö¡£ |
|
Apache Kafka ×îÔçÊÇÓÉ LinkedIn ¿ªÔ´³öÀ´µÄ·Ö²¼Ê½ÏûϢϵͳ£¬ÏÖÔÚÊÇ Apache ÆìϵÄÒ»¸ö×ÓÏîÄ¿£¬²¢ÇÒÒѾ³ÉΪ¿ªÔ´ÁìÓòÓ¦ÓÃ×î¹ã·ºµÄÏûϢϵͳ֮һ¡£
Kafka ÉçÇø·Ç³£»îÔ¾£¬´Ó 0.9 °æ±¾¿ªÊ¼£¬Kafka µÄ±êÓïÒѾ´Ó¡°Ò»¸ö¸ßÍÌÍÂÁ¿£¬·Ö²¼Ê½µÄÏûϢϵͳ¡±¸ÄΪ"Ò»¸ö·Ö²¼Ê½Á÷ƽ̨"¡£
Kafka ºÍ´«Í³µÄÏûϢϵͳ²»Í¬ÔÚÓÚ£º
KafkaÊÇÒ»¸ö·Ö²¼Ê½ÏµÍ³£¬Ò×ÓÚÏòÍâÀ©Õ¹¡£
ËüͬʱΪ·¢²¼ºÍ¶©ÔÄÌṩ¸ßÍÌÍÂÁ¿¡£
ËüÖ§³Ö¶à¶©ÔÄÕߣ¬µ±Ê§°ÜʱÄÜ×Ô¶¯Æ½ºâÏû·ÑÕß¡£
ÏûÏ¢µÄ³Ö¾Ã»¯¡£
Kafka ºÍÆäËûÏûÏ¢¶ÓÁеĶԱȣº
ÈëÃÅʵÀý
Éú²úÕß
´úÂëÈçÏ£º
import java.util.Properties;
import org.apache.kafka.clients .producer.KafkaProducer;
import org.apache.kafka.clients .producer.ProducerRecord;
public class UserKafkaProducer extends Thread
{
private final KafkaProducer <Integer, String>
producer;
private final String topic;
private final Properties props = new Properties();
public UserKafkaProducer (String topic)
{
props.put("metadata.broker.list", "localhost:9092");
props.put("bootstrap.servers", "master2:6667");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common. serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common. serialization.StringSerializer");
producer = new KafkaProducer <Integer, String>(props);
this.topic = topic;
}
@Override
public void run() {
int messageNo = 1;
while (true)
{
String messageStr = new String ("Message_"
+ messageNo);
System.out.println ("Send:" + messageStr);
//·µ»ØµÄÊÇFuture<RecordMetadata>, Òì²½·¢ËÍ
producer.send(new ProducerRecord <Integer, String>(topic,
messageStr));
messageNo++;
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} |
Ïû·ÑÕß
´úÂëÈçÏ£º
Properties props
= new Properties();
/* ¶¨Òåkakfa ·þÎñµÄµØÖ·£¬ ²»ÐèÒª½«ËùÓÐbrokerÖ¸¶¨ÉÏ */
props.put("bootstrap.servers", "localhost:9092");
/* ÖÆ¶¨consumer group */
props.put("group.id", "test");
/* ÊÇ·ñ×Ô¶¯È·ÈÏoffset */
props.put ("enable.auto.commit", "true");
/* ×Ô¶¯È·ÈÏoffsetµÄʱ¼ä¼ä¸ô */
props.put ("auto.commit.interval.ms",
"1000");
props.put("session.timeout.ms","30000");
/* keyµÄÐòÁл¯Àà */
props.put("key.deserializer", "org.apache.kafka.common. serialization.StringDeserializer");
/* valueµÄÐòÁл¯Àà */
props.put("value.deserializer", "org.apache.kafka.common. serialization.StringDeserializer");
/* ¶¨Òåconsumer */
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
/* Ïû·ÑÕß¶©ÔĵÄtopic, ¿Éͬʱ¶©ÔĶà¸ö */
consumer.subscribe (Arrays.asList("foo",
"bar"));
/* ¶ÁÈ¡Êý¾Ý£¬¶ÁÈ¡³¬Ê±Ê±¼äΪ100ms */
while (true) {
ConsumerRecords<String, String> records=consumer.poll(100);
for (ConsumerRecord<String, String> record
: records)
System.out.printf("offset = %d, key = %s,
value = %s", record.offset(), record.key(),
record.value());
} |
Kafka ¼Ü¹¹ÔÀí
¶ÔÓÚ Kafka µÄ¼Ü¹¹ÔÀí£¬ÎÒÃÇÏÈÌá³öÈçϼ¸¸öÎÊÌ⣺
Kafka µÄ topic ºÍ·ÖÇøÄÚ²¿ÊÇÈçºÎ´æ´¢µÄ£¬ÓÐÊ²Ã´ÌØµã£¿
Ó봫ͳµÄÏûϢϵͳÏà±È£¬Kafka µÄÏû·ÑÄ£ÐÍÓÐʲôÓŵã?
Kafka ÈçºÎʵÏÖ·Ö²¼Ê½µÄÊý¾Ý´æ´¢ÓëÊý¾Ý¶ÁÈ¡?
Kafka ¼Ü¹¹Í¼
Kafka Ãû´Ê½âÊÍ
ÔÚÒ»Ì× Kafka ¼Ü¹¹ÖÐÓжà¸ö Producer£¬¶à¸ö Broker£¬¶à¸ö Consumer£¬Ã¿¸ö Producer ¿ÉÒÔ¶ÔÓ¦¶à¸ö Topic£¬Ã¿¸ö Consumer Ö»ÄܶÔÓ¦Ò»¸ö Consumer Group¡£
Õû¸ö Kafka ¼Ü¹¹¶ÔÓ¦Ò»¸ö ZK ¼¯Èº£¬Í¨¹ý ZK ¹ÜÀí¼¯ÈºÅäÖã¬Ñ¡¾Ù Leader£¬ÒÔ¼°ÔÚ Consumer Group ·¢Éú±ä»¯Ê±½øÐÐ Rebalance¡£
Topic ºÍ Partition
ÔÚ Kafka ÖеÄÿһÌõÏûÏ¢¶¼ÓÐÒ»¸ö Topic¡£Ò»°ãÀ´ËµÔÚÎÒÃÇÓ¦ÓÃÖвúÉú²»Í¬ÀàÐ͵ÄÊý¾Ý£¬¶¼¿ÉÒÔÉèÖò»Í¬µÄÖ÷Ìâ¡£
Ò»¸öÖ÷ÌâÒ»°ã»áÓжà¸öÏûÏ¢µÄ¶©ÔÄÕߣ¬µ±Éú²úÕß·¢²¼ÏûÏ¢µ½Ä³¸öÖ÷Ìâʱ£¬¶©ÔÄÁËÕâ¸öÖ÷ÌâµÄÏû·ÑÕß¶¼¿ÉÒÔ½ÓÊÕµ½Éú²úÕßдÈëµÄÐÂÏûÏ¢¡£
Kafka Ϊÿ¸öÖ÷Ìâά»¤ÁË·Ö²¼Ê½µÄ·ÖÇø(Partition)ÈÕÖ¾Îļþ£¬Ã¿¸ö Partition ÔÚ Kafka ´æ´¢²ãÃæÊÇ Append Log¡£
Èκη¢²¼µ½´Ë Partition µÄÏûÏ¢¶¼»á±»×·¼Óµ½ Log ÎļþµÄβ²¿£¬ÔÚ·ÖÇøÖеÄÿÌõÏûÏ¢¶¼»á°´ÕÕʱ¼ä˳Ðò·ÖÅäµ½Ò»¸öµ¥µ÷µÝÔöµÄ˳Ðò±àºÅ£¬Ò²¾ÍÊÇÎÒÃÇµÄ Offset¡£Offset ÊÇÒ»¸ö Long Ð͵ÄÊý×Ö¡£
ÎÒÃÇͨ¹ýÕâ¸ö Offset ¿ÉÒÔÈ·¶¨Ò»ÌõÔڸà Partition ϵÄΨһÏûÏ¢¡£ÔÚ Partition ÏÂÃæÊDZ£Ö¤ÁËÓÐÐòÐÔ£¬µ«ÊÇÔÚ Topic ÏÂÃæÃ»Óб£Ö¤ÓÐÐòÐÔ¡£
ÔÚÉÏͼÖÐÎÒÃǵÄÉú²úÕß»á¾ö¶¨·¢Ë͵½Äĸö Partition£º
Èç¹ûûÓÐ Key ÖµÔò½øÐÐÂÖѯ·¢ËÍ¡£
Èç¹ûÓÐ Key Öµ£¬¶Ô Key Öµ½øÐÐ Hash£¬È»ºó¶Ô·ÖÇøÊýÁ¿È¡Ó࣬±£Ö¤ÁËͬһ¸ö Key ÖµµÄ»á±»Â·Óɵ½Í¬Ò»¸ö·ÖÇø£»Èç¹ûÏë¶ÓÁеÄǿ˳ÐòÒ»ÖÂÐÔ£¬¿ÉÒÔÈÃËùÓеÄÏûÏ¢¶¼ÉèÖÃΪͬһ¸ö Key¡£
Ïû·ÑÄ£ÐÍ
ÏûÏ¢ÓÉÉú²úÕß·¢Ë͵½ Kafka ¼¯Èººó£¬»á±»Ïû·ÑÕßÏû·Ñ¡£Ò»°ãÀ´ËµÎÒÃǵÄÏû·ÑÄ£ÐÍÓÐÁ½ÖÖ£º
ÍÆËÍÄ£ÐÍ(Push)
ÀȡģÐÍ(Pull)
»ùÓÚÍÆËÍÄ£Ð͵ÄÏûϢϵͳ£¬ÓÉÏûÏ¢´úÀí¼Ç¼Ïû·Ñ״̬¡£ÏûÏ¢´úÀí½«ÏûÏ¢ÍÆË͵½Ïû·ÑÕߺ󣬱ê¼ÇÕâÌõÏûϢΪÒѾ±»Ïû·Ñ£¬µ«ÊÇÕâÖÖ·½Ê½ÎÞ·¨ºÜºÃµØ±£Ö¤Ïû·ÑµÄ´¦ÀíÓïÒå¡£
±ÈÈçµ±ÎÒÃÇÒѾ°ÑÏûÏ¢·¢Ë͸øÏû·ÑÕßÖ®ºó£¬ÓÉÓÚÏû·Ñ½ø³Ì¹Òµô»òÕßÓÉÓÚÍøÂçÔÒòûÓÐÊÕµ½ÕâÌõÏûÏ¢£¬Èç¹ûÎÒÃÇÔÚÏû·Ñ´úÀí½«Æä±ê¼ÇΪÒÑÏû·Ñ£¬Õâ¸öÏûÏ¢¾Í***¶ªÊ§ÁË¡£
Èç¹ûÎÒÃÇÀûÓÃÉú²úÕßÊÕµ½ÏûÏ¢ºó»Ø¸´ÕâÖÖ·½·¨£¬ÏûÏ¢´úÀíÐèÒª¼Ç¼Ïû·Ñ״̬£¬ÕâÖÖ²»¿ÉÈ¡¡£
Èç¹û²ÉÓà Push£¬ÏûÏ¢Ïû·ÑµÄËÙÂʾÍÍêÈ«ÓÉÏû·Ñ´úÀí¿ØÖÆ£¬Ò»µ©Ïû·ÑÕß·¢Éú×èÈû£¬¾Í»á³öÏÖÎÊÌâ¡£
Kafka ²ÉÈ¡ÀȡģÐÍ(Poll)£¬ÓÉ×Ô¼º¿ØÖÆÏû·ÑËÙ¶È£¬ÒÔ¼°Ïû·ÑµÄ½ø¶È£¬Ïû·ÑÕß¿ÉÒÔ°´ÕÕÈÎÒâµÄÆ«ÒÆÁ¿½øÐÐÏû·Ñ¡£
±ÈÈçÏû·ÑÕß¿ÉÒÔÏû·ÑÒѾÏû·Ñ¹ýµÄÏûÏ¢½øÐÐÖØÐ´¦Àí£¬»òÕßÏû·Ñ×î½üµÄÏûÏ¢µÈµÈ¡£
ÍøÂçÄ£ÐÍ
Kafka Client£ºµ¥Ïß³Ì Selector
µ¥Ïß³ÌģʽÊÊÓÃÓÚ²¢·¢Á´½ÓÊýС£¬Âß¼¼òµ¥£¬Êý¾ÝÁ¿Ð¡µÄÇé¿ö¡£ÔÚ Kafka ÖУ¬Consumer ºÍ Producer ¶¼ÊÇʹÓõÄÉÏÃæµÄµ¥Ïß³Ìģʽ¡£
ÕâÖÖģʽ²»ÊÊºÏ Kafka µÄ·þÎñ¶Ë£¬ÔÚ·þÎñ¶ËÖÐÇëÇó´¦Àí¹ý³Ì±È½Ï¸´ÔÓ£¬»áÔì³ÉÏß³Ì×èÈû£¬Ò»µ©³öÏÖºóÐøÇëÇó¾Í»áÎÞ·¨´¦Àí£¬»áÔì³É´óÁ¿ÇëÇó³¬Ê±£¬ÒýÆðÑ©±À¡£¶øÔÚ·þÎñÆ÷ÖÐÓ¦¸Ã³ä·ÖÀûÓöàÏß³ÌÀ´´¦ÀíÖ´ÐÐÂß¼¡£
Kafka Server£º¶àÏß³Ì Selector
ÔÚ Kafka ·þÎñ¶Ë²ÉÓõÄÊǶàÏß³ÌµÄ Selector Ä£ÐÍ£¬Acceptor ÔËÐÐÔÚÒ»¸öµ¥¶ÀµÄÏß³ÌÖУ¬¶ÔÓÚ¶ÁÈ¡²Ù×÷µÄÏ̳߳ØÖеÄÏ̶߳¼»áÔÚ Selector ×¢²á Read ʼþ£¬¸ºÔð·þÎñ¶Ë¶ÁÈ¡ÇëÇóµÄÂß¼¡£
³É¹¦¶ÁÈ¡ºó£¬½«ÇëÇó·ÅÈë Message Queue¹²Ïí¶ÓÁÐÖС£È»ºóÔÚдÏ̳߳ØÖУ¬È¡³öÕâ¸öÇëÇó£¬¶ÔÆä½øÐÐÂß¼´¦Àí¡£
ÕâÑù£¬¼´Ê¹Ä³¸öÇëÇóÏß³Ì×èÈûÁË£¬»¹ÓкóÐøµÄÏ̴߳ÓÏûÏ¢¶ÓÁÐÖлñÈ¡ÇëÇó²¢½øÐд¦Àí£¬ÔÚдÏß³ÌÖд¦ÀíÍêÂß¼´¦Àí£¬ÓÉÓÚ×¢²áÁË OP_WIRTE ʼþ£¬ËùÒÔ»¹ÐèÒª¶ÔÆä·¢ËÍÏìÓ¦¡£
¸ß¿É¿¿·Ö²¼Ê½´æ´¢Ä£ÐÍ
ÔÚ Kafka Öб£Ö¤¸ß¿É¿¿Ä£ÐÍÒÀ¿¿µÄÊǸ±±¾»úÖÆ£¬ÓÐÁ˸±±¾»úÖÆÖ®ºó£¬¾ÍËã»úÆ÷å´»úÒ²²»»á·¢ÉúÊý¾Ý¶ªÊ§¡£
¸ßÐÔÄܵÄÈÕÖ¾´æ´¢
Kafka Ò»¸ö Topic ÏÂÃæµÄËùÓÐÏûÏ¢¶¼ÊÇÒÔ Partition µÄ·½Ê½·Ö²¼Ê½µÄ´æ´¢ÔÚ¶à¸ö½ÚµãÉÏ¡£
ͬʱÔÚ Kafka µÄ»úÆ÷ÉÏ£¬Ã¿¸ö Partition Æäʵ¶¼»á¶ÔÓ¦Ò»¸öÈÕ־Ŀ¼£¬ÔÚĿ¼ÏÂÃæ»á¶ÔÓ¦¶à¸öÈÕÖ¾·Ö¶Î(LogSegment)¡£
LogSegment ÎļþÓÉÁ½²¿·Ö×é³É£¬·Ö±ðΪ¡°.index¡±ÎļþºÍ¡°.log¡±Îļþ£¬·Ö±ð±íʾΪ Segment Ë÷ÒýÎļþºÍÊý¾ÝÎļþ¡£
ÕâÁ½¸öÎļþµÄÃüÁî¹æÔòΪ£ºPartition È«¾ÖµÄ***¸ö Segment ´Ó 0 ¿ªÊ¼£¬ºóÐøÃ¿¸ö Segment ÎļþÃûΪÉÏÒ»¸ö Segment Îļþ***Ò»ÌõÏûÏ¢µÄ Offset Öµ£¬ÊýÖµ´óСΪ 64 룬20 λÊý×Ö×Ö·û³¤¶È£¬Ã»ÓÐÊý×ÖÓà 0 Ìî³ä¡£
ÈçÏ£¬¼ÙÉèÓÐ 1000 ÌõÏûÏ¢£¬Ã¿¸ö LogSegment ´óСΪ 100£¬ÏÂÃæÕ¹ÏÖÁË 900-1000 µÄË÷ÒýºÍ Log£º
ÓÉÓÚ Kafka ÏûÏ¢Êý¾ÝÌ«´ó£¬Èç¹ûÈ«²¿½¨Á¢Ë÷Òý£¬¼ÈÕ¼Á˿ռäÓÖÔö¼ÓÁ˺Äʱ£¬ËùÒÔ Kafka Ñ¡ÔñÁËÏ¡ÊèË÷ÒýµÄ·½Ê½£¬ÕâÑùË÷Òý¿ÉÒÔÖ±½Ó½øÈëÄڴ棬¼Ó¿ìÆ«²éѯËÙ¶È¡£
¼òµ¥½éÉÜÒ»ÏÂÈçºÎ¶ÁÈ¡Êý¾Ý£¬Èç¹ûÎÒÃÇÒª¶ÁÈ¡µÚ 911 ÌõÊý¾ÝÊ×ÏÈ***²½£¬ÕÒµ½ËüÊÇÊôÓÚÄÄÒ»¶ÎµÄ¡£
¸ù¾Ý¶þ·Ö·¨²éÕÒµ½ËüÊôÓÚµÄÎļþ£¬ÕÒµ½ 0000900.index ºÍ 00000900.log Ö®ºó£¬È»ºóÈ¥ index ÖÐÈ¥²éÕÒ (911-900) = 11 Õâ¸öË÷Òý»òÕßСÓÚ 11 ×î½üµÄË÷Òý¡£
ÔÚÕâÀïͨ¹ý¶þ·Ö·¨ÎÒÃÇÕÒµ½ÁËË÷ÒýÊÇ [10,1367]£¬È»ºóÎÒÃÇͨ¹ýÕâÌõË÷ÒýµÄÎïÀíλÖà 1367£¬¿ªÊ¼ÍùºóÕÒ£¬Ö±µ½ÕÒµ½ 911 ÌõÊý¾Ý¡£
ÉÏÃæ½²µÄÊÇÈç¹ûÒªÕÒij¸ö Offset µÄÁ÷³Ì£¬µ«ÊÇÎÒÃÇ´ó¶àÊýʱºò²¢²»ÐèÒª²éÕÒij¸ö Offset£¬Ö»ÐèÒª°´ÕÕ˳Ðò¶Á¼´¿É¡£
¶øÔÚ˳Ðò¶ÁÖУ¬²Ù×÷ϵͳ»áÔÚÄÚ´æºÍ´ÅÅÌÖ®¼äÌí¼Ó Page Cache£¬Ò²¾ÍÊÇÎÒÃÇÆ½³£¼ûµ½µÄÔ¤¶Á²Ù×÷£¬ËùÒÔÎÒÃǵÄ˳Ðò¶Á²Ù×÷ʱËٶȺܿ졣
µ«ÊÇ Kafka ÓиöÎÊÌ⣬Èç¹û·ÖÇø¹ý¶à£¬ÄÇôÈÕÖ¾·Ö¶ÎÒ²»áºÜ¶à£¬Ð´µÄʱºòÓÉÓÚÊÇÅúÁ¿Ð´£¬Æäʵ¾Í»á±ä³ÉËæ»úдÁË£¬Ëæ»ú I/O Õâ¸öʱºò¶ÔÐÔÄÜÓ°ÏìºÜ´ó¡£ËùÒÔÒ»°ãÀ´Ëµ Kafka ²»ÄÜÓÐÌ«¶àµÄ Partition¡£
Õë¶ÔÕâÒ»µã£¬RocketMQ °ÑËùÓеÄÈÕÖ¾¶¼Ð´ÔÚÒ»¸öÎļþÀïÃæ£¬¾ÍÄܱä³É˳Ðòд£¬Í¨¹ýÒ»¶¨ÓÅ»¯£¬¶ÁÒ²ÄܽӽüÓÚ˳Ðò¶Á¡£
´ó¼Ò¿ÉÒÔ˼¿¼Ò»Ï£º
ΪʲôÐèÒª·ÖÇø£¬Ò²¾ÍÊÇ˵Ö÷ÌâÖ»ÓÐÒ»¸ö·ÖÇø£¬ÄѵÀ²»ÐÐÂð£¿
ÈÕ־ΪʲôÐèÒª·Ö¶Î£¿
¸±±¾»úÖÆ
Kafka µÄ¸±±¾»úÖÆÊǶà¸ö·þÎñ¶Ë½Úµã¶ÔÆäËû½ÚµãµÄÖ÷Ìâ·ÖÇøµÄÈÕÖ¾½øÐи´ÖÆ¡£
µ±¼¯ÈºÖеÄij¸ö½Úµã³öÏÖ¹ÊÕÏ£¬·ÃÎʹÊÕϽڵãµÄÇëÇó»á±»×ªÒƵ½ÆäËûÕý³£½Úµã(ÕâÒ»¹ý³Ìͨ³£½Ð Reblance)¡£
Kafka ÿ¸öÖ÷ÌâµÄÿ¸ö·ÖÇø¶¼ÓÐÒ»¸öÖ÷¸±±¾ÒÔ¼° 0 ¸ö»òÕß¶à¸ö¸±±¾£¬¸±±¾±£³ÖºÍÖ÷¸±±¾µÄÊý¾Ýͬ²½£¬µ±Ö÷¸±±¾³ö¹ÊÕÏʱ¾Í»á±»Ìæ´ú¡£
ÔÚ Kafka Öв¢²»ÊÇËùÓеĸ±±¾¶¼Äܱ»ÄÃÀ´Ìæ´úÖ÷¸±±¾£¬ËùÒÔÔÚ Kafka µÄ Leader ½ÚµãÖÐά»¤×ÅÒ»¸ö ISR(In Sync Replicas)¼¯ºÏ¡£
·Òë¹ýÀ´Ò²½ÐÕýÔÚͬ²½Öм¯ºÏ£¬ÔÚÕâ¸ö¼¯ºÏÖеÄÐèÒªÂú×ãÁ½¸öÌõ¼þ£º
½Úµã±ØÐëºÍ ZK ±£³ÖÁ¬½Ó¡£
ÔÚͬ²½µÄ¹ý³ÌÖÐÕâ¸ö¸±±¾²»ÄÜÂäºóÖ÷¸±±¾Ì«¶à¡£
ÁíÍ⻹Óиö AR(Assigned Replicas)ÓÃÀ´±êʶ¸±±¾µÄÈ«¼¯£¬OSR ÓÃÀ´±íʾÓÉÓÚÂäºó±»ÌÞ³ýµÄ¸±±¾¼¯ºÏ¡£
ËùÒÔ¹«Ê½ÈçÏ£ºISR = Leader + ûÓÐÂäºóÌ«¶àµÄ¸±±¾£»AR = OSR+ ISR¡£
ÕâÀïÏÈҪ˵ÏÂÁ½¸öÃû´Ê£ºHW(¸ßˮλ)ÊÇ Consumer Äܹ»¿´µ½µÄ´Ë Partition µÄλÖã¬LEO ÊÇÿ¸ö Partition µÄ Log ***Ò»Ìõ Message µÄλÖá£
HW Äܱ£Ö¤ Leader ËùÔÚµÄ Broker ʧЧ£¬¸ÃÏûÏ¢ÈÔÈ»¿ÉÒÔ´ÓÐÂÑ¡¾ÙµÄ Leader ÖлñÈ¡£¬²»»áÔì³ÉÏûÏ¢¶ªÊ§¡£
µ± Producer Ïò Leader ·¢ËÍÊý¾Ýʱ£¬¿ÉÒÔͨ¹ý request.required.acks ²ÎÊýÀ´ÉèÖÃÊý¾Ý¿É¿¿ÐԵļ¶±ð£º
1£¨Ä¬ÈÏ£©£ºÕâÒâζ×Å Producer ÔÚ ISR ÖÐµÄ Leader Òѳɹ¦ÊÕµ½µÄÊý¾Ý²¢µÃµ½È·ÈϺó·¢ËÍÏÂÒ»Ìõ Message¡£Èç¹û Leader å´»úÁË£¬Ôò»á¶ªÊ§Êý¾Ý¡£
0£ºÕâÒâζ×Å Producer ÎÞÐèµÈ´ýÀ´×Ô Broker µÄÈ·È϶ø¼ÌÐø·¢ËÍÏÂÒ»ÅúÏûÏ¢¡£ÕâÖÖÇé¿öÏÂÊý¾Ý´«ÊäЧÂÊ***£¬µ«ÊÇÊý¾Ý¿É¿¿ÐÔÈ´ÊÇ***µÄ¡£
-1£ºProducer ÐèÒªµÈ´ý ISR ÖеÄËùÓÐ Follower ¶¼È·ÈϽÓÊÕµ½Êý¾Ýºó²ÅËãÒ»´Î·¢ËÍÍê³É£¬¿É¿¿ÐÔ***¡£
µ«ÊÇÕâÑùÒ²²»Äܱ£Ö¤Êý¾Ý²»¶ªÊ§£¬±ÈÈçµ± ISR ÖÐÖ»ÓÐ Leader ʱ(ÆäËû½Úµã¶¼ºÍ ZK ¶Ï¿ªÁ¬½Ó£¬»òÕß¶¼Ã»×·ÉÏ)£¬ÕâÑù¾Í±ä³ÉÁË acks = 1 µÄÇé¿ö¡£
¸ß¿ÉÓÃÄ£Ðͼ°ÃݵÈ
ÔÚ·Ö²¼Ê½ÏµÍ³ÖÐÒ»°ãÓÐÈýÖÖ´¦ÀíÓïÒ壺
at-least-once
ÖÁÉÙÒ»´Î£¬ÓпÉÄÜ»áÓжà´Î¡£Èç¹û Producer ÊÕµ½À´×Ô Ack µÄÈ·ÈÏ£¬Ôò±íʾ¸ÃÏûÏ¢ÒѾдÈëµ½ Kafka ÁË£¬´Ëʱ¸ÕºÃÊÇÒ»´Î£¬Ò²¾ÍÊÇÎÒÃǺóÃæµÄ Exactly-once¡£
µ«ÊÇÈç¹û Producer ³¬Ê±»òÊÕµ½´íÎ󣬲¢ÇÒ request.required.acks ÅäÖõIJ»ÊÇ -1£¬Ôò»áÖØÊÔ·¢ËÍÏûÏ¢£¬¿Í»§¶Ë»áÈÏΪ¸ÃÏûϢδдÈë Kafka¡£
Èç¹û Broker ÔÚ·¢ËÍ Ack ֮ǰʧ°Ü£¬µ«ÔÚÏûÏ¢³É¹¦Ð´Èë Kafka Ö®ºó£¬ÕâÒ»´ÎÖØÊÔ½«»áµ¼ÖÂÎÒÃǵÄÏûÏ¢»á±»Ð´ÈëÁ½´Î¡£
ËùÒÔÏûÏ¢¾Í²»Ö¹Ò»´ÎµØ´«µÝ¸ø×îÖÕ Consumer£¬Èç¹û Consumer ´¦ÀíÂ߼ûÓб£Ö¤Ãݵȵϰ¾Í»áµÃµ½²»ÕýÈ·µÄ½á¹û¡£
ÔÚÕâÖÖÓïÒåÖлá³öÏÖÂÒÐò£¬Ò²¾ÍÊǵ±***´Î Ack ʧ°Ü×¼±¸ÖØÊÔµÄʱºò£¬µ«ÊǵڶþÏûÏ¢ÒѾ·¢Ë͹ýÈ¥ÁË£¬Õâ¸öʱºò»á³öÏÖµ¥·ÖÇøÖÐÂÒÐòµÄÏÖÏó¡£
ÎÒÃÇÐèÒªÉèÖà Prouducer µÄ²ÎÊý max.in.flight.requests.per.connection£¬flight.requests ÊÇ Producer ¶ËÓÃÀ´±£´æ·¢ËÍÇëÇóÇÒûÓÐÏìÓ¦µÄ¶ÓÁУ¬±£Ö¤ Produce r¶ËδÏìÓ¦µÄÇëÇó¸öÊýΪ 1¡£
at-most-once
Èç¹ûÔÚ Ack ³¬Ê±»ò·µ»Ø´íÎóʱ Producer ²»ÖØÊÔ£¬Ò²¾ÍÊÇÎÒÃǽ² request.required.acks = -1£¬Ôò¸ÃÏûÏ¢¿ÉÄÜ×îÖÕûÓÐдÈë Kafka£¬ËùÒÔ Consumer ²»»á½ÓÊÕÏûÏ¢¡£
exactly-once
¸ÕºÃÒ»´Î£¬¼´Ê¹ Producer ÖØÊÔ·¢ËÍÏûÏ¢£¬ÏûÏ¢Ò²»á±£Ö¤×î¶àÒ»´ÎµØ´«µÝ¸ø Consumer¡£¸ÃÓïÒåÊÇ×îÀíÏëµÄ£¬Ò²ÊÇ×îÄÑʵÏֵġ£
ÔÚ 0.10 ֮ǰ²¢²»Äܱ£Ö¤ exactly-once£¬ÐèҪʹÓà Consumer ×Ô´øµÄÃݵÈÐÔ±£Ö¤¡£0.11.0 ʹÓÃÊÂÎñ±£Ö¤ÁË¡£
ÈçºÎʵÏÖ exactly-once
ҪʵÏÖ exactly-once ÔÚ Kafka 0.11.0 ÖÐÓÐÁ½¸ö¹Ù·½²ßÂÔ£º
µ¥ Producer µ¥ Topic
ÿ¸ö Producer ÔÚ³õʼ»¯µÄʱºò¶¼»á±»·ÖÅäÒ»¸öΨһµÄ PID£¬¶ÔÓÚÿ¸öΨһµÄ PID£¬Producer ÏòÖ¸¶¨µÄ Topic ÖÐij¸öÌØ¶¨µÄ Partition ·¢Ë͵ÄÏûÏ¢¶¼»áЯ´øÒ»¸ö´Ó 0 µ¥µ÷µÝÔöµÄ Sequence Number¡£
ÔÚÎÒÃÇµÄ Broker ¶ËÒ²»áά»¤Ò»¸öά¶ÈΪ£¬Ã¿´ÎÌá½»Ò»´ÎÏûÏ¢µÄʱºò¶¼»á¶ÔÆë½øÐÐУÑ飺
Èç¹ûÏûÏ¢ÐòºÅ±È Broker ά»¤µÄÐòºÅ´óÒ»ÒÔÉÏ£¬ËµÃ÷ÖмäÓÐÊý¾ÝÉÐδдÈ룬Ҳ¼´ÂÒÐò£¬´Ëʱ Broker ¾Ü¾ø¸ÃÏûÏ¢£¬Producer Å׳ö InvalidSequenceNumber¡£
Èç¹ûÏûÏ¢ÐòºÅСÓÚµÈÓÚ Broker ά»¤µÄÐòºÅ£¬ËµÃ÷¸ÃÏûÏ¢Òѱ»±£´æ£¬¼´ÎªÖظ´ÏûÏ¢£¬Broker Ö±½Ó¶ªÆú¸ÃÏûÏ¢£¬Producer Å׳ö DuplicateSequenceNumber¡£
Èç¹ûÏûÏ¢ÐòºÅ¸ÕºÃ´óÒ»£¬¾ÍÖ¤Ã÷ÊǺϷ¨µÄ¡£
ÉÏÃæËù˵µÄ½â¾öÁËÁ½¸öÎÊÌ⣺
µ± Prouducer ·¢ËÍÁËÒ»ÌõÏûÏ¢Ö®ºóʧ°Ü£¬Broker ²¢Ã»Óб£´æ£¬µ«ÊǵڶþÌõÏûϢȴ·¢Ëͳɹ¦£¬Ôì³ÉÁËÊý¾ÝµÄÂÒÐò¡£
µ± Producer ·¢ËÍÁËÒ»ÌõÏûÏ¢Ö®ºó£¬Broker ±£´æ³É¹¦£¬Ack »Ø´«Ê§°Ü£¬Producer ÔÙ´ÎͶµÝÖØ¸´µÄÏûÏ¢¡£
ÉÏÃæËù˵µÄ¶¼ÊÇÔÚͬһ¸ö PID ÏÂÃæ£¬Òâζ×űØÐë±£Ö¤ÔÚµ¥¸ö Producer ÖеÄͬһ¸ö Seesion ÄÚ£¬Èç¹û Producer ¹ÒÁË£¬±»·ÖÅäÁËÐ嵀 PID£¬ÕâÑù¾ÍÎÞ·¨±£Ö¤ÁË£¬ËùÒÔ Kafka ÖÐÓÖÓÐÊÂÎñ»úÖÆÈ¥±£Ö¤¡£
ÊÂÎñ
ÔÚ Kafka ÖÐÊÂÎñµÄ×÷ÓÃÊÇ£º
ʵÏÖ exactly-once ÓïÒå¡£
±£Ö¤²Ù×÷µÄÔ×ÓÐÔ£¬ÒªÃ´È«²¿³É¹¦£¬ÒªÃ´È«²¿Ê§°Ü¡£
ÓÐ״̬µÄ²Ù×÷µÄ»Ö¸´¡£
ÊÂÎñ¿ÉÒÔ±£Ö¤¾ÍËã¿ç¶à¸ö£¬ÔÚ±¾´ÎÊÂÎñÖеĶÔÏû·Ñ¶ÓÁеIJÙ×÷¶¼µ±³ÉÔ×ÓÐÔ£¬ÒªÃ´È«²¿³É¹¦£¬ÒªÃ´È«²¿Ê§°Ü¡£
²¢ÇÒ£¬ÓÐ״̬µÄÓ¦ÓÃÒ²¿ÉÒÔ±£Ö¤ÖØÆôºó´Ó¶Ïµã´¦¼ÌÐø´¦Àí£¬Ò²¼´ÊÂÎñ»Ö¸´¡£
ÔÚ Kafka µÄÊÂÎñÖУ¬Ó¦ÓóÌÐò±ØÐëÌṩһ¸öΨһµÄÊÂÎñ ID£¬¼´ Transaction ID£¬²¢ÇÒå´»úÖØÆôÖ®ºó£¬Ò²²»»á·¢Éú¸Ä±ä¡£
Transactin ID Óë PID ¿ÉÄÜÒ»Ò»¶ÔÓ¦£¬Çø±ðÔÚÓÚ Transaction ID ÓÉÓû§Ìṩ£¬¶ø PID ÊÇÄÚ²¿µÄʵÏÖ¶ÔÓû§Í¸Ã÷¡£
ΪÁË Producer ÖØÆôÖ®ºó£¬¾ÉµÄ Producer ¾ßÓÐÏàͬµÄ Transaction ID ʧЧ£¬Ã¿´Î Producer ͨ¹ý Transaction ID Äõ½ PID µÄͬʱ£¬»¹»á»ñȡһ¸öµ¥µ÷µÝÔöµÄ Epoch¡£
ÓÉÓÚ¾ÉµÄ Producer µÄ Epoch ±ÈРProducer µÄ Epoch С£¬Kafka ¿ÉÒÔºÜÈÝÒ×ʶ±ð³ö¸Ã Producer ÊÇÀϵģ¬Producer ²¢¾Ü¾øÆäÇëÇó¡£
ΪÁËʵÏÖÕâÒ»µã£¬Kafka 0.11.0.0 ÒýÈëÁËÒ»¸ö·þÎñÆ÷¶ËµÄÄ£¿é£¬ÃûΪ Transaction Coordinator£¬ÓÃÓÚ¹ÜÀí Producer ·¢Ë͵ÄÏûÏ¢µÄÊÂÎñÐÔ¡£
¸Ã Transaction Coordinator ά»¤ Transaction Log£¬¸Ã Log ´æÓÚÒ»¸öÄÚ²¿µÄ Topic ÄÚ¡£
ÓÉÓÚ Topic Êý¾Ý¾ßÓг־ÃÐÔ£¬Òò´ËÊÂÎñµÄ״̬Ҳ¾ßÓг־ÃÐÔ¡£Producer ²¢²»Ö±½Ó¶Áд Transaction Log£¬ËüÓë Transaction Coordinator ͨÐÅ£¬È»ºóÓÉ Transaction Coordinator ½«¸ÃÊÂÎñµÄ״̬²åÈëÏàÓ¦µÄ Transaction Log¡£
Transaction Log µÄÉè¼ÆÓë Offset Log ÓÃÓÚ±£´æ Consumer µÄ Offset ÀàËÆ¡£
|