AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Kafka

概述

高吞吐低延迟的分布式消息队列(流平台)

核心概念

  1. Broker 代理人
    kafka每个节点启动起来的JVM进程称之为Broker

  2. topic 主题
    在对kafka集群进行生产和消费之前,需要先创建topic
    topic是一个对kafka集群全局用来划分消息归类的概念

  3. partition 分区
    每个topic会在创建时指定分区个数,这些属于同一个topic的分区会分散存储在集群的各个节点上,
    分区的本质是一个文件夹,生产者发送给kafka的数据会存储在分区这个文件夹中的文件里

  4. replication 副本
    topic的每个分区在集群中保存的数量
    同一个topic中的编号相同的分区的多个副本 会选举出一个leader,leader负责完成客户端的交互(生产消费)
    其他副本作为follower,负责同步leader的数据
    当节点存货状态发生变化时重新选举

  5. producer
    生产者负责将数据封装成 message,发送到kafka集群

  6. offset 偏移量
    kafka会将生产者写入的每一条数据在分区内存储时,设置一个分区内唯一的offset,从0开始单调递增

  7. consumer
    consumer负责从kafka中拉取message并做其他处理
    consumer拉取时,会先将某个分区的数据全部消费完,再切换下一个分区进行消息

  8. consumer group
    同组消费者共用一份offset记录
    同组消费者不会重复消费同一条消息

集群搭建

0. zookeeper

kafka的broker之间通过zk感知集群在线状态
consumer也需要zk管理topic和offset数据
所以在安装kafka之前需要保证zk集群的正常运行

1. kafka集群搭建

  1. 上传安装包解压改名字

流行版本
0.11 1.x

最新版本
2.x

  1. 修改配置文件
  • server.properties
    # 每个broker设置一个集群内唯一的整数id
    broker.id=0
    # 每个broker设置kafka服务的主机名和端口
    listeners=PLAINTEXT://bd0801:9092
    # 设置保存topic数据的目录
    log.dirs=/opt/kafka-1.1.1/kafka-logs
    # 配置kafka使用的zk集群地址
    zookeeper.connect=bd0801:2181,bd0802:2181,bd0803:2181
  1. 分发安装包

  2. 修改每个节点个性化配置

    broker.id
    listeners
  3. 设置环境变量

    echo 'export KFK_HOME=/opt/kafka-1.1.1' >> /etc/profile
    echo 'export PATH=.:$KFK_HOME/bin:$PATH' >> /etc/profile
    source /etc/profile
  4. 启动服务

kafka-server-start.sh -daemon /opt/kafka-1.1.1/config/server.properties

kafka的命令行操作

  1. topic相关

    • 查看topic列表

      kafka-topics.sh \
      --zookeeper bd0801:2181 \
      --list
    • 创建topic

      kafka-topics.sh --zookeeper bd0801:2181 \
      --create \
      --topic topic-1 \
      --partitions 3 \
      --replication-factor 2
    • 查看topic详情

      kafka-topics.sh --zookeeper bd0801:2181 \
      --topic topic-1 \
      --describe
  2. kafka可视化工具 kafka-eagle的安装和使用

下载解压安装包,修改配置文件

  • system-config.properties
    # 设置需要接管的本地zk(kafka)集群
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=bd0801:2181,bd0802:2181,bd0803:2181
    # 注释掉自带的sqlite配置信息
    # 使用mysql进行数据管理
    kafka.eagle.driver=com.mysql.cj.jdbc.Driver
    kafka.eagle.url=jdbc:mysql://bd0802:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    kafka.eagle.username=root
    kafka.eagle.password=123456
  1. 消费者相关

    • 启动一个控制台消费者进行消费测试
      kafka-console-consumer.sh \
      --bootstrap-server bd0801:9092,bd0802:9092,bd0803:9092 \
      --topic topic-1 \
      --from-beginning
    • 消费者性能测试
  2. 生产者相关

    • 启动一个控制台生产者进行生产测试

      kafka-console-producer.sh  \
      --broker-list  bd0801:9092,bd0802:9092,bd0803:9092 \
      --topic topic-1
    • 生产者性能测试

      kafka-producer-perf-test.sh \
      --topic topic-1 \
      --producer-props bootstrap.servers=bd0801:9092 \
      --num-records 1000000 \
      --throughput 1000000 \
      --payload-file /root/access.log-20190926

kafka的Java/ScalaApi

  • kafka的其他组件

    • kafka-mirror-maker
      进行两个kafka集群之间的数据同步

    • kafka-streams
      kafka提供的一个流式计算模块(性能较差所以生产环境会使用专门的计算框架比如spark、flink进行流式计算)

生产者

  • 老API

  • 新API

消费者

  • 老API

  • 新API