AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Kafka

是什么

  • 分布式高吞吐消息系统(消息队列)

  • 分布式流平台

  • 同步:多个线程需要访问同一个资源时,为了避免出现脏读,创建公共锁资源,当某个线程占用锁资源时,则可以进行操作,操作完成后释放锁资源,其他线程获取锁资源,完成相应操作

  • 异步:多个线程,每个线程单独操作自己保有的资源,线程之间互不干扰

  • 消息队列

    1. 解耦:多个客户端或者应用程序进行通信时,为了避免同步带来的性能影响,会将数据拆分为多个部分,并将所有读写操作交由中间件代理,A与B直接交互,引入中间件,可以改造为A<->中间件<->B

    2. 削峰填谷:由于应用程序的用户访问会出现周期性的峰值变化,日志生成端和采集端会根据数据产生速度发生热点峰值写入问题,如果此时采集端直连存储端,可能导致数据积压最终引起雪崩。中间件可以以极高的吞吐量,接收数据采集端的峰值流量,并以存储端适合的带宽将数据平稳的收集到存储平台

    3. 冗余:kafka提供了类似HDFS的副本机制,与HDFS不同的是,kafka的文件副本数最多不能超过可用服务器数量

    4. 高吞吐:零拷贝、异步读写、顺序读写(log,index,timeindex)

集群架构原理和核心名词解释

1. broker 代理人

是kafka服务启动后的实例,是一个JVM进程

2. producer 生产者

kafka的一类客户端,用于将数据封装为ProducerRecord对象,将ProducerRecord发送到Kafka集群

3. topic 主题

主题用于区分kafka种不同的数据,
在生产者发送数据前, 会先在kafka中创建topic
创建topic需要指定主题包含的分区个数和副本个数

4. partition 分区

分区是存储在kafka服务器上的一个文件夹
文件夹的命名方式为 主题名-分区编号,分区编号从0开始编号
生产者发送到kafka的数据会以文件的形式存储在对应主题的分区文件夹中

5. replication-factor 副本因数(副本个数)

kafka中一般使用2个或2个以上副本存储数据保证数据安全
kafka允许的副本数 最大不能超过可用broker数量
相同主题相同分区的多个副本之间,会选举产生1个leader和若干个follower
leader负责接收客户端(生产者/消费者)请求,进行数据的网络IO操作
follower负责实时同步leader

6. offset 偏移量(数据编号)

为了保证kafka中分区内部的数据可以有序存储,kafka为每条数据添加了一个offset
offset从0开始编号,单调递增
offset可以用来记录消费者的读取进度

7. consumer 消费者

是kafka的一类客户端,负责订阅kafka的某些topic,
将读取到的ConsumerRecord对象解析为数据,进行数据处理
每个消费者需要指定一个字符串作为group.id,这个id用于维护消费进度列表

8. consumer group

如果多个消费者实例使用相同的group.id,则他们称之为同组消费者,
同组消费者会共用一份offset记录
kafka主题中分区的个数会影响消费者组的并发度
对于同组消费者,在组内,默认每个分区同一时间只有一个实例可以读取数据
如果分区数量小于组内消费者的并发度,此时组内会有闲置的消费者线程
如果分区数量大于组内消费者的并发度,此时如果topic的数据持续写入,则可能导致数据呆滞

集群搭建

  1. 下载解压

    • 生产环境: 0.8 0.10 0.11,1.x ,2.x
    • 最新版本: 3.x
    • 学习版本: 2.8.1 scala 2.12
  2. 修改配置文件

    • server.properties
# 为每个kafka服务实例设置集群内唯一的整数值作为id
broker.id=131
# 设置kafka broker服务的主机名和端口
listeners=PLAINTEXT://bd1301:9092
# 这里的logFile不是程序运行过程中 "运行日>志"
# 而是一种特殊的文件格式,用于存储生产者发,送到kafka集群的数据
# kafka的运行日志会保存在kafka安装目录的logs目录中
# 而存储数据的logFile需要设置单独的存储路>径
log.dirs=/opt/kafka-2.8.1/kafka-logs
# 设置topic的默认分区数
num.partitions=1
# 设置topic的默认副本数
offsets.topic.replication.factor=1
# 设置kafka中每条消息的默认超时时间
# 生产者封装producerRecord时可以手动设置消息写入的时间戳,如果不设置,时间戳默认为系统当前时间
# 如果需要手动调整某些数据的超时间,可以根据需要在封装producerRecord时添加时间戳
# 生产环境根据需要可以将超时时间调整到2~3>天
log.retention.hours=168
# kafka集群需要连接的zk集群地址
zookeeper.connect=bd1301:2181,bd1302:2181,bd1303:2181
  1. 分发安装包
    • 为每个broker单独修改
broker.id=integer
listeners=PLAINTEXT://host:9092
  1. 添加环境变量
echo 'export KFK_HOME=/opt/kafka-2.8.1' >> /etc/profile
echo 'export PATH=$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动集群
# kafka服务启动命令
/opt/kafka-2.8.1/bin/kafka-server-start.sh \
# 可选的  加上-daemon ,将kafka启动为后台运行
[-daemon] \
# 设置当前启动需要使用的配置文件
server.properties \
# 可以通过override覆盖server.properties,用于临时测试
[--override property=value]*

# 通常测试时使用前台启动并通过override添加临时配置项
/opt/kafka-2.8.1/bin/kafka-server-start.sh server.properties --override num.network.threads=10
# 长期运行的服务使用后台运行的方式
/opt/kafka-2.8.1/bin/kafka-server-start.sh  -daemon /opt/kafka-2.8.1/config/server.properties

kafka操作

命令行

  • topic相关
# 查看kafka集群中的主题列表
# --bootstrap-server用于设置当前需要访问的kafka集群地址,通常会直接添加所有节点信息
# --list展示集群内可用topic列表
kafka-topics.sh \
--bootstrap-server bd1301:9092,bd1302:9092,bd1303:9092 \
--list
# 创建topic
# --topicy用于设置操作的主题名称
# --partitions指定分区个数
# --replication-factor指定副本个数
kafka-topics.sh \
--bootstrap-server bd1301:9092,bd1302:9092,bd1303:9092 \
--create \
--topic topic01 \
--partitions 3 \
--replication-factor 2
  • consumer相关
# 控制台消费者
kafka-console-consumer.sh
# 测试消费
kafka-console-consumer.sh \
--bootstrap-server bd1302:9092 \
--topic topic01 \
--from-beginning \
--partition 2
# 消费者性能测试
kafka-consumer-perf-test.sh
  • producer相关
# 控制台生产者
kafka-console-producer.sh
# 测试生产数据
kafka-console-producer.sh \
--bootstrap-server bd1303:9092 \
--topic topic01
# 生产者性能测试
kafka-producer-perf-test.sh

可视化

  1. 下载解压
  2. 修改配置文件

    • system-config.properties
# 设置需要让ke管理的kafka的zk集群
efak.zk.cluster.alias=cluster1
cluster1.zk.list=bd1301:2181,bd1302:2181,bd1303:2181
# 设置ke保存数据使用的数据库
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://bd1301:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
  1. 配置环境变量
echo 'export KE_HOME=/opt/efak-web-2.1.0' >> /etc/profile
echo 'export PATH=$KE_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动
ke.sh start

JavaAPI

  1. maven
    kafka-clients

  2. 生产者
    KafkaProducer
    send

  3. 消费者
    KafkaConsumer
    poll

面试题

  • kafka能不能保证数据的全局有序?

  • 有没有保证全局有序的解决方案呢?

  • 你生产环境kafka的分区是如何设置的?

生产环境中通常需要同时利用kafka的多分区异步并发的性能,以及单分区内部全局有序的特点

预先对需要收到kafka的数据按照某个Key进行散列,
比如使用uid作为散列Key,将uid.hashCode() % 分区个数,作为当前数据希望写入的分区编号
就可以保证,每个uid的数据不会被分散到多个分区中,并且在分区内部可以保证某个uid的数据可以根据写入的顺序做到全局有序