前 言

Similar documents
三种方法实现Hadoop(MapReduce)全局排序(1)

使用MapReduce读取XML文件

三种方法实现Hadoop(MapReduce)全局排序(2)

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

目录 1 本期内容 MapReduce 理论简介 MapReduce 编程模型 MapReduce 处理过程 运行 WordCount 程序 准备工作 运行例子

MapReduce

7521,WARD,SALESMAN,7698,22-2 月 -81,1250,500, ,JONES,MANAGER,7839,02-4 月 -81,2975,, ,MARTIN,SALESMAN,7698,28-9 月 -81,1250,1400, ,BLAK

Guava学习之Resources

SDK 概要 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk" 获取不同版本的 Java SDK: 包名 odps-sdk-core odps-sdk-commons odps-sdk-udf odps-sdk-mapred odps-sdk-graph 描述 ODPS 基

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

OOP with Java 通知 Project 4: 4 月 19 日晚 9 点

javaexample-02.pdf

Microsoft Word - 01.DOC

Microsoft PowerPoint - hbase_program(0201).ppt

JavaIO.PDF

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

Hadoop_Jordan

chp6.ppt

Hadoop 集群 ( 第 9 期 ) MapReduce 初级案例 1 数据去重 数据去重 主要是为了掌握和利用并行化思想来对数据进行有意义的筛选 统计大 数据集上的数据种类个数 从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据 去重 下面就进入这个实例的 MapReduce 程序设计 1.

untitled

C++ 程序设计 告别 OJ1 - 参考答案 MASTER 2019 年 5 月 3 日 1

OOP with Java 通知 Project 4: 5 月 2 日晚 9 点

通过Hive将数据写入到ElasticSearch

培 训 机 构 介 绍 中 科 普 开 是 国 内 首 家 致 力 于 IT 新 技 术 领 域 的 领 航 者, 专 注 于 云 计 算 大 数 据 物 联 网 移 动 互 联 网 技 术 的 培 训, 也 是 国 内 第 一 家 开 展 Hadoop 云 计 算 的 培

Java

EJB-Programming-4-cn.doc

OOP with Java 通知 Project 3: 3 月 29 日晚 9 点 4 月 1 日上课

Hive:用Java代码通过JDBC连接Hiveserver

PowerPoint 演示文稿

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

chap07.key

untitled

全国计算机技术与软件专业技术资格(水平)考试

帝国CMS下在PHP文件中调用数据库类执行SQL语句实例

C/C++ - 文件IO

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

"+handlermethod.getbean().getclass().getname()); public void aftercompletion(httpservletrequest req, HttpServletResponse resp, Object handler, Excepti


3.1 num = 3 ch = 'C' 2

Microsoft Word - Broker.doc

前言 C# C# C# C C# C# C# C# C# microservices C# More Effective C# More Effective C# C# C# C# Effective C# 50 C# C# 7 Effective vii

《大话设计模式》第一章

Microsoft Word - ch04三校.doc

软件工程文档编制

Chapter 9: Objects and Classes

无类继承.key

水晶分析师

操作指南产品文档

1 Framework.NET Framework Microsoft Windows.NET Framework.NET Framework NOTE.NET NET Framework.NET Framework 2.0 ( 3 ).NET Framework 2.0.NET F

RUN_PC連載_12_.doc

Flume-ng与Mysql整合开发

Fun Time (1) What happens in memory? 1 i n t i ; 2 s h o r t j ; 3 double k ; 4 char c = a ; 5 i = 3; j = 2; 6 k = i j ; H.-T. Lin (NTU CSIE) Referenc

IoC容器和Dependency Injection模式.doc

PowerPoint 演示文稿

内 容 简 介 本 书 是 一 本 关 于 语 言 程 序 设 计 的 教 材, 涵 盖 了 语 言 的 基 本 语 法 和 编 程 技 术, 其 中 包 含 了 作 者 对 语 言 多 年 开 发 经 验 的 总 结, 目 的 是 让 初 学 的 读 者 感 受 到 语 言 的 魅 力, 并 掌

java2d-4.PDF

Outline USB Application Requirements Variable Definition Communications Code for VB Code for Keil C Practice

开放数据处理服务 ODPS 批量数据通道

CC213

untitled

在Spring中使用Kafka:Producer篇

主程式 : public class Main3Activity extends AppCompatActivity { ListView listview; // 先整理資料來源,listitem.xml 需要傳入三種資料 : 圖片 狗狗名字 狗狗生日 // 狗狗圖片 int[] pic =new

1 1 大概思路 创建 WebAPI 创建 CrossMainController 并编写 Nuget 安装 microsoft.aspnet.webapi.cors 跨域设置路由 编写 Jquery EasyUI 界面 运行效果 2 创建 WebAPI 创建 WebAPI, 新建 -> 项目 ->

Learning Java

9, : Java 19., [4 ]. 3 Apla2Java Apla PAR,Apla2Java Apla Java.,Apla,,, 1. 1 Apla Apla A[J ] Get elem (set A) A J A B Intersection(set A,set B) A B A B

概述

res/layout 目录下的 main.xml 源码 : <?xml version="1.0" encoding="utf 8"?> <TabHost android:layout_height="fill_parent" xml

Transcription:

分布式计算框架 MapReduce 本章重点 : MapReduce 编程模型 使用 MapReduce 开发常用的功能 本章目标 : 了解 MapReduce 是什么掌握 MapReduce 编程模型掌握 MapReduce 中常见核心 API 的编程掌握使用 MapReduce 开发常用的功能

Hadoop&Spark 大数据开发实战 本章任务 学习本章, 需要完成以下 个工作任务 请记录下来学习过程中所遇到的问题, 可以通过自己 的努力或访问 kgc.cn 解决 任务 1:MapReduce 编程模型 理解并掌握 MapReduce 的编程模型, 进一步加深对大数据并行计算模型的理解与思考 任务 2:MapReduce 进阶 通过学习 MapReduce 各个组件的概念和原理加深对 MapReduce 底层原理和计算模型的掌握 任务 :MapReduce 高级编程 掌握 MapReduce 开发常用的应用, 例如 Join 排序 二次排序 合并小文件等 任务 1 MapReduce 编程模型 关键步骤如下 : MapReduce 是什么, 适合做什么, 不适合做什么 MapReduce 中 map 和 reduce 方法的功能 开发 MapReduce 版本的 wordcount 程序并提交到集群运行 1.1 MapReduce 概述 1.1.1 MapReduce 是什么 MapReduce 是 Google 开源的一项重要技术, 首先它是一个编程模型, 用以进行大数据量的计算 对于大数据量的计算, 通常采用的处理方式就是并行计算 但对许多开发者来说, 自己完完全全实现一个并行计算程序难度太大, 而 MapReduce 就是一种简化并行计算的编程模型, 它使得那些没有多少并行计算经验的开发人员也可以开发并行应用程序 这也就是 MapReduce 的价值所在, 通过简化编程模型, 降低了开发并行应用程序的入门门槛 1.1.2 MapReduce 设计目标 2

分布式计算框架 MapReduce MapReduce 的设计目标是方便编程人员在不熟悉分布式并行编程的情况下, 将自己的程序运行 在分布式系统上 MapReduce 采用的是 分而治之 的思想, 把对大规模数据集的操作, 分发给一 个主节点管理下的各个子节点共同完成, 然后整合各个子节点的中间结果, 得到最终的计算结果 简而言之,MapReduce 就是 分散任务, 汇总结果 1.1. MapReduce 特点 1) MapReduce 易于编程 它简单的实现一些接口, 就可以完成一个分布式程序, 这个分布式 程序可以分布到大量廉价的 PC 机器运行 也就是说你写一个分布式程序, 跟写一个简单 的串行程序是一模一样的 就是因为这个特点使得 MapReduce 编程变得非常流行 2) 良好的扩展性 当你的计算资源不能得到满足的时候, 你可以通过简单的增加机器来扩展 它的计算能力 ) 高容错性 MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上, 这就要求它 具有很高的容错性 比如其中一台机器挂了, 它可以把上面的计算任务转移到另外一个节 点上面上运行, 不至于这个任务运行失败, 而且这个过程不需要人工参与, 而完全是由 Hadoop 内部完成的 4) 能对 PB 级以上海量数据的进行离线处理 适合离线处理而不适合实时处理, 比如像毫秒级 别的返回一个结果,MapReduce 很难做到 1.1.4 MapReduce 不擅长的场景 易于编程 MapReduce 虽然具有很多的优势, 但是它也有不擅长的地方 这里的不擅长不代表它 不能做, 而是在有些场景下实现的效果差, 并不适合 MapReduce 来处理, 主要表现在以下几个方 面 1) 实时计算 :MapReduce 无法像 MySQL 一样, 在毫秒或者秒级内返回结果 2) 流式计算 : 流式计算的输入数据时动态的, 而 MapReduce 的输入数据集是静态的, 不能动 态变化 这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的 ) DAG( 有向图 ) 计算 : 多个应用程序存在依赖关系, 后一个应用程序的输入为前一个的输 出 在这种情况下,MapReduce 并不是不能做, 而是使用后, 每个 MapReduce 作业的输出 结果都会写入到磁盘, 会造成大量的磁盘 IO, 导致性能非常的低下 第章 1.2 MapReduce 编程模型 1.2.1 编程模型概述从 MapReduce 自身的命名特点可以看出,MapReduce 由两个阶段组成 :Map 和 Reduce 用户只需编写 map() 和 reduce() 两个函数, 即可完成简单的分布式程序的设计 map() 函数以 key/value 对作为输入, 产生另外一系列 key/value 对作为中间输出写入本地磁盘 MapReduce 框架会自动将这些中间数据按照 key 值进行聚集, 且 key 值相同 ( 用户可设定聚集策略, 默认情况下是对 key 值进行哈希取模 ) 的数据被统一交给 reduce() 函数处理

Hadoop&Spark 大数据开发实战 reduce() 函数以 key 及对应的 value 列表作为输入, 经合并 key 相同的 value 值后, 产生另外一系 列 key/value 对作为最终输出写入 HDFS MapReduce 将作业 ( 一个 MapReduce 应用程序 ) 的整个运行过程分为两个阶段 :Map 阶段和 Reduce 阶段 图 1.1 MapReduce 编程模型 1) Map 阶段由一定数量的 Map Task 组成 ; 输入数据格式解析 :InputFormat( 把输入文件进行分片 ) 输入数据处理 :Mapper 数据分组 :Partitioner 2) Reduce 阶段由一定数量的 Reduce Task 组成 ; 数据远程拷贝 ( 从 Map Task 的输出拷贝部分数据 ) 数据按照 key 排序和分组 (key 相同的都挨在一起, 按照 key 进行分组操作, 每一组交由 reducer 进行处理 ) 处理处理 :Reducer 数据输出格式 :OutputFormat ( 输出文件格式, 分隔符等的设置 ) 1.2.2 编程模型三步曲 1)Input: 一系列 k1/v1 对 2)Map 和 Reduce: Map:(k1,v1) --> list(k2,v2),reduce:(k2, list(v2)) --> list(k,v) 其中 :k2/v2 是中间结果对 )Output: 一系列 (k,v) 对 4

1. MapReduce WordCount 编程实例 分布式计算框架 MapReduce 第章 通过上一节的学习, 我们已经理解了 MapReduce 的基本编程模型, 为了加深对 MapReduce 的理解, 本节将以一个 WordCount 程序来详细解释 MapReduce 模型 一个最简单的 MapReduce 应用程序至少包含 个部分 : 一个 Map 函数 一个 Reduce 函数和一个 main 函数 在运行一个 MapReduce 计算任务时候, 任务过程被分为两个阶段 :map 阶段和 reduce 阶段, 每个阶段都是用键值对 (key/value) 作为输入 (input) 和输出 (output),main 函数将作业控制和文件输入 / 输出结合起来 我们一起来看看 WordCount 程序的需求 : 现在有大量的文件, 每个文件又有大量的单词, 要求 统计每个单词出现的词频 1..1 WordCount 实现设计分析 1) Map 过程 : 并行读取文本, 对读取的单词进行 map 操作, 每个词都以 <key,value> 形式生成 读取第一行 Hello World Bye World, 分割单词形成 Map <Hello,1> <World,1> <Bye,1> <World,1> 读取第二行 Hello Hadoop Bye Hadoop, 分割单词形成 Map <Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1> 读取第三行 Bye Hadoop Hello Hadoop, 分割单词形成 Map <Bye,1> <Hadoop,1> <Hello,1> <Hadoop,1> 2) Reduce 操作是对 map 的结果进行排序, 合并, 最后得出词频 reduce 将形成的 Map 根据相同的 key 组合成 value 数组 <Bye,1,1,1> <Hadoop,1,1,1,1> <Hello,1,1,1> <World,1,1> 循环执行 Reduce(K,V[]), 分别统计每个单词出现的次数,<Bye,> <Hadoop,4> <Hello,> <World,2> 1..2 WordCount 代码开发在上面我们已经详细描述了 MapReduce 的编程模型, 接下来, 请各位跟我一起使用 Maven 来完成相关程序的开发, 至于开发工具, 大家可以根据自己的爱好选择,Eclipse 或者 IDEA 均可 为了方便大家理解, 重要程序处标注了相关解释! 1) 新建工程, 添加 Maven 依赖 <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-common</artifactid> <version>${hadoop.version</version> 5

Hadoop&Spark 大数据开发实战 </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-hdfs</artifactid> <version>${hadoop.version</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-mapreduce-client-core</artifactid> <version>${hadoop.version</version> </dependency> 2) 完整的 WordCount 程序代码 package com.kgc.bigdata.hadoop.mapreduce.wordcount; import com.kgc.bigdata.hadoop.mapreduce.partitioner.partitionerapp; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import java.io.ioexception; import java.net.uri; import java.util.stringtokenizer; /** 6

分布式计算框架 MapReduce 第章 * WordCount 的 MapReduce 实现 */ public class WordCountApp { public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasmoretokens()) { word.set(itr.nexttoken()); context.write(word, one); public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); result.set(sum); context.write(key, result); public static void main(string[] args) throws Exception { String INPUT_PATH = "hdfs://hadoop000:8020/wc"; String OUTPUT_PATH = "hdfs://hadoop000:8020/outputwc"; Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf); 7

Hadoop&Spark 大数据开发实战 if (filesystem.exists(new Path(OUTPUT_PATH))) { filesystem.delete(new Path(OUTPUT_PATH), true); Job job = Job.getInstance(conf, "WordCountApp"); // run jar class job.setjarbyclass(wordcountapp.class); // 设置 map job.setmapperclass(mymapper.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(intwritable.class); // 设置 reduce job.setreducerclass(myreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); // 设置 input formart job.setinputformatclass(textinputformat.class); Path inputpath = new Path(INPUT_PATH); FileInputFormat.addInputPath(job, inputpath); // 设置 output format job.setoutputformatclass(textoutputformat.class); Path outputpath = new Path(OUTPUT_PATH); FileOutputFormat.setOutputPath(job, outputpath); // 提交 job System.exit(job.waitForCompletion(true)? 0 : 1); 代码 1.1 WordCount 代码实现 1.. WordCount 代码说明 1) 对于 map 函数的方法 public void map(object key, Text value, Context context) 8

分布式计算框架 MapReduce 继承 Mapper 类, 实现 map 方法, 这里有三个参数, 前面两个 Object key, Text value 就是输入的 key 和 value, 第三个参数 Context context 记录的是整个上下文, 比如我们可以通过 context 将数据写 出去 第章 2) 对于 reduce 函数的方法 public void reduce(text key, Iterable<IntWritable> values, Context context) 继承 Reducer 类, 实现 reduce 方法,reduce 函数的输入也是一个 key/value 的形式, 不过它的 value 是一个迭代器的形式 Iterable<IntWritable> values, 也就是说 reduce 的输入是一个 key 对应一组的值的 value,reduce 也有 context 和 map 的 context 作用一致 ) 对于 main 函数的调用 创建 Configuration 类 :Configuration conf = new Configuration(); 运行 MapReduce 程序前都要初始化 Configuration, 该类主要是读取 MapReduce 系统配置信息 创建 Job 类 : Job job = Job.getInstance(conf, "word count"); job.setjarbyclass(wordcount.class); job.setmapperclass(tokenizermapper.class); job.setreducerclass(intsumreducer.class); 第一行就是在构建一个 job, 有两个参数, 一个是 conf, 另外一个是这个 job 的名称 第二行就是设置我们自己开发的 MapReduce 类 ; 第三行和第四行就是设置 map 函数和 reduce 函数实现类 设置输出的键值对的类型 : job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); 这个是定义输出的 key/value 的类型, 也就是最终存储在 hdfs 上结果文件的 key/value 的类型 设置 Job 的输入输出路径并提交到集群运行 : FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)? 0 : 1); 第一行就是构建输入的数据文件, 第二行是构建输出的数据文件, 最后一行如果 job 运行成功了, 我们的程序就会正常退出 1..4 WordCount 提交到集群运行在这里我们第一次接触 MapReduce, 因此这里以伪分布式集群的方式运行 WordCount 程序! 感兴趣的朋友也可以在完全分布式集群中运行, 具体过程笔者在此不再讨论 ( 其实是一模一样的 ) 操作步骤如下所示 : 9

Hadoop&Spark 大数据开发实战 1) 使用 mvn clean package -DskipTests 打成 hadoop-1.0-snapshot.jar, 然后上传到 /home/hadoop/lib 目录下面 ; 2) 将测试数据上传到 HDFS 目录中 hadoop fs -mkdir /wc hadoop fs -put hello.txt /wc ) 提交 MapReduce 作业到集群运行 hadoop jar /home/hadoop/lib/hadoop-1.0-snapshot.jar com.kgc.bigdata.hadoop.mapreduce.wordcount.wordcountapp 4) 查看作业输出结果 hadoop fs -text /outputwc/part-* hello welcome 1 world 2 至此, 在学习了以上相关知识后, 任务 1 就可以完成了 任务 2 MapReduce 进阶 关键步骤如下 : MapReduce 类型 MapReduce 输入格式 MapReduce 输出格式 MapReduce 中的 Combiner Partitioner RecordReader 的使用 2.1 MapReduce 类型 2.1.1 MapReduce 类型概述 使用 Hadoop 中的 MapReduce 编程模型处理过程非常简单, 只需要定义好 map 和 reduce 函数的 输入和输出是键值对的类型即可, 那么我们来看看各种数据类型是如何在 MapReduce 中使用的 MapReduce 中的 map 和 reduce 函数需要遵循如下的格式 : map: (K1,V1) -> list(k2,v2) 10

reduce: (K2, list(v2)) -> list(k,v) 分布式计算框架 MapReduce 从这个需要遵循的格式我们可以看出 :reduce 函数的输入类型必须与 map 函数的输出类型一致 2.1.2 MapReduce 中常用的设置 1) 输入数据类型由输入格式 (InputFormat) 设置 比如 :TextInputFormat 的 Key 的类型就是 LongWritable,Value 的类型是 Text; 2)map 的输出的 Key 的类型通过 setmapoutputkeyclass 设置,Value 的类型通过 setmapoutputvalueclass 设置 ; ) reduce 的输出的 Key 的类型通过 setoutputkeyclass 设置,Value 的类型通过 setoutputvalueclass 设置 ; 第章 2.2 MapReduce 输入格式 MapReduce 处理的数据文件, 一般情况下输入文件一般是存储在 HDFS 里面 这些文件的格式可以是任意的 : 我们可以使用基于行的日志文件, 也可以使用二进制格式, 多行输入记录或者其它一些格式 这些文件一般会很大, 达到数十 GB, 甚至更大 那么 MapReduce 是如何读取这些数据的呢 2.2.1 InputFormat 接口 InputFormat 接口决定了输入文件如何被 Hadoop 分块 InputFormat 能够从一个 job 中得到一个 split 集合 (InputSplit[]), 然后再为这个 split 集合配上一个合适的 RecordReader(getRecordReader) 来读取每个 split 中的数据 下面我们来看一下 InputFormat 接口由哪些抽象方法组成 public interface InputFormat<K, V> { InputSplit[] getsplits(jobconf job, int numsplits) throws IOException; RecordReader<K, V> getrecordreader(inputsplit split, JobConf job, Reporter reporter) throws IOException; 方法作用说明 : 1)getSplits(JobContext context) 方法负责将一个大数据逻辑分成许多片 比如数据库表有 100 条数据, 按照主键 ID 升序存储 假设每 20 条分成一片, 这个 List 的大小就是 5, 然后每个 InputSplit 记录两个参数, 第一个为这个分片的起始 ID, 第二个为这个分片数据的大小, 这里是 20. 很明显 InputSplit 并没有真正存储数据 只是提供了一个如何将数据分片的方法 2)createRecordReader(InputSplit split,taskattemptcontext context) 方法根据 InputSplit 定义的方 11

Hadoop&Spark 大数据开发实战 法, 返回一个能够读取分片记录的 RecordReader getsplit 用来获取由输入文件计算出来的 InputSplit, 后面会看到计算 InputSplit 时, 会考虑输入文件是否可分割 文件存储时分块的大小和文件大小等因素 ; 而 createrecordreader() 提供了前面说的 RecordReader 的实现, 将 Key-Value 对从 InputSplit 中正确读出来, 比如 LineRecordReader, 它是以偏移值为 Key, 每行的数据为 Value, 这使所有 createrecordreader() 返回 LineRecordReader 的 InputFormat 都是以偏移值为 Key, 每行数据为 Value 的形式读取输入分片的 2.1.2 InputFormat 接口实现类 InputFormat 接口实现类有很多, 其层次结构如下图所示 图 1.2 InputFormat 实现类 常用的 InputFormat 实现类介绍 : 1) FileInputFormat FileInputFormat 是所有使用文件作为其数据源的 InputFormat 实现的基类, 它的主要作用是指出作业的输入文件位置 因为作业的输入被设定为一组路径, 这对指定作业输入提供了很强的灵活性 FileInputFormat 提供了四种静态方法来设定 Job 的输入路径 : 12

分布式计算框架 MapReduce 第章 public static void addinputpath(job job,path path); public static void addinputpaths(job job,string commaseparatedpaths); public static void setinputpaths(job job,path... inputpaths); public static void setinputpaths(job job,string commaseparatedpaths); 2) KeyValueTextInputFormat 每一行均为一条记录, 被分隔符 ( 缺省是 tab) 分割为 key(text),value(text) 可以通过 mapreduce.input.keyvaluelinerecordreader.key.value,separator 属性 ( 或者旧版本 API 中的 key.value.separator.in.input.line) 来设定分隔符 2. MapReduce 输出格式 针对前面介绍的输入格式,Hadoop 都有相应的输出格式 默认情况下只有一个 Reduce, 输出只有一个文件, 默认文件名为 part-r-00000, 输出文件的个数与 Reduce 的个数一致 如果有两个 Reduce, 输出结果就有两个文件, 第一个为 part-r-00000, 第二个为 part-r-00001, 依次类推 2..1 OutputFormat 接口 OutputFormat 主要用于描述输出数据的格式, 它能够将用户提供的 key/value 对写入特定格式的文件中 通过 OutputFormat 接口, 实现具体的输出格式, 过程有些复杂也没有这个必要 Hadoop 自带了很多 OutputFormat 的实现, 它们与 InputFormat 实现相对应, 足够满足我们业务的需要 2..2 OutputFormat 接口实现类 OutputFormat 接口实现类有很多, 其层次结构如下图所示 1

Hadoop&Spark 大数据开发实战 图 1. OutputFormat 实现类 OutputFormat 是 MapReduce 输出的基类, 所有实现 MapReduce 输出都实现了 OutputFormat 接口 常用的 OutputFormat 实现类介绍 : 1) 文本输出默认的输出格式是 TextOutputFormat, 它把每条记录写为文本行 它的键和值可以是实现了 Writable 的任意类型, 因为 TextOutputFormat 调用 tostring() 方法把它们转换为字符串 每个键 / 值对由制表符进行分割, 当然也可以设定 mapreduce.output.textoutputformat.separator 属性 ( 旧版本 API 中的 mapred.textoutputformat.separator) 改变默认的分隔符 与 FileOutputFormat 对应的输入格式是 KeyValueTextInputFormat, 它通过可配置的分隔符将键 / 值对文本分割 可以使用 NullWritable 来省略输出的键或值 ( 或两者都省略, 相当于 NullOutputFormat 输出格式, 后者什么也不输出 ) 这也会导致无分隔符输出, 以使输出适合用 TextInputFormat 读取 2) 二进制输出 SequenceFileOutputFormat 将它的输出写为一个顺序文件 如果输出需要作为后续 MapReduce 任务的输入, 这便是一种好的输出格式, 因为它的格式紧凑, 很容易被压缩 2.4 Combiner 2.4.1 Combiner 概述 14

分布式计算框架 MapReduce 通过上面章节的学习我们可知,Hadoop 框架使用 Mapper 将数据处理成一个 <key,value> 键值对, 然后在网络节点间对其进行整理 (shuffle), 然后使用 Reducer 处理数据并进行最终输出 试想如果 存在这样一个实际场景 : 如果有 10 亿个数据,Mapper 会生成 10 亿个键值对在网络间进行传输, 但如果我们只是对数据 求最大值, 那么很明显的 Mapper 只需要输出它所知道的最大值即可 这样做不仅可以减轻网络压力, 同样也可以大幅度提高程序效率 在 MapReducer 框架中中,Combiner 就是为了避免 map 任务和 reduce 任务之间的数据传输而设 置的,Hadoop 允许用户针对 map task 的输出指定一个合并函数 即为了减少传输到 Reduce 中的数 据量 它主要是为了削减 Mapper 的输出数量, 从而减少网络带宽和 Reducer 之上的负载 我们可以把 Combiner 操作看成是一个在每个单独的节点上先做一次 Reducer 操作, 其输出和输 出的参数是和 Reduce 一样的 以 WordCount 为例,Combiner 的执行过程如下图所示 : 第章 图 1.4 Combiner 执行流程 注意事项 : 对于求和 求最值的方式我们是可以使用 Combiner 的, 但是求平均数是不能使用 Combiner 的 2.4.2 Combiner 在 WordCount 中的使用我们可以在 Map 输出之后添加一步 Combiner 的操作, 先进行一次聚合, 再由 Reduce 来处理, 15

Hadoop&Spark 大数据开发实战 进而使得传输数据减少, 提高执行效率 package com.kgc.bigdata.hadoop.mapreduce.combiner; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import java.io.ioexception; import java.util.stringtokenizer; /** * WordCount 中使用 Combiner */ public class WordCountCombinerApp { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(object key, Text value, Mapper.Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasmoretokens()) { word.set(itr.nexttoken()); context.write(word, one); public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { 16

分布式计算框架 MapReduce 第章 private IntWritable result = new IntWritable(); public void reduce(text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); result.set(sum); context.write(key, result); public static void main(string[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setjarbyclass(wordcountcombinerapp.class); job.setmapperclass(tokenizermapper.class); // 通过 job 设置 Combiner 处理类, 其实逻辑就可以直接使用 Reducer job.setcombinerclass(intsumreducer.class); job.setreducerclass(intsumreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)? 0 : 1); 代码 1.2 Combiner 操作 2.5 Partitioner 17

Hadoop&Spark 大数据开发实战 2.5.1 Partitioner 概述在进行 MapReduce 计算时, 有时候需要把最终的输出数据分到不同的文件中, 比如按照省份划分的话, 需要把同一省份的数据放到一个文件中 ; 按照性别划分的话, 需要把同一性别的数据放到一个文件中 我们知道最终的输出数据是来自于 Reducer 任务 那么, 如果要得到多个文件, 意味着有同样数量的 Reducer 任务在运行 Reducer 任务的数据来自于 Mapper 任务, 也就说 Mapper 任务要划分数据, 对于不同的数据分配给不同的 Reducer 任务运行 Mapper 任务划分数据的过程就称作 Partition 负责实现划分数据的类称作 Partitioner MapReduce 默认的 partitioner 是 HashPartitioner 默认情况下,partitioner 先计算 key 的散列值 ( 通常为 md5 值 ) 然后通过 reducer 个数执行取模运算 :key.hashcode%(reducer 个数 ) 这种方式不仅能够随机地将整个 key 空间平均分发给每个 reducer, 同时也能确保不同 mapper 产生的相同 key 能被分发到同一个 reducer 2.5.2 Partitioner 案例 1) 需求 : 分别统计每种类型手机的销售情况, 每种类型手机统计数据单独存放在一个结果中 2) 代码实现 : package com.kgc.bigdata.hadoop.mapreduce.partitioner; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import java.io.ioexception; import java.net.uri; /** 18

分布式计算框架 MapReduce 第章 * 自定义 Partitoner 在 MapReduce 中的应用 */ public class PartitionerApp { private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { protected void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { String[] s = value.tostring().split("\t"); context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1]))); private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { protected void reduce(text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : value) { sum += val.get(); context.write(key, new IntWritable(sum)); public static class MyPartitioner extends Partitioner<Text, IntWritable> { // 转发给 4 个不同的 reducer public int getpartition(text key, IntWritable value, int numpartitons) { if (key.tostring().equals("xiaomi")) return 0; if (key.tostring().equals("huawei")) return 1; if (key.tostring().equals("iphone7")) 19

Hadoop&Spark 大数据开发实战 return ; return 2; // driver public static void main(string[] args) throws Exception { String INPUT_PATH = "hdfs://hadoop000:8020/partitioner"; String OUTPUT_PATH = "hdfs://hadoop000:8020/outputpartitioner"; Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf); if (filesystem.exists(new Path(OUTPUT_PATH))) { filesystem.delete(new Path(OUTPUT_PATH), true); Job job = Job.getInstance(conf, "PartitionerApp"); // run jar class job.setjarbyclass(partitionerapp.class); // 设置 map job.setmapperclass(mymapper.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(intwritable.class); // 设置 reduce job.setreducerclass(myreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); // 设置 Partitioner job.setpartitionerclass(mypartitioner.class); // 设置 4 个 reducer, 每个分区一个 job.setnumreducetasks(4); 20

分布式计算框架 MapReduce 第章 // input formart job.setinputformatclass(textinputformat.class); Path inputpath = new Path(INPUT_PATH); FileInputFormat.addInputPath(job, inputpath); // output format job.setoutputformatclass(textoutputformat.class); Path outputpath = new Path(OUTPUT_PATH); FileOutputFormat.setOutputPath(job, outputpath); // 提交 job System.exit(job.waitForCompletion(true)? 0 : 1); 代码 1. Partitioner 操作 2.5. 提交作业到集群运行 1) 使用 mvn clean package -DskipTests 打成 hadoop-1.0-snapshot.jar, 然后上传到 /home/hadoop/lib 目录下面 ; 2) 将测试数据上传到 HDFS 目录中 hadoop fs -mkdir /partitioner hadoop fs -put part_1.txt part_2.txt /partitioner ) 提交 MapReduce 作业到集群运行 hadoop jar /home/hadoop/lib/hadoop-1.0-snapshot.jar com.kgc.bigdata.hadoop.mapreduce.partitioner.partitionerapp 4) 查看作业输出结果 hadoop fs -ls /outputpartitioner Found 5 items -rw-r--r-- 1 hadoop supergroup 0 2017-02-19 1:40 /outputpartitioner/_success -rw-r--r-- 1 hadoop supergroup 51 2017-02-19 1:40 /outputpartitioner/part-r-00000 -rw-r--r-- 1 hadoop supergroup 51 2017-02-19 1:40 /outputpartitioner/part-r-00001 -rw-r--r-- 1 hadoop supergroup 52 2017-02-19 1:40 /outputpartitioner/part-r-00002 -rw-r--r-- 1 hadoop supergroup 52 2017-02-19 1:40 /outputpartitioner/part-r-0000 hadoop fs -text /outputpartitioner/part-r-00000 xiaomi 5 21

Hadoop&Spark 大数据开发实战 hadoop fs -text /outputpartitioner/part-r-00001 huawei 11 hadoop fs -text /outputpartitioner/part-r-00002 iphone7 120 hadoop fs -text /outputpartitioner/part-r-0000 iphone7p 120 2.6 RecordReader 2.6.1 RecordReader 概述 RecordReader 表示以怎样的方式从分片中读取一条记录, 每读取一条记录都会调用 RecordReader 类, 系统默认的 RecordReader 是 LineRecordReader, 它是 TextInputFormat 对应的 RecordReader; 而 SequenceFileInputFormat 对应的 RecordReader 是 SequenceFileRecordReader LineRecordReader 是每行的偏移量作为读入 map 的 key, 每行的内容作为读入 map 的 value 很多时候 Hadoop 内置的 RecordReader 并不能满足我们的需求, 比如我们在读取记录的时候, 希望 Map 读入的 Key 值不是偏移量而是行号或者是文件名, 这时候就需要我们自定义 RecordReader 自定义 RecordReader 的实现步骤 1) 继承抽象类 RecordReader, 实现 RecordReader 的一个实例 2) 实现自定义 InputFormat 类, 重写 InputFormat 中的 CreateRecordReader() 方法, 返回值是自定义的 RecordReader 实例 ) 配置 job.setinputformatclass() 为自定义的 InputFormat 实例 2.6.2 RecordReader 案例 1) 需求 : 统计 data 文件中奇数行和偶数行的和 2) 代码实现 : package com.kgc.bigdata.hadoop.mapreduce.recordreader; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; 22

分布式计算框架 MapReduce 第章 import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.inputsplit; import org.apache.hadoop.mapreduce.recordreader; import org.apache.hadoop.mapreduce.taskattemptcontext; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import java.io.ioexception; /** * 自定义 InputFormat */ public class MyInputFormat extends FileInputFormat<LongWritable, Text> { public RecordReader<LongWritable, Text> createrecordreader(inputsplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 返回自定义的 RecordReader return new RecordReaderApp.MyRecordReader(); /** * 为了使得切分数据的时候行号不发生错乱, 这里设置为不进行切分 */ protected boolean issplitable(filesystem fs, Path filename) { return false; package com.kgc.bigdata.hadoop.mapreduce.recordreader; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.partitioner; /** * 自定义 Partitioner */ public class MyPartitioner extends Partitioner<LongWritable, Text> { 2

Hadoop&Spark 大数据开发实战 public int getpartition(longwritable key, Text value, int numpartitions) { // 偶数放到第二个分区进行计算 if (key.get() % 2 == 0) { // 将输入到 reduce 中的 key 设置为 1 key.set(1); return 1; else {// 奇数放在第一个分区进行计算 // 将输入到 reduce 中的 key 设置为 0 key.set(0); return 0; package com.kgc.bigdata.hadoop.mapreduce.recordreader; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.fsdatainputstream; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.filesplit; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; import org.apache.hadoop.util.linereader; import java.io.ioexception; import java.net.uri; /** * 自定义 RecordReader 在 MapReduce 中的使用 */ public class RecordReaderApp { 24

分布式计算框架 MapReduce public static class MyRecordReader extends RecordReader<LongWritable, Text> { 第章 // 起始位置 ( 相对整个分片而言 ) private long start; // 结束位置 ( 相对整个分片而言 ) private long end; // 当前位置 private long pos; // 文件输入流 private FSDataInputStream fin = null; //key value private LongWritable key = null; private Text value = null; // 定义行阅读器 (hadoop.util 包下的类 ) private LineReader reader = null; public void initialize(inputsplit split, TaskAttemptContext context) throws IOException { // 获取分片 FileSplit filesplit = (FileSplit) split; // 获取起始位置 start = filesplit.getstart(); // 获取结束位置 end = start + filesplit.getlength(); // 创建配置 Configuration conf = context.getconfiguration(); // 获取文件路径 Path path = filesplit.getpath(); // 根据路径获取文件系统 FileSystem filesystem = path.getfilesystem(conf); // 打开文件输入流 fin = filesystem.open(path); // 找到开始位置开始读取 fin.seek(start); 25

Hadoop&Spark 大数据开发实战 // 创建阅读器 reader = new LineReader(fin); // 将当期位置置为 1 pos = 1; public boolean nextkeyvalue() throws IOException, InterruptedException { if (key == null) { key = new LongWritable(); key.set(pos); if (value == null) { value = new Text(); if (reader.readline(value) == 0) { return false; pos++; return true; public LongWritable getcurrentkey() throws IOException, InterruptedException { return key; public Text getcurrentvalue() throws IOException, InterruptedException { return value; public float getprogress() throws IOException, InterruptedException { 26

分布式计算框架 MapReduce 第章 return 0; public void close() throws IOException { fin.close(); public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> { protected void map(longwritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException { // 直接将读取的记录写出去 context.write(key, value); public static class MyReducer extends Reducer<LongWritable, Text, Text, LongWritable> { // 创建写出去的 key 和 value private Text outkey = new Text(); private LongWritable outvalue = new LongWritable(); protected void reduce(longwritable key, Iterable<Text> values, Reducer<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { System.out.println(" 奇数行还是偶数行 :" + key); // 定义求和的变量 long sum = 0; // 遍历 value 求和 for (Text val : values) { // 累加 sum += Long.parseLong(val.toString()); 27

Hadoop&Spark 大数据开发实战 // 判断奇偶数 if (key.get() == 0) { outkey.set(" 奇数之和为 :"); else { outkey.set(" 偶数之和为 :"); // 设置 value outvalue.set(sum); // 把结果写出去 context.write(outkey, outvalue); // driver public static void main(string[] args) throws Exception { String INPUT_PATH = "hdfs://hadoop000:8020/recordreader"; String OUTPUT_PATH = "hdfs://hadoop000:8020/outputrecordreader"; Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf); if (filesystem.exists(new Path(OUTPUT_PATH))) { filesystem.delete(new Path(OUTPUT_PATH), true); Job job = Job.getInstance(conf, "RecordReaderApp"); // run jar class job.setjarbyclass(recordreaderapp.class); // 1.1 设置输入目录和设置输入数据格式化的类 FileInputFormat.setInputPaths(job, INPUT_PATH); job.setinputformatclass(myinputformat.class); 28

分布式计算框架 MapReduce // 1.2 设置自定义 Mapper 类和设置 map 函数输出数据的 key 和 value 的类型 job.setmapperclass(mymapper.class); job.setmapoutputkeyclass(longwritable.class); job.setmapoutputvalueclass(text.class); 第章 // 1. 设置分区和 reduce 数量 (reduce 的数量, 和分区的数量对应, 因为分区为一个, 所以 reduce 的数量也是一个 ) job.setpartitionerclass(mypartitioner.class); job.setnumreducetasks(2); // 2.1 Shuffle 把数据从 Map 端拷贝到 Reduce 端 // 2.2 指定 Reducer 类和输出 key 和 value 的类型 job.setreducerclass(myreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(longwritable.class); // 2. 指定输出的路径和设置输出的格式化类 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); job.setoutputformatclass(textoutputformat.class); // 提交 job System.exit(job.waitForCompletion(true)? 0 : 1); 代码 1.4 RecordReader 操作 2.6. 提交作业到集群运行 1) 使用 mvn clean package -DskipTests 打成 hadoop-1.0-snapshot.jar, 然后上传到 /home/hadoop/lib 目录下面 ; 2) 将测试数据上传到 HDFS 目录中 hadoop fs -mkdir /recordreader hadoop fs -put recordreader.txt /recordreader ) 提交 MapReduce 作业到集群运行 hadoop jar /home/hadoop/lib/hadoop-1.0-snapshot.jar com.kgc.bigdata.hadoop.mapreduce.recordreader.recordreaderapp 4) 查看作业输出结果 29

Hadoop&Spark 大数据开发实战 hadoop fs -ls /outputrecordreader Found items -rw-r--r-- 1 hadoop supergroup 0 2017-02-19 14:0 /outputrecordreader/_success -rw-r--r-- 1 hadoop supergroup 69 2017-02-19 14:0 /outputrecordreader/part-r-00000 -rw-r--r-- 1 hadoop supergroup 67 2017-02-19 14:0 /outputrecordreader/part-r-00001 hadoop fs -text /outputrecordreader/part-r-00000 奇数之和为 : 25 hadoop fs -text /outputrecordreader/part-r-00001 偶数之和为 : 0 至此, 在学习了以上相关知识后, 任务 2 就可以完成了 任务 MapReduce 高级编程 关键步骤如下 : 使用 MapReduce 完成 join 操作 使用 MapReduce 完成排序操作 使用 MapReduce 完成二次排序操作 使用 MapReduce 完成小文件合并操作.1 Join 的 MapReduce 实现.1.1 概述熟悉 SQL 的同学都知道, 使用 SQL 语法实现 join 是非常简单的, 只需要一条 SQL 语句即可实现, 但是在大数据场景下使用 MapReduce 编程模型实现 join 还是比较繁琐的 在实际生产中我们可以借助于 Hive Spark SQL 等框架来实现 join, 但是对于 join 的实现原理我个人觉得大家还是需要掌握的, 这对于理解 join 的底层实现是很有帮助的, 所以本节我们将学习如何使用 MapReduce API 来实现 join 0

.1.2 需求 分布式计算框架 MapReduce 第章 实现如下 SQL 的功能 : select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno=d.deptno; // 测试数据 emp.txt 769 SMITH CLERK 7902 1980-12-17 800.00 20 7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 00.00 0 7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 0 7566 JONES MANAGER 789 1981-4-2 2975.00 20. // 测试数据 dept.txt 10 ACCOUNTING NEW YORK 20 RESEARCH DALLAS 0 SALES CHICAGO 40 OPERATIONS BOSTON.1. MapReduce Map 端 join 的实现原理 1)Map 端读取所有的文件, 并在输出的内容里加上标示, 代表数据是从哪个文件里来的 2) 在 reduce 处理函数中, 按照标识对数据进行处理 ) 然后根据 Key 去 join 来求出结果直接输出.1.4 MapReduce Map 端 join 的代码实现 1) 员工类定义 package com.kgc.bigdata.hadoop.mapreduce.reducejoin; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception; import org.apache.hadoop.io.writablecomparable; /** 1

Hadoop&Spark 大数据开发实战 * 员工对象 */ public class Emplyee implements WritableComparable { private String empno = ""; private String empname = ""; private String deptno = ""; private String deptname = ""; private int flag = 0; // 区分是员工还是部门 public Emplyee() { flag) { public Emplyee(String empno, String empname, String deptno, String deptname, int this.empno = empno; this.empname = empname; this.deptno = deptno; this.deptname = deptname; this.flag = flag; public Emplyee(Emplyee e) { this.empno = e.empno; this.empname = e.empname; this.deptno = e.deptno; this.deptname = e.deptname; this.flag = e.flag; public String getempno() { return empno; public void setempno(string empno) { this.empno = empno; 2

分布式计算框架 MapReduce 第章 public String getempname() { return empname; public void setempname(string empname) { this.empname = empname; public String getdeptno() { return deptno; public void setdeptno(string deptno) { this.deptno = deptno; public String getdeptname() { return deptname; public void setdeptname(string deptname) { this.deptname = deptname; public int getflag() { return flag; public void setflag(int flag) { this.flag = flag; public void readfields(datainput input) throws IOException { this.empno = input.readutf(); this.empname = input.readutf(); this.deptno = input.readutf(); this.deptname = input.readutf();

Hadoop&Spark 大数据开发实战 this.flag = input.readint(); public void write(dataoutput output) throws IOException { output.writeutf(this.empno); output.writeutf(this.empname); output.writeutf(this.deptno); output.writeutf(this.deptname); output.writeint(this.flag); // 不做排序 public int compareto(object o) { return 0; public String tostring() { return this.empno + "," + this.empname + "," + this.deptno + "," + this.deptname; 代码 1.5 ReduceJoin 实现 2) 自定义 Mapper 类开发 package com.kgc.bigdata.hadoop.mapreduce.reducejoin; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.mapper; import java.io.ioexception; public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Emplyee> { 4

分布式计算框架 MapReduce 第章 protected void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { String val = value.tostring(); String[] arr = val.split("\t"); System.out.println("arr.length=" + arr.length + " arr[0]=" + arr[0]); if (arr.length <= ) {//dept Emplyee e = new Emplyee(); e.setdeptno(arr[0]); e.setdeptname(arr[1]); e.setflag(1); context.write(new LongWritable(Long.valueOf(e.getDeptNo())), e); else {//emp Emplyee e = new Emplyee(); e.setempno(arr[0]); e.setempname(arr[1]); e.setdeptno(arr[7]); e.setflag(0); context.write(new LongWritable(Long.valueOf(e.getDeptNo())), e); ) 自定义 Reducer 类开发 package com.kgc.bigdata.hadoop.mapreduce.reducejoin; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.reducer; 5

Hadoop&Spark 大数据开发实战 import java.io.ioexception; import java.util.arraylist; import java.util.list; public class MyReducer extends Reducer<LongWritable, Emplyee, NullWritable, Text> { protected void reduce(longwritable key, Iterable<Emplyee> iter, Context context) throws IOException, InterruptedException { Emplyee dept = null; List<Emplyee> list = new ArrayList<Emplyee>(); for (Emplyee tmp : iter) { if (tmp.getflag() == 0) {//emp Emplyee emplyee = new Emplyee(tmp); list.add(emplyee); else { dept = new Emplyee(tmp); if (dept!= null) { for (Emplyee emp : list) { emp.setdeptname(dept.getdeptname()); context.write(nullwritable.get(), new Text(emp.toString())); 4) 驱动类开发 package com.kgc.bigdata.hadoop.mapreduce.reducejoin; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; 6

import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; 分布式计算框架 MapReduce 第章 import java.net.uri; /** * 使用 MapReduce API 完成 Reduce Join 的功能 */ public class EmpJoinApp { public static void main(string[] args) throws Exception { String INPUT_PATH = "hdfs://hadoop000:8020/inputjoin"; String OUTPUT_PATH = "hdfs://hadoop000:8020/outputmapjoin"; Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf); if (filesystem.exists(new Path(OUTPUT_PATH))) { filesystem.delete(new Path(OUTPUT_PATH), true); Job job = Job.getInstance(conf, "Reduce Join"); // 设置主类 job.setjarbyclass(empjoinapp.class); // 设置 Map 和 Reduce 处理类 job.setmapperclass(mymapper.class); job.setreducerclass(myreducer.class); // 设置 Map 输出类型 job.setmapoutputkeyclass(longwritable.class); job.setmapoutputvalueclass(emplyee.class); 7

Hadoop&Spark 大数据开发实战 // 设置 Reduce 输出类型 job.setoutputkeyclass(nullwritable.class); job.setoutputvalueclass(emplyee.class); // 设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true)? 0 : 1);.1.5 提交作业到集群运行 1) 使用 mvn clean package -DskipTests 打成 hadoop-1.0-snapshot.jar, 然后上传到 /home/hadoop/lib 目录下面 ; 2) 将测试数据上传到 HDFS 目录中 hadoop fs -mkdir /inputjoin hadoop fs -put emp.txt dept.txt /inputjoin ) 提交 MapReduce 作业到集群运行 hadoop jar /home/hadoop/lib/hadoop-1.0-snapshot.jar com.kgc.bigdata.hadoop.mapreduce.reducejoin.empjoinapp 4) 查看作业输出结果 hadoop fs -text /outputmapjoin/part* 794,MILLER,10,ACCOUNTING 789,KING,10,ACCOUNTING 7782,CLARK,10,ACCOUNTING 7876,ADAMS,20,RESEARCH 7788,SCOTT,20,RESEARCH 769,SMITH,20,RESEARCH 7566,JONES,20,RESEARCH 7902,FORD,20,RESEARCH 7844,TURNER,0,SALES 7499,ALLEN,0,SALES 8

分布式计算框架 MapReduce 第章 7698,BLAKE,0,SALES 7654,MARTIN,0,SALES 7521,WARD,0,SALES 7900,JAMES,0,SALES.2 排序的 MapReduce 实现.2.1 需求对输入文件中数据进行排序 输入文件中的每行内容均为一个数字, 即一个数据 要求在输出中每行有两个间隔的数字, 其中, 第一个代表原始数据在原始数据集中的位次, 第二个代表原始数据.2.2 MapReduce 排序的实现原理在 MapReduce 中默认就可以进行排序的, 如果 key 为封装 int 的 IntWritable 类型, 那么 MapReduce 按照数字大小对 key 排序, 如果 key 为封装为 String 的 Text 类型, 那么 MapReduce 按照字典顺序对字符串排序 我们能否使用内置的排序来完成这个功能呢? 答案是肯定的 在使用之前首先需要了解它的默认排序规则 它是按照 key 值进行排序的 我们就知道应该使用封装 int 的 IntWritable 型数据结构了 也就是在 map 中将读入的数据转化成 IntWritable 型, 然后作为 key 值输出 (value 任意 ) reduce 拿到 <key,value-list> 之后, 将输入的 key 作为 value 输出, 并根据 value-list 中元素的个数决定输出的次数 输出的 key 是一个全局变量, 它统计当前 key 的位次.2. MapReduce 排序的代码实现 package com.kgc.bigdata.hadoop.mapreduce.sort; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; 9

Hadoop&Spark 大数据开发实战 import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import java.io.ioexception; import java.net.uri; /** * 使用 MapReduce API 实现排序 */ public class SortApp { public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { private static IntWritable data = new IntWritable(); public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.tostring(); data.set(integer.parseint(line)); context.write(data, new IntWritable(1)); public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { private static IntWritable data = new IntWritable(1); public void reduce(intwritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable val : values) { context.write(data, key); data = new IntWritable(data.get() + 1); public static void main(string[] args) throws Exception { 40

String INPUT_PATH = "hdfs://hadoop000:8020/sort"; String OUTPUT_PATH = "hdfs://hadoop000:8020/outputsort"; 分布式计算框架 MapReduce 第章 Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf); if (filesystem.exists(new Path(OUTPUT_PATH))) { filesystem.delete(new Path(OUTPUT_PATH), true); Job job = Job.getInstance(conf, "SortApp"); // 设置主类 job.setjarbyclass(sortapp.class); // 设置 Map 和 Reduce 处理类 job.setmapperclass(mymapper.class); job.setreducerclass(myreducer.class); // 设置输出类型 job.setoutputkeyclass(intwritable.class); job.setoutputvalueclass(intwritable.class); // 设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true)? 0 : 1); 代码 1.6 排序实现.2.4 提交作业到集群运行 1) 使用 mvn clean package -DskipTests 打成 hadoop-1.0-snapshot.jar, 然后上传到 /home/hadoop/lib 目录下面 ; 2) 将测试数据上传到 HDFS 目录中 hadoop fs -mkdir /sort 41

Hadoop&Spark 大数据开发实战 hadoop fs -put sort.txt /sort ) 提交 MapReduce 作业到集群运行 hadoop jar /home/hadoop/lib/hadoop-1.0-snapshot.jar com.kgc.bigdata.hadoop.mapreduce.sort.sortapp 4) 查看作业输出结果 hadoop fs -text /outputsort/part* 1 1 2 2 4 4 5 5 6 9. 二次排序的 MapReduce 实现..1 概述默认情况下,Map 输出的结果会对 Key 进行默认的排序, 但是有时候需要对 Key 排序的同时还需要对 Value 进行排序, 这就是所谓的二次排序..2 需求对输入文件中的数据 ( 每行两列, 列于列之间的分隔符是制表符 ), 输出结果先按照第一个字段的升序排列, 如果第一列的值相等, 就按照第二个字段的升序排列 形如 : 0 10 0 20 0 0 0 40 40 5 40 10 40 20 40 0 50 10 50 20 42

分布式计算框架 MapReduce 第章 50 50 50 60.. MapReduce 二次排序的实现原理 1)Mapper 任务会接收输入分片, 然后不断的调用 map 函数, 对记录进行处理 处理完毕后, 转换为新的 <key,value> 输出 2) 对 map 函数输出的 <key, value> 调用分区函数, 对数据进行分区 不同分区的数据会被送到不同的 Reducer 任务中 ) 对于不同分区的数据, 会按照 key 进行排序, 这里的 key 必须实现 WritableComparable 接口 该接口实现了 Comparable 接口, 因此可以进行比较排序 4) 对于排序后的 <key,value>, 会按照 key 进行分组 如果 key 相同, 那么相同 key 的 <key,value> 就被分到一个组中 最终, 每个分组会调用一次 reduce 函数 5) 排序 分组后的数据会被送到 Reducer 节点..4 MapReduce 二次排序的代码实现 package com.kgc.bigdata.hadoop.mapreduce.secondsort; import org.apache.hadoop.io.writablecomparable; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception; public class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; public void set(int left, int right) { first = left; second = right; public int getfirst() { return first; public int getsecond() { 4

Hadoop&Spark 大数据开发实战 return second; public void readfields(datainput in) throws IOException { first = in.readint(); second = in.readint(); public void write(dataoutput out) throws IOException { out.writeint(first); out.writeint(second); public int hashcode() { return first+"".hashcode() + second+"".hashcode(); public boolean equals(object right) { if (right instanceof IntPair) { IntPair r = (IntPair) right; return r.first == first && r.second == second; else { return false; // 这里的代码是关键, 因为对 key 排序时, 调用的就是这个 compareto 方法 public int compareto(intpair o) { if (first!= o.first) { return first - o.first; else if (second!= o.second) { return second - o.second; else { return 0; 44

分布式计算框架 MapReduce 第章 package com.kgc.bigdata.hadoop.mapreduce.secondsort; import java.io.ioexception; import java.net.uri; import java.util.stringtokenizer; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; public class SecondarySortApp { IntWritable> { public static class MyMapper extends Mapper<LongWritable, Text, IntPair, private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); public void map(longwritable inkey, Text invalue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasmoretokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasmoretokens()) { 45

Hadoop&Spark 大数据开发实战 right = Integer.parseInt(itr.nextToken()); key.set(left, right); value.set(right); context.write(key, value); /** * 在分组比较的时候, 只比较原来的 key, 而不是组合 key */ public static class GroupingComparator implements RawComparator<IntPair> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); public int compare(intpair o1, IntPair o2) { int first1 = o1.getfirst(); int first2 = o2.getfirst(); return first1 - first2; { public static class MyReducer extends Reducer<IntPair, IntWritable, Text, IntWritable> private static final Text SEPARATOR = new Text("-------------"); private final Text first = new Text(); public void reduce(intpair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(separator, null); first.set(integer.tostring(key.getfirst())); for(intwritable value: values) { 46

分布式计算框架 MapReduce 第章 context.write(first, value); public static void main(string[] args) throws Exception { String INPUT_PATH = "hdfs://hadoop000:8020/secondsort"; String OUTPUT_PATH = "hdfs://hadoop000:8020/outputsecondsort"; Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf); if (filesystem.exists(new Path(OUTPUT_PATH))) { filesystem.delete(new Path(OUTPUT_PATH), true); Job job = Job.getInstance(conf, "SecondarySortApp"); // 设置主类 job.setjarbyclass(secondarysortapp.class); // 输入路径 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 输出路径 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 设置 Map 和 Reduce 处理类 job.setmapperclass(mymapper.class); job.setreducerclass(myreducer.class); // 分组函数 job.setgroupingcomparatorclass(groupingcomparator.class); job.setmapoutputkeyclass(intpair.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); 47

Hadoop&Spark 大数据开发实战 // 输入输出格式 job.setinputformatclass(textinputformat.class); job.setoutputformatclass(textoutputformat.class); System.exit(job.waitForCompletion(true)? 0 : 1); 代码 1.7 二次排序实现..5 提交作业到集群运行 1) 使用 mvn clean package -DskipTests 打成 hadoop-1.0-snapshot.jar, 然后上传到 /home/hadoop/lib 目录下面 ; 2) 将测试数据上传到 HDFS 目录中 hadoop fs -mkdir /secondsort hadoop fs -put secondsort.txt /secondsort ) 提交 MapReduce 作业到集群运行 hadoop jar /home/hadoop/lib/hadoop-1.0-snapshot.jar com.kgc.bigdata.hadoop.mapreduce.secondsort.secondarysortapp 4) 查看作业输出结果 hadoop fs -text /outputsecondsort/part* ------------------------------------------------ 0 10 0 20 0 0 0 40 ------------------------------------------------ 40 5 40 10 40 20 40 0 ------------------------------------------------ 50 10 50 20 50 50 50 60 48

.4 合并小文件的 MapReduce 实现 分布式计算框架 MapReduce 第章.4.1 概述 Hadoop 对处理单个大文件比处理多个小文件更有效率, 另外单个文件也非常占用 HDFS 的存储空间 所以往往要将其合并起来.4.2 需求通过 MapReduce API 对小文件进行合并, 输出成 SequenceFile.4. 合并小文件的代码实现 package com.kgc.bigdata.hadoop.mapreduce.merge; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.byteswritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.mapreduce.inputsplit; import org.apache.hadoop.mapreduce.jobcontext; import org.apache.hadoop.mapreduce.recordreader; import org.apache.hadoop.mapreduce.taskattemptcontext; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import java.io.ioexception; /** * 实现将整个文件作为一条记录处理的 InputFormat */ public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { // 设置每个小文件不可分片, 保证一个小文件生成一个 key-value 键值对 protected boolean issplitable(jobcontext context, Path file) { return false; public RecordReader<NullWritable, BytesWritable> createrecordreader( 49

Hadoop&Spark 大数据开发实战 InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; package com.kgc.bigdata.hadoop.mapreduce.merge; import java.io.ioexception; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.fsdatainputstream; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.byteswritable; import org.apache.hadoop.io.ioutils; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.mapreduce.inputsplit; import org.apache.hadoop.mapreduce.recordreader; import org.apache.hadoop.mapreduce.taskattemptcontext; import org.apache.hadoop.mapreduce.lib.input.filesplit; /** * 实现一个定制的 RecordReader, 这六个方法均为继承的 RecordReader */ class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit filesplit; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; public void initialize(inputsplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.filesplit = (FileSplit) split; 50

分布式计算框架 MapReduce 第章 this.conf = context.getconfiguration(); public boolean nextkeyvalue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) filesplit.getlength()]; Path file = filesplit.getpath(); FileSystem fs = file.getfilesystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); finally { IOUtils.closeStream(in); processed = true; return true; return false; public NullWritable getcurrentkey() throws IOException, InterruptedException { return NullWritable.get(); public BytesWritable getcurrentvalue() throws IOException, InterruptedException { return value; public float getprogress() throws IOException { return processed? 1.0f : 0.0f; 51

Hadoop&Spark 大数据开发实战 public void close() throws IOException { // do nothing package com.kgc.bigdata.hadoop.mapreduce.merge; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.byteswritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.inputsplit; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.filesplit; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.sequencefileoutputformat; import java.io.ioexception; import java.net.uri; /** * 使用 MapReduce API 完成文件合并的功能 */ public class MergeApp { /** * 将小文件打包成 SequenceFile */ static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { private Text filenamekey; 52

分布式计算框架 MapReduce 第章 protected void setup(context context) throws IOException, InterruptedException { InputSplit split = context.getinputsplit(); Path path = ((FileSplit) split).getpath(); filenamekey = new Text(path.toString()); protected void map(nullwritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(filenamekey, value); public static void main(string[] args) throws Exception { String INPUT_PATH = "hdfs://hadoop000:8020/inputmerge"; String OUTPUT_PATH = "hdfs://hadoop000:8020/outputmerge"; Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH), conf); if (filesystem.exists(new Path(OUTPUT_PATH))) { filesystem.delete(new Path(OUTPUT_PATH), true); Job job = Job.getInstance(conf, "MergeApp"); // 设置主类 job.setjarbyclass(mergeapp.class); job.setinputformatclass(wholefileinputformat.class); job.setoutputformatclass(sequencefileoutputformat.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(byteswritable.class); job.setmapperclass(sequencefilemapper.class); 5

Hadoop&Spark 大数据开发实战 // 设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true)? 0 : 1); 代码 1.8 合并小文件实现.4.4 提交作业到集群运行 1) 使用 mvn clean package -DskipTests 打成 hadoop-1.0-snapshot.jar, 然后上传到 /home/hadoop/lib 目录下面 ; 2) 将测试数据上传到 HDFS 目录中 hadoop fs -mkdir /inputmerge hadoop fs -put secondsort.txt /inputmerge ) 提交 MapReduce 作业到集群运行 hadoop jar /home/hadoop/lib/hadoop-1.0-snapshot.jar com.kgc.bigdata.hadoop.mapreduce.merge.mergeapp 4) 查看作业输出结果 hadoop fs -ls /outputmerge Found 2 items -rw-r--r-- 1 hadoop supergroup 0 2017-02-18 19:2 /outputmerge/_success -rw-r--r-- 1 hadoop supergroup 760548 2017-02-18 19:2 /outputmerge/part-r-00000 至此, 在学习了以上相关知识后, 任务 就可以完成了 本章总结 本章学习了以下知识点 : 掌握 MapReduce 的编程模型 掌握 MapReduce 中 Combiner Partitioner 的使用 掌握使用 MapReduce API 完成常用的功能 本章作业 54

分布式计算框架 MapReduce 第章 1. 使用 MapReduce API 完成如下功能 需求 : 统计每个手机号码的流量 ( 上行 下行 ) 数据包 ( 上行 下行 ) 输出结果字段 : 手机号码 上行数据包数 下行数据包数 上行数据量 下行数据量 输入文件字段说明 : 55