目录 1 本期内容 MapReduce 理论简介 MapReduce 编程模型 MapReduce 处理过程 运行 WordCount 程序 准备工作 运行例子

Similar documents
三种方法实现Hadoop(MapReduce)全局排序(1)

使用MapReduce读取XML文件

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

三种方法实现Hadoop(MapReduce)全局排序(2)

MapReduce

目录 1 本期内容 系统管理 连接 MySQL 修改新密码 增加新用户 启动停止 MySQL 数据库操作 库操作 表操作... 5

前 言

7521,WARD,SALESMAN,7698,22-2 月 -81,1250,500, ,JONES,MANAGER,7839,02-4 月 -81,2975,, ,MARTIN,SALESMAN,7698,28-9 月 -81,1250,1400, ,BLAK

Hadoop 集群 ( 第 9 期 ) MapReduce 初级案例 1 数据去重 数据去重 主要是为了掌握和利用并行化思想来对数据进行有意义的筛选 统计大 数据集上的数据种类个数 从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据 去重 下面就进入这个实例的 MapReduce 程序设计 1.


Guava学习之Resources

SDK 概要 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk" 获取不同版本的 Java SDK: 包名 odps-sdk-core odps-sdk-commons odps-sdk-udf odps-sdk-mapred odps-sdk-graph 描述 ODPS 基

Hadoop_Jordan

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

OOP with Java 通知 Project 4: 4 月 19 日晚 9 点

chp6.ppt

JavaIO.PDF

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

C++ 程序设计 告别 OJ1 - 参考答案 MASTER 2019 年 5 月 3 日 1

软件工程文档编制

OOP with Java 通知 Project 3: 3 月 29 日晚 9 点 4 月 1 日上课

内 容 简 介 本 书 是 一 本 关 于 语 言 程 序 设 计 的 教 材, 涵 盖 了 语 言 的 基 本 语 法 和 编 程 技 术, 其 中 包 含 了 作 者 对 语 言 多 年 开 发 经 验 的 总 结, 目 的 是 让 初 学 的 读 者 感 受 到 语 言 的 魅 力, 并 掌

OOP with Java 通知 Project 4: 5 月 2 日晚 9 点

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

培 训 机 构 介 绍 中 科 普 开 是 国 内 首 家 致 力 于 IT 新 技 术 领 域 的 领 航 者, 专 注 于 云 计 算 大 数 据 物 联 网 移 动 互 联 网 技 术 的 培 训, 也 是 国 内 第 一 家 开 展 Hadoop 云 计 算 的 培

幻灯片 1

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

EJB-Programming-4-cn.doc

Microsoft PowerPoint - hbase_program(0201).ppt

untitled

res/layout 目录下的 main.xml 源码 : <?xml version="1.0" encoding="utf 8"?> <TabHost android:layout_height="fill_parent" xml

Hive:用Java代码通过JDBC连接Hiveserver

Chapter 9: Objects and Classes

Microsoft Word - Broker.doc

目 录 1 不 断 开 发 工 具 以 管 理 大 数 据 Hadoop* 简 介 : 支 持 从 大 数 据 中 获 得 出 色 价 值 的 可 靠 框 架 大 数 据 技 术 的 行 业 生 态 系 统 在 关 键 组 件 中 实 现 平 衡...

前言 C# C# C# C C# C# C# C# C# microservices C# More Effective C# More Effective C# C# C# C# Effective C# 50 C# C# 7 Effective vii

全国计算机技术与软件专业技术资格(水平)考试

Learning Java

Microsoft Word - 01.DOC

一 登录 crm Mobile 系统 : 输入 ShijiCare 用户名和密码, 登录系统, 如图所示 : 第 2 页共 32 页

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

《大话设计模式》第一章

javaexample-02.pdf

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

D getinitparameternames() 9 下 列 选 项 中, 属 于 Servlet API 中 提 供 的 request 对 象 的 包 装 类 的 是 ( ) A HttpServletRequestWrapper B HttpServletRequest C HttpServ

1. 访 问 最 新 发 行 公 告 信 息 jconnect for JDBC 访 问 最 新 发 行 公 告 信 息 最 新 版 本 的 发 行 公 告 可 以 从 网 上 获 得 若 要 查 找 在 本 产 品 发 布 后 增 加 的 重 要 产 品 或 文 档 信 息, 请 访

ChinaBI企业会员服务- BI企业

内 容 提 要 将 JAVA 开 发 环 境 迁 移 到 Linux 系 统 上 是 现 在 很 多 公 司 的 现 实 想 法, 而 在 Linux 上 配 置 JAVA 开 发 环 境 是 步 入 Linux 下 JAVA 程 序 开 发 的 第 一 步, 本 文 图 文 并 茂 地 全 程 指

1 1 大概思路 创建 WebAPI 创建 CrossMainController 并编写 Nuget 安装 microsoft.aspnet.webapi.cors 跨域设置路由 编写 Jquery EasyUI 界面 运行效果 2 创建 WebAPI 创建 WebAPI, 新建 -> 项目 ->

IoC容器和Dependency Injection模式.doc

无类继承.key

Microsoft Word - 在VMWare-5.5+RedHat-9下建立本机QTopia-2.1.1虚拟平台a.doc

Reducing Client Incidents through Big Data Predictive Analytics

提问袁小兵:

untitled

Java

<4D F736F F D20C8EDC9E82DCFC2CEE7CCE22D3039C9CF>

Microsoft Word - PHP7Ch01.docx

java2d-4.PDF

, 即 使 是 在 昏 暗 的 灯 光 下, 她 仍 然 可 以 那 么 耀 眼 我 没 有 地 方 去, 你 会 带 着 我 么 杜 晗 像 是 在 嘲 笑 一 般, 嘴 角 的 一 抹 冷 笑 有 着 不 适 合 这 个 年 龄 的 冷 酷 和 无 情, 看 着 江 华 的 眼 神 毫 无 温

Contents 1 开始 概要 前提 如何创建和运行一个样例程序 Hadoop 前提 概要 Hadoop 提供的样例 样例 : TestDFSIO (Stre

目录 1 IPv6 快速转发 IPv6 快速转发配置命令 display ipv6 fast-forwarding aging-time display ipv6 fast-forwarding cache ipv6 fas

untitled

帝国CMS下在PHP文件中调用数据库类执行SQL语句实例

2. AOP 底层技术实现 小风 Java 实战系列教程 关键词 : 代理模式 代理模型分为两种 : 1) 接口代理 (JDK 动态代理 ) 2) 子类代理 (Cglib 子类代理 ) 需求 :CustomerService 业务类, 有 save,update 方法, 希望在 save,updat

4.1 AMI MQSeries API AMI MQI AMI / / AMI JavaC C++ AMI / AMI AMI - / /

Transcription:

细细品味 Hadoop Hadoop 集群 ( 第 6 期 ) 精华集锦 csaxp http://www.xiapistudio.com/ 2012 年 3 月 1 日

目录 1 本期内容... 2 1.1 MapReduce 理论简介... 2 1.1.1 MapReduce 编程模型... 2 1.1.2 MapReduce 处理过程... 2 1.2 运行 WordCount 程序... 3 1.2.1 准备工作... 3 1.2.2 运行例子... 4 1.2.3 查看结果... 5 1.3 WordCount 源码分析... 5 1.3.1 特别数据类型介绍... 5 1.3.2 旧的 WordCount 分析... 5 1.3.3 新的 WordCount 分析... 11 1.4 WordCount 处理过程... 14 1.5 MapReduce 新旧改变... 16 2 参考文献... 17 3 打赏小编... 18 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 1

Hadoop 集群 ( 第 6 期 ) WordCount 运行详解 1 本期内容 1.1 MapReduce 理论简介 1.1.1 MapReduce 编程模型 MapReduce 采用 分而治之 的思想, 把对大规模数据集的操作, 分发给一个主节点管理下的各个分节点共同完成, 然后通过整合各个节点的中间结果, 得到最终结果 简单地说, MapReduce 就是 任务的分解与结果的汇总 在 Hadoop 中, 用于执行 MapReduce 任务的机器角色有两个 : 一个是 JobTracker; 另一个是 TaskTracker,JobTracker 是用于调度工作的,TaskTracker 是用于执行工作的 一个 Hadoop 集群中只有一台 JobTracker 在分布式计算中,MapReduce 框架负责处理了并行编程中分布式存储 工作调度 负载均衡 容错均衡 容错处理以及网络通信等复杂问题, 把处理过程高度抽象为两个函数 :map 和 reduce,map 负责把任务分解成多个任务,reduce 负责把分解后多任务处理的结果汇总起来 需要注意的是, 用 MapReduce 来处理的数据集 ( 或任务 ) 必须具备这样的特点 : 待处理的数据集可以分解成许多小的数据集, 而且每一个小数据集都可以完全并行地进行处理 1.1.2 MapReduce 处理过程 在 Hadoop 中, 每个 MapReduce 任务都被初始化为一个 Job, 每个 Job 又可以分为两种阶段 :map 阶段和 reduce 阶段 这两个阶段分别用两个函数表示, 即 map 函数和 reduce 函数 map 函数接收一个 <key,value> 形式的输入, 然后同样产生一个 <key,value> 形式的中间输出,Hadoop 函数接收一个如 <key,(list of values)> 形式的输入, 然后对这个 value 集合进行处理, 每个 reduce 产生 0 或 1 个输出,reduce 的输出也是 <key,value> 形式的 Input Mappers Middle Result Reducers Output split0 map() reduce() part0 split1 map() split2 map() reduce() part1 MapReduce 处理大数据集的过程 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 2

1.2 运行 WordCount 程序 单词计数是最简单也是最能体现 MapReduce 思想的程序之一, 可以称为 MapReduce 版 Hello World, 该程序的完整代码可以在 Hadoop 安装包的 src/examples 目录下找到 单词计数主要完成功能是 : 统计一系列文本文件中每个单词出现的次数, 如下图所示 Hello World Hello Hadoop Hello 2 World 1 Hadoop 1 1.2.1 准备工作 现在以 hadoop 普通用户登录 Master.Hadoop 服务器 1) 创建本地示例文件 首先在 /home/hadoop 目录下创建文件夹 file 接着创建两个文本文件 file1.txt 和 file2.txt, 使 file1.txt 内容为 Hello World, 而 file2.txt 的内容为 Hello Hadoop 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 3

2) 在 HDFS 上创建输入文件夹 3) 上传本地 file 中文件到集群的 input 目录下 1.2.2 运行例子 1) 在集群上运行 WordCount 程序备注 : 以 input 作为输入目录,output 目录作为输出目录 已经编译好的 WordCount 的 Jar 在 /usr/hadoop 下面, 就是 hadoop-examples-1.0.0.jar, 所以在下面执行命令时记得把路径写全了, 不然会提示找不到该 Jar 包 2)MapReduce 执行过程显示信息 Hadoop 命令会启动一个 JVM 来运行这个 MapReduce 程序, 并自动获得 Hadoop 的配置, 同时把类的路径 ( 及其依赖关系 ) 加入到 Hadoop 的库中 以上就是 Hadoop Job 的运行记录, 从这里可以看到, 这个 Job 被赋予了一个 ID 号 :job_201202292213_0002, 而且得知输入文件有两个 (Total input paths to process : 2), 同时还可以了解 map 的输入输出记录 (record 数及字节数 ), 以及 reduce 输入输出记录 比如说, 在本例中,map 的 task 数量是 2 个, 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 4

reduce 的 task 数量是一个 map 的输入 record 数是 2 个, 输出 record 数是 4 个等信息 1.2.3 查看结果 1) 查看 HDFS 上 output 目录内容 从上图中知道生成了三个文件, 我们的结果在 part-r-00000 中 2) 查看结果输出文件内容 1.3 WordCount 源码分析 1.3.1 特别数据类型介绍 Hadoop 提供了如下内容的数据类型, 这些数据类型都实现了 WritableComparable 接口, 以便用这些类型定义的数据可以被序列化进行网络传输和文件存储, 以及进行大小比较 BooleanWritable: 标准布尔型数值 ByteWritable: 单字节数值 DoubleWritable: 双字节数 FloatWritable: 浮点数 IntWritable: 整型数 LongWritable: 长整型数 Text: 使用 UTF8 格式存储的文本 NullWritable: 当 <key,value> 中的 key 或 value 为空时使用 1.3.2 旧的 WordCount 分析 1) 源代码程序 package org.apache.hadoop.examples; 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 5

import java.io.ioexception; import java.util.iterator; import java.util.stringtokenizer; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapred.fileinputformat; import org.apache.hadoop.mapred.fileoutputformat; import org.apache.hadoop.mapred.jobclient; import org.apache.hadoop.mapred.jobconf; import org.apache.hadoop.mapred.mapreducebase; import org.apache.hadoop.mapred.mapper; import org.apache.hadoop.mapred.outputcollector; import org.apache.hadoop.mapred.reducer; import org.apache.hadoop.mapred.reporter; import org.apache.hadoop.mapred.textinputformat; import org.apache.hadoop.mapred.textoutputformat; public class WordCount public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable( 1 ); private Text word = new Text(); public void map(longwritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.tostring(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasmoretokens()) word.set(tokenizer.nexttoken()); output.collect(word, one); 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 6

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> public void reduce(text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException int sum = 0 ; while (values.hasnext()) sum += values.next().get(); output.collect(key, new IntWritable(sum)); public static void main(string[] args) throws Exception JobConf conf = new JobConf(WordCount. class ); conf.setjobname("wordcount" ); conf.setoutputkeyclass(text.class ); conf.setoutputvalueclass(intwritable.class ); conf.setmapperclass(map.class ); conf.setcombinerclass(reduce.class ); conf.setreducerclass(reduce.class ); conf.setinputformat(textinputformat.class ); conf.setoutputformat(textoutputformat.class ); FileInputFormat.setInputPaths(conf, new Path(args[ 0 ])); FileOutputFormat.setOutputPath(conf, new Path(args[ 1 ])); JobClient.runJob(conf); 3) 主方法 Main 分析 public static void main(string[] args) throws Exception JobConf conf = new JobConf(WordCount. class ); conf.setjobname("wordcount" ); 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 7

conf.setoutputkeyclass(text.class ); conf.setoutputvalueclass(intwritable.class ); conf.setmapperclass(map.class ); conf.setcombinerclass(reduce.class ); conf.setreducerclass(reduce.class ); conf.setinputformat(textinputformat.class ); conf.setoutputformat(textoutputformat.class ); FileInputFormat.setInputPaths(conf, new Path(args[ 0 ])); FileOutputFormat.setOutputPath(conf, new Path(args[ 1 ])); JobClient.runJob(conf); 首先讲解一下 Job 的初始化过程 main 函数调用 Jobconf 类来对 MapReduce Job 进行 初始化, 然后调用 setjobname() 方法命名这个 Job 对 Job 进行合理的命名有助于更快地找 到 Job, 以便在 JobTracker 和 Tasktracker 的页面中对其进行监视 JobConf conf = new JobConf(WordCount. class ); conf.setjobname("wordcount" ); 接着设置 Job 输出结果 <key,value> 的中 key 和 value 数据类型, 因为结果是 < 单词, 个数 >, 所以 key 设置为 Text 类型, 相当于 Java 中 String 类型 Value 设置为 IntWritable, 相 当于 Java 中的 int 类型 conf.setoutputkeyclass(text.class ); conf.setoutputvalueclass(intwritable.class ); 然后设置 Job 处理的 Map( 拆分 ) Combiner( 中间结果合并 ) 以及 Reduce( 合并 ) 的 相关处理类 这里用 Reduce 类来进行 Map 产生的中间结果合并, 避免给网络数据传输产生 压力 conf.setmapperclass(map.class ); conf.setcombinerclass(reduce.class ); conf.setreducerclass(reduce.class ); 接着就是调用 setinputpath() 和 setoutputpath() 设置输入输出路径 conf.setinputformat(textinputformat.class ); conf.setoutputformat(textoutputformat.class ); 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 8

(1)InputFormat 和 InputSplit InputSplit 是 Hadoop 定义的用来传送给每个单独的 map 的数据,InputSplit 存储的并非数据本身, 而是一个分片长度和一个记录数据位置的数组 生成 InputSplit 的方法可以通过 InputFormat() 来设置 当数据传送给 map 时,map 会将输入分片传送到 InputFormat,InputFormat 则调用方法 getrecordreader() 生成 RecordReader,RecordReader 再通过 creatkey() creatvalue() 方法创建可供 map 处理的 <key,value> 对 简而言之,InputFormat() 方法是用来生成可供 map 处理的 <key,value> 对的 Hadoop 预定义了多种方法将不同类型的输入数据转化为 map 能够处理的 <key,value> 对, 它们都继承自 InputFormat, 分别是 : InputFormat ---BaileyBorweinPlouffe.BbpInputFormat ---ComposableInputFormat ---CompositeInputFormat ---DBInputFormat ---DistSum.Machine.AbstractInputFormat ---FileInputFormat ---CombineFileInputFormat ---KeyValueTextInputFormat ---NLineInputFormat ---SequenceFileInputFormat ---TeraInputFormat ---TextInputFormat 其中 TextInputFormat 是 Hadoop 默认的输入方法, 在 TextInputFormat 中, 每个文件 ( 或其一部分 ) 都会单独地作为 map 的输入, 而这个是继承自 FileInputFormat 的 之后, 每行数据都会生成一条记录, 每条记录则表示成 <key,value> 形式 : key 值是每个数据的记录在数据分片中字节偏移量, 数据类型是 LongWritable; value 值是每行的内容, 数据类型是 Text (2)OutputFormat 每一种输入格式都有一种输出格式与其对应 默认的输出格式是 TextOutputFormat, 这种输出方式与输入类似, 会将每条记录以一行的形式存入文本文件 不过, 它的键和值可以是任意形式的, 因为程序内容会调用 tostring() 方法将键和值转换为 String 类型再输出 3)Map 类中 map 方法分析 public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable( 1 ); private Text word = new Text(); 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 9

public void map(longwritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.tostring(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasmoretokens()) word.set(tokenizer.nexttoken()); output.collect(word, one); Map 类继承自 MapReduceBase, 并且它实现了 Mapper 接口, 此接口是一个规范类型, 它有 4 种形式的参数, 分别用来指定 map 的输入 key 值类型 输入 value 值类型 输出 key 值类型和输出 value 值类型 在本例中, 因为使用的是 TextInputFormat, 它的输出 key 值是 LongWritable 类型, 输出 value 值是 Text 类型, 所以 map 的输入类型为 <LongWritable,Text> 在本例中需要输出 <word,1> 这样的形式, 因此输出的 key 值类型是 Text, 输出的 value 值类型是 IntWritable 实现此接口类还需要实现 map 方法,map 方法会具体负责对输入进行操作, 在本例中, map 方法对输入的行以空格为单位进行切分, 然后使用 OutputCollect 收集输出的 <word,1> 4)Reduce 类中 reduce 方法分析 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> public void reduce(text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException int sum = 0 ; while (values.hasnext()) sum += values.next().get(); output.collect(key, new IntWritable(sum)); Reduce 类也是继承自 MapReduceBase 的, 需要实现 Reducer 接口 Reduce 类以 map 的输出作为输入, 因此 Reduce 的输入类型是 <Text,Intwritable> 而 Reduce 的输出是单词 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 10

和它的数目, 因此, 它的输出类型是 <Text,IntWritable> Reduce 类也要实现 reduce 方法, 在 此方法中,reduce 函数将输入的 key 值作为输出的 key 值, 然后将获得多个 value 值加起来, 作为输出的值 1.3.3 新的 WordCount 分析 1) 源代码程序 package org.apache.hadoop.examples; import java.io.ioexception; import java.util.stringtokenizer; import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.genericoptionsparser; public class WordCount public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(object key, Text value, Context context ) throws IOException, InterruptedException StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasmoretokens()) word.set(itr.nexttoken()); context.write(word, one); public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 11

private IntWritable result = new IntWritable(); public void reduce(text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException int sum = 0; for (IntWritable val : values) sum += val.get(); result.set(sum); context.write(key, result); public static void main(string[] args) throws Exception Configuration conf = new Configuration(); String[] otherargs = new GenericOptionsParser(conf, args).getremainingargs(); if (otherargs.length!= 2) System.err.println("Usage: wordcount <in> <out>"); System.exit(2); Job job = new Job(conf, "word count"); job.setjarbyclass(wordcount.class); job.setmapperclass(tokenizermapper.class); job.setcombinerclass(intsumreducer.class); job.setreducerclass(intsumreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)? 0 : 1); 1)Map 过程 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(object key, Text value, Context context ) throws IOException, InterruptedException 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 12

StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasmoretokens()) word.set(itr.nexttoken()); context.write(word, one); Map 过程需要继承 org.apache.hadoop.mapreduce 包中 Mapper 类, 并重写其 map 方法 通过在 map 方法中添加两句把 key 值和 value 值输出到控制台的代码, 可以发现 map 方法中 value 值存储的是文本文件中的一行 ( 以回车符为行结束标记 ), 而 key 值为该行的首字母相对于文本文件的首地址的偏移量 然后 StringTokenizer 类将每一行拆分成为一个个的单词, 并将 <word,1> 作为 map 方法的结果输出, 其余的工作都交有 MapReduce 框架处理 2)Reduce 过程 public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> private IntWritable result = new IntWritable(); public void reduce(text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException int sum = 0; for (IntWritable val : values) sum += val.get(); result.set(sum); context.write(key, result); Reduce 过程需要继承 org.apache.hadoop.mapreduce 包中 Reducer 类, 并重写其 reduce 方法 Map 过程输出 <key,values> 中 key 为单个单词, 而 values 是对应单词的计数值所组成的列表,Map 的输出就是 Reduce 的输入, 所以 reduce 方法只要遍历 values 并求和, 即可得到某个单词的总次数 3) 执行 MapReduce 任务 public static void main(string[] args) throws Exception Configuration conf = new Configuration(); String[] otherargs = new GenericOptionsParser(conf, args).getremainingargs(); if (otherargs.length!= 2) System.err.println("Usage: wordcount <in> <out>"); 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 13

System.exit(2); Job job = new Job(conf, "word count"); job.setjarbyclass(wordcount.class); job.setmapperclass(tokenizermapper.class); job.setcombinerclass(intsumreducer.class); job.setreducerclass(intsumreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)? 0 : 1); 在 MapReduce 中, 由 Job 对象负责管理和运行一个计算任务, 并通过 Job 的一些方法对任务的参数进行相关的设置 此处设置了使用 TokenizerMapper 完成 Map 过程中的处理和使用 IntSumReducer 完成 Combine 和 Reduce 过程中的处理 还设置了 Map 过程和 Reduce 过程的输出类型 :key 的类型为 Text,value 的类型为 IntWritable 任务的输出和输入路径则由命令行参数指定, 并由 FileInputFormat 和 FileOutputFormat 分别设定 完成相应任务的参数设定后, 即可调用 job.waitforcompletion() 方法执行任务 1.4 WordCount 处理过程 本节将对 WordCount 进行更详细的讲解 详细执行步骤如下 : 1) 将文件拆分成 splits, 由于测试用的文件较小, 所以每个文件为一个 split, 并将文件 按行分割形成 <key,value> 对, 如图 4-1 所示 这一步由 MapReduce 框架自动完成, 其中偏移 量 ( 即 key 值 ) 包括了回车所占的字符数 (Windows 和 Linux 环境会不同 ) 输入数据 分割结果 Hello World Bye World 分割 <0, Hello World > <12, Bye World > Hello Hadoop Bye Hadoop 分割 <0, Hello Hadoop > <13, Bye Hadoop > 图 4-1 分割过程 2) 将分割好的 <key,value> 对交给用户定义的 map 方法进行处理, 生成新的 <key,value> 对, 如图 4-2 所示 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 14

分割结果 <0, Hello World > <12, Bye World > map() map 方法输出 <World,1> <World,1> <0, Hello Hadoop > <13, Bye Hadoop > map() <Hadoop,1> <Hadoop,1> 图 4-2 执行 map 方法 3) 得到 map 方法输出的 <key,value> 对后,Mapper 会将它们按照 key 值进行排序, 并执 行 Combine 过程, 将 key 至相同 value 值累加, 得到 Mapper 的最终输出结果 如图 4-3 所 示 map 方法输出排序结果 Combine 输出 <World,1> <World,1> Map 端排序 <World,1> <World,1> Combine 过程 <World,2> <Hadoop,1> <Hadoop,1> Map 端排序 <Hadoop,1> <Hadoop,1> Combine 过程 <Hadoop,2> 图 4-3 Map 端排序及 Combine 过程 4)Reducer 先对从 Mapper 接收的数据进行排序, 再交由用户自定义的 reduce 方法进行 处理, 得到新的 <key,value> 对, 并作为 WordCount 的输出结果, 如图 4-4 所示 Combine 输出 排序结果 Reduce 输出 <World,2> <Hadoop,2> Reduce 端排序 <Bye,list(1,1)> <Hadoop,list(2)> <Hello,list(1,1)> <Word,list(2)> reduce() <Bye,2> <Hadoop,2> <Hello,2> <Word,2> 图 4-4 Reduce 端排序及输出结果 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 15

1.5 MapReduce 新旧改变 Hadoop 最新版本的 MapReduce Release 0.20.0 的 API 包括了一个全新的 Mapreduce JAVA API, 有时候也称为上下文对象 新的 API 类型上不兼容以前的 API, 所以, 以前的应用程序需要重写才能使新的 API 发挥其作用 新的 API 和旧的 API 之间有下面几个明显的区别 新的 API 倾向于使用抽象类, 而不是接口, 因为这更容易扩展 例如, 你可以添加一个方法 ( 用默认的实现 ) 到一个抽象类而不需修改类之前的实现方法 在新的 API 中,Mapper 和 Reducer 是抽象类 新的 API 是在 org.apache.hadoop.mapreduce 包 ( 和子包 ) 中的 之前版本的 API 则是放在 org.apache.hadoop.mapred 中的 新的 API 广泛使用 context object( 上下文对象 ), 并允许用户代码与 MapReduce 系统进行通信 例如,MapContext 基本上充当着 JobConf 的 OutputCollector 和 Reporter 的角色 新的 API 同时支持 " 推 " 和 " 拉 " 式的迭代 在这两个新老 API 中, 键 / 值记录对被推 mapper 中, 但除此之外, 新的 API 允许把记录从 map() 方法中拉出, 这也适用于 reducer " 拉 " 式的一个有用的例子是分批处理记录, 而不是一个接一个 新的 API 统一了配置 旧的 API 有一个特殊的 JobConf 对象用于作业配置, 这是一个对于 Hadoop 通常的 Configuration 对象的扩展 在新的 API 中, 这种区别没有了, 所以作业配置通过 Configuration 来完成 作业控制的执行由 Job 类来负责, 而不是 JobClient, 它在新的 API 中已经荡然无存 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 16

2 参考文献 感谢以下文章的编写作者, 没有你们的铺路, 我或许会走得很艰难, 参考不分先后, 贡 献同等珍贵 1 Hadoop 实战 陆嘉恒 机械工业出版社 2 实战 Hadoop 刘鹏 电子工业出版社 3 Hadoop 上运行 WordCount 以及本地调试地址 :http://www.beoop.com/archives/244.html 4 命令行运行 hadoop 实例 wordcount 程序地址 :http://blog.csdn.net/xw13106209/article/details/6862480 5 Hadoop 示例程序 WordCount 运行及详解地址 :http://samuschen.iteye.com/blog/763940 6 Hadoop 的安装与配置及示例 wordcount 的运行地址 :http://wenku.baidu.com/view/41eac9d850e2524de5187ef3.html 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 17

3 打赏小编 编辑简介 : 高级软件工程师 (T5), 河北工业大学硕士研究生, 现在就职于百度在线网络技术 ( 北京 ) 有限公司 专注于大数据以及其相关研究, 在离线计算和实时计算方面有较为深入的研究, 积累了丰富的实战经验 热衷于知识分享, 其细细品味系列教程深受网友喜爱 姓名 : 解耀伟 笔名 : 虾皮 扣扣 :461052034 网站 :www.xiapistudio.com 博客 :http://www.cnblogs.com/xia520pi/ 邮箱 :xieyaowei1986@163.com 从高考复习开始养成了总结的习惯, 习惯于在学习的过程中, 把相关的文章融会贯通, 并加以实践, 结合自己的实际情况把相关的内容整理成册, 便于学习和总结 在这几年里陆陆续续分享了很多细细品味系列文章 每一期文章都耗费了不少的心血, 很多时候都是在星期天业余的时间完成, 现在也建立了自己独立的网站 :www.xiapistudio.com, 需要一些资金来维持, 同时也可以鼓励我写更多的好东西来分享 如果你看了本文章对自己有用, 可以通过支付宝的形式来进行打赏,1 元 2 元 10 元皆可, 多少并不重要, 只要你感觉文章使你受益即可 温馨提示 : 在转账时, 可以写明 打赏虾皮 或者 打赏虾皮工作室 我的支付宝已 经进行实名认证, 支付宝是的个人头像, 请认准后再支付 中国 北京 虾皮工作室 (www.xiapistudio.com) 编辑 : 虾皮 18