AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Zookeeper

是什么

  • zookeeper是一个hadoop生态的集群协调管理服务
  • 主要使用场景
    1. 管理配置文件
    2. 管理HA架构的迁移
    3. 管理集群服务在线状态
    4. 分布式锁

安装zookeeper

  • 版本

    • 当前主流的大数据生态组件 兼容的zk版本为3.4.x

    • 最新zk版本为3.7.x

    • 此处选用3.4.14

1.单节点zk安装

  1. 下载解压改名

  2. 修改配置文件

  • 将zoo_sample.cfg 改名为zoo.cfg
  • 修改zoo.cfg
# 设置zk用来保存数据的路径在zk安装目录中
dataDir=/opt/zookeeper-3.4.14/data
  1. 启动
# 启动zk服务
zkServer.sh start
# 查看zk服务状态
zkServer.sh status
  1. 使用zkCli
# 启动命令行客户端连接zk本机服务 默认连接localhost:2181
zkCli.sh
# 带server参数可以用于连接其他机器其他端口
zkCli.sh -server host:port
zkCli.sh -server node01:2181
# 创建znode并写入数据
# create znode绝对路径  需要保存的数据
create /znode1 testdata
# 查看znode
get /znode1
testdata
# cZxid = 0x4                            # 创建操作的逻辑时钟
# ctime = Tue Aug 31 15:34:00 CST 2021   # 创建时间
# mZxid = 0x4                            # 修改操作的逻辑时钟
# mtime = Tue Aug 31 15:34:00 CST 2021   # 修改时间
# pZxid = 0x4                            # 父节点的逻辑时钟
# cversion = 0                           # 创建版本
# dataVersion = 0                        # 数据版本
# aclVersion = 0                         # 权限版本
# ephemeralOwner = 0x0                   # 临时所有者
# dataLength = 7                         # 数据长度
# numChildren = 0                        # 子节点数量
# znode重新赋值
set /znode1 aaaa

2.集群搭建3个节点

  1. 下载解压改名

  2. 修改配置文件

    • zoo.cfg
# 设置数据的存储路径
# 需要手动创建
dataDir=/opt/zookeeper-3.4.14/data
# 设置集群信息
#server.A=B:C:D
#A 是每个zk服务的唯一标识  myid
#B myid对应的服务器主机名或者IP
#C zk集群内部选举使用的通信端口
#D zk集群内部同步数据使用的通信端口
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
  • myid
    • 在zoo.cfg中配置的dataDir目录中创建一个文本文件,名字叫myid,文件内存放当前机器的myid
  1. 分发安装包

    • scp分发安装包
    • 修改每台机器自己的myid文件
  2. 配置环境变量

echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
echo 'export PATH=$ZK_HOME/bin:$PATH'  >> /etc/profile
source /etc/profile
  1. 使用zk可视化工具访问

zk存储模型

file

  • znode有4种类型

    • 生命周期:短暂,永久

    • 永久:如果不手动删除则znode永久保留

    • 短暂:创建短暂znode的客户端断开连接时,znode自动删除

    • 命名方式:无序号,有序号

    • 无序号:创建的znode名称是客户端自定义名称

    • 有序号:创建znode时,zk会自动在znode名称后添加一个单调递增的序号

zk选举机制

  • 半数以上取id最大
  1. 例如有5台zk服务器,每台id分别为1 2 3 4 5
  2. 当4号启动后, 先将票投给自己然后观察其他服务器的上线状态
  3. 3号启动,3号发现有一个id为4的服务器已经存在,就将票投给4号
  4. 5号启动,3,4,5 发现5号id最大,投票选举产生leader

Kafka

1.是什么

  • 早期定位为 A high-throughput distributed messaging system
    高吞吐的分布式消息系统

  • 后来改叫 a distributed streaming platform
    分布式流平台

  • 当前版本 Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

    • 开源的分布式的事件流平台,数千个公司在使用高性能数据管道,流式数据分析,数据处理和一些重要任务的应用程序。

2.什么是消息队列

  • 消息:在大数据场景中通常是一些文本日志
  • 队列:有顺序的先进先出的数据结构
  • 生产者:负责将消息发送到消息队列
  • 消费者:负责读取消息队列的数据进行进行数据处理

3.搭建kafka集群

  • kafka核心源代码使用scala语言编写
  • 后来由于经常需要于java应用程序配合使用所以也提供了JavaApi
  • 最新版本 2.8.0
  • 当前选择 基于scala2.11 kafka2.4.1
  • 生产环境 0.8.x 、0.10.x 、0.11.x、1.1.x、2.1.x
  1. 下载安装包

  2. 修改配置文件

    • server.properties
# 为每台kafka节点设置一个全局唯一的整数,作为broker的id
broker.id=101
# 为每台kafka节点设置一个全局唯一的服务主机名和端口
listeners=PLAINTEXT://node01:9092
# 通常我们认为log(日志)是程序运行过程中用来记录运行状态的文件,----->运行日志
# 这里所谓的log(日志文件)是kafka集群用来存储生产者发送的message(消息)的文件---->消息日志
log.dirs=/opt/kafka_2.12-2.4.1/kafka-logs
# 设置kafka需要连接的zk集群的地址
zookeeper.connect=node01:2181,node02:2181,node03:2181
  1. 分发安装包

    • 分发之后修改每台机器的broker.id和listeners
  2. 配置环境变量

echo 'export KFK_HOME=/opt/kafka_2.12-2.4.1' >> /etc/profile
echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动测试
# -deamon 将服务启动为后台执行
kafka-server-start.sh -daemon /opt/kafka_2.12-2.4.1/config/server.properties

4. kafka核心名词

4.1 broker 代理人

  • broker是kafka服务的实例,使用kafka-server-start启动起来的kafka服务就称之为一个broker进程
  • 每个broker有一个全局唯一的id
  • 每个服务器上通常只启动一个broker

4.2 topic 主题

  • topic是一个逻辑概念
  • 用来划分kafka中保存的消息
  • 生产者发送数据和消费者接收数据时都需要指定topic

4.3 partition 分区

  • 每个topic可以包含多个分区
  • 分区分散在集群的不同机器上
  • 每个分区的都有一个id ,id从0开始编号
  • 分区本质上是一个文件夹,文件夹名称为 topic名-分区编号

4.4 replication.factor 副本因子(副本数)

  • 每个分区可以设置多个副本
  • 副本数就是每条消息在集群内保存的份数

4.5 offset 偏移量

  • 分区内会按照消息写入的顺序为每条消息分配一个单调递增的编号 称之为offset

4.6 分区之间的主从

  • 相同topic的 相同分区 的不同副本之间会选举产生一个leader和多个follower

  • leader负责接收客户端的请求 接收生产者的发送请求和消费者的读取请求

  • follower同步leader的数据

4.7 producer 生产者

  • 使用命令或者java、scala API启动生产者程序
  • 生产者负责将消息发送到kafka集群进行保存

4.8 consumer 消费者

  • 使用命令或者java、scala API启动消费者程序
  • 读取kafka集群指定topic的数据,进行处理
  • 每个消费者可以设置一个group.id
  • 消费者会记录自己的读取进度
    • 进度包含topic信息分区信息和offset记录
  • 读取进度与group.id一一对应

5. kafka使用

5.1 使用命令操作

  • topic相关
# 查看topic列表
kafka-topics.sh \
--bootstrap-server node01:9092 \
--list
# 创建topic
kafka-topics.sh \
--bootstrap-server node01:9092 \
--create --topic t1 \
--partitions 3 \
--replication-factor 2
# 查看topic详情
kafka-topics.sh \
--bootstrap-server node01:9092 \
--topic t1 \
--describe
  • 消费者
# 启动控制台消费者测试消费
kafka-console-consumer.sh --bootstrap-server node02:9092 --topic t1
# 消费者性能测试
  • 生产者
# 启动控制台生产者测试生产
kafka-console-producer.sh --broker-list node03:9092 --topic t1
# 生产者性能测试
kafka-producer-perf-test.sh \
--producer-props bootstrap.servers=node01:9092,node02:9092,node03:9092 \
--topic t3 \
--num-records 1000000 \
--throughput -1 \
--payload-file /root/logs/access.log-20210401

5.2 使用可视化工具

  • kafka-eagle
    本质上就是一个web应用程序,将kafka封装为web页面可以直接操作的软件
  • 最新版本2.0.6
  • 当前安装2.0.5
  1. 下载解压
  2. 修改配置文件
    • system-config.properties
# 配置集群信息
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181
# 配置web服务端口
kafka.eagle.webui.port=8048
# 关闭sqlite
######################################
# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org
# 设置连接的mysql服务器
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node02:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  1. 配置环境变量
export KE_HOME=/opt/ke-2.0.5
export PATH=$KE_HOME/bin:$PATH
  1. 启动
ke.sh start

5.3 使用JavaAPI

  • 修改topic的配置
kafka-configs.sh \
--zookeeper node01:2181 \
--entity-type topics \
--entity-name t_file_helper01 \
--alter --add-config max.message.bytes=3000000