AmosCloud

Library

Have a Question?

If you have any question you can ask below or enter what you are looking for!

[中译版]ApacheHadoop官方wiki-MapReduce

  • 说明:
  1. 来源:本文档原文取自Hadoop官方文档Hadoop wiki HadoopMapReduce
  2. 内容说明:文档内容均为本人人工翻译,部分章节会增加一些自己的理解,或将描述调整为更适合阅读的语序。
    不喜勿喷,若您在阅读过程中发现错误,敬请不吝赐教。
  3. 译者及联系方式:
    • Translator: Amos
    • E-mail: amos@amoscloud.com

Map和Reduce具体是如何工作的

引言

本文主要介绍MapReduce框架的实际执行过程。如果您还不熟悉Gooogle的MapReduce编程模型,推荐您先阅读MapReduce论文

Map

  • 在启动并行的Map任务之前,框架首先会将需要处理的所有文件分割为若干个叫做FileSplit的分片。
  • 较大的文件,会被分割为多个较小的分片。在计算分片时,负责计算分片的实际代码逻辑中,默认仅仅按照大小进行切片,不关心文件的具体逻辑结构(例如:行式存储的文本文件中的一行,可能会被从中间任意位置切开)。
  • 得到分片数量和具体的分片信息后,框架会启动与分片数量相同的MapTask线程,每个MapTask读取一个分片的数据。

  • 每个MapTask启动时,会根据ReduceTask的数量,为每个Reduce单独开启一个输出流。接下来Map会不断地利用Driver中指定的InputFormat所提供的RecordReader实例,从自己负责的FileSplit中读取数据。
  • InputFormat会将读取到的数据解析为Map需要的键值对类型。
  • InputFormat会专门处理FileSplit结尾被切开的数据。例如,TextInputFormat在读取FileSplit的最后一行文本时,会将这一行完整的读取进来,即使读取的范围已经超过FileSplit的边界。而除了文件的第一个FileSplit之外,其他的FileSplit在读取文件时,会跳过第一个换行符号之前的内容。

  • InputFormat生成的键和值,并非都会被Map真正使用到。例如:TextInputFormat默认会将读取到的文本中的每行开始的字节偏移量作为键,每行的文本内容作为值,而绝大部分应用只需要处理文本内容,而不需要关心行的字节偏移量。

  • RecordReader会将读取到的键值对传递给Mapper。Mapper根据开发者编写的具体业务逻辑对收到的键值对数据进行处理,并在处理完成后,通过OutputCollector.collect()将需要发送到下游的数据以键值对形式写出。Mapper输出的键值对数据会被写入一个SequenceFile。Mapper的输入和输出,在类型上或者数据条数上,并没有任何强相关关系。

  • Mapper输出的数据,会根据任务指定的Partitioner和Reducer的数量,将数据写到指定的输出文件。默认的分区器是HashPartitioner,它会根据Mapper输出的键对象的hashCode()方法对数据进行分区散列(这就意味着,hashCode()方法提供的散列算法需要足够优秀,这样才能将Mapper输出的数据尽量均衡的分散到下游的每个Reducer)。具体实现细节可以阅读org.apache.hadoop.mapred.MapTask源码。

  • N个文件作为输入,切割为M个文件分片(FileSplit),根据分片结果,生成M个Map任务,每个Map任务将生成与Reduce数量相同的临时文件。每个临时文件最终会交给相应的Reducer进行处理,Map任务在输出数据时,通常会根据给定的Partitioner逻辑将相同键的数据输出到同一个临时文件中,最终方便在Reducer中对相同键的数据进行聚合处理。

Combine

  • Map输出的键值对,会首先被缓存在内存中。出于性能优化的考量,我们有时会提供一个Combiner类,用于执行类似Reducer的聚合操作,事实证明Combiner的使用,对计算效率优化非常有效。

  • 当MR任务中添加了Combiner时,Mapper的输出不会立即写入输出文件,而是会被收集到一些列表中,每个键值对作为一个列表。当收集到一定数量的键值对时,会将每个键对应的所有值,传入Combiner的reduce()方法,并将reduce()输出的结果更新到缓冲区中。

  • 例如:在一个词频统计的MR程序中,Mapper输出的数据为(word,1)的键值对,这时可以使用Combiner进行预聚合。Combiner将在内存中直接对相同键的键值对进行合并,每个相同键的所有键值对,合并为一个键值对。每次Mapper输出了一定数量的键值对后,Combiner就在内存中对每个唯一的键进行上述的预聚合,最终Combiner再将聚合后的键值对输出。

  • 从Reducer的角度来说,无论是否使用Combiner,Reducer收到的数据,在信息量上是等价的,但计算任务中的磁盘和网络IO开支,通常会大大缩减。

Reduce

When a reduce task starts, its input is scattered in many files across all the nodes where map tasks ran. If run in distributed mode these need to be first copied to the local filesystem in a copy phase (see ReduceTaskRunner).

Once all the data is available locally it is appended to one file in an append phase. The file is then merge sorted so that the key-value pairs for a given key are contiguous (sort phase). This makes the actual reduce operation simple: the file is read sequentially and the values are passed to the reduce method with an iterator reading the input file until the next key value is encountered. See ReduceTask for details.

At the end, the output will consist of one output file per executed reduce task. The format of the files can be specified with JobConf.setOutputFormat. If SequentialOutputFormat is used then the output key and value classes must also be specified.