Kafka
Zookeeper
1.是什么
ZooKeeper 是一个集中式服务
-
在分布式集群中的作用
-
用于维护配置信息
zk提供了观察者模式的数据节点监听
可以在数据发生变动时,立即对所有监听此数据的客户端产生回调
通常会将分布式场景中的集群配置信息直接保存在zk中
并在zk中统一进行管理和维护 -
命名
平级服务的编号与命名
分布式锁,为每个线程分配编号和命名 -
提供分布式同步
zk中提供了临时节点(暂时节点),在客户端断开连接后,自动删除数据
用于kafka之类的,非主从架构的分布式集群中,每个服务不直接相互通信,而是通过查看zk中的节点上线状态,实现集群状态监控 -
提供组服务
zk提供类似文件目录结构的树形结构
可以为不同的应用程序,分不同目录进行数据存储
-
2.安装
-
下载解压
- 版本:为兼容Hadoop和Spark选择3.4.14
-
修改配置文件
- zoo.cfg
# 设置zk保存数据和myid文件的路径 dataDir=/opt/zookeeper-3.4.14/data # 设置zk集群所有节点的配置信息 #A:每个zk服务有一个全局唯一的整数值,作为myid #B:id对应服务器的主机名或者Ip地址 #C:集群之间进行数据同步的端口 #D:集群之间进行选举投票的端口 #server.A=B:C:D server.1=bd1201:2888:3888 server.2=bd1202:2888:3888 server.3=bd1203:2888:3888
- zoo.cfg
-
分发安装包
- 为每个服务器单独修改属于自己myid文件
-
添加环境变量
echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
echo 'export PATH=$ZK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
- 启动服务
3.zk操作
3.1命令行操作
# ls path 查看路径中包含的子路径
ls /
# create [-s] [-e] path data acl
# 创建新的子路径
# [-s]
# [-e]
# path 必须使用从根开始的绝对路径
# data 为创建的新路径添加数据
# acl 可选的,在开启权限验证时,需要添加授权文件
create /test1 helloZookeeper
# get path 读取路径中的数据
get /test1
## 路径的元数据
# helloZookeeper# 路径中的数据内容
# cZxid = 0x6 # 创建操作事务id
# ctime = Thu May 12 09:28:28 CST 2022 # 创建时间
# mZxid = 0x6 # 修改操作事务id
# mtime = Thu May 12 09:28:28 CST 2022 # 修改时间
# pZxid = 0x6 # 创建时,父路径的事务id
# cversion = 0 # 创建版本
# dataVersion = 0 # 数据版本
# aclVersion = 0 # 权限版本
# ephemeralOwner = 0x0 # 如果当前路径时一个临时路径,则会记录路径的具体创建人
# dataLength = 14 # 数据长度
# numChildren = 0 # 子路径个数
# set path data 更新路径中的数据
set /test1/a bbbbb
# rmr path 递归删除路径
rmr /test1
3.2 可视化工具
- ZooInspector
4.核心原理
4.1 存储模型
4.2 zk选举机制
- 半数以上,id最大当选leader
如果集群有3台机器,选举的过程如下- id为1的机器启动后,将选票投给自己,并通过3888端口与集群其他机器进行通信
- id为2的机器启动后,将选票投给自己,并通过3888端口与集群其他机器进行通信
- id为1的机器,发现id为2的机器id号较大,选择将选票投给id为2
- id为1的机器和id为2的机器,都将选票投给id为2的机器,选票的数量超过集群节点的半数,所以选举成功,2作为leader,1作为follower
Kafka
-
kafka是一个开源分布式事件流平台(消息系统、消息队列)
-
消息
早期 message
当前版本 record -
使用场景
高性能流管道
流式数据分析
数据整合中间件
构建关键任务流 -
消息队列的特点
解耦:保证通信的客户端之间,进行解耦,不需要同时在线
冗余:为同一份数据保存多个副本
扩展性:通过分布式架构,可以随时调整集群的实例个数
缓冲:由于数据产生的速度会存在峰值,如果不使用中间件进行缓冲,可能回导致采集端出现雪崩,削峰填谷
有序:具有先进先出的队列的特点
异步:将存储数据的文件拆分为多个,若干个生产者消费者,可以每个线程操作一个单独的文件
1. kafka核心原理
broker 代理人
broker是kafka服务启动后的一个jvm进程实例
topic 主题
topic是一个kafka集群的全局概念,不同的主题用于区分不同的数据来源
生产者在发送消息前,会先创建并指定topic进行发送
消费者在读取消息式,也需要指定从哪些topic进行读取
partition 分区
创建topic时通常会指定topic包含的分区个数,默认1
分区在kafka服务所在的Linux服务器上,以文件夹的形式存在
文件夹的命名 : topic名称-分区的编号
分区的编号从0开始
replication-factor 副本个数
创建topic时通常会指定数据存储的副本数,默认1
通常为了保证数据安全指定2~3个副本
kafka规定副本数不能超过可用的broker数量
offset 偏移量 编号 水位线
kafka接收到生产者发送的消息时,在存储消息前会为每条消息生成一个offset,
分区内全局唯一,单调递增的编号
- 为了记录消息的写入顺序
- 为了消费者可以记录读取进度
producer 生产者
是kafka的一类客户端
producer将需要发送到kafka的数据封装为ProducerRecord对象,并序列化后发送到kafka集群
consumer 消费者
是kafka的一类客户端
从kafka中读取数据反序列化后得到ConsumerRecord对象,从ConsumerRecord读取消息内容进行下游处理
consumer group 消费者组
启动消费者时可以指定group.id
每个group.id会维护一个消费进度列表
多个消费者线程,如果使用相同的group.id,则多个线程可以共享消费进度
2.集群安装
- 提前搭建并启动zk
-
下载解压
- 版本 kafka_2.12-2.8.1
- 其中2.12为kafka编译时使用的Scala版本
-
修改配置文件
- server.properties
# 为每台kafka服务设置全局唯一的id 叫做broker.id broker.id=121 # 设置kafka服务连接的监听URI listeners=PLAINTEXT://bd1201:9092 # 这里的所谓的logFile 不是程序的运行日志,而是kafka用来保存>数据,的专门的文件格式 # 默认存放在/tmp/kafka-logs,在这里我们将其设置到kafka的安装>目录中 log.dirs=/opt/kafka-2.8.1/kafka-logs # 设置kafka需要连接的zk集群的地址 zookeeper.connect=bd1201:2181,bd1202:2181,bd1203:2181
- server.properties
-
分发安装包
- 为每台机器单独设置
broker.id
,listeners
- 为每台机器单独设置
-
配置环境变量
echo 'export KFK_HOME=/opt/kafka-2.8.1' >> /etc/profile echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile source /etc/profile
-
启动
# kafka启动命令
# [-daemon]使用后台方式启动
# [--override property=value]*可以在启动kafka服务时指定配置项,--override的优先级会高于server.properties
# /opt/kafka-2.8.1/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
# 通常kafka集群搭建好需要进行性能调优
# 性能调优通常需要调整各种参数
# 性能测试时:
/opt/kafka-2.8.1/bin/kafka-server-start.sh \
server.properties \
--override property=value \
--override property=value \
--override property=value \
--override property=value \
--override property=value
# 调参结束后,最终启动服务
/opt/kafka-2.8.1/bin/kafka-server-start.sh -daemon server.properties
3. kafka使用
3.1 命令操作
- 主题相关
# 查看集群可用topic
kafka-topics.sh \
--bootstrap-server bd1201:9092,bd1202:9092,bd1203:9092 \
--list
# 使用默认配置创建主题
kafka-topics.sh \
--bootstrap-server bd1201:9092,bd1202:9092,bd1203:9092 \
--create \
--topic topic-1
# 使用指定的分区数和副本数创建主题
kafka-topics.sh \
--bootstrap-server bd1201:9092,bd1202:9092,bd1203:9092 \
--create \
--topic topic-2 \
--partitions 3 \
--replication-factor 2
- 消费者相关
# 控制台消费者 通常用于测试主题是否可用
kafka-console-consumer.sh \
--bootstrap-server bd1202:9092 \
--topic topic-t2 \
--gourp g2 \
--from-beginning
# 消费者性能测试脚本
kafka-consumer-perf-test.sh
- 生产者相关
# 控制台生产者 通常用于测试主题是否可用
kafka-console-producer.sh \
--bootstrap-server bd1203:9092 \
--topic topic-2
# 生产者性能测试脚本
kafka-producer-perf-test.sh
面试题
-
kafka能不能保证数据全局有序?
如果topic只有一个分区,则可以保证全局有序 -
如果包含多个分区呢?
生产者发送数据时默认是轮询的,所以无法保证多个分区之间全局有序 -
如果多个分区还想保证有序呢?
在我之前项目里,我们用户访问记录进kafka前,会根据用户id计算分区编号
将相同用户id的数据写入相同的分区,这样在分区内部可以保证数据读取的顺序与产生的顺序一致,保证有序
分区之间由于存储的是不同用户的数据,所以他们之间的顺序无所谓