MapReduce 应用案例 本文版权归作者和博客园共有, 欢迎转载, 但未经作者同意必须保留此段声明, 且在文章页面明显位置给出原文连接, 博主为石山园, 博客地址为 http://www.cnblogs.com/shishanyuan 该系列课程是应邀实验楼整理编写的, 这里需要赞一下实验楼提供了学习的新方式, 可以边看博客边上机实验, 课程地址为 https://www.shiyanlou.com/courses/237 注 该系列所使用到安装包 测试数据和代码均可在百度网盘下载, 具体地址为 http://pan.baidu.com/s/10pnds, 下载该 PDF 文件 1 环境说明 部署节点操作系统为 CentOS, 防火墙和 SElinux 禁用, 创建了一个 shiyanlou 用户并在系统根目录下创建 /app 目录, 用于存放 Hadoop 等组件运行包 因为该目录用于安装 hadoop 等组件程序, 用户对 shiyanlou 必须赋予 rwx 权限 ( 一般做法是 root 用户在根目录下创建 /app 目录, 并修改该目录拥有者为 shiyanlou(chown R shiyanlou:shiyanlou /app) Hadoop 搭建环境 : 虚拟机操作系统 : CentOS6.6 64 位, 单核,1G 内存 JDK:1.7.0_55 64 位 Hadoop:1.1.2 2 准备测试数据 测试数据包括两个文件 dept( 部门 ) 和 emp( 员工 ), 其中各字段用逗号分隔 : dept 文件内容 : 10,ACCOUNTING,NEW YORK 20,RESEARCH,DALLAS 30,SALES,CHICAGO 40,OPERATIONS,BOSTON emp 文件内容 : 7369,SMITH,CLERK,7902,17-12 月 -80,800,,20 7499,ALLEN,SALESMAN,7698,20-2 月 -81,1600,300,30 第 1 页共 57 页
7521,WARD,SALESMAN,7698,22-2 月 -81,1250,500,30 7566,JONES,MANAGER,7839,02-4 月 -81,2975,,20 7654,MARTIN,SALESMAN,7698,28-9 月 -81,1250,1400,30 7698,BLAKE,MANAGER,7839,01-5 月 -81,2850,,30 7782,CLARK,MANAGER,7839,09-6 月 -81,2450,,10 7839,KING,PRESIDENT,,17-11 月 -81,5000,,10 7844,TURNER,SALESMAN,7698,08-9 月 -81,1500,0,30 7900,JAMES,CLERK,7698,03-12 月 -81,950,,30 7902,FORD,ANALYST,7566,03-12 月 -81,3000,,20 7934,MILLER,CLERK,7782,23-1 月 -82,1300,,10 在 /home/shiyanlou/install-pack/class6 目录可以找到这两个文件, 把这两个文件上传到 HDFS 中 /class6/input 目录中, 执行如下命令 : cd /home/shiyanlou/install-pack/class6 hadoop fs -mkdir -p /class6/input hadoop fs -copyfromlocal dept /class6/input hadoop fs -copyfromlocal emp /class6/input hadoop fs -ls /class6/input 第 2 页共 57 页
3 应用案例 3.1 测试例子 1: 求各个部门的总工资 3.1.1 问题分析 MapReduce 中的 join 分为好几种, 比如有最常见的 reduce side join map side join 和 semi join 等 reduce join 在 shuffle 阶段要进行大量的数据传输, 会造成大量的网络 IO 效率低下, 而 map side join 在处理多个小表关联大表时非常有用 Map side join 是针对以下场景进行的优化 : 两个待连接表中, 有一个表非常大, 而另一个表非常小, 以至于小表可以直接存放到内存中 这样我们可以将小表复制多份, 让每个 map task 内存中存在一份 ( 比如存放到 hash table 中 ), 然后只扫描大表 : 对于大表中的每一条记录 key/value, 在 hash table 中查找是否有相同的 key 的记录, 如果有, 则连接后输出即可 为了支持文件的复制,Hadoop 提供了一个类 DistributedCache, 使用该类的方法如下 : (1) 用户使用静态方法 DistributedCache.addCacheFile() 指定要复制的文件, 它的参数是文件的 URI ( 如果是 HDFS 上的文件, 可以这样 : hdfs://jobtracker:50030/home/xxx/file) JobTracker 在作业启动之前会获取这个 URI 列表, 并将相应的文件拷贝到各个 TaskTracker 的本地磁盘上 (2) 用户使用 DistributedCache.getLocalCacheFiles() 方法获取文件目录, 并使用标准的文件读写 API 读取相应的文件 在下面代码中, 将会把数据量小的表 ( 部门 dept) 缓存在内存中, 在 Mapper 阶段对员工部门编号映射成部门名称, 该名称作为 key 输出到 Reduce 中, 在 Reduce 中计算按照部门计算各个部门的总工资 第 3 页共 57 页
3.1.2 处理流程图 3.1.3 测试代码 Q1SumDeptSalary.java 代码 (vi 编辑代码是不能存在中文 ): import java.io.bufferedreader; import java.io.filereader; import java.io.ioexception; import java.util.hashmap; import java.util.map; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.filecache.distributedcache; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; 第 4 页共 57 页
public class Q1SumDeptSalary extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用于缓存 dept 文件中的数据 private Map<String, String> deptmap = new HashMap<String, String>(); private String[] kv; // 此方法会在 Map 方法执行之前执行且执行一次 @Override protected void setup(context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 从当前作业中获取要缓存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String deptidname = null; for (Path path : paths) { // 对部门文件字段进行拆分并缓存到 deptmap 中 if (path.tostring().contains("dept")) { in = new BufferedReader(new FileReader(path.toString())); while (null!= (deptidname = in.readline())) { // 对部门文件字段进行拆分并缓存到 deptmap 中 // 其中 Map 中 key 为部门编号,value 为所在部门名称 deptmap.put(deptidname.split(",")[0], deptidname.split(",")[1]); catch (IOException e) { e.printstacktrace(); finally { try { if (in!= null) { in.close(); catch (IOException e) { e.printstacktrace(); public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 第 5 页共 57 页
kv = value.tostring().split(","); // map join: 在 map 阶段过滤掉不需要的数据, 输出 key 为部门名称和 value 为员工工资 if (deptmap.containskey(kv[7])) { if (null!= kv[5] &&!"".equals(kv[5].tostring())) { context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); public static class Reduce extends Reducer<Text, Text, Text, LongWritable> { public void reduce(text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 对同一部门的员工工资进行求和 long sumsalary = 0; for (Text val : values) { sumsalary += Long.parseLong(val.toString()); // 输出 key 为部门名称和 value 为该部门员工工资总和 context.write(key, new LongWritable(sumSalary)); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Mapper 和 Reduce 类 Job job = new Job(getConf(), "Q1SumDeptSalary"); job.setjobname("q1sumdeptsalary"); job.setjarbyclass(q1sumdeptsalary.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // 设置输入格式类 job.setinputformatclass(textinputformat.class); // 设置输出格式 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); // 第 1 个参数为缓存的部门数据路径 第 2 个参数为员工数据路径和第 3 个参数为输出路径 第 6 页共 57 页
String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getconfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args); System.exit(res); 3.1.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q1SumDeptSalary.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q1sumdeptsalary.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q1SumDeptSalary.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java 把编译好的代码打成 jar 包 ( 如果不打成 jar 形式运行会提示 class 无法找到的错误 ) jar cvf./q1sumdeptsalary.jar./q1sumdept*.class mv *.jar../.. rm Q1SumDept*.class 第 7 页共 57 页
3.1.5 运行并查看结果 运行 Q1SumDeptSalary 时需要输入部门数据路径 员工数据路径和输出路径三个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 部门数据路径 :hdfs://hadoop:9000/class6/input/dept, 部门数据将缓存在各运行任务的节点内容中, 可以提供处理的效率 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out1 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out1 目录, 打开 part-r-00000 文件 第 8 页共 57 页
hadoop fs -ls /class6/out1 hadoop fs -cat /class6/out1/part-r-00000 可以看到运行结果 : ACCOUNTING 8750 RESEARCH 6775 SALES 9400 3.2 测试例子 2: 求各个部门的人数和平均工资 3.2.1 问题分析 求各个部门的人数和平均工资, 需要得到各部门工资总数和部门人数, 通过两者相除获取各部门平均工资 首先和问题 1 类似在 Mapper 的 Setup 阶段缓存部门数据, 然后在 Mapper 阶段抽取出部门编号和员工工资, 利用缓存部门数据把部门编号对应为部门名称, 接着在 Shuffle 阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表, 最后在 Reduce 中按照部门归组, 遍历部门所有员工, 求出总数和员工数, 输出部门名称和平均工资 3.2.2 处理流程图 第 9 页共 57 页
3.2.3 编写代码 Q2DeptNumberAveSalary.java 代码 : import java.io.bufferedreader; import java.io.filereader; import java.io.ioexception; import java.util.hashmap; import java.util.map; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.filecache.distributedcache; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q2DeptNumberAveSalary extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用于缓存 dept 文件中的数据 private Map<String, String> deptmap = new HashMap<String, String>(); private String[] kv; // 此方法会在 Map 方法执行之前执行且执行一次 @Override protected void setup(context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 从当前作业中获取要缓存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String deptidname = null; for (Path path : paths) { // 对部门文件字段进行拆分并缓存到 deptmap 中 if (path.tostring().contains("dept")) { 第 10 页共 57 页
in = new BufferedReader(new FileReader(path.toString())); while (null!= (deptidname = in.readline())) { // 对部门文件字段进行拆分并缓存到 deptmap 中 // 其中 Map 中 key 为部门编号,value 为所在部门名称 deptmap.put(deptidname.split(",")[0], deptidname.split(",")[1]); catch (IOException e) { e.printstacktrace(); finally { try { if (in!= null) { in.close(); catch (IOException e) { e.printstacktrace(); public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 kv = value.tostring().split(","); // map join: 在 map 阶段过滤掉不需要的数据, 输出 key 为部门名称和 value 为员工工资 if (deptmap.containskey(kv[7])) { if (null!= kv[5] &&!"".equals(kv[5].tostring())) { context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce(text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { long sumsalary = 0; int deptnumber = 0; // 对同一部门的员工工资进行求和 for (Text val : values) { 第 11 页共 57 页
sumsalary += Long.parseLong(val.toString()); deptnumber++; // 输出 key 为部门名称和 value 为该部门员工工资平均值 context.write(key, new Text("Dept Number:" + deptnumber + ", Ave Salary:" + sumsalary / deptnumber)); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Mapper 和 Reduce 类 Job job = new Job(getConf(), "Q2DeptNumberAveSalary"); job.setjobname("q2deptnumberavesalary"); job.setjarbyclass(q2deptnumberavesalary.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // 设置输入格式类 job.setinputformatclass(textinputformat.class); // 设置输出格式类 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); // 第 1 个参数为缓存的部门数据路径 第 2 个参数为员工数据路径和第 3 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getconfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args); System.exit(res); 第 12 页共 57 页
3.2.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q2DeptNumberAveSalary.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q2deptnumberavesalary.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q2DeptNumberAveSalary.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q2deptnumberavesalary.jar./q2deptnum*.class mv *.jar../.. rm Q2DeptNum*.class 3.2.5 运行并查看结果 运行 Q2DeptNumberAveSalary 时需要输入部门数据路径 员工数据路径和输出路径三个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 部门数据路径 :hdfs://hadoop:9000/class6/input/dept, 部门数据将缓存在各运行任务的节点内容中, 可以提供处理的效率 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out2 第 13 页共 57 页
运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out2 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out2 目录 hadoop fs -ls /class6/out2 hadoop fs -cat /class6/out2/part-r-00000 打开 part-r-00000 文件, 可以看到运行结果 : ACCOUNTING Dept Number:3,Ave Salary:2916 RESEARCH Dept Number:3,Ave Salary:2258 SALES Dept Number:6,Ave Salary:1566 3.3 测试例子 3: 求每个部门最早进入公司的员工姓名 3.3.1 问题分析 求每个部门最早进入公司员工姓名, 需要得到各部门所有员工的进入公司日期, 通过比较获取最早进入公司员工姓名 首先和问题 1 类似在 Mapper 的 Setup 阶段缓存部门数据, 然后 Mapper 阶段抽取出 key 为部门名称 ( 利用缓存部门数据把部门编号对应为部门名称 ),value 为员工姓名和进入公司日期, 接着在 Shuffle 阶段把传过来的数据处理为部门名称对应该部门所有员工 + 进入公司日期的列表, 最后在 Reduce 中按照部门归组, 遍历部门所有员工, 找出最早进入公司的员工并输出 第 14 页共 57 页
3.3.2 处理流程图 3.3.3 编写代码 import java.io.bufferedreader; import java.io.filereader; import java.io.ioexception; import java.text.dateformat; import java.text.parseexception; import java.text.simpledateformat; import java.util.date; import java.util.hashmap; import java.util.map; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.filecache.distributedcache; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; 第 15 页共 57 页
import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q3DeptEarliestEmp extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用于缓存 dept 文件中的数据 private Map<String, String> deptmap = new HashMap<String, String>(); private String[] kv; // 此方法会在 Map 方法执行之前执行且执行一次 @Override protected void setup(context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 从当前作业中获取要缓存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String deptidname = null; for (Path path : paths) { if (path.tostring().contains("dept")) { in = new BufferedReader(new FileReader(path.toString())); while (null!= (deptidname = in.readline())) { // 对部门文件字段进行拆分并缓存到 deptmap 中 // 其中 Map 中 key 为部门编号,value 为所在部门名称 deptmap.put(deptidname.split(",")[0], deptidname.split(",")[1]); catch (IOException e) { e.printstacktrace(); finally { try { if (in!= null) { in.close(); catch (IOException e) { e.printstacktrace(); public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { 第 16 页共 57 页
// 对员工文件字段进行拆分 kv = value.tostring().split(","); // map join: 在 map 阶段过滤掉不需要的数据 // 输出 key 为部门名称和 value 为员工姓名 +","+ 员工进入公司日期 if (deptmap.containskey(kv[7])) { if (null!= kv[4] &&!"".equals(kv[4].tostring())) { context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim() + "," + kv[4].trim())); public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce(text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 员工姓名和进入公司日期 String empname = null; String empenterdate = null; // 设置日期转换格式和最早进入公司的员工 日期 DateFormat df = new SimpleDateFormat("dd-MM 月 -yy"); Date earliestdate = new Date(); String earliestemp = null; // 遍历该部门下所有员工, 得到最早进入公司的员工信息 for (Text val : values) { empname = val.tostring().split(",")[0]; empenterdate = val.tostring().split(",")[1].tostring().trim(); try { System.out.println(df.parse(empEnterDate)); if (df.parse(empenterdate).compareto(earliestdate) < 0) { earliestdate = df.parse(empenterdate); earliestemp = empname; catch (ParseException e) { e.printstacktrace(); // 输出 key 为部门名称和 value 为该部门最早进入公司员工 context.write(key, new Text("The earliest emp of dept:" + earliestemp + ", Enter date:" + new SimpleDateFormat("yyyy-MM-dd").format(earliestDate))); 第 17 页共 57 页
@Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q3DeptEarliestEmp"); job.setjobname("q3deptearliestemp"); // 设置 Mapper 和 Reduce 类 job.setjarbyclass(q3deptearliestemp.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // 设置输入格式类 job.setinputformatclass(textinputformat.class); // 设置输出格式类 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); // 第 1 个参数为缓存的部门数据路径 第 2 个参数为员工数据路径和第三个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getconfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args); System.exit(res); 第 18 页共 57 页
3.3.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q3DeptEarliestEmp.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q3deptearliestemp.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q3DeptEarliestEmp.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q3DeptEarliestEmp.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q3deptearliestemp.jar./q3deptear*.class mv *.jar../.. rm Q3DeptEar*.class 3.3.5 运行并查看结果 运行 Q3DeptEarliestEmp 时需要输入部门数据路径 员工数据路径和输出路径三个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 部门数据路径 :hdfs://hadoop:9000/class6/input/dept, 部门数据将缓存在各运行任务的节点内容中, 可以提供处理的效率 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out3 运行如下命令 : cd /app/hadoop-1.1.2 第 19 页共 57 页
hadoop jar Q3DeptEarliestEmp.jar Q3DeptEarliestEmp hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out3 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out3 目录 hadoop fs -ls /class6/out3 hadoop fs -cat /class6/out3/part-r-00000 打开 part-r-00000 文件, 可以看到运行结果 : ACCOUNTING The earliest emp of dept:clark, Enter date:1981-06-09 RESEARCH The earliest emp of dept:smith, Enter date:1980-12-17 SALES The earliest emp of dept:allen, Enter date:1981-02-20 3.4 测试例子 4: 求各个城市的员工的总工资 3.4.1 问题分析 求各个城市员工的总工资, 需要得到各个城市所有员工的工资, 通过对各个城市所有员工工资求和得到总工资 首先和测试例子 1 类似在 Mapper 的 Setup 阶段缓存部门对应所在城市数据, 然后在 Mapper 阶段抽取出 key 为城市名称 ( 利用缓存数据把部门编号对应为所在城市名称 ), value 为员工工资, 接着在 Shuffle 阶段把传过来的数据处理为城市名称对应该城市所有员工工资, 最后在 Reduce 中按照城市归组, 遍历城市所有员工, 求出工资总数并输出 第 20 页共 57 页
3.4.2 处理流程图 3.4.3 编写代码 import java.io.bufferedreader; import java.io.filereader; import java.io.ioexception; import java.util.hashmap; import java.util.map; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.filecache.distributedcache; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q4SumCitySalary extends Configured implements Tool { 第 21 页共 57 页
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用于缓存 dept 文件中的数据 private Map<String, String> deptmap = new HashMap<String, String>(); private String[] kv; // 此方法会在 Map 方法执行之前执行且执行一次 @Override protected void setup(context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 从当前作业中获取要缓存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String deptidname = null; for (Path path : paths) { if (path.tostring().contains("dept")) { in = new BufferedReader(new FileReader(path.toString())); while (null!= (deptidname = in.readline())) { // 对部门文件字段进行拆分并缓存到 deptmap 中 // 其中 Map 中 key 为部门编号,value 为所在城市名称 deptmap.put(deptidname.split(",")[0], deptidname.split(",")[2]); catch (IOException e) { e.printstacktrace(); finally { try { if (in!= null) { in.close(); catch (IOException e) { e.printstacktrace(); public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 kv = value.tostring().split(","); // map join: 在 map 阶段过滤掉不需要的数据, 输出 key 为城市名称和 value 为员工工资 if (deptmap.containskey(kv[7])) { 第 22 页共 57 页
if (null!= kv[5] &&!"".equals(kv[5].tostring())) { context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim())); public static class Reduce extends Reducer<Text, Text, Text, LongWritable> { public void reduce(text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 对同一城市的员工工资进行求和 long sumsalary = 0; for (Text val : values) { sumsalary += Long.parseLong(val.toString()); // 输出 key 为城市名称和 value 为该城市工资总和 context.write(key, new LongWritable(sumSalary)); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q4SumCitySalary"); job.setjobname("q4sumcitysalary"); // 设置 Mapper 和 Reduce 类 job.setjarbyclass(q4sumcitysalary.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // 设置输入格式类 job.setinputformatclass(textinputformat.class); // 设置输出格式类 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); // 第 1 个参数为缓存的部门数据路径 第 2 个参数为员工数据路径和第 3 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), 第 23 页共 57 页
job.getconfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args); System.exit(res); 3.4.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q4SumCitySalary.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q4sumcitysalary.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q4SumCitySalary.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q4SumCitySalary.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q4sumcitysalary.jar./q4sumcity*.class mv *.jar../.. rm Q4SumCity*.class 第 24 页共 57 页
3.4.5 运行并查看结果 运行 Q4SumCitySalary 时需要输入部门数据路径 员工数据路径和输出路径三个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 部门数据路径 :hdfs://hadoop:9000/class6/input/dept, 部门数据将缓存在各运行任务的节点内容中, 可以提供处理的效率 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out4 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q4SumCitySalary.jar Q4SumCitySalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out4 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out4 目录 hadoop fs -ls /class6/out4 hadoop fs -cat /class6/out4/part-r-00000 第 25 页共 57 页
打开 part-r-00000 文件, 可以看到运行结果 : CHICAGO 9400 DALLAS 6775 NEW YORK 8750 3.5 测试例子 5: 列出工资比上司高的员工姓名及其工资 3.5.1 问题分析 求工资比上司高的员工姓名及工资, 需要得到上司工资及上司所有下属员工, 通过比较他们工资高低得到比上司工资高的员工 在 Mapper 阶段输出经理数据和员工对应经理表数据, 其中经理数据 key 为员工编号 value 为 "M, 该员工工资 ", 员工对应经理表数据 key 为经理编号 value 为 "E, 该员工姓名, 该员工工资 "; 然后在 Shuffle 阶段把传过来的经理数据和员工对应经理表数据进行归组, 如编号为 7698 员工,value 中标志 M 为自己工资,value 中标志 E 为其下属姓名及工资 ; 最后在 Reduce 中遍历比较员工与经理工资高低, 输出工资高于经理的员工 3.5.2 处理流程图 第 26 页共 57 页
3.5.3 编写代码 import java.io.ioexception; import java.util.hashmap; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q5EarnMoreThanManager extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 String[] kv = value.tostring().split(","); // 输出经理表数据, 其中 key 为员工编号和 value 为 M+ 该员工工资 context.write(new Text(kv[0].toString()), new Text("M," + kv[5])); // 输出员工对应经理表数据, 其中 key 为经理编号和 value 为 (E, 该员工姓名, 该员工工资 ) if (null!= kv[3] &&!"".equals(kv[3].tostring())) { context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5])); public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce(text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 定义员工姓名 工资和存放部门员工 Map 第 27 页共 57 页
String empname; long empsalary = 0; HashMap<String, Long> empmap = new HashMap<String, Long>(); // 定义经理工资变量 long mgrsalary = 0; for (Text val : values) { if (val.tostring().startswith("e")) { // 当是员工标示时, 获取该员工对应的姓名和工资并放入 Map 中 empname = val.tostring().split(",")[1]; empsalary = Long.parseLong(val.toString().split(",")[2]); empmap.put(empname, empsalary); else { // 当时经理标志时, 获取该经理工资 mgrsalary = Long.parseLong(val.toString().split(",")[1]); // 遍历该经理下属, 比较员工与经理工资高低, 输出工资高于经理的员工 for (java.util.map.entry<string, Long> entry : empmap.entryset()) { if (entry.getvalue() > mgrsalary) { context.write(new Text(entry.getKey()), new Text("" + entry.getvalue())); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q5EarnMoreThanManager"); job.setjobname("q5earnmorethanmanager"); // 设置 Mapper 和 Reduce 类 job.setjarbyclass(q5earnmorethanmanager.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // 设置输入格式类 job.setinputformatclass(textinputformat.class); // 设置输出格式类 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); 第 28 页共 57 页
// 第 1 个参数为员工数据路径和第 2 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args); System.exit(res); 3.5.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q5EarnMoreThanManager.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q5earnmorethanmanager.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q5EarnMoreThanManager.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q5EarnMoreThanManager.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q5earnmorethanmanager.jar./q5earnmore*.class mv *.jar../.. rm Q5EarnMore*.class 第 29 页共 57 页
3.5.5 运行并查看结果 运行 Q5EarnMoreThanManager 运行的员工数据路径和输出路径两个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out5 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q5EarnMoreThanManager.jar Q5EarnMoreThanManager hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out5 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out5 目录 hadoop fs -ls /class6/out5 hadoop fs -cat /class6/out5/part-r-00000 打开 part-r-00000 文件, 可以看到运行结果 : FORD 3000 第 30 页共 57 页
3.6 测试例子 6: 列出工资比公司平均工资要高的员工姓名及其工资 3.6.1 问题分析 求工资比公司平均工资要高的员工姓名及工资, 需要得到公司的平均工资和所有员工工资, 通过比较得出工资比平均工资高的员工姓名及工资 这个问题可以分两个作业进行解决, 先求出公司的平均工资, 然后与所有员工进行比较得到结果 ; 也可以在一个作业进行解决, 这里就得使用作业 setnumreducetasks 方法, 设置 Reduce 任务数为 1, 保证每次运行一个 reduce 任务, 从而能先求出平均工资, 然后进行比较得出结果 在 Mapper 阶段输出两份所有员工数据, 其中一份 key 为 0 value 为该员工工资, 另外一份 key 为 0 value 为 " 该员工姓名, 员工工资 "; 然后在 Shuffle 阶段把传过来数据按照 key 进行归组, 在该任务中有 key 值为 0 和 1 两组数据 ; 最后在 Reduce 中对 key 值 0 的所有员工求工资总数和员工数, 获得平均工资 ; 对 key 值 1, 比较员工与平均工资的大小, 输出比平均工资高的员工和对应的工资 3.6.2 处理流程图 第 31 页共 57 页
3.6.3 编写代码 import java.io.ioexception; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q6HigherThanAveSalary extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 String[] kv = value.tostring().split(","); // 获取所有员工数据, 其中 key 为 0 和 value 为该员工工资 context.write(new IntWritable(0), new Text(kv[5])); // 获取所有员工数据, 其中 key 为 0 和 value 为 ( 该员工姓名, 员工工资 ) context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5])); public static class Reduce extends Reducer<IntWritable, Text, Text, Text> { // 定义员工工资 员工数和平均工资 private long allsalary = 0; private int allempcount = 0; private long avesalary = 0; // 定义员工工资变量 private long empsalary = 0; 第 32 页共 57 页
public void reduce(intwritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text val : values) { if (0 == key.get()) { // 获取所有员工工资和员工数 allsalary += Long.parseLong(val.toString()); allempcount++; System.out.println("allEmpCount = " + allempcount); else if (1 == key.get()) { if (avesalary == 0) { avesalary = allsalary / allempcount; context.write(new Text("Average Salary = "), new Text("" + avesalary)); context.write(new Text("Following employees have salarys higher than Average:"), new Text("")); // 获取员工的平均工资 System.out.println("Employee salary = " + val.tostring()); avesalary = allsalary / allempcount; // 比较员工与平均工资的大小, 输出比平均工资高的员工和对应的工资 empsalary = Long.parseLong(val.toString().split(",")[1]); if (empsalary > avesalary) { context.write(new Text(val.toString().split(",")[0]), new Text("" + empsalary)); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q6HigherThanAveSalary"); job.setjobname("q6higherthanavesalary"); // 设置 Mapper 和 Reduce 类 job.setjarbyclass(q6higherthanavesalary.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // 必须设置 Reduce 任务数为 1 # -D mapred.reduce.tasks = 1 // 这是该作业设置的核心, 这样才能够保证各 reduce 是串行的 job.setnumreducetasks(1); 第 33 页共 57 页
// 设置输出格式类 job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(text.class); // 设置输出键和值类型 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(longwritable.class); // 第 1 个参数为员工数据路径和第 2 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args); System.exit(res); 3.6.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q5EarnMoreThanManager.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q6higherthanavesalary.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q6HigherThanAveSalary.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q6HigherThanAveSalary.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q6higherthanavesalary.jar./q6higherthan*.class mv *.jar../.. rm Q6HigherThan*.class 第 34 页共 57 页
3.6.5 运行并查看结果 运行 Q6HigherThanAveSalary 运行的员工数据路径和输出路径两个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out6 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q6HigherThanAveSalary.jar Q6HigherThanAveSalary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out6 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out6 目录 hadoop fs -ls /class6/out6 hadoop fs -cat /class6/out6/part-r-00000 打开 part-r-00000 文件, 可以看到运行结果 : Average Salary = 2077 Following employees have salarys higher than Average: 第 35 页共 57 页
FORD 3000 CLARK 2450 KING 5000 JONES 2975 BLAKE 2850 3.7 测试例子 7: 列出名字以 J 开头的员工姓名及其所属部门名称 3.7.1 问题分析 求名字以 J 开头的员工姓名机器所属部门名称, 只需判断员工姓名是否以 J 开头 首先和问题 1 类似在 Mapper 的 Setup 阶段缓存部门数据, 然后在 Mapper 阶段判断员工姓名是否以 J 开头, 如果是抽取出员工姓名和员工所在部门编号, 利用缓存部门数据把部门编号对应为部门名称, 转换后输出结果 3.7.2 处理流程图 第 36 页共 57 页
3.7.3 编写代码 import java.io.bufferedreader; import java.io.filereader; import java.io.ioexception; import java.util.hashmap; import java.util.map; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.filecache.distributedcache; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q7NameDeptOfStartJ extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { // 用于缓存 dept 文件中的数据 private Map<String, String> deptmap = new HashMap<String, String>(); private String[] kv; // 此方法会在 Map 方法执行之前执行且执行一次 @Override protected void setup(context context) throws IOException, InterruptedException { BufferedReader in = null; try { // 从当前作业中获取要缓存的文件 Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration()); String deptidname = null; for (Path path : paths) { // 对部门文件字段进行拆分并缓存到 deptmap 中 if (path.tostring().contains("dept")) { in = new BufferedReader(new FileReader(path.toString())); while (null!= (deptidname = in.readline())) { 第 37 页共 57 页
// 对部门文件字段进行拆分并缓存到 deptmap 中 // 其中 Map 中 key 为部门编号,value 为所在部门名称 deptmap.put(deptidname.split(",")[0], deptidname.split(",")[1]); catch (IOException e) { e.printstacktrace(); finally { try { if (in!= null) { in.close(); catch (IOException e) { e.printstacktrace(); public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 kv = value.tostring().split(","); // 输出员工姓名为 J 开头的员工信息,key 为员工姓名和 value 为员工所在部门名称 if (kv[1].tostring().trim().startswith("j")) { context.write(new Text(kv[1].trim()), new Text(deptMap.get(kv[7].trim()))); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q7NameDeptOfStartJ"); job.setjobname("q7namedeptofstartj"); // 设置 Mapper 和 Reduce 类 job.setjarbyclass(q7namedeptofstartj.class); job.setmapperclass(mapclass.class); // 设置输入格式类 job.setinputformatclass(textinputformat.class); 第 38 页共 57 页
// 设置输出格式类 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); // 第 1 个参数为缓存的部门数据路径 第 2 个参数为员工数据路径和第 3 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getconfiguration()); FileInputFormat.addInputPath(job, new Path(otherArgs[1])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args); System.exit(res); 3.7.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q7NameDeptOfStartJ.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q7namedeptofstartj.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q7NameDeptOfStartJ.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q7NameDeptOfStartJ.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q7namedeptofstartj.jar./q7namedept*.class mv *.jar../.. rm Q7NameDept*.class 第 39 页共 57 页
3.7.5 运行并查看结果 运行 Q7NameDeptOfStartJ 时需要输入部门数据路径 员工数据路径和输出路径三个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 部门数据路径 :hdfs://hadoop:9000/class6/input/dept, 部门数据将缓存在各运行任务的节点内容中, 可以提供处理的效率 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out7 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q7NameDeptOfStartJ.jar Q7NameDeptOfStartJ hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out7 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out7 目录 hadoop fs -ls /class6/out7 hadoop fs -cat /class6/out7/part-r-00000 打开 part-r-00000 文件, 可以看到运行结果 : JAMES SALES 第 40 页共 57 页
JONES RESEARCH 3.8 测试例子 8: 列出工资最高的头三名员工姓名及其工资 3.8.1 问题分析 求工资最高的头三名员工姓名及工资, 可以通过冒泡法得到 在 Mapper 阶段输出经理数据和员工对应经理表数据, 其中经理数据 key 为 0 值 value 为 " 员工姓名, 员工工资 "; 最后在 Reduce 中通过冒泡法遍历所有员工, 比较员工工资多少, 求出前三名 3.8.2 处理流程图 3.8.3 编写代码 import java.io.ioexception; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; 第 41 页共 57 页
import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q8SalaryTop3Salary extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 String[] kv = value.tostring().split(","); // 输出 key 为 0 和 value 为员工姓名 +","+ 员工工资 context.write(new IntWritable(0), new Text(kv[1].trim() + "," + kv[5].trim())); public static class Reduce extends Reducer<IntWritable, Text, Text, Text> { public void reduce(intwritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 定义工资前三员工姓名 String empname; String firstempname = ""; String secondempname = ""; String thirdempname = ""; // 定义工资前三工资 long empsalary = 0; long firstempsalary = 0; long secondempsalary = 0; long thirdempsalary = 0; // 通过冒泡法遍历所有员工, 比较员工工资多少, 求出前三名 for (Text val : values) { empname = val.tostring().split(",")[0]; empsalary = Long.parseLong(val.toString().split(",")[1]); 第 42 页共 57 页
if(empsalary > firstempsalary) { thirdempname = secondempname; thirdempsalary = secondempsalary; secondempname = firstempname; secondempsalary = firstempsalary; firstempname = empname; firstempsalary = empsalary; else if (empsalary > secondempsalary) { thirdempname = secondempname; thirdempsalary = secondempsalary; secondempname = empname; secondempsalary = empsalary; else if (empsalary > thirdempsalary) { thirdempname = empname; thirdempsalary = empsalary; // 输出工资前三名信息 context.write(new Text( "First employee name:" + firstempname), new Text("Salary:" + firstempsalary)); context.write(new Text( "Second employee name:" + secondempname), new Text("Salary:" + secondempsalary)); context.write(new Text( "Third employee name:" + thirdempname), new Text("Salary:" + thirdempsalary)); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q8SalaryTop3Salary"); job.setjobname("q8salarytop3salary"); // 设置 Mapper 和 Reduce 类 job.setjarbyclass(q8salarytop3salary.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(text.class); // 设置输入格式类 job.setinputformatclass(textinputformat.class); // 设置输出格式类 job.setoutputkeyclass(text.class); 第 43 页共 57 页
job.setoutputformatclass(textoutputformat.class); job.setoutputvalueclass(text.class); // 第 1 个参数为员工数据路径和第 2 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args); System.exit(res); 3.8.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q8SalaryTop3Salary.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q8salarytop3salary.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q8SalaryTop3Salary.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q8SalaryTop3Salary.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q8salarytop3salary.jar./q8salarytop3*.class mv *.jar../.. rm Q8SalaryTop3*.class 第 44 页共 57 页
3.8.5 运行并查看结果 运行 Q8SalaryTop3Salary 运行的员工数据路径和输出路径两个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out8 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q8SalaryTop3Salary.jar Q8SalaryTop3Salary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out8 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out8 目录 hadoop fs -ls /class6/out8 hadoop fs -cat /class6/out8/part-r-00000 打开 part-r-00000 文件, 可以看到运行结果 : First employee name:king Salary:5000 Second employee name:ford Salary:3000 第 45 页共 57 页
Third employee name:jones Salary:2975 3.9 测试例子 9: 将全体员工按照总收入 ( 工资 + 提成 ) 从高到低排列 3.9.1 问题分析 求全体员工总收入降序排列, 获得所有员工总收入并降序排列即可 在 Mapper 阶段输出所有员工总工资数据, 其中 key 为员工总工资 value 为员工姓名, 在 Mapper 阶段的最后会先调用 job.setpartitionerclass 对数据进行分区, 每个分区映射到一个 reducer, 每个分区内又调用 job.setsortcomparatorclass 设置的 key 比较函数类排序 由于在本作业中 Map 的 key 只有 0 值, 故能实现对所有数据进行排序 3.9.2 处理流程图 3.9.3 编写代码 import java.io.ioexception; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; 第 46 页共 57 页
import org.apache.hadoop.io.text; import org.apache.hadoop.io.writablecomparable; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q9EmpSalarySort extends Configured implements Tool { public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 String[] kv = value.tostring().split(","); // 输出 key 为员工所有工资和 value 为员工姓名 int empallsalary = "".equals(kv[6])? Integer.parseInt(kv[5]) : Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]); context.write(new IntWritable(empAllSalary), new Text(kv[1])); /** * 递减排序算法 */ public static class DecreaseComparator extends IntWritable.Comparator { public int compare(writablecomparable a, WritableComparable b) { return -super.compare(a, b); public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q9EmpSalarySort"); job.setjobname("q9empsalarysort"); 第 47 页共 57 页
// 设置 Mapper 和 Reduce 类 job.setjarbyclass(q9empsalarysort.class); job.setmapperclass(mapclass.class); // 设置输出格式类 job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(text.class); job.setsortcomparatorclass(decreasecomparator.class); // 第 1 个参数为员工数据路径和第 2 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args); System.exit(res); 3.9.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q9EmpSalarySort.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q9empsalarysort.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q9EmpSalarySort.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q9EmpSalarySort.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q9empsalarysort.jar./q9empsalary*.class mv *.jar../.. rm Q9EmpSalary*.class 第 48 页共 57 页
3.9.5 运行并查看结果 运行 Q9EmpSalarySort 运行的员工数据路径和输出路径两个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out9 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q9EmpSalarySort.jar Q9EmpSalarySort hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out9 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out9 目录 hadoop fs -ls /class6/out9 hadoop fs -cat /class6/out9/part-r-00000 打开 part-r-00000 文件, 可以看到运行结果 : 5000 KING 3000 FORD 2975 JONES 第 49 页共 57 页
2850 BLAKE... 3.10 测试例子 10: 求任何两名员工信息传递所需要经过的中间节点数 3.10.1 问题分析 该公司所有员工可以形成入下图的树形结构, 求两个员工的沟通的中间节点数, 可转换在员工树中求两个节点连通所经过的节点数, 即从其中一节点到汇合节点经过节点数加上另一节点到汇合节点经过节点数 例如求 M 到 Q 所需节点数, 可以先找出 M 到 A 经过的节点数, 然后找出 Q 到 A 经过的节点数, 两者相加得到 M 到 Q 所需节点数 在作业中首先在 Mapper 阶段所有员工数据, 其中经理数据 key 为 0 值 value 为 " 员工编号, 员工经理编号 ", 然后在 Reduce 阶段把所有员工放到员工列表和员工对应经理链表 Map 中, 最后在 Reduce 的 Cleanup 中按照上面说所算法对任意两个员工计算出沟通的路径长度并输出 第 50 页共 57 页
3.10.2 处理流程图 3.10.3 编写代码 import java.io.ioexception; import java.util.arraylist; import java.util.hashmap; import java.util.list; import java.util.map; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.genericoptionsparser; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; public class Q10MiddlePersonsCountForComm extends Configured implements Tool { 第 51 页共 57 页
public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { // 对员工文件字段进行拆分 String[] kv = value.tostring().split(","); // 输出 key 为 0 和 value 为员工编号 +","+ 员工经理编号 context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3])? " " : kv[3]))); public static class Reduce extends Reducer<IntWritable, Text, NullWritable, Text> { // 定义员工列表和员工对应经理 Map List<String> employeelist = new ArrayList<String>(); Map<String, String> employeetomanagermap = new HashMap<String, String>(); public void reduce(intwritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 在 reduce 阶段把所有员工放到员工列表和员工对应经理 Map 中 for (Text value : values) { employeelist.add(value.tostring().split(",")[0].trim()); employeetomanagermap.put(value.tostring().split(",")[0].trim(), value.tostring().split(",")[1].trim()); @Override protected void cleanup(context context) throws IOException, InterruptedException { int totalemployee = employeelist.size(); int i, j; int distance; System.out.println(employeeList); System.out.println(employeeToManagerMap); // 对任意两个员工计算出沟通的路径长度并输出 for (i = 0; i < (totalemployee - 1); i++) { for (j = (i + 1); j < totalemployee; j++) { distance = calculatedistance(i, j); String value = employeelist.get(i) + " and " + employeelist.get(j) + " = " + distance; context.write(nullwritable.get(), new Text(value)); 第 52 页共 57 页
/** * 该公司可以由所有员工形成树形结构, 求两个员工的沟通的中间节点数, 可以转换在员工树中两员工之间的距离 * 由于在树中任意两点都会在某上级节点汇合, 根据该情况设计了如下算法 */ private int calculatedistance(int i, int j) { String employeea = employeelist.get(i); String employeeb = employeelist.get(j); int distance = 0; // 如果 A 是 B 的经理, 反之亦然 if (employeetomanagermap.get(employeea).equals(employeeb) employeetomanagermap.get(employeeb).equals(employeea)) { distance = 0; // A 和 B 在同一经理下 else if (employeetomanagermap.get(employeea).equals( employeetomanagermap.get(employeeb))) { distance = 0; else { // 定义 A 和 B 对应经理链表 List<String> employeea_managerlist = new ArrayList<String>(); List<String> employeeb_managerlist = new ArrayList<String>(); // 获取从 A 开始经理链表 employeea_managerlist.add(employeea); String current = employeea; while (false == employeetomanagermap.get(current).isempty()) { current = employeetomanagermap.get(current); employeea_managerlist.add(current); // 获取从 B 开始经理链表 employeeb_managerlist.add(employeeb); current = employeeb; while (false == employeetomanagermap.get(current).isempty()) { current = employeetomanagermap.get(current); employeeb_managerlist.add(current); int ii = 0, jj = 0; String currenta_manager, currentb_manager; boolean found = false; // 遍历 A 与 B 开始经理链表, 找出汇合点计算 for (ii = 0; ii < employeea_managerlist.size(); ii++) { 第 53 页共 57 页
currenta_manager = employeea_managerlist.get(ii); for (jj = 0; jj < employeeb_managerlist.size(); jj++) { currentb_manager = employeeb_managerlist.get(jj); if (currenta_manager.equals(currentb_manager)) { found = true; break; if (found) { break; // 最后获取两只之前的路径 distance = ii + jj - 1; return distance; @Override public int run(string[] args) throws Exception { // 实例化作业对象, 设置作业名称 Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm"); job.setjobname("q10middlepersonscountforcomm"); // 设置 Mapper 和 Reduce 类 job.setjarbyclass(q10middlepersonscountforcomm.class); job.setmapperclass(mapclass.class); job.setreducerclass(reduce.class); // 设置 Mapper 输出格式类 job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(text.class); // 设置 Reduce 输出键和值类型 job.setoutputformatclass(textoutputformat.class); job.setoutputkeyclass(nullwritable.class); job.setoutputvalueclass(text.class); // 第 1 个参数为员工数据路径和第 2 个参数为输出路径 String[] otherargs = new GenericOptionsParser(job.getConfiguration(), args).getremainingargs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 第 54 页共 57 页
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitforcompletion(true); return job.issuccessful()? 0 : 1; /** * 主方法, 执行入口 * @param args 输入参数 */ public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args); System.exit(res); 3.10.4 编译并打包代码 进入 /app/hadoop-1.1.2/myclass/class6 目录中新建 Q10MiddlePersonsCountForComm.java 程序代码 ( 代码页可以使用 /home/shiyanlou/install-pack/class6/q10middlepersonscountforcomm.java 文件 ) cd /app/hadoop-1.1.2/myclass/class6 vi Q10MiddlePersonsCountForComm.java 编译代码 javac -classpath../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q10MiddlePersonsCountForComm.java 把编译好的代码打成 jar 包, 如果不打成 jar 形式运行会提示 class 无法找到的错误 jar cvf./q10middlepersonscountforcomm.jar./q10middlepersons*.class mv *.jar../.. rm Q10MiddlePersons*.class 第 55 页共 57 页
3.10.5 运行并查看结果 运行 Q10MiddlePersonsCountForComm 运行的员工数据路径和输出路径两个参数, 需要注意的是 hdfs 的路径参数路径需要全路径, 否则运行会报错 : 员工数据路径 :hdfs://hadoop:9000/class6/input/emp 输出路径 :hdfs://hadoop:9000/class6/out10 运行如下命令 : cd /app/hadoop-1.1.2 hadoop jar Q10MiddlePersonsCountForComm.jar Q10MiddlePersonsCountForComm hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out10 运行成功后, 刷新 CentOS HDFS 中的输出路径 /class6/out10 目录 hadoop fs -ls /class6/out10 hadoop fs -cat /class6/out10/part-r-00000 第 56 页共 57 页
打开 part-r-00000 文件, 可以看到运行结果 : 7369 and 7499 = 4 7369 and 7521 = 4 7369 and 7566 = 1 7369 and 7654 = 4 7369 and 7698 = 3... 第 57 页共 57 页