AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

kafka

Kafka

是什么

kafka是一个开源的分布式事件流平台
高吞吐分布式消息队列
用于构建流式数据分析高性能应用程序

kafka集群架构

核心名词

  • Broker 代理人
    kafka启动起来的Jvm进程实例
    在同一个kafka集群内,每个broker有一个全局唯一的id

  • topic 主题
    kafka集群内的一个逻辑概念
    kafka的生产者和消费者在生产消费时都需要指定topic

  • partition 分区
    每个topic在创建时可以指定包含的分区个数
    分区本质上就是一个目录
    目录的名称 topic名-分区编号

  • offset偏移量
    消息写入分区时,会记录一个单调递增的序号称之为offset

  • replication 副本
    每个topic在创建时可以指定副本的个数
    副本个数是指分区在kafka集群中存储的份数
    同一个topic的同一个分区的多个副本之间会选举产生1个leader和若干个follower
    leader负责接收客户端的读写请求
    follower负责同步leader的数据

  • producer 生产者
    生产者可以是一个命令行客户端,或者是一个Java/Scala程序
    负责将消息发送到kafka集群

  • consumer 消费者
    消费者可以是一个命令行客户端,或者是一个Java/Scala程序
    负责从kafka集群读取消息

  • consumer 消费者组
    多个消费者线程可以使用相同的group.id,同组消费者共享offset记录

kafka集群搭建

  • 前提:
    kafka需要使用zk集群进行集群管理,并且完成副本之间的选举

  • 关于版本

    1. 最新版本 3.0.0
    2. 生产环境流行版本 0.8 0.10 0.11 1.1.x 2.1.x
    3. 当前: 2.4.1 同时兼容scala 2.11、2.12、2.13
  1. 下载解压
  2. 修改配置文件

    • server.properties
# 为每个broker设置一个唯一的整数id
broker.id=51
# 设置当前broker监听的主机名和端口
listeners = PLAINTEXT://host01:9092
# Kafka中有两类日志
# 1.与其他应用程序相似,运行时会记录运行状态,运行日志
# 2.用来存储生产者发送的消息内容的文件,数据日志
# 可以将数据日志文件存储在kafka的安装目录中
log.dirs=/opt/kafka-2.4.1/kafka-logs
# kafka中数据保存的生命周期,超过时间会自动滚动删除
log.retention.hours=168
# 当数据日志文件到达设置的大小时会自动滚动生成新的文件
#  默认分段大小为1G
log.segment.bytes=1073741824
# 设置kafka使用的zk集群地址
zookeeper.connect=host01:2181,host02:2181,host03:2181
  1. 分发安装包

    • 为每台机器修改broker.id和listeners
  2. 添加环境变量

echo 'export KFK_HOME=/opt/kafka-2.4.1' >> /etc/profile
echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动
kafka-server-start.sh -daemon /opt/kafka-2.4.1/config/server.properties

kafka操作

1. 命令操作

  • topic相关
# 创建3个分区两个副本的topic
kafka-topics.sh \
--bootstrap-server host01:9092 \
--create \
--topic topic01 \
--partitions 3 \
--replication-factor 2
# 查看可用的topic列表
kafka-topics.sh \
--bootstrap-server host01:9092 \
--list
# 查看topic详细信息
kafka-topics.sh \
--bootstrap-server host01:9092 \
--describe \
--topic topic01
  • 控制台生产者
kafka-console-producer.sh
--broker-list host02:9092 \
--topic topic01
  • 控制台消费者
kafka-console-consumer.sh \
--bootstrap-server host03:9092 \
--topic topic01 \
--from-beginning
kafka-producer-perf-test.sh \
--topic topic02 \
--num-records 1000000 \
--throughput -1 \
--producer-props bootstrap.servers=host01:9092 \
--payload-file /root/log/access.log-20211014

2. 可视化工具操作

  • kafka-eagle是一个第三方的kafka集群可视化工具

  • 官网https://www.kafka-eagle.org/

  • 安装

    1. 下载解压
    2. 修改配置文件

      • system-config.properties
# 设置ke连接的kafka集群的zk地址
efak.zk.cluster.alias=cluster1
cluster1.zk.list=host01:2181,host02:2181,host03:2181
# 设置保存统计数据使用的mysql数据库
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://host01:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
  1. 配置ke的环境变量
echo 'export KE_HOME=/opt/ke-2.0.7' >> /etc/profile
echo 'export PATH=$KE_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动
ke.sh start

3. api操作