AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Flume

Flume

是什么

Flume是hadoop生态的日志收集、聚合和移动的自动化海量日志采集工具。

核心概念

  • Agent
    是Flume程序启动之后一个Jvm进程,同一台服务器上不能使用重名的Agent

  • Source
    Source负责将原始数据采集并封装为Event发送channel进行缓存

  • Channel
    负责接收Source发送过来的Event,并缓存在内存或者硬盘种等待Sink的读取

  • Sink
    负责从Channel中读取Event,并解析出原始数据发送到下游

  • Event
    Flume中内置的一个Java对象,设计Event的目的是为了使Channel不需要关注数据本身内容,只需要缓存Event对象,提高Channel的通用性。

Flume的常用架构

1. 单节点日志采集

file

2. 使用Agent串联的方式,采集多节点日志到HDFS

file

file

3. Agent也支持更复杂的组件配置

file

Flume的Agent配置和使用

1. 安装Flume

  1. 下载解压改名
  2. 修改配置文件
    • flume-env.sh
      先将flume-env.sh.template重命名或复制为flume-env.sh
export JAVA_HOME=/opt/jdk1.8
  1. 配置环境变量

    • 添加环境变量到/etc/profile
      export FLUME_HOME=/opt/flume-1.9.0
      export PATH=$FLUME_HOME/bin:$PATH

2. flume案例

2.1 从socket中接收数据打印到控制台

Source:NETCAT
Sink: Logger
Channel: Memory

# 设置Agent中每个组件的名称
# agent名字a1 中的source 有一个起名r1
a1.sources = r1
# 给a1中的sink起名叫k1
a1.sinks = k1
# 给a1中channel起名叫c1
a1.channels = c1

# 设置source的具体配置
# 设置r1的类型为netcat
# netcat连接某个主机的某个端口接收数据
a1.sources.r1.type = netcat
# 设置连接的主机ip或者主机名
a1.sources.r1.bind = node01
# 设置连接的端口
a1.sources.r1.port = 44444

# 设置sink
# 配置sink的类型为logger
# logger负责将接收到的数据以INFO级别的日志进行输出
a1.sinks.k1.type = logger

# 设置channel
# 设置channel使用内存缓存数据
a1.channels.c1.type = memory
# 设置channel容量
a1.channels.c1.capacity = 1000
# 设置channel接收或者发送event的单次最大吞吐量
a1.channels.c1.transactionCapacity = 100

# 设置source sink channel之间的连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 启动agent
    flume-ng agent \
    --conf /opt/flume-1.9.0/conf \
    --conf-file /opt/flume-1.9.0/conf/my_conf/netcat2logger.conf \
    --name a1 \
    -Dflume.root.logger=INFO,console
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/netcat2logger.conf \
-n a1 \
-Dflume.root.logger=INFO,console
nc 主机名  端口
nc node01 44444

查看端口占用情况!

# 查看端口占用情况
netstat -nap | grep 44444
# 查看某个进程的详细信息
ps -ef  | grep 30055
# 通过PID杀死进程
kill -9 30055

2.2 从文件中读取数据变化打印到控制台

Source:EXEC
Sink: Logger
Channel: Memory

# 设置Agent中每个组件的名称
# agent名字exec2logger 中的source 有一个起名r1
exec2logger.sources = r1
# 给exec2logger中的sink起名叫k1
exec2logger.sinks = k1
# 给exec2logger中channel起名叫c1
exec2logger.channels = c1

# 设置source的具体配置
exec2logger.sources.r1.type = exec
exec2logger.sources.r1.command = tail -F /root/2.txt

# 设置sink
# 配置sink的类型为logger
# logger负责将接收到的数据以INFO级别的日志进行输出
exec2logger.sinks.k1.type = logger

# 设置channel
# 设置channel使用内存缓存数据
exec2logger.channels.c1.type = memory
# 设置channel容量
exec2logger.channels.c1.capacity = 1000
# 设置channel接收或者发送event的单次最大吞吐量
exec2logger.channels.c1.transactionCapacity = 100

# 设置source sink channel之间的连接
exec2logger.sources.r1.channels = c1
exec2logger.sinks.k1.channel = c1
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/exec2logger.conf \
-n exec2logger \
-Dflume.root.logger=INFO,console
  • exec缺点

早期使用exec监控单文件日志的追加并采集
但是exec具有局限性:只能监控一个文件、无法记录读取进度,一旦出现agent宕机,就会发生数据丢失或者数据重复

2.3 从文件夹中监控多个文件的变化打印到控制台

Source:TAILDIR
tailDir相比较EXEC和Spooldir的优势在于,tailDir会用一个Json文件记录所有被读取的文件的读取进度,下次启动agent,可以接着上一次记录的进度继续读取,避免数据重复或者遗漏
Sink: Logger
Channel: Memory

# 设置Agent中每个组件的名称
# agent名字tailDir2logger 中的source 有一个起名r1
tailDir2logger.sources = r1
# 给tailDir2logger中的sink起名叫k1
tailDir2logger.sinks = k1
# 给tailDir2logger中channel起名叫c1
tailDir2logger.channels = c1

# 设置source的具体配置
tailDir2logger.sources.r1.type = TAILDIR
# 设置tailDir读取的文件组 组名分别为 f1 f2
tailDir2logger.sources.r1.filegroups = f1 f2
# tailDir可以指定读取固定的单个文件
tailDir2logger.sources.r1.filegroups.f1 = /root/dir4/1.txt
# tailDir可以通过通配符读取多个文件
tailDir2logger.sources.r1.filegroups.f2 = /root/dir5/.*txt.*

# 设置sink
# 配置sink的类型为logger
# logger负责将接收到的数据以INFO级别的日志进行输出
tailDir2logger.sinks.k1.type = logger

# 设置channel
# 设置channel使用内存缓存数据
tailDir2logger.channels.c1.type = memory
# 设置channel容量
tailDir2logger.channels.c1.capacity = 1000
# 设置channel接收或者发送event的单次最大吞吐量
tailDir2logger.channels.c1.transactionCapacity = 100

# 设置source sink channel之间的连接
tailDir2logger.sources.r1.channels = c1
tailDir2logger.sinks.k1.channel = c1
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/tailDir2logger.conf \
-n tailDir2logger \
-Dflume.root.logger=INFO,console

2.4 从文件夹中监控多个文件的变化并发送到HDFS

Source:TAILDIR
Sink: HDFS
Channel: Memory

# 设置Agent中每个组件的名称
# agent名字tailDir2HDFS 中的source 有一个起名r1
tailDir2HDFS.sources = r1
# 给tailDir2HDFS中的sink起名叫k1
tailDir2HDFS.sinks = k1
# 给tailDir2HDFS中channel起名叫c1
tailDir2HDFS.channels = c1

# 设置source的具体配置
tailDir2HDFS.sources.r1.type = TAILDIR
# 设置tailDir读取的文件组 组名分别为 f1 f2
tailDir2HDFS.sources.r1.filegroups = f1 f2
# tailDir可以指定读取固定的单个文件
tailDir2HDFS.sources.r1.filegroups.f1 = /root/dir4/1.txt
# tailDir可以通过通配符读取多个文件
tailDir2HDFS.sources.r1.filegroups.f2 = /root/dir5/.*txt.*

# 设置sink
# 配置sink的类型为logger
# logger负责将接收到的数据以INFO级别的日志进行输出
tailDir2HDFS.sinks.k1.type = HDFS
# 设置上传到HDFS的路径
tailDir2HDFS.sinks.k1.hdfs.path = hdfs://node01:9000/flume/data/%Y-%m-%d/
# 设置上传时的文件类型
tailDir2HDFS.sinks.k1.hdfs.fileType = DataStream
# 设置上传时的文件格式
tailDir2HDFS.sinks.k1.hdfs.writeFormat = Text

tailDir2HDFS.sinks.k1.hdfs.rollInterval = 600
tailDir2HDFS.sinks.k1.hdfs.rollSize = 133169152
tailDir2HDFS.sinks.k1.hdfs.rollCount = 0

# 设置channel
# 设置channel使用内存缓存数据
tailDir2HDFS.channels.c1.type = memory
# 设置channel容量
tailDir2HDFS.channels.c1.capacity = 1000
# 设置channel接收或者发送event的单次最大吞吐量
tailDir2HDFS.channels.c1.transactionCapacity = 100

# 设置source sink channel之间的连接
tailDir2HDFS.sources.r1.channels = c1
tailDir2HDFS.sinks.k1.channel = c1
flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/tailDir2HDFS.conf \
-n tailDir2HDFS \
-Dflume.root.logger=INFO,console

2.5 将三台机器的日志聚合到一个agent发送到HDFS

Sink avro
Source: avro

架构:
node01 -\
node02 -->node02 --> HDFS
node03 -/

  • 采集文件到AVRO聚合层
# 设置Agent中每个组件的名称
# agent名字taildir2avro 中的source 有一个起名r1
taildir2avro.sources = r1
# 给taildir2avro中的sink起名叫k1
taildir2avro.sinks = k1
# 给taildir2avro中channel起名叫c1
taildir2avro.channels = c1

# 设置source的具体配置
taildir2avro.sources.r1.type = TAILDIR
# 设置tailDir读取的文件组 组名分别为 f1 f2
taildir2avro.sources.r1.filegroups = f1 f2
# tailDir可以指定读取固定的单个文件
taildir2avro.sources.r1.filegroups.f1 = /root/dir4/1.txt
# tailDir可以通过通配符读取多个文件
taildir2avro.sources.r1.filegroups.f2 = /root/dir5/.*txt.*

# 设置sink
# 配置sink的类型为logger
# logger负责将接收到的数据以INFO级别的日志进行输出
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = node02
taildir2avro.sinks.k1.port = 55555

# 设置channel
# 设置channel使用内存缓存数据
taildir2avro.channels.c1.type = memory
# 设置channel容量
taildir2avro.channels.c1.capacity = 1000
# 设置channel接收或者发送event的单次最大吞吐量
taildir2avro.channels.c1.transactionCapacity = 100

# 设置source sink channel之间的连接
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
  • 启动命令
nohup /opt/flume-1.9.0/bin/flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/taildir2avro.conf \
-n taildir2avro \
-Dflume.root.logger=INFO,console > /opt/flume-1.9.0/logs/taildir2avro.log &
  • Avro聚合层采集到HDFS
# 设置Agent中每个组件的名称
# agent名字avro2HDFS 中的source 有一个起名r1
avro2HDFS.sources = r1
# 给avro2HDFS中的sink起名叫k1
avro2HDFS.sinks = k1
# 给avro2HDFS中channel起名叫c1
avro2HDFS.channels = c1

# 设置source的具体配置
avro2HDFS.sources.r1.type = avro
avro2HDFS.sources.r1.bind = node02
avro2HDFS.sources.r1.port = 55555

# 设置sink
# 配置sink的类型为logger
# logger负责将接收到的数据以INFO级别的日志进行输出
avro2HDFS.sinks.k1.type = HDFS
avro2HDFS.sinks.k1.hdfs.path = hdfs://node01:9000/flume/data
# 设置上传时的文件类型
avro2HDFS.sinks.k1.hdfs.fileType = DataStream
# # 设置上传时的文件格式
avro2HDFS.sinks.k1.hdfs.writeFormat = Text
avro2HDFS.sinks.k1.hdfs.rollInterval = 600
avro2HDFS.sinks.k1.hdfs.rollSize = 133169152
avro2HDFS.sinks.k1.hdfs.rollCount = 0

# 设置channel
# 设置channel使用内存缓存数据
avro2HDFS.channels.c1.type = memory
# 设置channel容量
avro2HDFS.channels.c1.capacity = 1000
# 设置channel接收或者发送event的单次最大吞吐量
avro2HDFS.channels.c1.transactionCapacity = 100

# 设置source sink channel之间的连接
avro2HDFS.sources.r1.channels = c1
avro2HDFS.sinks.k1.channel = c1
  • 启动命令
nohup /opt/flume-1.9.0/bin/flume-ng agent \
-c /opt/flume-1.9.0/conf \
-f /opt/flume-1.9.0/conf/my_conf/avro2HDFS.conf \
-n avro2HDFS \
-Dflume.root.logger=INFO,console > /opt/flume-1.9.0/logs/avro2HDFS.log &

Java进程查询脚本

#!/bin/bash

HOSTS=`cat /etc/hosts | grep node | awk '{print $2}'`

for HOST in $HOSTS
do
  echo "====${HOST}的java进程运行情况为:===="
  ssh $HOST /opt/jdk1.8/bin/jps | grep -v Jps
  echo "===================================="
done