AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

MapReduce

MapReduce

一、是什么

  • MapReduce是Hadoop中的一个分布式计算框架

  • 基于HDFS存储的数据,MapReduce可以让程序员编写一个简单的类似单机程序的任务,提交到集群自动运行分布式多线程程序

  • MR优点

    1. 易于编程
      将所有的计算抽象为Mapper和Reducer两个环节,编程时只需要实现对应的类完成映射阶段和聚合阶段的业务逻辑,就可以构建高性能分布式应用程序

    2. 良好的扩展性
      得益于Hadoop集群的扩展性

    3. 良好的容错
      故障任务会自动重试,重试失败会自动转移到其他可用副本所在机器进行重试,可以保证MR任务的正确执行

    4. 适合海量数据处理
      MapReduce设计专门用于HDFS这类分布式文件系统存储的海量数据的数据统计和计算,可以支持TB\PB\EB\ZB..

  • MR缺点

    不适合进行实时计算、流式计算、DAG计算。

  • DAG 有向无环图
    在迭代式运算中,计算步骤过多,而每个计算步骤都需要单独启动一个MR程序,开发量增加,并且由于MR在计算时会使用IO方式将原始数读取到内存,再将处理结果使用IO方式写出到磁盘,多个MR任务会将大量的时间消耗在磁盘IO上,所以性能较差。

  • 离线计算 (批处理)

  • 实时计算 (流式计算)

- 离线计算 实时计算
速度
数据量级 GB\TB\PB GB
计算频率 一天一次 每时每刻都在计算

二、MR编程模型

2.1 MR编写任务

  • 每个MapReduce任务通常需要编写3个类

  • Mapper

    1. 自定义类继承org.apache.hadoop.mapreduce.Mapper
      填写四个泛型,分别为Mapper输入的key value类型和Mapper输出的Key Value类型

    2. 重写map(K key,V v,Context context)方法
      map是Mapper中的核心方法,
      用于接收输入的KeyValue,
      并将处理完的KeyValue使用context写出到框架

  • Reducer

    1. 自定义类继承org.apache.hadoop.mapreduce.Reducer
      填写四个泛型,分别为Reducer输入的key value类型和Mapper输出的Key Value类型
      Reducer输入的kv类型就是Mapper输出的kv类型

    2. 重写reduce(K key, Itearable<V> value,Context context)方法
      reducer每次接收map输出的一个key和相同key的所有value组成的迭代器,
      将处理玩的结果使用context写出到框架

  • Driver
    包含程序入口方法
    负责创建Job对象
    设置任务相关的参数配置,组装Mapper和Reducer

2.2 MR任务执行

  • 测试:在windows搭建hadoop环境,直接在开发工具中运行MR任务,通常使用小数据样例文件进行逻辑测试

  • 生产: 将写好的MR任务打成jar包,提交到hadoop集群执行

# 提交命令
hadoop jar [mainClass] 参数1 参数2
hadoop jar mr_wordcount.jar  hdfs://node01:9000/Harry.txt hdfs://node01:9000/output

2.3 MR执行过程

2.3.1 确定输入文件

  • 使用FileInputFormat设置数据的输入路径
  • 与HDFS兼容的所有文件系统都可以作为MR的输入

2.3.2 发起任务提交

  • Job对象使用submit()方法提交任务到集群运行
  • 如果使用MR本地运行模式,则提交到ApplicationMaster进程中
  • 如果使用Yarn模式运行,则提交到Yarn主节点RedouceManager

2.3.3 计算分片

  • 分片split 是一个逻辑概念

  • 将输入文件按照一定大小切分成若干个部分 每个部分称之为一个分片

    • 默认最小切片大小 1
    • 默认最大切片大小 Long.MAX_VALUE
    • 计算分片大小的公式 Math.max(minSize,Math.min(maxSize,blockSize))
    • 默认切片大小是block的大小
  • MR框架会根据切片的数量启动相应数量的Mapper线程,每个Mapper负责读取一个分片的数据进行处理

  • !!!注意 每个文件至少产生一个分片

  • FileInputFormat

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // 启动一个用于监控分片计算是否完成的守护线程
        StopWatch sw = new StopWatch().start();

        //      1                     1                        0
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        //   Long.MAX_VALUE 9223372036854775807
        long maxSize = getMaxSplitSize(job);

        // generate splits

        //  创建一个集合用于存储分片结果
        List<InputSplit> splits = new ArrayList<InputSplit>();
        //  从job中获取所有的输入文件的信息
        List<FileStatus> files = listStatus(job);
        //   遍历输入文件列表
        for (FileStatus file: files) {
            // 获取文件路径
            Path path = file.getPath();
            // 获取文件长度
            long length = file.getLen();
            // 如果文件非空
            if (length != 0) {
                // 创建块位置信息数组
                BlockLocation[] blkLocations;
                // 如果当前文件是本地文件系统则直接获取块信息
                if (file instanceof LocatedFileStatus) {
                    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
                } else {
                // 如果不是本地文件系统,则使用HDFS客户端获取文件分块信息
                    FileSystem fs = path.getFileSystem(job.getConfiguration());
                    blkLocations = fs.getFileBlockLocations(file, 0, length);
                }
                //  true
                if (isSplitable(job, path)) {
                    //   134217728
                    long blockSize = file.getBlockSize();
                    // 默认情况下  分片大小与block大小一致
                    //   134217728                    134217728    1      9223372036854775807
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                    //  设置字节剩余量 = 文件长度
                    long bytesRemaining = length;
                    //  当文件长度<= 分片大小的1.1倍时,
                    //  不进入循环
                    //                 204800000    134217728
                    //                  130M          128M          1.1
                    //                 128M
                    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                        // 根据文件偏移量获取块编号
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        //  使用第一个块的块信息 创建分片信息
                        //                              200M - 200M  0        128M
                        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                        // 剩余长度-=分片大小
                        bytesRemaining -= splitSize;
                    }

                    //  剩余大小如果!=0
                    if (bytesRemaining != 0) {
                        // 根据文件偏移量获取块编号
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        // 使用对一个的块 创建分片信息
                        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                    }
                } else { // not splitable
                    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                            blkLocations[0].getCachedHosts()));
                }
            } else {
                //Create empty hosts array for zero length files
                // 如果文件为空,则使用空的主机列表创建一个分片信息
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                    + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits;
    }

2.3.4 提交到集群生成任务

2.3.5 InputFormat数据读入

  • InputFormat是MR框架中负责读取数据并将数据发送给Mapper的一个模块

  • 在MR中默认使用TextInputFormat作为输入读入的格式化

    • TextInputFormat<LongWritable,Text> 将文件按行读取,每行开头位置的字节偏移量作为输入的Key,每行文本内容作为输入的Value
  • 问题:使用TextInputFormat时由于TextInputFormat继承FileInputFormat,所以每个文件至少分为一个分片,处理大量小文件时,会导致分片数量过多,Mapper的线程数量过多,导致性能低下

  • 解决:

    1. 预先将若干小文件进行合并(通常会将文件合并后再存储到HDFS,HDFS也不适合小文件存储)

    2. 使用其他的InputFormat完成文件读取操作
      CombineTextInputFormat继承CombineFileInputFormat,重写了分片计算的逻辑,允许将多个文件按照大小合并到一个分片中
      解决大量小文件每个文件单独作为一个分片启动多个Mapper的问题

    3. 将小文件合并为SequenceFile,用于记录每个文件自己的属性和文件内容

  • 如果现有的InputFormat不能满足功能需求,有时需要自定义InputFormat实现数据读入功能

    1. 自定义类继承FileInputFormat
    2. 实现createRecordReader方法,创建并返回自定义的记录读取器对象
    3. 自定义类继承RecordReader
    4. 需要实现以下方法
    5. init用于获取分片信息
    6. nextKeyValue()实现具体的IO操作读取分片对应的文件,构建用于Mapper处理的Key和Value
    7. getCurrentKey() 用于返回当前key
    8. getCurrentValue() 用于返回当前value
    9. inProccess() 用于获取当前记录读取器的读取进度
    10. close() 用于关闭创建的链接或者资源
  • Driver程序中使用job.setInputFormatClass(自定义的输入格式化类)

2.3.6 Mapper处理数据

  1. setup()
    用于Mapper任务开始之前获取资源创建连接
    在Mapper任务开始时调用一次

  2. map(key,value)
    用于处理记录读取器读取的输入的key和value

  3. cleanup()
    用于Mapper任务开始结束前关闭资源关闭连接
    在Mapper任务结束前调用一次

  4. run()
    setup()
    while(context.nextKeyValue()){
    map(context.key,context.value,context)
    }
    cleanup()

2.3.7 收集器收集数据到环形缓冲区

  • MR提供【收集器】对象用于将Mapper处理完的数据收集到【环形缓冲区】
  • 环形缓冲区是一个首尾相接的数组结构作为底层,默认大小为100MB
  • 默认阈值为0.8,当缓冲区中未被溢写的数据量到达缓冲区阈值时,需要将数据写出到磁盘中

2.3.8 分区、排序、溢写

  • 分区(partition):

    • 如果在MR任务中想将输出的结果文件输出到多个文件中,

    • 就需要启动多个Reduce线程

    • Driver中可以设置job的reduce线程数量

    • 默认MR使用Reduce个数作为分区的依据,

    • MR默认使用HashPartitioner作为分区器,按照key的hashCode对reudce数量取模,得到的余数就是分区的编号

    • 可以自定义分区器实现分区逻辑

      1. 自定义一个类继承Partitioner类
      2. 实现getPartition(key,value,numReduce)方法
        自定义逻辑返回int作为分区的编号,编号通常从0开始
      3. job中设置自定义分区器
    • 分区数量 reduce数量的关系
      分区数量 > reduce数量 报错
      分区数量 < reduce数量 产生多余空文件
      分区数量 = reduce数量 合理的设置

  • 排序

    • MR框架会对Mapper输出的Key进行升序排序
    • Writable已经实现的序列化类型都实现了WritableComparable接口,实现了对象的序列化和可排序
    • 所以如果自定义对象需要放在Key的位置上,则也必须实现可排序接口WritableComparable
    • compareTo()方法中可以通过编写比较大小的逻辑,对多个属性进行依次排序也可以改变排序方式
  • Combiner
    MR框架允许在Mapper结束后添加一个类似Reducer的合并器组件
    合并器可以在Mapper运行的机器本机进行数据的预聚合,预聚合可以大大缩减MR shuffle中的网络IO开销
    注意!并不是所有的Reducer都可以直接作为Combiner使用
    如果添加Combiner之后影响统计结果则无法使用预聚合

file

2.3.9 合并分区启动reduce

  • MR框架将Mapper输出的数据按照分区划分为多个文件
  • MR框架将不同Mapper的相同分区的数据 拷贝到同一个Reducer端进行合并,合并过程中按照数据的Key排序

2.3.10 reduce读取分区数据进行处理

  • reducer中的核心方法reduce(key,Iter<Value>)
  • 将相同key对应的所有value在reudce()方法中进行聚合
  • 并将聚合之后的结果写出到框架

2.3.11 OutputFormat输出数据到文件

  • 如果MR有Reducer环节,reducer处理完的数据交给OutputFormat进行输出,或者不包含Reducer的MR,mapper处理完的数据直接交给OutputFormat进行输出

  • 一些场景下我们需要自定义数据的输出方式则可以自定义OutputFormat类

    1. 自定义类继承FileOutputFormat,重写getRecordWriter()
    2. 自定义类继承RecordWriter,重写write(key,value),close
      write()方法每次调用wirte获取一组k-v数据,write()负责操作io流将数据写出到文件中
      close()关闭资源
    3. 在driver中设置输出方式为自定义的outputFormta

2.4 Hadoop序列化Writable

  • hadoop自己实现序列化的原因

    • Java的序列化对于MR的计算场景来说,过于繁琐,会导致数据经过网络IO发送时负载压力过大,所以Hadoop自己实现了一套更加高效轻量级的适合分布式运算的序列化框架Writable
  • 使用Writable序列化方式定义自己的序列化类

    1. 自定义一个类 实现 Writable接口
    2. 实现 序列化write 和 反序列化 readFields方法
      write(DataOutput) 负责将对象的属性逐个写出到存储中
      readFields(DataInput) 负责将序列化文件中的数据 读取到内存并赋值给成员属性
      书写序列化和反序列化的顺序可以不与声明顺序一致
      但是必须保证序列化和反序列化内部顺序一致
    3. 实现toString方法方便输出
    4. 实现空参构造器用于反射

2.5 计数器

  • 在MR这种分布式应用程序中,如果想进行某个环节的计数操作,直接使用成员属性的方式会导致每个Mapper线程统计自己的计数结果,最终无法将计数结果进行汇总

  • 所以MR提供Counter计数器,专门用于分布式计数统计

  • 一般计数器用于在任务执行过程中记录一些参考指标,例如MR日志中的计数器用于统计MR任务中的运行指标

  • MR也允许自定义计数器完成分布式计数操作

    1. 使用字符串形式创建计数器
    2. 使用枚枚举类型创建计数器
  • 使用

    1. 创建
    2. 计数器调用increment(long)
    3. 直接在日志中查看计数结果