Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
kafkaÈëÃÅ£º¼ò½é¡¢Ê¹Óó¡¾°¡¢Éè¼ÆÔ­Àí¡¢Ö÷ÒªÅäÖü°¼¯Èº´î½¨
 
À´Ô´£º²©¿ÍÔ° ·¢²¼ÓÚ£º2017-7-24
  3284  次浏览      31
 

Ò»¡¢ÈëÃÅ

1¡¢¼ò½é

Kafka is a distributed,partitioned,replicated commit logservice¡£ËüÌṩÁËÀàËÆÓÚJMSµÄÌØÐÔ£¬µ«ÊÇÔÚÉè¼ÆÊµÏÖÉÏÍêÈ«²»Í¬£¬´ËÍâËü²¢²»ÊÇJMS¹æ·¶µÄʵÏÖ¡£kafka¶ÔÏûÏ¢±£´æÊ±¸ù¾ÝTopic½øÐйéÀ࣬·¢ËÍÏûÏ¢Õß³ÉΪProducer,ÏûÏ¢½ÓÊÜÕß³ÉΪConsumer,´ËÍâkafka¼¯ÈºÓжà¸ökafkaʵÀý×é³É£¬Ã¿¸öʵÀý(server)³ÉΪbroker¡£ÎÞÂÛÊÇkafka¼¯Èº£¬»¹ÊÇproducerºÍconsumer¶¼ÒÀÀµÓÚzookeeperÀ´±£Ö¤ÏµÍ³¿ÉÓÃÐÔ¼¯Èº±£´æÒ»Ð©metaÐÅÏ¢¡£

2¡¢Topics/logs

Ò»¸öTopic¿ÉÒÔÈÏΪÊÇÒ»ÀàÏûÏ¢£¬Ã¿¸ötopic½«±»·Ö³É¶à¸öpartition(Çø),ÿ¸öpartitionÔÚ´æ´¢²ãÃæÊÇappend logÎļþ¡£Èκη¢²¼µ½´ËpartitionµÄÏûÏ¢¶¼»á±»Ö±½Ó×·¼Óµ½logÎļþµÄβ²¿£¬Ã¿ÌõÏûÏ¢ÔÚÎļþÖеÄλÖóÆÎªoffset£¨Æ«ÒÆÁ¿£©£¬offsetΪһ¸ölongÐÍÊý×Ö£¬ËüÊÇΨһ±ê¼ÇÒ»ÌõÏûÏ¢¡£ËüΨһµÄ±ê¼ÇÒ»ÌõÏûÏ¢¡£kafka²¢Ã»ÓÐÌṩÆäËû¶îÍâµÄË÷Òý»úÖÆÀ´´æ´¢offset£¬ÒòΪÔÚkafkaÖм¸ºõ²»ÔÊÐí¶ÔÏûÏ¢½øÐС°Ëæ»ú¶Áд¡±¡£

kafkaºÍJMS£¨Java Message Service£©ÊµÏÖ(activeMQ)²»Í¬µÄÊÇ:¼´Ê¹ÏûÏ¢±»Ïû·Ñ,ÏûÏ¢ÈÔÈ»²»»á±»Á¢¼´É¾³ý.ÈÕÖ¾Îļþ½«»á¸ù¾ÝbrokerÖеÄÅäÖÃÒªÇó,±£ÁôÒ»¶¨µÄʱ¼äÖ®ºóɾ³ý;±ÈÈçlogÎļþ±£Áô2Ìì,ÄÇôÁ½Ììºó,Îļþ»á±»Çå³ý,ÎÞÂÛÆäÖеÄÏûÏ¢ÊÇ·ñ±»Ïû·Ñ.kafkaͨ¹ýÕâÖÖ¼òµ¥µÄÊÖ¶Î,À´ÊÍ·Å´ÅÅ̿ռä,ÒÔ¼°¼õÉÙÏûÏ¢Ïû·ÑÖ®ºó¶ÔÎļþÄÚÈݸ͝µÄ´ÅÅÌIO¿ªÖ§.

¶ÔÓÚconsumer¶øÑÔ,ËüÐèÒª±£´æÏû·ÑÏûÏ¢µÄoffset,¶ÔÓÚoffsetµÄ±£´æºÍʹÓÃ,ÓÐconsumerÀ´¿ØÖÆ;µ±consumerÕý³£Ïû·ÑÏûϢʱ,offset½«»á"ÏßÐÔ"µÄÏòǰÇý¶¯,¼´ÏûÏ¢½«ÒÀ´Î˳Ðò±»Ïû·Ñ.ÊÂʵÉÏconsumer¿ÉÒÔʹÓÃÈÎÒâ˳ÐòÏû·ÑÏûÏ¢,ËüÖ»ÐèÒª½«offsetÖØÖÃΪÈÎÒâÖµ..(offset½«»á±£´æÔÚzookeeperÖÐ,²Î¼ûÏÂÎÄ)

kafka¼¯Èº¼¸ºõ²»ÐèҪά»¤ÈκÎconsumerºÍproducer״̬ÐÅÏ¢,ÕâЩÐÅÏ¢ÓÐzookeeper±£´æ;Òò´ËproducerºÍconsumerµÄ¿Í»§¶ËʵÏַdz£ÇáÁ¿¼¶,ËüÃÇ¿ÉÒÔËæÒâÀ뿪,¶ø²»»á¶Ô¼¯ÈºÔì³É¶îÍâµÄÓ°Ïì.

partitionsµÄÉè¼ÆÄ¿µÄÓжà¸ö.×î¸ù±¾Ô­ÒòÊÇkafka»ùÓÚÎļþ´æ´¢.ͨ¹ý·ÖÇø,¿ÉÒÔ½«ÈÕÖ¾ÄÚÈÝ·ÖÉ¢µ½¶à¸öserverÉÏ,À´±ÜÃâÎļþ³ß´ç´ïµ½µ¥»ú´ÅÅ̵ÄÉÏÏÞ,ÿ¸öpartiton¶¼»á±»µ±Ç°server(kafkaʵÀý)±£´æ;¿ÉÒÔ½«Ò»¸ötopicÇзֶàÈÎÒâ¶à¸öpartitions,À´ÏûÏ¢±£´æ/Ïû·ÑµÄЧÂÊ.´ËÍâÔ½¶àµÄpartitionsÒâζ×Å¿ÉÒÔÈÝÄɸü¶àµÄconsumer,ÓÐЧÌáÉý²¢·¢Ïû·ÑµÄÄÜÁ¦.(¾ßÌåÔ­Àí²Î¼ûÏÂÎÄ).

3¡¢Distribution

Ò»¸öTopicµÄ¶à¸öpartitions,±»·Ö²¼ÔÚkafka¼¯ÈºÖеĶà¸öserverÉÏ;ÿ¸öserver(kafkaʵÀý)¸ºÔðpartitionsÖÐÏûÏ¢µÄ¶Áд²Ù×÷;´ËÍâkafka»¹¿ÉÒÔÅäÖÃpartitionsÐèÒª±¸·ÝµÄ¸öÊý(replicas),ÿ¸öpartition½«»á±»±¸·Ýµ½¶ą̀»úÆ÷ÉÏ,ÒÔÌá¸ß¿ÉÓÃÐÔ.

»ùÓÚreplicated·½°¸,ÄÇô¾ÍÒâζ×ÅÐèÒª¶Ô¶à¸ö±¸·Ý½øÐе÷¶È;ÿ¸öpartition¶¼ÓÐÒ»¸öserverΪ"leader";leader¸ºÔðËùÓеĶÁд²Ù×÷,Èç¹ûleaderʧЧ,ÄÇô½«»áÓÐÆäËûfollowerÀ´½Ó¹Ü(³ÉΪеÄleader);followerÖ»Êǵ¥µ÷µÄºÍleader¸ú½ø,ͬ²½ÏûÏ¢¼´¿É..Óɴ˿ɼû×÷ΪleaderµÄserver³ÐÔØÁËÈ«²¿µÄÇëÇóѹÁ¦,Òò´Ë´Ó¼¯ÈºµÄÕûÌ忼ÂÇ,ÓжàÉÙ¸öpartitions¾ÍÒâζ×ÅÓжàÉÙ¸ö"leader",kafka»á½«"leader"¾ùºâµÄ·ÖÉ¢ÔÚÿ¸öʵÀýÉÏ,À´È·±£ÕûÌåµÄÐÔÄÜÎȶ¨.

Producers

Producer½«ÏûÏ¢·¢²¼µ½Ö¸¶¨µÄTopicÖÐ,ͬʱProducerÒ²Äܾö¶¨½«´ËÏûÏ¢¹éÊôÓÚÄĸöpartition;±ÈÈç»ùÓÚ"round-robin"·½Ê½»òÕßͨ¹ýÆäËûµÄһЩËã·¨µÈ.

Consumers

±¾ÖÊÉÏkafkaÖ»Ö§³ÖTopic.ÿ¸öconsumerÊôÓÚÒ»¸öconsumer group;·´¹ýÀ´Ëµ,ÿ¸ögroupÖпÉÒÔÓжà¸öconsumer.·¢Ë͵½TopicµÄÏûÏ¢,Ö»»á±»¶©ÔÄ´ËTopicµÄÿ¸ögroupÖеÄÒ»¸öconsumerÏû·Ñ.

Èç¹ûËùÓеÄconsumer¶¼¾ßÓÐÏàͬµÄgroup,ÕâÖÖÇé¿öºÍqueueģʽºÜÏñ;ÏûÏ¢½«»áÔÚconsumersÖ®¼ä¸ºÔؾùºâ.

Èç¹ûËùÓеÄconsumer¶¼¾ßÓв»Í¬µÄgroup,ÄÇÕâ¾ÍÊÇ"·¢²¼-¶©ÔÄ";ÏûÏ¢½«»á¹ã²¥¸øËùÓеÄÏû·ÑÕß.

ÔÚkafkaÖÐ,Ò»¸öpartitionÖеÄÏûÏ¢Ö»»á±»groupÖеÄÒ»¸öconsumerÏû·Ñ;ÿ¸ögroupÖÐconsumerÏûÏ¢Ïû·Ñ»¥Ïà¶ÀÁ¢;ÎÒÃÇ¿ÉÒÔÈÏΪһ¸ögroupÊÇÒ»¸ö"¶©ÔÄ"Õß,Ò»¸öTopicÖеÄÿ¸öpartions,Ö»»á±»Ò»¸ö"¶©ÔÄÕß"ÖеÄÒ»¸öconsumerÏû·Ñ,²»¹ýÒ»¸öconsumer¿ÉÒÔÏû·Ñ¶à¸öpartitionsÖеÄÏûÏ¢.kafkaÖ»Äܱ£Ö¤Ò»¸öpartitionÖеÄÏûÏ¢±»Ä³¸öconsumerÏû·Ñʱ,ÏûÏ¢ÊÇ˳ÐòµÄ.ÊÂʵÉÏ,´ÓTopic½Ç¶ÈÀ´Ëµ,ÏûÏ¢ÈÔ²»ÊÇÓÐÐòµÄ.

kafkaµÄÉè¼ÆÔ­Àí¾ö¶¨,¶ÔÓÚÒ»¸ötopic,ͬһ¸ögroupÖв»ÄÜÓжàÓÚpartitions¸öÊýµÄconsumerͬʱÏû·Ñ,·ñÔò½«Òâζ×ÅijЩconsumer½«ÎÞ·¨µÃµ½ÏûÏ¢.

Guarantees

1) ·¢Ë͵½partitionsÖеÄÏûÏ¢½«»á°´ÕÕËü½ÓÊÕµÄ˳Ðò×·¼Óµ½ÈÕÖ¾ÖÐ

2) ¶ÔÓÚÏû·ÑÕß¶øÑÔ,ËüÃÇÏû·ÑÏûÏ¢µÄ˳ÐòºÍÈÕÖ¾ÖÐÏûϢ˳ÐòÒ»ÖÂ.

3) Èç¹ûTopicµÄ"replicationfactor"ΪN,ÄÇôÔÊÐíN-1¸ökafkaʵÀýʧЧ.

¶þ¡¢Ê¹Óó¡¾°

1¡¢Messaging

¶ÔÓÚһЩ³£¹æµÄÏûϢϵͳ,kafkaÊǸö²»´íµÄÑ¡Ôñ;partitons/replicationºÍÈÝ´í,¿ÉÒÔʹkafka¾ßÓÐÁ¼ºÃµÄÀ©Õ¹ÐÔºÍÐÔÄÜÓÅÊÆ.²»¹ýµ½Ä¿Ç°ÎªÖ¹,ÎÒÃÇÓ¦¸ÃºÜÇå³þÈÏʶµ½,kafka²¢Ã»ÓÐÌṩJMSÖеÄ"ÊÂÎñÐÔ""ÏûÏ¢´«Êäµ£±£(ÏûϢȷÈÏ»úÖÆ)""ÏûÏ¢·Ö×é"µÈÆóÒµ¼¶ÌØÐÔ;kafkaÖ»ÄÜʹÓÃ×÷Ϊ"³£¹æ"µÄÏûϢϵͳ,ÔÚÒ»¶¨³Ì¶ÈÉÏ,ÉÐδȷ±£ÏûÏ¢µÄ·¢ËÍÓë½ÓÊÕ¾ø¶Ô¿É¿¿(±ÈÈç,ÏûÏ¢ÖØ·¢,ÏûÏ¢·¢ËͶªÊ§µÈ)

2¡¢Websit activity tracking

kafka¿ÉÒÔ×÷Ϊ"ÍøÕ¾»îÐÔ¸ú×Ù"µÄ×î¼Ñ¹¤¾ß;¿ÉÒÔ½«ÍøÒ³/Óû§²Ù×÷µÈÐÅÏ¢·¢Ë͵½kafkaÖÐ.²¢ÊµÊ±¼à¿Ø,»òÕßÀëÏßͳ¼Æ·ÖÎöµÈ

3¡¢Log Aggregation

kafkaµÄÌØÐÔ¾ö¶¨Ëü·Ç³£ÊʺÏ×÷Ϊ"ÈÕÖ¾ÊÕ¼¯ÖÐÐÄ";application¿ÉÒÔ½«²Ù×÷ÈÕÖ¾"ÅúÁ¿""Òì²½"µÄ·¢Ë͵½kafka¼¯ÈºÖÐ,¶ø²»ÊDZ£´æÔÚ±¾µØ»òÕßDBÖÐ;kafka¿ÉÒÔÅúÁ¿Ìá½»ÏûÏ¢/ѹËõÏûÏ¢µÈ,Õâ¶Ôproducer¶Ë¶øÑÔ,¼¸ºõ¸Ð¾õ²»µ½ÐÔÄܵĿªÖ§.´Ëʱconsumer¶Ë¿ÉÒÔʹhadoopµÈÆäËûϵͳ»¯µÄ´æ´¢ºÍ·ÖÎöϵͳ.

Èý¡¢Éè¼ÆÔ­Àí

kafkaµÄÉè¼Æ³õÖÔÊÇÏ£Íû×÷Ϊһ¸öͳһµÄÐÅÏ¢ÊÕ¼¯Æ½Ì¨,Äܹ»ÊµÊ±µÄÊÕ¼¯·´À¡ÐÅÏ¢,²¢ÐèÒªÄܹ»Ö§³Å½Ï´óµÄÊý¾ÝÁ¿,ÇҾ߱¸Á¼ºÃµÄÈÝ´íÄÜÁ¦.

1¡¢³Ö¾ÃÐÔ

kafkaʹÓÃÎļþ´æ´¢ÏûÏ¢,Õâ¾ÍÖ±½Ó¾ö¶¨kafkaÔÚÐÔÄÜÉÏÑÏÖØÒÀÀµÎļþϵͳµÄ±¾ÉíÌØÐÔ.ÇÒÎÞÂÛÈκÎOSÏÂ,¶ÔÎļþϵͳ±¾ÉíµÄÓÅ»¯¼¸ºõûÓпÉÄÜ.Îļþ»º´æ/Ö±½ÓÄÚ´æÓ³ÉäµÈÊdz£ÓõÄÊÖ¶Î.ÒòΪkafkaÊǶÔÈÕÖ¾Îļþ½øÐÐappend²Ù×÷,Òò´Ë´ÅÅ̼ìË÷µÄ¿ªÖ§ÊǽÏСµÄ;ͬʱΪÁ˼õÉÙ´ÅÅÌдÈëµÄ´ÎÊý,broker»á½«ÏûÏ¢ÔÝʱbufferÆðÀ´,µ±ÏûÏ¢µÄ¸öÊý(»ò³ß´ç)´ïµ½Ò»¶¨·§ÖµÊ±,ÔÙflushµ½´ÅÅÌ,ÕâÑù¼õÉÙÁË´ÅÅÌIOµ÷ÓõĴÎÊý.

2¡¢ÐÔÄÜ

ÐèÒª¿¼ÂǵÄÓ°ÏìÐÔÄܵãºÜ¶à,³ý´ÅÅÌIOÖ®Íâ,ÎÒÃÇ»¹ÐèÒª¿¼ÂÇÍøÂçIO,ÕâÖ±½Ó¹ØÏµµ½kafkaµÄÍÌÍÂÁ¿ÎÊÌâ.kafka²¢Ã»ÓÐÌṩ̫¶à¸ß³¬µÄ¼¼ÇÉ;¶ÔÓÚproducer¶Ë,¿ÉÒÔ½«ÏûÏ¢bufferÆðÀ´,µ±ÏûÏ¢µÄÌõÊý´ïµ½Ò»¶¨·§ÖµÊ±,ÅúÁ¿·¢Ë͸øbroker;¶ÔÓÚconsumer¶ËÒ²ÊÇÒ»Ñù,ÅúÁ¿fetch¶àÌõÏûÏ¢.²»¹ýÏûÏ¢Á¿µÄ´óС¿ÉÒÔͨ¹ýÅäÖÃÎļþÀ´Ö¸¶¨.¶ÔÓÚkafka broker¶Ë,ËÆºõÓиösendfileϵͳµ÷ÓÿÉÒÔDZÔÚµÄÌáÉýÍøÂçIOµÄÐÔÄÜ:½«ÎļþµÄÊý¾ÝÓ³É䵽ϵͳÄÚ´æÖÐ,socketÖ±½Ó¶ÁÈ¡ÏàÓ¦µÄÄÚ´æÇøÓò¼´¿É,¶øÎÞÐè½ø³ÌÔÙ´ÎcopyºÍ½»»». Æäʵ¶ÔÓÚproducer/consumer/brokerÈýÕß¶øÑÔ,CPUµÄ¿ªÖ§Ó¦¸Ã¶¼²»´ó,Òò´ËÆôÓÃÏûϢѹËõ»úÖÆÊÇÒ»¸öÁ¼ºÃµÄ²ßÂÔ;ѹËõÐèÒªÏûºÄÉÙÁ¿µÄCPU×ÊÔ´,²»¹ý¶ÔÓÚkafka¶øÑÔ,ÍøÂçIO¸üÓ¦¸ÃÐèÒª¿¼ÂÇ.¿ÉÒÔ½«ÈκÎÔÚÍøÂçÉÏ´«ÊäµÄÏûÏ¢¶¼¾­¹ýѹËõ.kafkaÖ§³Ögzip/snappyµÈ¶àÖÖѹËõ·½Ê½.

3¡¢Éú²úÕß

¸ºÔؾùºâ: producer½«»áºÍTopicÏÂËùÓÐpartition leader±£³ÖsocketÁ¬½Ó;ÏûÏ¢ÓÉproducerÖ±½Óͨ¹ýsocket·¢Ë͵½broker,Öм䲻»á¾­¹ýÈκÎ"·Óɲã".ÊÂʵÉÏ,ÏûÏ¢±»Â·Óɵ½ÄĸöpartitionÉÏ,ÓÐproducer¿Í»§¶Ë¾ö¶¨.±ÈÈç¿ÉÒÔ²ÉÓÃ"random""key-hash""ÂÖѯ"µÈ,Èç¹ûÒ»¸ötopicÖÐÓжà¸öpartitions,ÄÇôÔÚproducer¶ËʵÏÖ"ÏûÏ¢¾ùºâ·Ö·¢"ÊDZØÒªµÄ.

ÆäÖÐpartition leaderµÄλÖÃ(host:port)×¢²áÔÚzookeeperÖÐ,producer×÷Ϊzookeeper client,ÒѾ­×¢²áÁËwatchÓÃÀ´¼àÌýpartition leaderµÄ±ä¸üʼþ.

Òì²½·¢ËÍ£º½«¶àÌõÏûÏ¢ÔÝÇÒÔÚ¿Í»§¶ËbufferÆðÀ´£¬²¢½«ËûÃÇÅúÁ¿µÄ·¢Ë͵½broker£¬Ð¡Êý¾ÝIOÌ«¶à£¬»áÍÏÂýÕûÌåµÄÍøÂçÑÓ³Ù£¬ÅúÁ¿ÑÓ³Ù·¢ËÍÊÂʵÉÏÌáÉýÁËÍøÂçЧÂÊ¡£²»¹ýÕâÒ²ÓÐÒ»¶¨µÄÒþ»¼£¬±ÈÈç˵µ±producerʧЧʱ£¬ÄÇЩÉÐδ·¢Ë͵ÄÏûÏ¢½«»á¶ªÊ§¡£

4¡¢Ïû·ÑÕß

consumer¶ËÏòbroker·¢ËÍ"fetch"ÇëÇó,²¢¸æÖªÆä»ñÈ¡ÏûÏ¢µÄoffset;´Ëºóconsumer½«»á»ñµÃÒ»¶¨ÌõÊýµÄÏûÏ¢;consumer¶ËÒ²¿ÉÒÔÖØÖÃoffsetÀ´ÖØÐÂÏû·ÑÏûÏ¢.

ÔÚJMSʵÏÖÖÐ,TopicÄ£ÐÍ»ùÓÚpush·½Ê½,¼´broker½«ÏûÏ¢ÍÆË͸øconsumer¶Ë.²»¹ýÔÚkafkaÖÐ,²ÉÓÃÁËpull·½Ê½,¼´consumerÔÚºÍbroker½¨Á¢Á¬½ÓÖ®ºó,Ö÷¶¯È¥pull(»òÕß˵fetch)ÏûÏ¢;ÕâÖÐģʽÓÐЩÓŵã,Ê×ÏÈconsumer¶Ë¿ÉÒÔ¸ù¾Ý×Ô¼ºµÄÏû·ÑÄÜÁ¦ÊÊʱµÄÈ¥fetchÏûÏ¢²¢´¦Àí,ÇÒ¿ÉÒÔ¿ØÖÆÏûÏ¢Ïû·ÑµÄ½ø¶È(offset);´ËÍâ,Ïû·ÑÕß¿ÉÒÔÁ¼ºÃµÄ¿ØÖÆÏûÏ¢Ïû·ÑµÄÊýÁ¿,batch fetch.

ÆäËûJMSʵÏÖ,ÏûÏ¢Ïû·ÑµÄλÖÃÊÇÓÐprodiver±£Áô,ÒÔ±ã±ÜÃâÖØ¸´·¢ËÍÏûÏ¢»òÕß½«Ã»ÓÐÏû·Ñ³É¹¦µÄÏûÏ¢ÖØ·¢µÈ,ͬʱ»¹Òª¿ØÖÆÏûÏ¢µÄ״̬.Õâ¾ÍÒªÇóJMS brokerÐèҪ̫¶à¶îÍâµÄ¹¤×÷.ÔÚkafkaÖÐ,partitionÖеÄÏûÏ¢Ö»ÓÐÒ»¸öconsumerÔÚÏû·Ñ,ÇÒ²»´æÔÚÏûϢ״̬µÄ¿ØÖÆ,ҲûÓи´ÔÓµÄÏûϢȷÈÏ»úÖÆ,¿É¼ûkafka broker¶ËÊÇÏ൱ÇáÁ¿¼¶µÄ.µ±ÏûÏ¢±»consumer½ÓÊÕÖ®ºó,consumer¿ÉÒÔÔÚ±¾µØ±£´æ×îºóÏûÏ¢µÄoffset,²¢¼äЪÐÔµÄÏòzookeeper×¢²áoffset.Óɴ˿ɼû,consumer¿Í»§¶ËÒ²ºÜÇáÁ¿¼¶.

5¡¢ÏûÏ¢´«ËÍ»úÖÆ

¶ÔÓÚJMSʵÏÖ,ÏûÏ¢´«Êäµ£±£·Ç³£Ö±½Ó:ÓÐÇÒÖ»ÓÐÒ»´Î(exactly once).ÔÚkafkaÖÐÉÔÓв»Í¬:

1) at most once: ×î¶àÒ»´Î,Õâ¸öºÍJMSÖÐ"·Ç³Ö¾Ã»¯"ÏûÏ¢ÀàËÆ.·¢ËÍÒ»´Î,ÎÞÂ۳ɰÜ,½«²»»áÖØ·¢.

2) at least once: ÏûÏ¢ÖÁÉÙ·¢ËÍÒ»´Î,Èç¹ûÏûϢδÄܽÓÊܳɹ¦,¿ÉÄÜ»áÖØ·¢,Ö±µ½½ÓÊճɹ¦.

3) exactly once: ÏûÏ¢Ö»»á·¢ËÍÒ»´Î.

at most once: Ïû·ÑÕßfetchÏûÏ¢,È»ºó±£´æoffset,È»ºó´¦ÀíÏûÏ¢;µ±client±£´æoffsetÖ®ºó,µ«ÊÇÔÚÏûÏ¢´¦Àí¹ý³ÌÖгöÏÖÁËÒì³£,µ¼Ö²¿·ÖÏûϢδÄܼÌÐø´¦Àí.ÄÇô´Ëºó"δ´¦Àí"µÄÏûÏ¢½«²»Äܱ»fetchµ½,Õâ¾ÍÊÇ"at most once".

at least once: Ïû·ÑÕßfetchÏûÏ¢,È»ºó´¦ÀíÏûÏ¢,È»ºó±£´æoffset.Èç¹ûÏûÏ¢´¦Àí³É¹¦Ö®ºó,µ«ÊÇÔÚ±£´æoffset½×¶ÎzookeeperÒì³£µ¼Ö±£´æ²Ù×÷δÄÜÖ´Ðгɹ¦,Õâ¾Íµ¼Ö½ÓÏÂÀ´ÔÙ´Îfetchʱ¿ÉÄÜ»ñµÃÉÏ´ÎÒѾ­´¦Àí¹ýµÄÏûÏ¢,Õâ¾ÍÊÇ"at least once"£¬Ô­ÒòoffsetûÓм°Ê±µÄÌá½»¸øzookeeper£¬zookeeper»Ö¸´Õý³£»¹ÊÇ֮ǰoffset״̬.

exactly once: kafkaÖв¢Ã»ÓÐÑϸñµÄȥʵÏÖ(»ùÓÚ2½×¶ÎÌá½»,ÊÂÎñ),ÎÒÃÇÈÏΪÕâÖÖ²ßÂÔÔÚkafkaÖÐÊÇûÓбØÒªµÄ.

ͨ³£Çé¿öÏÂ"at-least-once"ÊÇÎÒÃÇËÑÑ¡.(Ïà±Èat most once¶øÑÔ,ÖØ¸´½ÓÊÕÊý¾Ý×ܱȶªÊ§Êý¾ÝÒªºÃ).

6¡¢¸´ÖƱ¸·Ý

kafka½«Ã¿¸öpartitionÊý¾Ý¸´ÖƵ½¶à¸öserverÉÏ,ÈκÎÒ»¸öpartitionÓÐÒ»¸öleaderºÍ¶à¸öfollower(¿ÉÒÔûÓÐ);±¸·ÝµÄ¸öÊý¿ÉÒÔͨ¹ýbrokerÅäÖÃÎļþÀ´É趨.leader´¦ÀíËùÓеÄread-writeÇëÇó,followerÐèÒªºÍleader±£³Öͬ²½.FollowerºÍconsumerÒ»Ñù,Ïû·ÑÏûÏ¢²¢±£´æÔÚ±¾µØÈÕÖ¾ÖÐ;leader¸ºÔð¸ú×ÙËùÓеÄfollower״̬,Èç¹ûfollower"Âäºó"Ì«¶à»òÕßʧЧ,leader½«»á°ÑËü´Óreplicasͬ²½ÁбíÖÐɾ³ý.µ±ËùÓеÄfollower¶¼½«Ò»ÌõÏûÏ¢±£´æ³É¹¦,´ËÏûÏ¢²Å±»ÈÏΪÊÇ"committed",ÄÇô´Ëʱconsumer²ÅÄÜÏû·ÑËü.¼´Ê¹Ö»ÓÐÒ»¸öreplicasʵÀý´æ»î,ÈÔÈ»¿ÉÒÔ±£Ö¤ÏûÏ¢µÄÕý³£·¢ËͺͽÓÊÕ,Ö»Òªzookeeper¼¯Èº´æ»î¼´¿É.(²»Í¬ÓÚÆäËû·Ö²¼Ê½´æ´¢,±ÈÈçhbaseÐèÒª"¶àÊýÅÉ"´æ»î²ÅÐÐ)

µ±leaderʧЧʱ,ÐèÔÚfollowersÖÐѡȡ³öеÄleader,¿ÉÄÜ´ËʱfollowerÂäºóÓÚleader,Òò´ËÐèҪѡÔñÒ»¸ö"up-to-date"µÄfollower.Ñ¡ÔñfollowerʱÐèÒª¼æ¹ËÒ»¸öÎÊÌâ,¾ÍÊÇÐÂleaderserverÉÏËùÒѾ­³ÐÔØµÄpartition leaderµÄ¸öÊý,Èç¹ûÒ»¸öserverÉÏÓйý¶àµÄpartition leader,Òâζ×Å´Ëserver½«³ÐÊÜןü¶àµÄIOѹÁ¦.ÔÚÑ¡¾ÙÐÂleader,ÐèÒª¿¼Âǵ½"¸ºÔؾùºâ".

7.ÈÕÖ¾

Èç¹ûÒ»¸ötopicµÄÃû³ÆÎª"my_topic",ËüÓÐ2¸öpartitions,ÄÇôÈÕÖ¾½«»á±£´æÔÚmy_topic_0ºÍmy_topic_1Á½¸öĿ¼ÖÐ;ÈÕÖ¾ÎļþÖб£´æÁËÒ»ÐòÁÐ"log entries"(ÈÕÖ¾ÌõÄ¿),ÿ¸ölog entry¸ñʽΪ"4¸ö×Ö½ÚµÄÊý×ÖN±íʾÏûÏ¢µÄ³¤¶È" + "N¸ö×Ö½ÚµÄÏûÏ¢ÄÚÈÝ";ÿ¸öÈÕÖ¾¶¼ÓÐÒ»¸öoffsetÀ´Î¨Ò»µÄ±ê¼ÇÒ»ÌõÏûÏ¢,offsetµÄֵΪ8¸ö×Ö½ÚµÄÊý×Ö,±íʾ´ËÏûÏ¢ÔÚ´ËpartitionÖÐËù´¦µÄÆðʼλÖÃ..ÿ¸öpartitionÔÚÎïÀí´æ´¢²ãÃæ,Óжà¸ölog file×é³É(³ÆÎªsegment).segmentfileµÄÃüÃûΪ"×îСoffset".kafka.ÀýÈç"00000000000.kafka";ÆäÖÐ"×îСoffset"±íʾ´ËsegmentÖÐÆðʼÏûÏ¢µÄoffset.

ÆäÖÐÿ¸öpartitonÖÐËù³ÖÓеÄsegmentsÁбíÐÅÏ¢»á´æ´¢ÔÚzookeeperÖÐ.

µ±segmentÎļþ³ß´ç´ïµ½Ò»¶¨·§ÖµÊ±(¿ÉÒÔͨ¹ýÅäÖÃÎļþÉ趨,ĬÈÏ1G),½«»á´´½¨Ò»¸öеÄÎļþ;µ±bufferÖÐÏûÏ¢µÄÌõÊý´ïµ½·§ÖµÊ±½«»á´¥·¢ÈÕÖ¾ÐÅÏ¢flushµ½ÈÕÖ¾ÎļþÖÐ,ͬʱÈç¹û"¾àÀë×î½üÒ»´ÎflushµÄʱ¼ä²î"´ïµ½·§ÖµÊ±,Ò²»á´¥·¢flushµ½ÈÕÖ¾Îļþ.Èç¹ûbrokerʧЧ,¼«ÓпÉÄܻᶪʧÄÇЩÉÐδflushµ½ÎļþµÄÏûÏ¢.ÒòΪserverÒâÍâʵÏÖ,ÈÔÈ»»áµ¼ÖÂlogÎļþ¸ñʽµÄÆÆ»µ(Îļþβ²¿),ÄÇô¾ÍÒªÇóµ±serverÆô¶«ÊÇÐèÒª¼ì²â×îºóÒ»¸ösegmentµÄÎļþ½á¹¹ÊÇ·ñºÏ·¨²¢½øÐбØÒªµÄÐÞ¸´.

»ñÈ¡ÏûϢʱ,ÐèÒªÖ¸¶¨offsetºÍ×î´óchunk³ß´ç,offsetÓÃÀ´±íʾÏûÏ¢µÄÆðʼλÖÃ,chunk sizeÓÃÀ´±íʾ×î´ó»ñÈ¡ÏûÏ¢µÄ×ܳ¤¶È(¼ä½ÓµÄ±íʾÏûÏ¢µÄÌõÊý).¸ù¾Ýoffset,¿ÉÒÔÕÒµ½´ËÏûÏ¢ËùÔÚsegmentÎļþ,È»ºó¸ù¾ÝsegmentµÄ×îСoffsetÈ¡²îÖµ,µÃµ½ËüÔÚfileÖеÄÏà¶ÔλÖÃ,Ö±½Ó¶ÁÈ¡Êä³ö¼´¿É.

ÈÕÖ¾ÎļþµÄɾ³ý²ßÂԷdz£¼òµ¥:Æô¶¯Ò»¸öºǫ́Ï̶߳¨ÆÚɨÃèlog fileÁбí,°Ñ±£´æÊ±¼ä³¬¹ý·§ÖµµÄÎļþÖ±½Óɾ³ý(¸ù¾ÝÎļþµÄ´´½¨Ê±¼ä).ΪÁ˱ÜÃâɾ³ýÎļþʱÈÔÈ»ÓÐread²Ù×÷(consumerÏû·Ñ),²ÉÈ¡copy-on-write·½Ê½.

8¡¢·ÖÅä

kafkaʹÓÃzookeeperÀ´´æ´¢Ò»Ð©metaÐÅÏ¢,²¢Ê¹ÓÃÁËzookeeper watch»úÖÆÀ´·¢ÏÖmetaÐÅÏ¢µÄ±ä¸ü²¢×÷³öÏàÓ¦µÄ¶¯×÷(±ÈÈçconsumerʧЧ,´¥·¢¸ºÔؾùºâµÈ)

1) Broker node registry: µ±Ò»¸ökafkabrokerÆô¶¯ºó,Ê×ÏÈ»áÏòzookeeper×¢²á×Ô¼ºµÄ½ÚµãÐÅÏ¢(ÁÙʱznode),ͬʱµ±brokerºÍzookeeper¶Ï¿ªÁ¬½Óʱ,´ËznodeÒ²»á±»É¾³ý.

¸ñʽ: /broker/ids/[0...N] -->host:port;ÆäÖÐ[0..N]±íʾbroker id,ÿ¸öbrokerµÄÅäÖÃÎļþÖж¼ÐèÒªÖ¸¶¨Ò»¸öÊý×ÖÀàÐ͵Äid(È«¾Ö²»¿ÉÖØ¸´),znodeµÄֵΪ´ËbrokerµÄhost:portÐÅÏ¢.

2) Broker Topic Registry: µ±Ò»¸öbrokerÆô¶¯Ê±,»áÏòzookeeper×¢²á×Ô¼º³ÖÓеÄtopicºÍpartitionsÐÅÏ¢,ÈÔÈ»ÊÇÒ»¸öÁÙʱznode.

¸ñʽ: /broker/topics/[topic]/[0...N] ÆäÖÐ[0..N]±íʾpartitionË÷ÒýºÅ.

3) Consumer and Consumer group: ÿ¸öconsumer¿Í»§¶Ë±»´´½¨Ê±,»áÏòzookeeper×¢²á×Ô¼ºµÄÐÅÏ¢;´Ë×÷ÓÃÖ÷ÒªÊÇΪÁË"¸ºÔؾùºâ".

Ò»¸ögroupÖеĶà¸öconsumer¿ÉÒÔ½»´íµÄÏû·ÑÒ»¸ötopicµÄËùÓÐpartitions;¼ò¶øÑÔÖ®,±£Ö¤´ËtopicµÄËùÓÐpartitions¶¼Äܱ»´ËgroupËùÏû·Ñ,ÇÒÏû·ÑʱΪÁËÐÔÄÜ¿¼ÂÇ,ÈÃpartitionÏà¶Ô¾ùºâµÄ·ÖÉ¢µ½Ã¿¸öconsumerÉÏ.

4) Consumer id Registry: ÿ¸öconsumer¶¼ÓÐÒ»¸öΨһµÄID(host:uuid,¿ÉÒÔͨ¹ýÅäÖÃÎļþÖ¸¶¨,Ò²¿ÉÒÔÓÉϵͳÉú³É),´ËidÓÃÀ´±ê¼ÇÏû·ÑÕßÐÅÏ¢.

¸ñʽ:/consumers/[group_id]/ids/[consumer_id]

ÈÔÈ»ÊÇÒ»¸öÁÙʱµÄznode,´Ë½ÚµãµÄֵΪ{"topic_name":#streams...},¼´±íʾ´ËconsumerĿǰËùÏû·ÑµÄtopic + partitionsÁбí.

5) Consumer offset Tracking: ÓÃÀ´¸ú×Ùÿ¸öconsumerĿǰËùÏû·ÑµÄpartitionÖÐ×î´óµÄoffset.

¸ñʽ:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value

´ËznodeΪ³Ö¾Ã½Úµã,¿ÉÒÔ¿´³öoffset¸úgroup_idÓйØ,ÒÔ±íÃ÷µ±groupÖÐÒ»¸öÏû·ÑÕßʧЧ,ÆäËûconsumer¿ÉÒÔ¼ÌÐøÏû·Ñ.

6) Partition Owner registry: ÓÃÀ´±ê¼Çpartition±»ÄĸöconsumerÏû·Ñ.ÁÙʱznode

¸ñʽ:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_idµ±consumerÆô¶¯Ê±,Ëù´¥·¢µÄ²Ù×÷:

A) Ê×ÏȽøÐÐ"Consumer id Registry";

B) È»ºóÔÚ"Consumer id Registry"½ÚµãÏÂ×¢²áÒ»¸öwatchÓÃÀ´¼àÌýµ±Ç°groupÖÐÆäËûconsumerµÄ"leave"ºÍ"join";Ö»Òª´Ëznode pathϽڵãÁбí±ä¸ü,¶¼»á´¥·¢´ËgroupÏÂconsumerµÄ¸ºÔؾùºâ.(±ÈÈçÒ»¸öconsumerʧЧ,ÄÇôÆäËûconsumer½Ó¹Üpartitions).

C) ÔÚ"Broker id registry"½ÚµãÏÂ,×¢²áÒ»¸öwatchÓÃÀ´¼àÌýbrokerµÄ´æ»îÇé¿ö;Èç¹ûbrokerÁбí±ä¸ü,½«»á´¥·¢ËùÓеÄgroupsϵÄconsumerÖØÐÂbalance.

1) Producer¶ËʹÓÃzookeeperÓÃÀ´"·¢ÏÖ"brokerÁбí,ÒÔ¼°ºÍTopicÏÂÿ¸öpartition leader½¨Á¢socketÁ¬½Ó²¢·¢ËÍÏûÏ¢.

2) Broker¶ËʹÓÃzookeeperÓÃÀ´×¢²ábrokerÐÅÏ¢,ÒѾ­¼à²âpartitionleader´æ»îÐÔ.

3) Consumer¶ËʹÓÃzookeeperÓÃÀ´×¢²áconsumerÐÅÏ¢,ÆäÖаüÀ¨consumerÏû·ÑµÄpartitionÁбíµÈ,ͬʱҲÓÃÀ´·¢ÏÖbrokerÁбí,²¢ºÍpartition leader½¨Á¢socketÁ¬½Ó,²¢»ñÈ¡ÏûÏ¢.

ËÄ¡¢Ö÷ÒªÅäÖÃ

1¡¢BrokerÅäÖÃ

 

2.ConsumerÖ÷ÒªÅäÖÃ

3.ProducerÖ÷ÒªÅäÖÃ

ÒÔÉÏÊǹØÓÚkafkaһЩ»ù´¡ËµÃ÷£¬ÔÚÆäÖÐÎÒÃÇÖªµÀÈç¹ûÒªkafkaÕý³£ÔËÐУ¬±ØÐëÅäÖÃzookeeper£¬·ñÔòÎÞÂÛÊÇkafka¼¯Èº»¹Êǿͻ§¶ËµÄÉú´æÕߺÍÏû·ÑÕß¶¼ÎÞ·¨Õý³£µÄ¹¤×÷µÄ£¬ÒÔÏÂÊǶÔzookeeper½øÐÐһЩ¼òµ¥µÄ½éÉÜ£º

Îå¡¢zookeeper¼¯Èº

zookeeperÊÇÒ»¸öΪ·Ö²¼Ê½Ó¦ÓÃÌṩһÖÂÐÔ·þÎñµÄÈí¼þ£¬ËüÊÇ¿ªÔ´µÄHadoopÏîÄ¿µÄÒ»¸ö×ÓÏîÄ¿£¬²¢¸ù¾Ýgoogle·¢±íµÄһƪÂÛÎÄÀ´ÊµÏֵġ£zookeeperΪ·Ö²¼Ê½ÏµÍ³ÌṩÁ˸ßЦÇÒÒ×ÓÚʹÓõÄЭͬ·þÎñ£¬Ëü¿ÉÒÔΪ·Ö²¼Ê½Ó¦ÓÃÌṩÏ൱¶àµÄ·þÎñ£¬ÖîÈçͳһÃüÃû·þÎñ£¬ÅäÖùÜÀí£¬×´Ì¬Í¬²½ºÍ×é·þÎñµÈ¡£zookeeper½Ó¿Ú¼òµ¥£¬ÎÒÃDz»±Ø¹ý¶àµØ¾À½áÔÚ·Ö²¼Ê½ÏµÍ³±à³ÌÄÑÓÚ´¦ÀíµÄͬ²½ºÍÒ»ÖÂÐÔÎÊÌâÉÏ£¬Äã¿ÉÒÔʹÓÃzookeeperÌṩµÄÏÖ³É(off-the-shelf)·þÎñÀ´ÊµÏÖÀ´ÊµÏÖ·Ö²¼Ê½ÏµÍ³¶îÅäÖùÜÀí£¬×é¹ÜÀí£¬LeaderÑ¡¾ÙµÈ¹¦ÄÜ¡£

zookeeper¼¯ÈºµÄ°²×°,×¼±¸Èý̨·þÎñÆ÷server1:192.168.0.1,server2:192.168.0.2,

server3:192.168.0.3.

1)ÏÂÔØzookeeper

µ½http://zookeeper.apache.org/releases.htmlÈ¥ÏÂÔØ×îа汾Zookeeper-3.4.5µÄ°²×°°üzookeeper-3.4.5.tar.gz.½«Îļþ±£´æserver1µÄ~Ŀ¼ÏÂ

2)°²×°zookeeper

ÏÈÔÚ·þÎñÆ÷server·Ö±ðÖ´ÐÐa-c²½Öè

a)½âѹ

tar -zxvf zookeeper-3.4.5.tar.gz

½âѹÍê³ÉºóÔÚĿ¼~Ï»ᷢÏÖ¶à³öÒ»¸öĿ¼zookeeper-3.4.5,ÖØÐÂÃüÁîΪzookeeper

b£©ÅäÖÃ

½«conf/zoo_sample.cfg¿½±´Ò»·ÝÃüÃûΪzoo.cfg£¬Ò²·ÅÔÚconfĿ¼Ï¡£È»ºó°´ÕÕÈçÏÂÖµÐÞ¸ÄÆäÖеÄÅäÖãº

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/wwb/zookeeper /data
dataLogDir=/home/wwb/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#http://zookeeper.apache.org/doc/ ...

html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=192.168.0.1:3888:4888
server.2=192.168.0.2:3888:4888
server.3=192.168.0.3:3888:4888

tickTime£ºÕâ¸öʱ¼äÊÇ×÷Ϊ Zookeeper ·þÎñÆ÷Ö®¼ä»ò¿Í»§¶ËÓë·þÎñÆ÷Ö®¼äά³ÖÐÄÌøµÄʱ¼ä¼ä¸ô£¬Ò²¾ÍÊÇÿ¸ö tickTime ʱ¼ä¾Í»á·¢ËÍÒ»¸öÐÄÌø¡£

dataDir£º¹ËÃû˼Òå¾ÍÊÇ Zookeeper ±£´æÊý¾ÝµÄĿ¼£¬Ä¬ÈÏÇé¿öÏ£¬Zookeeper ½«Ð´Êý¾ÝµÄÈÕÖ¾ÎļþÒ²±£´æÔÚÕâ¸öĿ¼Àï¡£

clientPort£ºÕâ¸ö¶Ë¿Ú¾ÍÊǿͻ§¶ËÁ¬½Ó Zookeeper ·þÎñÆ÷µÄ¶Ë¿Ú£¬Zookeeper »á¼àÌýÕâ¸ö¶Ë¿Ú£¬½ÓÊܿͻ§¶ËµÄ·ÃÎÊÇëÇó¡£

initLimit£ºÕâ¸öÅäÖÃÏîÊÇÓÃÀ´ÅäÖà Zookeeper ½ÓÊܿͻ§¶Ë£¨ÕâÀïËù˵µÄ¿Í»§¶Ë²»ÊÇÓû§Á¬½Ó Zookeeper ·þÎñÆ÷µÄ¿Í»§¶Ë£¬¶øÊÇ Zookeeper ·þÎñÆ÷¼¯ÈºÖÐÁ¬½Óµ½ Leader µÄ Follower ·þÎñÆ÷£©³õʼ»¯Á¬½Óʱ×ÄÜÈÌÊܶàÉÙ¸öÐÄÌøÊ±¼ä¼ä¸ôÊý¡£µ±ÒѾ­³¬¹ý 5¸öÐÄÌøµÄʱ¼ä£¨Ò²¾ÍÊÇ tickTime£©³¤¶Èºó Zookeeper ·þÎñÆ÷»¹Ã»ÓÐÊÕµ½¿Í»§¶ËµÄ·µ»ØÐÅÏ¢£¬ÄÇô±íÃ÷Õâ¸ö¿Í»§¶ËÁ¬½Óʧ°Ü¡£×ܵÄʱ¼ä³¤¶È¾ÍÊÇ 5*2000=10 Ãë

syncLimit£ºÕâ¸öÅäÖÃÏî±êʶ Leader ÓëFollower Ö®¼ä·¢ËÍÏûÏ¢£¬ÇëÇóºÍÓ¦´ðʱ¼ä³¤¶È£¬×²»Äܳ¬¹ý¶àÉÙ¸ö tickTime µÄʱ¼ä³¤¶È£¬×ܵÄʱ¼ä³¤¶È¾ÍÊÇ2*2000=4 Ãë

server.A=B£ºC£ºD£ºÆäÖÐ A ÊÇÒ»¸öÊý×Ö£¬±íʾÕâ¸öÊǵڼ¸ºÅ·þÎñÆ÷£»B ÊÇÕâ¸ö·þÎñÆ÷µÄ ip µØÖ·£»C ±íʾµÄÊÇÕâ¸ö·þÎñÆ÷Ó뼯ȺÖÐµÄ Leader ·þÎñÆ÷½»»»ÐÅÏ¢µÄ¶Ë¿Ú£»D ±íʾµÄÊÇÍòÒ»¼¯ÈºÖÐµÄ Leader ·þÎñÆ÷¹ÒÁË£¬ÐèÒªÒ»¸ö¶Ë¿ÚÀ´ÖØÐ½øÐÐÑ¡¾Ù£¬Ñ¡³öÒ»¸öÐ嵀 Leader£¬¶øÕâ¸ö¶Ë¿Ú¾ÍÊÇÓÃÀ´Ö´ÐÐÑ¡¾Ùʱ·þÎñÆ÷Ï໥ͨÐŵĶ˿ڡ£Èç¹ûÊÇα¼¯ÈºµÄÅäÖ÷½Ê½£¬ÓÉÓÚ B ¶¼ÊÇÒ»Ñù£¬ËùÒÔ²»Í¬µÄ Zookeeper ʵÀýͨÐŶ˿ںŲ»ÄÜÒ»Ñù£¬ËùÒÔÒª¸øËüÃÇ·ÖÅ䲻ͬµÄ¶Ë¿ÚºÅ

×¢Òâ:dataDir,dataLogDirÖеÄwwbÊǵ±Ç°µÇ¼Óû§Ãû£¬data£¬logsĿ¼¿ªÊ¼ÊDz»´æÔÚ£¬ÐèҪʹÓÃmkdirÃüÁî´´½¨ÏàÓ¦µÄĿ¼¡£²¢ÇÒÔÚ¸ÃĿ¼Ï´´½¨Îļþmyid,serve1,server2,server3¸ÃÎļþÄÚÈÝ·Ö±ðΪ1,2,3¡£

Õë¶Ô·þÎñÆ÷server2,server3¿ÉÒÔ½«server1¸´ÖƵ½ÏàÓ¦µÄĿ¼£¬²»¹ýÐèҪעÒâdataDir,dataLogDirĿ¼,²¢ÇÒÎļþmyidÄÚÈÝ·Ö±ðΪ2,3

3)ÒÀ´ÎÆô¶¯server1£¬server2,server3µÄzookeeper.

/home/wwb/zookeeper/bin/zkServer.sh start,³öÏÖÀàËÆÒÔÏÂÄÚÈÝ

/home/wwb/zookeeper/bin/zkServer.sh start,³öÏÖÀàËÆÒÔÏÂÄÚÈÝ
JMX enabled by default
Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

4) ²âÊÔzookeeperÊÇ·ñÕý³£¹¤×÷£¬ÔÚserver1ÉÏÖ´ÐÐÒÔÏÂÃüÁî

/home/wwb/zookeeper/bin/zkCli.sh -server192.

168.0.2:2181,³öÏÖÀàËÆÒÔÏÂÄÚÈÝ
JLine support is enabled
2013-11-27 19:59:40,560 - INFO [main-SendThread(localhost.localdomain:2181):

ClientCnxn$SendThread@736]- Session

establishmentcomplete on server localhost.localdomain/127.0.0.1:2181, sessionid = 0x1429cdb49220000, negotiatedtimeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0]

[root@localhostzookeeper2]#

¼´´ú±í¼¯Èº¹¹½¨³É¹¦ÁË,Èç¹û³öÏÖ´íÎóÄÇÓ¦¸ÃÊǵÚÈý²¿Ê±Ã»ÓÐÆô¶¯ºÃ¼¯Èº£¬

ÔËÐУ¬ÏÈÀûÓÃ

ps aux | grep zookeeper²é¿´ÊÇ·ñÓÐÏàÓ¦µÄ½ø³ÌµÄ£¬Ã»Óл°£¬ËµÃ÷¼¯ÈºÆô¶¯³öÏÖÎÊÌ⣬¿ÉÒÔÔÚÿ¸ö·þÎñÆ÷ÉÏʹÓÃ

./home/wwb/zookeeper/bin/zkServer.sh stop¡£ÔÙÒÀ´ÎʹÓÃ./home/wwb/zookeeper/binzkServer.sh start£¬ÕâʱÔÚÖ´ÐÐ4Ò»°ãÊÇûÓÐÎÊÌ⣬Èç¹û»¹ÊÇÓÐÎÊÌ⣬ÄÇôÏÈstopÔÙµ½binµÄÉϼ¶Ä¿Â¼Ö´ÐÐ./bin/zkServer.shstartÊÔÊÔ¡£

×¢Ò⣺zookeeper¼¯ÈºÊ±£¬zookeeperÒªÇó°ëÊýÒÔÉϵĻúÆ÷¿ÉÓã¬zookeeper²ÅÄÜÌṩ·þÎñ¡£

Áù¡¢kafka¼¯Èº

(ÀûÓÃÉÏÃæserver1,server2,server3,ÏÂÃæÒÔserver1ΪʵÀý)

1)ÏÂÔØkafka0.8(http://kafka.apache.org/downloads.html),±£´æµ½·þÎñÆ÷/home/wwbĿ¼ÏÂkafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)

2)½âѹ tar -zxvf kafka-0.8.0-beta1-src.tgz,²úÉúÎļþ¼Ðkafka-0.8.0-beta1-src¸ü¸ÄΪkafka01

3)ÅäÖÃ

ÐÞ¸Äkafka01/config/server.properties,ÆäÖÐbroker.id,log.dirs,zookeeper.connect±ØÐë¸ù¾Ýʵ¼ÊÇé¿ö½øÐÐÐ޸쬯äËûÏî¸ù¾ÝÐèÒª×ÔÐÐÕå×ᣴóÖÂÈçÏ£º

broker.id=1
port=9091
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
num.replica.fetchers=2
log.cleanup.interval.mins=10
zookeeper.connect=192.168.0.1:2181,192.

168.0.2:2182,192.168.0.3:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.

KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false

4£©³õʼ»¯ÒòΪkafkaÓÃscalaÓïÑÔ±àд£¬Òò´ËÔËÐÐk

> cd kafka01
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency

afkaÐèÒªÊ×ÏÈ×¼±¸scalaÏà¹Ø»·¾³¡£

ÔÚµÚ¶þ¸öÃüÁîʱ¿ÉÄÜÐèÒªÒ»¶¨Ê±¼ä£¬ÓÉÓÚÒªÏÂÔØ¸üÐÂһЩÒÀÀµ°ü¡£ËùÒÔÇë´ó¼Ò ÄÍÐĵ㡣

5) Æô¶¯kafka01

>JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &

a)kafka02²Ù×÷²½ÖèÓëkafka01À×ͬ£¬²»Í¬µÄµØ·½ÈçÏÂ

ÐÞ¸Äkafka02/config/server.properties
broker.id=2
port=9092
##ÆäËûÅäÖúÍkafka-0±£³ÖÒ»ÖÂ
Æô¶¯kafka02
JMX_PORT=9998 bin/kafka-server-start.

shconfig/server.properties &

b)kafka03²Ù×÷²½ÖèÓëkafka01À×ͬ£¬²»Í¬µÄµØ·½ÈçÏÂ

ÐÞ¸Äkafka03/config/server.properties
broker.id=3
port=9093
##ÆäËûÅäÖúÍkafka-0±£³ÖÒ»ÖÂ
Æô¶¯kafka02
JMX_PORT=9999 bin/kafka-server-start.

shconfig/server.properties &

6)´´½¨Topic(°üº¬Ò»¸ö·ÖÇø£¬Èý¸ö¸±±¾)

>bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic

7)²é¿´topicÇé¿ö

>bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181
topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0

8)´´½¨·¢ËÍÕß

>bin/kafka-console-producer.sh--broker-list

192.168.0.1:9091 --topic my-replicated-topic
my test message1
my test message2
^C

9)´´½¨Ïû·ÑÕß

>bin/kafka-console-consumer.sh --zookeeper

127.0.0.1:2181 --from-beginning --topic my-replicated-topic
...
my test message1
my test message2
^C

10)ɱµôserver1ÉϵÄbroker

>pkill -9 -f config/server.properties

11)²é¿´topic

>bin/kafka-list-top.sh --zookeeper192.168.0.1:2181
topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0

11£©´´½¨Ïû·ÑÕߣ¬¿´ÊÇ·ñÄܲéѯµ½ÏûÏ¢

>bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

˵Ã÷Ò»Çж¼ÊÇÕý³£µÄ¡£

OK,ÒÔÉϾÍÊǶÔKafka¸öÈ˵ÄÀí½â£¬²»¶ÔÖ®´¦Çë´ó¼Ò¼°Ê±Ö¸³ö¡£

²¹³ä˵Ã÷£º

1¡¢public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams (Map <String, Integer> topicCountMap)£¬ÆäÖи÷½·¨µÄ²ÎÊýMapµÄkeyΪtopicÃû³Æ£¬valueΪtopic¶ÔÓ¦µÄ·ÖÇøÊý£¬Æ©Èç˵Èç¹ûÔÚkafkaÖв»´æÔÚÏàÓ¦µÄtopicʱ£¬Ôò»á´´½¨Ò»¸ötopic£¬·ÖÇøÊýΪvalue£¬Èç¹û´æÔڵϰ£¬¸Ã´¦µÄvalueÔò²»Æðʲô×÷ÓÃ

2¡¢¹ØÓÚÉú²úÕßÏòÖ¸¶¨µÄ·ÖÇø·¢ËÍÊý¾Ý£¬Í¨¹ýÉèÖÃpartitioner.classµÄÊôÐÔÀ´Ö¸¶¨ÏòÄǸö·ÖÇø·¢ËÍÊý¾Ý£¬Èç¹û×Ô¼ºÖ¸¶¨±ØÐë±àдÏàÓ¦µÄ³ÌÐò£¬Ä¬ÈÏÊÇkafka.producer.DefaultPartitioner,·ÖÇø³ÌÐòÊÇ»ùÓÚÉ¢Áеļü¡£

3¡¢ÔÚ¶à¸öÏû·ÑÕß¶Áȡͬһ¸ötopicµÄÊý¾Ý£¬ÎªÁ˱£Ö¤Ã¿¸öÏû·ÑÕß¶ÁÈ¡Êý¾ÝµÄΨһÐÔ£¬±ØÐ뽫ÕâЩÏû·ÑÕßgroup_id¶¨ÒåΪͬһ¸öÖµ£¬ÕâÑù¾Í¹¹½¨ÁËÒ»¸öÀàËÆ¶ÓÁеÄÊý¾Ý½á¹¹£¬Èç¹û¶¨Ò岻ͬ£¬ÔòÀàËÆÒ»Öֹ㲥½á¹¹µÄ¡£

4¡¢ÔÚconsumerapiÖУ¬²ÎÊýÉè¼Æµ½Êý×Ö²¿·Ö£¬ÀàËÆMap<String,Integer>,

numStream,Ö¸µÄ¶¼ÊÇÔÚtopic²»´æÔÚµÄʱ£¬»á´´½¨Ò»¸ötopic£¬²¢ÇÒ·ÖÇø¸öÊýΪInteger,numStream,×¢ÒâÈç¹ûÊý×Ö´óÓÚbrokerµÄÅäÖÃÖÐnum.partitionsÊôÐÔ£¬»áÒÔnum.partitionsΪÒÀ¾Ý´´½¨·ÖÇø¸öÊýµÄ¡£

5¡¢producerapi£¬µ÷ÓÃsendʱ£¬Èç¹û²»´æÔÚtopic£¬Ò²»á´´½¨topic£¬Ôڸ÷½·¨ÖÐûÓÐÌṩ·ÖÇø¸öÊýµÄ²ÎÊý£¬ÔÚÕâÀï·ÖÇø¸öÊýÊÇÓÉ·þÎñ¶ËbrokerµÄÅäÖÃÖÐnum.partitionsÊôÐÔ¾ö¶¨µÄ

   
3284 ´Îä¯ÀÀ       31
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ