三种方法实现 Hadoop(MapReduce) 全局排序 (2) 我在前面的文章介绍了 MapReduce 中两种全排序的方法及其实现 但是上面的两种方法都是有很大的局限性 : 方法一在数据量很大的时候会出现 OOM 问题 ; 方法二虽然能够将数据分散到多个 Reduce 中, 但是问题也很明显 : 我们必须手动地找到各个 Reduce 的分界点, 尽量使得分散到每个 Reduce 的数据量均衡 而且每次修改 Reduce 的个数时, 都得手动去找一次 Key 的分界点! 非常不灵活 本文这里介绍的第三种使用 MapReduce 全局排序的方法算是比较通用了, 而且是内置的实现 使用 TotalOrderPartitioner 进行全排序 我们都知道 Hadoop 内置有个 HashPartitioner 分区实现类,MapReduce 默认就是使用它 ; 但其实 Hadoop 内置还有个名为 TotalOrderPartitioner 的分区实现类, 看名字就清楚它其实就是解决全排序的问题 如果你去看他的实现, 其主要做的事实际上和我们上文介绍的 IteblogPartitioner 分区实现类很类似, 也就是根据 Key 的分界点将不同的 Key 发送到相应的分区 问题是, 上文用到的分界点是我们人为计算的 ; 而这里用到的分界点是由程序解决的! 数据抽样 寻找合适的 Key 分割点需要我们对数据的分布有个大概的了解 ; 如果数据量很大的话, 我们不可能对所有的数据进行分析然后选出 N- (N 代表 Reduce 的个数 ) 个分割点, 最适合的方式是对数据进行抽样, 然后对抽样的数据进行分析并选出合适的分割点 Hadoop 提供了三种抽样的方法 : SplitSampler: 从 s 个 split 中选取前 n 条记录取样 RandomSampler: 随机取样 IntervalSampler: 从 s 个 split 里面按照一定间隔取样, 通常适用于有序数据 这三个抽样都实现了 K[] getsample(inputformat inf, Job job) throws IOException, InterruptedException; 方法 ; 通过调用这个方法我们可以返回抽样到的 Key 数组, 除了 IntervalSampler 类返回的抽样 Key 是有序的, 其他都无序 获取到采样好的 Key 数组之后, 需要对其进行排序, 然后选择好 N- (N 代表 Reduce 的个数 ) 个分割点, 最后将这些 Key 分割点存储到指定的 HDFS 文件中, 存储的文件格式是 SequenceFile, 使用如下 : TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2])); InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.0, 00 0, 00); InputSampler.writePartitionFile(job, sampler); / 7
TotalOrderPartitioner 上面通过 InputSampler.writePartitionFile(job, sampler); 存储好了分割点, 然后 TotalOrderPartitioner 类会在 setconf 函数中读取这个文件, 并根据 Key 的类型分别创建不同的数据结构 : 如果 Key 的类型是 BinaryComparable (BytesWritable 和 Text ), 并且 mapreduce.totalorderpartitioner.naturalorder 属性的指是 true, 则会构建 trie 树, 便于后面的查找 ; 在计算机科学中,trie, 又称前缀树或字典树, 是一种有序树, 用于保存关联数组, 其中的键通常是字符串 与二叉查找树不同, 键不是直接保存在节点中, 而是由节点在树中的位置决定 一个节点的所有子孙都有相同的前缀, 也就是这个节点对应的字符串, 而根节点对应空字符串 一般情况下, 不是所有的节点都有对应的值, 只有叶子节点和部分内部节点所对应的键才有相关的值 ( 摘自 :https://zh.wikipedia.org/wiki/trie) 其他情况会构建一个 BinarySearchNode, 用二分查找 最后程序通过调用 getpartition 函数决定当前 Key 应该发送到哪个 Reduce 中 : public int getpartition(k key, V value, int numpartitions) { return partitions.findpartition(key); 程序实现 下面是使用 TotalOrderPartitioner 类进行全局排序的完整代码 : package com.iteblog.mapreduce.sort; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; 2 / 7
import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.keyvaluetextinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.partition.inputsampler; import org.apache.hadoop.mapreduce.lib.partition.totalorderpartitioner; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; import java.io.ioexception; public class TotalSortV3 extends Configured implements Tool { static class SimpleMapper extends Mapper<Text, Text, Text, IntWritable> { protected void map(text key, Text value, Context context) throws IOException, InterruptedException { IntWritable intwritable = new IntWritable(Integer.parseInt(key.toString())); context.write(key, intwritable); static class SimpleReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> { protected void reduce(text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) context.write(value, NullWritable.get()); public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(text.class, true); public int compare(writablecomparable w, WritableComparable w2) { int v = Integer.parseInt(w.toString()); int v2 = Integer.parseInt(w2.toString()); return v - v2; public int run(string[] args) throws Exception { 3 / 7
Configuration conf = getconf(); conf.set("mapreduce.totalorderpartitioner.naturalorder", "false"); Job job = Job.getInstance(conf, "Total Order Sorting"); job.setjarbyclass(totalsortv3.class); job.setinputformatclass(keyvaluetextinputformat.class); job.setsortcomparatorclass(keycomparator.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[])); job.setnumreducetasks(3); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(intwritable.class); job.setoutputvalueclass(nullwritable.class); TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2])); InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.0, 000, 00); InputSampler.writePartitionFile(job, sampler); job.setpartitionerclass(totalorderpartitioner.class); job.setmapperclass(simplemapper.class); job.setreducerclass(simplereducer.class); job.setjobname("iteblog"); return job.waitforcompletion(true)? 0 : ; public static void main(string[] args) throws Exception { int exitcode = ToolRunner.run(new TotalSortV3(), args); System.exit(exitCode); 运行程序 [iteblog@www.iteblog.com /home/iteblog]$ hadoop jar total-sort-0..jar com.iteblog.mapredu ce.sort.totalsortv3 /user/iteblog/input /user/iteblog/output /user/iteblog/partitions ## 生成的 Key 分割点 [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/partitions (null) 4 / 7
(null) [iteblog@www.iteblog.com ~]$ hadoop fs -ls /user/iteblog/output/ Found 4 items -rw-r--r-- 3 iteblog supergroup 0 207-05-09 6:56 /user/iteblog/output/_success -rw-r--r-- 3 iteblog supergroup 335923 207-05-09 6:56 /user/iteblog/output/part-r-00000 -rw-r--r-- 3 iteblog supergroup 388362 207-05-09 6:56 /user/iteblog/output/part-r-0000 -rw-r--r-- 3 iteblog supergroup 407472 207-05-09 6:56 /user/iteblog/output/part-r-00002 [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00000 head -n 0 0 0 0 [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00000 tail -n 0 0976 0976 0976 [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-0000 head -n 0 [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-0000 tail -n 0 2609 5 / 7
2609 2609 [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00002 head -n 0 [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00002 tail -n 0 注意事项 我们这里使用的 InputFormat 类是 KeyValueTextInputFormat, 而不是 TextInputFormat 因为采样是对 Key 进行的, 而 TextInputFormat 的 Key 是偏移量, 这样的采样结果是无意义的 ; 而如果使用 KeyValueTextInputFormat 作为输入类型, 则可以将数据存放在 Key 中, 从而得到正确的采样结果 2 我们 map 输出 Key 的类型是 Text, 这是没办法的, 因为 InputSampler.writePartitionFile 函数实现的原因, 必须要求 map 输入和输出 Key 的类型一致, 否则会出现如下的异常 : 6 / 7
Powered by TCPDF (www.tcpdf.org) 三种方法实现 Hadoop(MapReduce) 全局排序 (2) Exception in thread "main" java.io.ioexception: wrong key class: org.apache.hadoop.io.text is not class org.apache.hadoop.io.longwritable at org.apache.hadoop.io.sequencefile$recordcompresswriter.append(sequencefile.java:3 80) at com.iteblog.mapreduce.sort.totalsortv3.writepartitionfile(totalsortv3.java:06) at com.iteblog.mapreduce.sort.totalsortv3.run(totalsortv3.java:47) at org.apache.hadoop.util.toolrunner.run(toolrunner.java:70) at org.apache.hadoop.util.toolrunner.run(toolrunner.java:84) at com.iteblog.mapreduce.sort.totalsortv3.main(totalsortv3.java:73) at sun.reflect.nativemethodaccessorimpl.invoke0(native Method) at sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.java:57) at sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl.java:43 ) at java.lang.reflect.method.invoke(method.java:606) at org.apache.hadoop.util.runjar.main(runjar.java:22) 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 7 / 7