



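(1) Kafka's architecture consists of producers (which publish messages), consumers (which read messages), brokers (the servers of the Kafka cluster, which handle read and write requests and store messages; a Kafka cluster usually contains many brokers), topics (message categories that behave like queues, with producers writing to them and consumers reading from them), and ZooKeeper (which stores metadata, including topic information, partition information, and consumer offsets for the older ZooKeeper-based consumer).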





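Kafka has a concept called the preferred replica. If a partition has three replicas on brokers 0, 1, and 2, listed in that priority order, the replica on broker 0 is the preferred replica and acts as leader. When broker 0 goes down, the replica on broker 1 is elected leader. After broker 0 comes back and catches up, leadership of the partition is automatically returned to it. This keeps leaders spread evenly across brokers instead of piling up on the brokers that stayed alive, which avoids load imbalance and wasted resources. This is the leader balancing mechanism.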

It is enabled in config/server.properties (it is on by default):

auto.leader.rebalance.enable=true
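The selection rule described above can be illustrated with a toy model. This is only a sketch and not Kafka code: the function name `pick_leader` and its two arguments are made up for illustration, and it assumes the leader is simply the first broker in the replica list that is currently alive and in sync.

```shell
# Toy model of leader selection (illustration only, not Kafka code).
# $1: assigned replica list in priority order, e.g. "0,1,2"
# $2: brokers that are currently alive and in sync, e.g. "1,2"
pick_leader() {
  local replicas="$1" alive=",$2,"
  local IFS=','
  local r
  for r in $replicas; do
    case "$alive" in
      *",$r,"*) echo "$r"; return 0 ;;
    esac
  done
  echo "none"
  return 1
}

pick_leader "0,1,2" "0,1,2"   # all brokers up: prints 0 (the preferred replica)
pick_leader "0,1,2" "1,2"     # broker 0 down: prints 1
pick_leader "0,1,2" "0,1,2"   # broker 0 back and in sync: prints 0 again
```

In a real cluster the move back to broker 0 is not instant. The controller checks for imbalance periodically, so the old leader may keep serving the partition for a while.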






On node01, extract ZooKeeper under /opt/bigdata/:

tar -zxvf zookeeper-3.4.6.tar.gz


Configure the hosts file with vim /etc/hosts so that each of the following hostnames resolves to its node:

1. node03
2. node02
3. node01


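Set the ZooKeeper environment variables:

export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.6   # ZooKeeper installation path
export PATH=$PATH:$ZOOKEEPER_HOME/bin   # lets zkServer.sh be run by name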


Under /opt/bigdata, copy the sample configuration:

cp zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg

Then edit it: vim zookeeper-3.4.6/conf/zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# ZooKeeper data directory (comments must be on their own line in zoo.cfg)
dataDir=/opt/bigdata/data/zookeeper/zkdata
# ZooKeeper transaction log directory
dataLogDir=/opt/bigdata/data/zookeeper/zkdatalog
# the port at which the clients will connect
clientPort=2181

server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
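initLimit and syncLimit are counted in ticks, so their real duration depends on tickTime. The following sketch converts them to milliseconds. The helper name `zk_timeouts` is hypothetical, and the temporary file only stands in for a real zoo.cfg.

```shell
# Hypothetical helper: print initLimit and syncLimit in milliseconds.
# $1: path to a zoo.cfg-style file
zk_timeouts() {
  awk -F= '
    /^[ \t]*#/        { next }          # skip comment lines
    $1 == "tickTime"  { tick = $2 }
    $1 == "initLimit" { initl = $2 }
    $1 == "syncLimit" { syncl = $2 }
    END {
      printf "initLimit: %d ms\n", initl * tick
      printf "syncLimit: %d ms\n", syncl * tick
    }' "$1"
}

# Demonstration with the values used in this configuration.
cfg=$(mktemp)
printf 'tickTime=2000\ninitLimit=10\nsyncLimit=5\n' > "$cfg"
zk_timeouts "$cfg"
# initLimit: 20000 ms
# syncLimit: 10000 ms
rm -f "$cfg"
```

With these settings a follower has 20 seconds to connect and sync with the leader at startup, and 10 seconds to answer before it is considered out of sync.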


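On each node, create a file named myid in the dataDir configured above (/opt/bigdata/data/zookeeper/zkdata), for example with vim myid. Its content is 1 on node01, 2 on node02, and 3 on node03, matching the server.N entries in zoo.cfg.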
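Because the myid value has to agree with the server.N line for the same host, it can be derived from zoo.cfg instead of typed by hand. A minimal sketch, assuming short hostnames without dots; the helper name `myid_for_host` is hypothetical.

```shell
# Hypothetical helper: look up the id for a host from the server.N lines.
# $1: path to zoo.cfg
# $2: short hostname without dots (for example the output of `hostname -s`)
myid_for_host() {
  awk -F'[.=:]' -v host="$2" '
    $1 == "server" && $3 == host { print $2; found = 1 }
    END { exit found ? 0 : 1 }' "$1"
}

# Possible use on each node (paths taken from the configuration above):
# myid_for_host /opt/bigdata/zookeeper-3.4.6/conf/zoo.cfg "$(hostname -s)" \
#   > /opt/bigdata/data/zookeeper/zkdata/myid
```

The function exits with a non-zero status when the host is not listed, so a missing entry is noticed rather than silently producing an empty myid file.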


On node01, under /opt/bigdata, use scp to copy the ZooKeeper directory to the same path on node02 and node03:

scp -r zookeeper-3.4.6 node02:`pwd`
scp -r zookeeper-3.4.6 node03:`pwd`


On node01, node02, and node03, run zkServer.sh start to start ZooKeeper.

Wait a moment, then run zkServer.sh status on each of the three nodes to check the state:

[root@node01 ~]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: leader
[root@node02 bigdata]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: follower
[root@node03 ~]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: follower
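A healthy ensemble shows exactly one leader and the remaining nodes as followers. The check can be sketched as a small filter over the combined status output. The helper name `check_quorum` is hypothetical, and the ssh loop in the comment assumes passwordless SSH and zkServer.sh on the remote PATH.

```shell
# Hypothetical helper: read concatenated `zkServer.sh status` output on stdin
# and succeed only if it shows exactly one leader and at least one follower.
check_quorum() {
  awk '
    /Mode: leader/   { leaders++ }
    /Mode: follower/ { followers++ }
    END {
      printf "leaders=%d followers=%d\n", leaders, followers
      exit (leaders == 1 && followers >= 1) ? 0 : 1
    }'
}

# Possible use from node01:
# for h in node01 node02 node03; do ssh "$h" zkServer.sh status 2>&1; done | check_quorum
```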



Download Kafka:

wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

On node01, extract it under /opt/bigdata/:

tar zxvf kafka_2.11-1.1.0.tgz


Under /opt/bigdata/, edit the broker configuration with vim kafka_2.11-1.1.0/config/server.properties:




############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092

# The port the socket server listens on
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/opt/bigdata/kafka_2.11-1.1.0/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################
zookeeper.connect=node01:2181,node02:2181,node03:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


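Under /opt/bigdata, copy the Kafka directory to the same path on node02 and node03. Passwordless SSH login must be set up for the copy to work:

scp -r kafka_2.11-1.1.0 node02:`pwd`
scp -r kafka_2.11-1.1.0 node03:`pwd`

After copying, change broker.id in config/server.properties on node02 and node03 so that every broker has a unique id.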
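The copied server.properties still carries broker.id=1 from node01, and two brokers cannot register with the same id. The edit can be scripted as in the sketch below. The helper name `set_broker_id` is hypothetical, and the ids 2 and 3 in the comment are one possible assignment.

```shell
# Hypothetical helper: rewrite the broker.id line of a server.properties file.
# $1: path to server.properties, $2: new broker id
set_broker_id() {
  local tmp status
  tmp=$(mktemp) || return 1
  # Write the edited copy to a temp file, then overwrite the original in place
  # so that its permissions and ownership are kept.
  sed "s/^broker\.id=.*/broker.id=$2/" "$1" > "$tmp" && cat "$tmp" > "$1"
  status=$?
  rm -f "$tmp"
  return $status
}

# Possible use:
# on node02: set_broker_id /opt/bigdata/kafka_2.11-1.1.0/config/server.properties 2
# on node03: set_broker_id /opt/bigdata/kafka_2.11-1.1.0/config/server.properties 3
```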




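Under /opt/bigdata, run the following command on each of the three nodes to start the Kafka cluster. The -daemon option already runs the broker in the background, so no trailing & is needed:

./kafka_2.11-1.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-1.1.0/config/server.properties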



Create a topic with 6 partitions and a replication factor of 3:

./kafka_2.11-1.1.0/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 3 --partitions 6 --topic kfk_test
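The replication factor cannot be larger than the number of running brokers, and every partition is stored once per replica. The arithmetic for this topic can be sketched as follows; `replica_plan` is a hypothetical helper, not part of Kafka.

```shell
# Hypothetical helper: replica count arithmetic for a new topic.
# $1: partitions, $2: replication factor, $3: number of brokers
replica_plan() {
  local total
  if [ "$2" -gt "$3" ]; then
    echo "replication factor $2 exceeds broker count $3" >&2
    return 1
  fi
  total=$(( $1 * $2 ))
  echo "total replicas: $total"
  echo "average per broker: $(( total / $3 ))"   # integer division
}

replica_plan 6 3 3
# total replicas: 18
# average per broker: 6
```

With three brokers and a replication factor of 3, every broker holds a copy of every partition.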


List the topics:

./kafka_2.11-1.1.0/bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181


Start a console producer:

./kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kfk_test


Before Kafka 0.9, the console consumer connects through ZooKeeper:

./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic kfk_test

Since Kafka 0.9 the ZooKeeper option is discouraged. It is still supported but is being phased out, so the bootstrap-server form is recommended:

./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic kfk_test


Describe the topic:

./kafka_2.11-1.1.0/bin/kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic kfk_test


Topic:kfk_test  PartitionCount:6    ReplicationFactor:3    Configs:
Topic: kfk_test     Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 2,3,1
Topic: kfk_test     Partition: 1    Leader: 2    Replicas: 2,1,3    Isr: 2,3,1
Topic: kfk_test     Partition: 2    Leader: 3    Replicas: 3,2,1    Isr: 2,3,1
Topic: kfk_test     Partition: 3    Leader: 1    Replicas: 1,2,3    Isr: 2,3,1
Topic: kfk_test     Partition: 4    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
Topic: kfk_test     Partition: 5    Leader: 3    Replicas: 3,1,2    Isr: 2,3,1


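partition: the partition id
leader: the id of the broker currently handling reads and writes for the partition, that is, the broker.id from server.properties
replicas: the list of all brokers that hold a replica of the partition
isr: (in-sync replicas) the subset of replicas that are alive and caught up with the leader; brokers that are offline or down are not in this list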
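These fields make two common problems easy to spot: a partition whose leader is not the first broker in its replica list, and a partition whose ISR is shorter than its replica list. The following sketch filters the describe output for both. The helper name `check_partitions` is hypothetical, and it assumes the output layout shown above.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# report partitions whose leader is not the preferred (first) replica or
# whose ISR is smaller than the replica list.
check_partitions() {
  awk '
    /Partition: / {
      for (i = 1; i <= NF; i++) {
        if ($i == "Partition:") part = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Replicas:")  nrep = split($(i + 1), reps, ",")
        if ($i == "Isr:")       nisr = split($(i + 1), isr, ",")
      }
      if (leader != reps[1])
        printf "partition %s: leader %s is not preferred replica %s\n", part, leader, reps[1]
      if (nisr < nrep)
        printf "partition %s: under-replicated (%d of %d in sync)\n", part, nisr, nrep
    }'
}

# Possible use:
# ./kafka_2.11-1.1.0/bin/kafka-topics.sh --describe \
#   --zookeeper node01:2181,node02:2181,node03:2181 --topic kfk_test | check_partitions
```

For the kfk_test output shown above, the filter prints nothing: every leader is the first broker in its replica list and every ISR has all three brokers.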



Delete the topic:

./kafka_2.11-1.1.0/bin/kafka-topics.sh --delete --zookeeper node01:2181,node02:2181,node03:2181 --topic kfk_test


To remove every trace of the topic, delete its nodes in the ZooKeeper client (zkCli.sh):

rmr /brokers/topics/kfk_test

rmr /config/topics/kfk_test

rmr /admin/delete_topics/kfk_test

rmr /consumers/kfk_test-group


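Finally, on each broker, delete the topic's partition directories under the directory set in log.dirs:

rm -rf /opt/bigdata/kafka_2.11-1.1.0/kafka-logs/kfk_test-*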
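Since rm -rf with a wildcard is unforgiving, it can help to list the matching directories first. A minimal sketch, assuming partition directories are named topic-N inside the Kafka log directory; the helper name `topic_dirs` is hypothetical.

```shell
# Hypothetical helper: list the partition directories of one topic.
# $1: Kafka log directory (the value of log.dirs), $2: topic name
topic_dirs() {
  local d
  for d in "$1"/"$2"-[0-9]*; do
    [ -d "$d" ] && echo "$d"
  done
  return 0
}

# Possible use: review the list, then delete only what was printed.
# topic_dirs /opt/bigdata/kafka_2.11-1.1.0/kafka-logs kfk_test
```

The pattern requires a hyphen and a digit after the topic name, so a topic such as kfk_test2 is not matched. Other topics whose names start with kfk_test- followed by a digit would still match, so the printed list should be checked before deleting anything.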