Open Data Processing Service (ODPS) Batch Data Tunnel
Batch Data Tunnel SDK

Overview

ODPS Tunnel is the data channel of ODPS: through Tunnel, users upload data to and download data from ODPS. At present Tunnel supports only table data (views are not supported). The data upload and download tools that ship with ODPS are themselves written on top of the Tunnel SDK.

Maven users can find the various Java SDK versions by searching the Maven repository for "odps-sdk-core". The relevant configuration:

<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-sdk-core</artifactId>
  <version>0.20.7-public</version>
</dependency>

This tutorial introduces the main interfaces of the Tunnel SDK from the user's point of view. Usage differs between SDK versions; the SDK Java Doc is the authoritative reference.

Main interfaces:

TableTunnel: the entry class for accessing the ODPS Tunnel service.
TableTunnel.UploadSession: represents a session for uploading data into an ODPS table.
TableTunnel.DownloadSession: represents a session for downloading data from an ODPS table.

Note: ODPS and its Tunnel can be accessed over the public network or from within the Alibaba Cloud intranet. When data is downloaded through the Tunnel intranet endpoint from inside the Alibaba Cloud intranet, ODPS does not bill the traffic generated by the operation. The intranet addresses are valid only for cloud products in the Hangzhou region.

Alibaba Cloud intranet address:
ODPS endpoint: http://odps-ext.aliyun-inc.com/api

Public network address:

ODPS endpoint: http://service.odps.aliyun.com/api

The Tunnel endpoint does not need to be configured; it is routed automatically based on the ODPS endpoint. For further details on the SDK, see the SDK Java Doc.

TableTunnel

Interface definition:

public class TableTunnel {
  public DownloadSession createDownloadSession(String projectName, String tableName);
  public DownloadSession createDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec);
  public UploadSession createUploadSession(String projectName, String tableName);
  public UploadSession createUploadSession(String projectName, String tableName, PartitionSpec partitionSpec);
  public DownloadSession getDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id);
  public DownloadSession getDownloadSession(String projectName, String tableName, String id);
  public UploadSession getUploadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id);
  public UploadSession getUploadSession(String projectName, String tableName, String id);
  public void setEndpoint(String endpoint);
}

TableTunnel:
Lifecycle: from the creation of the TableTunnel instance until the program ends.
Provides the methods for creating Upload and Download objects.

UploadSession

Interface definition:

public class UploadSession {
  UploadSession(Configuration conf, String projectName, String tableName, String partitionSpec) throws TunnelException;
  UploadSession(Configuration conf, String projectName, String tableName, String partitionSpec, String uploadId) throws TunnelException;
  public void commit(Long[] blocks);
  public Long[] getBlockList();
  public String getId();
  public TableSchema getSchema();
  public UploadSession.Status getStatus();
  public Record newRecord();
  public RecordWriter openRecordWriter(long blockId);
  public RecordWriter openRecordWriter(long blockId, boolean compress);
}

The Upload object:

Lifecycle: from the creation of the Upload instance until the upload ends.

Creating an Upload instance: it can be created either by calling a constructor or through a TableTunnel.
  Request mode: synchronous. The server creates a session for the Upload and generates a unique uploadId identifying it, which the client can obtain via getId.

Uploading data:
  Request mode: asynchronous. Call openRecordWriter to obtain a RecordWriter instance. The blockId parameter identifies the data of this upload and describes its position within the table; valid range: [0, 20000]. When an upload fails, the data can be re-uploaded under the same blockId.

Checking the upload:
  Request mode: synchronous. Call getStatus to get the current Upload status. Call getBlockList to get the list of blockIds uploaded successfully; compare it with the list of blockIds you uploaded, and re-upload the failed blockIds.
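The comparison described above (your own blockId list against the server's getBlockList result) is a set difference. A minimal sketch of that step, independent of the Tunnel SDK; the class and method names here are illustrative, not part of the SDK:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BlockRetrySketch {
    // Given the blockIds we attempted and the array the server reports as
    // successfully uploaded (what UploadSession.getBlockList() would return),
    // compute the blockIds that still need to be re-uploaded.
    public static List<Long> findFailedBlocks(List<Long> attempted, Long[] serverBlocks) {
        Set<Long> succeeded = new HashSet<>(Arrays.asList(serverBlocks));
        List<Long> failed = new ArrayList<>();
        for (Long id : attempted) {
            if (!succeeded.contains(id)) {
                failed.add(id);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Long> attempted = Arrays.asList(0L, 1L, 2L, 3L);
        Long[] onServer = {0L, 2L};
        System.out.println(findFailedBlocks(attempted, onServer)); // prints [1, 3]
    }
}
```

Each id returned would then be passed to openRecordWriter(blockId) again to retry that block.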
Finishing the upload:
  Request mode: synchronous. Call commit(Long[] blocks); the blocks parameter lists the blocks that were uploaded successfully, and the server validates this list. This strengthens the correctness check on the data: if the provided block list does not match the block list on the server, an exception is thrown. A failed commit can be retried.

Status values:
  UNKNOWN: initial value set when the server has just created the session.
  NORMAL: the Upload object was created successfully.
  CLOSING: when complete (finish upload) is called, the server first sets the status to CLOSING.
  CLOSED: the upload has finished (i.e. the data has been moved into the directory of the result table).
  EXPIRED: the upload timed out.
  CRITICAL: a service error occurred.

Note: blockIds must not repeat within one UploadSession. That is, for a given UploadSession, once a blockId has been used to open a RecordWriter, a batch of data written, close called, and commit completed, that blockId must not be reused to open another RecordWriter and write more data. A block may be at most 100 GB in size; blocks larger than 64 MB are recommended.

DownloadSession

Interface definition:

public class DownloadSession {
  DownloadSession(Configuration conf, String projectName, String tableName, String partitionSpec) throws TunnelException;
  DownloadSession(Configuration conf, String projectName, String tableName, String partitionSpec, String downloadId) throws TunnelException;
  public String getId();
  public long getRecordCount();
  public TableSchema getSchema();
  public DownloadSession.Status getStatus();
  public RecordReader openRecordReader(long start, long count);
  public RecordReader openRecordReader(long start, long count, boolean compress);
}

The Download object:

Lifecycle: from the creation of the Download instance until the download ends.

Creating a Download instance: it can be created either by calling a constructor or through a TableTunnel.
  Request mode: synchronous. The server creates a session for the Download and generates a unique downloadId identifying it, which the client can obtain via getId. This operation is expensive: the server builds an index over the data files, which can take a long time when there are many files. The server also returns the total record count, which can be used to start several concurrent downloads.

Downloading data:
  Request mode: asynchronous. Call openRecordReader to obtain a RecordReader instance. The start parameter is the position of the first record to download, counted from 0 (valid range: >= 0); count is the number of records to download (valid range: > 0).

Checking the download:
  Request mode: synchronous. Call getStatus to get the current Download status.

The four status values:
  UNKNOWN: initial value set when the server has just created the session.
  NORMAL: the Download object was created successfully.
  CLOSED: the download has finished.
  EXPIRED: the download timed out.

SDK Examples

Overview

ODPS provides two service addresses to choose from. The choice of Tunnel service address directly affects upload efficiency as well as metering and billing; see the Tunnel SDK introduction for details. Different SDK versions differ; these examples are for reference only, so watch for version changes.

Simple upload example

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

public class UploadSample {
  private static String accessId = "<your access id>";
  private static String accessKey = "<your access Key>";
  private static String tunnelUrl = "http://dt.odps.aliyun.com";
  private static String odpsUrl = "http://service.odps.aliyun.com/api";
  private static String project = "<your project>";
  private static String table = "<your table name>";
  private static String partition = "<your partition spec>";

  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    try {
      TableTunnel tunnel = new TableTunnel(odps);
      tunnel.setEndpoint(tunnelUrl);
      PartitionSpec partitionSpec = new PartitionSpec(partition);
      UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
      System.out.println("Session Status is : " + uploadSession.getStatus().toString());
      TableSchema schema = uploadSession.getSchema();
      RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      Record record = uploadSession.newRecord();
      for (int i = 0; i < schema.getColumns().size(); i++) {
        Column column = schema.getColumn(i);
        switch (column.getType()) {
          case BIGINT:
            record.setBigint(i, 1L);
            break;
          case BOOLEAN:
            record.setBoolean(i, true);
            break;
          case DATETIME:
            record.setDatetime(i, new Date());
            break;
          case DOUBLE:
            record.setDouble(i, 0.0);
            break;
          case STRING:
            record.setString(i, "sample");
            break;
          default:
            throw new RuntimeException("Unknown column type: " + column.getType());
        }
      }
      for (int i = 0; i < 10; i++) {
        recordWriter.write(record);
      }
      recordWriter.close();
      uploadSession.commit(new Long[]{0L});
      System.out.println("upload success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Constructor notes:

PartitionSpec(String spec): constructs the object from a string.
Parameter spec: the partition definition string, e.g. pt='1',ds='2'.
The program should therefore be configured as: private static String partition = "pt='xxx',ds='xxx'";

Note: the tunnel endpoint may be specified or omitted. If specified, requests are routed to the given endpoint; if omitted, routing is automatic.

Simple download example

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;

public class DownloadSample {
  private static String accessId = "<your access id>";
  private static String accessKey = "<your access Key>";
  private static String tunnelUrl = "http://dt.odps.aliyun.com";
  private static String odpsUrl = "http://service.odps.aliyun.com/api";
  private static String project = "<your project>";
  private static String table = "<your table name>";
  private static String partition = "<your partition spec>";

  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    TableTunnel tunnel = new TableTunnel(odps);
    tunnel.setEndpoint(tunnelUrl);
    PartitionSpec partitionSpec = new PartitionSpec(partition);
    try {
      DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
      System.out.println("Session Status is : " + downloadSession.getStatus().toString());
      long count = downloadSession.getRecordCount();
      System.out.println("RecordCount is: " + count);
      RecordReader recordReader = downloadSession.openRecordReader(0, count);
      Record record;
      while ((record = recordReader.read()) != null) {
        consumeRecord(record, downloadSession.getSchema());
      }
      recordReader.close();
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e1) {
      e1.printStackTrace();
    }
  }

  private static void consumeRecord(Record record, TableSchema schema) {
    for (int i = 0; i < schema.getColumns().size(); i++) {
      Column column = schema.getColumn(i);
      String colValue = null;
      switch (column.getType()) {
        case BIGINT: {
          Long v = record.getBigint(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case BOOLEAN: {
          Boolean v = record.getBoolean(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case DATETIME: {
          Date v = record.getDatetime(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case DOUBLE: {
          Double v = record.getDouble(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case STRING: {
          String v = record.getString(i);
          colValue = v;
          break;
        }
        default:
          throw new RuntimeException("Unknown column type: " + column.getType());
      }
      System.out.print(colValue == null ? "null" : colValue);
      if (i != schema.getColumns().size() - 1) {
        System.out.print("\t");
      }
    }
    System.out.println();
  }
}

Note: the tunnel endpoint may be specified or omitted. If specified, requests are routed to the given endpoint; if omitted, routing is automatic.

Multithreaded upload example

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

class UploadThread implements Callable<Boolean> {
  private long id;
  private RecordWriter recordWriter;
  private Record record;
  private TableSchema tableSchema;

  public UploadThread(long id, RecordWriter recordWriter, Record record, TableSchema tableSchema) {
    this.id = id;
    this.recordWriter = recordWriter;
    this.record = record;
    this.tableSchema = tableSchema;
  }

  @Override
  public Boolean call() {
    for (int i = 0; i < tableSchema.getColumns().size(); i++) {
      Column column = tableSchema.getColumn(i);
      switch (column.getType()) {
        case BIGINT:
          record.setBigint(i, 1L);
          break;
        case BOOLEAN:
          record.setBoolean(i, true);
          break;
        case DATETIME:
          record.setDatetime(i, new Date());
          break;
        case DOUBLE:
          record.setDouble(i, 0.0);
          break;
        case STRING:
          record.setString(i, "sample");
          break;
        default:
          throw new RuntimeException("Unknown column type: " + column.getType());
      }
    }
    try {
      for (int i = 0; i < 10; i++) {
        recordWriter.write(record);
      }
      recordWriter.close();
    } catch (IOException e) {
      e.printStackTrace();
      return false;
    }
    return true;
  }
}

public class UploadThreadSample {
  private static String accessId = "<your access id>";
  private static String accessKey = "<your access Key>";
  private static String tunnelUrl = "http://dt.odps.aliyun.com";
  private static String odpsUrl = "http://service.odps.aliyun.com/api";
  private static String project = "<your project>";
  private static String table = "<your table name>";
  private static String partition = "<your partition spec>";
  private static int threadNum = 10;

  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    try {
      TableTunnel tunnel = new TableTunnel(odps);
      tunnel.setEndpoint(tunnelUrl);
      PartitionSpec partitionSpec = new PartitionSpec(partition);
      UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
      System.out.println("Session Status is : " + uploadSession.getStatus().toString());
      ExecutorService pool = Executors.newFixedThreadPool(threadNum);
      ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
      for (int i = 0; i < threadNum; i++) {
        RecordWriter recordWriter = uploadSession.openRecordWriter(i);
        Record record = uploadSession.newRecord();
        callers.add(new UploadThread(i, recordWriter, record, uploadSession.getSchema()));
      }
      pool.invokeAll(callers);
      pool.shutdown();
      Long[] blockList = new Long[threadNum];
      for (int i = 0; i < threadNum; i++)
        blockList[i] = Long.valueOf(i);
      uploadSession.commit(blockList);
      System.out.println("upload success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
Note: the tunnel endpoint may be specified or omitted. If specified, requests are routed to the given endpoint; if omitted, routing is automatic.

Multithreaded download example

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;

class DownloadThread implements Callable<Long> {
  private long id;
  private RecordReader recordReader;
  private TableSchema tableSchema;

  public DownloadThread(int id, RecordReader recordReader, TableSchema tableSchema) {
    this.id = id;
    this.recordReader = recordReader;
    this.tableSchema = tableSchema;
  }

  @Override
  public Long call() {
    Long recordNum = 0L;
    try {
      Record record;
      while ((record = recordReader.read()) != null) {
        recordNum++;
        System.out.print("Thread " + id + "\t");
        consumeRecord(record, tableSchema);
      }
      recordReader.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
    return recordNum;
  }
  private static void consumeRecord(Record record, TableSchema schema) {
    for (int i = 0; i < schema.getColumns().size(); i++) {
      Column column = schema.getColumn(i);
      String colValue = null;
      switch (column.getType()) {
        case BIGINT: {
          Long v = record.getBigint(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case BOOLEAN: {
          Boolean v = record.getBoolean(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case DATETIME: {
          Date v = record.getDatetime(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case DOUBLE: {
          Double v = record.getDouble(i);
          colValue = v == null ? null : v.toString();
          break;
        }
        case STRING: {
          String v = record.getString(i);
          colValue = v;
          break;
        }
        default:
          throw new RuntimeException("Unknown column type: " + column.getType());
      }
      System.out.print(colValue == null ? "null" : colValue);
      if (i != schema.getColumns().size() - 1) {
        System.out.print("\t");
      }
    }
    System.out.println();
  }
}

public class DownloadThreadSample {
  private static String accessId = "<your access id>";
  private static String accessKey = "<your access Key>";
  private static String tunnelUrl = "http://dt.odps.aliyun.com";
  private static String odpsUrl = "http://service.odps.aliyun.com/api";
  private static String project = "<your project>";
  private static String table = "<your table name>";
  private static String partition = "<your partition spec>";
  private static int threadNum = 10;

  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    TableTunnel tunnel = new TableTunnel(odps);
    tunnel.setEndpoint(tunnelUrl);
    PartitionSpec partitionSpec = new PartitionSpec(partition);
    DownloadSession downloadSession;
    try {
      downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
      System.out.println("Session Status is : " + downloadSession.getStatus().toString());
      long count = downloadSession.getRecordCount();
      System.out.println("RecordCount is: " + count);
      ExecutorService pool = Executors.newFixedThreadPool(threadNum);
      ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>();
      long step = count / threadNum;
      for (int i = 0; i < threadNum - 1; i++) {
        RecordReader recordReader = downloadSession.openRecordReader(step * i, step);
        callers.add(new DownloadThread(i, recordReader, downloadSession.getSchema()));
      }
      RecordReader recordReader = downloadSession.openRecordReader(step * (threadNum - 1), count - ((threadNum - 1) * step));
      callers.add(new DownloadThread(threadNum - 1, recordReader, downloadSession.getSchema()));
      Long downloadNum = 0L;
      List<Future<Long>> recordNum = pool.invokeAll(callers);
      for (Future<Long> num : recordNum)
        downloadNum += num.get();
      System.out.println("Record Count is: " + downloadNum);
      pool.shutdown();
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
  }
}
Note: the tunnel endpoint may be specified or omitted. If specified, downloads are routed to the given endpoint; if omitted, downloads use automatic routing.
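As a footnote to the multithreaded download example above: the way the total record count is split into per-thread (start, count) ranges can be isolated and checked on its own. A minimal sketch of that arithmetic, independent of the Tunnel SDK (the class and method names here are illustrative, not part of the SDK):

```java
import java.util.Arrays;

public class RangeSplitSketch {
    // Split [0, count) into n contiguous {start, length} pairs, mirroring the
    // multithreaded download example: each thread reads count / n records and
    // the last thread also picks up the remainder.
    public static long[][] splitRanges(long count, int n) {
        long step = count / n;
        long[][] ranges = new long[n][2];
        for (int i = 0; i < n - 1; i++) {
            ranges[i][0] = step * i;
            ranges[i][1] = step;
        }
        ranges[n - 1][0] = step * (n - 1);
        ranges[n - 1][1] = count - step * (n - 1);
        return ranges;
    }

    public static void main(String[] args) {
        // 103 records over 4 threads: ranges (0,25) (25,25) (50,25) (75,28)
        for (long[] r : splitRanges(103L, 4)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

Each {start, length} pair is then handed to openRecordReader(start, length), one reader per thread, which is exactly what the example's loop does.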