Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
kafka ѧϰ ·Ç³£ÏêϸµÄ¾­µä½Ì³Ì
 
  9641  次浏览      27
 2018-1-11  
 
±à¼­ÍƼö:

±¾ÎÄÀ´×ÔÓÚcsdn,ÎÄÕ½éÉܱȽÏÏêϸ£¬ÓÉdzÈëÉ´Ó»ù´¡¸ÅÄî´î½¨»·¾³¶¼ÐÔÄÜÓÅ»¯£¬¿Í»§¶ËµÄAPIµÈµÈ£¬´úÂë²ã²ãÐðÊö¡£

Ò»¡¢»ù±¾¸ÅÄî

½éÉÜ

KafkaÊÇÒ»¸ö·Ö²¼Ê½µÄ¡¢¿É·ÖÇøµÄ¡¢¿É¸´ÖƵÄÏûϢϵͳ¡£ËüÌṩÁËÆÕͨÏûϢϵͳµÄ¹¦ÄÜ£¬µ«¾ßÓÐ×Ô¼º¶ÀÌØµÄÉè¼Æ¡£

Õâ¸ö¶ÀÌØµÄÉè¼ÆÊÇʲôÑùµÄÄØ£¿

Ê×ÏÈÈÃÎÒÃÇ¿´¼¸¸ö»ù±¾µÄÏûϢϵͳÊõÓ

Kafka½«ÏûÏ¢ÒÔtopicΪµ¥Î»½øÐйéÄÉ¡£

½«ÏòKafka topic·¢²¼ÏûÏ¢µÄ³ÌÐò³ÉΪproducers.

½«Ô¤¶©topics²¢Ïû·ÑÏûÏ¢µÄ³ÌÐò³ÉΪconsumer.

KafkaÒÔ¼¯ÈºµÄ·½Ê½ÔËÐУ¬¿ÉÒÔÓÉÒ»¸ö»ò¶à¸ö·þÎñ×é³É£¬Ã¿¸ö·þÎñ½Ð×öÒ»¸öbroker.

producersͨ¹ýÍøÂ罫ÏûÏ¢·¢Ë͵½Kafka¼¯Èº£¬¼¯ÈºÏòÏû·ÑÕßÌṩÏûÏ¢£¬ÈçÏÂͼËùʾ£º

¿Í»§¶ËºÍ·þÎñ¶Ëͨ¹ýTCPЭÒéͨÐÅ¡£KafkaÌṩÁËJava¿Í»§¶Ë£¬²¢ÇÒ¶Ô¶àÖÖÓïÑÔ¶¼ÌṩÁËÖ§³Ö¡£

Topics ºÍLogs

ÏÈÀ´¿´Ò»ÏÂKafkaÌṩµÄÒ»¸ö³éÏó¸ÅÄî:topic.

Ò»¸ötopicÊǶÔÒ»×éÏûÏ¢µÄ¹éÄÉ¡£¶Ôÿ¸ötopic£¬Kafka ¶ÔËüµÄÈÕÖ¾½øÐÐÁË·ÖÇø£¬ÈçÏÂͼËùʾ£º

ÿ¸ö·ÖÇø¶¼ÓÉһϵÁÐÓÐÐòµÄ¡¢²»¿É±äµÄÏûÏ¢×é³É£¬ÕâЩÏûÏ¢±»Á¬ÐøµÄ×·¼Óµ½·ÖÇøÖС£·ÖÇøÖеÄÿ¸öÏûÏ¢¶¼ÓÐÒ»¸öÁ¬ÐøµÄÐòÁкŽÐ×öoffset,ÓÃÀ´ÔÚ·ÖÇøÖÐΨһµÄ±êʶÕâ¸öÏûÏ¢¡£

ÔÚÒ»¸ö¿ÉÅäÖõÄʱ¼ä¶ÎÄÚ£¬Kafka¼¯Èº±£ÁôËùÓз¢²¼µÄÏûÏ¢£¬²»¹ÜÕâЩÏûÏ¢ÓÐûÓб»Ïû·Ñ¡£±ÈÈ磬Èç¹ûÏûÏ¢µÄ±£´æ²ßÂÔ±»ÉèÖÃΪ2Ì죬ÄÇôÔÚÒ»¸öÏûÏ¢±»·¢²¼µÄÁ½Ììʱ¼äÄÚ£¬Ëü¶¼ÊÇ¿ÉÒÔ±»Ïû·ÑµÄ¡£Ö®ºóËü½«±»¶ªÆúÒÔÊͷſռ䡣KafkaµÄÐÔÄÜÊǺÍÊý¾ÝÁ¿Î޹صij£Á¿¼¶µÄ£¬ËùÒÔ±£ÁôÌ«¶àµÄÊý¾Ý²¢²»ÊÇÎÊÌâ¡£

ʵ¼ÊÉÏÿ¸öconsumerΨһÐèҪά»¤µÄÊý¾ÝÊÇÏûÏ¢ÔÚÈÕÖ¾ÖеÄλÖã¬Ò²¾ÍÊÇoffset.Õâ¸öoffsetÓÐconsumerÀ´Î¬»¤£ºÒ»°ãÇé¿öÏÂËæ×Åconsumer²»¶ÏµÄ¶ÁÈ¡ÏûÏ¢£¬ÕâoffsetµÄÖµ²»¶ÏÔö¼Ó£¬µ«Æäʵconsumer¿ÉÒÔÒÔÈÎÒâµÄ˳Ðò¶ÁÈ¡ÏûÏ¢£¬±ÈÈçËü¿ÉÒÔ½«offsetÉèÖóÉΪһ¸ö¾ÉµÄÖµÀ´ÖضÁ֮ǰµÄÏûÏ¢¡£

ÒÔÉÏÌØµãµÄ½áºÏ£¬Ê¹Kafka consumers·Ç³£µÄÇáÁ¿¼¶£ºËüÃÇ¿ÉÒÔÔÚ²»¶Ô¼¯ÈººÍÆäËûconsumerÔì³ÉÓ°ÏìµÄÇé¿ö϶ÁÈ¡ÏûÏ¢¡£Äã¿ÉÒÔʹÓÃÃüÁîÐÐÀ´"tail"ÏûÏ¢¶ø²»»á¶ÔÆäËûÕýÔÚÏû·ÑÏûÏ¢µÄconsumerÔì³ÉÓ°Ïì¡£

½«ÈÕÖ¾·ÖÇø¿ÉÒÔ´ïµ½ÒÔÏÂÄ¿µÄ£ºÊ×ÏÈÕâʹµÃÿ¸öÈÕÖ¾µÄÊýÁ¿²»»áÌ«´ó£¬¿ÉÒÔÔÚµ¥¸ö·þÎñÉϱ£´æ¡£ÁíÍâÿ¸ö·ÖÇø¿ÉÒÔµ¥¶À·¢²¼ºÍÏû·Ñ£¬Îª²¢·¢²Ù×÷topicÌṩÁËÒ»ÖÖ¿ÉÄÜ¡£

·Ö²¼Ê½

ÿ¸ö·ÖÇøÔÚKafka¼¯ÈºµÄÈô¸É·þÎñÖж¼Óи±±¾£¬ÕâÑùÕâЩ³ÖÓи±±¾µÄ·þÎñ¿ÉÒÔ¹²Í¬´¦ÀíÊý¾ÝºÍÇëÇ󣬸±±¾ÊýÁ¿ÊÇ¿ÉÒÔÅäÖõġ£¸±±¾Ê¹Kafka¾ß±¸ÁËÈÝ´íÄÜÁ¦¡£

ÿ¸ö·ÖÇø¶¼ÓÉÒ»¸ö·þÎñÆ÷×÷Ϊ¡°leader¡±£¬Áã»òÈô¸É·þÎñÆ÷×÷Ϊ¡°followers¡±,leader¸ºÔð´¦ÀíÏûÏ¢µÄ¶ÁºÍд£¬followersÔòÈ¥¸´ÖÆleader.Èç¹ûleader downÁË£¬followersÖеÄһ̨Ôò»á×Ô¶¯³ÉΪleader¡£¼¯ÈºÖеÄÿ¸ö·þÎñ¶¼»áͬʱ°çÑÝÁ½¸ö½ÇÉ«£º×÷ΪËüËù³ÖÓеÄÒ»²¿·Ö·ÖÇøµÄleader£¬Í¬Ê±×÷ΪÆäËû·ÖÇøµÄfollowers£¬ÕâÑù¼¯Èº¾Í»á¾ÝÓнϺõĸºÔؾùºâ¡£

Producers

Producer½«ÏûÏ¢·¢²¼µ½ËüÖ¸¶¨µÄtopicÖÐ,²¢¸ºÔð¾ö¶¨·¢²¼µ½Äĸö·ÖÇø¡£Í¨³£¼òµ¥µÄÓɸºÔؾùºâ»úÖÆËæ»úÑ¡Ôñ·ÖÇø£¬µ«Ò²¿ÉÒÔͨ¹ýÌØ¶¨µÄ·ÖÇøº¯ÊýÑ¡Ôñ·ÖÇø¡£Ê¹Óõĸü¶àµÄÊǵڶþÖÖ¡£

Consumers

·¢²¼ÏûϢͨ³£ÓÐÁ½ÖÖģʽ£º¶ÓÁÐģʽ£¨queuing£©ºÍ·¢²¼-¶©ÔÄģʽ(publish-subscribe)¡£¶ÓÁÐģʽÖУ¬consumers¿ÉÒÔͬʱ´Ó·þÎñ¶Ë¶ÁÈ¡ÏûÏ¢£¬Ã¿¸öÏûÏ¢Ö»±»ÆäÖÐÒ»¸öconsumer¶Áµ½£»·¢²¼-¶©ÔÄģʽÖÐÏûÏ¢±»¹ã²¥µ½ËùÓеÄconsumerÖС£Consumers¿ÉÒÔ¼ÓÈëÒ»¸öconsumer ×飬¹²Í¬¾ºÕùÒ»¸ötopic£¬topicÖеÄÏûÏ¢½«±»·Ö·¢µ½×éÖеÄÒ»¸ö³ÉÔ±ÖС£Í¬Ò»×éÖеÄconsumer¿ÉÒÔÔÚ²»Í¬µÄ³ÌÐòÖУ¬Ò²¿ÉÒÔÔÚ²»Í¬µÄ»úÆ÷ÉÏ¡£Èç¹ûËùÓеÄconsumer¶¼ÔÚÒ»¸ö×éÖУ¬Õâ¾Í³ÉΪÁË´«Í³µÄ¶ÓÁÐģʽ£¬ÔÚ¸÷consumerÖÐʵÏÖ¸ºÔؾùºâ¡£Èç¹ûËùÓеÄconsumer¶¼²»ÔÚ²»Í¬µÄ×éÖУ¬Õâ¾Í³ÉΪÁË·¢²¼-¶©ÔÄģʽ£¬ËùÓеÄÏûÏ¢¶¼±»·Ö·¢µ½ËùÓеÄconsumerÖС£¸ü³£¼ûµÄÊÇ£¬Ã¿¸ötopic¶¼ÓÐÈô¸ÉÊýÁ¿µÄconsumer×飬ÿ¸ö×é¶¼ÊÇÒ»¸öÂß¼­Éϵġ°¶©ÔÄÕß¡±£¬ÎªÁËÈÝ´íºÍ¸üºÃµÄÎȶ¨ÐÔ£¬Ã¿¸ö×éÓÉÈô¸Éconsumer×é³É¡£ÕâÆäʵ¾ÍÊÇÒ»¸ö·¢²¼-¶©ÔÄģʽ£¬Ö»²»¹ý¶©ÔÄÕßÊǸö×é¶ø²»Êǵ¥¸öconsumer¡£

ÓÉÁ½¸ö»úÆ÷×é³ÉµÄ¼¯ÈºÓµÓÐ4¸ö·ÖÇø (P0-P3) 2¸öconsumer×é. A×éÓÐÁ½¸öconsumerB×éÓÐ4¸ö

Ïà±È´«Í³µÄÏûϢϵͳ£¬Kafka¿ÉÒԺܺõı£Ö¤ÓÐÐòÐÔ¡£

´«Í³µÄ¶ÓÁÐÔÚ·þÎñÆ÷Éϱ£´æÓÐÐòµÄÏûÏ¢£¬Èç¹û¶à¸öconsumersͬʱ´ÓÕâ¸ö·þÎñÆ÷Ïû·ÑÏûÏ¢£¬·þÎñÆ÷¾Í»áÒÔÏûÏ¢´æ´¢µÄ˳ÐòÏòconsumer·Ö·¢ÏûÏ¢¡£ËäÈ»·þÎñÆ÷°´Ë³Ðò·¢²¼ÏûÏ¢£¬µ«ÊÇÏûÏ¢ÊDZ»Òì²½µÄ·Ö·¢µ½¸÷consumerÉÏ£¬ËùÒÔµ±ÏûÏ¢µ½´ïʱ¿ÉÄÜÒѾ­Ê§È¥ÁËÔ­À´µÄ˳Ðò£¬ÕâÒâζ×Ų¢·¢Ïû·Ñ½«µ¼ÖÂ˳Ðò´íÂÒ¡£ÎªÁ˱ÜÃâ¹ÊÕÏ£¬ÕâÑùµÄÏûϢϵͳͨ³£Ê¹Óá°×¨ÓÃconsumer¡±µÄ¸ÅÄÆäʵ¾ÍÊÇÖ»ÔÊÐíÒ»¸öÏû·ÑÕßÏû·ÑÏûÏ¢£¬µ±È»Õâ¾ÍÒâζ×ÅʧȥÁ˲¢·¢ÐÔ¡£

ÔÚÕâ·½ÃæKafka×öµÄ¸üºÃ£¬Í¨¹ý·ÖÇøµÄ¸ÅÄKafka¿ÉÒÔÔÚ¶à¸öconsumer×é²¢·¢µÄÇé¿öÏÂÌṩ½ÏºÃµÄÓÐÐòÐԺ͸ºÔؾùºâ¡£½«Ã¿¸ö·ÖÇø·ÖÖ»·Ö·¢¸øÒ»¸öconsumer×飬ÕâÑùÒ»¸ö·ÖÇø¾ÍÖ»±»Õâ¸ö×éµÄÒ»¸öconsumerÏû·Ñ£¬¾Í¿ÉÒÔ˳ÐòµÄÏû·ÑÕâ¸ö·ÖÇøµÄÏûÏ¢¡£ÒòΪÓжà¸ö·ÖÇø£¬ÒÀÈ»¿ÉÒÔÔÚ¶à¸öconsumer×éÖ®¼ä½øÐиºÔؾùºâ¡£×¢Òâconsumer×éµÄÊýÁ¿²»ÄܶàÓÚ·ÖÇøµÄÊýÁ¿£¬Ò²¾ÍÊÇÓжàÉÙ·ÖÇø¾ÍÔÊÐí¶àÉÙ²¢·¢Ïû·Ñ¡£

KafkaÖ»Äܱ£Ö¤Ò»¸ö·ÖÇøÖ®ÄÚÏûÏ¢µÄÓÐÐòÐÔ£¬ÔÚ²»Í¬µÄ·ÖÇøÖ®¼äÊDz»¿ÉÒԵģ¬ÕâÒѾ­¿ÉÒÔÂú×ã´ó²¿·ÖÓ¦ÓõÄÐèÇó¡£Èç¹ûÐèÒªtopicÖÐËùÓÐÏûÏ¢µÄÓÐÐòÐÔ£¬ÄǾÍÖ»ÄÜÈÃÕâ¸ötopicÖ»ÓÐÒ»¸ö·ÖÇø£¬µ±È»Ò²¾ÍÖ»ÓÐÒ»¸öconsumer×éÏû·ÑËü¡£

###########################################

¶þ¡¢»·¾³´î½¨

Step 1: ÏÂÔØKafka

> tar -xzf kafka_2.9.2-0.8.1.1.tgz
> cd kafka_2.9.2-0.8.1.1

Step 2: Æô¶¯·þÎñ

KafkaÓõ½ÁËZookeeper£¬ËùÓÐÊ×ÏÈÆô¶¯Zookper£¬ÏÂÃæ¼òµ¥µÄÆôÓÃÒ»¸öµ¥ÊµÀýµÄZookkeeper·þÎñ¡£¿ÉÒÔÔÚÃüÁîµÄ½áβ¼Ó¸ö&·ûºÅ£¬ÕâÑù¾Í¿ÉÒÔÆô¶¯ºóÀ뿪¿ØÖÆÌ¨¡£

> bin/zookeeper-server-start.sh config/zookeeper.properties &
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

ÏÖÔÚÆô¶¯Kafka:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: ´´½¨ topic

´´½¨Ò»¸ö½Ð×ö¡°test¡±µÄtopic£¬ËüÖ»ÓÐÒ»¸ö·ÖÇø£¬Ò»¸ö¸±±¾¡£

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

¿ÉÒÔͨ¹ýlistÃüÁî²é¿´´´½¨µÄtopic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

³ýÁËÊÖ¶¯´´½¨topic£¬»¹¿ÉÒÔÅäÖÃbrokerÈÃËü×Ô¶¯´´½¨topic.

Step 4:·¢ËÍÏûÏ¢.

Kafka ʹÓÃÒ»¸ö¼òµ¥µÄÃüÁîÐÐproducer£¬´ÓÎļþÖлòÕß´Ó±ê×¼ÊäÈëÖжÁÈ¡ÏûÏ¢²¢·¢Ë͵½·þÎñ¶Ë¡£Ä¬ÈϵÄÿÌõÃüÁ·¢ËÍÒ»ÌõÏûÏ¢¡£

ÔËÐÐproducer²¢ÔÚ¿ØÖÆÌ¨ÖÐÊäһЩÏûÏ¢£¬ÕâЩÏûÏ¢½«±»·¢Ë͵½·þÎñ¶Ë£º

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message

ctrl+c¿ÉÒÔÍ˳ö·¢ËÍ¡£

Step 5: Æô¶¯consumer

Kafka also has a command line consumer that will dump out messages to standard output.

KafkaÒ²ÓÐÒ»¸öÃüÁîÐÐconsumer¿ÉÒÔ¶ÁÈ¡ÏûÏ¢²¢Êä³öµ½±ê×¼Êä³ö£º

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

ÄãÔÚÒ»¸öÖÕ¶ËÖÐÔËÐÐconsumerÃüÁîÐУ¬ÁíÒ»¸öÖÕ¶ËÖÐÔËÐÐproducerÃüÁîÐУ¬¾Í¿ÉÒÔÔÚÒ»¸öÖÕ¶ËÊäÈëÏûÏ¢£¬ÁíÒ»¸öÖն˶ÁÈ¡ÏûÏ¢¡£

ÕâÁ½¸öÃüÁî¶¼ÓÐ×Ô¼ºµÄ¿ÉÑ¡²ÎÊý£¬¿ÉÒÔÔÚÔËÐеÄʱºò²»¼ÓÈκβÎÊý¿ÉÒÔ¿´µ½°ïÖúÐÅÏ¢¡£

Step 6: ´î½¨Ò»¸ö¶à¸öbrokerµÄ¼¯Èº

¸Õ²ÅÖ»ÊÇÆô¶¯Á˵¥¸öbroker£¬ÏÖÔÚÆô¶¯ÓÐ3¸öbroker×é³ÉµÄ¼¯Èº£¬ÕâЩbroker½ÚµãÒ²¶¼ÊÇÔÚ±¾»úÉϵģº

Ê×ÏÈΪÿ¸ö½Úµã±àдÅäÖÃÎļþ£º

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

ÔÚ¿½±´³öµÄÐÂÎļþÖÐÌí¼ÓÒÔϲÎÊý£º

config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

broker.idÔÚ¼¯ÈºÖÐΨһµÄ±ê×¢Ò»¸ö½Úµã£¬ÒòΪÔÚͬһ¸ö»úÆ÷ÉÏ£¬ËùÒÔ±ØÐëÖÆ¶¨²»Í¬µÄ¶Ë¿ÚºÍÈÕÖ¾Îļþ£¬±ÜÃâÊý¾Ý±»¸²¸Ç¡£

We already have Zookeeper and our single node started, so we just need to start the two new nodes:

¸Õ²ÅÒѾ­Æô¶¯¿ÉZookeeperºÍÒ»¸ö½Úµã£¬ÏÖÔÚÆô¶¯ÁíÍâÁ½¸ö½Úµã£º

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

´´½¨Ò»¸öÓµÓÐ3¸ö¸±±¾µÄtopic:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

ÏÖÔÚÎÒÃǴÁËÒ»¸ö¼¯Èº£¬Ôõô֪µÀÿ¸ö½ÚµãµÄÐÅÏ¢ÄØ£¿ÔËÐС°"describe topics¡±ÃüÁî¾Í¿ÉÒÔÁË£º

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

ÏÂÃæ½âÊÍÒ»ÏÂÕâЩÊä³ö¡£µÚÒ»ÐÐÊǶÔËùÓзÖÇøµÄÒ»¸öÃèÊö£¬È»ºóÿ¸ö·ÖÇø¶¼»á¶ÔÓ¦Ò»ÐУ¬ÒòΪÎÒÃÇÖ»ÓÐÒ»¸ö·ÖÇøËùÒÔÏÂÃæ¾ÍÖ»¼ÓÁËÒ»ÐС£

leader£º¸ºÔð´¦ÀíÏûÏ¢µÄ¶ÁºÍд£¬leaderÊÇ´ÓËùÓнڵãÖÐËæ»úÑ¡ÔñµÄ.

replicas£ºÁгöÁËËùÓеĸ±±¾½Úµã£¬²»¹Ü½ÚµãÊÇ·ñÔÚ·þÎñÖÐ.

isr£ºÊÇÕýÔÚ·þÎñÖеĽڵã.

ÔÚÎÒÃǵÄÀý×ÓÖУ¬½Úµã1ÊÇ×÷ΪleaderÔËÐС£

Ïòtopic·¢ËÍÏûÏ¢£º

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...
my test message 1my test message 2^C

Ïû·ÑÕâЩÏûÏ¢£º

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

...
my test message 1
my test message 2
^C

²âÊÔÒ»ÏÂÈÝ´íÄÜÁ¦.Broker 1×÷ΪleaderÔËÐУ¬ÏÖÔÚÎÒÃÇkillµôËü£º

> ps | grep server-1.properties7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564

ÁíÍâÒ»¸ö½Úµã±»Ñ¡×öÁËleader,node 1 ²»ÔÙ³öÏÖÔÚ in-sync ¸±±¾ÁбíÖУº

> bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

ËäÈ»×î³õ¸ºÔðÐøÐ´ÏûÏ¢µÄleader downµôÁË£¬µ«Ö®Ç°µÄÏûÏ¢»¹ÊÇ¿ÉÒÔÏû·ÑµÄ£º

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2

¿´À´KafkaµÄÈÝ´í»úÖÆ»¹ÊDz»´íµÄ¡£

################################################

Èý¡¢´î½¨Kafka¿ª·¢»·¾³

ÎÒÃǴÁËkafkaµÄ·þÎñÆ÷£¬²¢¿ÉÒÔʹÓÃKafkaµÄÃüÁîÐй¤¾ß´´½¨topic£¬·¢ËͺͽÓÊÕÏûÏ¢¡£ÏÂÃæÎÒÃÇÀ´´î½¨kafkaµÄ¿ª·¢»·¾³¡£

Ìí¼ÓÒÀÀµ

´î½¨¿ª·¢»·¾³ÐèÒªÒýÈëkafkaµÄjar°ü£¬Ò»ÖÖ·½Ê½Êǽ«Kafka°²×°°üÖÐlibϵÄjar°ü¼ÓÈëµ½ÏîÄ¿µÄclasspathÖУ¬ÕâÖֱȽϼòµ¥ÁË¡£²»¹ýÎÒÃÇʹÓÃÁíÒ»ÖÖ¸ü¼ÓÁ÷Ðеķ½Ê½£ºÊ¹ÓÃmaven¹ÜÀíjar°üÒÀÀµ¡£

´´½¨ºÃmavenÏîÄ¿ºó£¬ÔÚpom.xmlÖÐÌí¼ÓÒÔÏÂÒÀÀµ£º

<dependency>
<groupId> org.apache.kafka</groupId >
<artifactId> kafka_2.10</artifactId >
<version> 0.8.0</ version>
</dependency>

Ìí¼ÓÒÀÀµºóÄã»á·¢ÏÖÓÐÁ½¸öjar°üµÄÒÀÀµÕÒ²»µ½¡£Ã»¹ØÏµÎÒ¶¼°ïÄãÏëºÃÁË£¬µã»÷ÕâÀïÏÂÔØÕâÁ½¸öjar°ü£¬½âѹºóÄãÓÐÁ½ÖÖÑ¡Ôñ£¬µÚÒ»ÖÖÊÇʹÓÃmvnµÄinstallÃüÁjar°ü°²×°µ½±¾µØ²Ö¿â£¬ÁíÒ»ÖÖÊÇÖ±½Ó½«½âѹºóµÄÎļþ¼Ð¿½±´µ½mvn±¾µØ²Ö¿âµÄcomÎļþ¼ÐÏ£¬±ÈÈçÎҵı¾µØ²Ö¿âÊÇd:\mvn,Íê³ÉºóÎÒµÄĿ¼½á¹¹ÊÇÕâÑùµÄ£º

ÅäÖóÌÐò

Ê×ÏÈÊÇÒ»¸ö³äµ±ÅäÖÃÎļþ×÷ÓõĽӿÚ,ÅäÖÃÁËKafkaµÄ¸÷ÖÖÁ¬½Ó²ÎÊý£º

package com.sohu.kafkademon;
public interface KafkaProperties
{
final static String zkConnect = "10.22.10.139:2181";
final static String groupId = "group1";
final static String topic = "topic1";
final static String kafkaServerURL = "10.22.10.139";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;
final static String topic2 = "topic2";
final static String topic3 = "topic3";
final static String clientId = "SimpleConsumerDemoClient";
}

producer

package com.sohu.kafkademon;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaProducer extends Thread
{
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public KafkaProducer(String topic)
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "10.22.10.139:9092");
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
@Override
public void run() {
int messageNo = 1;
while (true)
{
String messageStr = new String("Message_" + messageNo);
System.out.println("Send:" + messageStr);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
try {
sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

 

consumer

package com.sohu.kafkademon;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumer extends Thread
{
private final ConsumerConnector consumer;
private final String topic;
public KafkaConsumer(String topic)
{
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig()
{
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("receive£º" + new String(it.next().message()));
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

¼òµ¥µÄ·¢ËͽÓÊÕ

ÔËÐÐÏÂÃæÕâ¸ö³ÌÐò£¬¾Í¿ÉÒÔ½øÐмòµ¥µÄ·¢ËͽÓÊÕÏûÏ¢ÁË£º

package com.sohu.kafkademon;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumerProducerDemo
{
public static void main(String[] args)
{
KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
producerThread.start();
KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
consumerThread.start();
}
}

¸ß¼¶±ðµÄconsumer

ÏÂÃæÊDZȽϸºÔصķ¢ËͽÓÊյijÌÐò£º

package com.sohu.kafkademon;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumer extends Thread
{
private final ConsumerConnector consumer;
private final String topic;
public KafkaConsumer(String topic)
{
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig()
{
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("receive£º" + new String(it.next().message()));
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

############################################################

ËÄ¡¢Êý¾Ý³Ö¾Ã»¯

²»ÒªÎ·¾åÎļþϵͳ!

Kafka´óÁ¿ÒÀÀµÎļþϵͳȥ´æ´¢ºÍ»º´æÏûÏ¢¡£¶ÔÓÚÓ²ÅÌÓиö´«Í³µÄ¹ÛÄîÊÇÓ²ÅÌ×ÜÊǺÜÂý£¬ÕâʹºÜ¶àÈË»³ÒÉ»ùÓÚÎļþϵͳµÄ¼Ü¹¹ÄÜ·ñÌṩÓÅÒìµÄÐÔÄÜ¡£Êµ¼ÊÉÏÓ²Å̵ĿìÂýÍêȫȡ¾öÓÚʹÓÃËüµÄ·½Ê½¡£Éè¼ÆÁ¼ºÃµÄÓ²Å̼ܹ¹¿ÉÒÔºÍÄÚ´æÒ»Ñù¿ì¡£

ÔÚ6¿é7200תµÄSATA RAID-5´ÅÅÌÕóÁеÄÏßÐÔдËٶȲ¶àÊÇ600MB/s£¬µ«ÊÇËæ¼´Ð´µÄËÙ¶ÈÈ´ÊÇ100k/s£¬²îÁ˲¶à6000±¶¡£ÏÖ´úµÄ²Ù×÷ϵͳ¶¼¶Ô´Î×öÁË´óÁ¿µÄÓÅ»¯£¬Ê¹ÓÃÁË read-ahead ºÍ write-behindµÄ¼¼ÇÉ£¬¶ÁÈ¡µÄʱºò³É¿éµÄÔ¤¶ÁÈ¡Êý¾Ý£¬Ð´µÄʱºò½«¸÷ÖÖ΢СËöËéµÄÂß¼­Ð´Èë×éÖ¯ºÏ²¢³ÉÒ»´Î½Ï´óµÄÎïÀíдÈë¡£¶Ô´ËµÄÉîÈëÌÖÂÛ¿ÉÒԲ鿴ÕâÀËüÃÇ·¢ÏÖÏßÐԵķÃÎÊ´ÅÅÌ£¬ºÜ¶àʱºò±ÈËæ»úµÄÄÚ´æ·ÃÎÊ¿ìµÃ¶à¡£

ΪÁËÌá¸ßÐÔÄÜ£¬ÏÖ´ú²Ù×÷ϵͳÍùÍùʹÓÃÄÚ´æ×÷Ϊ´ÅÅ̵Ļº´æ£¬ÏÖ´ú²Ù×÷ϵͳÀÖÓÚ°ÑËùÓпÕÏÐÄÚ´æÓÃ×÷´ÅÅÌ»º´æ£¬ËäÈ»Õâ¿ÉÄÜÔÚ»º´æ»ØÊÕºÍÖØÐ·ÖÅäʱÎþÉüһЩÐÔÄÜ¡£ËùÓеĴÅÅ̶Áд²Ù×÷¶¼»á¾­¹ýÕâ¸ö»º´æ£¬Õⲻ̫¿ÉÄܱ»ÈÆ¿ª³ý·ÇÖ±½ÓʹÓÃI/O¡£ËùÒÔËäȻÿ¸ö³ÌÐò¶¼ÔÚ×Ô¼ºµÄÏß³ÌÀïÖ»»º´æÁËÒ»·ÝÊý¾Ý£¬µ«ÔÚ²Ù×÷ϵͳµÄ»º´æÀﻹÓÐÒ»·Ý£¬ÕâµÈÓÚ´æÁËÁ½·ÝÊý¾Ý¡£

ÁíÍâÔÙÀ´ÌÖÂÛÒ»ÏÂJVM,ÒÔÏÂÁ½¸öÊÂʵÊÇÖÚËùÖÜÖªµÄ£º

Java¶ÔÏóÕ¼ÓÿռäÊǷdz£´óµÄ£¬²î²»¶àÊÇÒª´æ´¢µÄÊý¾ÝµÄÁ½±¶ÉõÖÁ¸ü¸ß¡£

Ëæ×ŶÑÖÐÊý¾ÝÁ¿µÄÔö¼Ó£¬À¬»ø»ØÊջرäµÄÔ½À´Ô½À§ÄÑ¡£

»ùÓÚÒÔÉÏ·ÖÎö£¬Èç¹û°ÑÊý¾Ý»º´æÔÚÄÚ´æÀÒòΪÐèÒª´æ´¢Á½·Ý£¬²»µÃ²»Ê¹ÓÃÁ½±¶µÄÄÚ´æ¿Õ¼ä£¬Kafka»ùÓÚJVM£¬ÓÖ²»µÃ²»½«¿Õ¼äÔٴμӱ¶,ÔÙ¼ÓÉÏÒª±ÜÃâGC´øÀ´µÄÐÔÄÜÓ°Ï죬ÔÚÒ»¸ö32GÄÚ´æµÄ»úÆ÷ÉÏ£¬²»µÃ²»Ê¹Óõ½28-30GµÄÄÚ´æ¿Õ¼ä¡£²¢ÇÒµ±ÏµÍ³ÖØÆôµÄʱºò£¬ÓÖ±ØÐëÒª½«Êý¾ÝË¢µ½ÄÚ´æÖУ¨ 10GB ÄÚ´æ²î²»¶àÒªÓÃ10·ÖÖÓ£©£¬¾ÍËãʹÓÃÀäˢУ¨²»ÊÇÒ»´ÎÐÔË¢½øÄڴ棬¶øÊÇÔÚʹÓÃÊý¾ÝµÄʱºòûÓоÍË¢µ½Äڴ棩Ҳ»áµ¼ÖÂ×î³õµÄʱºòÐÂÄܷdz£Âý¡£µ«ÊÇʹÓÃÎļþϵͳ£¬¼´Ê¹ÏµÍ³ÖØÆôÁË£¬Ò²²»ÐèҪˢÐÂÊý¾Ý¡£Ê¹ÓÃÎļþϵͳҲ¼ò»¯ÁËά»¤Êý¾ÝÒ»ÖÂÐÔµÄÂß¼­¡£

ËùÒÔÓ봫ͳµÄ½«Êý¾Ý»º´æÔÚÄÚ´æÖÐÈ»ºóË¢µ½Ó²Å̵ÄÉè¼Æ²»Í¬£¬KafkaÖ±½Ó½«Êý¾Ýдµ½ÁËÎļþϵͳµÄÈÕÖ¾ÖС£

³£Á¿Ê±¼äµÄ²Ù×÷ЧÂÊ

ÔÚ´ó¶àÊýµÄÏûϢϵͳÖУ¬Êý¾Ý³Ö¾Ã»¯µÄ»úÖÆÍùÍùÊÇΪÿ¸öcosumerÌṩһ¸öBÊ÷»òÕ߯äËûµÄËæ»ú¶ÁдµÄÊý¾Ý½á¹¹¡£BÊ÷µ±È»ÊǺܰôµÄ£¬µ«ÊÇÒ²´øÁËһЩ´ú¼Û£º±ÈÈçBÊ÷µÄ¸´ÔÓ¶ÈÊÇO(log N)£¬O(log N)ͨ³£±»ÈÏΪ¾ÍÊdz£Á¿¸´ÔÓ¶ÈÁË£¬µ«¶ÔÓÚÓ²Å̲Ù×÷À´Ëµ²¢·ÇÈç´Ë¡£´ÅÅ̽øÐÐÒ»´ÎËÑË÷ÐèÒª10ms£¬Ã¿¸öÓ²ÅÌÔÚͬһʱ¼äÖ»ÄܽøÐÐÒ»´ÎËÑË÷£¬ÕâÑù²¢·¢´¦Àí¾Í³ÉÁËÎÊÌâ¡£ËäÈ»´æ´¢ÏµÍ³Ê¹Óûº´æ½øÐÐÁË´óÁ¿ÓÅ»¯£¬µ«ÊǶÔÓÚÊ÷½á¹¹µÄÐÔÄܵĹ۲ì½á¹ûÈ´±íÃ÷£¬ËüµÄÐÔÄÜÍùÍùËæ×ÅÊý¾ÝµÄÔö³¤¶øÏßÐÔϽµ£¬Êý¾ÝÔö³¤Ò»±¶£¬ËٶȾͻήµÍÒ»±¶¡£

Ö±¹ÛµÄ½²£¬¶ÔÓÚÖ÷ÒªÓÃÓÚÈÕÖ¾´¦ÀíµÄÏûϢϵͳ£¬Êý¾ÝµÄ³Ö¾Ã»¯¿ÉÒÔ¼òµ¥µÄͨ¹ý½«Êý¾Ý×·¼Óµ½ÎļþÖÐʵÏÖ£¬¶ÁµÄʱºò´ÓÎļþÖжÁ¾ÍºÃÁË¡£ÕâÑù×öµÄºÃ´¦ÊǶÁºÍд¶¼ÊÇ O(1) µÄ£¬²¢ÇÒ¶Á²Ù×÷²»»á×èÈûд²Ù×÷ºÍÆäËû²Ù×÷¡£ÕâÑù´øÀ´µÄÐÔÄÜÓÅÊÆÊǺÜÃ÷ÏԵģ¬ÒòΪÐÔÄܺÍÊý¾ÝµÄ´óСûÓйØÏµÁË¡£

¼ÈÈ»¿ÉÒÔʹÓü¸ºõûÓÐÈÝÁ¿ÏÞÖÆ£¨Ïà¶ÔÓÚÄÚ´æÀ´Ëµ£©µÄÓ²Å̿ռ佨Á¢ÏûϢϵͳ£¬¾Í¿ÉÒÔÔÚûÓÐÐÔÄÜËðʧµÄÇé¿öÏÂÌṩһЩһ°ãÏûϢϵͳ²»¾ß±¸µÄÌØÐÔ¡£±ÈÈ磬һ°ãµÄÏûϢϵͳ¶¼ÊÇÔÚÏûÏ¢±»Ïû·ÑºóÁ¢¼´É¾³ý£¬KafkaÈ´¿ÉÒÔ½«ÏûÏ¢±£´æÒ»¶Îʱ¼ä£¨±ÈÈçÒ»ÐÇÆÚ£©£¬Õâ¸øconsumerÌṩÁ˺ܺõĻú¶¯ÐÔºÍÁé»îÐÔ£¬ÕâµãÔÚ½ñºóµÄÎÄÕÂÖлáÓÐÏêÊö¡£

############################################################

Îå¡¢ÏûÏ¢´«ÊäµÄÊÂÎñ¶¨Òå

֮ǰÌÖÂÛÁËconsumerºÍproducerÊÇÔõô¹¤×÷µÄ£¬ÏÖÔÚÀ´ÌÖÂÛÒ»ÏÂÊý¾Ý´«Êä·½Ãæ¡£Êý¾Ý´«ÊäµÄÊÂÎñ¶¨Òåͨ³£ÓÐÒÔÏÂÈýÖÖ¼¶±ð£º

×î¶àÒ»´Î: ÏûÏ¢²»»á±»Öظ´·¢ËÍ£¬×î¶à±»´«ÊäÒ»´Î£¬µ«Ò²ÓпÉÄÜÒ»´Î²»´«Êä¡£

×îÉÙÒ»´Î: ÏûÏ¢²»»á±»Â©·¢ËÍ£¬×îÉÙ±»´«ÊäÒ»´Î£¬µ«Ò²ÓпÉÄܱ»Öظ´´«Êä.

¾«È·µÄÒ»´Î£¨Exactly once£©: ²»»á©´«ÊäÒ²²»»áÖØ¸´´«Êä,ÿ¸öÏûÏ¢¶¼´«Êä±»Ò»´Î¶øÇÒ½ö½ö±»´«ÊäÒ»´Î£¬ÕâÊÇ´ó¼ÒËùÆÚÍûµÄ¡£

´ó¶àÊýÏûϢϵͳÉù³Æ¿ÉÒÔ×öµ½¡°¾«È·µÄÒ»´Î¡±£¬µ«ÊÇ×ÐϸÔĶÁËüÃǵĵÄÎĵµ¿ÉÒÔ¿´µ½ÀïÃæ´æÔÚÎóµ¼£¬±ÈÈçûÓÐ˵Ã÷µ±consumer»òproducerʧ°ÜʱÔõôÑù£¬»òÕßµ±Óжà¸öconsumer²¢ÐÐʱÔõôÑù£¬»òдÈëÓ²Å̵ÄÊý¾Ý¶ªÊ§Ê±ÓÖ»áÔõôÑù¡£kafkaµÄ×ö·¨Òª¸üÏȽøÒ»Ð©¡£µ±·¢²¼ÏûϢʱ£¬KafkaÓÐÒ»¸ö¡°committed¡±µÄ¸ÅÄһµ©ÏûÏ¢±»Ìá½»ÁË£¬Ö»ÒªÏûÏ¢±»Ð´ÈëµÄ·ÖÇøµÄËùÔڵĸ±±¾brokerÊǻµÄ£¬Êý¾Ý¾Í²»»á¶ªÊ§¡£¹ØÓÚ¸±±¾µÄ»î¶¯µÄ¸ÅÄϽÚÎĵµ»áÌÖÂÛ¡£ÏÖÔÚ¼ÙÉèbrokerÊDz»»ádownµÄ¡£

Èç¹ûproducer·¢²¼ÏûϢʱ·¢ÉúÁËÍøÂç´íÎ󣬵«ÓÖ²»È·¶¨ÊµÔÚÌύ֮ǰ·¢ÉúµÄ»¹ÊÇÌá½»Ö®ºó·¢ÉúµÄ£¬ÕâÖÖÇé¿öËäÈ»²»³£¼û£¬µ«ÊDZØÐ뿼ÂǽøÈ¥£¬ÏÖÔÚKafka°æ±¾»¹Ã»Óнâ¾öÕâ¸öÎÊÌ⣬½«À´µÄ°æ±¾ÕýÔÚŬÁ¦³¢ÊÔ½â¾ö¡£

²¢²»ÊÇËùÓеÄÇé¿ö¶¼ÐèÒª¡°¾«È·µÄÒ»´Î¡±ÕâÑù¸ßµÄ¼¶±ð£¬KafkaÔÊÐíproducerÁé»îµÄÖ¸¶¨¼¶±ð¡£±ÈÈçproducer¿ÉÒÔÖ¸¶¨±ØÐëµÈ´ýÏûÏ¢±»Ìá½»µÄ֪ͨ£¬»òÕßÍêÈ«µÄÒì²½·¢ËÍÏûÏ¢¶ø²»µÈ´ýÈκÎ֪ͨ£¬»òÕß½ö½öµÈ´ýleaderÉùÃ÷ËüÄõ½ÁËÏûÏ¢£¨followersûÓбØÒª£©¡£

ÏÖÔÚ´ÓconsumerµÄ·½Ã濼ÂÇÕâ¸öÎÊÌ⣬ËùÓеĸ±±¾¶¼ÓÐÏàͬµÄÈÕÖ¾ÎļþºÍÏàͬµÄoffset£¬consumerά»¤×Ô¼ºÏû·ÑµÄÏûÏ¢µÄoffset£¬Èç¹ûconsumer²»»á±ÀÀ£µ±È»¿ÉÒÔÔÚÄÚ´æÖб£´æÕâ¸öÖµ£¬µ±È»Ë­Ò²²»Äܱ£Ö¤Õâµã¡£Èç¹ûconsumer±ÀÀ£ÁË£¬»áÓÐÁíÍâÒ»¸öconsumer½Ó×ÅÏû·ÑÏûÏ¢£¬ËüÐèÒª´ÓÒ»¸öºÏÊʵÄoffset¼ÌÐø´¦Àí¡£ÕâÖÖÇé¿öÏ¿ÉÒÔÓÐÒÔÏÂÑ¡Ôñ£º

consumer¿ÉÒÔÏȶÁÈ¡ÏûÏ¢£¬È»ºó½«offsetдÈëÈÕÖ¾ÎļþÖУ¬È»ºóÔÙ´¦ÀíÏûÏ¢¡£Õâ´æÔÚÒ»ÖÖ¿ÉÄܾÍÊÇÔÚ´æ´¢offsetºó»¹Ã»´¦ÀíÏûÏ¢¾ÍcrashÁË£¬ÐµÄconsumer¼ÌÐø´ÓÕâ¸öoffset´¦Àí£¬ÄÇô¾Í»áÓÐЩÏûÏ¢ÓÀÔ¶²»»á±»´¦Àí£¬Õâ¾ÍÊÇÉÏÃæËµµÄ¡°×î¶àÒ»´Î¡±¡£

consumer¿ÉÒÔÏȶÁÈ¡ÏûÏ¢£¬´¦ÀíÏûÏ¢£¬×îºó¼Ç¼offset£¬µ±È»Èç¹ûÔڼǼoffset֮ǰ¾ÍcrashÁË£¬ÐµÄconsumer»áÖØ¸´µÄÏû·ÑһЩÏûÏ¢£¬Õâ¾ÍÊÇÉÏÃæËµµÄ¡°×îÉÙÒ»´Î¡±¡£

¡°¾«È·Ò»´Î¡±¿ÉÒÔͨ¹ý½«Ìá½»·ÖΪÁ½¸ö½×¶ÎÀ´½â¾ö£º±£´æÁËoffsetºóÌá½»Ò»´Î£¬ÏûÏ¢´¦Àí³É¹¦Ö®ºóÔÙÌá½»Ò»´Î¡£µ«ÊÇ»¹Óиö¸ü¼òµ¥µÄ×ö·¨£º½«ÏûÏ¢µÄoffsetºÍÏûÏ¢±»´¦ÀíºóµÄ½á¹û±£´æÔÚÒ»Æð¡£±ÈÈçÓÃHadoop ETL´¦ÀíÏûϢʱ£¬½«´¦ÀíºóµÄ½á¹ûºÍoffsetͬʱ±£´æÔÚHDFSÖУ¬ÕâÑù¾ÍÄܱ£Ö¤ÏûÏ¢ºÍoffserͬʱ±»´¦ÀíÁË¡£

############################################################

Áù¡¢ÐÔÄÜÓÅ»¯

KafkaÔÚÌá¸ßЧÂÊ·½Ãæ×öÁ˺ܴóŬÁ¦¡£KafkaµÄÒ»¸öÖ÷ҪʹÓó¡¾°ÊÇ´¦ÀíÍøÕ¾»î¶¯ÈÕÖ¾£¬ÍÌÍÂÁ¿ÊǷdz£´óµÄ£¬Ã¿¸öÒ³Ãæ¶¼»á²úÉúºÃ¶à´Îд²Ù×÷¡£¶Á·½Ã棬¼ÙÉèÿ¸öÏûÏ¢Ö»±»Ïû·ÑÒ»´Î£¬¶ÁµÄÁ¿µÄÒ²ÊǺܴóµÄ£¬KafkaÒ²¾¡Á¿Ê¹¶ÁµÄ²Ù×÷¸üÇáÁ¿»¯¡£

ÎÒÃÇ֮ǰÌÖÂÛÁË´ÅÅ̵ÄÐÔÄÜÎÊÌ⣬ÏßÐÔ¶ÁдµÄÇé¿öÏÂÓ°Ïì´ÅÅÌÐÔÄÜÎÊÌâ´óÔ¼ÓÐÁ½¸ö·½Ã棺̫¶àµÄËöËéµÄI/O²Ù×÷ºÍÌ«¶àµÄ×Ö½Ú¿½±´¡£I/OÎÊÌâ·¢ÉúÔÚ¿Í»§¶ËºÍ·þÎñ¶ËÖ®¼ä£¬Ò²·¢ÉúÔÚ·þÎñ¶ËÄÚ²¿µÄ³Ö¾Ã»¯µÄ²Ù×÷ÖС£

ÏûÏ¢¼¯£¨message set£©

ΪÁ˱ÜÃâÕâЩÎÊÌ⣬Kafka½¨Á¢ÁË¡°ÏûÏ¢¼¯£¨message set£©¡±µÄ¸ÅÄ½«ÏûÏ¢×éÖ¯µ½Ò»Æð£¬×÷Ϊ´¦ÀíµÄµ¥Î»¡£ÒÔÏûÏ¢¼¯Îªµ¥Î»´¦ÀíÏûÏ¢£¬±ÈÒÔµ¥¸öµÄÏûϢΪµ¥Î»´¦Àí£¬»áÌáÉý²»ÉÙÐÔÄÜ¡£Producer°ÑÏûÏ¢¼¯Ò»¿é·¢Ë͸ø·þÎñ¶Ë£¬¶ø²»ÊÇÒ»ÌõÌõµÄ·¢ËÍ£»·þÎñ¶Ë°ÑÏûÏ¢¼¯Ò»´ÎÐÔµÄ×·¼Óµ½ÈÕÖ¾ÎļþÖУ¬ÕâÑù¼õÉÙÁËËöËéµÄI/O²Ù×÷¡£consumerÒ²¿ÉÒÔÒ»´ÎÐÔµÄÇëÇóÒ»¸öÏûÏ¢¼¯¡£

ÁíÍâÒ»¸öÐÔÄÜÓÅ»¯ÊÇÔÚ×Ö½Ú¿½±´·½Ãæ¡£Ôڵ͸ºÔصÄÇé¿öÏÂÕâ²»ÊÇÎÊÌ⣬µ«ÊÇÔڸ߸ºÔصÄÇé¿öÏÂËüµÄÓ°Ï컹ÊǺܴóµÄ¡£ÎªÁ˱ÜÃâÕâ¸öÎÊÌ⣬KafkaʹÓÃÁ˱ê×¼µÄ¶þ½øÖÆÏûÏ¢¸ñʽ£¬Õâ¸ö¸ñʽ¿ÉÒÔÔÚproducer,brokerºÍproducerÖ®¼ä¹²Ïí¶øÎÞÐè×öÈκθ͝¡£

zero copy

Brokerά»¤µÄÏûÏ¢ÈÕÖ¾½ö½öÊÇһЩĿ¼Îļþ£¬ÏûÏ¢¼¯ÒԹ̶¨¶ÓµÄ¸ñʽдÈëµ½ÈÕÖ¾ÎļþÖУ¬Õâ¸ö¸ñʽproducerºÍconsumerÊǹ²ÏíµÄ£¬ÕâʹµÃKafka¿ÉÒÔÒ»¸öºÜÖØÒªµÄµã½øÐÐÓÅ»¯£ºÏûÏ¢ÔÚÍøÂçÉϵĴ«µÝ¡£ÏÖ´úµÄunix²Ù×÷ϵͳÌṩÁ˸ßÐÔÄܵĽ«Êý¾Ý´ÓÒ³Ãæ»º´æ·¢Ë͵½socketµÄϵͳº¯Êý£¬ÔÚlinuxÖУ¬Õâ¸öº¯ÊýÊÇsendfile.

ΪÁ˸üºÃµÄÀí½âsendfileµÄºÃ´¦£¬ÎÒÃÇÏÈÀ´¿´ÏÂÒ»°ã½«Êý¾Ý´ÓÎļþ·¢Ë͵½socketµÄÊý¾ÝÁ÷Ïò£º

²Ù×÷ϵͳ°ÑÊý¾Ý´ÓÎļþ¿½±´ÄÚºËÖеÄÒ³»º´æÖÐ

Ó¦ÓóÌÐò´ÓÒ³»º´æ´Ó°ÑÊý¾Ý¿½±´×Ô¼ºµÄÄڴ滺´æÖÐ

Ó¦ÓóÌÐò½«Êý¾ÝдÈëµ½ÄÚºËÖÐsocket»º´æÖÐ

²Ù×÷ϵͳ°ÑÊý¾Ý´Ósocket»º´æÖп½±´µ½Íø¿¨½Ó¿Ú»º´æ£¬´ÓÕâÀï·¢Ë͵½ÍøÂçÉÏ¡£

ÕâÏÔÈ»ÊǵÍЧÂʵģ¬ÓÐ4´Î¿½±´ºÍ2´Îϵͳµ÷Óá£Sendfileͨ¹ýÖ±½Ó½«Êý¾Ý´ÓÒ³Ãæ»º´æ·¢ËÍÍø¿¨½Ó¿Ú»º´æ£¬±ÜÃâÁËÖØ¸´¿½±´£¬´ó´óµÄÓÅ»¯ÁËÐÔÄÜ¡£

ÔÚÒ»¸ö¶àconsumersµÄ³¡¾°ÀÊý¾Ý½ö½ö±»¿½±´µ½Ò³Ã滺´æÒ»´Î¶ø²»ÊÇÿ´ÎÏû·ÑÏûÏ¢µÄʱºò¶¼Öظ´µÄ½øÐп½±´¡£ÕâʹµÃÏûÏ¢ÒÔ½üºõÍøÂç´ø¿íµÄËÙÂÊ·¢ËͳöÈ¥¡£ÕâÑùÔÚ´ÅÅ̲ãÃæÄ㼸ºõ¿´²»µ½ÈκεĶÁ²Ù×÷£¬ÒòΪÊý¾Ý¶¼ÊÇ´ÓÒ³Ãæ»º´æÖÐÖ±½Ó·¢Ë͵½ÍøÂçÉÏÈ¥ÁË¡£

ÕâÆªÎÄÕÂÏêϸ½éÉÜÁËsendfileºÍzero-copy¼¼ÊõÔÚJava·½ÃæµÄÓ¦Óá£

Êý¾ÝѹËõ

ºÜ¶àʱºò£¬ÐÔÄܵį¿¾±²¢·ÇCPU»òÕßÓ²Å̶øÊÇÍøÂç´ø¿í£¬¶ÔÓÚÐèÒªÔÚÊý¾ÝÖÐÐÄÖ®¼ä´«ËÍ´óÁ¿Êý¾ÝµÄÓ¦ÓøüÊÇÈç´Ë¡£µ±È»Óû§¿ÉÒÔÔÚûÓÐKafkaÖ§³ÖµÄÇé¿öϸ÷×ÔѹËõ×Ô¼ºµÄÏûÏ¢£¬µ«ÊÇÕ⽫µ¼Ö½ϵ͵ÄѹËõÂÊ£¬ÒòΪÏà±ÈÓÚ½«ÏûÏ¢µ¥¶ÀѹËõ£¬½«´óÁ¿ÎļþѹËõÔÚÒ»Æð²ÅÄÜÆðµ½×îºÃµÄѹËõЧ¹û¡£

Kafka²ÉÓÃÁ˶˵½¶ËµÄѹËõ£ºÒòΪÓС°ÏûÏ¢¼¯¡±µÄ¸ÅÄ¿Í»§¶ËµÄÏûÏ¢¿ÉÒÔÒ»Æð±»Ñ¹ËõºóË͵½·þÎñ¶Ë£¬²¢ÒÔѹËõºóµÄ¸ñʽдÈëÈÕÖ¾Îļþ£¬ÒÔѹËõµÄ¸ñʽ·¢Ë͵½consumer£¬ÏûÏ¢´Óproducer·¢³öµ½consumerÄõ½¶¼±»ÊÇѹËõµÄ£¬Ö»ÓÐÔÚconsumerʹÓõÄʱºò²Å±»½âѹËõ£¬ËùÒÔ½Ð×ö¡°¶Ëµ½¶ËµÄѹËõ¡±¡£

KafkaÖ§³ÖGZIPºÍSnappyѹËõЭÒé¡£¸üÏêϸµÄÄÚÈÝ¿ÉÒԲ鿴ÕâÀï¡£

##########################################################

Æß¡¢ProducerºÍConsumer

Kafka ProducerÏûÏ¢·¢ËÍ

producerÖ±½Ó½«Êý¾Ý·¢Ë͵½brokerµÄleader(Ö÷½Úµã)£¬²»ÐèÒªÔÚ¶à¸ö½Úµã½øÐзַ¢¡£ÎªÁ˰ïÖúproducer×öµ½Õâµã£¬ËùÓеÄKafka½Úµã¶¼¿ÉÒÔ¼°Ê±µÄ¸æÖª:ÄÄЩ½ÚµãÊǻµÄ£¬Ä¿±êtopicÄ¿±ê·ÖÇøµÄleaderÔÚÄÄ¡£ÕâÑùproducer¾Í¿ÉÒÔÖ±½Ó½«ÏûÏ¢·¢Ë͵½Ä¿µÄµØÁË¡£

¿Í»§¶Ë¿ØÖÆÏûÏ¢½«±»·Ö·¢µ½Äĸö·ÖÇø¡£¿ÉÒÔͨ¹ý¸ºÔؾùºâËæ»úµÄÑ¡Ôñ£¬»òÕßʹÓ÷ÖÇøº¯Êý¡£KafkaÔÊÐíÓû§ÊµÏÖ·ÖÇøº¯Êý£¬Ö¸¶¨·ÖÇøµÄkey£¬½«ÏûÏ¢hashµ½²»Í¬µÄ·ÖÇøÉÏ(µ±È»ÓÐÐèÒªµÄ»°£¬Ò²¿ÉÒÔ¸²¸ÇÕâ¸ö·ÖÇøº¯Êý×Ô¼ºÊµÏÖÂß¼­).±ÈÈçÈç¹ûÄãÖ¸¶¨µÄkeyÊÇuser id£¬ÄÇôͬһ¸öÓû§·¢Ë͵ÄÏûÏ¢¶¼±»·¢Ë͵½Í¬Ò»¸ö·ÖÇøÉÏ¡£¾­¹ý·ÖÇøÖ®ºó£¬consumer¾Í¿ÉÒÔÓÐÄ¿µÄµÄÏû·Ñij¸ö·ÖÇøµÄÏûÏ¢¡£

Òì²½·¢ËÍ

ÅúÁ¿·¢ËÍ¿ÉÒÔºÜÓÐЧµÄÌá¸ß·¢ËÍЧÂÊ¡£Kafka producerµÄÒì²½·¢ËÍģʽÔÊÐí½øÐÐÅúÁ¿·¢ËÍ£¬ÏȽ«ÏûÏ¢»º´æÔÚÄÚ´æÖУ¬È»ºóÒ»´ÎÇëÇóÅúÁ¿·¢ËͳöÈ¥¡£Õâ¸ö²ßÂÔ¿ÉÒÔÅäÖõ쬱ÈÈç¿ÉÒÔÖ¸¶¨»º´æµÄÏûÏ¢´ïµ½Ä³¸öÁ¿µÄʱºò¾Í·¢³öÈ¥£¬»òÕß»º´æÁ˹̶¨µÄʱ¼äºó¾Í·¢ËͳöÈ¥£¨±ÈÈç100ÌõÏûÏ¢¾Í·¢ËÍ£¬»òÕßÿ5Ãë·¢ËÍÒ»´Î£©¡£ÕâÖÖ²ßÂÔ½«´ó´ó¼õÉÙ·þÎñ¶ËµÄI/O´ÎÊý¡£

¼ÈÈ»»º´æÊÇÔÚproducer¶Ë½øÐеģ¬ÄÇôµ±producer±ÀÀ£Ê±£¬ÕâЩÏûÏ¢¾Í»á¶ªÊ§¡£Kafka0.8.1µÄÒì²½·¢ËÍģʽ»¹²»Ö§³Ö»Øµ÷£¬¾Í²»ÄÜÔÚ·¢Ëͳö´íʱ½øÐд¦Àí¡£Kafka 0.9¿ÉÄÜ»áÔö¼ÓÕâÑùµÄ»Øµ÷º¯Êý¡£¼ûProposed Producer API.

Kafka Consumer

Kafa consumerÏû·ÑÏûϢʱ£¬Ïòbroker·¢³ö"fetch"ÇëÇóÈ¥Ïû·ÑÌØ¶¨·ÖÇøµÄÏûÏ¢¡£consumerÖ¸¶¨ÏûÏ¢ÔÚÈÕÖ¾ÖÐµÄÆ«ÒÆÁ¿£¨offset£©£¬¾Í¿ÉÒÔÏû·Ñ´ÓÕâ¸öλÖÿªÊ¼µÄÏûÏ¢¡£customerÓµÓÐÁËoffsetµÄ¿ØÖÆÈ¨£¬¿ÉÒÔÏòºó»Ø¹öÈ¥ÖØÐÂÏû·Ñ֮ǰµÄÏûÏ¢£¬ÕâÊǺÜÓÐÒâÒåµÄ¡£

ÍÆ»¹ÊÇÀ­£¿

Kafka×î³õ¿¼ÂǵÄÎÊÌâÊÇ£¬customerÓ¦¸Ã´ÓbrokesÀ­È¡ÏûÏ¢»¹ÊÇbrokers½«ÏûÏ¢ÍÆË͵½consumer£¬Ò²¾ÍÊÇpull»¹push¡£ÔÚÕâ·½Ãæ£¬Kafka×ñÑ­ÁËÒ»Öִ󲿷ÖÏûϢϵͳ¹²Í¬µÄ´«Í³µÄÉè¼Æ£ºproducer½«ÏûÏ¢ÍÆË͵½broker£¬consumer´ÓbrokerÀ­È¡ÏûÏ¢¡£

һЩÏûϢϵͳ±ÈÈçScribeºÍApache Flume²ÉÓÃÁËpushģʽ£¬½«ÏûÏ¢ÍÆË͵½ÏÂÓεÄconsumer¡£ÕâÑù×öÓкô¦Ò²Óлµ´¦£ºÓÉbroker¾ö¶¨ÏûÏ¢ÍÆË͵ÄËÙÂÊ£¬¶ÔÓÚ²»Í¬Ïû·ÑËÙÂʵÄconsumer¾Í²»Ì«ºÃ´¦ÀíÁË¡£ÏûϢϵͳ¶¼ÖÂÁ¦ÓÚÈÃconsumerÒÔ×î´óµÄËÙÂÊ×î¿ìËÙµÄÏû·ÑÏûÏ¢£¬µ«²»ÐÒµÄÊÇ£¬pushģʽÏ£¬µ±brokerÍÆË͵ÄËÙÂÊÔ¶´óÓÚconsumerÏû·ÑµÄËÙÂÊʱ£¬consumer¿ÖžÍÒª±ÀÀ£ÁË¡£×îÖÕKafka»¹ÊÇѡȡÁË´«Í³µÄpullģʽ¡£

PullģʽµÄÁíÍâÒ»¸öºÃ´¦ÊÇconsumer¿ÉÒÔ×ÔÖ÷¾ö¶¨ÊÇ·ñÅúÁ¿µÄ´ÓbrokerÀ­È¡Êý¾Ý¡£Pushģʽ±ØÐëÔÚ²»ÖªµÀÏÂÓÎconsumerÏû·ÑÄÜÁ¦ºÍÏû·Ñ²ßÂÔµÄÇé¿öϾö¶¨ÊÇÁ¢¼´ÍÆËÍÿÌõÏûÏ¢»¹ÊÇ»º´æÖ®ºóÅúÁ¿ÍÆËÍ¡£Èç¹ûΪÁ˱ÜÃâconsumer±ÀÀ£¶ø²ÉÓýϵ͵ÄÍÆËÍËÙÂÊ£¬½«¿ÉÄܵ¼ÖÂÒ»´ÎÖ»ÍÆËͽÏÉÙµÄÏûÏ¢¶øÔì³ÉÀË·Ñ¡£PullģʽÏ£¬consumer¾Í¿ÉÒÔ¸ù¾Ý×Ô¼ºµÄÏû·ÑÄÜÁ¦È¥¾ö¶¨ÕâЩ²ßÂÔ¡£

PullÓиöȱµãÊÇ£¬Èç¹ûbrokerûÓпɹ©Ïû·ÑµÄÏûÏ¢£¬½«µ¼ÖÂconsumer²»¶ÏÔÚÑ­»·ÖÐÂÖѯ£¬Ö±µ½ÐÂÏûÏ¢µ½t´ï¡£ÎªÁ˱ÜÃâÕâµã£¬KafkaÓиö²ÎÊý¿ÉÒÔÈÃconsumer×èÈûÖªµÀÐÂÏûÏ¢µ½´ï(µ±È»Ò²¿ÉÒÔ×èÈûÖªµÀÏûÏ¢µÄÊýÁ¿´ïµ½Ä³¸öÌØ¶¨µÄÁ¿ÕâÑù¾Í¿ÉÒÔÅúÁ¿·¢ËÍ)¡£

Ïû·Ñ״̬¸ú×Ù

¶ÔÏû·ÑÏûϢ״̬µÄ¼Ç¼ҲÊǺÜÖØÒªµÄ¡£

´ó²¿·ÖÏûϢϵͳÔÚbroker¶ËµÄά»¤ÏûÏ¢±»Ïû·ÑµÄ¼Ç¼£ºÒ»¸öÏûÏ¢±»·Ö·¢µ½consumerºóbroker¾ÍÂíÉϽøÐбê¼Ç»òÕߵȴýcustomerµÄ֪ͨºó½øÐбê¼Ç¡£ÕâÑùÒ²¿ÉÒÔÔÚÏûÏ¢ÔÚÏû·ÑºóÁ¢Âí¾Íɾ³ýÒÔ¼õÉÙ¿Õ¼äÕ¼Óá£

µ«ÊÇÕâÑù»á²»»áÓÐʲôÎÊÌâÄØ£¿Èç¹ûÒ»ÌõÏûÏ¢·¢ËͳöÈ¥Ö®ºó¾ÍÁ¢¼´±»±ê¼ÇΪÏû·Ñ¹ýµÄ£¬Ò»µ©consumer´¦ÀíÏûϢʱʧ°ÜÁË£¨±ÈÈç³ÌÐò±ÀÀ££©ÏûÏ¢¾Í¶ªÊ§ÁË¡£ÎªÁ˽â¾öÕâ¸öÎÊÌ⣬ºÜ¶àÏûϢϵͳÌṩÁËÁíÍâÒ»¸ö¸ö¹¦ÄÜ£ºµ±ÏûÏ¢±»·¢ËͳöÈ¥Ö®ºó½ö½ö±»±ê¼ÇΪÒÑ·¢ËÍ״̬£¬µ±½Óµ½consumerÒѾ­Ïû·Ñ³É¹¦µÄ֪ͨºó²Å±ê¼ÇΪÒѱ»Ïû·ÑµÄ״̬¡£ÕâËäÈ»½â¾öÁËÏûÏ¢¶ªÊ§µÄÎÊÌ⣬µ«²úÉúÁËÐÂÎÊÌ⣬Ê×ÏÈÈç¹ûconsumer´¦ÀíÏûÏ¢³É¹¦Á˵«ÊÇÏòbroker·¢ËÍÏìӦʱʧ°ÜÁË£¬ÕâÌõÏûÏ¢½«±»Ïû·ÑÁ½´Î¡£µÚ¶þ¸öÎÊÌâʱ£¬broker±ØÐëά»¤Ã¿ÌõÏûÏ¢µÄ״̬£¬²¢ÇÒÿ´Î¶¼ÒªÏÈËø×¡ÏûϢȻºó¸ü¸Ä״̬ȻºóÊÍ·ÅËø¡£ÕâÑùÂé·³ÓÖÀ´ÁË£¬ÇÒ²»ËµÒªÎ¬»¤´óÁ¿µÄ״̬Êý¾Ý£¬±ÈÈçÈç¹ûÏûÏ¢·¢ËͳöÈ¥µ«Ã»ÓÐÊÕµ½Ïû·Ñ³É¹¦µÄ֪ͨ£¬ÕâÌõÏûÏ¢½«Ò»Ö±´¦ÓÚ±»Ëø¶¨µÄ״̬£¬

Kafka²ÉÓÃÁ˲»Í¬µÄ²ßÂÔ¡£Topic±»·Ö³ÉÁËÈô¸É·ÖÇø£¬Ã¿¸ö·ÖÇøÔÚͬһʱ¼äÖ»±»Ò»¸öconsumerÏû·Ñ¡£ÕâÒâζ×Åÿ¸ö·ÖÇø±»Ïû·ÑµÄÏûÏ¢ÔÚÈÕÖ¾ÖеÄλÖýö½öÊÇÒ»¸ö¼òµ¥µÄÕûÊý£ºoffset¡£ÕâÑù¾ÍºÜÈÝÒ×±ê¼Çÿ¸ö·ÖÇøÏû·Ñ״̬¾ÍºÜÈÝÒ×ÁË£¬½ö½öÐèÒªÒ»¸öÕûÊý¶øÒÑ¡£ÕâÑùÏû·Ñ״̬µÄ¸ú×پͺܼòµ¥ÁË¡£

Õâ´øÀ´ÁËÁíÍâÒ»¸öºÃ´¦£ºconsumer¿ÉÒÔ°Ñoffsetµ÷³ÉÒ»¸ö½ÏÀϵÄÖµ£¬È¥ÖØÐÂÏû·ÑÀϵÄÏûÏ¢¡£Õâ¶Ô´«Í³µÄÏûϢϵͳÀ´Ëµ¿´ÆðÀ´ÓÐЩ²»¿É˼Ò飬µ«È·ÊµÊǷdz£ÓÐÓõģ¬Ë­¹æ¶¨ÁËÒ»ÌõÏûÏ¢Ö»Äܱ»Ïû·ÑÒ»´ÎÄØ£¿consumer·¢ÏÖ½âÎöÊý¾ÝµÄ³ÌÐòÓÐbug£¬ÔÚÐÞ¸ÄbugºóÔÙÀ´½âÎöÒ»´ÎÏûÏ¢£¬¿´ÆðÀ´ÊǺܺÏÀíµÄ¶îѽ£¡

ÀëÏß´¦ÀíÏûÏ¢

¸ß¼¶µÄÊý¾Ý³Ö¾Ã»¯ÔÊÐíconsumerÿ¸ö¸ôÒ»¶Îʱ¼äÅúÁ¿µÄ½«Êý¾Ý¼ÓÔØµ½ÏßÏÂϵͳÖбÈÈçHadoop»òÕßÊý¾Ý²Ö¿â¡£ÕâÖÖÇé¿öÏ£¬Hadoop¿ÉÒÔ½«¼ÓÔØÈÎÎñ·Ö²ð£¬²ð³Éÿ¸öbroker»òÿ¸ötopic»òÿ¸ö·ÖÇøÒ»¸ö¼ÓÔØÈÎÎñ¡£Hadoop¾ßÓÐÈÎÎñ¹ÜÀí¹¦ÄÜ£¬µ±Ò»¸öÈÎÎñʧ°ÜÁ˾ͿÉÒÔÖØÆô¶ø²»Óõ£ÐÄÊý¾Ý±»ÖØÐ¼ÓÔØ£¬Ö»Òª´ÓÉϴμÓÔØµÄλÖüÌÐø¼ÓÔØÏûÏ¢¾Í¿ÉÒÔÁË¡£

#########################################################

°Ë¡¢Ö÷´Óͬ²½

KafkaÔÊÐítopicµÄ·ÖÇøÓµÓÐÈô¸É¸±±¾£¬Õâ¸öÊýÁ¿ÊÇ¿ÉÒÔÅäÖõģ¬Äã¿ÉÒÔΪÿ¸ötopciÅäÖø±±¾µÄÊýÁ¿¡£Kafka»á×Ô¶¯ÔÚÿ¸ö¸ö¸±±¾Éϱ¸·ÝÊý¾Ý£¬ËùÒÔµ±Ò»¸ö½ÚµãdownµôʱÊý¾ÝÒÀÈ»ÊÇ¿ÉÓõġ£

KafkaµÄ¸±±¾¹¦Äܲ»ÊDZØÐëµÄ£¬Äã¿ÉÒÔÅäÖÃÖ»ÓÐÒ»¸ö¸±±¾£¬ÕâÑùÆäʵ¾ÍÏ൱ÓÚÖ»ÓÐÒ»·ÝÊý¾Ý¡£

´´½¨¸±±¾µÄµ¥Î»ÊÇtopicµÄ·ÖÇø£¬Ã¿¸ö·ÖÇø¶¼ÓÐÒ»¸öleaderºÍÁã»ò¶à¸öfollowers.ËùÓеĶÁд²Ù×÷¶¼ÓÉleader´¦Àí£¬Ò»°ã·ÖÇøµÄÊýÁ¿¶¼±ÈbrokerµÄÊýÁ¿¶àµÄ¶à£¬¸÷·ÖÇøµÄleader¾ùÔȵķֲ¼ÔÚbrokersÖС£ËùÓеÄfollowers¶¼¸´ÖÆleaderµÄÈÕÖ¾£¬ÈÕÖ¾ÖеÄÏûÏ¢ºÍ˳Ðò¶¼ºÍleaderÖеÄÒ»Ö¡£flowersÏòÆÕͨµÄconsumerÄÇÑù´ÓleaderÄÇÀïÀ­È¡ÏûÏ¢²¢±£´æÔÚ×Ô¼ºµÄÈÕÖ¾ÎļþÖС£

Ðí¶à·Ö²¼Ê½µÄÏûϢϵͳ×Ô¶¯µÄ´¦Àíʧ°ÜµÄÇëÇó£¬ËüÃǶÔÒ»¸ö½ÚµãÊÇ·ñ

×Å£¨alive£©¡±ÓÐ×ÅÇåÎúµÄ¶¨Òå¡£KafkaÅжÏÒ»¸ö½ÚµãÊÇ·ñ»î×ÅÓÐÁ½¸öÌõ¼þ£º

½Úµã±ØÐë¿ÉÒÔά»¤ºÍZooKeeperµÄÁ¬½Ó£¬Zookeeperͨ¹ýÐÄÌø»úÖÆ¼ì²éÿ¸ö½ÚµãµÄÁ¬½Ó¡£

Èç¹û½ÚµãÊǸöfollower,Ëû±ØÐëÄܼ°Ê±µÄͬ²½leaderµÄд²Ù×÷£¬ÑÓʱ²»ÄÜÌ«¾Ã¡£

·ûºÏÒÔÉÏÌõ¼þµÄ½Úµã׼ȷµÄ˵Ӧ¸ÃÊÇ¡°Í¬²½Öеģ¨in sync£©¡±£¬¶ø²»ÊÇÄ£ºýµÄ˵ÊÇ¡°»î×ŵġ±»òÊÇ¡°Ê§°ÜµÄ¡±¡£Leader»á×·×ÙËùÓС°Í¬²½ÖС±µÄ½Úµã£¬Ò»µ©Ò»¸ödownµôÁË£¬»òÊÇ¿¨×¡ÁË£¬»òÊÇÑÓʱ̫¾Ã£¬leader¾Í»á°ÑËüÒÆ³ý¡£ÖÁÓÚÑÓʱ¶à¾ÃËãÊÇ¡°Ì«¾Ã¡±£¬ÊÇÓɲÎÊýreplica.lag.max.messages¾ö¶¨µÄ£¬ÔõÑùËãÊÇ¿¨×¡ÁË£¬ÔõÊÇÓɲÎÊýreplica.lag.time.max.ms¾ö¶¨µÄ¡£

Ö»Óе±ÏûÏ¢±»ËùÓеĸ±±¾¼ÓÈëµ½ÈÕÖ¾ÖÐʱ£¬²ÅËãÊÇ¡°committed¡±£¬Ö»ÓÐcommittedµÄÏûÏ¢²Å»á·¢Ë͸øconsumer£¬ÕâÑù¾Í²»Óõ£ÐÄÒ»µ©leader downµôÁËÏûÏ¢»á¶ªÊ§¡£ProducerÒ²¿ÉÒÔÑ¡ÔñÊÇ·ñµÈ´ýÏûÏ¢±»Ìá½»µÄ֪ͨ£¬Õâ¸öÊÇÓɲÎÊýrequest.required.acks¾ö¶¨µÄ¡£

Kafka±£Ö¤Ö»ÒªÓÐÒ»¸ö¡°Í¬²½ÖС±µÄ½Úµã£¬¡°committed¡±µÄÏûÏ¢¾Í²»»á¶ªÊ§¡£

LeaderµÄÑ¡Ôñ

KafkaµÄºËÐÄÊÇÈÕÖ¾Îļþ£¬ÈÕÖ¾ÎļþÔÚ¼¯ÈºÖеÄͬ²½ÊÇ·Ö²¼Ê½Êý¾Ýϵͳ×î»ù´¡µÄÒªËØ¡£

Èç¹ûleadersÓÀÔ¶²»»ádownµÄ»°ÎÒÃǾͲ»ÐèÒªfollowersÁË£¡Ò»µ©leader downµôÁË£¬ÐèÒªÔÚfollowersÖÐÑ¡ÔñÒ»¸öеÄleader.µ«ÊÇfollowers±¾ÉíÓпÉÄÜÑÓʱ̫¾Ã»òÕßcrash£¬ËùÒÔ±ØÐëÑ¡Ôñ¸ßÖÊÁ¿µÄfollower×÷Ϊleader.±ØÐë±£Ö¤£¬Ò»µ©Ò»¸öÏûÏ¢±»Ìá½»ÁË£¬µ«ÊÇleader downµôÁË£¬ÐÂÑ¡³öµÄleader±ØÐë¿ÉÒÔÌṩÕâÌõÏûÏ¢¡£´ó²¿·ÖµÄ·Ö²¼Ê½ÏµÍ³²ÉÓÃÁ˶àÊýͶƱ·¨ÔòÑ¡ÔñеÄleader,¶ÔÓÚ¶àÊýͶƱ·¨Ôò£¬¾ÍÊǸù¾ÝËùÓи±±¾½ÚµãµÄ×´¿ö¶¯Ì¬µÄÑ¡Ôñ×îÊʺϵÄ×÷Ϊleader.Kafka²¢²»ÊÇʹÓÃÕâÖÖ·½·¨¡£

Kafaka¶¯Ì¬Î¬»¤ÁËÒ»¸öͬ²½×´Ì¬µÄ¸±±¾µÄ¼¯ºÏ£¨a set of in-sync replicas£©£¬¼ò³ÆISR£¬ÔÚÕâ¸ö¼¯ºÏÖеĽڵ㶼ÊǺÍleader±£³Ö¸ß¶ÈÒ»Öµģ¬ÈκÎÒ»ÌõÏûÏ¢±ØÐë±»Õâ¸ö¼¯ºÏÖеÄÿ¸ö½Úµã¶ÁÈ¡²¢×·¼Óµ½ÈÕÖ¾ÖÐÁË£¬²Å»ØÍ¨ÖªÍⲿÕâ¸öÏûÏ¢ÒѾ­±»Ìá½»ÁË¡£Òò´ËÕâ¸ö¼¯ºÏÖеÄÈκÎÒ»¸ö½ÚµãËæÊ±¶¼¿ÉÒÔ±»Ñ¡Îªleader.ISRÔÚZooKeeperÖÐά»¤¡£ISRÖÐÓÐf+1¸ö½Úµã£¬¾Í¿ÉÒÔÔÊÐíÔÚf¸ö½ÚµãdownµôµÄÇé¿öϲ»»á¶ªÊ§ÏûÏ¢²¢Õý³£Ìṩ·þ¡£ISRµÄ³ÉÔ±ÊǶ¯Ì¬µÄ£¬Èç¹ûÒ»¸ö½Úµã±»ÌÔÌ­ÁË£¬µ±ËüÖØÐ´ﵽ¡°Í¬²½ÖС±µÄ״̬ʱ£¬Ëû¿ÉÒÔÖØÐ¼ÓÈëISR.ÕâÖÖleaderµÄÑ¡Ôñ·½Ê½ÊǷdz£¿ìËٵģ¬ÊʺÏkafkaµÄÓ¦Óó¡¾°¡£

Ò»¸öа¶ñµÄÏë·¨£ºÈç¹ûËùÓнڵ㶼downµôÁËÔõô°ì£¿Kafka¶ÔÓÚÊý¾Ý²»»á¶ªÊ§µÄ±£Ö¤£¬ÊÇ»ùÓÚÖÁÉÙÒ»¸ö½ÚµãÊÇ´æ»îµÄ£¬Ò»µ©ËùÓнڵ㶼downÁË£¬Õâ¸ö¾Í²»Äܱ£Ö¤ÁË¡£

ʵ¼ÊÓ¦ÓÃÖУ¬µ±ËùÓеĸ±±¾¶¼downµôʱ£¬±ØÐ뼰ʱ×÷³ö·´Ó¦¡£¿ÉÒÔÓÐÒÔÏÂÁ½ÖÖÑ¡Ôñ:

µÈ´ýISRÖеÄÈκÎÒ»¸ö½Úµã»Ö¸´²¢µ£ÈÎleader¡£

Ñ¡ÔñËùÓнڵãÖУ¨²»Ö»ÊÇISR£©µÚÒ»¸ö»Ö¸´µÄ½Úµã×÷Ϊleader.

ÕâÊÇÒ»¸öÔÚ¿ÉÓÃÐÔºÍÁ¬ÐøÐÔÖ®¼äµÄȨºâ¡£Èç¹ûµÈ´ýISRÖеĽڵã»Ö¸´£¬Ò»µ©ISRÖеĽڵãÆð²»ÆðÀ´»òÕßÊý¾Ý¶¼ÊÇÁË£¬ÄǼ¯Èº¾ÍÓÀÔ¶»Ö¸´²»ÁËÁË¡£Èç¹ûµÈ´ýISRÒâÍâµÄ½Úµã»Ö¸´£¬Õâ¸ö½ÚµãµÄÊý¾Ý¾Í»á±»×÷ΪÏßÉÏÊý¾Ý£¬ÓпÉÄܺÍÕæÊµµÄÊý¾ÝÓÐËù³öÈ룬ÒòΪÓÐЩÊý¾ÝËü¿ÉÄÜ»¹Ã»Í¬²½µ½¡£KafkaĿǰѡÔñÁ˵ڶþÖÖ²ßÂÔ£¬ÔÚδÀ´µÄ°æ±¾Öн«Ê¹Õâ¸ö²ßÂÔµÄÑ¡Ôñ¿ÉÅäÖ㬿ÉÒÔ¸ù¾Ý³¡¾°Áé»îµÄÑ¡Ôñ¡£

ÕâÖÖ¾½¾³²»Ö»Kafka»áÓöµ½£¬¼¸ºõËùÓеķֲ¼Ê½Êý¾Ýϵͳ¶¼»áÓöµ½¡£

¸±±¾¹ÜÀí

ÒÔÉϽö½öÒÔÒ»¸ötopicÒ»¸ö·ÖÇøÎªÀý×Ó½øÐÐÁËÌÖÂÛ£¬µ«Êµ¼ÊÉÏÒ»¸öKafka½«»á¹ÜÀí³ÉǧÉÏÍòµÄtopic·ÖÇø.Kafka¾¡Á¿µÄʹËùÓзÖÇø¾ùÔȵķֲ¼µ½¼¯ÈºËùÓеĽڵãÉ϶ø²»ÊǼ¯ÖÐÔÚijЩ½ÚµãÉÏ£¬ÁíÍâÖ÷´Ó¹ØÏµÒ²¾¡Á¿¾ùºâÕâÑùÿ¸ö¼¸µã¶¼»áµ£ÈÎÒ»¶¨±ÈÀýµÄ·ÖÇøµÄleader.

ÓÅ»¯leaderµÄÑ¡Ôñ¹ý³ÌÒ²ÊǺÜÖØÒªµÄ£¬Ëü¾ö¶¨ÁËϵͳ·¢Éú¹ÊÕÏʱµÄ¿Õ´°ÆÚÓжà¾Ã¡£KafkaÑ¡ÔñÒ»¸ö½Úµã×÷Ϊ¡°controller¡±,µ±·¢ÏÖÓнڵãdownµôµÄʱºòËü¸ºÔðÔÚÓÎÓ¾·ÖÇøµÄËùÓнڵãÖÐÑ¡ÔñеÄleader,ÕâʹµÃKafka¿ÉÒÔÅúÁ¿µÄ¸ßЧµÄ¹ÜÀíËùÓзÖÇø½ÚµãµÄÖ÷´Ó¹ØÏµ¡£Èç¹ûcontroller downµôÁË£¬»î×ŵĽڵãÖеÄÒ»¸ö»á±¸Çл»ÎªÐµÄcontroller.

###################################################

¾Å¡¢¿Í»§¶ËAPI

Kafka Producer APIs

Procuder APIÓÐÁ½ÖÖ£ºkafka.producer.SyncProducerºÍkafka.producer.async.AsyncProducer.ËüÃǶ¼ÊµÏÖÁËͬһ¸ö½Ó¿Ú£º

class Producer {
/* ½«ÏûÏ¢·¢Ë͵½Ö¸¶¨·ÖÇø */
publicvoid send(kafka.javaapi.producer.ProducerData<K,V> producerData);
/* ÅúÁ¿·¢ËÍÒ»ÅúÏûÏ¢ */
publicvoid send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
/* ¹Ø±Õproducer */
publicvoid close();
}

Producer APIÌṩÁËÒÔϹ¦ÄÜ£º

¿ÉÒÔ½«¶à¸öÏûÏ¢»º´æµ½±¾µØ¶ÓÁÐÀȻºóÒì²½µÄÅúÁ¿·¢Ë͵½broker£¬¿ÉÒÔͨ¹ý²ÎÊýproducer.type=async×öµ½¡£»º´æµÄ´óС¿ÉÒÔͨ¹ýһЩ²ÎÊýÖ¸¶¨£ºqueue.timeºÍbatch.size¡£Ò»¸öºǫ́Ị̈߳¨(kafka.producer.async.ProducerSendThread£©´Ó¶ÓÁÐÖÐÈ¡³öÊý¾Ý²¢ÈÃkafka.producer.EventHandler½«ÏûÏ¢·¢Ë͵½broker£¬Ò²¿ÉÒÔͨ¹ý²ÎÊýevent.handler¶¨ÖÆhandler£¬ÔÚproducer¶Ë´¦ÀíÊý¾ÝµÄ²»Í¬µÄ½×¶Î×¢²á´¦ÀíÆ÷£¬±ÈÈç¿ÉÒÔ¶ÔÕâÒ»¹ý³Ì½øÐÐÈÕÖ¾×·×Ù£¬»ò½øÐÐһЩ¼à¿Ø¡£Ö»ÐèʵÏÖkafka.producer.async.CallbackHandler½Ó¿Ú£¬²¢ÔÚcallback.handlerÖÐÅäÖá£

×Ô¼º±àдEncoderÀ´ÐòÁл¯ÏûÏ¢£¬Ö»ÐèʵÏÖÏÂÃæÕâ¸ö½Ó¿Ú¡£Ä¬ÈϵÄEncoderÊÇkafka.serializer.DefaultEncoder¡£

interface Encoder<T> {

public Message toMessage(T data);

}

ÌṩÁË»ùÓÚZookeeperµÄbroker×Ô¶¯¸ÐÖªÄÜÁ¦£¬¿ÉÒÔͨ¹ý²ÎÊýzk.connectʵÏÖ¡£Èç¹û²»Ê¹ÓÃZookeeper£¬Ò²¿ÉÒÔʹÓÃbroker.list²ÎÊýÖ¸¶¨Ò»¸ö¾²Ì¬µÄbrokersÁÐ±í£¬ÕâÑùÏûÏ¢½«±»Ëæ»úµÄ·¢Ë͵½Ò»¸öbrokerÉÏ£¬Ò»µ©Ñ¡ÖеÄbrokerʧ°ÜÁË£¬ÏûÏ¢·¢ËÍÒ²¾Íʧ°ÜÁË¡£

ͨ¹ý·ÖÇøº¯Êýkafka.producer.PartitionerÀà¶ÔÏûÏ¢·ÖÇø¡£

interface Partitioner<T> {

int partition(T key, int numPartitions);

}

·ÖÇøº¯ÊýÓÐÁ½¸ö²ÎÊý£ºkeyºÍ¿ÉÓõķÖÇøÊýÁ¿£¬´Ó·ÖÇøÁбíÖÐÑ¡ÔñÒ»¸ö·ÖÇø²¢·µ»Øid¡£Ä¬ÈϵķÖÇø²ßÂÔÊÇhash(key)%numPartitions.Èç¹ûkeyÊÇnull,¾ÍËæ»úµÄÑ¡ÔñÒ»¸ö¡£¿ÉÒÔͨ¹ý²ÎÊýpartitioner.class¶¨ÖÆ·ÖÇøº¯Êý¡£

KafKa Consumer APIs

Consumer APIÓÐÁ½¸ö¼¶±ð¡£µÍ¼¶±ðµÄºÍÒ»¸öÖ¸¶¨µÄbroker±£³ÖÁ¬½Ó£¬²¢ÔÚ½ÓÊÕÍêÏûÏ¢ºó¹Ø±ÕÁ¬½Ó£¬Õâ¸ö¼¶±ðÊÇÎÞ״̬µÄ£¬Ã¿´Î¶ÁÈ¡ÏûÏ¢¶¼´ø×Åoffset¡£

¸ß¼¶±ðµÄAPIÒþ²ØÁ˺ÍbrokersÁ¬½ÓµÄϸ½Ú£¬ÔÚ²»±Ø¹ØÐÄ·þÎñ¶Ë¼Ü¹¹µÄÇé¿öϺͷþÎñ¶ËͨÐÅ¡£»¹¿ÉÒÔ×Ô¼ºÎ¬»¤Ïû·Ñ״̬£¬²¢¿ÉÒÔͨ¹ýһЩÌõ¼þÖ¸¶¨¶©ÔÄÌØ¶¨µÄtopic,±ÈÈç°×Ãûµ¥ºÚÃûµ¥»òÕßÕýÔò±í´ïʽ¡£

µÍ¼¶±ðµÄAPI

class SimpleConsumer {
/*ÏòÒ»¸öbroker·¢ËͶÁÈ¡ÇëÇ󲢵õ½ÏûÏ¢¼¯ */
public ByteBufferMessageSet fetch(FetchRequest request);
/*ÏòÒ»¸öbroker·¢ËͶÁÈ¡ÇëÇ󲢵õ½Ò»¸öÏàÓ¦¼¯ */
public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
* µÃµ½Ö¸¶¨Ê±¼ä֮ǰµÄoffsets
* ·µ»ØÖµÊÇoffsetsÁÐ±í£¬ÒÔµ¹ÐòÅÅÐò
* @param time: ʱ¼ä£¬ºÁÃë,
* Èç¹ûÖ¸¶¨ÎªOffsetRequest$.MODULE$.LATIEST_TIME(), µÃµ½×îеÄoffset.
* Èç¹ûÖ¸¶¨ÎªOffsetRequest$.MODULE$.EARLIEST_TIME(),µÃµ½×îÀϵÄoffset.
*/
publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

µÍ¼¶±ðµÄAPIÊǸ߼¶±ðAPIʵÏֵĻù´¡£¬Ò²ÊÇΪÁËһЩ¶Ôά³ÖÏû·Ñ״̬ÓÐÌØÊâÐèÇóµÄ³¡¾°£¬±ÈÈçHadoop consumerÕâÑùµÄÀëÏßconsumer¡£

¸ß¼¶±ðµÄAPI

/* ´´½¨Á¬½Ó */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
* Õâ¸ö·½·¨¿ÉÒԵõ½Ò»¸öÁ÷µÄÁÐ±í£¬Ã¿¸öÁ÷¶¼ÊÇMessageAndMetadataµÄµü´ú£¬Í¨¹ýMessageAndMetadata¿ÉÒÔÄõ½ÏûÏ¢ºÍÆäËûµÄÔªÊý¾Ý£¨Ä¿Ç°Ö®ºótopic£©
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
* ÄãÒ²¿ÉÒԵõ½Ò»¸öÁ÷µÄÁÐ±í£¬Ëü°üº¬ÁË·ûºÏTopicFilerµÄÏûÏ¢µÄµü´ú£¬
* Ò»¸öTopicFilterÊÇÒ»¸ö·â×°Á˰×Ãûµ¥»òºÚÃûµ¥µÄÕýÔò±í´ïʽ¡£
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);
/* ÌύĿǰÏû·Ñµ½µÄoffset */
public commitOffsets()
/* ¹Ø±ÕÁ¬½Ó */
public shutdown()
}

Õâ¸öAPIÎ§ÈÆ×ÅÓÉKafkaStreamʵÏֵĵü´úÆ÷Õ¹¿ª£¬Ã¿¸öÁ÷´ú±íһϵÁдÓÒ»¸ö»ò¶à¸ö·ÖÇø¶àºÍbrokerÉÏ»ã¾ÛÀ´µÄÏûÏ¢£¬Ã¿¸öÁ÷ÓÉÒ»¸öÏ̴߳¦Àí£¬ËùÒÔ¿Í»§¶Ë¿ÉÒÔÔÚ´´½¨µÄʱºòͨ¹ý²ÎÊýÖ¸¶¨ÏëÒª¼¸¸öÁ÷¡£Ò»¸öÁ÷ÊǶà¸ö·ÖÇø¶à¸öbrokerµÄºÏ²¢£¬µ«ÊÇÿ¸ö·ÖÇøµÄÏûÏ¢Ö»»áÁ÷ÏòÒ»¸öÁ÷¡£

ÿµ÷ÓÃÒ»´ÎcreateMessageStreams¶¼»á½«consumer×¢²áµ½topicÉÏ£¬ÕâÑùconsumerºÍbrokersÖ®¼äµÄ¸ºÔؾùºâ¾Í»á½øÐе÷Õû¡£API¹ÄÀøÃ¿´Îµ÷Óô´½¨¸ü¶àµÄtopicÁ÷ÒÔ¼õÉÙÕâÖÖµ÷Õû¡£createMessageStreamsByFilter·½·¨×¢²á¼àÌý¿ÉÒÔ¸Ð֪еķûºÏfilterµÄtipic¡£

#######################################################

Ê®¡¢ÏûÏ¢ºÍÈÕÖ¾

ÏûÏ¢ÓÉÒ»¸ö¹Ì¶¨³¤¶ÈµÄÍ·²¿ºÍ¿É±ä³¤¶ÈµÄ×Ö½ÚÊý×é×é³É¡£Í·²¿°üº¬ÁËÒ»¸ö°æ±¾ºÅºÍCRC32УÑéÂë¡£

/**
* ¾ßÓÐN¸ö×Ö½ÚµÄÏûÏ¢µÄ¸ñʽÈçÏÂ
*
* Èç¹û°æ±¾ºÅÊÇ0
*
* 1. 1¸ö×Ö½ÚµÄ "magic" ±ê¼Ç
*
* 2. 4¸ö×Ö½ÚµÄCRC32УÑéÂë
*
* 3. N - 5¸ö×ֽڵľßÌåÐÅÏ¢
*
* Èç¹û°æ±¾ºÅÊÇ1
*
* 1. 1¸ö×Ö½ÚµÄ "magic" ±ê¼Ç
*
* 2.1¸ö×ֽڵIJÎÊýÔÊÐí±êעһЩ¸½¼ÓµÄÐÅÏ¢±ÈÈçÊÇ·ñѹËõÁË£¬½âÂëÀàÐ͵È
*
* 3.4¸ö×Ö½ÚµÄCRC32УÑéÂë
*
* 4. N - 6 ¸ö×ֽڵľßÌåÐÅÏ¢
*
*/

ÈÕÖ¾Ò»¸ö½Ð×ö¡°my_topic¡±ÇÒÓÐÁ½¸ö·ÖÇøµÄµÄtopic,ËüµÄÈÕÖ¾ÓÐÁ½¸öÎļþ¼Ð×é³É£¬my_topic_0ºÍmy_topic_1,ÿ¸öÎļþ¼ÐÀï·ÅמßÌåµÄÊý¾ÝÎļþ£¬Ã¿¸öÊý¾ÝÎļþ¶¼ÊÇһϵÁеÄÈÕ־ʵÌ壬ÿ¸öÈÕ־ʵÌåÓÐÒ»¸ö4¸ö×Ö½ÚµÄÕûÊýN±ê×¢ÏûÏ¢µÄ³¤¶È£¬ºó±ß¸ú×ÅN¸ö×Ö½ÚµÄÏûÏ¢¡£Ã¿¸öÏûÏ¢¶¼¿ÉÒÔÓÉÒ»¸ö64λµÄÕûÊýoffset±ê×¢£¬offset±ê×¢ÁËÕâÌõÏûÏ¢ÔÚ·¢Ë͵½Õâ¸ö·ÖÇøµÄÏûÏ¢Á÷ÖÐµÄÆðʼλÖá£Ã¿¸öÈÕÖ¾ÎļþµÄÃû³Æ¶¼ÊÇÕâ¸öÎļþµÚÒ»ÌõÈÕÖ¾µÄoffset.ËùÒÔµÚÒ»¸öÈÕÖ¾ÎļþµÄÃû×Ö¾ÍÊÇ00000000000.kafka.ËùÒÔÿÏàÁÚµÄÁ½¸öÎļþÃû×ֵIJî¾ÍÊÇÒ»¸öÊý×ÖS,S²î²»¶à¾ÍÊÇÅäÖÃÎļþÖÐÖ¸¶¨µÄÈÕÖ¾ÎļþµÄ×î´óÈÝÁ¿¡£

ÏûÏ¢µÄ¸ñʽ¶¼ÓÉÒ»¸öͳһµÄ½Ó¿Úά»¤£¬ËùÒÔÏûÏ¢¿ÉÒÔÔÚproducer,brokerºÍconsumerÖ®¼äÎÞ·ìµÄ´«µÝ¡£´æ´¢ÔÚÓ²ÅÌÉϵÄÏûÏ¢¸ñʽÈçÏÂËùʾ£º

ÏûÏ¢³¤¶È: 4 bytes (value: 1+4+n)

°æ±¾ºÅ: 1 byte

CRCУÑéÂë: 4 bytes

¾ßÌåµÄÏûÏ¢: n bytes

д²Ù×÷ÏûÏ¢±»²»¶ÏµÄ×·¼Óµ½×îºóÒ»¸öÈÕÖ¾µÄĩ⣬µ±ÈÕÖ¾µÄ´óС´ïµ½Ò»¸öÖ¸¶¨µÄֵʱ¾Í»á²úÉúÒ»¸öеÄÎļþ¡£¶ÔÓÚд²Ù×÷ÓÐÁ½¸ö²ÎÊý£¬Ò»¸ö¹æ¶¨ÁËÏûÏ¢µÄÊýÁ¿´ïµ½Õâ¸öֵʱ±ØÐ뽫Êý¾Ýˢе½Ó²ÅÌÉÏ£¬ÁíÍâÒ»¸ö¹æ¶¨ÁËˢе½Ó²Å̵Äʱ¼ä¼ä¸ô£¬Õâ¶ÔÊý¾ÝµÄ³Ö¾ÃÐÔÊǸö±£Ö¤£¬ÔÚϵͳ±ÀÀ£µÄʱºòÖ»»á¶ªÊ§Ò»¶¨ÊýÁ¿µÄÏûÏ¢»òÕßÒ»¸öʱ¼ä¶ÎµÄÏûÏ¢¡£

¶Á²Ù×÷

¶Á²Ù×÷ÐèÒªÁ½¸ö²ÎÊý£ºÒ»¸ö64λµÄoffsetºÍÒ»¸öS×Ö½ÚµÄ×î´ó¶ÁÈ¡Á¿¡£Sͨ³£±Èµ¥¸öÏûÏ¢µÄ´óСҪ´ó£¬µ«ÔÚһЩ¸ö±ðÏûÏ¢±È½Ï´óµÄÇé¿öÏ£¬S»áСÓÚµ¥¸öÏûÏ¢µÄ´óС¡£ÕâÖÖÇé¿ö϶Á²Ù×÷»á²»¶ÏÖØÊÔ£¬Ã¿´ÎÖØÊÔ¶¼»á½«¶ÁÈ¡Á¿¼Ó±¶£¬Ö±µ½¶ÁÈ¡µ½Ò»¸öÍêÕûµÄÏûÏ¢¡£¿ÉÒÔÅäÖõ¥¸öÏûÏ¢µÄ×î´óÖµ£¬ÕâÑù·þÎñÆ÷¾Í»á¾Ü¾ø´óС³¬¹ýÕâ¸öÖµµÄÏûÏ¢¡£Ò²¿ÉÒÔ¸ø¿Í»§¶ËÖ¸¶¨Ò»¸ö³¢ÊÔ¶ÁÈ¡µÄ×î´óÉÏÏÞ£¬±ÜÃâΪÁ˶Áµ½Ò»¸öÍêÕûµÄÏûÏ¢¶øÎÞÏ޴εÄÖØÊÔ¡£

ÔÚʵ¼ÊÖ´ÐжÁÈ¡²Ù×Ýʱ£¬Ê×ÏÈÐèÒª¶¨Î»Êý¾ÝËùÔÚµÄÈÕÖ¾Îļþ£¬È»ºó¸ù¾Ýoffset¼ÆËã³öÔÚÕâ¸öÈÕÖ¾ÖеÄoffset(Ç°ÃæµÄµÄoffsetÊÇÕû¸ö·ÖÇøµÄoffset),È»ºóÔÚÕâ¸öoffsetµÄλÖýøÐжÁÈ¡¡£¶¨Î»²Ù×÷ÊÇÓɶþ·Ö²éÕÒ·¨Íê³ÉµÄ£¬KafkaÔÚÄÚ´æÖÐΪÿ¸öÎļþά»¤ÁËoffsetµÄ·¶Î§¡£

ÏÂÃæÊÇ·¢Ë͸øconsumerµÄ½á¹ûµÄ¸ñʽ£º

MessageSetSend (fetch result)

total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)

total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n

ɾ³ý

ÈÕÖ¾¹ÜÀíÆ÷ÔÊÐí¶¨ÖÆÉ¾³ý²ßÂÔ¡£Ä¿Ç°µÄ²ßÂÔÊÇɾ³ýÐÞ¸Äʱ¼äÔÚNÌì֮ǰµÄÈÕÖ¾£¨°´Ê±¼äɾ³ý£©£¬Ò²¿ÉÒÔʹÓÃÁíÍâÒ»¸ö²ßÂÔ£º±£Áô×îºóµÄN GBÊý¾ÝµÄ²ßÂÔ(°´´óСɾ³ý)¡£ÎªÁ˱ÜÃâÔÚɾ³ýʱ×èÈû¶Á²Ù×÷£¬²ÉÓÃÁËcopy-on-writeÐÎʽµÄʵÏÖ£¬É¾³ý²Ù×÷½øÐÐʱ£¬¶ÁÈ¡²Ù×÷µÄ¶þ·Ö²éÕÒ¹¦ÄÜʵ¼ÊÊÇÔÚÒ»¸ö¾²Ì¬µÄ¿ìÕÕ¸±±¾ÉϽøÐеģ¬ÕâÀàËÆÓÚJavaµÄCopyOnWriteArrayList¡£

¿É¿¿ÐÔ±£Ö¤

ÈÕÖ¾ÎļþÓÐÒ»¸ö¿ÉÅäÖõIJÎÊýM£¬»º´æ³¬¹ýÕâ¸öÊýÁ¿µÄÏûÏ¢½«±»Ç¿ÐÐˢе½Ó²ÅÌ¡£Ò»¸öÈÕÖ¾½ÃÕýÏ߳̽«Ñ­»·¼ì²é×îеÄÈÕÖ¾ÎļþÖеÄÏûϢȷÈÏÿ¸öÏûÏ¢¶¼ÊǺϷ¨µÄ¡£ºÏ·¨µÄ±ê׼Ϊ£ºËùÓÐÎļþµÄ´óСµÄºÍ×î´óµÄoffsetСÓÚÈÕÖ¾ÎļþµÄ´óС£¬²¢ÇÒÏûÏ¢µÄCRC32УÑéÂëÓë´æ´¢ÔÚÏûϢʵÌåÖеÄУÑéÂëÒ»Ö¡£Èç¹ûÔÚij¸öoffset·¢ÏÖ²»ºÏ·¨µÄÏûÏ¢£¬´ÓÕâ¸öoffsetµ½ÏÂÒ»¸öºÏ·¨µÄoffsetÖ®¼äµÄÄÚÈݽ«±»ÒƳý¡£

ÓÐÁ½ÖÖÇé¿ö±ØÐ뿼ÂÇ£º

1£¬µ±·¢Éú±ÀÀ£Ê±ÓÐЩÊý¾Ý¿éδÄÜдÈë¡£

2£¬Ð´ÈëÁËһЩ¿Õ°×Êý¾Ý¿é¡£µÚ¶þÖÖÇé¿öµÄÔ­ÒòÊÇ£¬¶ÔÓÚÿ¸öÎļþ£¬²Ù×÷ϵͳ¶¼ÓÐÒ»¸öinode£¨inodeÊÇÖ¸ÔÚÐí¶à¡°ÀàUnixÎļþϵͳ¡±ÖеÄÒ»ÖÖÊý¾Ý½á¹¹¡£Ã¿¸öinode±£´æÁËÎļþϵͳÖеÄÒ»¸öÎļþϵͳ¶ÔÏó,°üÀ¨Îļþ¡¢Ä¿Â¼¡¢´óС¡¢É豸Îļþ¡¢socket¡¢¹ÜµÀ, µÈµÈ£©£¬µ«ÎÞ·¨±£Ö¤¸üÐÂinodeºÍдÈëÊý¾ÝµÄ˳Ðò£¬µ±inode±£´æµÄ´óСÐÅÏ¢±»¸üÐÂÁË£¬µ«Ð´ÈëÊý¾Ýʱ·¢ÉúÁ˱ÀÀ££¬¾Í²úÉúÁ˿հ×Êý¾Ý¿é¡£CRCУÑéÂë¿ÉÒÔ¼ì²éÕâЩ¿é²¢ÒƳý£¬µ±È»ÒòΪ±ÀÀ£¶øÎ´Ð´ÈëµÄÊý¾Ý¿éÒ²¾Í¶ªÊ§ÁË¡£

   
9641 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ