三种方法实现Hadoop(MapReduce)全局排序(1)

Similar documents
三种方法实现Hadoop(MapReduce)全局排序(2)

使用MapReduce读取XML文件

通过Hive将数据写入到ElasticSearch

Guava学习之Resources

Java ¿ª·¢ 2.0: Óà Hadoop MapReduce ½øÐдóÊý¾Ý·ÖÎö

Hadoop 集 群 ( 第 6 期 ) WordCount 运 行 详 解 1 MapReduce 理 论 简 介 1.1 MapReduce 编 程 模 型 MapReduce 采 用 分 而 治 之 的 思 想, 把 对 大 规 模 数 据 集 的 操 作, 分 发 给 一 个 主 节 点 管

在Spring中使用Kafka:Producer篇

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

使用Cassandra和Spark 2.0实现Rest API服务

Apache CarbonData集群模式使用指南

Hive:用Java代码通过JDBC连接Hiveserver

Spark读取Hbase中的数据


7521,WARD,SALESMAN,7698,22-2 月 -81,1250,500, ,JONES,MANAGER,7839,02-4 月 -81,2975,, ,MARTIN,SALESMAN,7698,28-9 月 -81,1250,1400, ,BLAK

关于天云趋势 天云趋势由宽带资本和趋势科技共同投资成立于 2010 年 3 月 趋势科技是 Hadoop 的重度使用者 : 2006 年开始使用, 用于处理网页和邮件服务器评级 五个数据中心, 近 1000 个节点, 最大集群约 500 台服务器 日均处理 3.6T 日志数据 亚洲最早, 也是最大的

Flume-ng与Mysql整合开发

JavaIO.PDF

目录 1 本期内容 MapReduce 理论简介 MapReduce 编程模型 MapReduce 处理过程 运行 WordCount 程序 准备工作 运行例子

编写简单的Mapreduce程序并部署在Hadoop2.2.0上运行

Flink快速上手(QuickStart)

chp6.ppt

使用Spark SQL读取Hive上的数据

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

Hadoop元数据合并异常及解决方法

前 言

韶关:神奇丹霞

哼, 你 們 不 回 答 又 怎 麼 樣? 不 管 是 多 大 來 頭, 現 在 都 被 血 魔 吞 噬 無 蹤 了 你 們 幾 個 真 是 太 過 分, 我 不 犯 你 們, 你 們 卻 一 天 到 晚 來 挑 釁 我 教 尊 冷 笑 著 說 道 嗚, 大 人 土 地 大 姐 跪 下 來, 流 下

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

Java Access 5-1 Server Client Client Server Server Client 5-2 DataInputStream Class java.io.datainptstream (extends) FilterInputStream InputStream Obj

伊春:醉人林都

Hive几种数据导入方式

EJB-Programming-4-cn.doc

六种使用Linux命令发送带附件的邮件

MapReduce

关林:武圣陵寝

泰山:五岳独尊

Microsoft Word - 01.DOC

国内26省市新能源汽车推广规划已出台

三种恢复 HDFS 上删除文件的方法

Kafka客户端是如何找到 leader 分区的

北戴河:海阔天空

Ubuntu和CentOS如何配置SSH使得无密码登陆

使 用 Java 语 言 模 拟 保 险 箱 容 量 门 板 厚 度 箱 体 厚 度 属 性 锁 具 类 型 开 保 险 箱 关 保 险 箱 动 作 存 取 款

目录 1 本期内容 Hadoop 开发环境简介 Hadoop 集群简介 Windows 开发简介 Hadoop Eclipse 简介和使用 Eclipse 插件介绍 Hadoo

<4D F736F F D20C8EDC9E82DCFC2CEE7CCE22D3039C9CF>


java2d-4.PDF

西岭雪山滑雪场

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

HBase 中加盐(Salting)之后的表如何读取:协处理器篇

《大话设计模式》第一章

内 容 简 介 本 书 是 一 本 关 于 语 言 程 序 设 计 的 教 材, 涵 盖 了 语 言 的 基 本 语 法 和 编 程 技 术, 其 中 包 含 了 作 者 对 语 言 多 年 开 发 经 验 的 总 结, 目 的 是 让 初 学 的 读 者 感 受 到 语 言 的 魅 力, 并 掌

chap07.key

在Fedora上部署Hadoop2.2.0伪分布式平台

untitled

OOP with Java 通知 Project 4: 4 月 18 日晚 9 点 关于抄袭 没有分数

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

Microsoft Word - Broker.doc

操作指南产品文档

untitled

Microsoft PowerPoint - hbase_program(0201).ppt

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

江门:中国第一侨乡

是 喔, 就 是 那 個 在 BBS 醫 療 版 跟 你 嗆 聲, 自 稱 有 三 十 多 年 推 拿 經 驗 的 大 叔 嗎? 一 個 看 來 頗 為 清 秀 的 女 生 問 道, 她 語 氣 中 略 感 訝 異 是 啊, 什 麼 推 拿 按 摩 有 多 好, 還 要 人 生 病 盡 量 不 要

教育扩张能改善收入分配差距吗?——来自CHNS2006年数据的证据

山水文化,市井人家——以湖州邱城小镇的概念性规划为例

1.5招募说明书(草案)

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

2015 年 度 收 入 支 出 决 算 总 表 单 位 名 称 : 北 京 市 朝 阳 区 卫 生 局 单 位 : 万 元 收 入 支 出 项 目 决 算 数 项 目 ( 按 功 能 分 类 ) 决 算 数 一 财 政 拨 款 一 一 般 公 共 服 务 支 出 二

目 录 第 一 部 分 档 案 局 概 况 一 主 要 职 责 二 部 门 决 算 单 位 构 成 第 二 部 分 档 案 局 2016 年 度 部 门 预 算 表 一 2016 年 度 市 级 部 门 收 支 预 算 总 表 二 2016 年 度 市 级 部 门 支 出 预 算 表 三 2016

javaexample-02.pdf

信 息 披 露 义 务 人 声 明 1 信 息 披 露 义 务 人 依 据 中 华 人 民 共 和 国 公 司 法 中 华 人 民 共 和 国 证 券 法 上 市 公 司 收 购 管 理 办 法 公 开 发 行 证 券 公 司 信 息 披 露 内 容 与 格 式 准 则 第 15 号 权 益 变 动

, (, ),,,,,, : : ( ), :,,,,,,, ( ), ( ),,,,,, ( ) ( ),, :!,,,,,,,,,,,,,,,,,,,,,,, [1 ] :,,,, :, ;, ( ),, :,,,,,,,,,,, 66

Cover-3.indd, page Normalize


人 間 菩 提 Part 1 人 間 菩 提 Part 2 清 涼 菩 提 正 覺 修 行 清 心 發 願 自 重 ----

FP.pdf

2009年3月全国计算机等级考试二级Java语言程序设计笔试试题

OOP with Java 通知 Project 3: 3 月 29 日晚 9 点 4 月 1 日上课

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

SparkR(R on Spark)编程指南

Hadoop 集群 ( 第 9 期 ) MapReduce 初级案例 1 数据去重 数据去重 主要是为了掌握和利用并行化思想来对数据进行有意义的筛选 统计大 数据集上的数据种类个数 从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据 去重 下面就进入这个实例的 MapReduce 程序设计 1.

Microsoft Word - ch04三校.doc

untitled

OOP with Java 通知 Project 4: 4 月 19 日晚 9 点

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

Hadoop 2.2.0安装和配置lzo

北 风 网 讲 师 原 创 作 品 ---- 仅 供 学 员 内 部 交 流 使 用 前 言 吾 尝 终 日 而 思 矣, 不 如 须 臾 之 所 学 也 ; 吾 尝 跂 而 望 矣, 不 如 登 高 之 博 见 也 登 高 而 招, 臂 非 加 长 也, 而 见

untitled

EJB-Programming-3.PDF

Swing-02.pdf

迅速在两个含有大量数据的文件中寻找相同的数据

OOP with Java 通知 Project 4: 5 月 2 日晚 9 点

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

untitled

PowerPoint Presentation

SDK 概要 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk" 获取不同版本的 Java SDK: 包名 odps-sdk-core odps-sdk-commons odps-sdk-udf odps-sdk-mapred odps-sdk-graph 描述 ODPS 基

将 MySQL 的全量数据导入到 Apache Solr 中

untitled

Transcription:

三种方法实现 Hadoop(MapReduce) 全局排序 () 三种方法实现 Hadoop(MapReduce) 全局排序 () 我们可能会有些需求要求 MapReduce 的输出全局有序, 这里说的有序是指 Key 全局有序 但是我们知道,MapReduce 默认只是保证同一个分区内的 Key 是有序的, 但是不保证全局有序 基于此, 本文提供三种方法来对 MapReduce 的输出进行全局排序 生成测试数据 在介绍如何实现之前, 我们先来生成一些测试数据, 实现如下 : #!/bin/sh for i in {..;do echo $RANDOM done; 将上面的代码保存到 iteblog.sh 的文件里面, 然后运行 : $ sh iteblog.sh > data $ sh iteblog.sh > data2 $ hadoop fs -put data /user/iteblog/input $ hadoop fs -put data2 /user/iteblog/input $RANDOM 变量是 Shell 内置的, 使用它能够生成五位内的随机正整数 上面我们一共运行了两次, 这样我们就有两份随机数文件 data 和 data2; 最后我们把生成的随机数文件上传到 HDFS 上 现在我们可以来写程序对这两个文件里面的数据进行排序了 使用一个 Reduce 进行排序 前面我们说了,MapReduce 默认只是保证同一个分区内的 Key 是有序的, 但是不保证全局有序 如果我们将所有的数据全部发送到一个 Reduce, 那么不就可以实现结果全局有序吗? 这种方法实现很简单, 如下 : package com.iteblog.mapreduce.sort; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; / 9

三种方法实现 Hadoop(MapReduce) 全局排序 () import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; import java.io.ioexception; public class TotalSortV extends Configured implements Tool { static class SimpleMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { protected void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { IntWritable intwritable = new IntWritable(Integer.parseInt(value.toString())); context.write(intwritable, intwritable); static class SimpleReducer extends Reducer<IntWritable, IntWritable, IntWritable, NullWritable> { protected void reduce(intwritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) context.write(value, NullWritable.get()); public int run(string[] args) throws Exception { if (args.length!= 2) { System.err.println("<input> <output>"); System.exit(27); Job job = Job.getInstance(getConf()); job.setjarbyclass(totalsortv.class); FileInputFormat.addInputPath(job, new Path(args[])); FileOutputFormat.setOutputPath(job, new Path(args[])); 2 / 9

三种方法实现 Hadoop(MapReduce) 全局排序 () job.setmapperclass(simplemapper.class); job.setreducerclass(simplereducer.class); job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(intwritable.class); job.setoutputvalueclass(nullwritable.class); job.setnumreducetasks(); job.setjobname("totalsort"); return job.waitforcompletion(true)? : ; public static void main(string[] args) throws Exception { int exitcode = ToolRunner.run(new TotalSort(), args); System.exit(exitCode); 上面程序的实现很简单, 我们直接使用 TextInputFormat 类来读取上面生成的随机数文件 (data 和 data2) 因为文件里面的数据是正整数, 所以我们在 SimpleMapper 类里面直接将 value 转换成 int 类型, 然后赋值给 IntWritable 等数据到 SimpleReducer 的时候, 同一个 Reduce 里面的 Key 已经全部有序 ; 因为我们设置了一个 Reduce 作业, 这样的话, 我们就实现了数据全局有序 运行如下 : [iteblog@www.iteblog.com /home/iteblog]$ hadoop jar totalsort-..jar com.iteblog.mapreduce.sort.totalsortv /user/iteblog/input /user/iteblog/output [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output Found 2 items -rw-r--r-- 3 iteblog supergroup 27-5-9 :4 /user/iteblog/output/_success -rw-r--r-- 3 iteblog supergroup 3757 27-5-9 :4 /user/iteblog/output/part-r- [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- head -n 3 / 9

三种方法实现 Hadoop(MapReduce) 全局排序 () [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- tail -n 从上面的测试结果也可以看出, 我们只生成了一个数据文件, 而且这个文件里面的数据已经全局有序了 自定义分区函数实现全局有序 上面实现数据全局有序有个很大的局限性 : 所有的数据都发送到一个 Reduce 进行排序, 这样不能充分利用集群的计算资源, 而且在数据量很大的情况下, 很有可能会出现 OOM 问题 我们分析一下,MapReduce 默认的分区函数是 HashPartitioner, 其实现的原理是计算 map 输出 key 的 hashcode, 然后对 Reduce 个数求模, 这样只要求模结果一样的 Key 都会发送到同一个 Reduce 如果我们能够实现一个分区函数, 使得 所有 Key 所有 其余的 Key 都发送到 Reduce 2; 这就实现了 Reduce 的数据一定全部小于 Reduce, 且 Reduce 的数据全部小于 Reduce 2, 再加上同一个 Reduce 里面的数据局部有序, 这样就实现了数据的全局有序 实现如下 : package com.iteblog.mapreduce.sort; import com.iteblog.mapreduce.secondsort.intpair; import org.apache.hadoop.conf.configured; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.nullwritable; 4 / 9

三种方法实现 Hadoop(MapReduce) 全局排序 () import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.partitioner; import org.apache.hadoop.mapreduce.reducer; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.util.tool; import org.apache.hadoop.util.toolrunner; import java.io.ioexception; public class TotalSortV2 extends Configured implements Tool { static class SimpleMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { protected void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { IntWritable intwritable = new IntWritable(Integer.parseInt(value.toString())); context.write(intwritable, intwritable); static class SimpleReducer extends Reducer<IntWritable, IntWritable, IntWritable, NullWrita ble> { protected void reduce(intwritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable value : values) context.write(value, NullWritable.get()); public static class IteblogPartitioner extends Partitioner<IntWritable, IntWritable> { public int getpartition(intwritable key, IntWritable value, int numpartitions) { int keyint = Integer.parseInt(key.toString()); if (keyint < ) { return ; else if (keyint < 2) { return ; else { return 2; 5 / 9

三种方法实现 Hadoop(MapReduce) 全局排序 () public int run(string[] args) throws Exception { if (args.length!= 2) { System.err.println("<input> <output>"); System.exit(27); Job job = Job.getInstance(getConf()); job.setjarbyclass(totalsortv2.class); FileInputFormat.addInputPath(job, new Path(args[])); FileOutputFormat.setOutputPath(job, new Path(args[])); job.setmapperclass(simplemapper.class); job.setreducerclass(simplereducer.class); job.setpartitionerclass(iteblogpartitioner.class); job.setmapoutputkeyclass(intwritable.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(intwritable.class); job.setoutputvalueclass(nullwritable.class); job.setnumreducetasks(3); job.setjobname("dw_subject"); return job.waitforcompletion(true)? : ; public static void main(string[] args) throws Exception { int exitcode = ToolRunner.run(new TotalSortV2(), args); System.exit(exitCode); 第二版的排序实现除了自定义的 IteblogPartitioner, 其余的和第一种实现一样 现在我们来运行一下 : [iteblog@www.iteblog.com /home/iteblog]$ hadoop jar totalsort-..jar com.iteblog.mapreduce.sort.totalsortv2 /user/iteblog/input /user/iteblog/output [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output Found 4 items -rw-r--r-- 3 iteblog supergroup 27-5-9 3:53 /user/iteblog/output/_success -rw-r--r-- 3 iteblog supergroup 299845 27-5-9 3:53 /user/iteblog/output/partr- -rw-r--r-- 3 iteblog supergroup 3659 27-5-9 3:53 /user/iteblog/output/partr- 6 / 9

三种方法实现 Hadoop(MapReduce) 全局排序 () 466722 27-5-9 3:53 /user/iteblog/output/part- -rw-r--r-- 3 iteblog supergroup r-2 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- head -n [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- tail -n 9998 9998 9998 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- head -n [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr- tail -n 9997 9997 7 / 9

三种方法实现 Hadoop(MapReduce) 全局排序 () 9998 9998 9998 9998 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr-2 head -n 2 2 2 2 2 2 2 2 2 2 [iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/partr-2 tail -n 我们已经看到了这个程序生成了三个文件 ( 因为我们设置了 Reduce 个数为 3), 而且每个文件都是局部有序 ; 所有小于 的数据都在 part-r- 里面, 所有小于 2 的数据都在 part-r- 里面, 所有大于 2 的数据都在 part-r-2 里面 part-r- part-r- 和 partr-2 三个文件实现了全局有序 这个方法也实现了数据的全局有序, 但是也有一些问题, 明天我再写一篇文章介绍第三种数据全局排序的方法 8 / 9

Powered by TCPDF (www.tcpdf.org) 三种方法实现 Hadoop(MapReduce) 全局排序 () 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 9 / 9