三种方法实现 Hadoop(MapReduce) 全局排序 () 三种方法实现 Hadoop(MapReduce) 全局排序 () 我们可能会有些需求要求 MapReduce 的输出全局有序, 这里说的有序是指 Key 全局有序 但是我们知道,MapReduce 默认只是保证同一个分区内的 Key 是有序的, 但是不保证全局有序 基于此, 本文提供三种方法来对 MapReduce 的输出进行全局排序 生成测试数据 在介绍如何实现之前, 我们先来生成一些测试数据, 实现如下 : #!/bin/sh for i in {..;do echo $RANDOM done; 将上面的代码保存到 iteblog.sh 的文件里面, 然后运行 : $ sh iteblog.sh > data $ sh iteblog.sh > data2 $ hadoop fs -put data /user/iteblog/input $ hadoop fs -put data2 /user/iteblog/input $RANDOM 变量是 Shell 内置的, 使用它能够生成五位内的随机正整数 上面我们一共运行了两次, 这样我们就有两份随机数文件 data 和 data2; 最后我们把生成的随机数文件上传到 HDFS 上 现在我们可以来写程序对这两个文件里面的数据进行排序了 使用一个 Reduce 进行排序 前面我们说了,MapReduce 默认只是保证同一个分区内的 Key 是有序的, 但是不保证全局有序 如果我们将所有的数据全部发送到一个 Reduce, 那么不就可以实现结果全局有序吗? 这种方法实现很简单, 如下 : package com.iteblog.mapreduce.sort; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; / 9
三种方法实现 Hadoop(MapReduce) 全局排序 () import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; import java.io.ioexception; public class TotalSortV extends Configured implements Tool { static class SimpleMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { protected void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { IntWritable intwritable = new IntWritable(Integer.parseInt(value.toString())); context.write(intwritable, intwritable); static class SimpleReducer extends Reducer<IntWritable, IntWritable, IntWritable, NullWritable> { protected void reduce(intwritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) context.write(value, NullWritable.get()); public int run(string[] args) throws Exception { if (args.length!= 2) { System.err.println("<input> <output>"); System.exit(27); Job job = Job.getInstance(getConf()); job.setjarbyclass(totalsortv.class); FileInputFormat.addInputPath(job, new Path(args[])); FileOutputFormat.setOutputPath(job, new Path(args[])); 2 / 9
三种方法实现 Hadoop(MapReduce) 全局排序 () job.setmapperclass(simplemapper.class); job.setreducerclass(simplereducer.class); job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(intwritable.class); job.setoutputvalueclass(nullwritable.class); job.setnumreducetasks(); job.setjobname("totalsort"); return job.waitforcompletion(true)? : ; public static void main(string[] args) throws Exception { int exitcode = ToolRunner.run(new TotalSort(), args); System.exit(exitCode); 上面程序的实现很简单, 我们直接使用 TextInputFormat 类来读取上面生成的随机数文件 (data 和 data2) 因为文件里面的数据是正整数, 所以我们在 SimpleMapper 类里面直接将 value 转换成 int 类型, 然后赋值给 IntWritable 等数据到 SimpleReducer 的时候, 同一个 Reduce 里面的 Key 已经全部有序 ; 因为我们设置了一个 Reduce 作业, 这样的话, 我们就实现了数据全局有序 运行如下 : [iteblog@www.iteblog.com /home/iteblog]$ hadoop jar totalsort-..jar com.iteblog.mapreduce.sort.totalsortv /user/iteblog/input /user/iteblog/output [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output Found 2 items -rw-r--r-- 3 iteblog supergroup 27-5-9 :4 /user/iteblog/output/_success -rw-r--r-- 3 iteblog supergroup 3757 27-5-9 :4 /user/iteblog/output/part-r- [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- head -n 3 / 9
三种方法实现 Hadoop(MapReduce) 全局排序 () [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- tail -n 从上面的测试结果也可以看出, 我们只生成了一个数据文件, 而且这个文件里面的数据已经全局有序了 自定义分区函数实现全局有序 上面实现数据全局有序有个很大的局限性 : 所有的数据都发送到一个 Reduce 进行排序, 这样不能充分利用集群的计算资源, 而且在数据量很大的情况下, 很有可能会出现 OOM 问题 我们分析一下,MapReduce 默认的分区函数是 HashPartitioner, 其实现的原理是计算 map 输出 key 的 hashcode, 然后对 Reduce 个数求模, 这样只要求模结果一样的 Key 都会发送到同一个 Reduce 如果我们能够实现一个分区函数, 使得 所有 Key 所有 其余的 Key 都发送到 Reduce 2; 这就实现了 Reduce 的数据一定全部小于 Reduce, 且 Reduce 的数据全部小于 Reduce 2, 再加上同一个 Reduce 里面的数据局部有序, 这样就实现了数据的全局有序 实现如下 : package com.iteblog.mapreduce.sort; import com.iteblog.mapreduce.secondsort.intpair; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; 4 / 9
三种方法实现 Hadoop(MapReduce) 全局排序 () import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; import java.io.ioexception; public class TotalSortV2 extends Configured implements Tool { static class SimpleMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { protected void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { IntWritable intwritable = new IntWritable(Integer.parseInt(value.toString())); context.write(intwritable, intwritable); static class SimpleReducer extends Reducer<IntWritable, IntWritable, IntWritable, NullWrita ble> { protected void reduce(intwritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) context.write(value, NullWritable.get()); public static class IteblogPartitioner extends Partitioner<IntWritable, IntWritable> { public int getpartition(intwritable key, IntWritable value, int numpartitions) { int keyint = Integer.parseInt(key.toString()); if (keyint < ) { return ; else if (keyint < 2) { return ; else { return 2; 5 / 9
三种方法实现 Hadoop(MapReduce) 全局排序 () public int run(string[] args) throws Exception { if (args.length!= 2) { System.err.println("<input> <output>"); System.exit(27); Job job = Job.getInstance(getConf()); job.setjarbyclass(totalsortv2.class); FileInputFormat.addInputPath(job, new Path(args[])); FileOutputFormat.setOutputPath(job, new Path(args[])); job.setmapperclass(simplemapper.class); job.setreducerclass(simplereducer.class); job.setpartitionerclass(iteblogpartitioner.class); job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(intwritable.class); job.setoutputvalueclass(nullwritable.class); job.setnumreducetasks(3); job.setjobname("dw_subject"); return job.waitforcompletion(true)? : ; public static void main(string[] args) throws Exception { int exitcode = ToolRunner.run(new TotalSortV2(), args); System.exit(exitCode); 第二版的排序实现除了自定义的 IteblogPartitioner, 其余的和第一种实现一样 现在我们来运行一下 : [iteblog@www.iteblog.com /home/iteblog]$ hadoop jar totalsort-..jar com.iteblog.mapreduce.sort.totalsortv2 /user/iteblog/input /user/iteblog/output [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output Found 4 items -rw-r--r-- 3 iteblog supergroup 27-5-9 3:53 /user/iteblog/output/_success -rw-r--r-- 3 iteblog supergroup 299845 27-5-9 3:53 /user/iteblog/output/partr- -rw-r--r-- 3 iteblog supergroup 3659 27-5-9 3:53 /user/iteblog/output/partr- 6 / 9
三种方法实现 Hadoop(MapReduce) 全局排序 () 466722 27-5-9 3:53 /user/iteblog/output/part- -rw-r--r-- 3 iteblog supergroup r-2 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- head -n [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- tail -n 9998 9998 9998 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- head -n [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- tail -n 9997 9997 7 / 9
三种方法实现 Hadoop(MapReduce) 全局排序 () 9998 9998 9998 9998 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr-2 head -n 2 2 2 2 2 2 2 2 2 2 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr-2 tail -n 我们已经看到了这个程序生成了三个文件 ( 因为我们设置了 Reduce 个数为 3), 而且每个文件都是局部有序 ; 所有小于 的数据都在 part-r- 里面, 所有小于 2 的数据都在 part-r- 里面, 所有大于 2 的数据都在 part-r-2 里面 part-r- part-r- 和 partr-2 三个文件实现了全局有序 这个方法也实现了数据的全局有序, 但是也有一些问题, 明天我再写一篇文章介绍第三种数据全局排序的方法 8 / 9
Powered by TCPDF (www.tcpdf.org) 三种方法实现 Hadoop(MapReduce) 全局排序 () 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 9 / 9