AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Flume

Flume

1. 是什么

hadoop生态的分布式 日志采集工具
可以完成自动化的海量日志收集 聚合 和移动

2. Flume核心概念

file

agent 代理

agent是flume启动后的JVM进程
使用flume进行采集时 先编写agent的配置文件
之后启动agent开启自动采集

source 源

source是agent中的一个组件
负责从数据源端采集数据,并将数据放入channel

channel 频道

channel通过队列将source发过来的数据进行缓存
等待sink读取数据

sink 下沉

sink读取channel数据,将数据发送到目的地

3. Flume常用架构

file

file

file

file

4.Flume 安装和使用

4.1安装

  1. 下载解压
  2. 修改配置文件

    • flume-env.sh
export JAVA_HOME=/opt/jdk1.8
  1. 添加环境变量
echo 'export FLUME_HOME=/opt/flume-1.9.0' >> /etc/profile
echo 'export PATH=$FLUME_HOME/bin:$PATH' >> /etc/profile
source /etc/profile

4.2 配置和使用

4.2.1 从netcat采集数据发送到控制台

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 配置agent中所有组件的名称
# a1是agent的名字
# r1是source的名字
# k1是sink的名字
# c1是channel的名字
# 如果有多个组件 可以空格拼接多个名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
# 设置source类型
a1.sources.r1.type = netcat
# 设置netcat连接的主机名
a1.sources.r1.bind = host01
a1.sources.r1.port = 12345

# 配置sink
a1.sinks.k1.type = logger

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 设置source和sink连接到哪些channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/agents/netcat2logger.conf \
-n a1 \
-Dflume.root.logger=INFO,console

4.2.1 从netcat采集数据发送到HDFS

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 配置agent中所有组件的名称
# a1是agent的名字
# r1是source的名字
# k1是sink的名字
# c1是channel的名字
# 如果有多个组件 可以空格拼接多个名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
# 设置source类型
a1.sources.r1.type = netcat
# 设置netcat连接的主机名
a1.sources.r1.bind = host01
a1.sources.r1.port = 12345

# 配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://host01:8020/flume

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 设置source和sink连接到哪些channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent \
--conf /opt/flume-1.9.0/conf \
--conf-file /opt/flume-1.9.0/agents/netcat2logger.conf \
--name a1 \
-Dflume.root.logger=INFO,console

4.2.1 使用exec监控单文件变化采集数据发送到HDFS

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 配置agent中所有组件的名称
# a1是agent的名字
# r1是source的名字
# k1是sink的名字
# c1是channel的名字
# 如果有多个组件 可以空格拼接多个名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
# 设置source类型
a1.sources.r1.type = exec
# 设置netcat连接的主机名
a1.sources.r1.command = tail -F /root/3.txt

# 配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://host01:8020/flume
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 设置source和sink连接到哪些channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/agents/exec2hdfs.conf \
-n a1 \
-Dflume.root.logger=INFO,console

4.3 常用source channel sink

  • 使用flume实时监控日志文件的变化并收集

    • exec 用于监控单个文件的变化
    • spooling dir 用于监控文件夹 将移动到文件夹中的新文件全部读取
    • tailDir 用于监控文件或者文件夹 并记录读取进度实现断点续传
  • channel

    • memory 将数据缓存到内容中,容量受制于agent的堆内存大小
    • file 配置一个checkpoint目录用于将source采集的数据保存在磁盘上,agent重新启动后sink可以继续读取channel缓存的数据,避免数据丢失
  • sink

    • logger 用于测试 将数据打印到控制台
    • hdfs 将数据写入HDFS
# 类型
a1.sinks.k1.type = hdfs
# 存储路径
# 可以使用别名设置动态路径
a1.sinks.k1.hdfs.path = hdfs://host01:8020/flume/%Y-%m-%d-%H-%M/
# HDFSSink默认使用Hadoop中的SequenceFile存储数据
# 可以将其设置为DataStream用于开启普通字节流存储数据
a1.sinks.k1.hdfs.fileType = DataStream
# 设置文件的输出格式
# HDFS Sink默认使用Writable
# 采集日志时可以设置为Text用文本保存
a1.sinks.k1.hdfs.writeFormat = Text
# 使用本地时间作为时间戳标识
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 滚动生成新文件的周期 单位秒
a1.sinks.k1.hdfs.rollInterval = 60
# 滚动生成新文件的大小  单位字节
a1.sinks.k1.hdfs.rollSize = 134217728
# 滚动生成新文件的数据条数  设置为0 关闭以条数滚动文件的条件
a1.sinks.k1.hdfs.rollCount = 0