AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Kafka

一、概述

  • 是什么?

2018之前 Kafka定位为 Messaging System(消息系统)
这两年重新定位为 Streaming platform(流平台)

本质上 是一个
分布式
消息
队列

  • 核心概念

    • kafka服务 broker
      kafka启动起来的一个JVM进程(Kafka核心代码使用scala语言编写,而外围客户端API,大部分已经使用java重新实现,为了提高java的兼容性)

    • 主题 topic
      在kafka中创建一个逻辑概念,约束客户端生产和消费时,具体访问哪些文件的数据

    • 分区 partition
      分区其实是一个Linux文件系统上的目录
      分区的命名 主题名-分区编号 例如: topic1-0 topic1-1
      目录中,存储topic中保存的数据

    • 偏移量(编号) offset
      生产者发送的数据,在topic中的partition中存储时,每一条消息都有一个全局唯一的offset编号
      offset对于分区来说是唯一的、而topic内不唯一

    • 副本 replication
      为了增加冗余性,每个topic可以为partition设置副本数,
      副本数不能超过可用的broker数量

    • 生产者 producer
      kafka的客户端,将数据以message为单位,写入到kafka的某个topic中

    • 消费者 consumer
      kafka的客户端,从kafka的某个topic中读取message

    • 消费者组 group
      每一个消费者都会有一个组ID 如果不设置,在命令行中程序会自动随机生成,在api中不设置组不能正常启动消费者
      同组(同group id)的消费者 共用一套offset记录

    • 分区的角色
      相同topic的相同分区,有多个副本时
      其中一个副本会被选举为leader作为客户端连接的节点
      其他副本作为follower,如果leader宕机,重新选举leader

二、集群搭建

  1. 安装zookeeper
    kafka使用zk进行配置的存储和集群状态管理
    所以必须安装并启动zk服务

  2. 下载解压

  3. 修改配置文件,分发安装包

    • server.properties
# 为每一个kafka进程 设置一个单独的id
broker.id=0
# 设置服务使用的主机名和端口
listeners=PLAINTEXT://bd0402:9092
# 日志的存储目录
# kafka中有两类日志
# 1. 运行日志
# 2. kafka保存的topic中的message也叫做日志  数据日志
log.dirs=/opt/kafka-2.4.1/logs
# 数据日志保存的生命周期
# 默认 7天
log.retention.hours=168
# 设置kafka连接的zookeeper集群的信息
zookeeper.connect=bd0401:2181,bd0402:2181,bd0403:2181
  1. 配置环境变量

    echo 'export KAFKA_HOME=/opt/kafka-2.4.1' >> /etc/profile
    echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile
    source /etc/profile
  2. 启动测试

# 启动zk
zkServer.sh start
# 启动kafka
kafka-server-start.sh -daemon /opt/kafka-2.4.1/config/server.properties
# 查看topic
kafka-topics.sh --list --zookeeper bd0401:2181
# 创建topic
kafka-topics.sh --zookeeper bd0401:2181 --create  --topic topic1 --partitions 3 --replication-factor 2
# 测试生产
kafka-console-producer.sh --broker-list bd0403:9092 --topic topic1
# 消费
kafka-console-consumer.sh --bootstrap-server bd0401:9092 --topic topic1

需求案例

需求分析

使用kafka实现一个文件的断点续传

要求kafka 的topic不能只有一个分区

  • 发送端
    将一个大文件 1MB的大小切成多个小块,
    每个小块作为kafka的一条message

    • 读文件 输入流
      • 使用1M缓冲区 byte[]
      • 当剩余大小小于1M 以剩余大小创建缓冲区
    • kafka Producer
      send( ProducerRecord( topic, key(文件名 文件类型 文件块偏移量|编号 文件大小 ) , byte[] ))
  • 接收端
    读取kafka的数据,将这些小块拼接成原来的完整文件

    • kafka consumer
      byte[]

      1. 内存中创建一个集合用来对接收的文件块进行排序
        如果块不能按序到达 ,导致缓存数据量过大,内存溢出
      2. 随机访问流 RandomAccess
    • 输出流
      创建输出流时可以直接创建一个与文件大小相同的空文件
      文件名

遇到的问题和解决方案:
当我们使用1M的byte数组作为message的value时
实际message大小 是key+value的总大小,所以这个大小会超过1M
kafka默认的配置不兼容当下的message大小
所以做了如下配置修改

  1. 修改broker的配置并重启kafka集群
    server.properties

    socket.receive.buffer.bytes=2097152
    socket.send.buffer.bytes=2097152
    replica.socket.receive.buffer.bytes=2097152
    max.message.bytes=2097152
  2. 修改topic设置

    kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name topic2 --alter --add-config max.message.bytes=2097152
  3. 修改producer设置

    props.setProperty("send.buffer.bytes", 2097152 + "");
    props.setProperty("max.request.size", 2097152 + "");
  4. 修改consumer的配置

    props.setProperty("receive.buffer.bytes", 2097152 + "");