TSMC Training Course
HBase Programming <V 0.20>
王耀聰, 陳威宇
Jazz@nchc.org.tw, waue@nchc.org.tw

Outline
- Compiling HBase programs
- HBase programming
  - Common HBase APIs
  - Hands-on I/O operations
  - Working with MapReduce
- Additional usage notes
- Other projects

Compiling HBase Programs
This part introduces two ways to compile and run HBase programs:
Method 1: using the Java JDK 1.6
Method 2: using Eclipse

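1. Compiling and Running with Java
1. Copy all .jar files in the hbase_home directory into the hadoop_home/lib/ folder.
2. Compile (each * stands for the version number of the jar you actually have):
    javac -classpath hadoop-*-core.jar:hbase-*.jar -d MyJava MyCode.java
3. Package:
    jar -cvf MyJar.jar -C MyJava .
4. Run:
    bin/hadoop jar MyJar.jar MyCode {Input/ Output/}
Notes:
- The working directory is Hadoop_Home.
- ./MyJava = directory holding the compiled classes
- MyJar.jar = the packaged jar file
- Put some text files into the input directory on HDFS first.
- ./input and ./output are not necessarily the HDFS input and output directories.
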
2. Compiling and Running with Eclipse

Programming
This part introduces how to write HBase programs:
- Common HBase APIs
- Hands-on I/O operations
- Working with MapReduce

HBase Programming: Common HBase APIs

HTable Members
Table, Family
Column, Qualifier
Row, TimeStamp, Cell, Lock

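One way to picture how these members relate is as nested sorted maps: row key, then family:qualifier, then timestamp, then cell value. The sketch below is a conceptual model in plain Java, not the HBase API; the class name `DataModelSketch` and the sample values are assumptions made for illustration.

```java
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Conceptual sketch only: models a table as nested sorted maps (not the HBase API). */
final class DataModelSketch {
    // row key -> "family:qualifier" -> timestamp (newest first) -> cell value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>> rows =
            new TreeMap<>();

    /** Stores one version of a cell. */
    void put(String row, String family, String qualifier, long timestamp, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>())
            .computeIfAbsent(family + ":" + qualifier,
                             c -> new TreeMap<Long, String>(Comparator.reverseOrder()))
            .put(timestamp, value);
    }

    /** Returns the newest version of a cell, or null if the cell does not exist. */
    String get(String row, String family, String qualifier) {
        NavigableMap<String, NavigableMap<Long, String>> columns = rows.get(row);
        if (columns == null) {
            return null;
        }
        NavigableMap<Long, String> versions = columns.get(family + ":" + qualifier);
        return versions == null ? null : versions.firstEntry().getValue();
    }

    public static void main(String[] args) {
        DataModelSketch table = new DataModelSketch();
        // Hypothetical sample data: two versions of the same cell.
        table.put("row1", "content", "count", 1L, "3");
        table.put("row1", "content", "count", 2L, "5");
        System.out.println(table.get("row1", "content", "count")); // prints 5 (newest version)
    }
}
```

Reading a cell without naming a timestamp returns the newest version here, which is why the innermost map is sorted in descending timestamp order.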
Common HBase Classes
Classes: HBaseAdmin, HBaseConfiguration, HTable, HTableDescriptor, Put, Get, Scanner
Levels: Database, Table, Family, Column Qualifier

HBaseConfiguration
Adds HBase configuration files to a Configuration.
    = new HBaseConfiguration()
    = new HBaseConfiguration(Configuration c)
Extends org.apache.hadoop.conf.Configuration. Each entry in a configuration file has the form:
    <property>
      <name> name </name>
      <value> value </value>
    </property>

Return type   Method        Parameters
void          addResource   (Path file)
void          clear         ()
String        get           (String name)
boolean       getBoolean    (String name, boolean defaultValue)
void          set           (String name, String value)
void          setBoolean    (String name, boolean value)

HBaseAdmin
The administrative interface of HBase.
    = new HBaseAdmin(HBaseConfiguration conf)
Ex:
    HBaseAdmin admin = new HBaseAdmin(config);
    admin.disableTable(tablename);

Return type          Method                 Parameters
void                 addColumn              (String tableName, HColumnDescriptor column)
void                 checkHBaseAvailable    (HBaseConfiguration conf)
void                 createTable            (HTableDescriptor desc)
void                 deleteTable            (byte[] tableName)
void                 deleteColumn           (String tableName, String columnName)
void                 enableTable            (byte[] tableName)
void                 disableTable           (String tableName)
HTableDescriptor[]   listTables             ()
void                 modifyTable            (byte[] tableName, HTableDescriptor htd)
boolean              tableExists            (String tableName)

HTableDescriptor
HTableDescriptor contains the name of an HTable and its column families.
    = new HTableDescriptor()
    = new HTableDescriptor(String name)
Constant values: org.apache.hadoop.hbase.HTableDescriptor.TABLE_DESCRIPTOR_VERSION
Ex:
    HTableDescriptor htd = new HTableDescriptor(tablename);
    htd.addFamily(new HColumnDescriptor("Family"));

Return type         Method         Parameters
void                addFamily      (HColumnDescriptor family)
HColumnDescriptor   removeFamily   (byte[] column)
byte[]              getName        ()  = table name
byte[]              getValue       (byte[] key)  = value stored for key
void                setValue       (String key, String value)

HColumnDescriptor
An HColumnDescriptor contains information about a column family.
    = new HColumnDescriptor(String familyName)
Constant values: org.apache.hadoop.hbase.HTableDescriptor.TABLE_DESCRIPTOR_VERSION
Ex:
    HTableDescriptor htd = new HTableDescriptor(tablename);
    HColumnDescriptor col = new HColumnDescriptor("content:");
    htd.addFamily(col);

Return type   Method     Parameters
byte[]        getName    ()  = family name
byte[]        getValue   (byte[] key)  = value stored for key
void          setValue   (String key, String value)

HTable
Used to communicate with a single HBase table.
    = new HTable(HBaseConfiguration conf, String tableName)
Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    ResultScanner scanner = table.getScanner(family);

Return type        Method               Parameters
boolean            checkAndPut          (byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
void               close                ()
boolean            exists               (Get get)
Result             get                  (Get get)
byte[][]           getEndKeys           ()
ResultScanner      getScanner           (byte[] family)
HTableDescriptor   getTableDescriptor   ()
byte[]             getTableName         ()
static boolean     isTableEnabled       (HBaseConfiguration conf, String tableName)
void               put                  (Put put)

Put
Used to perform Put operations for a single row.
    = new Put(byte[] row)
    = new Put(byte[] row, RowLock rowLock)
Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    Put p = new Put(brow);
    p.add(family, qualifier, value);
    table.put(p);

Return type   Method         Parameters
Put           add            (byte[] family, byte[] qualifier, byte[] value)
Put           add            (byte[] column, long ts, byte[] value)
byte[]        getRow         ()
RowLock       getRowLock     ()
long          getTimeStamp   ()
boolean       isEmpty        ()
Put           setTimeStamp   (long timestamp)

Get
Used to perform Get operations on a single row.
    = new Get(byte[] row)
    = new Get(byte[] row, RowLock rowLock)
Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    Get g = new Get(Bytes.toBytes(row));

Return type   Method         Parameters
Get           addColumn      (byte[] column)
Get           addColumn      (byte[] family, byte[] qualifier)
Get           addColumns     (byte[][] columns)
Get           addFamily      (byte[] family)
TimeRange     getTimeRange   ()
Get           setTimeRange   (long minStamp, long maxStamp)
Get           setFilter      (Filter filter)

Scanner
All operations are identical to Get. Rather than specifying a single row, an optional startRow and stopRow may be defined. If rows are not specified, the Scanner will iterate over all rows.
    = new Scan()
    = new Scan(byte[] startRow, byte[] stopRow)
    = new Scan(byte[] startRow, Filter filter)

Return type   Method         Parameters
Scan          addColumn      (byte[] column)
Scan          addColumn      (byte[] family, byte[] qualifier)
Scan          addColumns     (byte[][] columns)
Scan          addFamily      (byte[] family)
TimeRange     getTimeRange   ()
Scan          setTimeRange   (long minStamp, long maxStamp)
Scan          setFilter      (Filter filter)

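The startRow/stopRow behaviour can be pictured with a sorted map standing in for the table. This is a plain-Java sketch, not the HBase API: it assumes the start row is inclusive and the stop row exclusive, and the class name `ScanRangeSketch` and the sample rows are made up for illustration.

```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

/** Conceptual sketch only: a sorted map stands in for a table (not the HBase API). */
final class ScanRangeSketch {
    /** Returns rows with startRow <= key < stopRow; a null bound means "unbounded". */
    static SortedMap<String, String> scan(NavigableMap<String, String> table,
                                          String startRow, String stopRow) {
        if (startRow == null && stopRow == null) {
            return table;                                  // no rows specified: iterate over all rows
        }
        if (startRow == null) {
            return table.headMap(stopRow, false);          // everything before stopRow
        }
        if (stopRow == null) {
            return table.tailMap(startRow, true);          // everything from startRow on
        }
        return table.subMap(startRow, true, stopRow, false);
    }

    public static void main(String[] args) {
        // Hypothetical sample rows, kept sorted by row key as in a real table.
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("T01", "GunLong");
        table.put("T02", "Esing");
        table.put("T03", "SunDon");
        table.put("T04", "StarBucks");

        System.out.println(scan(table, "T02", "T04"));     // prints {T02=Esing, T03=SunDon}
        System.out.println(scan(table, null, null).size()); // prints 4
    }
}
```

Because the stop row is excluded, scanning from "T02" to "T04" returns two rows, not three.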
Result
Single row result of a Get or Scan query.
    = new Result()
Ex:
    HTable table = new HTable(conf, Bytes.toBytes(tablename));
    Get g = new Get(Bytes.toBytes(row));
    Result rowResult = table.get(g);
    byte[] ret = rowResult.getValue(Bytes.toBytes(family + ":" + column));

Return type                    Method           Parameters
boolean                        containsColumn   (byte[] family, byte[] qualifier)
NavigableMap<byte[], byte[]>   getFamilyMap     (byte[] family)
byte[]                         getValue         (byte[] column)
byte[]                         getValue         (byte[] family, byte[] qualifier)
int                            size             ()

Interface ResultScanner
Interface for client-side scanning. Go to HTable to obtain instances, e.g. HTable.getScanner(Bytes.toBytes(family)).
Ex:
    ResultScanner scanner = table.getScanner(Bytes.toBytes(family));
    for (Result rowResult : scanner) {
        // family and column are byte[] here
        byte[] str = rowResult.getValue(family, column);
    }

Return type   Method   Parameters
void          close    ()
Result        next     ()

HBase Key/Value Format
org.apache.hadoop.hbase.KeyValue
Accessors: getRow(), getFamily(), getQualifier(), getTimestamp(), and getValue().
The KeyValue blob format inside the byte array is:
    <keylength> <valuelength> <key> <value>
Key format:
    <rowlength> <row> <columnfamilylength> <columnfamily> <columnqualifier> <timestamp> <keytype>
The maximum row length is Short.MAX_VALUE, the maximum column family length is Byte.MAX_VALUE, and column qualifier + key length must be less than Integer.MAX_VALUE.

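The layout above can be written out by hand with a ByteBuffer. This is a sketch of the byte layout only, not the HBase KeyValue class. The field widths (4-byte key and value lengths, 2-byte row length, 1-byte family length, 8-byte timestamp, 1-byte key type) are assumptions chosen to match the stated maximums, and the key type code `TYPE_PUT = 4`, the class name, and the sample cell are likewise assumptions of this sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch of the KeyValue byte layout described above; not the HBase KeyValue class. */
final class KeyValueLayoutSketch {
    /** Assumed key type code for a Put; treat the exact value as an assumption. */
    static final byte TYPE_PUT = 4;

    static byte[] encode(byte[] row, byte[] family, byte[] qualifier,
                         long timestamp, byte type, byte[] value) {
        if (row.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("row too long");
        }
        if (family.length > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("column family too long");
        }
        // key = <rowlength:2> <row> <familylength:1> <family> <qualifier> <timestamp:8> <keytype:1>
        int keyLength = 2 + row.length + 1 + family.length + qualifier.length + 8 + 1;
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + keyLength + value.length);
        buf.putInt(keyLength);            // <keylength>
        buf.putInt(value.length);         // <valuelength>
        buf.putShort((short) row.length); // <rowlength>
        buf.put(row);                     // <row>
        buf.put((byte) family.length);    // <columnfamilylength>
        buf.put(family);                  // <columnfamily>
        buf.put(qualifier);               // <columnqualifier>
        buf.putLong(timestamp);           // <timestamp>
        buf.put(type);                    // <keytype>
        buf.put(value);                   // <value>
        return buf.array();
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical cell: row "T01", column Detail:Name, value "GunLong".
        byte[] kv = encode(utf8("T01"), utf8("Detail"), utf8("Name"), 1L, TYPE_PUT, utf8("GunLong"));
        System.out.println(kv.length); // prints 40: 4 + 4 + 25 (key) + 7 (value)
    }
}
```

Note that the qualifier carries no length prefix of its own: its length is whatever remains of the key after the other fields are accounted for.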
HBase Programming: Hands-on I/O Operations

Example 1: Create a Table <command>

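Example 1: Create a Table <code>

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public static void createHBaseTable(String tablename) throws IOException {
        // Describe the table and its single column family, "content".
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor col = new HColumnDescriptor("content:");
        htd.addFamily(col);
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(config);
        // If a table with this name already exists, disable and delete it first.
        if (admin.tableExists(tablename)) {
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        admin.createTable(htd);
    }
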
Example 2: Put Data into a Column <command>

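Example 2: Put Data into a Column <code>

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public static void putData(String tablename, String row, String family,
            String column, String value) throws IOException {
        HBaseConfiguration config = new HBaseConfiguration();
        HTable table = new HTable(config, tablename);
        // HBase stores everything as byte arrays, so convert each String first.
        byte[] brow = Bytes.toBytes(row);
        byte[] bfamily = Bytes.toBytes(family);
        byte[] bcolumn = Bytes.toBytes(column);
        byte[] bvalue = Bytes.toBytes(value);
        // One Put per row; add the cell family:column = value, then send it.
        Put p = new Put(brow);
        p.add(bfamily, bcolumn, bvalue);
        table.put(p);
        table.close();
    }
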
Example 3: Get Column Value <command>

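Example 3: Get Column Value <code>

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    static String getColumn(String tablename, String row, String family, String column) {
        HBaseConfiguration conf = new HBaseConfiguration();
        String ret = "";
        HTable table;
        try {
            table = new HTable(conf, Bytes.toBytes(tablename));
            // Fetch the whole row, then pick out the "family:column" cell.
            Get g = new Get(Bytes.toBytes(row));
            Result rowResult = table.get(g);
            ret = Bytes.toString(rowResult.getValue(Bytes.toBytes(family + ":" + column)));
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return ret;
    }
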
Example 4: Scan All Columns <command>

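Example 4: Scan All Columns <code>

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.util.Bytes;

    static void scanColumn(String tablename, String family, String column) {
        HBaseConfiguration conf = new HBaseConfiguration();
        HTable table;
        try {
            table = new HTable(conf, Bytes.toBytes(tablename));
            // Scan every row of the given column family.
            ResultScanner scanner = table.getScanner(Bytes.toBytes(family));
            int i = 1;
            for (Result rowResult : scanner) {
                byte[] by = rowResult.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
                String str = Bytes.toString(by);
                System.out.println("row " + i + " is \"" + str + "\"");
                i++;
            }
            scanner.close(); // release the scanner once the loop is done
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
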
Example 5: Drop a Table <command>

Example 5: Drop a Table <code>

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    static void drop(String tablename) {
        HBaseConfiguration conf = new HBaseConfiguration();
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            if (admin.tableExists(tablename)) {
                // A table must be disabled before it can be deleted.
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
                System.out.println("Dropped the table [" + tablename + "]");
            } else {
                System.out.println("Table [" + tablename + "] was not found!");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

HBase Programming: Using MapReduce with HBase

Example 6: WordCountHBase: Program Description

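Example 6: WordCountHBase <1>

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class WordCountHBase {
        // Mapper: emit (word, 1) for every space-separated word in the line.
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            private IntWritable i = new IntWritable(1);

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String s[] = value.toString().trim().split(" ");
                for (String m : s) {
                    context.write(new Text(m), i);
                }
            }
        }

        // Reducer: sum the counts and write them to HBase as content:count.
        public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable i : values) {
                    sum += i.get();
                }
                // The word is the row key; the count is stored as a string.
                Put put = new Put(Bytes.toBytes(key.toString()));
                put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                        Bytes.toBytes(String.valueOf(sum)));
                context.write(NullWritable.get(), put);
            }
        }
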
Example 6: WordCountHBase <2>

        // Create the output table (dropping any old one with the same name).
        public static void createHBaseTable(String tablename) throws IOException {
            HTableDescriptor htd = new HTableDescriptor(tablename);
            HColumnDescriptor col = new HColumnDescriptor("content:");
            htd.addFamily(col);
            HBaseConfiguration config = new HBaseConfiguration();
            HBaseAdmin admin = new HBaseAdmin(config);
            if (admin.tableExists(tablename)) {
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
            }
            System.out.println("create new table: " + tablename);
            admin.createTable(htd);
        }

        public static void main(String args[]) throws Exception {
            String tablename = "wordcount";
            Configuration conf = new Configuration();
            // Tell TableOutputFormat which HBase table to write to.
            conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
            createHBaseTable(tablename);
            String input = args[0];
            Job job = new Job(conf, "WordCount table with " + input);
            job.setJarByClass(WordCountHBase.class);
            job.setNumReduceTasks(3);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(input));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

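To check the counting logic without a cluster, the map and reduce steps of Example 6 can be imitated in plain Java. This is a local sketch only: it uses no Hadoop or HBase classes, the class name `WordCountLogicSketch` and the sample lines are made up, and the returned map merely stands in for the rows (word as row key, count as the string stored in content:count).

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Local sketch of the Example 6 logic; uses no Hadoop or HBase classes. */
final class WordCountLogicSketch {
    /** Returns word -> count-as-string, mirroring what the reducer stores per row. */
    static Map<String, String> countWords(Iterable<String> lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            // "map" step: split on single spaces and emit (word, 1)
            for (String word : line.trim().split(" ")) {
                // "reduce" step: add up the 1s for each word
                sums.merge(word, 1, Integer::sum);
            }
        }
        // The reducer stores the count as a string, so convert here as well.
        Map<String, String> cells = new TreeMap<>();
        for (Map.Entry<String, Integer> e : sums.entrySet()) {
            cells.put(e.getKey(), String.valueOf(e.getValue()));
        }
        return cells;
    }

    public static void main(String[] args) {
        // Hypothetical input lines.
        System.out.println(countWords(Arrays.asList("a b a", "b c"))); // prints {a=2, b=2, c=1}
    }
}
```

As in the original mapper, splitting on a single space means a blank line or consecutive spaces produce an empty-string "word" that is counted like any other.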
Example 6: Execution Result

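Example 7: LoadHBaseMapper
Description: this program reads data out of HBase and writes the result back to HDFS.
Method: run this program on the hadoop 0.20 platform. After adding the hbase parameters with the method in (Reference 2), package the code as XX.jar.
Run:
---------------------------
hadoop jar XX.jar LoadHBaseMapper <hdfs_output>
---------------------------
Result:
$ hadoop fs -cat <hdfs_output>/part-r-00000
---------------------------
54 30 31 GunLong
54 30 32 Esing
54 30 33 SunDon
54 30 34 StarBucks
---------------------------
Notes:
1. The table must already exist in HBase and must already contain data.
2. When the job finishes, the program puts its result in the <hdfs_output> directory you specified on HDFS. Make sure the <hdfs_output> directory does not exist beforehand.
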
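The first three columns of the result look like the row key printed as hexadecimal bytes rather than as text, which is what one would expect if the key's toString() renders each byte in hex. Under that assumption, a small plain-Java helper turns them back into a readable key; the class name `HexRowKeySketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Sketch: decodes a space-separated hex dump such as "54 30 31" back into text. */
final class HexRowKeySketch {
    static String decode(String hexBytes) {
        String[] parts = hexBytes.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Each part is one byte written as two hex digits.
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode("54 30 31")); // prints T01
        System.out.println(decode("54 30 34")); // prints T04
    }
}
```

If readable keys are wanted in the output file itself, converting the key bytes to a string inside the mapper would be the place to do it.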
Example 7: LoadHBaseMapper <1>

    public class LoadHBaseMapper {
        // Mapper: read the Detail:Name cell of each row and emit (row key, name).
        public static class HtMap extends TableMapper<Text, Text> {
            public void map(ImmutableBytesWritable key, Result value, Context context)
                    throws IOException, InterruptedException {
                String res = Bytes.toString(value.getValue(
                        Bytes.toBytes("Detail"), Bytes.toBytes("Name")));
                context.write(new Text(key.toString()), new Text(res));
            }
        }

        // Reducer: concatenate all values that share a key and write them out.
        public static class HtReduce extends Reducer<Text, Text, Text, Text> {
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                String str = "";
                Text final_key = new Text(key);
                Text final_value = new Text();
                for (Text tmp : values) {
                    str += tmp.toString();
                }
                final_value.set(str);
                context.write(final_key, final_value);
            }
        }

Example 7: LoadHBaseMapper <2>

        public static void main(String args[]) throws Exception {
            // args[0] is the HDFS output directory, despite the variable name.
            String input = args[0];
            String tablename = "tsmc";
            Configuration conf = new Configuration();
            Job job = new Job(conf, tablename + " hbase data to hdfs");
            job.setJarByClass(LoadHBaseMapper.class);
            // Assumption: the slide does not show how myscan is built;
            // an unrestricted Scan over the whole table is used here.
            Scan myscan = new Scan();
            TableMapReduceUtil.initTableMapperJob(tablename, myscan,
                    HtMap.class, Text.class, Text.class, job);
            job.setMapperClass(HtMap.class);
            job.setReducerClass(HtReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(TableInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(input));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Example 7: Execution Result

Additional Usage Notes
Items in HBase contrib, such as Transactional HBase and Thrift

1. Transactional HBase
Indexed Table = Secondary Index = Transactional HBase
An indexed table is another table whose content is similar to the original table but whose key is different, which makes it easier to order the content.

Primary Table
key   name     price   description
1     apple    10      xx
2     orig     5       ooo
3     banana   15      vvvv
4     tomato   8       uu

Indexed Table
key   name     price   description
2     orig     5       ooo
4     tomato   8       uu
1     apple    10      xx
3     banana   15      vvvv

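The idea of the indexed table can be sketched in plain Java: build a second sorted map whose key starts with the indexed value (here the price), so that iterating it yields the primary row keys in price order. This is a conceptual sketch, not the contrib tableindexed API. The class name `SecondaryIndexSketch` and the zero-padded "price/rowKey" index key format are assumptions, and the padding only sorts correctly for non-negative prices.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Conceptual sketch of a secondary index; not the HBase tableindexed API. */
final class SecondaryIndexSketch {
    /** Returns the primary row keys ordered by price (non-negative prices assumed). */
    static List<String> rowKeysByPrice(Map<String, Integer> priceByRowKey) {
        // Index table: "zero-padded price/rowKey" -> primary row key.
        // Appending the row key keeps entries unique when two rows share a price.
        TreeMap<String, String> index = new TreeMap<>();
        for (Map.Entry<String, Integer> e : priceByRowKey.entrySet()) {
            String indexKey = String.format("%010d/%s", e.getValue(), e.getKey());
            index.put(indexKey, e.getKey());
        }
        return new ArrayList<>(index.values());
    }

    public static void main(String[] args) {
        // Prices from the primary table above: apple, orig, banana, tomato.
        Map<String, Integer> priceByRowKey = new LinkedHashMap<>();
        priceByRowKey.put("1", 10);
        priceByRowKey.put("2", 5);
        priceByRowKey.put("3", 15);
        priceByRowKey.put("4", 8);
        System.out.println(rowKeysByPrice(priceByRowKey)); // prints [2, 4, 1, 3]
    }
}
```

The resulting order 2, 4, 1, 3 matches the row order of the indexed table shown above.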
1. Environment Setup
Add the following two properties to $HBASE_INSTALL_DIR/conf/hbase-site.xml:
    <property>
      <name>hbase.regionserver.class</name>
      <value>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</value>
    </property>
    <property>
      <name>hbase.regionserver.impl</name>
      <value>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</value>
    </property>

1.a Ex: Add an IndexedTable to an Existing Table

    public void addSecondaryIndexToExistingTable(String TableName,
            String IndexID, String IndexColumn) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase/conf/hbase-site.xml"));
        // IndexedTableAdmin is the admin class that understands indexed tables.
        IndexedTableAdmin admin = new IndexedTableAdmin(conf);
        // Register an index named IndexID on the column IndexColumn.
        admin.addIndex(Bytes.toBytes(TableName),
                new IndexSpecification(IndexID, Bytes.toBytes(IndexColumn)));
    }

1.b Ex: Create a New Table with an IndexedTable

    public void createTableWithSecondaryIndexes(String TableName,
            String IndexColumn) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/opt/hbase/conf/hbase-site.xml"));
        // Describe the base table with one column family.
        HTableDescriptor desc = new HTableDescriptor(TableName);
        desc.addFamily(new HColumnDescriptor("Family1"));
        // Wrap the descriptor and attach an index on Family1:IndexColumn.
        IndexedTableDescriptor Idxdesc = new IndexedTableDescriptor(desc);
        Idxdesc.addIndex(new IndexSpecification(IndexColumn,
                Bytes.toBytes("Family1:" + IndexColumn)));
        IndexedTableAdmin admin = new IndexedTableAdmin(conf);
        admin.createIndexedTable(Idxdesc);
    }

2. Thrift
Other Projects
PIG
Hive
Conclusions
Questions and Thanks