MapReduce 陳威宇
Review: Hadoop HDFS, DataNode / NameNode, files / blocks, data locality (computing where the data lives)
Outline: What is MapReduce, Process flow, YARN, Configuration, Java programming
MapReduce Introduction
Objective: design a scalable parallel-programming framework that can be deployed on large clusters of commodity machines. Data is divided into splits, each processed by map functions, whose outputs are then processed by reduce functions. MapReduce originated at Google, with the first practical implementation published in 2004.
Compute (MapReduce) and storage (HDFS): challenges in Hadoop 1.x
What we want, versus what Hadoop 1.x actually gives us:
- Put everything into HDFS, regardless of size → a single NameNode has only limited address space for managing file metadata
- Modify data freely → files cannot be modified once written
- High availability, since it runs in production → the NameNode is a single point of failure
- Many workloads, e.g. graph processing and real-time computation → batch text-processing jobs only, and every batch job must read all of the data and process it at once
- Finer-grained and broader resource allocation → only one processing model: Map -> Reduce
Yarn = MapReduce V2
What YARN shares with MapReduce V1:
The programming model: map() and reduce(). MRv1 and MRv2 programs are written the same way, but must be recompiled against the V2 framework.
Yarn = MapReduce V2
How YARN differs from MapReduce V1:
Execution environment: MRv1 has only two slot types, map and reduce; in MRv2, a container can run a process of any type.
Resource management: MRv1 hands all coordination to the JobTracker (manager / engineer); in MRv2, the ResourceManager manages resources and the ApplicationMaster manages jobs (manager / PI / engineer).
How YARN runs a job
1. The client submits a job to the ResourceManager (RM).
2. The RM picks a container and launches the ApplicationMaster (AM) in it.
3. Once started, the AM registers itself with the RM.
4. The AM negotiates container resource usage with the RM.
5. After a successful negotiation, the AM requests the containers from the NodeManagers (NM).
6. The program then runs inside containers on the NMs.
7. While the job runs, the client can talk to the AM directly to get job status.
8. Once the program completes, the AM deregisters from the RM and releases its own container.
Something even more powerful: Spark ("in all martial arts, only speed is unbeatable")
Runs programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
Why: RDDs (Resilient Distributed Datasets) kept in memory.
Good fits: iterative computation, interactive analysis.
Why not: not yet stable enough; not suited to data much larger than cluster memory.
At a glance
Why MapReduce
Features: distributed computing, fault tolerance, task granularity, data locality
Distributed Computing
Figure source: https://www.cs.rutgers.edu/~pxk/417/notes/content/mapreduce.html
Fault tolerance
Objective: handle machine failures gracefully, i.e. the programmer does not need to handle them or be aware of the details.
Two types of failures: master failure and worker failure.
Two main activities: failure detection, and recovering lost (computed) data at the least possible cost.
Task granularity
Load balancing: finer grain is better; over time, faster machines take on more tasks than slower machines, which lowers overall job execution time.
Failure recovery: finer grain means less time to re-execute failed tasks.
But very fine-grained tasks are undesirable: management overhead and too much data shuffling (consuming valuable bandwidth).
Optimal granularity: split size = HDFS block size (128 MB by default). An HDFS block is guaranteed to be on a single node, and we want to maximize the work one mapper can do locally:
- if split size < block size: we do not fully exploit local data processing
- if split size > block size: data may have to be transferred for the map function to complete
Where's the catch? It is cheaper to create more mappers than to move data, since code size is far smaller than data size.
Practical considerations: the master is a bottleneck; it takes O(M+R) scheduling decisions and stores O(M*R) states. # mappers = input file size / file system block size (16 to 128 MB).
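The mapper-count arithmetic above is simple ceiling division. A minimal sketch (the class and method names here are hypothetical, for illustration only):

```java
// Number of map tasks = ceil(file size / split size).
public class SplitMath {
    static long numSplits(long fileSizeBytes, long splitSizeBytes) {
        // Integer ceiling division, avoiding floating point.
        return (fileSizeBytes + splitSizeBytes - 1) / splitSizeBytes;
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024;            // 128 MB default HDFS block
        long file  = 300L * 1024 * 1024;            // a 300 MB input file
        System.out.println(numSplits(file, block)); // 3 mappers: two full blocks + one 44 MB tail
    }
}
```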
Data locality
Network bandwidth is a valuable resource; we assume rack-mounted server hardware. The MapReduce scheduler works as follows:
1. Try to assign the map task to the node where the corresponding split block(s) reside; if that node is free, assign it there, otherwise go to step 2.
2. Try to find a free node in the same rack to take the map task; if none can be found, assign the task to a free off-rack node.
More complex implementations use a network cost model.
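The two-step preference above (node-local, then rack-local, then off-rack) can be sketched in a few lines. This is a toy illustration, not the Hadoop scheduler; the node/rack names and the `free` flag are hypothetical:

```java
import java.util.List;

public class LocalityScheduler {
    record Node(String name, String rack, boolean free) {}

    // Pick a node for a map task whose input split lives on splitHolder:
    // prefer the split's own node, then a free node in the same rack,
    // then any free node in the cluster.
    static Node pick(Node splitHolder, List<Node> cluster) {
        if (splitHolder.free()) return splitHolder;            // step 1: node-local
        for (Node n : cluster)                                 // step 2: rack-local
            if (n.free() && n.rack().equals(splitHolder.rack())) return n;
        for (Node n : cluster)                                 // fallback: off-rack
            if (n.free()) return n;
        return null;                                           // no free node: task waits
    }
}
```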
MapReduce Process Flow
Example
Example
Example
Map
Shuffle & Sort
Map -> Sort -> Shuffle -> Sort -> Reduce
Developers do not have to worry about this phase; the framework handles it, guaranteeing that all values for a given key end up at the same reducer.
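The guarantee above can be rehearsed in a single process: group every map output value under its key, in sorted key order, before handing the groups to the reducer. A toy sketch (class and method names are hypothetical):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleDemo {
    // Group map-output (key, value) pairs so each key maps to all of its
    // values; TreeMap keeps the keys in sorted order, like the sort phase.
    static Map<String, List<Integer>> shuffle(List<SimpleEntry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (var kv : mapOutput)
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        return grouped;
    }

    public static void main(String[] args) {
        var out = shuffle(List.of(
            new SimpleEntry<>("b", 1), new SimpleEntry<>("a", 1), new SimpleEntry<>("b", 1)));
        System.out.println(out); // both "b" values arrive together, keys sorted
    }
}
```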
Reduce
Yarn Setup
Yarn Installation & Configuration

mapred-site.xml:
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

yarn-site.xml:
<configuration>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8031</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8032</value>
  </property>
  <property>
    <name>yarn.nodemanager.address</name>
    <value>0.0.0.0:8034</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/home/hadoop/hadoop_dir/nm-local-dir</value>
  </property>
  <property>
    <name>yarn.nodemanager.log-dirs</name>
    <value>/home/hadoop/hadoop_dir/userlogs</value>
  </property>
</configuration>
Environment
conf/yarn-site.xml and conf/mapred-site.xml: as on the previous slide. conf/slaves lists: master, slave
master (x.x.x.1): NameNode, ResourceManager, DataNode, NodeManager; web UIs at http://x.x.x.1:50070 (HDFS) and http://x.x.x.1:8088 (YARN)
slave (x.x.x.2): DataNode, NodeManager
YARN CLI
Starting and stopping YARN (make sure HDFS is already running):
$ start-yarn.sh
$ stop-yarn.sh
Confirm that both the ResourceManager and the NodeManagers are up:
jps, or browse http://master:8088/cluster
Run an MR example, from /home/hadoop/hadoop/share/hadoop/mapreduce:
hadoop jar hadoop-mapreduce-examples-2.5.0-cdh5.3.2.jar pi 10 1000
MapReduce Programming Model
Java programming: programming prototype

public class MyMR {
  public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    // ... Map Code
  }
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // ... Reduce Code
  }
  public static void main(String[] args) throws Exception {
    // ... Driver Code
  }
}
Java programming: Driver

Driver Code
// Config
Configuration conf = new Configuration();
// Initialize Job
Job job = new Job(conf, "New MR job");
job.setJarByClass(MyMR.class);
// Job setup
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Input / output setup
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// Job run
System.exit(job.waitForCompletion(true) ? 0 : 1);
Java programming: Driver / input & output format
Input / Output format settings:
With the default formats, TextInputFormat / TextOutputFormat, nothing needs to be set in the Driver. Each record becomes one line of the output file, containing a key and a value separated (by default) by a tab. Key and value can be any class, but the classes must be declared in the Driver.
With non-default formats, set them in the Driver:
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
Java programming: Mapper
Java programming: Mapper / real case

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    IntWritable one = new IntWritable(1);
    while (itr.hasMoreTokens()) {
      Text word = new Text(itr.nextToken());
      context.write(word, one);   // emit (word, 1) for every token
    }
  }
}
Java programming: Reducer
Java programming: Reducer / real case

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    IntWritable result = new IntWritable();
    for (IntWritable val : values) {
      sum += val.get();           // sum all the 1s emitted for this word
    }
    result.set(sum);
    context.write(key, result);   // emit (word, total count)
  }
}
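The mapper and reducer above can be rehearsed end to end in plain Java, without a Hadoop cluster: tokenize like the mapper, then sum per key like the reducer. A local sketch (the class name is hypothetical):

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {
    // Map step: emit (word, 1) per token; reduce step: sum the 1s per word.
    // TreeMap stands in for the sorted, grouped keys the framework delivers.
    static Map<String, Integer> wordCount(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens())
            counts.merge(itr.nextToken(), 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount("the quick fox the fox"));
        // {fox=2, quick=1, the=2}
    }
}
```

Tracing a small input like this by hand is a quick way to check the key/value types before packaging the real job for the cluster.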