AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

MapReduce

MapReduce

是什么

MR是Hadoop中的分布式计算框架
用于编写分布式高并发程序进行海量数据的批处理(离线处理)统计计算

  • 优点:

    1. 易于编程:
      MR提供了简单的编程API,将所有计算抽象为Map和Reduce两个环节,开发时只需要实现Mapper类和Reducer类编写业务逻辑,再实现Driver类创建并组装Job对象运行MR程序。

    2. 易于扩展:
      MR是hadoop的一个组件,数据存储可以使用Hadoop中的HDFS将海量数据分散在集群的多台机器上,可以使用MR框架编写分布式高并发程序,利用集群所有机器的CPU和内存资源完成高性能运算。

    3. 高容错:
      MR会自动进行故障转移,计算过程中如果某个线程计算失败,会自动重试或在其他数据副本上启动相同任务保证计算准确执行。

    4. 适合处理海量数据
      不适合处理小数据量,因为MR启动需要一些资源的消耗
      MR适合处理 百GB TB PB

  • 缺点:

    1. 不适合实时计算:
      MR启动延迟较高,不适合低延迟响应的应用

    2. 不适合流式计算:

计算方式 批处理(离线计算) 流式计算(实时计算)
速度
数据量级 海量数据,没有上限 GB
框架 MapReduce、SparkCore、FlinkDataSet Storm、SparkStreaming、FlinkStream
  1. 不适合做 DAG (有向无环图)运算
    MR每次任务都需要使用磁盘IO读取原始数据,并将计算结果使用磁盘IO写入到磁盘,DAG是一类由多个计算步骤组成的复杂计算,如果使用MR完成DAG计算,需要编写大量的MR程序串联或并联,计算过程中会频繁产生大量的磁盘IO操作,性能极差
    Spark、Flink新一代的计算框架,基于内存运算,将所有的DAG步骤全部在内存中完成,相比较MR节约了大量的磁盘IO,所以性能更优。

核心思想和编程模型

  1. MR将所有计算拆分为两个环节
  2. Mapper负责进行逐条数据的处理
  3. Reducer负责进行数据聚合

MR编程核心原理

1. 分片计算

在MR任务开始前,
MR需要根据原始文件的大小计算启动时的文件分片FileSplit
根据分片的数量启动对应数量的MapperTask线程

默认分片大小与块大小一致
Math.max(minSize, Math.min(maxSize, blockSize))

默认每个文件至少产生一个分片
如果文件大小不超过块大小的1.1倍,不会切分为两个分片

2. 提交任务到集群执行

测试过程 local
生产环境 YARN

hadoop jar xxx.jar 参数
yarn jar xxx.jar 参数

3. 数据读取

MR框架使用FileInputFormat进行数据的读取

  • 问题
    有14个小文件,使用默认的数据读取方式
    每行都需要读取一次,效率较低
    如果可以每次将整个文件一次性读取到内存,可以提高计算效率

  • 解决
    自定义输入格式化类 继承FileInputFormat
    实现createRecordReader()方法,用于创建一个记录读取器对象
    自定义记录读取器对象 继承RecordReader
    实现方法用于进行文件的IO操作

4. map

每次接收一组kv数据,进行处理
并将结果写出到框架

  • 需求
    统计每个ip的访问数量、成功访问的数量、失败访问数量、请求流量、响应流量

当Map或者Reduce需要保存更复杂的数据结构时
Hadoop自带的序列化类不能满足要求,此时可以自定义javaBean作为MapperReduce的输入输出类型

  1. 自定义一个类实现Writable接口

  2. 实现序列化和反序列化方法
    属性使用顺序需要一致

  3. 添加空参构造器用于反序列化时反射创建对象

  4. 添加toString方便对象的打印

5. 环形缓冲区

将map输出的数据放在内存中缓存,缓存超过阈值 溢写到磁盘

环形缓冲区默认大小100MB
本质上是一个数组,通过封装数组实现环形访问
访问下标单调递增,如果当前访问的下标已经超过数组长度,就减去数组长度从而实现环形访问

6. 分区

在Map执行之后
Reduce开始之间
MR框架会根据执行的分区器, 对数据进行分区
MR默认使用HashPartitioner,使用Map输出的数据的Key的hashCode跟设置的reducer数量取模,产生与reducer数量匹配的分区数量
将数据分散到多个分区
reducer阶段每个reducer读取一个分区的数据进行处理

  • MR默认只有一个Reducer,如果不设置ReducerTask数量,自定义的分区器不生效
  • 如果设置自定分区和reducer数量,那么满足以下情况
    1. reducer > partition 生成多余空文件
    2. partition > reducer MR报错无法执行
    3. partition = reducer 每个reducer处理一个分区,需要注意自定义分区器中分区编号需要从0开始连续设置

7. 排序

MR会自动按照作为Key的对象进行升序排列

MR框架在Map结束后 会对Map输出的key进行排序
本质上调用对象实现的comparaTo方法进行大小比较

  • 需求:
    按照用户的访问次数进行降序排列,并将相同访问次数的用户按照请求流量进行降序排列

  • 实现:
    如果Hadoop提供的默认的Writable类不能实现排序需求,
    用户可以自定义可以排序的Bean作为Mapper输出的key
    自定义可排序对象需要实现WritableComparable接口 实现序列化反序列化和排序方法

8. combiner

Map结束后,MR允许用户添加一个Map端的预聚合类
预聚合类称之为合并器(Combiner),Combiner与reducer类似继承Reducer类实现reduce()
目的是在shuffle前每个Map端对数据进行预先聚合减少shuffle阶段的网络IO,提要任务的运行效率
绝大部分可以使用Combiner的场景,Combiner与Reducer的逻辑完全相同
另外一些场景Combiner会影响最终任务的结果,则无法使用预聚合

9. reduce

  • 每个reducerTask接收一个分区的数据进行处理
  • reducer将多个mapper的同一个分区的数据进行合并
  • 将相同key对应的所有value一次传入reduce(K key,Iterable<V> iter)方法
  • reduce()方法编写业务逻辑对相同key的数据进行聚合操作
  • reduce()架构聚合之后的结果通过context.write()写出到框架

10. 数据输出

FileOutputFormat用于指定输出文件的路径和格式

  • MR会将Reducer处理完的数据以KeyValue的结构发送给OutputFormat
  • OutputFormat调用RecordWriter的write()方法将每个KV进行输出
  • 如果需要自定义输出路径,文件名,或者文件格式,可以自定义个OutputFormat和RecordWriter类实现IO操作

计数器

在MR这种分布式计算中
如果需要进行任务过程中的关键数据统计,在每个Mapper或者每个Reducer中进行统计的话,会拿到多个结果
因为Mapper和Reducer会有多个实例

如果像直接拿到汇总的计数结果,MR提供了一个计数器类

  • 使用字符串创建计数器

  • 使用枚举创建计数器

  • 使用计数器对象进行自增

  • 计数器的统计结果会自动打印到日志中

  • 练习
    编写MR

    1. 将成功访问和失败访问输出到不同的文件
    2. 将第一次统计的结果进行处理 按照IP的访问次数升序排列输出