AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

流式计算_day01

分布式计算的分类

1. 离线计算(批处理)

batch

  • 常见的组件:MapReduce、Hive、SparkCore、SparkSQL
  • 数据的产生:计算之前数据提前产生
  • 数据的运算:数据以文件作为单位。输入输出与计算的逻辑相关,逻辑不变、输入不变、输出也不变
  • 计算的延迟:一般计算延迟较高,因为在分布式场景中大多数情况处理的是海量数据
  • 常见的运行方式:定时每天凌晨2点执行,处理前一天的数据(9:10处理8:00-9:00的数据)

2. 实时计算(流处理)

stream

  • 常见的组件:kafka、JStorm、SparkStreaming、Flink、Beam
  • 数据的产生:文件或者数据,随着计算的进行,不断产生
  • 数据的运算:数据以条作为单位、每条数据单独处理、单独产生运算结果。
  • 计算的延迟:一般是秒级延迟(0-60s)
  • 常见的运行方式:一旦开始,永远执行

kafka

一 是什么

  • 早期:message system 消息系统

  • 现在:streaming platform 流平台

  • 消息系统(消息队列)
    队列Queue

二 核心名词

  1. broker 经纪人(代理人)
    kafka服务的java进程实例
    broker之间没有主从关系,他们是平等的节点关系
    broker需要依靠zookeeper集群进行通信

  2. topic 主题
    对于kafka来说 topic用来对消息进行归类
    生产者写入数据和消费者读取数据时,都需要指定topic

  3. partition 分区
    topic是一个逻辑概念,他对于kafka集群来说是全局的
    topic真正存消息时,是将消息写到特定目录的特定文件中
    特定目录就是 分区
    特定文件就是 分区这个目录下的文件
    分区的命名方式(文件夹)的命名方式 主题名-分区编号 topic-1

  4. offset偏移量
    在kafka的每个分区中,使用offset来对消息进行编号,
    每个分区内部,offset是单调递增且唯一的
    因为offset由分区来维护,所以不同分区可能会有相同的offset

  5. replication 副本
    某个分区的数据在集群中存多少份
    每个分区都可以设置多个副本
    注意! 副本的数量不能超过broker数量

同一个主题的同一个分区的不同副本之间,会选举出来一个leader,
其他的副本作为follower
客户端进行topic的读写时是与leader进行通信
follower负责同步leader的数据

  1. producer 生产者
    负责往kafka的topic中写入数据
    同一个topic可以有多个生产者
    同一个生产者也可以同时写入多个topic

  2. consumer 消费者
    负责从kafka的topic中读取数据
    consumer会记录自己读取的offset,下次启动时可以从上次的位置继续读取数据
    也可以自定义offset从特定位置开始消费数据

  3. cousumer group 消费者组
    同组的消费者,会共享offset记录

三 集群搭建

3.1 版本

  • 生产环境版本
    0.11.x、0.10.x、1.1.x

  • 最新版本
    2.6.x

3.2 集群搭建

kafka集群需要依托zookeeper进行集群管理
所以需要先安装并启动zk集群

  1. 下载上传解压改名

  2. 修改配置文件

  • server.properties
    # broker的编号、每台机器不一样、唯一整数
    # 通常从0开始编号
    broker.id=0
    # 配置broker服务监听的主机名和端口、每台机器不一行
    listeners=PLAINTEXT://bd0201:9092
    # kafka中日志分两类
    # 第一类(运行日志):平常说的日志,程序运行过程中,产生的运行记录,运行记录与程序的逻辑无关
    # 第二类(消息日志):kafka中缓存(持久化)的所有topic的数据,在kafka也叫做log,这一类数据如果删除,就会导致topic的数据丢失
    # 默认在tmp下,数据不安全,我们会保存到相对安全的目录
    # 练习时为了方便,放在kafka的安装目录下(生产环境会放到操作系统的较大分区中)
    log.dirs=/opt/kafka-1.1.1/logs
    # 创建topic时默认的分区数量
    # 当生产者生产数据时发现topic不存在,会自动按照默认配置创建topic
    # 通常生产环境中,会手动指定分区数量,并提前创建并测试topic
    num.partitions=1
    # kafka中的数据保存的时间,单位是小时
    # 默认7天  一般改成一两天
    log.retention.hours=168
    # kafka集群连接的zk集群信息
    zookeeper.connect=bd0201:2181,bd0202:2181,bd0203:2181
  1. 分发安装包-修改个性化配置

每台机器修改
broker.id
listeners

  1. 配置环境变量

    echo 'export KAFKA_HOME=/opt/kafka-1.1.1' >> /etc/profile
    echo 'export PATH=.:$KAFKA_HOME/bin:$PATH' >> /etc/profile
    source /etc/profile
  2. 启动

    # 先确保zk正常启动
    # 前台启动
    kafka-server-start.sh /opt/kafka-1.1.1/config/server.properties
    # 后台启动kafka
    kafka-server-start.sh -daemon /opt/kafka-1.1.1/config/server.properties
  3. 测试

    # 1.查看topic
    kafka-topics.sh --zookeeper bd0201:2181 --list
    # 2.创建topic
    kafka-topics.sh --zookeeper bd0201:2181 --create --topic topic1 --partitions 3 --replication-factor 2
    # 3.查看topic的详情
    kafka-topics.sh --zookeeper bd0201:2181 --topic topic1 --describe
    # 4.测试消费者
    kafka-console-consumer.sh --bootstrap-server bd0201:9092 --topic topic1
    # 5.测试生产者
    kafka-console-producer.sh --broker-list bd0201:9092 --topic topic1

3.3 kafka的JavaAPI

  1. 创建maven项目,引入依赖

    <!-- 含有新api和老api -->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    </dependency>
    <!-- 只有新api,平时用这个就可以了 -->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
    </dependency>
  2. java的生产者

  3. java的消费者

3.4 kafka和flume集成