头图来自[#崩坏 雷电芽衣 - Ekita玄][1]

Buffer 和 Cache 啊.....

Map 端的 Shuffle 过程

image.png

  1. 输入数据&执行 Map 任务
    • Map task 的输入数据一般保存于分布式文件系统的 Block 当中。Map task 读取 <key,value>键值对形式的spilt后,按照一定的映射规则转换成一批 <key,value>键值对输出
  2. 写入缓存
    • 每个 Map 任务都会被分配一个缓存,Map 的输出结果不会立刻写入磁盘,而是首先写入缓存。
    • 在写入缓存之前, key 与 value 值都会被序列化为字节数组
    • 当缓存中积累一定数量的 Map 输出结果以后,再一次性写入磁盘
  3. 溢写(分区、排序、合并)

    • 提供给 MapReduce 的缓存容量默认大小是 100MB ,随着 Map 任务的执行,缓存中的 Map 结果会不断增加,很快就会占满整个内存,这个时候就要启动 溢写(Spill)操作,把缓存中的内容一次性写入磁盘,并清空缓存。
    • 为了保证溢写过程中,Map 结果的写入不受影响,会设置一个溢写比例,默认是80%,当缓存被填满80%时,就会启动溢写操作,剩下20%保证 Map 结果继续写入
    • 在溢写到磁盘之前,会执行分区(Partition)、排序(Sort)、排序(Combiner)
    • 分区
      • 缓存中的数据时<key,value>形式的键值对,这些键值对最终需要交给不同的 Reduce 任务进行处理,MapReduce 通过调用 Partition 接口对这些键值对进行分区,默认采用 Hash 函数 对 key 进行哈希后在用 Reduce 任务的数量进行取模,MapReduce提供默认的分区类HashPartitione源码如下:
        
        public class HashPartitioner<K, V> extends Partitioner<K, V> {

    /* Use {@link Object#hashCode()} to partition. /
    public int getPartition(K key, V value,
    int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

}



      - getPartition()方法中,`K key, V value`这两个参数是输入的键值对,`int numReduceTasks`指的是设置的reduce任务的数量,默认值为1。因为任何整数与1相除的余数肯定是0。也就是说默认的getPartition()方法的返回值总是0,也就是Mapper任务的输出默认总是送给同一个Reducer任务,最终只能输出到一个文件中。
      - 如果想要让mapper输出的结果给多个reducer处理,那么只需要写一个类,让其继承Partitioner类,并重写getPartition()方法,让其针对不同情况返回不同数值即可。并在最后通过job设置指定分区类和reducer任务数量即可。
      - 有博客的流程图把 partition 放在了第二步,那是指写入 **缓冲(Buffer)**,而本文指的是写入 **缓存(Cache)**,具体的区别请读者自行甄别
    - 排序
      - 对于每个分区的所有键值对,后台线程会根据 key 来进行**内存排序**,排序是 MapReduce的默认操作
    - 合并
      - 这个操作是可选的,如果用户事先没有定义 Combiner 函数,就**不会**执行合并操作
      - 合并的操作是指将那些具有相同的 key 的<key,value>的value加起来,比如有两个键值对<"TEST",1>、<"TEST",1>就会合并为一个键值对<"TEST",2>,从而减少键值对的数量
  - 在分区、排序、合并(视情况)操作之后,这些缓存中的键值对就可以被写入磁盘,并且清空缓存
  - 每次溢写操作都会在磁盘中生成一个新的溢写文件,写入溢写文件的所有键值对都是经过分区和排序的
4. 文件归并
  - 具有相同 key 的键值对会被归并成为一个新的键值对,比如键值对<k1,v1>、<k1,v2>、<k1,v3>就会归并成<k1,<v1,v2,v3>>键值对
  - 如果磁盘中已经生成的溢写文件数量超过参数 min.num.spills.for.combine 的值时(默认值是3,可以修改)时,可以再次运行 Combiner,对数据进行合并操作,从而减少写入磁盘的数据量;如果磁盘只有一两个溢写文件时,合并往往会得不偿失,因为合并操作本身也是需要代价的,因此不会运行 Combiner
- 最后会生成一个大文件被存放在本地磁盘上,这个大文件的数据是被分区的,不同的分区会被发送到到不同的 Reduce 任务进行并行处理。
- JobTracker 会一直检测 Map 任务的执行,当一个 Map 任务完成后,就会立即通知相关的 Reduce 来领取数据,然后开始 Reduce 端的 Shuffle 过程
<a name="JvDtE"></a>
## Reduce 端的 Shuffle 过程
![image.png](https://cdn.nlark.com/yuque/0/2020/png/710820/1586944620917-2b8daa06-7a6f-48fa-84cb-c2093035ca24.png#align=left&display=inline&height=218&margin=%5Bobject%20Object%5D&name=image.png&originHeight=435&originWidth=778&size=35235&status=done&style=none&width=389)

1. 领取数据
  - 在领取到数据之前,每个 Reduce 都在不断地通过 RPC 向 JobTracker 询问 Map 任务 是否已经完成,如果 JobTracker 监测到一个 Map 任务完成,就会通知相关的 Reduce 来领取数据,Reduce 会到相应的 Map 任务机器上把属于自己处理的分区数据领取到本地磁盘中
2. 归并数据
  - 从 Map 端领会的数据首先会被存放在 Reduce 任务所在机器的缓存中,如果缓存被占满,就会像 Map端一样被溢写到磁盘中
  - Reduce 任务会从多个 Map 机器领回数据,因此缓存中的数据大多来自不同的 Map 机器,一般会存在很多可以合并(Combine)的键值对
  - 当溢写过程启动时,具有相同的 key 的键值对会被归并(Merge),如果用户定义了 Combiner 则归并后的数据会进行合并操作,减少磁盘的数据量
3. 归并文件
  - 当所有的 Map 数据被领回时,多个溢写文件会归并成一个大文件,归并的时候还会对键值对进行排序,从而是的最终大文件中的键值对都是有序的
  - 把多个文件归并成一个大文件可能要执行多轮归并操作,每轮归并操作可以归并的文件数量由 io.sort.factor 的值来控制(默认值为10,可修改)。
  - 存在一种情况,缓存可以存储所有的数据,就不需要把数据溢写到磁盘上,直接在内存中执行归并操作后输出给 Reduce 任务
4. 把数据输入给 Reduce 任务
  - 磁盘中经过多轮归并后得到的若干个大文件,不会继续归并成为一个新的大文件,为了减少磁盘读写开销,会直接输入给 Reduce 任务

  [1]: https://www.pixiv.net/artworks/80774528
Last modification:April 15th, 2020 at 07:25 pm
如果觉得我的文章对你有用,请随意赞赏