AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Kafka

Kafka

一、是什么?

  • Kafka分布式流平台(消息队列)

二、核心概念

  • Broker
    是kafka集群启动起来的一个JVM进程

  • topic
    在Kafka中可以创建多个topic
    每个生产者和消费者进行数据读写时,都需要指定topic

  • partition
    每个topic包含多个分区,分区会被均衡的创建在每一个broker上,分区本质上就是一个文件夹,分区中存储生产者发送的数据

  • replication-factor
    每个topic创建时会设置分区的副本个数
    副本个数指的是每个分区的数据在集群中保存的数量
    topic中的相同分区的不同副本之间,会选举产生leader和follower,leader接收客户端的请求,follower同步leader数据

  • offset
    每条消息在分区内部会有一个唯一的单调递增的编号称之为offset

  • producer
    指定topic 向Kafka集群中发送数据
    producer将数据发送到leader分区

  • consumer
    指定topic 从kafka中拉取数据
    消费者读取数据时会记录自己的读取进度
    topic-partition-offset
    下次启动时根据上次的记录继续读取数据

  • consumer group
    多个消费者线程如果设置为相同的group id,则他们可以共享一份offset记录

三、kafka集群搭建

3.1 搭建zookeeper

  1. 下载解压

    • 由于目前Hadoop生态和Spark生态主流使用的是zk的3.4.x版本,所以我们选择3.4.14版本而不使用最新的3.7.x版本
  2. 修改配置文件

    • 将zoo_sample.cfg复制并重命名为zoo.cfg
# 修改zk保存数据的目录
dataDir=/opt/zookeeper-3.4.14/data
# 集群配置
#server.A=B:C:D
#A:每个zk节点自己唯一的id
#B:每个zk节点自己的主机名
#C:集群之间数据同步使用的端口 默认2888
#D:集群之间选举使用的端口     默认3888
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
  1. 分发安装包

  2. 在dataDir中添加myid文件
    在每个zk节点的dataDir目录中创建一个文本文件,文件中填入当前机器对应的id号
    node01 :echo 1 > /opt/zookeeper-3.4.14/data/myid
    node02 :echo 2 > /opt/zookeeper-3.4.14/data/myid
    node03 :echo 3 > /opt/zookeeper-3.4.14/data/myid

  3. 配置环境变量

    echo 'export ZK_HOME=/opt/zookeeper-3.4.14' >> /etc/profile
    echo 'export PATH=.:$ZK_HOME/bin:$PATH' >> /etc/profile
    source /etc/profile
  4. zk启动

    • zk没有提供集群启动脚本,所以需要到每台zk节点上分别执行zkServer.sh start

3.2 zookeeper选举机制

  • zk集群中每个节点启动时,会互相投票选举出一个作为leader,其他节点作为follower
  • zk集群中leader负责调度所有节点进行数据同步
顺序 leader
1 2 3 2
3 2 1 3
2 3 1 3
  • 半数以上选id最大

  • 如果奇数个节点的zk集群id分别为1 2 3 4 5,那么选举机制如下:
    比如1 先启动,1会通过选举端口将票投给自己,2第二个启动,2号发现自己的id比1号大,所以将票投给自己,1号发现2号上线,也将票投给2号,3号第三个启动,由于3号id更大,3号2号1号都会将票投给3号,此时集群中已启动的节点数超过集群节点半数,3号当选为leader,其他节点作为follower

3.2 安装kafka

  1. 下载解压改名

    • 关于版本:
      业界主流版本: 0.10、0.11、1.1.x、2.1.x
    • 最新版本:2.8.0
    • 当前文档中我们使用2.4.1,因为2.4.1同时提供scala2.11和scala2.12的兼容
  2. 修改配置文件

  • server.properties
    # 为每个Borker设置一个唯一的整数,作为id号
    broker.id=1
    # 为每个Broker设置服务的主机名和端口
    listeners = PLAINTEXT://node01:9092
    # 设置 消息保存的文件目录
    # kafka中将生产者发送的消息,保存的文件称之为叫store log file
    # 不同于通常的日志,store log file仅仅用来存储kafka缓存的消息
    log.dirs=/opt/kafka-2.4.1/kafka-logs
    # 设置zk集群的连接
    # 因为zk集群有多个节点,所以这里添加所有节点的列表,真正连接时,只需要连接到其中一台就可以正常工作,如果连接不上,会在列表中使用下一个连接方式
    zookeeper.connect=node01:2181,node02:2181,node03:2181
  1. 分发安装包

  2. 修改个性化配置

    • 修改所有机器的
      broker.id

      listeners = PLAINTEXT://node01:9092
  3. 配置环境变量

echo 'export KFK_HOME=/opt/kafka-2.4.1' >> /etc/profile
echo 'export PATH=.:$KFK_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
  1. 启动
kafka-server-start.sh -daemon /opt/kafka-2.4.1/config/server.properties
  1. 测试

四、kafka命令操作

# 查看topic列表
kafka-topic.sh --zookeeper node01:2181 --list
# 创建topic
kafka-topics.sh \
--zookeeper node01:2181 \
--create \
--topic topic01 \
--partitions 3 \
--replication-factor 2

# 启动消费者测试消费
kafka-console-consumer.sh  \
--bootstrap-server node01:9092 \
--topic topic01 \
--group test02 \
--from-beginning
# 启动生产者测试生产
kafka-console-producer.sh \
--broker-list e03:9092 \
--topic topic01

4.2 kafka eagle

  • 为了方便kafka的使用
    我们可以安装第三方的kafka可视化工具kafka eagle

    1. 下载解压安装包
      http://download.kafka-eagle.org/

    2. 修改配置文件

      • system-config.properties
# 设置kafka集群连接的zk集群地址
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node2:2181,node03:2181
# 注释掉之前关于sqlite设置
# 将保存数据使用的数据库换成自己集群的mysql
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node01:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  1. ke要求配置环境变量
export KE_HOME=/opt/kafka-eagle-web-2.0.5
export PATH=.:$KE_HOME/bin:$PATH
  1. 启动ke

五、Api操作