AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

day17_Flume

Flume

1.是什么?

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Flume是一种分布式,可靠且可用的服务,用于有效地收集,聚合和移动大量日志数据。它具有基于流数据流的简单灵活的体系结构。它具有可调整的可靠性机制以及许多故障转移和恢复机制,具有强大的功能和容错能力。它使用一个简单的可扩展数据模型,允许在线分析应用程序。

  • 定位:

在大数据处理中,flume作为一个数据采集工具
flume自身可以支持高可用的架构

2.模型

我们现在使用flume叫做flume-ng

flume-ng 启动起来的每一个实例(java进程),我们把它叫做agent
agent中包含三种组件
source
channel
sink

2.1 source

agent的输入

  • Avro Source

  • Exec Source

  • Spooling Directory Source

  • Taildir Source

  • Kafka Source

  • NetCat TCP Source

2.2 channel

agent的缓存
缓存是为了对source和sink进行解耦

  • Memory Channel

  • File Channel

2.3 sink

agent的输出

  • HDFS Sink

  • Hive Sink

  • Logger Sink

  • Avro Sink

  • HBaseSink

  • Kafka Sink

2.4 event

agent处理数据的单位
source会将读取到的数据封装成一个一个的event写入到channel中
sink会从channel中读取一个一个的event从中获取数据,发送给目标服务

3.案例

3.1从socket读取数据,打印到控制台

  1. 选择组件类型
    NetCat TCP Source
    Memory Channel
    Logger Sink

  2. 编写配置文件

    # 一个单节点flume的简单案例
    # 给当前的agent中所有的组件起名字
    # 'a1'是当前agent实例的名字
    # 'r1'是当前agent中的source的名字
    a1.sources = r1
    # 'k1'是当前agent中的sink的名字
    a1.sinks = k1
    # 'c1'是当前agent中的channel的名字
    a1.channels = c1
    # 配置source的属性
    # 设置source类型为netcat(netcat用来监听一个端口,从中读取数据)
    a1.sources.r1.type = netcat
    # 启动监听的主机名或者IP
    a1.sources.r1.bind = test01
    # 启动监听的端口
    a1.sources.r1.port = 44444
    # 配置sink的属性
    a1.sinks.k1.type = logger
    # 配置channel的属性
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # 将source、sink连接到channel上
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. 启动flume-ng实例

bin/flume-ng agent \
-n agent的名字 \
-c flume安装目录下的conf目录 \
-f 我们编写的配置文件
-Dflume.root.logger=INFO,console

例如上面的:

flume-ng agent \
-n a1 \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/job_conf/netcat_logger.conf \
-Dflume.root.logger=INFO,console

3.2从日志文件读取数据,发送到kafka

  1. 选择组件类型
    Exec Source
    Spooling Directory Source
    Taildir Source

Memory Channel

Kafka Sink

  1. 编写配置文件

    # 一个单节点flume的简单案例
    # 给当前的agent中所有的组件起名字
    # 'a2'是当前agent实例的名字
    # 'r1'是当前agent中的source的名字
    a2.sources = r1
    # 'k1'是当前agent中的sink的名字
    a2.sinks = k1
    # 'c1'是当前agent中的channel的名字
    a2.channels = c1
    # 配置source的属性
    a2.sources.r1.type = exec
    a2.sources.r1.command = tail -F /root/log.txt
    # 配置sink的属性
    a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a2.sinks.k1.kafka.topic = topic03
    a2.sinks.k1.kafka.bootstrap.servers = test01:9092,test02:9092,test03:9092
    # 配置channel的属性
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # 将source、sink连接到channel上
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1
  2. 启动flume-ng实例

bin/flume-ng agent \
-n agent的名字 \
-c flume安装目录下的conf目录 \
-f 我们编写的配置文件
-Dflume.root.logger=INFO,console

例如上面的:

flume-ng agent \
-n a2 \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/job_conf/file_kafka.conf \
-Dflume.root.logger=INFO,console

You must be logged in to post a comment.