开放数据处理服务 ODPS 批量数据通道

Similar documents
SDK 概要 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk" 获取不同版本的 Java SDK: 包名 odps-sdk-core odps-sdk-commons odps-sdk-udf odps-sdk-mapred odps-sdk-graph 描述 ODPS 基

新・解きながら学ぶJava

Microsoft Word - 第3章.doc

untitled

EJB-Programming-4-cn.doc

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

Microsoft Word - 01.DOC

エスポラージュ株式会社 住所 : 東京都江東区大島 東急ドエルアルス大島 HP: ******************* * 关于 Java 测试试题 ******

《大话设计模式》第一章

chp6.ppt


Java

JavaIO.PDF

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

内 容 简 介 本 书 是 一 本 关 于 语 言 程 序 设 计 的 教 材, 涵 盖 了 语 言 的 基 本 语 法 和 编 程 技 术, 其 中 包 含 了 作 者 对 语 言 多 年 开 发 经 验 的 总 结, 目 的 是 让 初 学 的 读 者 感 受 到 语 言 的 魅 力, 并 掌

获取 Access Token access_token 是接口的全局唯一票据, 接入方调用各接口时都需使用 access_token 开发者需要进行妥善保存 access_token 的存储至少要保留 512 个字符空间 access_token 的有效期目前为 2 个小时, 需定时刷新, 重复

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

KillTest 质量更高 服务更好 学习资料 半年免费更新服务

untitled

新版 明解C++入門編

OOP with Java 通知 Project 4: 4 月 19 日晚 9 点

概述

Chapter 9: Objects and Classes

无类继承.key

OOP with Java 通知 Project 3: 3 月 29 日晚 9 点 4 月 1 日上课

Hive:用Java代码通过JDBC连接Hiveserver

國家圖書館典藏電子全文

任務二 : 產生 20 個有炸彈的磚塊, 放在隨機的位置編輯 Block 類別的程式碼 import greenfoot.; // (World, Actor, GreenfootImage, Greenfoot and MouseInfo) Write a description of class

CC213

使用MapReduce读取XML文件

输入 project name 选择完成

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

主程式 : public class Main3Activity extends AppCompatActivity { ListView listview; // 先整理資料來源,listitem.xml 需要傳入三種資料 : 圖片 狗狗名字 狗狗生日 // 狗狗圖片 int[] pic =new

Flume-ng与Mysql整合开发

用手機直接傳值不透過網頁連接, 來當作搖控器控制家電 ( 電視遙控器 ) 按下按鍵發送同時會回傳值來確定是否有送出 問題 :1. 應該是使用了太多 thread 導致在傳值上有問題 2. 一次按很多次按鈕沒辦法即時反應

附录J:Eclipse教程

<4D F736F F F696E74202D20332D322E432B2BC3E6CFF2B6D4CFF3B3CCD0F2C9E8BCC6A1AAD6D8D4D8A1A2BCCCB3D0A1A2B6E0CCACBACDBEDBBACF2E707074>

javaexample-02.pdf

untitled

FileMaker 16 ODBC 和 JDBC 指南

OOP with Java 通知 Project 4: 5 月 2 日晚 9 点

Microsoft Word - ch04三校.doc

java2d-4.PDF

软件工程文档编制

Fun Time (1) What happens in memory? 1 i n t i ; 2 s h o r t j ; 3 double k ; 4 char c = a ; 5 i = 3; j = 2; 6 k = i j ; H.-T. Lin (NTU CSIE) Referenc

北 风 网 讲 师 原 创 作 品 ---- 仅 供 学 员 内 部 交 流 使 用 前 言 吾 尝 终 日 而 思 矣, 不 如 须 臾 之 所 学 也 ; 吾 尝 跂 而 望 矣, 不 如 登 高 之 博 见 也 登 高 而 招, 臂 非 加 长 也, 而 见

1 1 大概思路 创建 WebAPI 创建 CrossMainController 并编写 Nuget 安装 microsoft.aspnet.webapi.cors 跨域设置路由 编写 Jquery EasyUI 界面 运行效果 2 创建 WebAPI 创建 WebAPI, 新建 -> 项目 ->

FileMaker 15 ODBC 和 JDBC 指南

Microsoft Word - Broker.doc

获取 Access Token 1 基础概念 access_token 是接口的全局唯一票据, 接入方调用各接口时都需使用 access_token 开发者需要妥善保存,access_token 的存储至少要保留 512 个字符空间 access_token 的有效期目前为 2 个小时, 需定时刷新

BOOL EnumWindows(WNDENUMPROC lparam); lpenumfunc, LPARAM (Native Interface) PowerBuilder PowerBuilder PBNI 2

HBase 中加盐(Salting)之后的表如何读取:协处理器篇

云数据库 RDS SDK

提问袁小兵:

Socket Socket TcpClient Socket.Connect TcpClient.Connect Socket.Send / Receive NetworkStream 6-5

2. AOP 底层技术实现 小风 Java 实战系列教程 关键词 : 代理模式 代理模型分为两种 : 1) 接口代理 (JDK 动态代理 ) 2) 子类代理 (Cglib 子类代理 ) 需求 :CustomerService 业务类, 有 save,update 方法, 希望在 save,updat

Guava学习之Resources

目 录 / CATALOG 一 桌 面 网 站 接 入 1. JAVASCRIPT 网 页 插 件 接 入 2. JS 代 码 操 作 指 引 3. 网 页 链 接 独 立 页 面 接 入 4. 网 页 链 接 接 入 操 作 指 引 5. 自 定 义 桌 面 网 站 接 入 图 标 颜 色 等 0

Microsoft Office SharePoint Server MOSS Web SharePoint Web SharePoint 22 Web SharePoint Web Web SharePoint Web Web f Lists.asmx Web Web CAML f

C H A P T E R 7 Windows Vista Windows Vista Windows Vista FAT16 FAT32 NTFS NTFS New Technology File System NTFS

新美大酒店开放平台SDK(.NET版)使用说明.pages

全国计算机技术与软件专业技术资格(水平)考试

CHAPTER VC#

3.1 num = 3 ch = 'C' 2

威 福 髮 藝 店 桃 園 市 蘆 竹 區 中 山 里 福 祿 一 街 48 號 地 下 一 樓 50,000 獨 資 李 依 純 105/04/06 府 經 登 字 第 號 宏 品 餐 飲 桃 園 市 桃 園 區 信 光 里 民

6-1 Table Column Data Type Row Record 1. DBMS 2. DBMS MySQL Microsoft Access SQL Server Oracle 3. ODBC SQL 1. Structured Query Language 2. IBM

Hadoop&Spark解决二次排序问题(Hadoop篇)

Go构建日请求千亿微服务最佳实践的副本

RxJava

SL2511 SR Plus 操作手冊_單面.doc

Transcription:

开放数据处理服务 ODPS 批量数据通道

批量数据通道 SDK 介绍 概要 ODPS Tunnel 是 ODPS 的数据通道, 用户可以通过 Tunnel 向 ODPS 中上传或者下载数据 目前 Tunnel 仅支持表 ( 不包括视图 View) 数据的上传下载 ODPS 提供的数据上传下载工具即是基于 Tunnel SDK 编写的 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk-core" 获取不同版本的 Java SDK, 相关配置信息 : <dependency> <groupid>com.aliyun.odps</groupid> <artifactid>odps-sdk-core</artifactid> <version>0.20.7-public</version> </dependency> 这篇教程从用户的角度出发, 介绍 Tunnel SDK 的主要接口, 不同版本的 SDK 在使用上有差别, 准确信息以 SDK Java Doc 为准 主要接口 TableTunnel TableTunnel.UploadSession TableTunnel.DownloadSession 描述 访问 ODPS Tunnel 服务的入口类 用户可以通过公网或者阿里云内网环境对 ODPS 及其 Tunnel 进行访问 当用户在阿里云内网环境中, 使用 Tunnel 内网连接下载数据时,ODPS 不会将该操作产生的流量计入计费 此外内网地址仅对杭州域的云产品有效 表示一个向 ODPS 表中上传数据的会话 表示一个向 ODPS 表中下载数据的会话 备注 : 用户可以通过公网或者阿里云内网环境对 ODPS 及其 Tunnel 进行访问 当用户在阿里云内网环境中, 使用 Tunnel 内网连接下载数据时,ODPS 不会将该操作产生的流量计入计费 此外内网地址仅对杭州域的云产品有效 阿里云内网地址 : 1

ODPS 地址 :http://odps-ext.aliyun-inc.com/api 公网地址 : ODPS 地址 :http://service.odps.aliyun.com/api 对于 tunnel 地址, 用户无需配置, 支持根据 odps endpoint 来自动路由 关于 SDK 的更多详细信息请参阅 SDK Java Doc TableTunnel 接口定义 : public class TableTunnel { public DownloadSession createdownloadsession(string projectname, String tablename); public DownloadSession createdownloadsession(string projectname, String tablename, PartitionSpec partitionspe c); public UploadSession createuploadsession(string projectname, String tablename); public UploadSession createuploadsession(string projectname, String tablename, PartitionSpec partitionspec); public DownloadSession getdownloadsession(string projectname, String tablename, PartitionSpec partitionspec, String id); public DownloadSession getdownloadsession(string projectname, String tablename, String id); public UploadSession getuploadsession(string projectname, String tablename, PartitionSpec partitionspec, String id); public UploadSession getuploadsession(string projectname, String tablename, String id); public void setendpoint(string endpoint); TableTunnel: 生命周期 : 从 TableTunnel 实例被创建开始, 一直到程序结束 提供创建 Upload 对象和 Download 对象的方法 UploadSession 接口定义 : public class UploadSession { UploadSession(Configuration conf, String projectname, String tablename, String partitionspec) throws TunnelException; UploadSession(Configuration conf, String projectname, String tablename, String partitionspec, String uploadid) throws TunnelException; public void commit(long[] blocks); public Long[] getblocklist(); 2

public String getid(); public TableSchema getschema(); public UploadSession.Status getstatus(); public Record newrecord(); public RecordWriter openrecordwriter(long blockid); public RecordWriter openrecordwriter(long blockid, boolean compress); Upload 对象 : 生命周期 : 从创建 Upload 实例到结束上传 创建 Upload 实例, 可以通过调用构造方法创建, 也可以通过 TableTunnel 创建 ; 请求方式 : 同步 Server 端会为该 Upload 创建一个 session, 生成唯一 uploadid 标识该 Upload, 客户端可以 通过 getid 获取 上传数据 : 请求方式 : 异步 调用 openrecordwriter 方法, 生成 RecordWriter 实例, 其中参数 blockid 用于标识此次上 传的数据, 也描述了数据在整个表中的位置, 取值范围 :[0,20000], 当数据上传失败, 可 以根据 blockid 重新上传 查看上传 : 请求方式 : 同步 调用 getstatus 可以获取当前 Upload 状态 调用 getblocklist 可以获取成功上传的 blockid list, 可以和上传的 blockid list 对比, 对失 败的 blockid 重新上传 3

结束上传 : 请求方式 : 同步 调用 commit(long[] blocks) 方法, 参数 blocks 列表表示已经成功上传的 block 列表,server 端会对该列表进行验证 该功能是加强对数据正确性的校验, 如果提供的 block 列表与 server 端存在的 block 列表不一 致抛出异常 Commit 失败可以进行重试 7 种状态说明 : UNKNOWN, server 端刚创建一个 session 时设置的初始值 NORMAL, 创建 upload 对象成功 CLOSING, 当调用 complete 方法 ( 结束上传 ) 时, 服务端会先把状态置为 CLOSING CLOSED, 完成结束上传 ( 即把数据移动到结果表所在目录 ) 后 EXPIRED, 上传超时 CRITICAL, 服务出错 注意 : 同一个 UploadSession 里的 blockid 不能重复 也就是说, 对于同一个 UploadSession, 用一个 blockid 打开 RecordWriter, 写入一批数据后, 调用 close, 然后再 commit 完成后, 不可以重新再用 该 blockid 打开另一个 RecordWriter 写入数据 一个 block 大小上限 100GB, 建议大于 64M 的数据 DownloadSession 接口定义 : public class DownloadSession { 4

DownloadSession(Configuration conf, String projectname, String tablename, String partitionspec) throws TunnelException DownloadSession(Configuration conf, String projectname, String tablename, String partitionspec, String downloadid) throws TunnelException public String getid() public long getrecordcount() public TableSchema getschema() public DownloadSession.Status getstatus() public RecordReader openrecordreader(long start, long count) public RecordReader openrecordreader(long start, long count, boolean compress) Download 对象 : 生命周期 : 从创建 Download 实例到下载结束 创建 Download 实例, 可以通过调用构造方法创建, 也可以通过 TableTunnel 创建 ; 下载数据 : 查看下载 : 4 种状态说明 : 请求方式 : 同步 Server 端会为该 Download 创建一个 session, 生成唯一 downloadid 标识该 Download, 客 户端可以通过 getid 获取 该操作开销较大,server 端会对数据文件创建索引, 当文件数很多时, 该时间会比较长 ; 同时 server 端会返回总 Record 数, 可以根据总 Record 数启动多个并发同时下载 请求方式 : 异步 调用 openrecordreader 方法, 生成 RecordReader 实例, 其中参数 start 标识本次下载的 record 的起始位置, 从 0 开始, 取值范围是 >= 0, count 标识本次下载的记录数, 取值范围 是 >0 请求方式 : 同步 调用 getstatus 可以获取当前 Download 状态 UNKNOWN, server 端刚创建一个 session 时设置的初始值 NORMAL, 创建 Download 对象成功 CLOSED, 下载结束后 EXPIRED, 下载超时 SDK 示例 示例概要 ODPS 提供了两个服务地址供用户选择 Tunnel 服务地址的选择会直接影响用户上传数据的效率及计 量计费 详细说明请参考 Tunnel SDK 简介 不同版本 SDK 会有不同, 本示例仅供参考 请用户注意版本变更 简单上传示例 import java.io.ioexception; import java.util.date; 5

import com.aliyun.odps.column; import com.aliyun.odps.odps; import com.aliyun.odps.partitionspec; import com.aliyun.odps.tableschema; import com.aliyun.odps.account.account; import com.aliyun.odps.account.aliyunaccount; import com.aliyun.odps.data.record; import com.aliyun.odps.data.recordwriter; import com.aliyun.odps.tunnel.tabletunnel; import com.aliyun.odps.tunnel.tunnelexception; import com.aliyun.odps.tunnel.tabletunnel.uploadsession; public class UploadSample { private static String accessid = "<your access id>"; private static String accesskey = "<your access Key>"; private static String tunnelurl = "http://dt.odps.aliyun.com"; private static String odpsurl = "http://service.odps.aliyun.com/api"; private static String project = "<your project>"; private static String table = "<your table name>"; private static String partition = "<your partition spec>"; public static void main(string args[]) { Account account = new AliyunAccount(accessId, accesskey); Odps odps = new Odps(account); odps.setendpoint(odpsurl); odps.setdefaultproject(project); try { TableTunnel tunnel = new TableTunnel(odps); tunnel.setendpoint(tunnelurl); PartitionSpec partitionspec = new PartitionSpec(partition); UploadSession uploadsession = tunnel.createuploadsession(project, table, partitionspec); System.out.println("Session Status is : " + uploadsession.getstatus().tostring()); TableSchema schema = uploadsession.getschema(); RecordWriter recordwriter = uploadsession.openrecordwriter(0); Record record = uploadsession.newrecord(); for (int i = 0; i < schema.getcolumns().size(); i++) { Column column = schema.getcolumn(i); switch (column.gettype()) { case BIGINT: record.setbigint(i, 1L); case BOOLEAN: record.setboolean(i, true); case DATETIME: record.setdatetime(i, new Date()); case DOUBLE: record.setdouble(i, 0.0); 6

case STRING: record.setstring(i, "sample"); default: throw new RuntimeException("Unknown column type: " + column.gettype()); for (int i = 0; i < 10; i++) { recordwriter.write(record); recordwriter.close(); uploadsession.commit(new Long[]{0L); System.out.println("upload success!"); catch (TunnelException e) { catch (IOException e) { 构造器举例说明 : PartitionSpec(String spec): 通过字符串构造此类对象 参数 : spec: 分区定义字符串, 比如 : pt='1',ds='2' 因此程序中应该这样配置 :private static String partition = "pt='xxx',ds='xxx'"; 注意 : 对于 tunnel endpoint, 支持指定或者不指定 如果指定, 按照指定的 endpoint 路由 如果不指定, 支持自动路由 简单下载示例 import java.io.ioexception; import java.util.date; import com.aliyun.odps.column; import com.aliyun.odps.odps; import com.aliyun.odps.partitionspec; import com.aliyun.odps.tableschema; import com.aliyun.odps.account.account; import com.aliyun.odps.account.aliyunaccount; import com.aliyun.odps.data.record; import com.aliyun.odps.data.recordreader; import com.aliyun.odps.tunnel.tabletunnel; import com.aliyun.odps.tunnel.tabletunnel.downloadsession; import com.aliyun.odps.tunnel.tunnelexception; public class DownloadSample { private static String accessid = "<your access id>"; 7

private static String accesskey = "<your access Key>"; private static String tunnelurl = "http://dt.odps.aliyun.com"; private static String odpsurl = "http://service.odps.aliyun.com/api"; private static String project = "<your project>"; private static String table = "<your table name>"; private static String partition = "<your partition spec>"; public static void main(string args[]) { Account account = new AliyunAccount(accessId, accesskey); Odps odps = new Odps(account); odps.setendpoint(odpsurl); odps.setdefaultproject(project); TableTunnel tunnel = new TableTunnel(odps); tunnel.setendpoint(tunnelurl); PartitionSpec partitionspec = new PartitionSpec(partition); try { DownloadSession downloadsession = tunnel.createdownloadsession(project, table, partitionspec); System.out.println("Session Status is : " + downloadsession.getstatus().tostring()); long count = downloadsession.getrecordcount(); System.out.println("RecordCount is: " + count); RecordReader recordreader = downloadsession.openrecordreader(0, count); Record record; while ((record = recordreader.read())!= null) { consumerecord(record, downloadsession.getschema()); recordreader.close(); catch (TunnelException e) { catch (IOException e1) { e1.printstacktrace(); private static void consumerecord(record record, TableSchema schema) { for (int i = 0; i < schema.getcolumns().size(); i++) { Column column = schema.getcolumn(i); String colvalue = null; switch (column.gettype()) { case BIGINT: { Long v = record.getbigint(i); case BOOLEAN: { Boolean v = record.getboolean(i); 8

case DATETIME: { Date v = record.getdatetime(i); case DOUBLE: { Double v = record.getdouble(i); case STRING: { String v = record.getstring(i); default: throw new RuntimeException("Unknown column type: " + column.gettype()); System.out.print(colValue == null? "null" : colvalue); if (i!= schema.getcolumns().size()) System.out.print("\t"); System.out.println(); 注意 : 对于 tunnel endpoint, 支持指定或者不指定 如果指定, 按照指定的 endpoint 路由 如果不指定, 支持自动路由 多线程上传示例 import java.io.ioexception; import java.util.arraylist; import java.util.date; import java.util.concurrent.callable; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import com.aliyun.odps.column; import com.aliyun.odps.odps; import com.aliyun.odps.partitionspec; import com.aliyun.odps.tableschema; import com.aliyun.odps.account.account; import com.aliyun.odps.account.aliyunaccount; import com.aliyun.odps.data.record; import com.aliyun.odps.data.recordwriter; import com.aliyun.odps.tunnel.tabletunnel; import com.aliyun.odps.tunnel.tunnelexception; import com.aliyun.odps.tunnel.tabletunnel.uploadsession; class UploadThread implements Callable<Boolean> { private long id; 9

private RecordWriter recordwriter; private Record record; private TableSchema tableschema; public UploadThread(long id, RecordWriter recordwriter, Record record, TableSchema tableschema) { this.id = id; this.recordwriter = recordwriter; this.record = record; this.tableschema = tableschema; @Override public Boolean call() { for (int i = 0; i < tableschema.getcolumns().size(); i++) { Column column = tableschema.getcolumn(i); switch (column.gettype()) { case BIGINT: record.setbigint(i, 1L); case BOOLEAN: record.setboolean(i, true); case DATETIME: record.setdatetime(i, new Date()); case DOUBLE: record.setdouble(i, 0.0); case STRING: record.setstring(i, "sample"); default: throw new RuntimeException("Unknown column type: " + column.gettype()); for (int i = 0; i < 10; i++) { try { recordwriter.write(record); catch (IOException e) { recordwriter.close(); return false; recordwriter.close(); return true; public class UploadThreadSample { private static String accessid = "<your access id>"; 10

private static String accesskey = "<your access Key>"; private static String tunnelurl = "http://dt.odps.aliyun.com"; private static String odpsurl = "<http://service.odps.aliyun.com/api>"; private static String project = "<your project>"; private static String table = "<your table name>"; private static String partition = "<your partition spec>"; private static int threadnum = 10; public static void main(string args[]) { Account account = new AliyunAccount(accessId, accesskey); Odps odps = new Odps(account); odps.setendpoint(odpsurl); odps.setdefaultproject(project); try { TableTunnel tunnel = new TableTunnel(odps); tunnel.setendpoint(tunnelurl); PartitionSpec partitionspec = new PartitionSpec(partition); UploadSession uploadsession = tunnel.createuploadsession(project, table, partitionspec); System.out.println("Session Status is : " + uploadsession.getstatus().tostring()); ExecutorService pool = Executors.newFixedThreadPool(threadNum); ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>(); for (int i = 0; i < threadnum; i++) { RecordWriter recordwriter = uploadsession.openrecordwriter(i); Record record = uploadsession.newrecord(); callers.add(new UploadThread(i, recordwriter, record, uploadsession.getschema())); pool.invokeall(callers); pool.shutdown(); Long[] blocklist = new Long[threadNum]; for (int i = 0; i < threadnum; i++) blocklist[i] = Long.valueOf(i); uploadsession.commit(blocklist); System.out.println("upload success!"); catch (TunnelException e) { catch (IOException e) { catch (InterruptedException e) { 11

注意 : 对于 tunnel endpoint, 支持指定或者不指定 如果指定, 按照指定的 endpoint 路由 如果不指定, 支持自动路由 多线程下载示例 import java.io.ioexception; import java.util.arraylist; import java.util.date; import java.util.list; import java.util.concurrent.callable; import java.util.concurrent.executionexception; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.future; import com.aliyun.odps.column; import com.aliyun.odps.odps; import com.aliyun.odps.partitionspec; import com.aliyun.odps.tableschema; import com.aliyun.odps.account.account; import com.aliyun.odps.account.aliyunaccount; import com.aliyun.odps.data.record; import com.aliyun.odps.data.recordreader; import com.aliyun.odps.tunnel.tabletunnel; import com.aliyun.odps.tunnel.tabletunnel.downloadsession; import com.aliyun.odps.tunnel.tunnelexception; class DownloadThread implements Callable<Long> { private long id; private RecordReader recordreader; private TableSchema tableschema; public DownloadThread(int id, RecordReader recordreader, TableSchema tableschema) { this.id = id; this.recordreader = recordreader; this.tableschema = tableschema; @Override public Long call() { Long recordnum = 0L; try { Record record; while ((record = recordreader.read())!= null) { recordnum++; System.out.print("Thread " + id + "\t"); consumerecord(record, tableschema); recordreader.close(); catch (IOException e) { return recordnum; 12

private static void consumerecord(record record, TableSchema schema) { for (int i = 0; i < schema.getcolumns().size(); i++) { Column column = schema.getcolumn(i); String colvalue = null; switch (column.gettype()) { case BIGINT: { Long v = record.getbigint(i); case BOOLEAN: { Boolean v = record.getboolean(i); case DATETIME: { Date v = record.getdatetime(i); case DOUBLE: { Double v = record.getdouble(i); case STRING: { String v = record.getstring(i); default: throw new RuntimeException("Unknown column type: " + column.gettype()); System.out.print(colValue == null? "null" : colvalue); if (i!= schema.getcolumns().size()) System.out.print("\t"); System.out.println(); public class DownloadThreadSample { private static String accessid = "<your access id>"; private static String accesskey = "<your access Key>"; private static String tunnelurl = "http://dt.odps.aliyun.com"; private static String odpsurl = "http://service.odps.aliyun.com/api"; private static String project = "<your project>"; private static String table = "<your table name>"; private static String partition = "<your partition spec>"; 13

private static int threadnum = 10; public static void main(string args[]) { Account account = new AliyunAccount(accessId, accesskey); Odps odps = new Odps(account); odps.setendpoint(odpsurl); odps.setdefaultproject(project); TableTunnel tunnel = new TableTunnel(odps); tunnel.setendpoint(tunnelurl); PartitionSpec partitionspec = new PartitionSpec(partition); DownloadSession downloadsession; try { downloadsession = tunnel.createdownloadsession(project, table, partitionspec); System.out.println("Session Status is : " + downloadsession.getstatus().tostring()); long count = downloadsession.getrecordcount(); System.out.println("RecordCount is: " + count); ExecutorService pool = Executors.newFixedThreadPool(threadNum); ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>(); long start = 0; long step = count / threadnum; for (int i = 0; i < threadnum - 1; i++) { RecordReader recordreader = downloadsession.openrecordreader( step * i, step); callers.add(new DownloadThread( i, recordreader, downloadsession.getschema())); RecordReader recordreader = downloadsession.openrecordreader(step * (threadnum - 1), count - ((threadnum - 1) * step)); callers.add(new DownloadThread( threadnum - 1, recordreader, downloadsession.getschema())); Long downloadnum = 0L; List<Future<Long>> recordnum = pool.invokeall(callers); for (Future<Long> num : recordnum) downloadnum += num.get(); System.out.println("Record Count is: " + downloadnum); pool.shutdown(); catch (TunnelException e) { catch (IOException e) { catch (InterruptedException e) { catch (ExecutionException e) { 14

注意 : 对于 tunnel endpoint, 支持指定或者不指定 如果指定, 按照指定的下载 如果不指定, 按照我 们的自动路由下载 15