AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Zookeeper

1.是什么

ZooKeeper 是一个集中式服务

  • 在分布式集群中的作用

    1. 用于维护配置信息
      zk提供了观察者模式的数据节点监听
      可以在数据发生变动时,立即对所有监听此数据的客户端产生回调
      通常会将分布式场景中的集群配置信息直接保存在zk中
      并在zk中统一进行管理和维护

    2. 命名
      平级服务的编号与命名
      分布式锁,为每个线程分配编号和命名

    3. 提供分布式同步
      zk中提供了临时节点(暂时节点),在客户端断开连接后,自动删除数据
      用于kafka之类的,非主从架构的分布式集群中,每个服务不直接相互通信,而是通过查看zk中的节点上线状态,实现集群状态监控

    4. 提供组服务
      zk提供类似文件目录结构的树形结构
      可以为不同的应用程序,分不同目录进行数据存储

2.安装

  1. 下载解压

    • 版本:为兼容Hadoop和Spark选择3.4.14
  2. 修改配置文件

    • zoo.cfg
      # 设置zk保存数据和myid文件的路径
      dataDir=/opt/zookeeper-3.4.14/data
      # 设置zk集群所有节点的配置信息
      #A:每个zk服务有一个全局唯一的整数值,作为myid
      #B:id对应服务器的主机名或者Ip地址
      #C:集群之间进行数据同步的端口
      #D:集群之间进行选举投票的端口
      #server.A=B:C:D
      server.1=bd1201:2888:3888
      server.2=bd1202:2888:3888
      server.3=bd1203:2888:3888
  3. 分发安装包

    • 为每个服务器单独修改属于自己myid文件
  4. 添加环境变量

echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
echo 'export PATH=$ZK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动服务

3.zk操作

3.1命令行操作

# ls path 查看路径中包含的子路径
ls /
# create [-s] [-e] path data acl
# 创建新的子路径
# [-s]
# [-e]
# path 必须使用从根开始的绝对路径
# data 为创建的新路径添加数据
# acl 可选的,在开启权限验证时,需要添加授权文件
create /test1 helloZookeeper
# get path 读取路径中的数据
get /test1
## 路径的元数据
# helloZookeeper# 路径中的数据内容
# cZxid = 0x6  # 创建操作事务id
# ctime = Thu May 12 09:28:28 CST 2022 # 创建时间
# mZxid = 0x6 # 修改操作事务id
# mtime = Thu May 12 09:28:28 CST 2022 # 修改时间
# pZxid = 0x6 # 创建时,父路径的事务id
# cversion = 0 # 创建版本
# dataVersion = 0 # 数据版本
# aclVersion = 0 # 权限版本
# ephemeralOwner = 0x0 # 如果当前路径时一个临时路径,则会记录路径的具体创建人
# dataLength = 14 # 数据长度
# numChildren = 0 # 子路径个数
# set path data 更新路径中的数据
set /test1/a bbbbb
# rmr path 递归删除路径
rmr /test1

3.2 可视化工具

  • ZooInspector
    file

4.核心原理

4.1 存储模型

file

4.2 zk选举机制

  • 半数以上,id最大当选leader
    如果集群有3台机器,选举的过程如下

    1. id为1的机器启动后,将选票投给自己,并通过3888端口与集群其他机器进行通信
    2. id为2的机器启动后,将选票投给自己,并通过3888端口与集群其他机器进行通信
    3. id为1的机器,发现id为2的机器id号较大,选择将选票投给id为2
    4. id为1的机器和id为2的机器,都将选票投给id为2的机器,选票的数量超过集群节点的半数,所以选举成功,2作为leader,1作为follower

Kafka

  • kafka是一个开源分布式事件流平台(消息系统、消息队列)

  • 消息
    早期 message
    当前版本 record

  • 使用场景
    高性能流管道
    流式数据分析
    数据整合中间件
    构建关键任务流

  • 消息队列的特点
    解耦:保证通信的客户端之间,进行解耦,不需要同时在线
    冗余:为同一份数据保存多个副本
    扩展性:通过分布式架构,可以随时调整集群的实例个数
    缓冲:由于数据产生的速度会存在峰值,如果不使用中间件进行缓冲,可能回导致采集端出现雪崩,削峰填谷
    有序:具有先进先出的队列的特点
    异步:将存储数据的文件拆分为多个,若干个生产者消费者,可以每个线程操作一个单独的文件

1. kafka核心原理

broker 代理人

broker是kafka服务启动后的一个jvm进程实例

topic 主题

topic是一个kafka集群的全局概念,不同的主题用于区分不同的数据来源
生产者在发送消息前,会先创建并指定topic进行发送
消费者在读取消息式,也需要指定从哪些topic进行读取

partition 分区

创建topic时通常会指定topic包含的分区个数,默认1
分区在kafka服务所在的Linux服务器上,以文件夹的形式存在
文件夹的命名 : topic名称-分区的编号
分区的编号从0开始

replication-factor 副本个数

创建topic时通常会指定数据存储的副本数,默认1
通常为了保证数据安全指定2~3个副本
kafka规定副本数不能超过可用的broker数量

offset 偏移量 编号 水位线

kafka接收到生产者发送的消息时,在存储消息前会为每条消息生成一个offset,
分区内全局唯一,单调递增的编号

  1. 为了记录消息的写入顺序
  2. 为了消费者可以记录读取进度

producer 生产者

是kafka的一类客户端
producer将需要发送到kafka的数据封装为ProducerRecord对象,并序列化后发送到kafka集群

consumer 消费者

是kafka的一类客户端
从kafka中读取数据反序列化后得到ConsumerRecord对象,从ConsumerRecord读取消息内容进行下游处理

consumer group 消费者组

启动消费者时可以指定group.id
每个group.id会维护一个消费进度列表
多个消费者线程,如果使用相同的group.id,则多个线程可以共享消费进度

2.集群安装

  • 提前搭建并启动zk
  1. 下载解压

    • 版本 kafka_2.12-2.8.1
    • 其中2.12为kafka编译时使用的Scala版本
  2. 修改配置文件

    • server.properties
      # 为每台kafka服务设置全局唯一的id 叫做broker.id
      broker.id=121
      # 设置kafka服务连接的监听URI
      listeners=PLAINTEXT://bd1201:9092
      # 这里的所谓的logFile  不是程序的运行日志,而是kafka用来保存>数据,的专门的文件格式
      # 默认存放在/tmp/kafka-logs,在这里我们将其设置到kafka的安装>目录中
      log.dirs=/opt/kafka-2.8.1/kafka-logs
      # 设置kafka需要连接的zk集群的地址
      zookeeper.connect=bd1201:2181,bd1202:2181,bd1203:2181
  3. 分发安装包

    • 为每台机器单独设置broker.id,listeners
  4. 配置环境变量

    echo 'export KFK_HOME=/opt/kafka-2.8.1' >> /etc/profile
    echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
    source /etc/profile
  5. 启动

# kafka启动命令
# [-daemon]使用后台方式启动
# [--override property=value]*可以在启动kafka服务时指定配置项,--override的优先级会高于server.properties
# /opt/kafka-2.8.1/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
# 通常kafka集群搭建好需要进行性能调优
# 性能调优通常需要调整各种参数
#  性能测试时:
/opt/kafka-2.8.1/bin/kafka-server-start.sh \
server.properties \
--override property=value \
--override property=value \
--override property=value \
--override property=value \
--override property=value
# 调参结束后,最终启动服务
/opt/kafka-2.8.1/bin/kafka-server-start.sh -daemon server.properties

3. kafka使用

3.1 命令操作

  • 主题相关
# 查看集群可用topic
kafka-topics.sh \
--bootstrap-server bd1201:9092,bd1202:9092,bd1203:9092 \
--list
# 使用默认配置创建主题
kafka-topics.sh \
--bootstrap-server bd1201:9092,bd1202:9092,bd1203:9092 \
--create \
--topic topic-1
# 使用指定的分区数和副本数创建主题
kafka-topics.sh \
--bootstrap-server bd1201:9092,bd1202:9092,bd1203:9092 \
--create \
--topic topic-2 \
--partitions 3 \
--replication-factor 2
  • 消费者相关
# 控制台消费者  通常用于测试主题是否可用
kafka-console-consumer.sh \
--bootstrap-server bd1202:9092 \
--topic topic-t2 \
--gourp g2 \
--from-beginning
# 消费者性能测试脚本
kafka-consumer-perf-test.sh
  • 生产者相关
# 控制台生产者  通常用于测试主题是否可用
kafka-console-producer.sh  \
--bootstrap-server bd1203:9092 \
--topic topic-2
# 生产者性能测试脚本
kafka-producer-perf-test.sh

面试题

  1. kafka能不能保证数据全局有序?
    如果topic只有一个分区,则可以保证全局有序

  2. 如果包含多个分区呢?
    生产者发送数据时默认是轮询的,所以无法保证多个分区之间全局有序

  3. 如果多个分区还想保证有序呢?
    在我之前项目里,我们用户访问记录进kafka前,会根据用户id计算分区编号
    将相同用户id的数据写入相同的分区,这样在分区内部可以保证数据读取的顺序与产生的顺序一致,保证有序
    分区之间由于存储的是不同用户的数据,所以他们之间的顺序无所谓