安装kafka
下载
mkdir -p /opt/kafka && cd /opt/kafka wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
|
解压
tar -zxvf kafka_2.12-2.5.0.tgz
|
配置环境变量
vi /etc/profile
#将下面的代码放到末尾 #set kafka environment export KAFKA_HOME=/opt/kafka/kafka_2.12-2.5.0 export PATH=$PATH:$KAFKA_HOME/bin
|
重新加载环境变量
配置
vi /opt/kafka/kafka_2.12-2.5.0/config/server.properties
|
# (1).配置 broker 的ID broker.id=1 # 第一个kafka配置为 1,第二个配置为2,以此类推 # (2).打开监听端口 # 尽量写ip地址,以免造成错误 listeners=PLAINTEXT://10.211.55.3:9092 # (3).修改 log 的目录、在指定的位置创建好文件夹logs log.dirs=/opt/kafka/kafka_2.12-2.5.0/logs # (4).修改 zookeeper.connect,尽量写ip地址,以免造成错误 zookeeper.connect=10.211.55.3:2181 # (5).网络线程数量 num.network.threads=3 # (6).Zookeeper每6秒监视kafka是否还活着(默认) zookeeper.connection.timeout.ms=6000
|
启动
启动kafka之前需要先启动zookeeper,然后再启动kafka
#启动zookeeper zkServer.sh start
#启动kafka kafka-server-start.sh $KAFKA_HOME/config/server.properties & 打印日志启动 kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties & 不打印日志启动
#通过jps查看是否启动 [root@nmk kafka]# jps 2049 QuorumPeerMain 3496 Jps 3101 Kafka
|
命令
#创建topic kafka-topics.sh --create --zookeeper 10.211.55.3:2181 --replication-factor 1 --partitions 1 --topic test
#列出kafka [root@nmk kafka]# kafka-topics.sh --list --zookeeper 10.211.55.3:2181 test #有刚刚的测试topic名,表示创建成功
#如果需要查看topic的详细信息,需要使用describe命令 kafka-topics.sh --describe --zookeeper 10.211.55.3:2181--topic test-topic
#若不指定topic,则查看所有topic的信息 kafka-topics.sh --describe --zookeeper 10.211.55.3:2181
#删除topic kafka-topics.sh --delete --zookeeper 10.211.55.3:2181 --topic nmk
#查看消费者组 kafka-consumer-groups.sh --bootstrap-server 10.211.55.3:9092 --list
#创建消费者组的两种方式 1.kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --topic nmk --consumer.config config/consumer.properties
2.kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --topic nmk --group nmkgroup
#生产者操作 kafka-console-producer.sh --broker-list 10.211.55.3:9092 --topic nmk
#消费者操作
# 通过以上命令,可以看到消费者可以接收生产者发送的消息 kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --topic nmk # 如果需要从头开始接收数据,需要添加--from-beginning参数 kafka-console-consumer.sh --bootstrap-server 10.211.55.3:9092 --from-beginning --topic nmk
#不同版本的kafka操作版本不同.高版本可使用--bootstrap-server 低版本仅支持--zookeeper
|
Kafka管理工具
kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。具体支持以下内容:
- 管理多个集群
- 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发)
- 运行首选副本选举
- 使用选项生成分区分配以选择要使用的代理
- 运行分区重新分配(基于生成的分配)
- 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置)
- 删除主题(仅支持0.8.2+并记住在代理配置中设置delete.topic.enable = true)
- 主题列表现在指示标记为删除的主题(仅支持0.8.2+)
- 批量生成多个主题的分区分配,并可选择要使用的代理
- 批量运行重新分配多个主题的分区
- 将分区添加到现有主题
- 更新现有主题的配置
kafka-manager 项目地址:https://github.com/yahoo/CMAK
下载
cd /opt/kafka/ wget https://github.com/yahoo/CMAK/releases/download/3.0.0.4/cmak-3.0.0.4.zip mv cmak-3.0.0.4.zip kafka-manager-3.0.0.4.zip unzip kafka-manager-3.0.0.4.zip
|
配置
vi conf/application.conf #修改kafka-manager.zkhosts列表为自己的zk节点 kafka-manager.zkhosts="10.211.55.3"
|
启动
bin/kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9001 &
|