Kafka
Zookeeper
是什么
- zookeeper是一个hadoop生态的集群协调管理服务
- 主要使用场景
- 管理配置文件
- 管理HA架构的迁移
- 管理集群服务在线状态
- 分布式锁
安装zookeeper
-
版本
-
当前主流的大数据生态组件 兼容的zk版本为3.4.x
-
最新zk版本为3.7.x
-
此处选用3.4.14
-
1.单节点zk安装
-
下载解压改名
-
修改配置文件
- 将zoo_sample.cfg 改名为zoo.cfg
- 修改zoo.cfg
# 设置zk用来保存数据的路径在zk安装目录中
dataDir=/opt/zookeeper-3.4.14/data
- 启动
# 启动zk服务
zkServer.sh start
# 查看zk服务状态
zkServer.sh status
- 使用zkCli
# 启动命令行客户端连接zk本机服务 默认连接localhost:2181
zkCli.sh
# 带server参数可以用于连接其他机器其他端口
zkCli.sh -server host:port
zkCli.sh -server node01:2181
# 创建znode并写入数据
# create znode绝对路径 需要保存的数据
create /znode1 testdata
# 查看znode
get /znode1
testdata
# cZxid = 0x4 # 创建操作的逻辑时钟
# ctime = Tue Aug 31 15:34:00 CST 2021 # 创建时间
# mZxid = 0x4 # 修改操作的逻辑时钟
# mtime = Tue Aug 31 15:34:00 CST 2021 # 修改时间
# pZxid = 0x4 # 父节点的逻辑时钟
# cversion = 0 # 创建版本
# dataVersion = 0 # 数据版本
# aclVersion = 0 # 权限版本
# ephemeralOwner = 0x0 # 临时所有者
# dataLength = 7 # 数据长度
# numChildren = 0 # 子节点数量
# znode重新赋值
set /znode1 aaaa
2.集群搭建3个节点
-
下载解压改名
-
修改配置文件
- zoo.cfg
# 设置数据的存储路径
# 需要手动创建
dataDir=/opt/zookeeper-3.4.14/data
# 设置集群信息
#server.A=B:C:D
#A 是每个zk服务的唯一标识 myid
#B myid对应的服务器主机名或者IP
#C zk集群内部选举使用的通信端口
#D zk集群内部同步数据使用的通信端口
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
- myid
- 在zoo.cfg中配置的dataDir目录中创建一个文本文件,名字叫myid,文件内存放当前机器的myid
-
分发安装包
- scp分发安装包
- 修改每台机器自己的myid文件
-
配置环境变量
echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
echo 'export PATH=$ZK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
- 使用zk可视化工具访问
zk存储模型
-
znode有4种类型
-
生命周期:短暂,永久
-
永久:如果不手动删除则znode永久保留
-
短暂:创建短暂znode的客户端断开连接时,znode自动删除
-
命名方式:无序号,有序号
-
无序号:创建的znode名称是客户端自定义名称
-
有序号:创建znode时,zk会自动在znode名称后添加一个单调递增的序号
-
zk选举机制
- 半数以上取id最大
- 例如有5台zk服务器,每台id分别为1 2 3 4 5
- 当4号启动后, 先将票投给自己然后观察其他服务器的上线状态
- 3号启动,3号发现有一个id为4的服务器已经存在,就将票投给4号
- 5号启动,3,4,5 发现5号id最大,投票选举产生leader
Kafka
1.是什么
-
早期定位为 A high-throughput distributed messaging system
高吞吐的分布式消息系统 -
后来改叫 a distributed streaming platform
分布式流平台 -
当前版本 Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
- 开源的分布式的事件流平台,数千个公司在使用高性能数据管道,流式数据分析,数据处理和一些重要任务的应用程序。
2.什么是消息队列
- 消息:在大数据场景中通常是一些文本日志
- 队列:有顺序的先进先出的数据结构
- 生产者:负责将消息发送到消息队列
- 消费者:负责读取消息队列的数据进行进行数据处理
3.搭建kafka集群
- kafka核心源代码使用scala语言编写
- 后来由于经常需要于java应用程序配合使用所以也提供了JavaApi
- 最新版本 2.8.0
- 当前选择 基于scala2.11 kafka2.4.1
- 生产环境 0.8.x 、0.10.x 、0.11.x、1.1.x、2.1.x
-
下载安装包
-
修改配置文件
- server.properties
# 为每台kafka节点设置一个全局唯一的整数,作为broker的id
broker.id=101
# 为每台kafka节点设置一个全局唯一的服务主机名和端口
listeners=PLAINTEXT://node01:9092
# 通常我们认为log(日志)是程序运行过程中用来记录运行状态的文件,----->运行日志
# 这里所谓的log(日志文件)是kafka集群用来存储生产者发送的message(消息)的文件---->消息日志
log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs
# 设置kafka需要连接的zk集群的地址
zookeeper.connect=node01:2181,node02:2181,node03:2181
-
分发安装包
- 分发之后修改每台机器的broker.id和listeners
-
配置环境变量
echo 'export KFK_HOME=/opt/kafka_2.12-2.4.1' >> /etc/profile
echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
- 启动测试
# -deamon 将服务启动为后台执行
kafka-server-start.sh -daemon /opt/kafka_2.12-2.4.1/config/server.properties
4. kafka核心名词
4.1 broker 代理人
- broker是kafka服务的实例,使用kafka-server-start启动起来的kafka服务就称之为一个broker进程
- 每个broker有一个全局唯一的id
- 每个服务器上通常只启动一个broker
4.2 topic 主题
- topic是一个逻辑概念
- 用来划分kafka中保存的消息
- 生产者发送数据和消费者接收数据时都需要指定topic
4.3 partition 分区
- 每个topic可以包含多个分区
- 分区分散在集群的不同机器上
- 每个分区的都有一个id ,id从0开始编号
- 分区本质上是一个文件夹,文件夹名称为 topic名-分区编号
4.4 replication.factor 副本因子(副本数)
- 每个分区可以设置多个副本
- 副本数就是每条消息在集群内保存的份数
4.5 offset 偏移量
- 分区内会按照消息写入的顺序为每条消息分配一个单调递增的编号 称之为offset
4.6 分区之间的主从
-
相同topic的 相同分区 的不同副本之间会选举产生一个leader和多个follower
-
leader负责接收客户端的请求 接收生产者的发送请求和消费者的读取请求
-
follower同步leader的数据
4.7 producer 生产者
- 使用命令或者java、scala API启动生产者程序
- 生产者负责将消息发送到kafka集群进行保存
4.8 consumer 消费者
- 使用命令或者java、scala API启动消费者程序
- 读取kafka集群指定topic的数据,进行处理
- 每个消费者可以设置一个group.id
- 消费者会记录自己的读取进度
- 进度包含topic信息分区信息和offset记录
- 读取进度与group.id一一对应
5. kafka使用
5.1 使用命令操作
- topic相关
# 查看topic列表
kafka-topics.sh \
--bootstrap-server node01:9092 \
--list
# 创建topic
kafka-topics.sh \
--bootstrap-server node01:9092 \
--create --topic t1 \
--partitions 3 \
--replication-factor 2
# 查看topic详情
kafka-topics.sh \
--bootstrap-server node01:9092 \
--topic t1 \
--describe
- 消费者
# 启动控制台消费者测试消费
kafka-console-consumer.sh --bootstrap-server node02:9092 --topic t1
# 消费者性能测试
- 生产者
# 启动控制台生产者测试生产
kafka-console-producer.sh --broker-list node03:9092 --topic t1
# 生产者性能测试
kafka-producer-perf-test.sh \
--producer-props bootstrap.servers=node01:9092,node02:9092,node03:9092 \
--topic t3 \
--num-records 1000000 \
--throughput -1 \
--payload-file /root/logs/access.log-20210401
5.2 使用可视化工具
- kafka-eagle
本质上就是一个web应用程序,将kafka封装为web页面可以直接操作的软件 - 最新版本2.0.6
- 当前安装2.0.5
- 下载解压
- 修改配置文件
- system-config.properties
# 配置集群信息
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181
# 配置web服务端口
kafka.eagle.webui.port=8048
# 关闭sqlite
######################################
# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org
# 设置连接的mysql服务器
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node02:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
- 配置环境变量
export KE_HOME=/opt/ke-2.0.5
export PATH=$KE_HOME/bin:$PATH
- 启动
ke.sh start
5.3 使用JavaAPI
- 修改topic的配置
kafka-configs.sh \
--zookeeper node01:2181 \
--entity-type topics \
--entity-name t_file_helper01 \
--alter --add-config max.message.bytes=3000000