Kafka installation notes

What is Kafka

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its persistence layer is essentially a "massively scalable pub/sub message queue architected as a distributed transaction log", which makes it very valuable as enterprise-grade infrastructure for processing streaming data. In addition, Kafka can connect to external systems (for data import/export) via Kafka Connect, and ships with Kafka Streams, a Java stream-processing library.

Node information

No. IP
1 192.168.56.11
2 192.168.56.12
3 192.168.56.13

Install the JDK on every node

  • Kafka is a Java application, so it depends on a JDK. Download OpenJ9 and unpack it, then create a symlink to make switching versions easier later. Note the manifest is SHA-256, so verify with shasum -a 256

    wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u212-b04_openj9-0.14.2/OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz
    wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u212-b04_openj9-0.14.2/OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz.sha256.txt
    shasum -a 256 -c OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz.sha256.txt
    tar -xzf OpenJDK8U-jdk_x64_linux_openj9_8u212b04_openj9-0.14.2.tar.gz -C /usr/share
    ln -s /usr/share/jdk8u212-b04 /usr/share/jdk
  • If a "shasum: command not found" error appears, install perl-Digest-SHA

    yum install -y perl-Digest-SHA
  • Add the environment variables

    echo 'export JAVA_HOME=/usr/share/jdk' >> /etc/profile
    echo 'export PATH=$PATH:$JAVA_HOME/bin' >> /etc/profile
    source /etc/profile

Install Zookeeper on every node

Kafka stores its metadata, consumer progress, group management state, and offsets in Zookeeper, so Zookeeper is a hard requirement: Kafka cannot run without it. In old versions (before 0.8.1) the consumer side also depended on Zookeeper; newer versions removed the client's dependency on Zookeeper, but the broker still relies on it.
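
Once the cluster described below is up, this dependency can be seen directly: each broker registers itself as a znode. A quick look with the CLI bundled with Zookeeper (the install path matches the steps below; /brokers/ids is the standard Kafka registration path):

```shell
# List the broker IDs registered in Zookeeper; with the three-node
# setup below this should show [0, 1, 2].
/usr/share/zookeeper-3.4.14/bin/zkCli.sh -server 192.168.56.11:2181 ls /brokers/ids
# Show what a single broker registered (host, port, listener endpoints)
/usr/share/zookeeper-3.4.14/bin/zkCli.sh -server 192.168.56.11:2181 get /brokers/ids/0
```

This is a sketch that assumes the cluster from the following sections is already running.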

  • Download and unpack Zookeeper

    wget http://mirror.azure.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
    tar -xzf zookeeper-3.4.14.tar.gz -C /usr/share
  • Create the data and log directories

    mkdir -p /data/zookeeper/{data,logs}
  • Create the Zookeeper configuration file

    cat <<EOF > /usr/share/zookeeper-3.4.14/conf/zoo.cfg
    tickTime=2000
    dataDir=/data/zookeeper/data
    dataLogDir=/data/zookeeper/logs
    clientPort=2181
    initLimit=5
    syncLimit=2
    server.1=192.168.56.11:2888:3888
    server.2=192.168.56.12:2888:3888
    server.3=192.168.56.13:2888:3888
    EOF
  • Create the myid file (the Zookeeper server ID, not a process ID)

    # Run this on each node to assign its server ID; the number comes from the server.* entries written to /usr/share/zookeeper-3.4.14/conf/zoo.cfg above
    # e.g. for server.1=192.168.56.11:2888:3888, write 1 into /data/zookeeper/data/myid on the node whose IP is 192.168.56.11
    echo "1" > /data/zookeeper/data/myid
    # echo "2" > /data/zookeeper/data/myid
    # echo "3" > /data/zookeeper/data/myid
  • Create the systemd service file (Zookeeper was unpacked to /usr/share above, so the unit must point there)

    cat <<EOF > /usr/lib/systemd/system/kafka-zookeeper.service
    [Unit]
    Description=Apache Zookeeper server (Kafka)
    Documentation=http://zookeeper.apache.org
    Requires=network.target remote-fs.target
    After=network.target remote-fs.target

    [Service]
    Environment=JAVA_HOME=/usr/share/jdk
    ExecStart=/usr/share/zookeeper-3.4.14/bin/zkServer.sh start-foreground
    ExecStop=/usr/share/zookeeper-3.4.14/bin/zkServer.sh stop
    Restart=always
    StartLimitInterval=0
    RestartSec=10

    [Install]
    WantedBy=multi-user.target
    EOF
  • Start Zookeeper

    systemctl daemon-reload
    systemctl enable kafka-zookeeper
    systemctl start kafka-zookeeper
  • Verify that it started (with the quorum up, one node reports Mode: leader and the others Mode: follower)

    PATH=$PATH:$JAVA_HOME/bin /usr/share/zookeeper-3.4.14/bin/zkServer.sh status
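
The per-node myid step above can also be scripted instead of edited by hand on each machine. A minimal sketch, mirroring the server.N entries in zoo.cfg (the hard-coded ip is a stand-in for a real lookup such as hostname -I):

```shell
# Map this node's IP to its Zookeeper server ID, following the
# server.N=IP:2888:3888 lines written to zoo.cfg above.
ip=192.168.56.12   # stand-in; on a real node derive it, e.g. ip=$(hostname -I | awk '{print $1}')
case "$ip" in
  192.168.56.11) myid=1 ;;
  192.168.56.12) myid=2 ;;
  192.168.56.13) myid=3 ;;
  *) echo "unknown node $ip" >&2; exit 1 ;;
esac
echo "myid for $ip is $myid"
# On a real node you would then write it out:
# echo "$myid" > /data/zookeeper/data/myid
```

Run on each node with its own address, this yields the ID that matches its zoo.cfg entry.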

Install Kafka on every node

  • Download and unpack Kafka

    wget http://mirror.azure.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
    tar -xzf kafka_2.12-2.2.1.tgz -C /usr/share
  • Create the data directory

    mkdir -p /data/kafka/
  • Create the Kafka configuration file on each node (broker.id and advertised.listeners differ per node)

    • Node 1

      cat <<EOF > /usr/share/kafka_2.12-2.2.1/config/server.properties
      broker.id=0
      delete.topic.enable=true
      listeners=PLAINTEXT://0.0.0.0:9092
      advertised.listeners=PLAINTEXT://192.168.56.11:9092
      log.dirs=/data/kafka/
      zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
      unclean.leader.election.enable=false
      num.partitions=3
      default.replication.factor=3
      message.max.bytes=6525000
      socket.request.max.bytes=104857600
      log.retention.hours=24
      EOF
    • Node 2

      cat <<EOF > /usr/share/kafka_2.12-2.2.1/config/server.properties
      broker.id=1
      delete.topic.enable=true
      listeners=PLAINTEXT://0.0.0.0:9092
      advertised.listeners=PLAINTEXT://192.168.56.12:9092
      log.dirs=/data/kafka/
      zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
      unclean.leader.election.enable=false
      num.partitions=3
      default.replication.factor=3
      message.max.bytes=6525000
      socket.request.max.bytes=104857600
      log.retention.hours=24
      EOF
    • Node 3

      cat <<EOF > /usr/share/kafka_2.12-2.2.1/config/server.properties
      broker.id=2
      delete.topic.enable=true
      listeners=PLAINTEXT://0.0.0.0:9092
      advertised.listeners=PLAINTEXT://192.168.56.13:9092
      log.dirs=/data/kafka/
      zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
      unclean.leader.election.enable=false
      num.partitions=3
      default.replication.factor=3
      message.max.bytes=6525000
      socket.request.max.bytes=104857600
      log.retention.hours=24
      EOF
  • Create the systemd service file on each node (JAVA_HOME points at the /usr/share/jdk symlink created earlier; the rm lines go through /bin/sh because systemd does not expand the * glob itself)

    cat <<EOF > /usr/lib/systemd/system/kafka.service
    [Unit]
    Description=Apache Kafka server (broker)
    Documentation=http://kafka.apache.org/documentation.html
    Requires=network.target remote-fs.target kafka-zookeeper.service
    After=network.target remote-fs.target kafka-zookeeper.service

    [Service]
    Environment=JAVA_HOME=/usr/share/jdk
    ExecStart=/usr/share/kafka_2.12-2.2.1/bin/kafka-server-start.sh /usr/share/kafka_2.12-2.2.1/config/server.properties
    # Wipes the data directory so startup always succeeds when the data does not matter; remove this line in production
    ExecStartPre=/bin/sh -c 'rm -rf /data/kafka/*'
    ExecStop=-/usr/share/kafka_2.12-2.2.1/bin/kafka-server-stop.sh
    # Same as above: only for disposable data; remove this line in production
    ExecStopPost=/bin/sh -c 'rm -rf /data/kafka/*'
    Restart=always
    StartLimitInterval=0
    RestartSec=10

    [Install]
    WantedBy=multi-user.target
    EOF
  • Start Kafka

    systemctl daemon-reload
    systemctl enable kafka
    systemctl start kafka
  • Verify (create a topic, then produce and consume on it)

    /usr/share/kafka_2.12-2.2.1/bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --replication-factor 3 --partitions 1 --topic my-test
    /usr/share/kafka_2.12-2.2.1/bin/kafka-console-producer.sh --broker-list 192.168.56.11:9092,192.168.56.12:9092,192.168.56.13:9092 --topic my-test
    /usr/share/kafka_2.12-2.2.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.11:9092,192.168.56.12:9092,192.168.56.13:9092 --from-beginning --topic my-test
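
Beyond producing and consuming, it is worth checking where the topic's partitions and replicas landed. A sketch using the same paths and addresses as above (requires the running cluster):

```shell
# List all topics known to the cluster
/usr/share/kafka_2.12-2.2.1/bin/kafka-topics.sh --list \
  --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
# Show leader, replica set, and in-sync replicas for my-test;
# with replication-factor 3 each partition should list all three brokers
/usr/share/kafka_2.12-2.2.1/bin/kafka-topics.sh --describe --topic my-test \
  --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
```

If the Isr column shrinks below three brokers, one of the nodes is down or lagging.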