前面写到过在Docker中安装kafka,今天讲讲直接在直接在主机上安装kafka。
1、安装JDK1.8
直接用yum安装,还可以省去配置的麻烦。
yum install -y java-1.8.0-openjdk
2. 下载kafka
去官网:http://kafka.apache.org/downloads.html找到自己需要的版本。用wget直接下载。并解压到 /opt/kafka
目录中
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar zxf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0 /opt/kafka
2. 单zookeeper单broker
2.1 启动 zookeeper
cd /opt/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2.2 启动kafka
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties
2.3 测试
2.3.1 创建topic
使用kafka-topics.sh 创建单分区单副本的topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
2.3.2 消费消息
使用kafka-console-consumer.sh 接收消息并在终端打印
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2.3.3 产生消息
另启一个终端,使用kafka-console-producer.sh 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在生产消息这边输入的字符串,在消费消息终端实时显示出来
2.4 配置开机自启
2.4.1 配置zookeeper
新建 /etc/systemd/system/zookeeper.service
文件并添加以下内容
vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
2.4.2 配置kafka
新建 /etc/systemd/system/kafka.service
文件并添加以下内容
vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
:: 这里要等zookeeper.service启动后再启动
2.4.2 使配置生效
systemctl enable zookeeper
systemctl enable kafka
2.4.3 启动、停止
# 启动
systemctl start zookeeper
systemctl start kafka
# 停止
systemctl stop zookeeper
systemctl stop kafka
# 查看状态
systemctl status zookeeper
systemctl status kafka
3. 单zookeeper多broker 集群配置
3.1 准备工作
- zookeeper在 192.168.0.108:2181
- broker0: 192.168.0.108:9092
- broker1: 192.168.0.103:9092
3.2 修改kafka配置
这里需要修改/opt/kafka/config/server.properties
配置
3.2.1 broker0
vim /opt/kafka/config/server.properties
# 修改broker.id
broker.id=0
# 高可用值改为3,默认是1,多broker时改为3或以上,不然topic的分区副本全在broker0上,干掉broker0无法容灾
offsets.topic.replication.factor=3
# 修改监听
listeners=PLAINTEXT://192.168.0.108:9092
3.2.2 broker1
vim /opt/kafka/config/server.properties
# 修改broker.id
broker.id=1
# 高可用值改为3,默认是1,多broker时改为3或以上,不然topic的分区副本全在broker0上,干掉broker0无法容灾
offsets.topic.replication.factor=3
# 修改监听
listeners=PLAINTEXT://192.168.0.103:9092
# 修改zookeeper地址
zookeeper.connect=192.168.0.108:2181
3.3 启动
先启动zookeeper。再以bin/kafka-server-start.sh config/server.properties
命令或配置systemctl方式分辨启动broker0和broker1
3.4 测试
3.4.1 创建
创建2复本(replication-factor 2)、1分区(partitions 1)的topic test2
# 创建
bin/kafka-topics.sh --create --bootstrap-server 192.168.0.108:9092 --replication-factor 2 --partitions 1 --topic test2
# 查看状态
bin/kafka-topics.sh --describe --bootstrap-server 192.168.0.108:9092 --topic test2
3.4.2 消费消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.103:9092 --from-beginning --topic test2
3.4.3 生产消息
另启动终端发送消息
bin/kafka-console-producer.sh --bootstrap-server 192.168.0.103:9092 --topic test2
3.4.4 测试容灾
这里我把刚才producer 和 consumer 连接的 192.168.0.103节点给干掉,可以看到 consumer的日志打印,已经自动切换到了192.168.0.108节点上,再次测试发送消息接收消息,producer也打印了切换节点的日志
4. 多zookeeper多broker 集群配置
分别在多个节点按上述方式安装Kafka,配置启动多个Zookeeper 实例。
- zookeeper在 192.168.0.103:2181
- zookeeper在 192.168.0.108:2181
- broker0: 192.168.0.108:9092
- broker1: 192.168.0.103:9092
在每个zookeeper的配置/opt/kafka/config/zookeeper.properties
最后添加以下配置
vim /opt/kafka/config/zookeeper.properties
initLimit=5 ##Leader服务器等待Follower启动并完成数据同步的时间,默认值10,表示tickTime的10倍
syncLimit=2 ##Leader服务器和Follower之间进行心跳检测的最大延时时间,默认值5,表示tickTime的5倍
server.1=192.168.0.103:2888:3888
server.2=192.168.0.108:2888:3888
在每个kafka的配置/opt/kafka/config/server.properties
修改zookeeper.connect
zookeeper.connect=192.168.0.103:2181,192.168.0.108:2181
测试流程和单zookeeper多broker一样。
5. server.properties配置说明
#broker的全局唯一编号,不能重复
broker.id=0
#用来监听链接IP和端口,producer或consumer将在此建立连接
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.0.108:9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/tmp/kafka-logs
#topic在当前broker上的分片个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 高可用broker值,多broker集群时,这个值建议大于等于3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824
#周期性检查文件大小的时间
log.retention.check.interval.ms=300000
#broker需要使用zookeeper连接
zookeeper.connect=192.168.0.108:2181
#zookeeper链接超时时间
zookeeper.connection.timeout.ms=18000
# 空消费组延时,默认为3
group.initial.rebalance.delay.ms=3