安装kafka

下载

1
2
mkdir -p /opt/kafka && cd /opt/kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

解压

1
tar -zxvf kafka_2.12-2.5.0.tgz

配置环境变量

1
2
3
4
5
6
vi /etc/profile

#将下面的代码放到末尾
#set kafka environment
export KAFKA_HOME=/opt/kafka/kafka_2.12-2.5.0
export PATH=$PATH:$KAFKA_HOME/bin

重新加载环境变量

1
source /etc/profile

配置

1
vi /opt/kafka/kafka_2.12-2.5.0/config/server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
# (1).配置 broker 的ID
broker.id=1 # 第一个kafka配置为 1,第二个配置为2,以此类推
# (2).打开监听端口
# 尽量写ip地址,以免造成错误
listeners=PLAINTEXT://10.211.55.3:9092
# (3).修改 log 的目录、在指定的位置创建好文件夹logs
log.dirs=/opt/kafka/kafka_2.12-2.5.0/logs
# (4).修改 zookeeper.connect,尽量写ip地址,以免造成错误
zookeeper.connect=10.211.55.3:2181
# (5).网络线程数量
num.network.threads=3
# (6).Zookeeper每6秒监视kafka是否还活着(默认)
zookeeper.connection.timeout.ms=6000

启动

启动kafka之前需要先启动zookeeper,然后再启动kafka

1
2
3
4
5
6
7
8
9
10
11
12
#启动zookeeper
zkServer.sh start

#启动kafka
kafka-server-start.sh $KAFKA_HOME/config/server.properties & 打印日志启动
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties & 不打印日志启动

#通过jps查看是否启动
[root@nmk kafka]# jps
2049 QuorumPeerMain
3496 Jps
3101 Kafka

命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#创建topic
kafka-topics.sh --create --zookeeper 10.211.55.3:2181 --replication-factor 1 --partitions 1 --topic test

#列出kafka
[root@nmk kafka]# kafka-topics.sh --list --zookeeper 10.211.55.3:2181
test
#有刚刚的测试topic名,表示创建成功

#如果需要查看topic的详细信息,需要使用describe命令
kafka-topics.sh --describe --zookeeper 10.211.55.3:2181--topic test-topic

#若不指定topic,则查看所有topic的信息
kafka-topics.sh --describe --zookeeper 10.211.55.3:2181

#删除topic
kafka-topics.sh --delete --zookeeper 10.211.55.3:2181 --topic nmk

#查看消费者组
kafka-consumer-groups.sh --bootstrap-server 10.211.55.3:9092 --list

#创建消费者组的两种方式
1.kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --topic nmk --consumer.config config/consumer.properties

2.kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --topic nmk --group nmkgroup

#生产者操作
kafka-console-producer.sh --broker-list 10.211.55.3:9092 --topic nmk

#消费者操作

# 通过以上命令,可以看到消费者可以接收生产者发送的消息
kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --topic nmk

# 如果需要从头开始接收数据,需要添加--from-beginning参数
kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --from-beginning --topic nmk

#不同版本的kafka操作版本不同.高版本可使用--bootstrap-server 低版本仅支持--zookeeper

Kafka管理工具

kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。具体支持以下内容:

  • 管理多个集群
  • 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发)
  • 运行首选副本选举
  • 使用选项生成分区分配以选择要使用的代理
  • 运行分区重新分配(基于生成的分配)
  • 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置)
  • 删除主题(仅支持0.8.2+并记住在代理配置中设置delete.topic.enable = true)
  • 主题列表现在指示标记为删除的主题(仅支持0.8.2+)
  • 批量生成多个主题的分区分配,并可选择要使用的代理
  • 批量运行重新分配多个主题的分区
  • 将分区添加到现有主题
  • 更新现有主题的配置

kafka-manager 项目地址:https://github.com/yahoo/CMAK

下载

1
2
3
4
cd /opt/kafka/
wget https://github.com/yahoo/CMAK/releases/download/3.0.0.4/cmak-3.0.0.4.zip
mv cmak-3.0.0.4.zip kafka-manager-3.0.0.4.zip
unzip kafka-manager-3.0.0.4.zip

配置

1
2
3
vi conf/application.conf 
#修改kafka-manager.zkhosts列表为自己的zk节点
kafka-manager.zkhosts="10.211.55.3"

启动

1
2
3
4
bin/kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; 
-Dconfig.file=conf/application.conf指定配置文件:

nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9001 &