AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

kafka

kafka

一、是什么

分布式消息系统
分布式流平台
kafka是apache开源的 分布式事件流平台

二、核心原理

1. broker 代理人

kafka服务启动后的jvm主进程叫做Broker

2. topic 主题

kafka为了将存储的数据进行逻辑划分,设计了topic
生产和消费前,都需要指定topic

3. partition 分区

每个topic创建时可以指定包含的分区个数,默认为1个分区
分区是存储在每台broker机器上一个文件夹
分区文件夹的命名 topic名-分区编号

4. replication-factor 副本因子

每个topic创建时可以指定副本的个数,默认为1个副本
副本数据不能超过当前可用的broker数量
相同主题的相同分区编号的多个副本之间会选举产生一个leader和多个follower
leader:负责处理客户端(生产者、消费者)请求
follower:负责实时拉取同步leader数据

5. offset 偏移量(消息编号)

每个分区内部,对消息从0开始进行编号,这个编号称之为offset,offset单调递增
kafka无法保证数据的全局有序,只能做到分区内有序
通常生产环境在生产者发送消息前会根据数据的key对分区数据量取模,将数据指定分区进行发送
为了避免数据倾斜,会对大key做二次hash

6. producer 生产者

是kafka的一类客户端,负责将需要保存的数据封装为ProducerRecord,发送到kafka集群
如果不指定发送的具体分区,则轮询发送到所有分区

7. consumer 消费者

是kafka的一类客户端,负责读取kafka数据,读取的数据为ConsumerRecord,解析消费者记录对象获取内部的数据进行其他处理

消费者 \
消费者 - 组 - 读取进度
消费者 /

消费者 - 组 - 读取进度

同组消费者共享一份读取进度(offset记录)
topic的分区数量会影响消费者的并发度

一个分区,同一时刻只能被同组消费者中的一个实例读取

存储原理

  • kafka为每个分区创建三类文件进行数据的存储
    • index 用于存储消息的offset,消息在log文件中开始位置,消息在log文件中的长度
    • log 用于顺序的将message写入到log中
    • timeindex 用于存储消息的offset,消息的时间戳

三、集群搭建

  • 版本:
    • 生产环境主流: 0.11.0.1 1.1.x 2.x
    • 最新版本: 3.x
    • 当前: 2.x的最新版 2.8.1
  1. 下载解压
  2. 修改配置文件

    server.properties

# 每个broker设置集群内部全局唯一的整数值作为id
broker.id=91
# 设置每个broker服务的主机名和端口
listeners=PLAINTEXT://bd0901:9092
# 设置kafka保存message使用的log文件存储的路径
# kafka中除了server运行时会打印的运行日志外
#   kafka用来存储message的文件也叫做log
#  这里设置的是 kafka用来存储message的log文件使用的路径
log.dirs=/opt/kafka-2.8.0/kafka-logs
# 设置kafka中数据的生命周期,超时数据会被kafka自动移除
log.retention.hours=168
# 设置kafka log文件的分割大小
log.segment.bytes=1073741824
# 设置kafka需要连接的zk集群地址
zookeeper.connect=bd0901:2181,bd0902:2181,bd0903:2181
  1. 分发安装包

    • 分别修改每个机器的
      broker.id
      listeners
  2. 添加环境变量

echo 'export KFK_HOME=/opt/kafka-2.8.0' >> /etc/profile
echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动测试
kafka-server-start.sh -daemon /opt/kafka-2.8.0/config/server.properties

四、kafka操作

1. 命令行

  • topic相关
# 查看topic列表
kafka-topics.sh --list --zookeeper bd0901:2181
# 创建topic
kafka-topics.sh \
--zookeeper bd0901:2181 \
--create \
--topic topic-01 \
--partitions 3 \
--replication-factor 2
# 查看topic详情
kafka-topics.sh \
--describe \
--topic topic01 \
--zookeeper bd0901:2181
  • 消费者相关

  • 生产者相关

2. kafka可视化工具

  • kafka官方没有提供web界面
  • 有很多第三方的kafka可视化工具,可以大大提高工作效率
  1. 下载解压
  2. 修改配置文件
    • 为ke提前创建数据库
    • system-config.properties
# 设置集群别名
efak.zk.cluster.alias=cluster1
# 集群对应的zk连接信息
cluster1.zk.list=bd0901:2181,bd0902:2181,bd0903:2181
# ke数据保存的数据库
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://bd0901:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
  1. 配置环境变量
echo 'export KE_HOME=/opt/efak-web-2.1.0' >> /etc/profile
echo 'export PATH=$KE_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动

3. javaAPI