MapReduce排序过程详解

Hadoop、Spark等分布式数据处理框架在宣传自己的性能时大都以排序效果来做比较,各种类别的Sort Benchmark已成为行业基准测试。之所以选择排序是因为排序的核心是shuffle操作,数据的传输会横跨集群中所有主机,Shuffle基本支持了所有的分布式数据处理负载。

下面就来详细分析一下使用mapreduce实现排序的基本过程。先看一些准备知识。

MapReduce中的数据流动

  • 最简单的过程: map - reduce
  • 定制了partitioner以将map的结果送往指定reducer的过程: map - partition - reduce
  • 增加了在本地先进行一次reduce(优化)的过程: map - combine - partition - reduce

Partition的概念和使用

得到map产生的记录后,他们该分配给哪些reducer来处理呢?hadoop默认是根据散列值来派发,但是实际中,这并不能很高效或者按照我们要求的去执行任务。例如,经过partition处理后,一个节点的reducer分配到了20条记录,另一个却分配到了10W万条,试想,这种情况效率如何。又或者,我们想要处理后得到的文件按照一定的规律进行输出,假设有两个reducer,我们想要最终结果中part-00000中存储的是”h”开头的记录的结果,part-00001中存储其他开头的结果,这些默认的partitioner是做不到的。所以需要我们自己定制partition来选择reducer。自定义partitioner很简单,只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job的setPartitionerClass指定一下即可。

MapReduce基于key的全排序的原理

如何使用mapreduce来做全排序?最简单的方法就是使用一个partition,因为一个partition对应一个reduce的task,然而reduce的输入本来就是对key有序的,所以很自然地就产生了一个全排序文件。但是这种方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了mapreduce所提供的并行架构的优势。

如果是分多个partition呢,则只要确保partition是有序的就行了。首先创建一系列排好序的文件;其次,串联这些文件(类似于归并排序);最后得到一个全局有序的文件。比如有1000个1-10000的数据,跑10个ruduce任务,如果进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要concat所有的输出文件,变成一个大的文件,就都是有序的了。

这时候可能会有一个疑问,虽然各个reduce的数据是按照区间排列好的,但是每个reduce里面的数据是乱序的啊?当然不会,不要忘了排序是MapReduce的天然特性 — 在数据达到reducer之前,mapreduce框架已经对这些数据按key排序了。

但是这里又有另外一个问题,就是在定义每个partition的边界的时候,可能会导致每个partition上分配到的记录数相差很大,这样数据最多的partition就会拖慢整个系统。我们期望的是每个partition上分配的数据量基本相同,hadoop提供了采样器帮我们预估整个边界,以使数据的分配尽量平均。

在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区,然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,可以使用hadoop的几种采样工具,如RandomSampler,InputSampler,IntervalSampler。

关于上述过程,在《Hadoop权威指南》中有具体的讲解,其中一张图可以帮助我们更好地理解在排序操作中hadoop在map和reduce阶段所做的事:

在map阶段,根据预先定义的patition规则进行分区,map首先将输出写到缓存中,当缓存内容达到阈值时,将结果spill到硬盘,每一次spill都会在硬盘产生一个spill文件,因此一个map task可能会产生多个spill文件。

接下来进入shuffle阶段,当map写出最后一个输出,需要在map端进行一次merge操作,按照partition和partition内的key进行合并和排序,此时每个partition内按照key值整体有序。

然后开始第二次merge,这次是在reduce端,在此期间数据在内存和磁盘上都有,其实这个阶段的merge并不是严格意义上的排序,只是将多个整体有序的文件merge成一个大的文件,最终完成排序工作。

取样和Partition过程详解

面对大量的数据,为了partition均匀,需要先取样:

1.根据所有数据键值对的数目、所有数据split的数目以及设定的每个split取样数目进行取样,比如原有100亿条数据,10个split,对每个split取样1W条,则总共10W个样本;

2.将10W个样本进行全排序,根据reducer的数量n,取出间隔平均的n-1个样本;

3.将这n-1个样本写入二进制文件(默认是 _partition.lst,是一个SequenceFile);

4.将上述二进制文件写入DistributedCache(所有mapper和reducer共享)。

接下来PartitionerClass来读取这个共享的二进制文件,根据这n-1个key生成一个类似于B-树的Tire树,可以加快查找(以空间换取时间),将所有的map输出根据这n-1个不同范围内的key输出到不同partition,这样可以保证第i个partition输出的键值对都比第i+1个partition的键值对的key小。然后每个partition进行一下局部排序即可,从而达到所有的key全局有序。

Trie树,又称单词查找树或键树,是一种树形结构,哈希树的变种。典型应用是用于统计和排序大量的字符串(但不仅限于字符串),所以经常被搜索引擎系统用于文本词频统计。它的优点是:最大限度地减少无谓的字符串比较,查询效率比哈希表高。它有3个基本特性:

  • 根节点不包含字符,除根节点外每一个节点都只包含一个字符。
  • 从根节点到某一节点,路径上经过的字符连接起来,为该节点对应的字符串。
  • 每个节点的所有子节点包含的字符都不相同。

文字比较晦涩,引用一张示意图:

  • 图中假设有n=20,即有20个reducer(下标0到19),那么我们最终获得n-1个样,即19个样(下标为18的为最后一个样);
  • 图中的圆圈,代表单词查找树上的节点;
  • 叶子节点下面的长方形代表取样数组,红色的数字代表取样的下标;
  • 每个节点都对应取样数组上的一个下标范围(更准备的说,是对应一个partition number的范围,每个partition number代表一个reducer)。这个范围在图中用蓝色的文字标识。

则小于或者等于第i个样的key,被分配到第i个reducer,剩下的被分配到最后一个reducer。具体的partition的过程:

如果key以”AAA”开头,被分配到第“0”个reducer。

如果key以”ACA”开头,被分配到第“4”个reducer。

如果key以”ACD”开头,被分配到第“4”个reducer。

如果key以”ACF”开头,被分配到第“5”个reducer。

如果key以”EDZ”开头,被分配到第“19”个reducer。