AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Kafka

  • kafka集群需要提前启动zookeeper集群

一、zookeeper

1.1是什么

  • Hadoop生态中的一个高性能分布式协调服务
  • 常用场景:
    1. 集群高可用模式主备切换
    2. 配置文件管理
    3. 分布式锁

1.2 数据模型

file

1.3 集群搭建

  • 关于版本:
    因为目前主流的Hadoop生态和Spark生态包含的zk版本支持都是3.4.x,所以我们选择3.4.x中最新的版本
    zk自身最新版本为3.7.0
  1. 上传解压改名

  2. 修改配置文件

    • zoo_sample.cfg重命名为zoo.cfg
    • 修改zoo.cfg
# 修改zk保存数据的目录,可以直接放在zk的安装目录下
dataDir=/opt/zookeeper-3.4.14/data
#配置zk集群成员
# server.A=B:C:D
# A zk服务的id,是一个唯一整数值
# B id对应的服务器主机名或者IP
# C 集群之间选举投票使用的通信端口 通常使用2888
# D 集群之间同步数据使用的通信端口 通常使用3888
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
  1. 分发安装包

    • 分发安装包之后需要为每个节点添加自己的myid文件
    • zoo.cfg配置的dataDir目录中创建文件名为myid的文件,并在文件中填入当前机器对应的id
  2. 配置环境变量

echo 'export ZK_HOME=/opt/zookeeper-3.4.14/'  >> /etc/profile
echo 'PATH=.:$ZK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动
# zk服务启动命令
zkServer.sh start
# zk查看服务状态
zkServer.sh status
# zk服务关闭
zkServer.sh stop

1.4 zk选举机制

  • 顺序 1-2-3 leader 2

  • 顺序 3-2-1 leader 3

  • 半数以上取id最大的称为leader,其他机器作为follower

1.5 命令行客户端操作

  • 连接服务端

    # 使用zkCli.sh 连接zk服务器
    # 如果不加任何参数默认连接localhost
    zkClit.sh [-server host:port]
  • 常用的客户端命令

    # 查看路径中的子目录
    ls path
    # 创建znode
    create [-s] [-e] path data
    # 创建永久无序号znode
    create /a "this is /a"
    # 查看znode中的数据
    get path
    # 重新设置znode中的数据
    set path data
    # 删除znode
    rmr path

二、kafka概述

2.1 是什么

  • 2013: 高通量分布式消息系统
  • 2017: 分布式流平台
  • 2021: 分布式事件流平台

2.2 特点

  • 解耦
  • 冗余
  • 扩展性
  • 峰值处理能力(缓冲)
  • 单分区内部的顺序性
  • 异步(高并发)读写,提高通量

2.3 集群架构原理和核心名词

file

    1. broker 代理人
      是kafka服务启动之后的主进程
    2. broker启动后,会先读取zk中其他节点的在线状态,然后与集群中其他broker通信组成集群
    1. topic 主题
      kafka集群中可以包含多个主题,生产和消费数据都需要指定主题,每个主题包含多个分区和多个副本
    1. partition 分区
      topic中设置的分区个数,对应集群内文件夹的个数,这个分区文件夹的命名方式为topic名字-分区编号 分区编号从0开始编号
      分区文件夹内使用文件保存生产者发送到集群的数据
    1. replication 副本
      kafka允许在创建topic时为topic的分区设置副本数,副本数指的是每个分区在kafka保存的数量
      同一个topic的相同编号的分区的多个副本会选举出一个副本作为leader,其他副本作为follower
      leader负责处理客户端请求:接收生产者数据,处理消费者的读取请求
      follower负责实时同步leader的数据
    1. offset 偏移量
      每个分区中的数据,都会根据写入的顺序获得一个分区内唯一的单调递增的offset
      offset是分区维护的,每个分区内,offset从0开始单调递增,所以可以根据offset的值判断分区内的数据的写入顺序,但是分区之间无法确定顺序
    1. producer 生产者
      生产者是kafka的一类客户端,专本负责将数据发送给kafka集群
      一个生产者可以同时给多个topic发送数据
    1. consumer 消费者
      消费者是kafka的一类客户端,专门负责从kafka中读取数据
      每个消费会在读取消息时记录自己的消费进度(读取到哪个topic的哪个分区的哪个offset)
    1. consumer group 消费者组
      每个消费者在创建时,都会指定一个group.id,消费进度会记录在对应的组中,下次启动消费者时只需要指定相同的组ID就可以根据上次的进度继续读取
      相同组的不同consumer可以共享一份offset进度

2.4 集群搭建

  • 版本:
    生产环境主流: 0.8、0.10、0.11
    建议使用版本: 1.1.x 2.1.x
  1. 上传解压改名字

  2. 修改配置文件

# broker.id 为每一个broker设置一个唯一整数值
broker.id=101
# 设置kafka服务的主机名和端口
listeners=PLAINTEXT://node01:9092
# 设置用来保存topic数据的日志目录
# 注意这个日志不同于运行日志,是专门用来存储生产者发送的消息的日志文件
log.dirs=/opt/kafka-2.8.0/kafka-logs
# 设置kafka使用的zk集群的连接信息
zookeeper.connect=node01:2181,node02:2181,node03:2181
  1. 分发安装包

    • 分发安装包之后,为其他机器修改broker.id,listeners

    • 替换使用的脚本

      HOST=`hostname`
      IP=`cat /etc/hosts | grep $HOST | awk '{print $1}'`
      ID=`echo $IP | awk '{print substr($IP,12)}'`
      sed -i s/'broker.id=101'/"broker.id="$ID/g /opt/kafka-2.8.0/config/server.properties
  2. 配置环境变量

echo 'export KFK_HOME=/opt/kafka-2.8.0' >> /etc/profile
echo 'export PATH=.:$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动
# -daemon 指的是启动守护进程,也就是说后台运行kafka服务
kafka-server-start.sh -deamon /opt/kafka-2.8.0/config/server.properties

三、kafka操作

3.1 命令行操作

3.1.1 topic相关

  1. 创建topic
kafka-topics.sh \
--bootstrap-server node02:9092 \
--create \
--topic topic01 \
--partitions 3 \
--replication-factor 2
  1. 查看topic详情
kafka-topics.sh \
--bootstrap-server node02:9092 \
--describe \
--topic topic01
  1. 查看topic列表
kafka-topics.sh \
--bootstrap-server node02:9092 \
--list

3.1.2 消费者操作

  1. 启动控制台消费者
kafka-console-consumer.sh \
--bootstrap-server node03:9092 \
--topic topic01  \
--from-beginning

3.1.3 生产者操作

  1. 启动控制台生产者
kafka-console-producer.sh  \
--bootstrap-server  node01:9092  \
--topic topic01
  1. 生产者性能测试
kafka-producer-perf-test.sh \
--topic topic01 \
--num-records 10000000 \
--throughput -1 \
--producer.config /opt/kafka-2.8.0/config/producer.properties \
--payload-file access.log-20210221
kafka-producer-perf-test.sh \
--topic topic02 \
--num-records 10000000 \
--throughput -1 \
--producer-props bootstrap.servers=node01:9092,node02:9092,node03:9092 \
--payload-file access.log-20210221

3.2 kafka可视化

  • kafka官方没有提供可视化的工具,broker也没有web服务,所以如果想通过图形化界面直接进行kafka集群的管理,可以使用第三方的kafka可视化组件比如kafka-eagle

  • kafka-eagle官网
    https://www.kafka-eagle.org/

  • 版本选择,最新版本2.0.5

  • 安装:

    1. 解压
    2. 修改配置文件
# 配置集群别名
kafka.eagle.zk.cluster.alias=amos1
# 配置集群连接的zk主机和端口
amos1.zk.list=node01:2181,node02:2181,node03:2181
# 将默认的sqlite的存储方式注释掉
# 设置用来保存kafka-eagle数据的mysql服务器信息
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node01:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  1. 配置环境变量
export KE_HOME=/opt/ke-2.0.5
export PATH=.:$KE_HOME/bin:$PATH
  1. 启动
ke.sh start

3.3 JavaAPI

  1. 导入maven依赖