Hadoop&Spark解决二次排序问题(Hadoop篇)

Similar documents
使用MapReduce读取XML文件

三种方法实现Hadoop(MapReduce)全局排序(1)

Flume-ng与Mysql整合开发

Apache CarbonData集群模式使用指南

Guava学习之Resources

三种方法实现Hadoop(MapReduce)全局排序(2)

Kafka客户端是如何找到 leader 分区的

使用Cassandra和Spark 2.0实现Rest API服务

Spark读取Hbase中的数据

在Spring中使用Kafka:Producer篇

通过Hive将数据写入到ElasticSearch

Hive:用Java代码通过JDBC连接Hiveserver

Java ¿ª·¢ 2.0: Óà Hadoop MapReduce ½øÐдóÊý¾Ý·ÖÎö

使用Apache Beam读写HDFS上的文件

新・解きながら学ぶJava

如何在 Apache Hive 中解析 Json 数组

Guava学习之CharSequenceReader

使用Spark SQL读取Hive上的数据

使用Hive读取ElasticSearch中的数据

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

Hadoop元数据合并异常及解决方法

关于天云趋势 天云趋势由宽带资本和趋势科技共同投资成立于 2010 年 3 月 趋势科技是 Hadoop 的重度使用者 : 2006 年开始使用, 用于处理网页和邮件服务器评级 五个数据中心, 近 1000 个节点, 最大集群约 500 台服务器 日均处理 3.6T 日志数据 亚洲最早, 也是最大的

JavaIO.PDF

西岭雪山滑雪场

韶关:神奇丹霞

哼, 你 們 不 回 答 又 怎 麼 樣? 不 管 是 多 大 來 頭, 現 在 都 被 血 魔 吞 噬 無 蹤 了 你 們 幾 個 真 是 太 過 分, 我 不 犯 你 們, 你 們 卻 一 天 到 晚 來 挑 釁 我 教 尊 冷 笑 著 說 道 嗚, 大 人 土 地 大 姐 跪 下 來, 流 下

エスポラージュ株式会社 住所 : 東京都江東区大島 東急ドエルアルス大島 HP: ******************* * 关于 Java 测试试题 ******

SparkR(R on Spark)编程指南

Microsoft Word - 01.DOC

伊春:醉人林都


Hadoop 集 群 ( 第 6 期 ) WordCount 运 行 详 解 1 MapReduce 理 论 简 介 1.1 MapReduce 编 程 模 型 MapReduce 采 用 分 而 治 之 的 思 想, 把 对 大 规 模 数 据 集 的 操 作, 分 发 给 一 个 主 节 点 管

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

Java Access 5-1 Server Client Client Server Server Client 5-2 DataInputStream Class java.io.datainptstream (extends) FilterInputStream InputStream Obj

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

Flink on YARN部署快速入门指南

Flink快速上手(QuickStart)

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

关林:武圣陵寝

泰山:五岳独尊

国内26省市新能源汽车推广规划已出台

PowerPoint 演示文稿

EJB-Programming-4-cn.doc

Microsoft Word - ch04三校.doc

北戴河:海阔天空

六种使用Linux命令发送带附件的邮件


EJB-Programming-3.PDF

Ubuntu和CentOS如何配置SSH使得无密码登陆

09 (File Processes) (mkdir) 9-3 (createnewfile) 9-4 (write) 9-5 (read) 9-6 (deletefile) 9-7 (deletedir) (Exercises)

Spark作业代码(源码)IDE远程调试

主程式 : public class Main3Activity extends AppCompatActivity { ListView listview; // 先整理資料來源,listitem.xml 需要傳入三種資料 : 圖片 狗狗名字 狗狗生日 // 狗狗圖片 int[] pic =new

javaexample-02.pdf

前言 C# C# C# C C# C# C# C# C# microservices C# More Effective C# More Effective C# C# C# C# Effective C# 50 C# C# 7 Effective vii

Hive几种数据导入方式

Chapter 9: Objects and Classes

IoC容器和Dependency Injection模式.doc

Apache Spark 2.4 新增内置函数和高阶函数使用介绍

untitled

自定义Spark Streaming接收器(Receivers)

投影片 1

Java

D getinitparameternames() 9 下 列 选 项 中, 属 于 Servlet API 中 提 供 的 request 对 象 的 包 装 类 的 是 ( ) A HttpServletRequestWrapper B HttpServletRequest C HttpServ

( 总 第 1073 期 ) 浙 江 省 人 民 政 府 主 办 2015 年 3 月 17 日 出 版 省 政 府 令 省 政 府 文 件 目 录 浙 江 省 大 型 群 众 性 活 动 安 全 管 理 办 法 ( 浙 江 省 人 民 政 府 令 第 333 号 ) (3) 浙 江 省 人 民 政

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

在Fedora上部署Hadoop2.2.0伪分布式平台

chp6.ppt

zxj

<443A5CCED2B5C4D7CAC1CF5CD7C0C3E65CB9D8D3DAC3FCC3FB C4EAB6C8CAA1C7E0C4EACEC4C3F7BAC5A1A2CAA1C7E0C4EACEC4C3F7BAC5B1EAB1F8BACDCAA1C7E0C4EACEC4C3F7BAC5CFC8BDF8B9A4D7F7D5DFB5C4BEF6B6A8C5C55CA3A830372E3038A3A9B9D8D3DAC3FCC3FB C4EAB

江门:中国第一侨乡

是 喔, 就 是 那 個 在 BBS 醫 療 版 跟 你 嗆 聲, 自 稱 有 三 十 多 年 推 拿 經 驗 的 大 叔 嗎? 一 個 看 來 頗 為 清 秀 的 女 生 問 道, 她 語 氣 中 略 感 訝 異 是 啊, 什 麼 推 拿 按 摩 有 多 好, 還 要 人 生 病 盡 量 不 要

1.5招募说明书(草案)

教育扩张能改善收入分配差距吗?——来自CHNS2006年数据的证据

山水文化,市井人家——以湖州邱城小镇的概念性规划为例

目 录 第 一 部 分 档 案 局 概 况 一 主 要 职 责 二 部 门 决 算 单 位 构 成 第 二 部 分 档 案 局 2016 年 度 部 门 预 算 表 一 2016 年 度 市 级 部 门 收 支 预 算 总 表 二 2016 年 度 市 级 部 门 支 出 预 算 表 三 2016

2015 年 度 收 入 支 出 决 算 总 表 单 位 名 称 : 北 京 市 朝 阳 区 卫 生 局 单 位 : 万 元 收 入 支 出 项 目 决 算 数 项 目 ( 按 功 能 分 类 ) 决 算 数 一 财 政 拨 款 一 一 般 公 共 服 务 支 出 二


OOP with Java 通知 Project 4: 4 月 18 日晚 9 点 关于抄袭 没有分数

OOP with Java 通知 Project 4: 4 月 19 日晚 9 点

无类继承.key

前 言

Microsoft Word - 第3章.doc

HBase 中加盐(Salting)之后的表如何读取:协处理器篇

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

Fun Time (1) What happens in memory? 1 i n t i ; 2 s h o r t j ; 3 double k ; 4 char c = a ; 5 i = 3; j = 2; 6 k = i j ; H.-T. Lin (NTU CSIE) Referenc

ABOUT ME AGENDA 唐建法 / TJ MongoDB 高级方案架构师 MongoDB 中文社区联合发起人 Spark 介绍 Spark 和 MongoDB 案例演示

三种恢复 HDFS 上删除文件的方法

untitled

将 MySQL 的全量数据导入到 Apache Solr 中

OOP with Java 通知 Project 4: 5 月 2 日晚 9 点

鼓浪屿:懒得艳遇

使 用 Java 语 言 模 拟 保 险 箱 容 量 门 板 厚 度 箱 体 厚 度 属 性 锁 具 类 型 开 保 险 箱 关 保 险 箱 动 作 存 取 款

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

untitled

MapReduce

Flink快速上手之Scala API使用

<4D F736F F F696E74202D20332D322E432B2BC3E6CFF2B6D4CFF3B3CCD0F2C9E8BCC6A1AAD6D8D4D8A1A2BCCCB3D0A1A2B6E0CCACBACDBEDBBACF2E707074>

Transcription:

Hadoop&Spark 解决二次排序问题 (Spark 篇 ) 问题描述 二次排序就是 key 之间有序, 而且每个 Key 对应的 value 也是有序的 ; 也就是对 MapReduce 的输出 (KEY, Value(v 1,v 2,v 3,...,v n )) 中的 Value(v 1,v 2,v 3,...,v n ) 值进行排序 ( 升序或者降序 ), 使得 Value(s 1,s 2,s 3,...,s n ),s i (v 1,v 2,v 3,...,v n ) 且 s 1 < s 2 < s 3 <... < s n 假设我们有以下输入文件 ( 逗号分割的分别是年, 月, 总数 ): [root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0 我们期望的输出结果是 2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5 但是 Hadoop 默认的输出结果只能对 Key 进行排序, 其中 Value 中的值次序是不定的 ; 也就是说,Hadoop 默认的输出可能如下 : 1 / 9

2014-2 64 2015-1 21,4,3,24 2015-2 0,35,-43 2015-3 46,56 2015-4 5 解决方案 针对这个问题我们有两种方法来解决 :(1) 将每个 Key 对应的 Value 全部存储到内存 ( 这个只会存储到单台机器 ), 然后对这些 Value 进行相应的排序 但是如果 Value 的数据量非常大, 导致单台内存无法存储这些数据, 这将会导致程序出现 java.lang.outofmemoryerror, 所以这个方法不是很通用 (2) 这种方法将 Value 中的值和旧的 Key 组成一个新的 Key, 这样我们就可以利用 Reduce 来排序这个 Key, 其生成的结果就是我们需要的 过程如下 : 1 原始的键值对是 (k,v) 这里的 k 就是就的 key, 也可以称为 natural key; 2 我们可以将 k 和 v 组合成新的 key( 可以称为 composite key), 也就是 ((k,v), v) 3 自定义分区函数, 将 k 相同的键值对发送到同一个 Reduce 中 ; 4 自定义分组函数, 将 k 相同的键值对当作一个分组 文字比较枯燥, 我们来看看下面实例 : 1 原始数据是 [root@iteblog.com /tmp]# vim data.txt 2015,1,24 2015,3,56 2015,1,3 2015,2,-43 2015,4,5 2015,3,46 2014,2,64 2015,1,4 2015,1,21 2015,2,35 2015,2,0 我们将年 月组成 key(natural key), 总数作为 value, 结果变成 : (2015-1,24) (2015-3,56) (2015-1,3) (2015-2,-43) (2015-4,5) 2 / 9

(2015-3,46) (2014-2,64) (2015-1,4) (2015-1,21) (2015-2,35) (2015-2,0) 2 将 value 和 key(natural key) 组成新的 key(composite key), 如下 : ((2015-1,24),24) ((2015-3,56),56) ((2015-1,3),3) ((2015-2,-43),-43) ((2015-4,5),5) ((2015-3,46),46) ((2014-2,64),64) ((2015-1,4),4) ((2015-1,21),21) ((2015-2,35),35) ((2015-2,0),0) 3 自定义分区函数, 将 k 相同的键值对发送到同一个 Reduce 中, 结果如下 : [((2014-2,64),64)] [((2015-1,24),24),((2015-1,3),3),((2015-1,4),4),((2015-1,21),21)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,56),56),((2015-3,46),46)] [((2015-4,5),5)] 4 自定义组排序函数, 结果如下 : [((2014-2,64),64)] [((2015-1,3),3),((2015-1,4),4),((2015-1,21),21),((2015-1,24),24)] [((2015-2,-43),-43),((2015-2,0),0),((2015-2,35),35)] [((2015-3,46),46),((2015-3,56),56)] [((2015-4,5),5)] 3 / 9

5 自定义分组函数, 结果如下 : ((2014-2,64),(64)) ((2015-1,24),(3,4,21,24)) ((2015-2,35),(-43,0,35)) ((2015-3,56),(46,56)) ((2015-4,5),(5)) 6 最后输出的结果就是我们要的 : 2014-2 64 2015-1 3,4,21,24 2015-2 -43,0,35 2015-3 46,56 2015-4 5 代码实例 下面将贴出使用 MapReduce 解决这个问题的代码 : package com.iteblog; import org.apache.hadoop.io.writablecomparable; import java.io.datainput; import java.io.dataoutput; import java.io.ioexception; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午 23:49 * bolg: https://www.iteblog.com * 本文地址 :https://www.iteblog.com/archives/1415 * 过往记忆博客, 专注于 hadoop hive spark shark flume 的技术博客, 大量的干货 * 过往记忆博客微信公共帐号 :iteblog_hadoop */ public class Entry implements WritableComparable<Entry> { 4 / 9

private String yearmonth; private int count; public Entry() { public int compareto(entry entry) { int result = this.yearmonth.compareto(entry.getyearmonth()); if (result == 0) { result = compare(count, entry.getcount()); return result; public void write(dataoutput dataoutput) throws IOException { dataoutput.writeutf(yearmonth); dataoutput.writeint(count); public void readfields(datainput datainput) throws IOException { this.yearmonth = datainput.readutf(); this.count = datainput.readint(); public String getyearmonth() { return yearmonth; public void setyearmonth(string yearmonth) { this.yearmonth = yearmonth; public int getcount() { return count; public void setcount(int count) { this.count = count; public static int compare(int a, int b) { return a < b? -1 : (a > b? 1 : 0); 5 / 9

public String tostring() { return yearmonth; 上面就是将旧的 Key(natural key) 和 Value 组合成新的 Key(composite key) 的代码, 接下来看下自定义的分区类 : package com.iteblog; import org.apache.hadoop.mapreduce.partitioner; public class EntryPartitioner extends Partitioner<Entry, Integer> { public int getpartition(entry entry, Integer integer, int numberpartitions) { return Math.abs((entry.getYearMonth().hashCode() % numberpartitions)); 这个类使得 natural key 相同的数据分派到同一个 Reduce 中 然后看下自定义分组类 : package com.iteblog; import org.apache.hadoop.io.writablecomparable; import org.apache.hadoop.io.writablecomparator; /** * User: 过往记忆 * Date: 2015-08-05 * Time: 下午 23:49 * bolg: https://www.iteblog.com * 本文地址 :https://www.iteblog.com/archives/1415 * 过往记忆博客, 专注于 hadoop hive spark shark flume 的技术博客, 大量的干货 * 过往记忆博客微信公共帐号 :iteblog_hadoop */ public class EntryGroupingComparator extends WritableComparator { public EntryGroupingComparator() { 6 / 9

super(entry.class, true); public int compare(writablecomparable a, WritableComparable b) { Entry a1 = (Entry) a; Entry b1 = (Entry) b; return a1.getyearmonth().compareto(b1.getyearmonth()); 只要是 natural key 相同, 我们就认为是同一个分组, 这样 Reduce 内部才可以对 Value 中的值进行排序 接下来看下 Map 类 public class SecondarySortMapper extends Mapper<LongWritable, Text, Entry, Text> { private Entry entry = new Entry(); private Text value = new Text(); protected void map(longwritable key, Text lines, Context context) throws IOException, InterruptedException { String line = lines.tostring(); String[] tokens = line.split(","); // YYYY = tokens[0] // MM = tokens[1] // count = tokens[2] String yearmonth = tokens[0] + "-" + tokens[1]; int count = Integer.parseInt(tokens[2]); entry.setyearmonth(yearmonth); entry.setcount(count); value.set(tokens[2]); context.write(entry, value); 其实就是解析每一行的数据, 然后将旧的 Key(natural key) 和 Value 组合成新的 Key(composite key) 接下来看下 Reduce 类实现 7 / 9

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> { protected void reduce(entry key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder builder = new StringBuilder(); for (Text value : values) { builder.append(value.tostring()); builder.append(","); context.write(key, new Text(builder.toString())); builder 存储的就是排序好的 Value 序列, 最后来看看启动程序的使用 : Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setjarbyclass(iteblog.class); job.setjobname("secondarysort"); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setoutputkeyclass(entry.class); job.setoutputvalueclass(text.class); job.setmapperclass(secondarysortmapper.class); job.setreducerclass(secondarysortreducer.class); job.setpartitionerclass(entrypartitioner.class); job.setgroupingcomparatorclass(entrygroupingcomparator.class); 关键看上面第 12-15 行的代码 下面是运行这个程序的方法和结果 : [root@iteblog.com /hadoop]# bin/hadoop jar /tmp/iteblog-1.0-snapshot.jar com.iteblog.main /iteblog/data.txt /iteblog/output [root@iteblog.com /hadoop]# bin/hadoop fs -cat /iteblog/output/pa* 2014-2 64, 2015-1 3,4,21,24, 8 / 9

Powered by TCPDF (www.tcpdf.org) 2015-2 -43,0,35, 2015-3 46,56, 2015-4 5, 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 9 / 9