±à¼ÍƼö: |
±¾ÎĽ²ÊöʲôÊÇÏûϢϵͳ£¬Ê²Ã´ÊÇKafka£¿Kafka¼Ü¹¹£¬´´½¨topic£¬Æô¶¯Ò»¸öÏû·ÑÕߣ¬Ï£Íû¶ÔÄúÓÐËù°ïÖú
±¾ÎÄÀ´×ÔÓÚÌÚÑ¶ÔÆ£¬ÓÉ»ðÁú¹ûÈí¼þDelores±à¼¡¢ÍƼö¡£ |
|
1¡¢Ê²Ã´ÊÇÏûϢϵͳ£¿
ÏûϢϵͳ¸ºÔð½«Êý¾Ý´ÓÒ»¸öÓ¦ÓóÌÐò´«Êäµ½ÁíÒ»¸öÓ¦ÓóÌÐò£¬Òò´ËÓ¦ÓóÌÐò¿ÉÒÔרעÓÚÊý¾Ý£¬µ«²»µ£ÐÄÈçºÎ¹²ÏíËü¡£ ·Ö²¼Ê½ÏûÏ¢´«µÝ»ùÓÚ¿É¿¿ÏûÏ¢¶ÓÁеĸÅÄî¡£ ÏûÏ¢ÔÚ¿Í»§¶ËÓ¦ÓóÌÐòºÍÏûÏ¢´«µÝϵͳ֮¼äÒì²½ÅŶӡ£ ÓÐÁ½ÖÖÀàÐ͵ÄÏûϢģʽ¿ÉÓà - Ò»ÖÖÊǵã¶Ôµã£¬ÁíÒ»ÖÖÊÇ·¢²¼ - ¶©ÔÄ(pub-sub)ÏûϢϵͳ¡£ ´ó¶àÊýÏûϢģʽ×ñÑ pub-sub ¡£
£¨1£©µã¶ÔµãÏûϢϵͳ
ÔÚµã¶ÔµãϵͳÖУ¬ÏûÏ¢±»±£ÁôÔÚ¶ÓÁÐÖС£ Ò»¸ö»ò¶à¸öÏû·ÑÕß¿ÉÒÔÏûºÄ¶ÓÁÐÖеÄÏûÏ¢£¬µ«ÊÇÌØ¶¨ÏûÏ¢Ö»ÄÜÓÉ×î¶àÒ»¸öÏû·ÑÕßÏû·Ñ¡£ Ò»µ©Ïû·ÑÕß¶ÁÈ¡¶ÓÁÐÖеÄÏûÏ¢£¬Ëü¾Í´Ó¸Ã¶ÓÁÐÖÐÏûʧ¡£ ¸ÃϵͳµÄµäÐÍʾÀýÊǶ©µ¥´¦Àíϵͳ£¬ÆäÖÐÿ¸ö¶©µ¥½«ÓÉÒ»¸ö¶©µ¥´¦ÀíÆ÷´¦Àí£¬µ«¶à¸ö¶©µ¥´¦ÀíÆ÷Ò²¿ÉÒÔͬʱ¹¤×÷¡£ ÏÂͼÃèÊöÁ˽ṹ¡£

£¨2£©·¢²¼ - ¶©ÔÄÏûϢϵͳ
ÔÚ·¢²¼ - ¶©ÔÄϵͳÖУ¬ÏûÏ¢±»±£ÁôÔÚÖ÷ÌâÖС£ Óëµã¶Ôµãϵͳ²»Í¬£¬Ïû·ÑÕß¿ÉÒÔ¶©ÔÄÒ»¸ö»ò¶à¸öÖ÷ÌⲢʹÓøÃÖ÷ÌâÖеÄËùÓÐÏûÏ¢¡£ ÔÚ·¢²¼ - ¶©ÔÄϵͳÖУ¬ÏûÏ¢Éú²úÕß³ÆÎª·¢²¼Õߣ¬ÏûϢʹÓÃÕß³ÆÎª¶©ÔÄÕß¡£ Ò»¸öÏÖʵÉú»îµÄÀý×ÓÊÇDishµçÊÓ£¬Ëü·¢²¼²»Í¬µÄÇþµÀ£¬ÈçÔ˶¯£¬µçÓ°£¬ÒôÀֵȣ¬ÈκÎÈ˶¼¿ÉÒÔ¶©ÔÄ×Ô¼ºµÄƵµÀ¼¯£¬²¢»ñµÃËûÃǶ©ÔĵįµµÀʱ¿ÉÓá£

2¡¢Ê²Ã´ÊÇKafka£¿
Apache KafkaÊÇÒ»¸ö·Ö²¼Ê½·¢²¼ - ¶©ÔÄÏûϢϵͳºÍÒ»¸öÇ¿´óµÄ¶ÓÁУ¬¿ÉÒÔ´¦Àí´óÁ¿µÄÊý¾Ý£¬²¢Ê¹ÄúÄܹ»½«ÏûÏ¢´ÓÒ»¸ö¶Ëµã´«µÝµ½ÁíÒ»¸ö¶Ëµã¡£ KafkaÊʺÏÀëÏߺÍÔÚÏßÏûÏ¢Ïû·Ñ¡£ KafkaÏûÏ¢±£ÁôÔÚ´ÅÅÌÉÏ£¬²¢ÔÚȺ¼¯ÄÚ¸´ÖÆÒÔ·ÀÖ¹Êý¾Ý¶ªÊ§¡£ Kafka¹¹½¨ÔÚZooKeeperͬ²½·þÎñÖ®ÉÏ¡£ ËüÓëApache StormºÍSpark·Ç³£ºÃµØ¼¯³É£¬ÓÃÓÚʵʱÁ÷ʽÊý¾Ý·ÖÎö¡£
KafkaרΪ·Ö²¼Ê½¸ßÍÌÍÂÁ¿ÏµÍ³¶øÉè¼Æ¡£ ÓëÆäËûÏûÏ¢´«µÝϵͳÏà±È£¬Kafka¾ßÓиüºÃµÄÍÌÍÂÁ¿£¬ÄÚÖ÷ÖÇø£¬¸´Öƺ͹ÌÓеÄÈÝ´íÄÜÁ¦£¬ÕâʹµÃËü·Ç³£Êʺϴó¹æÄ£ÏûÏ¢´¦ÀíÓ¦ÓóÌÐò¡£
Kafka¿ÉÒÔÔÚÐí¶àÓÃÀýÖÐʹÓ㬠ÆäÖÐһЩÁгöÈçÏ£º
Ö¸±ê - Kafkaͨ³£ÓÃÓÚ²Ù×÷¼à¿ØÊý¾Ý¡£ ÕâÉæ¼°¾ÛºÏÀ´×Ô·Ö²¼Ê½Ó¦ÓóÌÐòµÄͳ¼ÆÐÅÏ¢£¬ÒÔ²úÉú²Ù×÷Êý¾ÝµÄ¼¯ÖÐÀ¡ËÍ¡£
ÈÕÖ¾¾ÛºÏ½â¾ö·½°¸ - Kafka¿ÉÓÃÓÚ¿ç×éÖ¯´Ó¶à¸ö·þÎñÊÕ¼¯ÈÕÖ¾£¬²¢Ê¹ËüÃÇÒÔ±ê×¼¸ñʽÌṩ¸ø¶à¸ö·þÎñÆ÷¡£
Á÷´¦Àí - Á÷ÐеĿò¼Ü(ÈçStormºÍSpark Streaming)´ÓÖ÷ÌâÖжÁÈ¡Êý¾Ý£¬¶ÔÆä½øÐд¦Àí£¬²¢½«´¦ÀíºóµÄÊý¾ÝдÈëÐÂÖ÷Ì⣬¹©Óû§ºÍÓ¦ÓóÌÐòʹÓᣠKafkaµÄÇ¿Ä;ÃÐÔÔÚÁ÷´¦ÀíµÄÉÏÏÂÎÄÖÐÒ²·Ç³£ÓÐÓá£
3¡¢Kafka¼Ü¹¹
ÉîÈëѧϰKafka֮ǰ£¬±ØÐëÁ˽âÖ÷Ì⣨Topic£©¡¢¾¼ÍÈË£¨Broker£©¡¢Éú²úÕߣ¨Producer£©»òÕß·¢²¼Õߣ¬ÒÔ¼°Ïû·ÑÕߣ¨Consumer£©»òÕß¶©ÔÄÕßµÈÖ÷ÒªÊõÓï¡£ ÏÂͼ˵Ã÷ÁËÖ÷ÒªÊõÓ±í¸ñÏêϸÃèÊöÁËͼ±í×é¼þ¡£



£¨1£©Topics£¨Ö÷Ì⣩
ÊôÓÚÌØ¶¨Àà±ðµÄÏûÏ¢Á÷³ÆÎªÖ÷Ìâ¡£ Êý¾Ý´æ´¢ÔÚÖ÷ÌâÖС£TopicÏ൱ÓÚQueue¡£
Ö÷Ìâ±»²ð·Ö³É·ÖÇø¡£ ÿ¸öÕâÑùµÄ·ÖÇø°üº¬²»¿É±äÓÐÐòÐòÁеÄÏûÏ¢¡£ ·ÖÇø±»ÊµÏÖΪ¾ßÓÐÏàµÈ´óСµÄÒ»×é·Ö¶ÎÎļþ¡£
£¨2£©Partition£¨·ÖÇø£©

Ò»¸öTopic¿ÉÒԷֳɶà¸öPartition£¬ÕâÊÇΪÁËÆ½Ðл¯´¦Àí¡£
ÿ¸öPartitionÄÚ²¿ÏûÏ¢ÓÐÐò£¬ÆäÖÐÿ¸öÏûÏ¢¶¼ÓÐÒ»¸öoffsetÐòºÅ¡£
Ò»¸öPartitionÖ»¶ÔÓ¦Ò»¸öBroker£¬Ò»¸öBroker¿ÉÒÔ¹ÜÀí¶à¸öPartition¡£
£¨3£©Partition offset£¨·ÖÇøÆ«ÒÆ£©
ÿ¸ö·ÖÇøÏûÏ¢¾ßÓгÆÎª offset µÄΨһÐòÁбêʶ¡£
£¨4£©Replicas of partition£¨·ÖÇø±¸·Ý£©
¸±±¾Ö»ÊÇÒ»¸ö·ÖÇøµÄ±¸·Ý¡£ ¸±±¾´Ó²»¶ÁÈ¡»òдÈëÊý¾Ý¡£ ËüÃÇÓÃÓÚ·ÀÖ¹Êý¾Ý¶ªÊ§¡£
£¨5£©Brokers£¨¾¼ÍÈË£©
´úÀíÊǸºÔðά»¤·¢²¼Êý¾ÝµÄ¼òµ¥ÏµÍ³¡£ ÿ¸ö´úÀí¿ÉÒÔÿ¸öÖ÷Ìâ¾ßÓÐÁã¸ö»ò¶à¸ö·ÖÇø¡£ ¼ÙÉ裬Èç¹ûÔÚÒ»¸öÖ÷ÌâºÍN¸ö´úÀíÖÐÓÐN¸ö·ÖÇø£¬Ã¿¸ö´úÀí½«ÓÐÒ»¸ö·ÖÇø¡£
¼ÙÉèÔÚÒ»¸öÖ÷ÌâÖÐÓÐN¸ö·ÖÇø²¢ÇÒ¶àÓÚN¸ö´úÀí(n + m)£¬ÔòµÚÒ»¸öN´úÀí½«¾ßÓÐÒ»¸ö·ÖÇø£¬²¢ÇÒÏÂÒ»¸öM´úÀí½«²»¾ßÓÐÓÃÓÚ¸ÃÌØ¶¨Ö÷ÌâµÄÈκηÖÇø¡£
¼ÙÉèÔÚÒ»¸öÖ÷ÌâÖÐÓÐN¸ö·ÖÇø²¢ÇÒСÓÚN¸ö´úÀí(n-m)£¬Ã¿¸ö´úÀí½«ÔÚËüÃÇÖ®¼ä¾ßÓÐÒ»¸ö»ò¶à¸ö·ÖÇø¹²Ïí¡£ ÓÉÓÚ´úÀíÖ®¼äµÄ¸ºÔØ·Ö²¼²»ÏàµÈ£¬²»ÍƼöʹÓô˷½°¸¡£
£¨6£©Kafka Cluster£¨Kafka¼¯Èº£©
KafkaÓжà¸ö´úÀí±»³ÆÎªKafka¼¯Èº¡£ ¿ÉÒÔÀ©Õ¹Kafka¼¯Èº£¬ÎÞÐèÍ£»ú¡£ ÕâЩ¼¯ÈºÓÃÓÚ¹ÜÀíÏûÏ¢Êý¾ÝµÄ³Ö¾ÃÐԺ͸´ÖÆ¡£
£¨7£©Producers£¨Éú²úÕߣ©
Éú²úÕßÊÇ·¢Ë͸øÒ»¸ö»ò¶à¸öKafkaÖ÷ÌâµÄÏûÏ¢µÄ·¢²¼Õß¡£ Éú²úÕßÏòKafka¾¼ÍÈË·¢ËÍÊý¾Ý¡£ ÿµ±Éú²úÕß½«ÏûÏ¢·¢²¼¸ø´úÀíʱ£¬´úÀíÖ»Ð轫ÏûÏ¢¸½¼Óµ½×îºóÒ»¸ö¶ÎÎļþ¡£Êµ¼ÊÉÏ£¬¸ÃÏûÏ¢½«±»¸½¼Óµ½·ÖÇø¡£ Éú²úÕß»¹¿ÉÒÔÏòËûÃÇÑ¡ÔñµÄ·ÖÇø·¢ËÍÏûÏ¢¡£
£¨8£©Consumers£¨Ïû·ÑÕߣ©
Consumers´Ó¾¼ÍÈË´¦¶ÁÈ¡Êý¾Ý¡£ Ïû·ÑÕß¶©ÔÄÒ»¸ö»ò¶à¸öÖ÷Ì⣬²¢Í¨¹ý´Ó´úÀíÖÐÌáÈ¡Êý¾ÝÀ´Ê¹ÓÃÒÑ·¢²¼µÄÏûÏ¢¡£
Consumer×Ô¼ºÎ¬»¤Ïû·Ñµ½Äĸöoffet
ÿ¸öConsumer¶¼ÓжÔÓ¦µÄgroup
groupÄÚÊÇqueueÏû·ÑÄ£ÐÍ£º¸÷¸öConsumerÏû·Ñ²»Í¬µÄpartition£¬Òò´ËÒ»¸öÏûÏ¢ÔÚgroupÄÚÖ»Ïû·ÑÒ»´Î
group¼äÊÇpublish-subscribeÏû·ÑÄ£ÐÍ£º¸÷¸ögroup¸÷×Ô¶ÀÁ¢Ïû·Ñ£¬»¥²»Ó°Ï죬Òò´ËÒ»¸öÏûÏ¢±»Ã¿¸ögroupÏû·ÑÒ»´Î¡£
4¡¢´´½¨topic
´´½¨Ò»¸ö½Ð×ö¡°test¡±µÄtopic£¬ËüÖ»ÓÐÒ»¸ö·ÖÇø£¬Ò»¸ö¸±±¾¡£
[root@node1
kafka_2.11-0.11.0.1]# bin/kafka-topics.sh --create
--zookeeper localhost:2181 --replication-factor
1 --partitions 1 --topic test
Created topic "test".
[2017-10-29 07:44:33,497] INFO [ReplicaFetcherManager
on broker 1] Removed fetcher for partitions test-0
(kafka.server.ReplicaFetcherManager)
[2017-10-29 07:44:33,602] INFO Loading producer
state from offset 0 for partition test-0 with
message format version 2 (kafka.log.Log)
[2017-10-29 07:44:33,618] INFO Completed load
of log test-0 with 1 log segments, log start offset
0 and log end offset 0 in 66 ms (kafka.log.Log)
[2017-10-29 07:44:33,658] INFO Created log for
partition [test,0] in /var/log/kafka-logs with
properties {compression.type -> producer, message.format.version
-> 0.11.0-IV2, file.delete.delay.ms -> 60000,
max.message.bytes -> 1000012, min.compaction.lag.ms
-> 0, message.timestamp.type -> CreateTime,
min.insync.replicas -> 1, segment.jitter.ms
-> 0, preallocate -> false, min.cleanable. dirty.ratio
-> 0.5, index.interval. bytes -> 4096, unclean.leader. election.enable
-> false, retention. bytes -> -1, delete.retention.ms
-> 86400000, cleanup.policy -> [delete],
flush.ms -> 9223372036854775807, segment.ms
-> 604800000, segment.bytes -> 1073741824,
retention.ms -> 604800000, message.timestamp.difference.max.ms
-> 9223372036854775807, segment.index. bytes
-> 10485760, flush.messages -> 9223372036854775807}.
(kafka.log.LogManager)
[2017-10-29 07:44:33,660] INFO Partition [test,0]
on broker 1: No checkpointed highwatermark is
found for partition test-0 (kafka.cluster.Partition)
[2017-10-29 07:44:33,665] INFO Replica loaded
for partition test-0 with initial high watermark
0 (kafka.cluster.Replica)
[2017-10-29 07:44:33,667] INFO Partition [test,0]
on broker 1: test-0 starts at Leader Epoch 0 from
offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[root@node1 kafka_2.11-0.11.0.1]# |
¿ÉÒÔͨ¹ýlistÃüÁî²é¿´´´½¨µÄtopic
[root@node1
kafka_2.11-0.11.0.1] # bin/kafka-topics.sh --list
--zookeeper localhost:2181
test
[root@node1 kafka_2.11-0.11.0.1]# |
5¡¢·¢ËÍÏûÏ¢
[root@node1
kafka_2.11-0.11.0.1] # bin/kafka-console-producer.sh
--broker-list localhost:9092 --topic test
>This is a message
>[2017-10-29 07:47:28,399] INFO Updated PartitionLeaderEpoch.
New: {epoch:0, offset:0}, Current: {epoch:-1,
offset-1} for Partition: test-0. Cache now contains
0 entries. (kafka.server.epoch.LeaderEpochFileCache)
This is another message
>^C[root@node1 kafka_2.11-0.11.0.1]# |
6¡¢Æô¶¯Ò»¸öÏû·ÑÕß
Kafka»¹ÓиöÏû·ÑÕß¿ØÖÆÌ¨£¬»á°ÑÏûÏ¢Êä³öµ½±ê×¼Êä³ö
[root@node2 kafka_2.11-0.11.0.1]
# bin/kafka-console-consumer.sh
--bootstrap-server localhost:9092 --topic test
--from-beginning
[2017-10-29 07:49:32,094] INFO Topic creation
{"version":1,"partitions": {"45":[1],"34":[2],"12":[1],"8":[3], "19":[2],"23":[3],"4":[2],"40":[2],"15": [1],"11":[3],"9":[1],"44":[3],"33":[1], "22":[2],"26":[3],"37":[2],"13":[2],"46": [2],"24":[1],"35":[3],"16":[2],"5":[3], "10":[2],"48":[1],"21":[1],"43":[2],"32": [3],"49":[2],"6":[1],"36":[1],"1":[2],"39": [1],"17":[3],"25":[2],"14":[3], "47":[3],"31": [2],"42":[1],"0":[1],"20":[3],"27": [1],"2":[3], "38":[3],"18":[1],"30":[1],"7":[2], "29":[3],"41": [3],"3":[1],"28":[2]}}
(kafka.admin.AdminUtils$)
[2017-10-29 07:49:32,121] INFO [KafkaApi-2] Auto
creation of topic __consumer_offsets with 50 partitions
and replication factor 1 is successful (kafka.server.KafkaApis)
[2017-10-29 07:49:36,792] INFO [ReplicaFetcherManager
on broker 2] Removed fetcher for partitions __consumer_offsets-22, __consumer_offsets-4,__consumer_offsets-7, __consumer_offsets-46,__consumer_offsets-25 ,__consumer_offsets-49,__consumer_offsets-16, __consumer_offsets-28,__consumer_offsets-31, __consumer_offsets-37,__consumer_offsets-19, __consumer_offsets-13,__consumer_offsets-43, __consumer_offsets-1,__consumer_offsets-34, __consumer_offsets-10,__consumer_offsets-40
(kafka.server.ReplicaFetcherManager)
[2017-10-29 07:49:36,919] INFO Loading producer
state from offset 0 for partition __consumer_offsets-10
with message format version 2 (kafka.log.Log)
....
....
[2017-10-29 07:49:38,414] INFO [GroupCoordinator
2]: Stabilized group console-consumer -45516 generation
1 (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
[2017-10-29 07:49:38,476] INFO [GroupCoordinator
2]: Assignment received from leader for group
console-consumer-45516 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2017-10-29 07:49:38,566] INFO Updated PartitionLeaderEpoch.
New: {epoch:0, offset:0}, Current: {epoch:-1,
offset-1} for Partition: __consumer_offsets-22.
Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
This is a message
This is another message |
|