AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Flume

Flume

一 是什么

  • Flume是hadoop生态中提供的一个全自动的海量日志收集、聚合和移动工具。

  • Flume可以支持多种数据来源和存储目标

二 核心名词

  • flume-ng
    flume当前使用的版本为新版的架构
    通常称为flume-next-generation
    以前的flume称之为og

  • agent
    是flume-ng启动后的Java进程

  • source

    • source是agent中的一个模块
    • 负责完成数据的读取
    • source根据类型不同可以从文件、数据库、网络连接等来源中读取数据并发送给channel
  • channel

    • 负责接收source发送的数据,并按照顺序缓存
    • 等待sink读取数据
  • sink

    • sink读取channel中缓存的数据
    • 根据sink的类型不同将数据发送到不同的数据库或者文件系统

三 flume常用架构

1. 单Agnet架构

file

2. Agent串联

file

3. Agent聚合架构

file

4. 复杂结构Agent

file

四 flume使用

4.1 flume安装

  • 服务:启动后服务端监听某些端口等待客户端连接
  • 框架:半成品软件,需要开发者编写自己的业务逻辑填充成可执行的完整软件
  • 工具:需要完成某些操作时,可以直接启动或者使用工具,用完就可以关闭或者删除
  1. 下载安装包

  2. 解压修改配置文件

    • flume-env.sh
      export JAVA_HOME=/opt/jdk1.8
  3. 配置环境变量

echo 'export FLUME_HOME=/opt/flume-1.9.0' >> /etc/profile
echo 'export PATH=$FLUME_HOME/bin:$PATH' >> /etc/profile
source /etc/profile

4.2 flume实战

4.2.1 第一个flume案例

  • 编写agent配置文件
# ak47是agent的名字,用户可以自定义
# 给当前agent中的source命名 r1
ak47.sources = r1
# 给当前agent中的sink命名 k1
ak47.sinks = k1
# 给当前agent中的channel命名 c1
ak47.channels = c1

# 配置source
# netcat创建一个socket连接
# 负责接收某个机器的某个端口的数据
ak47.sources.r1.type = netcat
ak47.sources.r1.bind = localhost
ak47.sources.r1.port = 44444

# 配置Sink
# logger用来将接收到的数据打印到控制台
ak47.sinks.k1.type = logger

# 配置channel
# 设置channel使用内存作为缓存
ak47.channels.c1.type = memory
# 容量设置为1000条数据
ak47.channels.c1.capacity = 1000
# 设置单次通信容量100条
ak47.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
ak47.sources.r1.channels = c1
ak47.sinks.k1.channel = c1
  • 启动agent
# flume-ng启动agent
flume-ng agent \
# 设置flume的配置文件路径
--conf /opt/flume-1.9.0/conf \
# 设置当前启动的agent的配置文件路径
--conf-file /opt/flume-1.9.0/agents/ak47.conf \
# 设置当前启动的agent的名称
--name ak47 \
# 设置日志输出级别和方式
-Dflume.root.logger=INFO,console
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/agents/ak47.conf \
-n ak47 \
-Dflume.root.logger=INFO,console

4.2.2 使用exec读取文件

# exec2logger是agent的名字,用户可以自定义
# 给当前agent中的source命名 r1
exec2logger.sources = r1
# 给当前agent中的sink命名 k1
exec2logger.sinks = k1
# 给当前agent中的channel命名 c1
exec2logger.channels = c1

# 配置source
exec2logger.sources.r1.type = exec
exec2logger.sources.r1.command = tail -F /root/jimi.xx

# 配置Sink
# logger用来将接收到的数据打印到控制台
exec2logger.sinks.k1.type = logger

# 配置channel
# 设置channel使用内存作为缓存
exec2logger.channels.c1.type = memory
# 容量设置为1000条数据
exec2logger.channels.c1.capacity = 1000
# 设置单次通信容量100条
exec2logger.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
exec2logger.sources.r1.channels = c1
exec2logger.sinks.k1.channel = c1
  • 启动命令
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/agents/exec2logger.conf \
-n exec2logger \
-Dflume.root.logger=INFO,console

4.2.3 使用exec读取文件收集到HDFS

# exec2hdfs是agent的名字,用户可以自定义
# 给当前agent中的source命名 r1
exec2hdfs.sources = r1
# 给当前agent中的sink命名 k1
exec2hdfs.sinks = k1
# 给当前agent中的channel命名 c1
exec2hdfs.channels = c1

# 配置source
exec2hdfs.sources.r1.type = exec
exec2hdfs.sources.r1.command = tail -F /root/jimi.xx

# 配置Sink

exec2hdfs.sinks.k1.type = hdfs
exec2hdfs.sinks.k1.hdfs.path = hdfs://node01:9000/flume/data
# 将文件类型设置为DataStream
exec2hdfs.sinks.k1.hdfs.fileType = DataStream
# 设置文件类型使用文本作为格式化
exec2hdfs.sinks.k1.hdfs.writeFormat = Text
# hdfs sink提供了三个维度的文件滚动条件
# 按照时间生成新文件
exec2hdfs.sinks.k1.hdfs.rollInterval = 120
# 按照文件大小
exec2hdfs.sinks.k1.hdfs.rollSize = 134000000
# 按照数据条数
exec2hdfs.sinks.k1.hdfs.rollCount = 0

# 配置channel
# 设置channel使用内存作为缓存
exec2hdfs.channels.c1.type = memory
# 容量设置为1000条数据
exec2hdfs.channels.c1.capacity = 1000
# 设置单次通信容量100条
exec2hdfs.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
exec2hdfs.sources.r1.channels = c1
exec2hdfs.sinks.k1.channel = c1
  • 启动命令
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/agents/exec2hdfs.conf \
-n exec2hdfs \
-Dflume.root.logger=INFO,console
使用taildir优化文本采集
source 优点 缺点
exec 性能高,直接使用shell命令作为数据来源 只能监控单个文件,不支持断点续传
spooldir 可以监控整个目录中所有文件,支持断点续传 不支持读取进度的记录,只能保证数据不丢 不能保证数据不重复
taildir 可以监控多个文件和目录,而且支持正则,支持读取进度的记录

4.2.4 聚合多个日志服务器的数据到HDFS

file

  • taildir => avro
# taildir2avro是agent的名字,用户可以自定义
# 给当前agent中的source命名 r1
taildir2avro.sources = r1
# 给当前agent中的sink命名 k1
taildir2avro.sinks = k1
# 给当前agent中的channel命名 c1
taildir2avro.channels = c1

# 配置source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2
taildir2avro.sources.r1.filegroups.g1 = /root/888.txt
taildir2avro.sources.r1.filegroups.g2 = /root/logs/.*log

# 配置Sink

taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = node02
taildir2avro.sinks.k1.port = 55555

# 配置channel
# 设置channel使用内存作为缓存
taildir2avro.channels.c1.type = memory
# 容量设置为1000条数据
taildir2avro.channels.c1.capacity = 100000
# 设置单次通信容量100条
taildir2avro.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
  • avro => hdfs
# avro2hdfs是agent的名字,用户可以自定义
# 给当前agent中的source命名 r1
avro2hdfs.sources = r1
# 给当前agent中的sink命名 k1
avro2hdfs.sinks = k1
# 给当前agent中的channel命名 c1
avro2hdfs.channels = c1

# 配置source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = node02
avro2hdfs.sources.r1.port = 55555

# 配置Sink

avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node01:9000/flume/data
# 将文件类型设置为DataStream
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
# # 设置文件类型使用文本作为格式化
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
# hdfs sink提供了三个维度的文件滚动条件
# # 按照时间生成新文件
avro2hdfs.sinks.k1.hdfs.rollInterval = 0
# # 按照文件大小
avro2hdfs.sinks.k1.hdfs.rollSize = 133000000
# # 按照数据条数
 avro2hdfs.sinks.k1.hdfs.rollCount = 0

# 配置channel
# 设置channel使用内存作为缓存
avro2hdfs.channels.c1.type = memory
# 容量设置为1000条数据
avro2hdfs.channels.c1.capacity = 100000
# 设置单次通信容量100条
avro2hdfs.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1

4.2.5 使用单AGENT发送到多个数据目的地