
Stream Processing - Day 02

Kafka

Using Kafka and Flume

Log Collection Case Study

1. Technology Selection

  • Decide which technical frameworks will be used
    The decision is made by the team lead, manager, or CTO

2. Pre-research

  • Pin down the versions
    1. Operating system (kernel version)
    2. Framework and component versions

3. Architecture Design

  • Draw the flow diagram
  • Write the design document

4. System Initialization

  • Set up the development environment
    Local Windows environment

  • Set up the test environment
    Server cluster environment

5. Development

  • Translation
    Translate the flow diagram and the design document into scripts and program code

6. Testing (functional testing, stress testing)

  • Functional testing

For a big-data pipeline: check that every node and edge in the flow diagram works correctly, i.e. that each step receives and produces normal input and output.

  • Stress testing

Run the system for an extended period on synthetic data, at roughly one order of magnitude above the expected production volume (a small generator sketch follows).
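As a minimal sketch of such a generator, the loop below appends fake records to /root/web.log, the file the TAILDIR source of the log2kfk agent (see the Work Log section) is configured to tail. The record format, line count, and IP/URL values are placeholder assumptions; replace them with something shaped like your real traffic, scaled up.

#!/bin/bash
# gen_weblog.sh -- crude synthetic-data generator for stress testing (sketch)
LOG=/root/web.log          # file tailed by the log2kfk Flume agent
TOTAL=${1:-1000000}        # number of lines to emit (placeholder volume)
for ((i = 1; i <= TOTAL; i++)); do
  # fake "ip timestamp url" record; adjust to match your real log format
  echo "192.168.0.$((RANDOM % 255)) $(date '+%Y-%m-%d %H:%M:%S') /page/$((RANDOM % 100))" >> "$LOG"
done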

7. Documentation

  • Deployment manual
  • User manual

8. Deployment

  • Following the manuals, install and start all of the software on the production cluster and connect it to real data (a start-up sketch follows).
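One common way to keep an agent running after the shell session ends is to start it in the background with nohup. This is only a sketch: the log2kfk.conf path and the output log location are assumptions, not paths given in this note.

# start the log-collection agent in the background on its production host
nohup flume-ng agent --conf /opt/flume-1.9.0/conf \
  --conf-file /opt/flume-1.9.0/conf/my_conf/log2kfk.conf \
  --name log2kfk -Dflume.root.logger=INFO,console \
  > /var/log/flume/log2kfk.out 2>&1 &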

Work Log

  • log2kfk (Flume agent: tail a local log file and push events to Kafka)
# log2kfk.conf: a single-node Flume configuration (TAILDIR source -> Kafka sink)
# Name the components on this agent
log2kfk.sources = r1
log2kfk.sinks = k1
log2kfk.channels = c1
# Describe/configure the source
# Set the source type
log2kfk.sources.r1.type = TAILDIR
# Location of the JSON file where TAILDIR records its read positions
log2kfk.sources.r1.positionFile = /var/log/flume/taildir_position.json
log2kfk.sources.r1.filegroups = f1
log2kfk.sources.r1.filegroups.f1 = /root/web.log
log2kfk.sources.r1.headers.f1.headerKey1 = f1
log2kfk.sources.r1.fileHeader = true
log2kfk.sources.r1.maxBatchCount = 1000
# Describe the sink
log2kfk.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
log2kfk.sinks.k1.kafka.topic = weblog2
log2kfk.sinks.k1.kafka.bootstrap.servers = bd0201:9092,bd0202:9092,bd0203:9092
log2kfk.sinks.k1.kafka.flumeBatchSize = 20
log2kfk.sinks.k1.kafka.producer.acks = 1
log2kfk.sinks.k1.kafka.producer.linger.ms = 1
log2kfk.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
log2kfk.channels.c1.type = memory
log2kfk.channels.c1.capacity = 5000
log2kfk.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
log2kfk.sources.r1.channels = c1
log2kfk.sinks.k1.channel = c1
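To try this agent out, the weblog2 topic has to exist and the agent has to be started; a console consumer on the topic then shows whether events are flowing. A sketch, assuming the conf-file path used elsewhere in this note and example partition/replication counts (on Kafka versions before 2.2, kafka-topics.sh takes --zookeeper instead of --bootstrap-server):

# create the target topic (partition/replication counts are just examples)
kafka-topics.sh --create --bootstrap-server bd0201:9092 \
  --topic weblog2 --partitions 3 --replication-factor 2

# start the agent in the foreground (conf-file path is an assumption)
flume-ng agent --conf /opt/flume-1.9.0/conf \
  --conf-file /opt/flume-1.9.0/conf/my_conf/log2kfk.conf \
  --name log2kfk -Dflume.root.logger=INFO,console

# in another terminal: confirm events written to /root/web.log reach Kafka
kafka-console-consumer.sh --bootstrap-server bd0201:9092 \
  --topic weblog2 --from-beginning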

1. Hive Data Warehouse

1.1 Warehouse Layer Design

1.2 Table Structure Design

  • Layers (a database-naming sketch follows at the end of this subsection)
    1. Raw data layer (ODS)
      The collected raw data: the full, uncleansed data set
    2. Detail layer
      The fields needed downstream, extracted from the raw layer after cleansing
    3. Dimension layer (data volume grows sharply here; space is traded for time)
      Large wide tables built for the various dimensions
    4. Application layer (data mart)
      The final statistics (measures/metrics)

  • Design the databases

  • Design the tables
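A minimal naming sketch for the per-layer databases. Only weblog_ods_db is actually created in section 1.3 below; the other names are hypothetical examples of a <subject>_<layer>_db convention, not databases defined in this note.

-- one database per layer (hypothetical naming convention; the ODS database
-- weblog_ods_db itself is created in section 1.3 below)
create database if not exists weblog_dwd_db;   -- detail layer (example name)
create database if not exists weblog_dim_db;   -- dimension layer (example name)
create database if not exists weblog_app_db;   -- application layer (example name)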

1.3 Creating the Tables

  • Table for the raw data layer
-- create the raw-layer (ODS) database
create database weblog_ods_db;
use weblog_ods_db;
create external table weblog_ods_t(
  line string)
partitioned by (dt string, min string);
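Directories written by the kfk2hdfs agent (section 2) land under this table's location but are not visible to queries until they are registered as partitions. A sketch with example partition values matching the dt=%y-%m-%d-%H/min=%M layout of the HDFS sink path; use the actual directory names Flume created:

-- register one partition written by Flume (values are examples),
-- or run: msck repair table weblog_ods_t;  to discover all of them
alter table weblog_ods_t add if not exists partition (dt='21-08-05-10', min='30');

-- quick sanity check
select count(*) from weblog_ods_t where dt='21-08-05-10' and min='30';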

2. Collecting Data from Kafka to HDFS

2.1 Flume Installation

2.2 Flume Agent Configuration

# kfk2hdfs.conf: a single-node Flume configuration (Kafka source -> HDFS sink)
# Name the components on this agent
kfk2hdfs.sources = r1
kfk2hdfs.sinks = k1
kfk2hdfs.channels = c1
# Describe/configure the source
# (the source is declared above as r1, so its properties must use r1, not source1)
kfk2hdfs.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
kfk2hdfs.sources.r1.batchSize = 5000
kfk2hdfs.sources.r1.batchDurationMillis = 2000
kfk2hdfs.sources.r1.kafka.bootstrap.servers = bd0201:9092,bd0202:9092,bd0203:9092
kfk2hdfs.sources.r1.kafka.topics = weblog2
kfk2hdfs.sources.r1.kafka.consumer.group.id = test01
# Describe the sink
kfk2hdfs.sinks.k1.type = hdfs
kfk2hdfs.sinks.k1.hdfs.path = /user/hive/warehouse/weblog_ods_db.db/weblog_ods_t/dt=%y-%m-%d-%H/min=%M
kfk2hdfs.sinks.k1.hdfs.filePrefix = weblog-
kfk2hdfs.sinks.k1.hdfs.rollInterval = 0
kfk2hdfs.sinks.k1.hdfs.rollSize = 133169152
kfk2hdfs.sinks.k1.hdfs.rollCount = 0
kfk2hdfs.sinks.k1.hdfs.fileType = DataStream
kfk2hdfs.sinks.k1.hdfs.writeFormat = Text
# Use a channel which buffers events in memory
kfk2hdfs.channels.c1.type = memory
kfk2hdfs.channels.c1.capacity = 1000
kfk2hdfs.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
kfk2hdfs.sources.r1.channels = c1
kfk2hdfs.sinks.k1.channel = c1

2.3 Testing

 flume-ng agent --conf /opt/flume-1.9.0/conf --conf-file /opt/flume-1.9.0/conf/my_conf/kfk2hdfs.conf --name kfk2hdfs -Dflume.root.logger=INFO,console
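With both agents running, the whole chain can be verified end to end by appending a line to the tailed file and checking that a file appears under the warehouse path. The dt/min directory names below are placeholders; use the values for the current hour and minute:

# push one test record into the pipeline
echo "test $(date '+%Y-%m-%d %H:%M:%S') /index.html" >> /root/web.log

# a few seconds later, the HDFS sink should have created a file under the
# partition directory for the current time (path values are examples)
hdfs dfs -ls /user/hive/warehouse/weblog_ods_db.db/weblog_ods_t/dt=21-08-05-10/min=30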