Kudu 介绍及实践 小米张震
纲 Kudu 介绍 Kudu 与 Spark 的整合 小米实践
存储系统的现状 实时更新 HBase OLTP 随机读写 数据更新的频次 频繁更新 追加 只读 HDFS OLAP 批量扫描 归档 离线查询 较频繁的查询 查询的频次 实时访问
Kudu 的设计目标 实时更新 HBase OLTP 随机读写 数据更新的频次 频繁更新 追加 只读 HDFS OLAP 批量扫描 Kudu Fast Analytics on Fast Data 归档 离线查询 较频繁的查询 查询的频次 实时访问
Kudu 的设计目标 性能 : 快速的批量扫描 2x Parquet 低延迟的随机读写 1ms on SSD 可扩展 : 400 nodes,1000s of nodes low MTTR 关系型数据模型 : 强 schema, 有限列, 不支持 BLOBs NoSQL APIs (insert/update/delete/scan), Java/C++ Client 事务支持 : 单 的 ACID 支持 与 Hadoop 态的集成 : Flume/Impala/Spark
Kudu 的数据模型 有限固定列 强类型 CREATE TABLE sales_by_year( year INT, sale_id INT COLPROPERTIES (encoding= bitshuffle ), amount INT, PRIMARY KEY (year, sale_id) ) PARTITION BY HASH (sale_id) PARTITIONS 4, RANGE (year) ( PARTITION 2014 <= VALUES <= 2016, PARTITION VALUE = 2017 ) STORED AS KUDU TBLPROPERTIES (replication=3); 列存储 每 列均可以设置 encoding 及压缩 式 主键索引 范围分区和哈希分区 多副本
分区和 Tablet Range (year) Hash (id) PARTITION BY HASH (id) PARTITIONS 4, RANGE (year) ( PARTITION VALUE = 2014 PARTITION VALUE = 2015 PARTITION VALUE = 2016 )
多备份 Range (year) Tablet Server Tablet 10 Tablet 3 Tablet Server Tablet 10 Tablet 7 Hash (id) Tablet Server Tablet 3 Tablet 2 Tablet Server Tablet10 Tablet 7
多备份 & raft 协议 Client 1. Client 发送写请求 6. 回复 Client 写成功 Tablet Server A Tablet 10 Leader 2. 写 WAL WAL 5. Leader 得知 多数已经写成功 2. Leader 要求 Follower 进 备份 Tablet Server A 3. 写 WAL 4. Follower 备份成功后返回状态给 Leader Tablet 10 Follower WAL Tablet Server A Tablet 10 Follower 3. 写 WAL WAL
多备份 & 容错 多备份 : 强 致, 低延迟 允许从 Follower 进 Snapshot 读 临时失败 (5 分钟内 ): Leader 临时失败 - 自动选主, 概在 5s 内完成 Follower 临时失败 - 少数的 Follower 失败不会影响读写 永久失败 ( 超过 5 分钟 ): 踢出 Raft Config,Master 会重新选择 台 TabletServer 部署新的 Replica
整体架构
Master 特殊的元表 : 表结构为 :type int8, id int64, meta string type 为 table 或 tablet id 为 table id 或 tablet id meta 为 protobuf 序列化后的元数据 table 元数据 schema,partition schema, 包含哪些 tablets tablet 元数据 replica locations tablet state Coordinator: 接收 TabletServer 的 跳, 跟踪集群状态 create/alter/drop table 对 under-replicated 的 tablet 重新分配 replica
Tablet1 Tablet2 Tablet3 insert msg= foo Client timestamp Master 我想插 数据 msg= foo, 应该发请求给谁? msg= foo 应该插 到 Tablet1, 分布在 WXY, 另外你可能对 Tablet2 和 Tablet3 也感兴趣
Tablet 类 LSM(HBase): 每个 Tablet 包含 个 MemRowSet 和多个 DiskRowSet; 新插 的数据存储在 MemRowSet 中, 定期 flush 成 DiskRowSet 不同于 LSM, 每 个 Row 只存在于 个 RowSet 中 RowSet RowSet 内有序,RowSet 间 序, 不同 RowSet 所包含的 key range 允许重叠 MemRowSet 内部使用 B 树 存储,DiskRowSet 使用列存储 Tablet Server 内存 Tablet MemRowSet 磁盘 DiskRowSet [1, 2,, 10] DiskRowSet [7, 8,, 15] DiskRowSet [13, 14,, 20] DiskRowSet [18, 24,, 30] DiskRowSet [25, 29,, 40]
MVCC MVCC 对每 都保存多个版本 MVCC 所提供的能 Snapshot 读 历史读 增量备份 同 个 Tablet 内多 写的原 性 个写操作的执 步骤 leader 收到写请求, 获得要写的 锁 为写操作分配 timestamp 通过 raft 协议备份写操作 真正对 数据做更改 更改 timestamp 为 committed, 对外可见
MVCC & 数据存储 Insert: 所有新插 的数据均存储在 MemRowSet 中 插 前使用 interval tree 和 bloom filter 检查是否已存在 Update/Delete: 对 MemRowSet 中的数据更新,Flush 后形成 Undo Records 对 DiskRowSet 中的数据更新, 存储在 DeltaMemStore 中,Flush 后形成 Redo Records Tablet Server 内存 Tablet MemRowSet DeltaMemStore 磁盘 WAL DiskRowSet (BaseData) Undo Redo Redo MemRowSet Flush DeltaMemStoreFlush
Flush 示例 insert value = 1 set value = 2 MemRowSet delete insert value = 4 MemRowSet Flush insert key= a, value=1 // t1 update set value=2 where key= a // t2 delete where key= a // t3 insert key= a, value=4 // t4 // MemRowSet Flush update set value=5 where key= a // t5 Undo Records Base Data delete set value = 1 insert value = 2 delete value = 4 DeltaMemStore Flush Undo Records Base Data Redo Records delete set value = 1 insert value = 2 delete value = 4 value = 5 t0 null t1 val =1 t2 val =2 t3 null t4 val =4 时间
Delta Compaction Minor Compaction: 减少 Redo 件的数量, 增加读效率 Major Compaction: 只读 Base Data, 节省 apply delta 的操作 Undo Base Data Redo Redo Minor Compaction Undo Base Data new Redo Major Compaction new Undo new Base Data
DiskRowSet Compaction 减少 DiskRowSet 间的重叠情况 写操作时减少需要查询的 DiskRowSet 读操作读取更少的 件, 增加读效率 后台线程在空闲时做小规模 Compaction 在设计表的 schema 时, 需要考虑尽量进 顺序写, 以减少后期 Compaction 的操作 不要使用 id 做为主键的第 列 Compaction 前 (1, a ) (3, c ) (2, b ) (5, e ) (4, d ) (7, g ) (6, f ) (8, h ) Compaction 后 (1, a ) (2, b ) (3, c ) (4, d ) (5, e ) (6, f ) (7, g ) (8, h ) Key Range
Integration with Spark Kudu 为 Spark 带来了什么? 实时数据的快速分析 谓词下推, 加速查询 基于主键索引的快速查询 Update 和 Delete 的支持 Spark 为 Kudu 带来了什么? 更简单的数据操作 式
NoSQL API Write KuduTable table = client.opentable( my_table ); KuduSession session = client.newsession(); Insert ins = table.newinsert(); ins.getrow().addstring( metric, load-avg.1sec ); ins.getrow().adddouble( value, 0.05); session.apply(ins); session.flush(); Read KuduScanner scanner = client.newscannerbuilder(table).setprojectedcolumnnames(lists.of( value )).build(); while (scanner.hasmorerows()) { RowResultIterator batch = scanner.nextrows(); while (batch.hasnext()) { RowResult result = batch.next(); System.out.println(result.getDouble( value )); } }
Spark DataFrame // spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.2.0 // Import kudu datasource import org.kududb.spark.kudu._ val kududataframe = sqlcontext.read.options( Map("kudu.master" -> "master1,master2,master3", "kudu.table" -> "my_table_name")).kudu // Then query using Spark data frame API kududataframe.select("id").filter("id" >= 5).show() // (prints the selection to the console) // Or register kududataframe as a table and use Spark SQL kududataframe.registertemptable("my_table") sqlcontext.sql("select id from my_table where id >= 5").show() // (prints the sql results to the console)
KuduRDD 的实现 getpartitions 根据下推的 predicate 获得需要访问的 tablet list 每个 tablet 均包装为一个 Partition getpreferredlocations 每个 tablet 的 replica locations execute 新建 KuduScanner 进行数据扫描
SparkSQL on Kudu 目标 : 和 SparkSQL on HDFS 提供 致的对外接 ( 数据库 / 表 /SQL) 更好的利用 Kudu 提供的能, 如 update/delete 增加内部需要的功能, 例如动态分区管理, 权限控制等 实现 : 修改 SparkSQL 语法 增加 KuduExternalCatalog, 直接使用 KuduClient 获得表的元数据信息 新建 Kudu 系统表, 存储内部功能需要的其他元信息
SparkSQL on Kudu // 创建表 CREATE TABLE kudu_test ( year INT WITH (key=true), id INT WITH (key=true), msg STRING ) PARTITIONED BY HASH (id) INTO 10 BUCKETS, RANGE (year) RANGE BETWEEN ((2015),(2016)); // 增删分区 ALTER TABLE kudu_test ADD PARTITION ((2016), (2017)); ALTER TABLE kudu_test DROP PARTITION ((2015), (2016)); // 展示 Range 分区 SHOW PARTITIONS kudu_test; // Update/Delete UPDATE kudu_test SET msg= b WHERE year=2015 AND id=1; DELETE FROM kudu_test WHERE year=2015 AND id=1;
Twitter 实时数据 13M 数据 当前时间 最新记录 Adhoc 查询
小米的实时数据仓库 Kudu 数据源 消息队列 Spark Streaming SparkSQL HDFS
限制 & Roadmap 限制 : 没有自增主键 不支持嵌套结构 监控 具不完善 Load balance Roadmap: 运维上, 增强运维 具的功能 ( 错误恢复, 问题诊断 ) 性能上, 和查询引擎的深度优化 (runtime filter), 优化批量导 的性能 功能上, 完善安全机制, 细粒度的权限管理, 和 Sentry 的整合
Kudu vs 流式计算 流式计算的 些局限 : 预计算固定的指标,Ad-hoc 查询能 较弱 复杂的查询难以支持 ( 表 join) 精度要求 (TopN,count(distinct)) 存在时间窗 容错状态保存, 依赖于外部存储, 难以开发
Kudu vs Druid 数据精度不同 : Druid 对数据进 预聚合, 不保存原始数据内部存储不同 : 均是列存储, 但 Druid 内部使用 bitmap 倒排表,Kudu 使用 bitshuffle 等 encoding 式应用场景不同 : Druid 更适合做在线指标的实时统计等 作,Kudu 适合实时的分析类查询查询 式不同 : Druid 有自 的查询引擎, 尚不支持 join;kudu 依靠 Impala/SparkSQL, 对 SQL 的支持更加完整查询性能不同 : Druid 查询的是 OLAP Cube, 速度更快 ;Kudu 需要扫描原始数据, 可支持的查询更灵活