Hadoop&Spark 解决二次排序问题 (Spark 篇 ) 问题描述 二次排序就是 key 之间有序, 而且每个 Key 对应的 value 也是有序的 ; 也就是对 MapReduce 的输出 (KEY, Value(v 1,v 2,v 3,...,v n )) 中的 Value(v 1,v 2,v 3,...,v n ) 值进行排序 ( 升序或者降序 ), 使得 Value(s 1,s 2,s 3,...,s n ),s i (v 1,v 2,v 3,...,v n ) 且 s 1 < s 2 < s 3 <... < s n 假设我们有以下输入文件 ( 逗号分割的分别是年, 月, 总数 ): [root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0 我们期望的输出结果是 2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5 但是 Hadoop 默认的输出结果只能对 Key 进行排序, 其中 Value 中的值次序是不定的 ; 也就是说,Hadoop 默认的输出可能如下 : 1 / 9
2014-2 64 2015-1 21,4,3,24 2015-2 0,35,-43 2015-3 46,56 2015-4 5 解决方案 针对这个问题我们有两种方法来解决 :(1) 将每个 Key 对应的 Value 全部存储到内存 ( 这个只会存储到单台机器 ), 然后对这些 Value 进行相应的排序 但是如果 Value 的数据量非常大, 导致单台内存无法存储这些数据, 这将会导致程序出现 java.lang.outofmemoryerror, 所以这个方法不是很通用 (2) 这种方法将 Value 中的值和旧的 Key 组成一个新的 Key, 这样我们就可以利用 Reduce 来排序这个 Key, 其生成的结果就是我们需要的 过程如下 : 1 原始的键值对是 (k,v) 这里的 k 就是就的 key, 也可以称为 natural key; 2 我们可以将 k 和 v 组合成新的 key( 可以称为 composite key), 也就是 ((k,v), v) 3 自定义分区函数, 将 k 相同的键值对发送到同一个 Reduce 中 ; 4 自定义分组函数, 将 k 相同的键值对当作一个分组 文字比较枯燥, 我们来看看下面实例 : 1 原始数据是 [root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0 我们将年 月组成 key(natural key), 总数作为 value, 结果变成 : (2015-1,24) (2015-3,56) (2015-1,3) (2015-2,-43) (2015-4,5) 2 / 9
(2015-3,46) (2014-2,64) (2015-1,4) (2015-1,21) (2015-2,35) (2015-2,0) 2 将 value 和 key(natural key) 组成新的 key(composite key), 如下 : ((2015-1,24),24) ((2015-3,56),56) ((2015-1,3),3) ((2015-2,-43),-43) ((2015-4,5),5) ((2015-3,46),46) ((2014-2,64),64) ((2015-1,4),4) ((2015-1,21),21) ((2015-2,35),35) ((2015-2,0),0) 3 自定义分区函数, 将 k 相同的键值对发送到同一个 Reduce 中, 结果如下 : [((2014-2,64),64)] [((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,56),56),((2015-3,46),46)] [((2015-4,5),5)] 4 自定义组排序函数, 结果如下 : [((2014-2,64),64)] [((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,46),46),((2015-3,56),56)] [((2015-4,5),5)] 3 / 9
5 自定义分组函数, 结果如下 : ((2014-2,64),(64)) ((2015-1,24),(3,4,21,24)) ((2015-2,35),(-43,0,35)) ((2015-3,56),(46,56)) ((2015-4,5),(5)) 6 最后输出的结果就是我们要的 : 2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5 代码实例 下面将贴出使用 MapReduce 解决这个问题的代码 : package com.iteblog; import org.apache.hadoop.io.writablecomparable; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午 23:49 * bolg: https://www.iteblog.com * 本文地址 :https://www.iteblog.com/archives/1415 * 过往记忆博客, 专注于 hadoop hive spark shark flume 的技术博客, 大量的干货 * 过往记忆博客微信公共帐号 :iteblog_hadoop */ public class Entry implements WritableComparable<Entry> { 4 / 9
private String yearmonth; private int count; public Entry() { public int compareto(entry entry) { int result = this.yearmonth.compareto(entry.getyearmonth()); if (result == 0) { result = compare(count, entry.getcount()); return result; public void write(dataoutput dataoutput) throws IOException { dataoutput.writeutf(yearmonth); dataoutput.writeint(count); public void readfields(datainput datainput) throws IOException { this.yearmonth = datainput.readutf(); this.count = datainput.readint(); public String getyearmonth() { return yearmonth; public void setyearmonth(string yearmonth) { this.yearmonth = yearmonth; public int getcount() { return count; public void setcount(int count) { this.count = count; public static int compare(int a, int b) { return a < b? -1 : (a > b? 1 : 0); 5 / 9
public String tostring() { return yearmonth; 上面就是将旧的 Key(natural key) 和 Value 组合成新的 Key(composite key) 的代码, 接下来看下自定义的分区类 : package com.iteblog; import org.apache.hadoop.mapreduce.partitioner; public class EntryPartitioner extends Partitioner<Entry, Integer> { public int getpartition(entry entry, Integer integer, int numberpartitions) { return Math.abs((entry.getYearMonth().hashCode() % numberpartitions)); 这个类使得 natural key 相同的数据分派到同一个 Reduce 中 然后看下自定义分组类 : package com.iteblog; import org.apache.hadoop.io.writablecomparable; import org.apache.hadoop.io.writablecomparator; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午 23:49 * bolg: https://www.iteblog.com * 本文地址 :https://www.iteblog.com/archives/1415 * 过往记忆博客, 专注于 hadoop hive spark shark flume 的技术博客, 大量的干货 * 过往记忆博客微信公共帐号 :iteblog_hadoop */ public class EntryGroupingComparator extends WritableComparator { public EntryGroupingComparator() { 6 / 9
super(entry.class, true); public int compare(writablecomparable a, WritableComparable b) { Entry a1 = (Entry) a; Entry b1 = (Entry) b; return a1.getyearmonth().compareto(b1.getyearmonth()); 只要是 natural key 相同, 我们就认为是同一个分组, 这样 Reduce 内部才可以对 Value 中的值进行排序 接下来看下 Map 类 public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> { private Entry entry = new Entry(); private Text value = new Text(); protected void map(longwritable key, Text lines, Context context) throws IOException, InterruptedException { String line = lines.tostring(); String[] tokens = line.split(","); // YYYY = tokens[0] // MM = tokens[1] // count = tokens[2] String yearmonth = tokens[0] + "-" + tokens[1]; int count = Integer.parseInt(tokens[2]); entry.setyearmonth(yearmonth); entry.setcount(count); value.set(tokens[2]); context.write(entry, value); 其实就是解析每一行的数据, 然后将旧的 Key(natural key) 和 Value 组合成新的 Key(composite key) 接下来看下 Reduce 类实现 7 / 9
public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> { protected void reduce(entry key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder builder = new StringBuilder(); for (Text value : values) { builder.append(value.tostring()); builder.append(","); context.write(key, new Text(builder.toString())); builder 存储的就是排序好的 Value 序列, 最后来看看启动程序的使用 : Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setjarbyclass(iteblog.class); job.setjobname("secondarysort"); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setoutputkeyclass(entry.class); job.setoutputvalueclass(text.class); job.setmapperclass(secondarysortmapper.class); job.setreducerclass(secondarysortreducer.class); job.setpartitionerclass(entrypartitioner.class); job.setgroupingcomparatorclass(entrygroupingcomparator.class); 关键看上面第 12-15 行的代码 下面是运行这个程序的方法和结果 : [root@iteblog.com /hadoop]# bin/hadoop jar /tmp/iteblog-1.0-snapshot.jar com.iteblog.main /iteblog/data.txt /iteblog/output [root@iteblog.com /hadoop]# bin/hadoop fs -cat /iteblog/output/pa* 2014-2 64, 2015-1 3,4,21,24, 8 / 9
Powered by TCPDF (www.tcpdf.org) 2015-2 -43,0,35, 2015-3 46,56, 2015-4 5, 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 9 / 9