| 编辑推荐: | 
								   
										
                                    本文来自于csdn,Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统. 
                                   | 
								   
							 
						 | 
                           
                         
                        
                       一、 前言 
                        什么是消息系统? 
                           早期两个应用程序间进行消息传递需要保证两个应用程序同时在线,并且耦合度很高。为了解决应用程序不在线的情况下业务正常运转,就产生了消息系统,消费发送者(生产者)将消息发送至消息系统,消息接受者(消费者)从消息系统中获取消息。 
                           提到消息系统,不得不说一下JMS即Java消息服务(Java Message Service)应用程序接口。是一个Java平台中关于面向消息中间件的API。用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API。 
                           通常消息传递有两种类型的消息模式可用一种是点对点queue队列模式(p2p),另一种是topic发布-订阅模式(public-subscribe)。 
                        点对点消息系统 
                           在点对点系统中,消息被保留在队列中。 一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。一旦消费者读取队列中的消息,它就从该队列中消失。该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。下图描述了结构。 
                            
                       发布 - 订阅消息系统 
                           在发布-订阅系统中,消息被保留在主题中。与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。 
                            在发布 - 订阅系统中,消息生产者称为发布者,消息使用者称为订阅者。一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。 
                             
                        MQ消息队列对比 
                          下面针对RabbitMQ与kafka进行对比 
                          应用场景上 
                           RabbitMQ:遵循AMQP(Advanced Message 
                            Queuing Protocol)协议,由内在高并发的erlanng语言开发,用在实时的对可靠性要求比较高的消息传递上。 
                           kafka:是Linkedin于2010年12月份开源的消息发布订阅系统,它主要用于处理活跃的流式数据,大数据量的数据处理上。 
                          在吞吐量上 
                          RabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘。 
                           kafka具有高的吞吐量,内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率很高。 
                          在集群负载均衡上 
                           RabbitMQ的负载均衡需要单独的loadbalancer进行支持。 
                           kafka采用zookeeper对集群中的broker、consumer进行协调管理。 
                       什么是Kafka? 
                           Apache Kafka是一个分布式发布-订阅消息系统和一个强大的队列,实际上就是JMS的一个变形,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。Kafka适合离线和在线消息消费。Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。Kafka构建在ZooKeeper同步服务之上。 
                          Kafka的特性 
                          以下是Kafka的几个好处 
                           高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, 
                            consumer group 对partition进行消费操作。 
                           可扩展性:kafka集群支持热扩展 
                           持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失 
                           容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败) 
                           高并发:支持数千个客户端同时读写 
                         应用场景 
                          Kafka可以在许多用例中使用。 其中一些列出如下: 
                           日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。 
                           消息系统:解耦和生产者和消费者、缓存消息等。 
                          用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka 
                            的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。 
                           运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。 
                           流式处理:比如spark streaming和storm 
                          事件源 
                            Kafka基本概念 
                          Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。 
                           Broker(经纪人):Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。 
                          Topic(主题):一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。 
                          Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。 
                          Segment:partition物理上由多个segment组成,每个Segment存着message信息。 
                           offset:一条消息在消息系统中的偏移量。 
                           Producer : 生产message发送到topic。 
                           Consumer : 订阅topic消费message, consumer作为一个线程来消费。 
                          ConsumerGroup:一个ConsumerGroup包含多个consumer,这个是预先在配置文件中配置好的。各个consumer(consumer线程)可以组成一个组(Consumer 
                            group),partition中的每个message只能被组(Consumer group) 中的一个consumer(consumer 
                            线程)消费,如果一个message可以被多个consumer(consumer 线程) 消费的话,那么这些consumer必须在不同的组。Kafka不支持一个partition中的message由两个或两个以上的consumer 
                            thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观锁(for 
                            update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer 
                            thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。 
                            
                          生产者和消费者 
                          针对生产者和消费者,需要注意以下几点 
                          分区在producer端进行 
                          一个分区只会由消费者组内的一个consumer消费,kafka会通过负载均衡机制自动分配 
                          offset由consumer端进行维护,一般交给zookeeper进行维护 
                          只能保证一个分区内的数据是有序的 
                            
                       二、 Apache Kafka - 安装步骤 
                          注:安装kafka前需要提前安装JDK与zookeeper 
                          Step 1: 下载Kafka并解压 
                           
                               > tar -xzfkafka_2.9.2-0.8.1.1.tgz 
                                   
                                > cdkafka_2.9.2-0.8.1.1  | 
                           
                         
                        
                          
                            Step 2: 配置环境变量(可选) 
                           
                               vi/etc/profile 
                                   
                                KAFKA_HOME=/opt/kafka_2.9.2-0.8.1.1    
                                PATH=$PATH:$KAFKA_HOME/bin | 
                           
                         
                          
                          
                             
                         
                          
                            Step 3: 修改配置文件中的以下内容 
                           
                               cd /opt/kafka_2.9.2-0.8.1.1/config 
                                   
                                viserver.properties    
                                broker.id=0 //为依次增长的:0、1、2、3、4,集群中唯一id    
                                log.dirs=/opt/kafka_2.9.2-0.8.1.1/logs //日志地址 
                                   
                                zookeeper.connect=localhost:2181 //zookeeperServers列表,各节点以逗号分开 
                                       
                                cd /opt/kafka_2.9.2-0.8.1.1/config    
                                vi zookeeper.properties    
                                dataDir=/usr/local/kafka/zookeeper    
                                dataLogDir=/usr/local/kafka/log/zookeeper  | 
                           
                         
                        
                          Step 4: 启动单节点服务 
                          在kafka的bin中存在很多sh文件,其中包含对zookeeper的启动与停止。首先启动zookeeper再启动kafka的broker。 
                         
                           
                               ./bin/zookeeper-server-start.shconfig/zookeeper.properties 
                                &    
                                ./bin/kafka-server-start.shconfig/server.properties 
                                &  | 
                           
                         
                        
                          Step 5: 创建topic 
                           
                              |  ./bin/kafka-topics.sh 
                                --create --zookeeper192.168.2.105:2181 --replication-factor 
                                1 --partitions 1 --topic testlzy  | 
                           
                         
                         
                          
                            列出所有topic 
                           
                              |  ./bin/kafka-topics.sh 
                                --zookeeper 192.168.2.105:2181--list  | 
                           
                         
                          
                          
                            Step 5: 创建生产者 
                           
                              |  ./bin/kafka-console-producer.sh--broker-list 
                                192.168.2.105:9093 --topic testlzy  | 
                           
                         
                          
                           Step 6: 创建消费者 
                        
                           
                              |  ./bin/kafka-console-consumer.sh 
                                --zookeeperlocalhost:2181 --topic testlzy --from-beginning 
                               | 
                           
                         
                         
                          
                            此时如果在生产者控制台中发布消息,消费者端能接收到,就算成功了。 
                           kafka常用命令 
                          以下是kafka常用命令行总结: 
                          0. 查看有哪些主题: 
                           
                              |  ./kafka-topics.sh 
                                --list --zookeeper192.168.0.201:2181  | 
                           
                         
                          
                          
                            1. 查看topic的详细信息 
                           
                              |  ./kafka-topics.sh 
                                -zookeeper 127.0.0.1:2181-describe -topic testKJ1 
                               | 
                           
                         
                         
                          
                            2. 为topic增加副本 
                           
                              |  ./kafka-reassign-partitions.sh 
                                -zookeeper127.0.0.1:2181 -reassignment-json-file 
                                json/partitions-to-move.json -execute  | 
                           
                         
                        
                          
                            3. 创建topic 
                           
                              |  ./kafka-topics.sh 
                                --create --zookeeperlocalhost:2181 --replication-factor 
                                1 --partitions 1 --topic testKJ1 | 
                           
                         
                          
                          
                            4. 为topic增加partition 
                           
                              |  ./bin/kafka-topics.sh 
                                –zookeeper127.0.0.1:2181 –alter –partitions 20 
                                –topic testKJ1  | 
                           
                         
                          
                          
                            5. kafka生产者客户端命令 
                           
                              |  ./kafka-console-producer.sh 
                                --broker-listlocalhost:9092 --topic testKJ1  | 
                           
                         
                          
                          
                            6. kafka消费者客户端命令 
                           
                              |  ./kafka-console-consumer.sh 
                                -zookeeperlocalhost:2181 --from-beginning --topic 
                                testKJ1  | 
                           
                         
                          
                          
                            7. kafka服务启动 
                           
                              |  ./kafka-server-start.sh 
                                -daemon../config/server.properties  | 
                           
                         
                         
                          
                            8. 下线broker 
                           
                               ./kafka-run-class.shkafka.admin.ShutdownBroker 
                                --zookeeper 127.0.0.1:2181 --broker #brokerId#--num.retries 
                                3 --retry.interval.ms 60    
                                shutdown broker | 
                           
                         
                         
                          
                            9. 删除topic 
                           
                               ./kafka-run-class.shkafka.admin.DeleteTopicCommand 
                                --topic testKJ1 --zookeeper 127.0.0.1:2181   
                                 
                                ./kafka-topics.sh --zookeeperlocalhost:2181 --delete 
                                --topic testKJ1  | 
                           
                         
                         
                          
                            10. 查看consumer组内消费的offset 
                           
                               ./kafka-run-class.shkafka.tools.ConsumerOffsetChecker 
                                --zookeeper localhost:2181 --group test--topic 
                                testKJ1    
                                ./kafka-consumer-offset-checker.sh --zookeeper192.168.0.201:2181 
                                --group group1 --topic group1  | 
                           
                         
                    
                          
                            三、 Apache Kafka – 核心原理 
                           负载均衡 
                          负载均衡的两种策略(消费端配置) 
                           
                              |  partition.assignment.strategy=range|round-robin | 
                           
                         
                          
                          
                            在kafka中partition分发消息给消费者不是已消费为力度进行分配的,是以消费者线程为力度进行分配的。 
                            
                          Range 
                          kafka中的每个topic的分区是独立进行分配的,topic间不受到任何影响。 
                          topic中先是对partition进行数字排序,线程按照字典排序。 
                          接下来用分区的数量除以线程数量就是每个线程能够分到的消息数量 
                          partition_per_thread= 分区数量/线程数量  
                          如果整除了,那么每个线程依次分配partition_per_thread个分区 
                          如果不整除,低位的几个thread会多消费分区 
                          如果分区个数少于线程数量,就会出现线程空闲的时候,因为kafka会保证一个分区只能被一个消费者进行消费。所以建议在配置的时候分区数量和消费者线程数量相等最好。 
                          Round-robin 
                          在kafka中一个消费者组是可以订阅多个topic的。当订阅了多个topic后,他内部会把所有topic进行混乱以后再按照range策略走一遍,他会保证每个topic在consumer中的线程数量必须相等。 
                          备注 
                          一般应用range的比较多,如果consumer组中有个线程shutdown了,那么kafka会自动的重新进行负载均衡的分配。这个负载均衡增加了下游的消费能力。而且非常方便的进行消费者的扩展。当然kafka也可以去除这样的负载均衡策略,默认消费端分为high 
                            level的客户端(启用负载均衡机制)和simple的客户端(不启用负载均衡,需要自己决定消费哪个分区的消息)。 
                           主从及副本分布 
                          kafka的主从主要提供了分区容错的能力,可以配置一个leader和若干follower,leader是处理消息,而follower只是leader的一个备份,平常的连接都是连在leader上的。当leader宕机以后,kafka会从follower中选举一台leader来进行服务。 
                            
                          对于第R个副本,先随机取一个broker放分区0,然后顺序放其他分区。这样保证了leader和follower均匀的分布在了每个broker上。 
                          通过-topic命令可以查看指定topic的分区和副本的分布情况 
                             
                          topic:topic名称 
                          partition:分区名称 
                          leader:此分区的leader在哪个broker上 
                          replicas:所有的副本分布在哪个broker上 
                          isr:replicas中所有in-sync的节点 
                          对于in-sync 
                          节点必须可以维护和zookeeper的连接,zookeeper通过心跳机制检查每个节点的连接。 
                          如果节点是个follower。他必须能及时的同步leader的写操作,延时不能太久。 
                          设置方式 
                          replica.lag.max.messages:落后的消息数量 
                          replica.lag.time.max.ms:卡住的时间 
                          kafka是通过这两个参数去判断是不是一个有效的副本follower。当leader宕机以后,是从这些有效副本中进行选举的。无效的是不参加选举的。 
                           kafka的持久化 
                          消息格式 
                          kafka的消息格式如图 
                           
                          文件系统 
                          kafka会将消息组织到硬盘上,在broker的数据目录中会有以topic名称-分区号命名的文件夹, 
                          在文件夹中存在成对出现的文件。kafka不是将所有消息放到一个大文件里,而是根据消息的offset进行了分段。每一个段内放多少消息是可以配置的。文件名字代表此文件中的第一个数据的offset。index为索引文件,log为数据文件,存放的消息格式见上图。对于index文件维护的是一个稀疏索引,由消息的编号指向物理偏移,运行时会被加载到内存。 
                             
                          过期数据清理 
                          kafka既然支持了持久化,他对磁盘空间是有要求的。对于删除过期数据kafka提供了两种策略 
                          1、默认策略为直接删除 
                          l 超过指定的时间的消息: 
                          log.retention.hours=168 
                          l 超过指定大小后,删除旧的消息: 
                          log.retention.bytes=1073741824 
                          2、压缩(只在特定的业务场景下有意义) 
                          全局:log.cleaner.enable=true 
                          在特定的topic上:log.cleanup.policy=compact 
                          保留每个key最后一个版本的信息,若最后一个版本消息内容为空,这个key被删除 
                           
  |