What is Kafka
Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, written in Scala and Java. The project's goal is to provide a unified, high-throughput, low-latency platform for handling real-time data. Its persistence layer is essentially a "massive-scale publish/subscribe message queue architected as a distributed transaction log", which makes it very valuable as enterprise-grade infrastructure for processing streaming data. In addition, Kafka can connect to external systems (for data import/export) via Kafka Connect, and provides Kafka Streams, a Java stream-processing library.
Node information
No. | IP |
---|---|
1 | 192.168.56.11 |
2 | 192.168.56.12 |
3 | 192.168.56.13 |
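Every step below has to be repeated on all three nodes, so a small helper loop can save typing. This is an optional sketch, not part of the original setup: it assumes passwordless SSH as root to all nodes is already configured, and the `run_all` name is made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper: run the same command on every node over SSH.
# Assumes passwordless root SSH to each node is already set up.
NODES="192.168.56.11 192.168.56.12 192.168.56.13"

run_all() {
  for node in $NODES; do
    echo ">>> $node"
    ssh "root@$node" "$@"
  done
}

# Example: run_all 'yum install -y perl-Digest-SHA'
```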
Install a JDK on each node
Kafka is a Java application, so it depends on a JDK. Download OpenJ9 and extract it, then create a symlink to make switching versions easier later.
wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u212-b04_openj9-0.14.2/OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz
wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u212-b04_openj9-0.14.2/OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz.sha256.txt
shasum -a 256 -c OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz.sha256.txt
tar -xzf OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz -C /usr/share
ln -s /usr/share/jdk8u212-b04 /usr/share/jdk
If this reports
shasum: command not found
install perl-Digest-SHA:
yum install -y perl-Digest-SHA
Add environment variables
echo 'export JAVA_HOME=/usr/share/jdk' >> /etc/profile
echo 'export PATH=$PATH:$JAVA_HOME/bin' >> /etc/profile
source /etc/profile
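Before moving on, it's worth a quick sanity check that the variables actually took effect. A minimal sketch, assuming the two profile lines above were applied:

```shell
#!/bin/sh
# Sanity check: JAVA_HOME is set and its bin directory is on PATH.
export JAVA_HOME=/usr/share/jdk          # what /etc/profile should have set
export PATH="$PATH:$JAVA_HOME/bin"

case ":$PATH:" in
  *":$JAVA_HOME/bin:"*) echo "JAVA_HOME on PATH: OK" ;;
  *)                    echo "JAVA_HOME missing from PATH" ;;
esac

# With the JDK actually extracted, `java -version` should now work too.
```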
Install Zookeeper on each node
Kafka stores its metadata, consumer state, group management data, and offsets in Zookeeper, so Zookeeper is a hard dependency: without Zookeeper, Kafka cannot run. In older versions (before 0.8.1) the consumer side also depended on Zookeeper; newer versions removed the client-side Zookeeper dependency, but the brokers still rely on it.
Download and extract Zookeeper
wget http://mirror.azure.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
tar -xzf zookeeper-3.4.14.tar.gz -C /usr/share
Create the data and log directories
mkdir -p /data/zookeeper/{data,logs}
Create the Zookeeper configuration file
cat <<EOF > /usr/share/zookeeper-3.4.14/conf/zoo.cfg
tickTime=2000
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.56.11:2888:3888
server.2=192.168.56.12:2888:3888
server.3=192.168.56.13:2888:3888
EOF
Create the myid file
# Run on each node to set that node's server id: the number matches the
# server.* entries written to /usr/share/zookeeper-3.4.14/conf/zoo.cfg
# e.g. for server.1=192.168.56.11:2888:3888, the node 192.168.56.11 gets a
# /data/zookeeper/data/myid file containing 1
echo "1" > /data/zookeeper/data/myid
# echo "2" > /data/zookeeper/data/myid
# echo "3" > /data/zookeeper/data/myid
Create the systemd service file
cat <<EOF > /usr/lib/systemd/system/kafka-zookeeper.service
[Unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Environment=JAVA_HOME=/usr/share/jdk
ExecStart=/usr/share/zookeeper-3.4.14/bin/zkServer.sh start-foreground
ExecStop=/usr/share/zookeeper-3.4.14/bin/zkServer.sh stop
Restart=always
StartLimitInterval=0
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
Start Zookeeper
systemctl daemon-reload
systemctl enable kafka-zookeeper
systemctl start kafka-zookeeper
Verify that Zookeeper started successfully
PATH=$PATH:$JAVA_HOME/bin /usr/share/zookeeper-3.4.14/bin/zkServer.sh status
Install Kafka on each node
Download and extract Kafka
wget http://mirror.azure.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzf kafka_2.12-2.2.1.tgz -C /usr/share
Create the data directory
mkdir -p /data/kafka/
Create the Kafka configuration file on each node
Node 1
cat <<EOF > /usr/share/kafka_2.12-2.2.1/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.56.11:9092
log.dirs=/data/kafka/
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
unclean.leader.election.enable=false
num.partitions=3
default.replication.factor=3
message.max.bytes=6525000
socket.request.max.bytes=104857600
log.retention.hours=24
EOF
Node 2
cat <<EOF > /usr/share/kafka_2.12-2.2.1/config/server.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.56.12:9092
log.dirs=/data/kafka/
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
unclean.leader.election.enable=false
num.partitions=3
default.replication.factor=3
message.max.bytes=6525000
socket.request.max.bytes=104857600
log.retention.hours=24
EOF
Node 3
cat <<EOF > /usr/share/kafka_2.12-2.2.1/config/server.properties
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.56.13:9092
log.dirs=/data/kafka/
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
unclean.leader.election.enable=false
num.partitions=3
default.replication.factor=3
message.max.bytes=6525000
socket.request.max.bytes=104857600
log.retention.hours=24
EOF
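The three files above differ only in `broker.id` and `advertised.listeners`, so they can also be generated from one loop instead of copy-pasted. A sketch: it writes into a temporary directory for illustration, whereas on a real node you would place the matching file at /usr/share/kafka_2.12-2.2.1/config/server.properties.

```shell
#!/bin/sh
# Generate one server.properties per node; only broker.id and
# advertised.listeners vary between the three files.
outdir=$(mktemp -d)   # stand-in for each node's config directory
id=0
for ip in 192.168.56.11 192.168.56.12 192.168.56.13; do
  cat <<EOF > "$outdir/server-$id.properties"
broker.id=$id
delete.topic.enable=true
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://$ip:9092
log.dirs=/data/kafka/
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
unclean.leader.election.enable=false
num.partitions=3
default.replication.factor=3
message.max.bytes=6525000
socket.request.max.bytes=104857600
log.retention.hours=24
EOF
  id=$((id + 1))
done

# Show the per-node differences that were filled in.
grep -H 'advertised' "$outdir"/server-*.properties
```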
Create the systemd service file on each node
cat <<EOF > /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target kafka-zookeeper.service
After=network.target remote-fs.target kafka-zookeeper.service
[Service]
Environment=JAVA_HOME=/usr/share/jdk
ExecStart=/usr/share/kafka_2.12-2.2.1/bin/kafka-server-start.sh /usr/share/kafka_2.12-2.2.1/config/server.properties
# Only safe because the data here is disposable: wipe it so the broker always starts clean. Remove this line in production.
ExecStartPre=/bin/sh -c 'rm -rf /data/kafka/*'
ExecStop=-/usr/share/kafka_2.12-2.2.1/bin/kafka-server-stop.sh
# Same as above: only safe when the data is disposable. Remove this line in production.
ExecStopPost=/bin/sh -c 'rm -rf /data/kafka/*'
Restart=always
StartLimitInterval=0
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
Start Kafka
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
Verify
/usr/share/kafka_2.12-2.2.1/bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --replication-factor 3 --partitions 1 --topic my-test
/usr/share/kafka_2.12-2.2.1/bin/kafka-console-producer.sh --broker-list 192.168.56.11:9092,192.168.56.12:9092,192.168.56.13:9092 --topic my-test
/usr/share/kafka_2.12-2.2.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.11:9092,192.168.56.12:9092,192.168.56.13:9092 --from-beginning --topic my-test