Hadoop 的 MapReduce API 提 供 自 動 的 平 行 化 與 工 作 分 配 容 錯 特 性 狀 態 監 控 工 具 一 個 乾 淨 的 抽 象 化 (abstration) 供 程 式 設 計 師 使 用 97



Similar documents
Hadoop 集 群 ( 第 6 期 ) WordCount 运 行 详 解 1 MapReduce 理 论 简 介 1.1 MapReduce 编 程 模 型 MapReduce 采 用 分 而 治 之 的 思 想, 把 对 大 规 模 数 据 集 的 操 作, 分 发 给 一 个 主 节 点 管

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

Day1-hadoop_ _v1

投影片 1

Java ¿ª·¢ 2.0: Óà Hadoop MapReduce ½øÐдóÊý¾Ý·ÖÎö

目录 1 本期内容 MapReduce 理论简介 MapReduce 编程模型 MapReduce 处理过程 运行 WordCount 程序 准备工作 运行例子

关于天云趋势 天云趋势由宽带资本和趋势科技共同投资成立于 2010 年 3 月 趋势科技是 Hadoop 的重度使用者 : 2006 年开始使用, 用于处理网页和邮件服务器评级 五个数据中心, 近 1000 个节点, 最大集群约 500 台服务器 日均处理 3.6T 日志数据 亚洲最早, 也是最大的

三种方法实现Hadoop(MapReduce)全局排序(1)

使用MapReduce读取XML文件

新・解きながら学ぶJava

MapReduce

Hadoop&Spark解决二次排序问题(Hadoop篇)

三种方法实现Hadoop(MapReduce)全局排序(2)

第 期 熊 安 萍 等 *1$ 文 件 系 统 中 范 围 锁 机 制 的 应 用 研 究! 锁 命 名 空 间 '+'($($ 描 述 了 资 源 和 锁 的 集 合 它 同 时 在 客 户 节 点 和 服 务 节 点 存 在 不 同 之 处 只 是 利 用 一 个 数 据 标 识 来 有 效 区

chp6.ppt

中 国 中 西 医 结 合 杂 志 年 月 第 卷 第 期!" 通 透 性 增 加 产 生 蛋 白 水 解 酶 促 进 血 管 内 皮 细 胞 有 丝 分 裂 内 皮 细 胞 从 基 底 膜 上 迁 移 到 血 管 周 围 间 隙 粘 附 聚 集 重 构 为 三 维 管 腔 并 与 周 围 血 管

《安徒生童话》(四)

Microsoft PowerPoint - hbase_program(0201).ppt

Microsoft Word - 01.DOC

7521,WARD,SALESMAN,7698,22-2 月 -81,1250,500, ,JONES,MANAGER,7839,02-4 月 -81,2975,, ,MARTIN,SALESMAN,7698,28-9 月 -81,1250,1400, ,BLAK

!

《饲料和饲料添加剂管理条例》

编写简单的Mapreduce程序并部署在Hadoop2.2.0上运行

4 中 南 大 学 学 报 医 学 版 摘 要 目 的 探 讨 早 发 性 精 神 分 裂 症 患 者 在 静 息 状 态 下 是 否 存 在 脑 功 能 连 接 异 常 以 及 异 常 区 域 的 定 位 方 法 采 用 第 版 美 国 精 神 障 碍 诊 断 与 统 计 手 册 ( * ) (

全国计算机技术与软件专业技术资格(水平)考试

《浮士德》(下)

中 国 中 西 医 结 合 杂 志 年 月 第 卷 第 期!"# $! 症 状 在 诊 断 时 推 荐 应 用 $3 的 症 状 指 数 $!0 " 0 %!2 3% ". )./!0 ) 1/! 5 1! 0 %7$3 6 进 行 基 础 评 估 和 治 疗 监 测 心 理 状 况 的 评 估 可

(\244j\257d\276\307\274\351_ C.indd_70%.pdf)

## $%& %& ## () #) (( * (+++ () #) #) (+ (+ #) #) ( #, - #)). #))- # ( / / / / $ # ( * *..# 4 #$ 3 ( 5 ) ### 4 $ # 5, $ ## # 4 $# 5 ( %

! # % % & # # % #!& % &# % &# % % % # %& ( (!& (! & & % % #!! ) %&! *& % %! % %!! # % %!! %*!& % &# % &# ) ) ( % # # ) % ( (!& (! (!! # % % #!! # ( &!


1 中 华 物 理 医 学 与 康 复 杂 志, - 年 月 第.0 卷 第 期 & + &# * & " (, - ".0 $ 代 康 复 理 念 更 强 调 患 者 主 动 参 与 因 此 笔 者 倾 向 于 采 用 球 囊 主 动 扩 张 术 即 治 疗 时 以 患 者 主 动 参 与 为 主

期 李 海 利 等 猪 接 触 传 染 性 胸 膜 肺 炎 放 线 杆 菌 血 清 型 分 子 鉴 定 及 药 敏 试 验 / 只 产 生 两 种,9: 毒 素 血 清 型 毒 力 的 强 弱 与,9: 毒 素 种 类 有 关 产,9: 和,9: 的 血 清 型 毒 力 最 强 本 研 究 对 临

目录 1 本期内容 Hadoop 开发环境简介 Hadoop 集群简介 Windows 开发简介 Hadoop Eclipse 简介和使用 Eclipse 插件介绍 Hadoo

气 候 与 环 境 研 究 卷 &!' 张 书 余 许 多 学 者 对 人 体 舒 适 度 进 行 了 研 究!!0!! " 对 欧 洲 不 同 国 家 的 城 市 热 舒 适 性 进 行 了 研 究 周 后 福 探 讨 了 气 候 变 化 对 人 体 健 康 的 影 响 吴 兑 ) 进 行 了 多

《沧浪诗话》

09 (File Processes) (mkdir) 9-3 (createnewfile) 9-4 (write) 9-5 (read) 9-6 (deletefile) 9-7 (deletedir) (Exercises)

《安徒生童话》(一)

EJB-Programming-4-cn.doc

国 际 政 治 研 究 年 第 期 一 中 国 国 名 渊 源 暨 中 外 交 流 中 中 国 的 称 谓 一 不 在 乎 国 名 的 王 朝 国 家 世 界 上 绝 大 多 数 国 家 的 国 名 是 在 历 史 上 逐 渐 形 成 的 国 名 具 有 排 他 性 宣 示 一 国 之 主 权 国

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

JavaIO.PDF

期 李 环 等 邻 苯 二 甲 酸 二 丁 酯 暴 露 对 雄 性 大 鼠 生 精 细 胞 功 能 影 响 1 )!# $ + $#'!!) #!%,$' $ 6. $#! +!! '!!' # $! 引 言 - # # 近 年 来 生 殖 健 康 问 题 日 益 突 出 % 不 孕 不 育 等 各

. "#

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

./

% 缓 解 患 者 的 心 理 障 碍 或 问 题, 促 进 其 人 格 向 健 康 治 疗 协 调 的 方 向 发 展 精 神 分 析 学 派 心 理 治 疗 起 源 于 弗 洛 依 德 ( ) 于 世 早 期 为 弗 洛 依 德 创 立 的 经 典 精 神 分 析 弗 洛 纪 末 创 始 的 精

社 会 妇 也 有 到 夫 家 守 志 的 情 况 目 前 各 地 现 存 的 大 量 贞 节 牌 坊 和 史 书 中 连 篇 累 牍 的 节 妇 传 就 是 当 时 历 史 的 真 实 反 映 但 是 在 历 史 上, 现 实 生 活 中 的 寡 妇 守 志 并 非 一 件 易 事 很 多 寡 妇

.' 6! "! 6 "'' 6 7% $! 7%/'& 人 类 非 洲 锥 虫 病 又 称 昏 睡 病 是 布 氏 锥 虫 冈 比 亚 亚 种!! 或 布 氏 锥 虫 罗 得 西 亚 种 "#$$ %! &'!!! 感 染 引 起 的 一 种 寄 生 虫 病 以 采 采 蝇! 为 传 播 ' 媒

《飘》(二)

untitled

《飘》(一)

使 用 Java 语 言 模 拟 保 险 箱 容 量 门 板 厚 度 箱 体 厚 度 属 性 锁 具 类 型 开 保 险 箱 关 保 险 箱 动 作 存 取 款

《无事生非》

$ $ $ %&!! ( )!"" " * ) " +! + ("$ + ) * "! ",! + " +! $, ( * " -. / !!""! %! * " 2 & * 345! + " %! + )! %! + )!!! (!"" ( ) ( + ) * + * 2!( *!)

!"#$!" %&' 0 0 2!!""#$ #%#$&'( )*%&'( &%('& 多坎坷 后来虽说政治清明 还了他一身清白 却已到了退休年纪 我们有 兄妹三人 我是长子 在我们心目中 父亲不仅有学问 而且是一个十分正 直又很能体贴别人的人 妈妈也是一个小学教师 慈祥 和善 我们全家 从未与别人

./ /

!

!

《复活》(下)

!

小 说 天 地 欲 望 摩 托 尚 成 河 血 溅 维 纳 斯 刘 步 明 长 调 短 歌 海 上 天 湖 李 转 生 目 海 尖 高 处 的 三 种 陈 述 谢 应 华 乡 村 笔 记 阿 曼 桃 花 渡 林 小 耳 种 诗 歌 江 良 热 雨 花 石 张 彩 霞 刊 名 书 法 陈 奋 武 屏

** 状 态 二 亚 健 康 亚 健 康 是 指 处 于 健 康 和 疾 病 两 者 之 间 的 一 种 状 态 即 机 体 内 出 现 某 些 功 能 紊 乱 但 未 影 响 到 行 使 社 会 功 能 主 观 上 有 不 适 感 觉 它 是 人 体 处 于 健 康 和 疾 病 之 间 的 过 渡

Microsoft Word - HERBRECIPES《中國藥膳》.doc

循经指压疗法

毛主席的猪

从 因 人 设 事 谈 起 一 部 文 学 作 品 ( 尤 其 是 长 篇 小 说 ) 的 结 构 至 关 重 要, 因 为 它 是 文 本 整 体 的 组 织 方 式 和 内 部 构 造, 既 是 形 式 又 是 内 容 ; 乃 是 表 达 主 题 最 有 效 的 艺 术 手 段 元 代 戏 曲

附件1.FIT)



北魏山东佛教文化个案研究


《大话设计模式》第一章

PowerPoint 演示文稿

前 言

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

新时期共青团工作实务全书(一百七十一)

《呼啸山庄》(上)

Microsoft Word - 第3章.doc

7 海 外 检 验 医 学 551 年 月 第 卷 第 ( 期 4: & % #4)!& 551 ; ) ( 工 具 的 发 展 如!$!6: 0 " :6" 和 86. 工 具,*9 这 些 工 具 使 用 某 些 风 险 因 素 例 如 吸 烟 血 压 和 脂 质 及 其 他 变 量 如 年 龄

# # # # # # # # # % # & # & # # # () # (( # * * (( # (+ # ( (# # (# # (# # ( # ( +) (

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

/0/ "!!!!! " "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! " # $ % && $ $ $ $ ( $ $ ( $ ) % * ( * $ $ $ $ $ $ $ ( $ $ $ $ $ # ( $ $ ( $ $ $ ( $ $ $ $


(2) 廠 商 具 有 維 修 維 護 或 售 後 服 務 能 力 之 證 明 ;(3) 廠 商 具 有 製 造 供 應 或 承 做 能 力 之 證 明 ;(4) 具 有 相 當 人 力 之 證 明 屬 特 定 資 格 之 ㄧ 8.(3) 機 關 辦 理 預 算 金 額 為 新 台 幣 四 億 元

, 2., 3., , 3.,,

"#$% "& ""!!!!! "!! # $ % $$ & #$%& $(#$)&* $+)*#, $ #$%#&

年 第 期 许 德 刚 基 于 遗 忘 因 子 -./ 算 法 的 自 适 应 直 达 波 对 消 技 术 * 达 站 周 围 的 环 境 可 能 比 较 复 杂 来 自 近 距 离 不 同 固 定 物 体 所 反 射 的 多 径 信 号 也 强 于 回 波 信 号 大 大 影 响 了 雷 达 的

/ 第 卷 (!(" $& $% $%% $$/,!. $"($ ) 0 %'&.(!.' (!' 0 %$ $'#78#/8# 8#$/!),% 3 -+ /! ", $ % +'!)%+%$" $ %'+(("& +'!) "'$,'(% -' (!' 0 %$ $'18 #88 #88!)(!

论 文 :?,,,,,,,,,, (, ),, ( ),,,,,,,, (, ) : (, ),,, :,, ;,,,,

中 国 中 西 医 结 合 杂 志 年 月 第 卷 第 期!"# $! )# 5! ) 3& &!" &"& & 4! (& )& * ) 55& " )! & 5 )!4 ( )&& & )&# 1-9,6 & 7! &) (& (& 5 ) & " 3!4 5! ) &"&!)! & ) " &

3.1 num = 3 ch = 'C' 2

《娜娜》(下)

动 物 中 能 促 进 但 会 在 表 达 的 物 种 中 产 生 不 良 反 应 如 引 起 脂 肪 肝 或 升 高 74-4 水 平 2 # ) 9 等 建 立 血 脂 异 常 和 肝 硬 化 仓 鼠 模 型 进 行 研 究 结 果 表 明 7'&$ 不 能 改 善 血 脂 异 常 和 肝 硬

zt

度 方 面 对 护 士 的 整 个 抢 救 过 程 进 行 评 价 医 生 对 护 士 抢 救 配 合 满 意 度 为 对 患 儿 首 次 评 估 的 正 确 表 & 快 捷 急 救 护 理 记 录 表 性 医 嘱 的 执 行 力 对 患 儿 抢 救 药 物 使 用 后 的 再 次 评 估 合 作

中 国 中 西 医 结 合 杂 志 年 月 第 卷 第 期 *. *, *. * * 4 +* ) ), ) 3, +3 ),, )., +3, ), +3* *. *, +. 3, * 4 +*, ) 3, +3 ),, )., +3 ),., *. * * 4 +* ) ), ) 3, +3 )

工 程 应 用 陈 泾 生 等 继 电 保 护 检 验 标 准 化 作 业 专 家 系 统 的 研 发 和 应 用 实 践 统 硬 件 结 构 和 软 件 功 能 结 构 分 别 如 图 图 所 示 图 / 系 统 硬 件 架 构 0 1/!&%!!" "! 图 软 件 功 能 0 1 %! " 高

!"#$!"#$%&$'!"#$ %$&' ()* +,-& 摄影作品 # $" 图片新闻 # $% 走近滨江 # $& 企业之窗 汽修中心工会成功举办!"#$ 年元旦迎新登山活动 # "% 第十六次!春风行动"报道 # "% 修理团队深夜连续奋战 立足市场 拓展业务 面对极寒天 保障人民群众顺利回

Java

食 用 蘑 菇 尤 其 是 在 上 流 社 会 非 常 流 行 据 传 古 罗 马 的 凯 撒 大 帝 在 食 用 蘑 菇 膳 肴 前 有 专 门 的 侍 者 先 行 鉴 尝 蘑 菇 是 否 有 毒 以 确 保 食 用 安 全 在 世 界 其 他 地 方 如 墨 西 哥 俄 罗 斯 以 及 一 些

Transcription:

Hadoop 程 式 設 計 五 開 發 Hadoop Map/Reduce 程 式 設 計 者 只 需 要 解 決 真 實 的 問 題, 架 構 面 留 給 MapReduce 96

Hadoop 的 MapReduce API 提 供 自 動 的 平 行 化 與 工 作 分 配 容 錯 特 性 狀 態 監 控 工 具 一 個 乾 淨 的 抽 象 化 (abstration) 供 程 式 設 計 師 使 用 97

HDFS & MapReduce HDFS Input Output 部 份 圖 片 來 源 :http://www.larsgeorge.com/2009/05/hbase-mapreduce-101-part-i.html 98

<Key, Value> Pair Input Row Data Map Output key1 key2 key1 val val val Map Select Key Reduce Input key1 val. val val Reduce Output key values 99

Program Prototype (v 0.20) Map 區 Reduce 區 設 定 區 Class MR{ static public Class Mapper { Map 程 式 碼 static public Class Reducer { Reduce 程 式 碼 main(){ Configuration conf = new Configuration(); Job job = new Job(conf, job name"); job.setjarbyclass(thismainclass.class); job.setmapperclass(mapper.class); job.setreduceclass(reducer.class); FileInputFormat.addInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); 其 他 的 設 定 參 數 程 式 碼 job.waitforcompletion(true); 100

Class Mapper (v 0.20) 1 2 3 4 5 6 7 8 9 import org.apache.hadoop.mapreduce.mapper; class MyMap extends Mapper < INPUT INPUT OUTPUT OUTPUT KEY, VALUE, KEY, VALUE > { Class Class Class Class // 全 域 變 數 區 INPUT INPUT public void map ( KEY key, VALUE value, Class Class Context context )throws IOException,InterruptedException { // 區 域 變 數 與 程 式 邏 輯 區 context.write( NewKey, NewValue); 101

Class Reducer (v 0.20) 1 2 3 4 5 6 7 8 9 import org.apache.hadoop.mapreduce.reducer; MyRed class MyRed extends INPUT INPUT OUTPUT OUTPUT Reducer <,,, > KEY VALUE KEY VALUE { Class Class Class Class // 全 域 變 數 區 public void reduce ( INPUT KEY Class key, Iterable< VALUE > values, Context context) throws IOException,InterruptedException { // 區 域 變 數 與 程 式 邏 輯 區 context.write( NewKey, NewValue); INPUT Class 102

其 他 常 用 的 設 定 參 數 設 定 Combiner Job.setCombinerClass ( ); 設 定 output class Job.setMapOutputKeyClass( ); Job.setMapOutputValueClass( ); Job.setOutputKeyClass( ); Job.setOutputValueClass( ); 103

Class Combiner 指 定 一 個 combiner, 它 負 責 對 中 間 過 程 的 輸 出 進 行 聚 集, 這 會 有 助 於 降 低 從 Mapper 到 Reducer 數 據 傳 輸 量 可 不 用 設 定 交 由 Hadoop 預 設 也 可 不 實 做 此 程 式, 引 用 Reducer 設 定 JobConf.setCombinerClass(Class) 104

範 例 一 (1) - mapper public class HelloHadoop { static public class HelloMapper extends Mapper<LongWritable, Text, LongWritable, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { context.write((longwritable) key, (Text) value); // HelloReducer end..( 待 續 ) 105

範 例 一 (2) - reducer static public class HelloReducer extends Reducer<LongWritable, Text, LongWritable, Text> { public void reduce(longwritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text val = new Text(); for (Text str : values) { val.set(str.tostring()); context.write(key, val); // HelloReducer end..( 待 續 ) 106

範 例 一 (3) - main public static void main(string[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = new Job(conf, "Hadoop Hello World"); job.setjarbyclass(hellohadoop.class); FileInputFormat.setInputPaths(job, "input"); FileOutputFormat.setOutputPath(job, new Path("output-hh1")); job.setmapperclass(hellomapper.class); job.setreducerclass(helloreducer.class); job.waitforcompletion(true); // main end // wordcount class end // 完 107

Hadoop 程 式 設 計 七 Hadoop 程 式 範 例 7.1:HDFS 操 作 篇 7.2:MapReduce 運 算 篇 108

傳 送 檔 案 至 HDFS // 將 檔 案 從 local 上 傳 到 hdfs, src 為 local 的 來 源, dst 為 hdfs 的 目 的 端 public class PutToHdfs { static boolean puttohdfs(string src, String dst, Configuration conf) { Path dstpath = new Path(dst); try { // 產 生 操 作 hdfs 的 物 件 FileSystem hdfs = dstpath.getfilesystem(conf); // 上 傳 hdfs.copyfromlocalfile(false, new Path(src),new Path(dst)); catch (IOException e) { e.printstacktrace(); return false; return true; 109

從 HDFS 取 回 檔 案 // 將 檔 案 從 hdfs 下 載 回 local, src 為 hdfs 的 來 源, dst 為 local 的 目 的 端 public class GetFromHdfs { static boolean getfromhdfs(string src,string dst, Configuration conf) { Path dstpath = new Path(src); try { // 產 生 操 作 hdfs 的 物 件 FileSystem hdfs = dstpath.getfilesystem(conf); // 下 載 hdfs.copytolocalfile(false, new Path(src),new Path(dst)); catch (IOException e) { e.printstacktrace(); return false; return true; 110

檢 查 與 刪 除 檔 案 // checkanddelete 函 式, 檢 查 是 否 存 在 該 資 料 夾, 若 有 則 刪 除 之 public class CheckAndDelete { static boolean checkanddelete(final String path, Configuration conf) { Path dst_path = new Path(path); try { // 產 生 操 作 hdfs 的 物 件 FileSystem hdfs = dst_path.getfilesystem(conf); // 檢 查 是 否 存 在 if (hdfs.exists(dst_path)) { // 有 則 刪 除 hdfs.delete(dst_path, true); catch (IOException e) { e.printstacktrace(); return false; return true; 111

Hadoop 程 式 設 計 七 Hadoop 程 式 範 例 7.1:HDFS 操 作 篇 7.2:MapReduce 運 算 篇 112

範 例 二 (1) HelloHadoopV2 說 明 : 此 程 式 碼 比 HelloHadoop 增 加 了 * 檢 查 輸 出 資 料 夾 是 否 存 在 並 刪 除 * input 資 料 夾 內 的 資 料 若 大 於 兩 個, 則 資 料 不 會 被 覆 蓋 * map 與 reduce 拆 開 以 利 程 式 再 利 用 測 試 方 法 : 將 此 程 式 運 作 在 hadoop 0.20 平 台 上, 執 行 : --------------------------- hadoop jar V2.jar HelloHadoopV2 --------------------------- 注 意 : 1. 在 hdfs 上 來 源 檔 案 的 路 徑 為 "/user/$your_name/input", 請 注 意 必 須 先 放 資 料 到 此 hdfs 上 的 資 料 夾 內, 且 此 資 料 夾 內 只 能 放 檔 案, 不 可 再 放 資 料 夾 2. 運 算 完 後, 程 式 將 執 行 結 果 放 在 hdfs 的 輸 出 路 徑 為 "/user/$your_name/output-hh2" 113

範 例 二 (2) public class HelloHadoopV2 { public static void main(string[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = new Job(conf, "Hadoop Hello World 2"); job.setjarbyclass(hellohadoopv2.class); // 設 定 map and reduce 以 及 Combiner class job.setmapperclass(hellomapperv2.class); job.setcombinerclass(helloreducerv2.class); job.setreducerclass(helloreducerv2.class); // 設 定 map 的 輸 出 型 態 job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(text.class); // 設 定 reduce 的 輸 出 型 態 job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); FileInputFormat.addInputPath (job, new Path("input")); FileOutputFormat.setOutputPath (job, new Path("output-hh2")); // 呼 叫 checkanddelete 函 式, // 檢 查 是 否 存 在 該 資 料 夾, 若 有 則 刪 除 之 CheckAndDelete.checkAndDelete("output-hh2", conf); boolean status = job.waitforcompletion(true); if (status) { System.err.println("Integrate Alert Job Finished!"); else { System.err.println("Integrate Alert Job Failed!"); System.exit(1); 114

範 例 二 (3) public class HelloMapperV2 extends Mapper <LongWritable, Text, Text, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { context.write(new Text(key.toString()), value); public class HelloReducerV2 extends Reducer<Text, Text, Text, Text> { public void reduce(text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String str = new String(""); Text final_key = new Text(); Text final_value = new Text(); // 將 key 值 相 同 的 values, 透 過 && 符 號 分 隔 之 for (Text tmp : values) { str += tmp.tostring() + " &&"; final_key.set(key); final_value.set(str); context.write(final_key, final_value); 115

範 例 三 (1) HelloHadoopV3 說 明 : 此 程 式 碼 再 利 用 了 HelloHadoopV2 的 map, reduce 檔, 並 且 自 動 將 檔 案 上 傳 到 hdfs 上 運 算 並 自 動 取 回 結 果, 還 有 提 示 訊 息 參 數 輸 入 與 印 出 運 算 時 間 的 功 能 測 試 方 法 : 將 此 程 式 運 作 在 hadoop 0.20 平 台 上, 執 行 : --------------------------- hadoop jar V3.jar HelloHadoopV3 <local_input> <local_output> --------------------------- 注 意 : 1. 第 一 個 輸 入 的 參 數 是 在 local 的 輸 入 資 料 夾, 請 確 認 此 資 料 夾 內 有 資 料 並 無 子 目 錄 2. 第 二 個 輸 入 的 參 數 是 在 local 的 運 算 結 果 資 料 夾, 由 程 式 產 生 不 用 事 先 建 立, 若 有 請 刪 除 之 116

範 例 三 (2) public class HelloHadoopV3 { public static void main(string[] args) throws IOException, InterruptedException, ClassNotFoundException { String hdfs_input = "HH3_input"; String hdfs_output = "HH3_output"; Configuration conf = new Configuration(); // 宣 告 取 得 參 數 String[] otherargs = new GenericOptionsParser(conf, args).getremainingargs(); // 如 果 參 數 數 量 不 為 2 則 印 出 提 示 訊 息 if (otherargs.length!= 2) { System.err.println("Usage: hadoop jar HelloHadoopV3.jar <local_input> <local_output>"); System.exit(2); Job job = new Job(conf, "Hadoop Hello World"); job.setjarbyclass(hellohadoopv3.class); // 再 利 用 上 個 範 例 的 map 與 reduce job.setmapperclass(hellomapperv2.class); job.setcombinerclass(helloreducerv2.class); job.setreducerclass(helloreducerv2.class); // 設 定 map reduce 的 key value 輸 出 型 態 job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(text.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); 117

範 例 三 (2) // 用 checkanddelete 函 式 防 止 overhead 的 錯 誤 CheckAndDelete.checkAndDelete(hdfs_input, conf); CheckAndDelete.checkAndDelete(hdfs_output, conf); // 放 檔 案 到 hdfs PutToHdfs.putToHdfs(args[0], hdfs_input, conf); // 設 定 hdfs 的 輸 入 輸 出 來 源 路 定 FileInputFormat.addInputPath(job, new Path(hdfs_input)); FileOutputFormat.setOutputPath(job, new Path(hdfs_output)); long start = System.nanoTime(); job.waitforcompletion(true); // 把 hdfs 的 結 果 取 下 GetFromHdfs.getFromHdfs(hdfs_output, args[1], conf); boolean status = job.waitforcompletion(true); // 計 算 時 間 if (status) { System.err.println("Integrate Alert Job Finished!"); long time = System.nanoTime() - start; System.err.println(time * (1E-9) + " secs."); else { System.err.println("Integrate Alert Job Failed!"); System.exit(1); 118

範 例 四 (1) public static void main(string[] args) throws Exception { Configuration conf = new Configuration(); String[] otherargs = new GenericOptionsParser(conf, args).getremainingargs(); if (otherargs.length!= 2) { System.err.println("Usage: hadoop jar WordCount.jar <input> <output>"); System.exit(2); Job job = new Job(conf, "Word Count"); job.setjarbyclass(wordcount.class); job.setmapperclass(tokenizermapper.class); job.setcombinerclass(intsumreducer.class); job.setreducerclass(intsumreducer.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); CheckAndDelete.checkAndDelete(args[1], conf); System.exit(job.waitForCompletion(true)? 0 : 1); 119

範 例 四 (2) 1 class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 2 private final static IntWritable one = new IntWritable(1); 3 private Text word = new Text(); 4 public void map( LongWritable key, Text value, Context context) 5 6 7 8 9 Input key /user/hadooper/input/a.txt. No news is a good news. throws IOException, InterruptedException { String line = ((Text) value).tostring(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasmoretokens()) { word.set(itr.nexttoken()); context.write(word, one); Input value line itr no news is a good news itr itr itr itr itr itr <word,one> < no, 1 > < news, 1 > < is, 1 > < a, 1 > < good, 1 > < news, 1 > 120

範 例 四 (3) 1 2 3 4 5 6 7 8 class IntSumReducer extends Reducer< Text, IntWritable, Text, IntWritable> { IntWritable result = new IntWritable(); public void reduce( Text key, Iterable <IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for ( IntWritable val : values ) sum += val.get(); result.set(sum); context.write ( key, result); for ( int i ; i < values.length ; i ++ ){ sum += values[i].get() <word,one> < a, 1 > < good, 1 > < is, 1 > < news, 1 1 > < no, 1 > news 1 1 <key,sunvalue> < news, 2 > 121

範 例 五 (1) WordCountV2 說 明 : 用 於 字 數 統 計, 並 且 增 加 略 過 大 小 寫 辨 識 符 號 篩 除 等 功 能 測 試 方 法 : 將 此 程 式 運 作 在 hadoop 0.20 平 台 上, 執 行 : --------------------------- hadoop jar WCV2.jar WordCountV2 -Dwordcount.case.sensitive=false \ <input> <output> -skip patterns/patterns.txt --------------------------- 注 意 : 1. 在 hdfs 上 來 源 檔 案 的 路 徑 為 你 所 指 定 的 <input> 請 注 意 必 須 先 放 資 料 到 此 hdfs 上 的 資 料 夾 內, 且 此 資 料 夾 內 只 能 放 檔 案, 不 可 再 放 資 料 夾 2. 運 算 完 後, 程 式 將 執 行 結 果 放 在 hdfs 的 輸 出 路 徑 為 你 所 指 定 的 <output> 3. 請 建 立 一 個 資 料 夾 pattern 並 在 裡 面 放 置 pattern.txt, 內 容 如 ( 一 行 一 個, 前 置 提 示 符 號 \) \. \, \! 122

範 例 五 (2) public class WordCountV2 extends Configured implements Tool { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { static enum Counters { INPUT_WORDS private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean casesensitive = true; private Set<String> patternstoskip = new HashSet<String>(); private long numrecords = 0; private String inputfile; public void configure(jobconf job) { casesensitive = job.getboolean("wordcount.case.sensitive", true); inputfile = job.get("map.input.file"); if (job.getboolean("wordcount.skip.patterns", false)) { Path[] patternsfiles = new Path[0]; try { patternsfiles = DistributedCache.getLocalCacheFiles(job); catch (IOException ioe) { System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe)); for (Path patternsfile : patternsfiles) { parseskipfile(patternsfile); private void parseskipfile(path patternsfile) { try { BufferedReader fis = new BufferedReader(new FileReader( patternsfile.tostring())); String pattern = null; while ((pattern = fis.readline())!= null) { patternstoskip.add(pattern); catch (IOException ioe) { System.err.println("Caught exception while parsing the cached file '"+ patternsfile + "' : " + tringutils.stringifyexception(ioe)); public void map(longwritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = (casesensitive)? value.tostring() : value.tostring().tolowercase(); for (String pattern : patternstoskip) line = line.replaceall(pattern, ""); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasmoretokens()) { word.set(tokenizer.nexttoken()); output.collect(word, one); reporter.incrcounter(counters.input_words, 1); 123

範 例 五 (3) if ((++numrecords % 100) == 0) { reporter.setstatus("finished processing " + numrecords + " records " + "from the input file: " + inputfile); public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasnext()) { sum += values.next().get(); output.collect(key, new IntWritable(sum)); public int run(string[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setjobname("wordcount"); String[] otherargs = new GenericOptionsParser(conf, args).getremainingargs(); if (otherargs.length < 2) { System.out.println("WordCountV2 [- Dwordcount.case.sensitive=<false true>] \\ "); System.out.println(" <indir> <outdir> [-skip Pattern_file]"); return 0; conf.setoutputkeyclass(text.class); conf.setoutputvalueclass(intwritable.class); conf.setmapperclass(map.class); conf.setcombinerclass(reduce.class); conf.setreducerclass(reduce.class); conf.setinputformat(textinputformat.class); conf.setoutputformat(textoutputformat.class); List<String> other_args = new ArrayList<String>(); for (int i = 0; i < args.length; ++i) { if ("-skip".equals(args[i])) { DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf); conf.setboolean("wordcount.skip.patterns", true); else {other_args.add(args[i]); FileInputFormat.setInputPaths(conf, new Path(other_args.get(0))); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); CheckAndDelete.checkAndDelete(other_args.get(1), conf); JobClient.runJob(conf); return 0; public static void main(string[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCountV2(), args); System.exit(res); 124

說 明 : 範 例 六 (1) WordIndex 將 每 個 字 出 於 哪 個 檔 案, 那 一 行 印 出 來 測 試 方 法 : 將 此 程 式 運 作 在 hadoop 0.20 平 台 上, 執 行 : --------------------------- hadoop jar WI.jar WordIndex <input> <output> --------------------------- 注 意 : 1. 在 hdfs 上 來 源 檔 案 的 路 徑 為 你 所 指 定 的 <input> 請 注 意 必 須 先 放 資 料 到 此 hdfs 上 的 資 料 夾 內, 且 此 資 料 夾 內 只 能 放 檔 案, 不 可 再 放 資 料 夾 2. 運 算 完 後, 程 式 將 執 行 結 果 放 在 hdfs 的 輸 出 路 徑 為 你 所 指 定 的 <output> 125

public class WordIndex { public static class wordindexm extends Mapper<LongWritable, Text, Text, Text> { public void map(longwritable key, Text value, Context context) throws IOException, InterruptedException { FileSplit filesplit = (FileSplit) context.getinputsplit(); 範 例 六 (2) static public class wordindexr extends Reducer<Text, Text, Text, Text> { public void reduce(text key, Iterable<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String v = ""; Text map_key = new Text(); StringBuilder ret = new StringBuilder("\n"); Text map_value = new Text(); for (Text val : values) { String line = value.tostring(); v += val.tostring().trim(); StringTokenizer st = new StringTokenizer(line.toLowerCase()); if (v.length() > 0) while (st.hasmoretokens()) { ret.append(v + "\n"); String word = st.nexttoken(); map_key.set(word); output.collect((text) key, new map_value.set(filesplit.getpath().getname() + Text(ret.toString())); ":" + line); context.write(map_key, map_value); 126

範 例 六 (2) public static void main(string[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); String[] otherargs = new GenericOptionsParser(conf, args).getremainingargs(); if (otherargs.length < 2) { System.out.println("hadoop jar WordIndex.jar <indir> <outdir>"); return; Job job = new Job(conf, "word index"); job.setjobname("word inverted index"); job.setjarbyclass(wordindex.class); job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(text.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); job.setmapperclass(wordindexm.class); job.setreducerclass(wordindexr.class); job.setcombinerclass(wordindexr.class); FileInputFormat.setInputPaths(job, args[0]); CheckAndDelete.checkAndDelete(args[1], conf); FileOutputFormat.setOutputPath(job, new Path(args[1])); long start = System.nanoTime(); job.waitforcompletion(true); long time = System.nanoTime() - start; System.err.println(time * (1E-9) + " secs."); 127

範 例 七 (1) YourMenu 說 明 : 將 之 前 的 功 能 整 合 起 來 測 試 方 法 : 將 此 程 式 運 作 在 hadoop 0.20 平 台 上, 執 行 : --------------------------- hadoop jar YourMenu.jar < 功 能 > --------------------------- 注 意 : 1. 此 程 式 需 與 之 前 的 所 有 範 例 一 起 打 包 成 一 個 jar 檔 128

範 例 七 (2) public class YourMenu { public static void main(string argv[]) { int exitcode = -1; ProgramDriver pgd = new ProgramDriver(); if (argv.length < 1) { System.out.print("********************************** ********\n" + " 歡 迎 使 用 NCHC 的 運 算 功 能 \n" + " 指 令 : \n" + " Hadoop jar NCHC-example-*.jar < 功 能 > \n" + " 功 能 : \n" + " HelloHadoop: 秀 出 Hadoop 的 <Key,Value> 為 何 \n" + " HelloHadoopV2: 秀 出 Hadoop 的 <Key,Value> 進 階 版 \n" + " HelloHadoopV3: 秀 出 Hadoop 的 <Key,Value> 進 化 版 \n" + " WordCount: 計 算 輸 入 資 料 夾 內 分 別 在 每 個 檔 案 的 字 數 統 計 \n" + " WordCountV2: WordCount 進 階 版 \n" + " WordIndex: 索 引 每 個 字 與 其 所 有 出 現 的 所 在 列 \n" + "******************************************\n"); else { try { pgd.addclass("hellohadoop", HelloHadoop.class, " Hadoop hello world"); pgd.addclass("hellohadoopv2", HelloHadoopV2.class, " Hadoop hello world V2"); pgd.addclass("hellohadoopv3", HelloHadoopV3.class, " Hadoop hello world V3"); pgd.addclass("wordcount", WordCount.class, " word count."); pgd.addclass("wordcountv2", WordCountV2.class, " word count V2."); pgd.addclass("wordindex", WordIndex.class, "invert each word in line"); pgd.driver(argv); // Success exitcode = 0; System.exit(exitCode); catch (Throwable e) { e.printstacktrace(); 129

補 充 Program Prototype (v 0.18) Map 區 Reduce 區 設 定 區 Class MR{ Class Mapper { Class Reducer { main(){ Map 程 式 碼 Reduce 程 式 碼 JobConf conf = new JobConf( MR.class ); conf.setmapperclass(mapper.class); conf.setreduceclass(reducer.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); 其 他 的 設 定 參 數 程 式 碼 JobClient.runJob(conf); 130

補 充 Class Mapper (v0.18) 1 2 3 4 5 6 7 8 9 import org.apache.hadoop.mapred.*; MyMap class MyMap extends MapReduceBase INPUT INPUT OUTPUT OUTPUT implements Mapper < KEY, VALUE, KEY, VALUE > { // 全 域 變 數 區 INPUT INPUT public void map ( KEY key, VALUE value, OUTPUT OUTPUT OutputCollector<, > output, KEY VALUE Reporter reporter) throws IOException { // 區 域 變 數 與 程 式 邏 輯 區 output.collect( NewKey, NewValue); 131

補 充 Class Reducer (v0.18) import org.apache.hadoop.mapred.*; 1 2 3 4 5 6 7 8 9 class MyRed extends MapReduceBase INPUT INPUT OUTPUT OUTPUT implements Reducer < KEY, VALUE, KEY, VALUE > { // 全 域 變 數 區 INPUT KEY INPUT public void reduce ( key, Iterator< VALUE > values, OUTPUT OUTPUT OutputCollector<, > output, KEY VALUE Reporter reporter) throws IOException { // 區 域 變 數 與 程 式 邏 輯 區 output.collect( NewKey, NewValue); 132

Conclusions 以 上 範 例 程 式 碼 包 含 Hadoop 的 key,value 架 構 操 作 Hdfs 檔 案 系 統 Map Reduce 運 算 方 式 執 行 hadoop 運 算 時, 程 式 檔 不 用 上 傳 至 hadoop 上, 但 資 料 需 要 再 HDFS 內 可 運 用 範 例 七 的 程 式 達 成 連 續 運 算 Hadoop 0.20 與 Hadoop 0.18 有 些 API 有 些 許 差 異, 因 此 在 網 路 上 找 到 Hadoop 的 程 式 如 果 compiler 有 錯, 可 以 換 換 對 應 的 Function 試 試 133