Editor's note:
This article, from Cnblogs, uses kafka_2.11-0.10.0.0 as its example to introduce the installation and use of a Kafka cluster.
Kafka is a high-throughput distributed publish-subscribe messaging system. Originally developed at LinkedIn as the foundation of its activity stream and operational data processing pipeline, it has since been adopted by many companies of different kinds as the basis for many types of data pipelines and messaging systems.
1 Introduction to the Kafka message queue
1.1 Basic terms
Broker
A Kafka cluster contains one or more servers; each such server is called a broker.
Topic
Every message published to a Kafka cluster has a category, called its Topic. (Physically, messages of different Topics are stored separately; logically, although a Topic's messages live on one or more brokers, a user only needs to specify the message's Topic to produce or consume data, without caring where the data is stored.)
Partition
A Partition is a physical concept; each Topic contains one or more Partitions. (A common rule of thumb is the total number of CPU cores across the Kafka nodes.)
Producer
Publishes messages to Kafka brokers.
Consumer
A message consumer: a client that reads messages from Kafka brokers.
Consumer Group
Each Consumer belongs to a specific Consumer Group (a group name can be assigned to each Consumer; a Consumer with no group name belongs to the default group).
1.2 The message queue
1.2.1 Basic properties
Scalable
Capacity can be added without taking the system offline
Data streams are partitioned and stored across multiple machines
High performance
A single broker can serve thousands of clients
A single broker can read/write at hundreds of megabytes per second
A cluster of multiple brokers delivers very high aggregate throughput
Performance stays stable no matter how much data is stored
At the storage layer, Kafka forgoes Java heap caching in favor of the operating system's page cache, turns random writes into sequential writes, and combines this with zero-copy transfers, which together greatly improve I/O performance.
Durable storage
Messages are stored on disk
Redundant copies are replicated to other servers to prevent loss
1.2.2 Message format
A topic corresponds to one message format, so messages are classified by topic
The messages of a topic are made up of one or more partitions
Within a partition
A partition should be stored on one or more servers
With only one server there is no redundant backup: it is a single machine, not a cluster
With multiple servers
One server is the leader and the other servers are followers; the leader accepts read and write requests, while followers serve only as redundant backups; if the leader fails, a follower is automatically elected as the new leader so that service is not interrupted; each server may act as the leader for some partitions and as a follower for others, so the cluster as a whole achieves load balancing
Messages are stored in order, and the order is immutable
Messages can only be appended, never inserted
Each message has an offset that serves as its message ID and is unique within a partition
Offsets are saved and managed by the consumer, so the read order is in fact entirely decided by the consumer and is not necessarily linear
Messages have an expiry period and are deleted once they expire
1.2.3 Producer
The producer writes messages into Kafka
A write must specify the topic and partition
How messages are distributed across partitions is an algorithm chosen by the producer
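A minimal sketch of what such a partitioning algorithm commonly looks like. The class and method names here are hypothetical, and the real Kafka default partitioner differs in detail (for example in how it caches the partition choice for keyless messages):

```java
// Sketch of a typical producer-side partitioning rule:
// keyed messages go to hash(key) % numPartitions.
public class SimplePartitioner {
    public static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // no key: pick any partition (Kafka caches a random choice for reuse)
            return (int) (Math.random() * numPartitions);
        }
        // mask off the sign bit so a negative hashCode cannot yield a negative index
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because the mapping is deterministic for a given key, all messages with the same key land in the same partition, which is what preserves per-key ordering.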
1.2.4 Consumer
The consumer reads messages and processes them
consumer group
This concept was introduced to support two scenarios: delivering each message to exactly one consumer, and broadcasting each message to all consumers of a group
When multiple consumer groups subscribe to a topic, the topic's messages are broadcast to every group
Once a message is delivered to a consumer group, it can be received and used by only one consumer in that group
Mapping each consumer in a group to one partition brings the following benefits
Processing can be parallelized up to the number of partitions
Each partition is read by only one consumer, which guarantees that messages are processed in the order they are stored in the partition; note that this order is influenced by the algorithm the producer uses to place messages
A consumer may use multiple threads to consume, but the thread count should not exceed the topic's partition count: within a consumer group, a partition can be assigned to at most one consuming thread, and as many threads as possible are given a partition (though which threads actually consume, and how many, is ultimately decided by the thread pool's scheduling). So if there are more threads than partitions, the surplus threads are assigned nothing and spin idly, wasting resources
If a consumer reads from multiple partitions, no ordering across partitions is guaranteed; Kafka only guarantees that data is ordered within a single partition, so across partitions the result depends on the order in which you read
Adding or removing a consumer, broker, or partition triggers a rebalance, after which the partitions assigned to a consumer may change
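The thread-to-partition arithmetic above can be illustrated with a small sketch (hypothetical names; the real assignment is performed by Kafka's rebalance logic): with T threads and P partitions, each partition gets exactly one owner, so any threads beyond P receive nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Distribute P partitions over T consumer threads, one owner per partition.
public class PartitionAssignment {
    public static List<List<Integer>> assign(int numPartitions, int numThreads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) perThread.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            perThread.get(p % numThreads).add(p); // round-robin: each partition has one owner
        }
        return perThread;
    }

    public static void main(String[] args) {
        // 4 partitions, 6 threads: threads 4 and 5 get no partition and sit idle
        System.out.println(assign(4, 6));
    }
}
```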
2. Installation and use
The examples below use kafka_2.11-0.10.0.0.
After downloading and unpacking, enter kafka_2.11-0.10.0.0/.
2.1 Starting Zookeeper
For testing you can use the Zookeeper bundled with Kafka.
Start:
./bin/zookeeper-server-start.sh config/zookeeper.properties &
where config/zookeeper.properties is Zookeeper's configuration file.
Stop:
./bin/zookeeper-server-stop.sh
For better availability and reliability, however, it is best to build your own Zookeeper cluster. See: "Installing and using Zookeeper" — MarchOn
2.2 Starting the Kafka server
2.2.1 Configuration file
Edit config/server.properties. The following fields usually need to be configured; the rest can be left at their defaults:
broker.id: the unique identifier of each broker in the cluster; required to be a positive integer
listeners (equivalent to host.name and port in earlier versions): be sure to bind host.name, otherwise puzzling errors may occur, such as consumers being unable to find the broker. This host.name is the machine name of the Kafka server and is registered in Zookeeper
log.dirs: where Kafka stores its data; multiple paths are separated by commas, and spreading the directories across different disks can improve read/write performance
log.retention.hours: how long data files are retained; data stored longer than this is cleaned up according to the log.cleanup.policy setting
zookeeper.connect: the ZooKeeper connect string, in hostname:port form; multiple entries are comma-separated, e.g. hostname1:port1,hostname2:port2,hostname3:port3. A path may also be appended, e.g. hostname1:port1,hostname2:port2,hostname3:port3/kafka; note that the /kafka node must be created in zk beforehand, otherwise the error java.lang.IllegalArgumentException: Path length must be > 0 is raised
The meaning and configuration of all parameters are covered at http://orchome.com/12 and http://blog.csdn.net/lizhitao/article/details/25667831
An example configuration follows:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.6.128:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/kafka_2.11-0.10.0.0/kfk_data/

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
auto.create.topics.enable=false

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=4

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
Note the auto.create.topics.enable field: if true, a topic is created automatically when a producer writes to a nonexistent topic; if false, the topic must be created in advance, otherwise the write fails with the error: failed after 3 retries.
2.2.2 Commands
Start: bin/kafka-server-start.sh config/server.properties ; in production it is best to run it as a daemon, with nohup and &
Stop: bin/kafka-server-stop.sh
2.2.3 Kafka's storage layout in Zookeeper
If the zookeeper.connect value above has no path, the root path is used. After starting Zookeeper and Kafka and connecting to Zookeeper from the command line, the get / command reveals these directories: consumers, config, controller, admin, brokers, zookeeper, controller_epoch.
The layout is as follows (for details see: "Apache Kafka series: the storage layout in zookeeper")
2.3 Usage
Kafka itself is tied to zookeeper: the state of producers and consumers is also saved through zookeeper, and the various operations on Kafka are carried out through the Zookeeper it is connected to.
2.3.1 Command-line client
Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List all topics: bin/kafka-topics.sh --list --zookeeper localhost:2181
Show topic details (partitions, replicas, etc.): kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ; this lists the partition count, replica count, the replica leader node, the replica nodes, and the live replica nodes
Produce messages to a topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Consume messages from a topic: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (by default a single thread consumes the data of all partitions of the given topic)
Delete a Kafka group id: connect to Zookeeper and use the rmr command; e.g. to delete the consumer group named JSI:
rmr /consumers/JSI
Check consumption progress:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test-mirror-consumer-zsm --zkconnect ec2-12345.cn-north-1.compute.amazonaws.com.cn:2181/kafka/blink/0822 --topic GPS2
Parameters:
--group is the group.id that MirrorMaker used when consuming the source cluster
--zkconnect is the zookeeper address of the source cluster
--topic specifies the topic to check; if omitted, consumption status is returned for all topics
2.3.2 Java client
1. Topic operations:
import kafka.admin.DeleteTopicCommand;
import kafka.admin.TopicCommand;

/**
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 */
public class JTopic {
    public static void createTopic(String zkAddr, String topicName, int partition, int replication) {
        String[] options = new String[] { "--create", "--zookeeper", zkAddr, "--topic", topicName,
                "--partitions", partition + "", "--replication-factor", replication + "" };
        TopicCommand.main(options);
    }

    public static void listTopic(String zkAddr) {
        String[] options = new String[] { "--list", "--zookeeper", zkAddr };
        TopicCommand.main(options);
    }

    public static void describeTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--describe", "--zookeeper", zkAddr, "--topic", topicName };
        TopicCommand.main(options);
    }

    public static void alterTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--alter", "--zookeeper", zkAddr, "--topic", topicName, "--partitions", "5" };
        TopicCommand.main(options);
    }

    // Deletes the topic by removing its path in zk; only the zk metadata is
    // removed, the actual data on the Kafka brokers is not deleted.
    public static void deleteTopic(String zkAddr, String topicName) {
        String[] options = new String[] { "--zookeeper", zkAddr, "--topic", topicName };
        DeleteTopicCommand.main(options);
    }

    public static void main(String[] args) {
        String myTestTopic = "ZsmTestTopic";
        int myPartition = 4;
        int myReplication = 1;

        // createTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic, myPartition, myReplication);
        // listTopic(ConfigureAPI.KafkaProperties.ZK);
        describeTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // alterTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
        // deleteTopic(ConfigureAPI.KafkaProperties.ZK, myTestTopic);
    }
}
2. Writing: (a key can be supplied so that Kafka writes the data to a particular partition based on the key; with no key, Kafka essentially picks a partition at random for the keyless message, then caches that partition number for direct reuse later — Kafka itself also clears this cache, by default every 10 minutes or whenever topic metadata is requested)
package com.zsm.kfkdemo;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * A rule (a key plus a partition function) can be specified so that messages are written to a particular partition:
 * 1. If the message has no key, Kafka picks a partition at random.
 * 2. Otherwise, if a partition function is given (via partitioner.class), that function determines the partition from the key.
 * 3. Otherwise, Kafka uses hash(key) % partitionNum to determine the partition.
 *
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 */
public class JProducer extends Thread {
    private Producer<String, String> producer;
    private String topic;
    private final int SLEEP = 10;
    private final int msgNum = 1000;

    public JProducer(String topic) {
        Properties props = new Properties();
        props.put("metadata.broker.list", KafkaProperties.BROKER_LIST);// e.g. 192.168.6.127:9092,192.168.6.128:9092
        // request.required.acks
        //  0: the producer never waits for an acknowledgement from the broker (the same behavior
        //     as 0.7). This option provides the lowest latency but the weakest durability
        //     guarantees (some data will be lost when a server fails).
        //  1: the producer gets an acknowledgement after the leader replica has received the
        //     data. This option provides better durability as the client waits until the server
        //     acknowledges the request as successful (only messages that were written to the
        //     now-dead leader but not yet replicated will be lost).
        // -1: the producer gets an acknowledgement after all in-sync replicas have received the
        //     data. This option provides the best durability: no messages will be lost as long
        //     as at least one in-sync replica remains.
        props.put("request.required.acks", "-1");
        // serializer class for values
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // serializer class for keys
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // custom partition function that routes messages to partitions; if unspecified,
        // Kafka uses hash(messageKey) % partitionNum
        props.put("partitioner.class", "com.zsm.kfkdemo.MyPartitioner");
        producer = new Producer<String, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        boolean isBatchWriteMode = true;
        System.out.println("isBatchWriteMode: " + isBatchWriteMode);
        if (isBatchWriteMode) {
            // batched send
            int batchSize = 100;
            List<KeyedMessage<String, String>> msgList = new ArrayList<KeyedMessage<String, String>>(batchSize);
            for (int i = 0; i < msgNum; i++) {
                String msg = "Message_" + i;
                msgList.add(new KeyedMessage<String, String>(topic, i + "", msg));
                // msgList.add(new KeyedMessage<String, String>(topic, msg));// no key: Kafka picks a partition itself
                if (i % batchSize == 0) {
                    producer.send(msgList);
                    System.out.println("Send->[" + msgList + "]");
                    msgList.clear();
                    try {
                        sleep(SLEEP);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
            producer.send(msgList);
        } else {
            // single send
            for (int i = 0; i < msgNum; i++) {
                KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, i + "", "Message_" + i);
                // KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, "Message_" + i);// no key: Kafka picks a partition itself
                producer.send(msg);
                System.out.println("Send->[" + msg + "]");
                try {
                    sleep(SLEEP);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }

        System.out.println("send done");
    }

    public static void main(String[] args) {
        JProducer pro = new JProducer(KafkaProperties.TOPIC);
        pro.start();
    }
}
3. Reading: (for the Consumer, pay attention to the auto.commit.enable and auto.offset.reset fields)
package com.zsm.kfkdemo;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.zsm.kfkdemo.ConfigureAPI.KafkaProperties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Multi-threaded consumption within one consumer group can be implemented in two ways:
 * 1. Implement a single-threaded client and start several instances of it.
 * 2. In the client's createMessageStreams, specify a thread count greater than 1
 *    for the topic, then start one thread to process each stream.
 *
 * @author zsm
 * @date 2016-09-27 10:26:42
 * @version 1.0
 */
public class JConsumer extends Thread {

    private ConsumerConnector consumer;
    private String topic;
    private final int SLEEP = 20;

    public JConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
        this.topic = topic;
    }

    private ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        // Defaults to true: the consumer periodically commits its offset and zookeeper
        // persists it. Otherwise the offset lives only in memory; after a failure,
        // consumption resumes from the last saved offset.
        props.put("auto.commit.enable", "true");
        // commit the offset once every INTERVAL milliseconds
        props.put("auto.commit.interval.ms", KafkaProperties.INTERVAL + "");
        // What to do when there is no initial offset in ZooKeeper or if an offset is out of range
        props.put("auto.offset.reset", "largest");
        props.put("zookeeper.session.timeout.ms", KafkaProperties.TIMEOUT + "");
        props.put("zookeeper.sync.time.ms", "200");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));// thread count
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
        // If several threads were configured above, start one thread per stream
        // to do the processing below
        KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        MessageAndMetadata<byte[], byte[]> messageAndMetaData = null;
        while (it.hasNext()) {
            messageAndMetaData = it.next();
            System.out.println(MessageFormat.format("Receive->[ message:{0} , key:{1} , partition:{2} , offset:{3} ]",
                    new String(messageAndMetaData.message()), new String(messageAndMetaData.key()),
                    messageAndMetaData.partition() + "", messageAndMetaData.offset() + ""));
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        JConsumer con = new JConsumer(KafkaProperties.TOPIC);
        con.start();
    }
}
The Kafka-related Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
    </exclusions>
</dependency>
3 MirrorMaker
Kafka's bundled MirrorMaker tool synchronizes data from one cluster to another; it works by consuming from the source cluster and producing to the target cluster.
At run time it must be given the source cluster's Zookeeper address (pull mode) or the target cluster's broker list (push mode).
3.1 Usage
Run ./kafka-run-class.sh kafka.tools.MirrorMaker --help to see the usage message, shown below:
Option                                  Description
------                                  -----------
--blacklist <Java regex (String)>       Blacklist of topics to mirror.
--consumer.config <config file>         Consumer config to consume from a
                                          source cluster. You may specify
                                          multiple of these.
--help                                  Print this message.
--num.producers <Integer: Number of     Number of producer instances (default:
  producers>                              1)
--num.streams <Integer: Number of       Number of consumption streams.
  threads>                                (default: 1)
--producer.config <config file>         Embedded producer config.
--queue.size <Integer: Queue size in    Number of messages that are buffered
  terms of number of messages>            between the consumer and producer
                                          (default: 10000)
--whitelist <Java regex (String)>       Whitelist of topics to mirror.
3.2 Starting
./bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config zsmSourceClusterConsumer.config --num.streams 2 --producer.config zsmTargetClusterProducer.config --whitelist="ds*"
The file given to --consumer.config needs at least the zookeeper.connect and group.id fields
The file given to --producer.config needs at least the metadata.broker.list field, which specifies the target cluster's broker list
--whitelist specifies the topics to synchronize
The consumption-progress check described in 2.3.1 can be used to watch the synchronization status against the source cluster (i.e. its consumption status).
4 A Kafka monitoring tool (KafkaOffsetMonitor)
KafkaOffsetMonitor can be used to display Kafka's broker nodes, topics, consumers, offsets, and other information graphically.
Taking KafkaOffsetMonitor-assembly-0.2.0.jar as the example, download it and run:
#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk 192.168.5.131:2181,192.168.6.132:2181,192.168.6.133:2181 \
--port 8087 \
--refresh 10.seconds \
--retain 1.days 1>./zsm-logs/stdout.log 2>./zsm-logs/stderr.log &
Here zk is written in host1:port1,host2:port2,... form; port is the port for the web interface; refresh is the refresh interval; retain is how long data is kept (units: seconds, minutes, hours, days).
5 A Kafka cluster management tool (Kafka Manager)
kafka-manager is a project open-sourced by Yahoo: a commercial-grade tool written in Scala.
This management tool makes it easy to spot topics that are unevenly distributed across the cluster, or partitions that are unevenly spread over the whole cluster. It supports managing multiple clusters, replica election, replica reassignment, and topic creation. It is also a very good tool for quickly browsing a cluster.
The tool runs in cluster mode and requires Zookeeper.
Reference: http://hengyunabc.github.io/kafka-manager-install/
5.1 Installation
Building it requires downloading the source from Github and compiling it with the sbt tool to produce a package; the build takes a very long time and, for unknown reasons, kept failing here, so a package already compiled by another user is used instead (backup link).
The package is kafka-manager-1.0-SNAPSHOT.zip
> Unpack:
unzip kafka-manager-1.0-SNAPSHOT.zip
> Configure kafka-manager.zkhosts in conf/application.conf:
kafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181"
> Start:
./bin/kafka-manager -Dconfig.file=conf/application.conf
The default port is 9000; to use another port, specify http.port on the command line. kafka-manager.zkhosts can also be given on the command line, e.g.:
./bin/kafka-manager -Dhttp.port=9001 -Dkafka-manager.zkhosts="192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181"
5.2 Usage
Open the web page and, under Cluster -> Add Cluster, enter the Zookeeper of the Kafka cluster to monitor.
6 Going further
In the current Kafka implementation, all operations against zookeeper are performed (serially) by the kafka controller.
Offset management: Kafka records offsets in zk, but frequent writes to zk through the zk client API are inefficient. Kafka 0.8.2 introduced native offset storage, moving offset management out of zk in a way that scales horizontally. It works by using a Kafka compacted topic: offsets are committed directly to the compacted topic, keyed by the combination of consumer group, topic, and partition. Kafka also maintains these triples in memory to track the latest offset information, so a consumer fetching the latest offset reads it straight from memory. In addition, Kafka lets you checkpoint the latest offset information to disk quickly.
Choosing the number of partitions: the partition count depends on hardware, software, load, and so on, and must be judged case by case, but a standard procedure can still be followed to estimate it: create a topic with a single partition, then measure that topic's producer throughput and consumer throughput. Call these Tp and Tc, in MB/s. Then, given a total target throughput Tt, the partition count = Tt / max(Tp, Tc).
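The final formula can be made concrete with a small worked example (the numbers are hypothetical):

```java
// Estimate the partition count from the rule above:
// partitions = ceil(Tt / max(Tp, Tc)), all values in MB/s.
public class PartitionCount {
    public static int estimate(double targetMBps, double producerMBps, double consumerMBps) {
        return (int) Math.ceil(targetMBps / Math.max(producerMBps, consumerMBps));
    }

    public static void main(String[] args) {
        // e.g. Tp = 20 MB/s, Tc = 50 MB/s, Tt = 200 MB/s -> 4 partitions
        System.out.println(estimate(200, 20, 50));
    }
}
```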