CentOS安装kafka

前面写到过在Docker中安装kafka,今天讲讲直接在直接在主机上安装kafka。

1、安装JDK1.8

直接用yum安装,还可以省去配置的麻烦。

yum install -y java-1.8.0-openjdk

2. 下载kafka

去官网:http://kafka.apache.org/downloads.html找到自己需要的版本。用wget直接下载。并解压到 /opt/kafka目录中

wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar zxf kafka_2.12-2.8.0.tgz
mv kafka_2.12-2.8.0 /opt/kafka

2. 单zookeeper单broker

2.1 启动 zookeeper

cd /opt/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

2.2 启动kafka

cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties

2.3 测试

2.3.1 创建topic

使用kafka-topics.sh 创建单分区单副本的topic test

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

2.3.2 消费消息

使用kafka-console-consumer.sh 接收消息并在终端打印

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2.3.3 产生消息

另启一个终端,使用kafka-console-producer.sh 发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

在生产消息这边输入的字符串,在消费消息终端实时显示出来

2.4 配置开机自启

2.4.1 配置zookeeper

新建 /etc/systemd/system/zookeeper.service文件并添加以下内容

vim /etc/systemd/system/zookeeper.service

[Unit]
Description=Zookeeper service
After=network.target

[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

2.4.2 配置kafka

新建 /etc/systemd/system/kafka.service文件并添加以下内容

vim /etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service

[Service]
Type=simple
User=root
Group=root
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

:: 这里要等zookeeper.service启动后再启动

2.4.2 使配置生效

systemctl enable zookeeper
systemctl enable kafka

2.4.3 启动、停止

# 启动
systemctl start zookeeper
systemctl start kafka

# 停止
systemctl stop zookeeper
systemctl stop kafka

# 查看状态
systemctl status zookeeper
systemctl status kafka

3. 单zookeeper多broker 集群配置

3.1 准备工作

  • zookeeper在 192.168.0.108:2181
  • broker0: 192.168.0.108:9092
  • broker1: 192.168.0.103:9092

3.2 修改kafka配置

这里需要修改/opt/kafka/config/server.properties配置

3.2.1 broker0

vim /opt/kafka/config/server.properties

# 修改broker.id
broker.id=0

# 高可用值改为3,默认是1,多broker时改为3或以上,不然topic的分区副本全在broker0上,干掉broker0无法容灾
offsets.topic.replication.factor=3

# 修改监听
listeners=PLAINTEXT://192.168.0.108:9092

3.2.2 broker1

vim /opt/kafka/config/server.properties

# 修改broker.id
broker.id=1

# 高可用值改为3,默认是1,多broker时改为3或以上,不然topic的分区副本全在broker0上,干掉broker0无法容灾
offsets.topic.replication.factor=3

# 修改监听
listeners=PLAINTEXT://192.168.0.103:9092

# 修改zookeeper地址
zookeeper.connect=192.168.0.108:2181

3.3 启动

先启动zookeeper。再以bin/kafka-server-start.sh config/server.properties命令或配置systemctl方式分辨启动broker0和broker1

3.4 测试

3.4.1 创建

创建2复本(replication-factor 2)、1分区(partitions 1)的topic test2

# 创建
bin/kafka-topics.sh --create --bootstrap-server 192.168.0.108:9092 --replication-factor 2 --partitions 1 --topic test2

# 查看状态
bin/kafka-topics.sh --describe --bootstrap-server 192.168.0.108:9092 --topic test2

3.4.2 消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.103:9092 --from-beginning --topic test2

3.4.3 生产消息

另启动终端发送消息

bin/kafka-console-producer.sh --bootstrap-server 192.168.0.103:9092 --topic test2

3.4.4 测试容灾

这里我把刚才producer 和 consumer 连接的 192.168.0.103节点给干掉,可以看到 consumer的日志打印,已经自动切换到了192.168.0.108节点上,再次测试发送消息接收消息,producer也打印了切换节点的日志

4. 多zookeeper多broker 集群配置

分别在多个节点按上述方式安装Kafka,配置启动多个Zookeeper 实例。

  • zookeeper在 192.168.0.103:2181
  • zookeeper在 192.168.0.108:2181
  • broker0: 192.168.0.108:9092
  • broker1: 192.168.0.103:9092

在每个zookeeper的配置/opt/kafka/config/zookeeper.properties最后添加以下配置

vim /opt/kafka/config/zookeeper.properties

initLimit=5    ##Leader服务器等待Follower启动并完成数据同步的时间,默认值10,表示tickTime的10倍
syncLimit=2    ##Leader服务器和Follower之间进行心跳检测的最大延时时间,默认值5,表示tickTime的5倍
server.1=192.168.0.103:2888:3888
server.2=192.168.0.108:2888:3888

在每个kafka的配置/opt/kafka/config/server.properties修改zookeeper.connect

zookeeper.connect=192.168.0.103:2181,192.168.0.108:2181

测试流程和单zookeeper多broker一样。

5. server.properties配置说明

#broker的全局唯一编号,不能重复
broker.id=0

#用来监听链接IP和端口,producer或consumer将在此建立连接
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.0.108:9092

#处理网络请求的线程数量
num.network.threads=3

#用来处理磁盘IO的线程数量
num.io.threads=8

#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小
socket.request.max.bytes=104857600

#kafka运行日志存放的路径
log.dirs=/tmp/kafka-logs

#topic在当前broker上的分片个数
num.partitions=1

#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 高可用broker值,多broker集群时,这个值建议大于等于3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

#segment文件保留的最长时间,超时将被删除
log.retention.hours=168

#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

#周期性检查文件大小的时间
log.retention.check.interval.ms=300000

#broker需要使用zookeeper连接
zookeeper.connect=192.168.0.108:2181

#zookeeper链接超时时间
zookeeper.connection.timeout.ms=18000

# 空消费组延时,默认为3
group.initial.rebalance.delay.ms=3
0%