AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Zookeeper

1. 是什么

分布式集群协调服务

  • 使用场景
  1. 高可用集群的主备切换管理
    zk的ZAB zookeeper原子广播,分布式投票选举机制

  2. 分布式场景配置文件管理
    观察者模式 注册监听线程,数据发生变化时进行回调

  3. 分布式锁
    zk维护单调递增的zxid,对数据进行自动编号

2. zk安装

2.1. 单机模式

  • zk支持单机模式运行,standalone
  • 解压直接启动

2.2. 集群模式

  1. 下载解压
  2. 修改配置文件

    zoo.cfg

# 配置zk保存数据的路径
dataDir=/opt/zookeeper-3.4.14/data
#配置zk集群列表
#server.A=B:C:D
#A: zk集群中每个节点都有一个全局唯一的整数,叫做myid
#B: myid对应的主机名或者ip地址
#C: 集群节点之间数据同步使用的端口 默认2888
#D: 集群节点之间进行投票使用的端口 默认3888
server.1=bd1001:2888:3888
server.2=bd1002:2888:3888
server.3=bd1003:2888:3888
  1. 分发安装包

    • 为每台机器单独修改myid文件
  2. 配置环境变量

echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
echo 'export PATH=$ZK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动服务
zkServer.sh start

3. zk操作

  • 使用zk的cli连接服务端
# 启动控制台客户端
# 默认连接localhost:2181
zkCli.sh
# 连接远程zk服务器
zkCli.sh -server host:port
# 查看帮助
help
# 创建znode
# -s创建带序号的znode
# -e创建临时znode
# path在zk中需要使用从/开始的绝对路径
create [-s] [-e] path data
# 查询znode的内容数据和元数据
get path
# 修改znode的内容数据
set path data
# 删除节点
delete path
# 退出
quit
  • 可视化工具

    • zooinspector
    • idea的插件

4. zk核心原理

  • zk集群的选举机制

    • zk集群中半数以下机器启动时,每台节点都会通过选举投票端口向集群的其他机器发送选举投票
    • zk集群中半数以上机器启动时,所有已启动节点将选票投给myid最大的机器
    • 半数以上选票推选的myid主机,当选为leader
    • 其他机器作为follower
  • 数据模型

file

  • znode的分类

    • 命名方式

      1. 自定义名称 : 用户创建时指定znode名称
      2. 带序号的 : 用户创建时,zk会自动根据pZxid为znode名称后添加单调递增的序号
    • 生命周期

      1. 永久 : 不主动删除则永久保存,用于存储配置文件信息
      2. 临时 : 当创建znode的客户端断开连接时,znode会自动删除,通常用于保存节点的在线状态

Kafka

1 是什么

消息系统
流平台
事件流平台

  • kafka是一个分布式,高吞吐,消息队列。
  • 使用发布订阅模型管理数据通道。

2 核心名词

1. broker 代理人

broker是kafka服务启动后的主进程
集群中每个broker,有一个全局唯一的整数值,
作为当前节点的broker.id

2. producer 生产者

producer是kafka的一类客户端
专门负责将数据封装为ProducerRecord对象,
并发送给kafka集群
生产者发送数据时需要指定数据送达的topic

3. topic 主题

kafka中使用topic这个抽象概念对kafka管理的数据进行分类
创建topic时 需要指定topic的分区数和副本数

4. partition 分区

分区是存储在服务器上的具体文件夹
文件夹的命名方式为 topic名称-分区编号 (例如topicxxx-0)
分区文件夹中会保存三类主要的文件
xxx.index 用于构建log文件的索引
xxx.log 用于保存生产者发送的消息内容
xxx.timeindex 用于构建log文件的时间索引

5. replication-factor 副本因子

创建topic时可以指定数据保存的份数
kafka集群,不允许创建超过可用broker数量的副本数
通常生产环境1~3份副本
同一个主题下的,相同分区的不同副本之间,会选举产生一个leader和若干个follower
leader负责与客户端(生产者、消费者)进行交互,处理客户端请求
follower负责实时同步leader的数据

6. offset 偏移量(消息编号)

kafka中每个topic的每个分区,会独立维护一个单调递增的offset,
每条消息写入分区后,都会分配一个唯一offset值

7. consumer 消费者

consumer是kafka的一类客户端,负责从kafka中的指定topic拉取数据
consumer在消费数据时,会记录自己的offset读取进度

8. consumer group 消费者组

创建消费者时可以指定group.id
相同group.id的消费者,会共享一份offset读取进度

  • kafka数据顺序的问题

3 集群搭建

  • 关于版本:
    kafka是为数不多的所有版本都提供向下兼容的开源组件
    跨大版本号的新版本 依然兼容老版本api
    kafka的版本迭代主要提供性能改进
    生产环境主流版本: 0.10.x、0.11.x、1.1.x、2.2.x
    最新版本: 3.1.0 主要添加了kafka-stream模块,kafka自己也提供流式计算框架,用于替代sparkstreaming或者flink
    当前版本:2.6.3 ,与spark3.1.2兼容性最佳版本
  1. 下载解压
  2. 修改配置

server.properties

# 为每台broker设置全局唯一的整数值作为id号
broker.id=102
# 设置broker服务的主机名和端口
listeners=PLAINTEXT://bd1002:9092
# 设置 kafka用于保存 消息数据的log文件
# 这里的log不是程序运行过程中的日志
# 而是kafka用于保存生产者发送的消息,使用的一种特殊文件格式,kafka将其
称之为log文件
log.dirs=/opt/kafka-2.6.3/kafka-logs
# 设置消息的全局生命周期
# 每条进入kafka的数据,会带有发送时的时间戳,超时后kafka会自动删除数据
log.retention.hours=168
# 设置数据文件的分片大小 默认1GB
log.segment.bytes=1073741824
# 设置kafka集群使用的zk集群地址
zookeeper.connect=bd1001:2181,bd1002:2181,bd1003:2181
  1. 分发安装包

    • 单独修改broker.idlisteners
  2. 配置环境变量

echo 'export KFK_HOME=/opt/kafka-2.6.3' >> /etc/profile
echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动集群

4 命令行操作

  • 主题相关操作
# 查看主题列表
kafka-topics.sh \
--bootstrap-server bd1001:9092 \
--list
# 创建主题
kafka-topics.sh \
--bootstrap-server bd1001:9092 \
--create \
--topic topic-test-01 \
--partitions 3 \
--replication-factor 2
# 查看主题详情
kafka-topics.sh \
--bootstrap-server bd1001:9092 \
--describe \
--topic topic-test-01
  • 控制台生产者消费者
# 控制台消费者
kafka-console-consumer.sh \
--bootstrap-server bd1001:9092,bd1002:9092 \
--topic topic-test-01 \
--from-beginning
# 控制台生产者
kafka-console-producer.sh  \
--bootstrap-server bd1001:9092,bd1002:9092 \
--topic topic-test-01
  • kafka可视化

    • 选用 kafka-eagle作为kafka可视化工具
    • kafka-eagle又叫EFAK,通过jvm的一个web程序,提供可视化操作和集群管理
    • 安装

      1. 下载解压
      2. 修改配置文件
      • system-config.properties
# zk集群的配置
efak.zk.cluster.alias=cluster1
cluster1.zk.list=bd1001:2181,bd1002:2181,bd1003:2181
# 配置ke使用的数据库
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://bd1001:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
  1. 配置环境变量
echo 'export KE_HOME=/opt/efak-web-2.1.0' >> /etc/profile
echo 'export PATH=$KE_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动

ke.sh start

  1. 访问并操作

http://host:8048

5 编程API

6 组件整合