Kafka
Zookeeper
1. 是什么
分布式集群协调服务
- 使用场景
-
高可用集群的主备切换管理
zk的ZAB zookeeper原子广播,分布式投票选举机制 -
分布式场景配置文件管理
观察者模式 注册监听线程,数据发生变化时进行回调 -
分布式锁
zk维护单调递增的zxid,对数据进行自动编号
2. zk安装
2.1. 单机模式
- zk支持单机模式运行,standalone
- 解压直接启动
2.2. 集群模式
- 下载解压
-
修改配置文件
zoo.cfg
# 配置zk保存数据的路径
dataDir=/opt/zookeeper-3.4.14/data
#配置zk集群列表
#server.A=B:C:D
#A: zk集群中每个节点都有一个全局唯一的整数,叫做myid
#B: myid对应的主机名或者ip地址
#C: 集群节点之间数据同步使用的端口 默认2888
#D: 集群节点之间进行投票使用的端口 默认3888
server.1=bd1001:2888:3888
server.2=bd1002:2888:3888
server.3=bd1003:2888:3888
-
分发安装包
- 为每台机器单独修改myid文件
-
配置环境变量
echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
echo 'export PATH=$ZK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
- 启动服务
zkServer.sh start
3. zk操作
- 使用zk的cli连接服务端
# 启动控制台客户端
# 默认连接localhost:2181
zkCli.sh
# 连接远程zk服务器
zkCli.sh -server host:port
# 查看帮助
help
# 创建znode
# -s创建带序号的znode
# -e创建临时znode
# path在zk中需要使用从/开始的绝对路径
create [-s] [-e] path data
# 查询znode的内容数据和元数据
get path
# 修改znode的内容数据
set path data
# 删除节点
delete path
# 退出
quit
-
可视化工具
- zooinspector
- idea的插件
4. zk核心原理
-
zk集群的选举机制
- zk集群中半数以下机器启动时,每台节点都会通过选举投票端口向集群的其他机器发送选举投票
- zk集群中半数以上机器启动时,所有已启动节点将选票投给myid最大的机器
- 半数以上选票推选的myid主机,当选为leader
- 其他机器作为follower
-
数据模型
-
znode的分类
-
命名方式
- 自定义名称 : 用户创建时指定znode名称
- 带序号的 : 用户创建时,zk会自动根据pZxid为znode名称后添加单调递增的序号
-
生命周期
- 永久 : 不主动删除则永久保存,用于存储配置文件信息
- 临时 : 当创建znode的客户端断开连接时,znode会自动删除,通常用于保存节点的在线状态
-
Kafka
1 是什么
消息系统
流平台
事件流平台
- kafka是一个分布式,高吞吐,消息队列。
- 使用发布订阅模型管理数据通道。
2 核心名词
1. broker 代理人
broker是kafka服务启动后的主进程
集群中每个broker,有一个全局唯一的整数值,
作为当前节点的broker.id
2. producer 生产者
producer是kafka的一类客户端
专门负责将数据封装为ProducerRecord对象,
并发送给kafka集群
生产者发送数据时需要指定数据送达的topic
3. topic 主题
kafka中使用topic这个抽象概念对kafka管理的数据进行分类
创建topic时 需要指定topic的分区数和副本数
4. partition 分区
分区是存储在服务器上的具体文件夹
文件夹的命名方式为 topic名称-分区编号 (例如topicxxx-0)
分区文件夹中会保存三类主要的文件
xxx.index 用于构建log文件的索引
xxx.log 用于保存生产者发送的消息内容
xxx.timeindex 用于构建log文件的时间索引
5. replication-factor 副本因子
创建topic时可以指定数据保存的份数
kafka集群,不允许创建超过可用broker数量的副本数
通常生产环境1~3份副本
同一个主题下的,相同分区的不同副本之间,会选举产生一个leader和若干个follower
leader负责与客户端(生产者、消费者)进行交互,处理客户端请求
follower负责实时同步leader的数据
6. offset 偏移量(消息编号)
kafka中每个topic的每个分区,会独立维护一个单调递增的offset,
每条消息写入分区后,都会分配一个唯一offset值
7. consumer 消费者
consumer是kafka的一类客户端,负责从kafka中的指定topic拉取数据
consumer在消费数据时,会记录自己的offset读取进度
8. consumer group 消费者组
创建消费者时可以指定group.id
相同group.id的消费者,会共享一份offset读取进度
- kafka数据顺序的问题
3 集群搭建
- 关于版本:
kafka是为数不多的所有版本都提供向下兼容的开源组件
跨大版本号的新版本 依然兼容老版本api
kafka的版本迭代主要提供性能改进
生产环境主流版本: 0.10.x、0.11.x、1.1.x、2.2.x
最新版本: 3.1.0 主要添加了kafka-stream模块,kafka自己也提供流式计算框架,用于替代sparkstreaming或者flink
当前版本:2.6.3 ,与spark3.1.2兼容性最佳版本
- 下载解压
- 修改配置
server.properties
# 为每台broker设置全局唯一的整数值作为id号
broker.id=102
# 设置broker服务的主机名和端口
listeners=PLAINTEXT://bd1002:9092
# 设置 kafka用于保存 消息数据的log文件
# 这里的log不是程序运行过程中的日志
# 而是kafka用于保存生产者发送的消息,使用的一种特殊文件格式,kafka将其
称之为log文件
log.dirs=/opt/kafka-2.6.3/kafka-logs
# 设置消息的全局生命周期
# 每条进入kafka的数据,会带有发送时的时间戳,超时后kafka会自动删除数据
log.retention.hours=168
# 设置数据文件的分片大小 默认1GB
log.segment.bytes=1073741824
# 设置kafka集群使用的zk集群地址
zookeeper.connect=bd1001:2181,bd1002:2181,bd1003:2181
-
分发安装包
- 单独修改
broker.id
和listeners
- 单独修改
-
配置环境变量
echo 'export KFK_HOME=/opt/kafka-2.6.3' >> /etc/profile
echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
- 启动集群
4 命令行操作
- 主题相关操作
# 查看主题列表
kafka-topics.sh \
--bootstrap-server bd1001:9092 \
--list
# 创建主题
kafka-topics.sh \
--bootstrap-server bd1001:9092 \
--create \
--topic topic-test-01 \
--partitions 3 \
--replication-factor 2
# 查看主题详情
kafka-topics.sh \
--bootstrap-server bd1001:9092 \
--describe \
--topic topic-test-01
- 控制台生产者消费者
# 控制台消费者
kafka-console-consumer.sh \
--bootstrap-server bd1001:9092,bd1002:9092 \
--topic topic-test-01 \
--from-beginning
# 控制台生产者
kafka-console-producer.sh \
--bootstrap-server bd1001:9092,bd1002:9092 \
--topic topic-test-01
-
kafka可视化
- 选用 kafka-eagle作为kafka可视化工具
- kafka-eagle又叫EFAK,通过jvm的一个web程序,提供可视化操作和集群管理
-
安装
- 下载解压
- 修改配置文件
- system-config.properties
# zk集群的配置
efak.zk.cluster.alias=cluster1
cluster1.zk.list=bd1001:2181,bd1002:2181,bd1003:2181
# 配置ke使用的数据库
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://bd1001:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
- 配置环境变量
echo 'export KE_HOME=/opt/efak-web-2.1.0' >> /etc/profile
echo 'export PATH=$KE_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
- 启动
ke.sh start
- 访问并操作
http://host:8048