AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

Mapreduce part2

MapReduce运行原理

1. 确认输入文件

  • 输入路径可以同时填写多个

  • 如果设置的输入路径是文件夹,MR会读取文件夹中所有文件(单层,不递归)

  • 如果设置的输入路径是文件,直接读取单个文件

  • 本地测试时可以使用本地路径,生产环境从HDFS读取数据

2. 提交任务

  • 本地测试(IDEA):
    Driver中调用job.waitForCompletion(true),提交任务在本地使用hadoop.dllwinutils.exe模拟hadoop环境并执行任务

  • 集群运行:
    hadoop jar xxx.jar com.xahj.xxx args1 args2

yarn jar xxx.jar com.xahj.xxx args1 args2

3. 确认分片,确认任务Jar,确认任务配置

3.1 确认分片

3.1.1 分片是什么

  • 切片是一个逻辑概念
  • 用来对原始文件进行划分
  • 划分好的切片数量对应MR任务的MapTask并发度(线程数)

3.1.2 Hadoop默认的分片方式

  • MR默认使用org.apache.hadoop.mapreduce.lib.input.FileInputFormat中的getSplits()方法获取分片信息

  • MR默认分片是Block大小

  • MR默认会将每个文件至少产生一个分片

3.2 确认任务Jar

  • 无论使用hadoop直接运行还是使用yarn提交Jar,都是通过指定jar包文件所在的路径确认运行包,通常我们还会指定运行的主类

3.3 确认任务配置

4. 根据分片数量确认MapTask数量

  • MapTask就是MR任务启动后执行Mapper代码逻辑的线程

  • 默认MapTask数与InputFormat计算的分片数量一一对应

  • 问题:
    如果需要处理的文件是一堆小文件,那么因为FileInputFormat会为每个文件至少产生一个切片,所以将来会启动很多maptask线程,运行效率可能会降低

    • 解决方案
      1. 预先合并小文件
        可以在收集数据时就将文件合并为大文件
        或者通过MR任务将小文件合并成大文件
    1. 通过改变分片机制来限制MapTask线程数
      636个文件 690MB
      默认使用FileInputFormat会至少为每个文件产生一个切片
      可以选用CombineTextInputFormat

5. InputFormat 输入格式化

  • InputFormat负责将原始文件读取到内存并转换成K-V发送给Mapper

  • 需求:
    默认使用TextInputFormat时,会将文本文件按行读取,每次读取将当前行在文件中的偏移量作为K,当前行文本内容作为V,传入Mapper的map()
    此时读取次数过多,为了优化读取性能,可以自定义一个InputFormat,每次将整个文件的内容读取到Mapper

  • 实现:

    1. 自定义一个类继承FileInputFormat
      实现方法 createRecordReader() 用来创建记录读取器

    2. 自定义一个类继承RecordReader

    3. initialize
      初始化方法,用于获取文件分片信息和任务的context对象

    4. nextKeyValue
      实现IO操作,读取数据并将kv缓存到当前对象中

    5. getCurrentKey
      返回当前k

    6. getCurrentValue
      返回当前v

    7. getProgress
      返回一个0.0~1.0的float值表示当前记录读取器的读取进度

    8. close
      关闭链接,释放资源

    9. driver中设置自定一的InputFormat类

6. Mapper

  1. 自定义一个类继承org.apache.hadoop.mapreduce.Mapper并填写4个泛型:输入K的类型,输入V的类型,输出K的类型,输出V的类型

  2. 重写map()
    map每次调用传入一组kv,在map中编写处理逻辑,将处理完的结果使用context.write(k,v)写出

  3. setup(),cleanup()
    setup()获取链接或者初始化资源
    cleanup()关闭连接释放资源

7. 环形缓冲区

  • MR框架中Collecter对象负责将Mapper输出的kv收集到内存中的一个叫做环形缓冲区的对象中

  • 环形缓冲区本质上就是一个首尾相接的数组,默认大小为100MB

  • 当缓冲区存储Mapper写出的KV到达阈值(0.8)时,会启动线程将缓冲区的数据写入到磁盘进行保存

8. 分区,排序,预聚合

8.1 分区

  • Mapper输出的数据会按照Partitioner中的逻辑按照Key对数据进行分区

  • MR默认使用HashPartitioner,使用Key的HashCode与ReduceTask数量取模的到与ReduceTask数量一致的分区数

  • 自定义分区器时,需要注意

    1. 分区数 > Reduce数 报错
    2. 分区数 < Reduce数 空文件
    3. 分区数 = Reduce数

8.2 排序

  • MR默认会按照Mapper和Reducer输出的Key进行升序排列

  • 作为Key的对象必须实现WritableComparable接口

8.3 预聚合combiner

  • Driver中可以给Job添加Mapper端的聚合器Combiner

  • Combiner与Reducer一样 也继承于Reducer,combiner也是每次调用reduce(),拿到一个Key对应的所有Value,可以在Mapper端对数据进行预聚合

  • 在大多数情况下Combiner与Reducer的逻辑完全一致且可以互相替换

  • 如果开启combiner会影响MR最终的计算结果,那么无法使用预聚合

9. 溢写

  • 环形缓冲区到达阈值后,数据在内存中进行上一步的分区排序和合并,并将处理完的数据先存放在磁盘中

  • 溢写会发生多次,会产生大量的中间结果文件

10. 合并排序

  • 将上一步溢写生成的所有小文件按照分区合并成大文件

  • 合并时会按照数据的key进行排序

11. Reduce读取Mapper端的数据

  • 每个Reducer按照分区编号,读取所有Mapper端自己对应编号的数据

  • 并将来自不同Mapper端的相同分区编号的数据进行合并和排序

12. reduce方法

  • Reducer中使用Ruduce方法一次传入Key和Key对应的所有的Value

  • reduce()负责完成数据聚合的逻辑编写

  • reduce()将处理完的数据通过context.write()写出

13. OutputFormat 输出格式化

  • reducer处理完的数据会被OutputFormat收集并写出

  • RecordWriter负责将KeyValue数据使用IO写出到文件或者写出到其他的外部存储