AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

day05_Hadoop_MR

MapReduce运行流程

MR主要组成

  • 我们使用MR框架
    主要需要实现两个类的代码

    • Mapper
      对数据进行逐行处理
    • Reducer
      对Mapper输出的数据按照Key进行聚合
  • 完整的MR流程

    • job 提交任务

      1. 设置API的版本为新版本
      2. 获取集群连接Cluster
      3. 创建Submitter(hdfs文件系统,集群客户端)
    • 计算分片数量

    • 分片数量决定了MapTask的线程数

    • 在FileInputFormat中会计算分片的大小 Math.max(minSize,Math.min(blockSize,maxSize))
      默认分片大小和blockSize大小一致
      可以通过设置最大最小分片大小来修改分片大小

    • 然后根据分片的大小计算分片的数量

    • 注意!每个文件至少分一个片

      public List<InputSplit> getSplits(JobContext job) throws IOException {
      StopWatch sw = new StopWatch().start();
      //     1                 1                           1
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
      //  Long.MAX_VALUE
      long maxSize = getMaxSplitSize(job);
      // generate splits
      List<InputSplit> splits = new ArrayList<InputSplit>();
      //listStatus 从我们在driver设置的输入路径中找到所有的文件的FileStatus
      List<FileStatus> files = listStatus(job);
      for (FileStatus file: files) {
          //获取文件路径
          Path path = file.getPath();
          //获取文件长度
          long length = file.getLen();
          //长度不为0的文件
          if (length != 0) {
              // 块位置数组
              BlockLocation[] blkLocations;
              //判断FileStatus是否是LocatedFileStatus类型
              //是的话对blkLocations进行赋值
              if (file instanceof LocatedFileStatus) {
                  blkLocations = ((LocatedFileStatus) file).getBlockLocations();
              } else {
                  //如果不是LocatedFileStatus类型
                  //通过文件系统的方式获取块信息
                  FileSystem fs = path.getFileSystem(job.getConfiguration());
                  blkLocations = fs.getFileBlockLocations(file, 0, length);
              }
              //判断是否可分片
              if (isSplitable(job, path)) {
                  //   128M   134217728
                  long blockSize = file.getBlockSize();
                  //   128M  134217728
                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                  //  字节剩余数
                  //  1    200M
                  //  2    72M
                  long bytesRemaining = length;
                  //               200M            128M      > 1.1
                  //               72M             128M
                  // 132M   128  +12.8
                  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                      //获取文件块索引
                      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                      //                              0                       128M
                      splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                              //设置分片对应的主机位置信息
                              blkLocations[blkIndex].getHosts(),
                              blkLocations[blkIndex].getCachedHosts()));
                      bytesRemaining -= splitSize;
                  }
                  //  72M
                  if (bytesRemaining != 0) {
                      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                      //                              200M   72M
                      splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                              blkLocations[blkIndex].getHosts(),
                              blkLocations[blkIndex].getCachedHosts()));
                  }
              } else { // not splitable
                  splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
              }
          } else {
              //Create empty hosts array for zero length files
              //长度为0的文件咱们就不处理了
              splits.add(makeSplit(path, 0, length, new String[0]));
          }
      }
      // Save the number of input files for metrics/loadgen
      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
      sw.stop();
      if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                  + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
      }
      return splits;
      }
    • RecordReader 记录读取器

    • InputFormat 输入格式化
      用来设置输入的格式
      用来封装RecordReader
      读取数据将数据交给Mapper

    • Mapper

    • Reducer

You must be logged in to post a comment.