Apache Spark 与 多 数 据 源 的 结 合 田 毅 @
目 录 为 什 么 会 用 到 多 个 数 据 源 Spark 的 多 数 据 源 方 案 有 哪 些 已 有 的 数 据 源 支 持 Spark 在 GrowingIO 的 实 践 分 享
为 什 么 会 用 到 多 个 数 据 源 从 数 据 本 身 来 看 大 数 据 的 特 性 之 一 :Variety 数 据 的 多 样 性 结 构 化 数 据 与 非 结 构 化 数 据 实 时 数 据 与 离 线 数 据 五 花 八 门 的 数 据 格 式
为 什 么 会 用 到 多 个 数 据 源 从 业 务 需 求 来 看 不 同 的 业 务 场 景 决 定 了 数 据 需 要 被 存 储 到 多 个 地 方 数 据 挖 掘 实 时 查 询 多 维 分 析 查 询 聚 合 汇 总 统 计 实 时 更 新
为 什 么 会 用 到 多 个 数 据 源 从 软 件 的 发 展 来 看 越 来 越 多 的 针 对 某 一 个 细 分 领 域 的 软 件 技 术 不 断 产 生 NoSQL:HBase,Cassandra,MongoDB Storage:HDFS,Tachyon Search:ElasticSearch, Lucene MPP: Teradata,GreenPlum, Aster,Gbase QueryEngine:Hive,Phoenix Apache 基 金 会 下 的 软 件 中 : BigData:30 个 Database:25 个
常 见 的 大 数 据 系 统 的 架 构 应 用 计 算 引 擎 数 据 存 储 ETL 系 统 数 据 A 数 据 B 数 据 C 应 用 计 算 引 擎 A 数 据 存 储 A 计 算 引 擎 B 数 据 存 储 B 计 算 引 擎 C 数 据 存 储 C ETL 系 统 数 据 A 数 据 B 数 据 C
数 据 存 储 间 如 何 相 互 访 问 方 案 1: 冗 余 存 储 方 案 2: 集 中 计 算 问 题 : 1 数 据 一 致 性 2 存 储 成 本 问 题 : 1 数 据 类 型 转 换 2 数 据 处 理 效 率 存 储 1 存 储 2 存 储 3 集 中 计 算 存 储 1 存 储 2 存 储 3 数 据 数 据 数 据 数 据
Spark 的 多 数 据 源 方 案 Spark 在 1.2.0 版 本 中 首 次 发 布 了 Data Sources API 这 套 API 主 要 提 供 了 一 种 快 速 灵 活 的 方 法 为 Spark 提 供 访 问 外 部 数 据 源 的 功 能 主 要 目 标 是 让 Spark 各 个 组 件 以 及 外 部 应 用 可 以 方 便 高 效 的 读 写 外 部 数 据
Spark DataSource API 问 题 1: 关 于 数 据 类 型 转 换 Spark SQL 中 的 一 套 函 数 式 关 系 查 询 优 化 框 架 Catalyst Catalyst 提 供 了 一 整 套 数 据 类 型 的 定 义 各 个 数 据 源 实 现 各 自 数 据 类 型 和 Catalyst 的 转 换
Spark DataSource API 问 题 2: 关 于 数 据 处 理 效 率 Spark 提 供 了 一 个 灵 活 的 api 来 兼 容 不 同 类 型 的 DataSource 包 括 : 支 持 全 量 扫 描 支 持 列 剪 枝 支 持 列 剪 枝 + 过 滤 机 制 支 持 数 据 插 入
在 Spark DataSource API 之 前
在 Spark DataSource API 之 后
可 以 使 用 的 Spark DataSource Spark 自 带 JDBC Mysql PostgreSQL HadoopFs Parquet JSON Orc Spark Packages 网 站 Avro CSV RedShift MongoDB Cassandra ElasticSearch 还 有 其 他 软 件 内 置 的 一 些 DataSource Apache Phoenix
JDBC 支 持 列 剪 枝 支 持 条 件 过 滤 支 持 数 据 插 入 sqlcontext.read.jdbc( jdbc:postgresql://testhost:7531/testdb, testtable, idfield, ------- 索 引 列 10000, ------- 起 始 index 1000000, ------- 结 束 index 10, -------partition 数 量 new Properties ).registertemptable("testtable") Driver Executor Executor Executor JDBC Server 适 合 场 景 : 配 置 数 据 的 加 载, 处 理 问 题 : 随 着 数 据 量 增 加, 对 JDBC Server 的 压 力 骤 增
HadoopFs 获 取 文 件 列 表, 生 成 多 个 Task 支 持 列 剪 枝 支 持 条 件 过 滤 支 持 数 据 插 入 已 经 支 持 Parquet,JSON, sqlcontext.read.parquet("hdfs://testfs/testpath").registertemptable("test") Driver Executor Executor Executor HDFS 适 合 场 景 : 离 线 处 理 中 输 入 输 出 数 据, 临 时 数 据 问 题 : 实 时 场 景 下, 无 法 增 量 写 入 数 据, 连 续 写 入 会 产 生 大 量 碎 片 文 件
https://github.com/elastic/elasticsearch-hadoop 支 持 列 剪 枝 支 持 条 件 过 滤 支 持 数 据 插 入 EsSparkSQL.esDF(hc,indexName,esQuery).registerTempTable( testtable ) ElasticSearch Driver 获 取 Query 需 要 的 Nodes 和 Shards Executor Executor Executor 数 据 : 单 条 20K 主 机 :16C 32G 性 能 :40k~50k 条 / 秒 / 每 台 ES Node ES Node ES Node 适 合 场 景 : 存 储 doc 数 据, 随 机 数 据 搜 索 问 题 : 原 生 程 序 使 用 HTTP 方 式 进 行 数 据 加 载, 吞 吐 量 很 低 需 要 修 改 为 Traffic 方 式
Apache Phoenix https://github.com/apache/phoenix 支 持 列 剪 枝 支 持 条 件 过 滤 支 持 数 据 插 入 sqlcontext.read.format("org.apache.phoenix.spark").options(map( table -> table, zkurl -> zookeeperurl)).load.registertemptable( testtable ) Driver 获 取 执 行 计 划, 分 解 成 多 个 Task Executor Executor Executor 数 据 : 单 条 100b 5 字 段 主 机 :8C 16G 性 能 :80k-100k 条 / 秒 / 每 台 数 据 : 单 条 2K 20 字 段 主 机 :8C 16G 性 能 :30k-50k 条 / 秒 / 每 台 Region 适 合 场 景 : Server 实 时 处 理 中 输 入 数 据, 统 计 结 果 数 据 问 题 : 需 要 根 据 需 求 仔 细 设 计 Schema Region Server Region Server
MongoDB https://github.com/stratio/spark-mongodb 支 持 列 剪 枝 支 持 条 件 过 滤 支 持 数 据 插 入 sqlcontext.read.format( com.stratio.datasource.mongodb ).options(options).load.registertemptable( testtable ) Driver 获 取 Target 的 Sahrds Executor Executor Executor MongoDB Shard MongoDB Shard MongoDB Shard
Cassantra https://github.com/datastax/spark-cassandra-connector 支 持 列 剪 枝 支 持 条 件 过 滤 支 持 数 据 插 入 CassandraSQLContext val cscontext = new CassandraSQLContext(sc) sccontext. cassandrasql( xxxxxx ) Datastax 官 方 推 出 的 DataSource
我 们 的 公 司 GrowingIO 用 我 们 的 产 品 帮 助 企 业 大 规 模 驱 动 业 务 增 长 大 幅 度 增 加 分 析 效 率 用 数 据 增 长 营 收 智 能 数 据 标 记 Data 商 业 运 营 智 能 Intelligence
整 体 架 构 GrowingIO 的 Spark 实 践 Web UI Query Service HBase 离 线 应 用 Elastic Search 实 时 应 用 Kafka
GrowingIO 的 Spark 实 践 实 时 计 算 Kafka Json Table JDBC Table Spark Streaming 业 务 逻 辑 SQL HBase Table ES Table HBase Elastic Search PostgreSQL 配 置 库
GrowingIO 的 Spark 实 践 离 线 计 算 HBase HBase Table ES Table Spark Server 业 务 逻 辑 SQL HBase Table Hive Table HBase Elastic Search HDFS PostgreSQL 配 置 库
坑 1 Elastic Search Elastic Search 数 据 查 询 当 Mapping 数 据 的 列 大 于 Source 中 的 列 时, 报 Index Out of Bound Exception 修 改 RowValueReader 的 addtobuffer 方 法 Elastic 数 据 加 载 默 认 通 过 HTTP 的 接 口 加 载 数 据, 性 能 极 差 修 改 为 Transport 方 式 加 载 使 得 性 能 提 升 2-3 倍 性 能 优 化 需 要 详 细 设 计 Index, 尽 量 减 少 每 次 查 询 的 数 据 量
坑 2 Phoenix Spark 1.5 支 持 DecimalType GenericMutableRow => InternalRow PHOENIX-2279 Limit 与 Union 相 关 的 BUG Maven 中 Hadoop 版 本 兼 容 性 Region Split 导 致 缓 存 中 的 Region 信 息 失 效 Phoenix JDBC Driver has been closed(yarn 资 源 控 制 ) 读 取 数 据 时 Partition 数 量 过 少 导 致 读 取 速 度 慢
GrowingIO 的 Spark 实 践 总 结 : 使 用 Data Source API 带 来 好 处 存 储 成 本 降 低 : 数 据 只 存 储 一 份 开 发 成 本 降 低 : 无 需 关 心 外 部 数 据 源 的 连 接 代 码, 直 接 开 发 业 务 逻 辑 使 用 Data Source API 需 要 注 意 的 地 方 外 部 数 据 源 设 计 优 化 Elastic Search 的 Index 设 计 HBase 的 Schema 设 计 (Phoenix Bucket) 外 部 数 据 源 的 性 能 瓶 颈 执 行 顺 序 Cache 外 部 DataSource 的 BUG
GrowingIO 的 Spark 实 践 使 用 Spark Streaming Kafka Direct API 模 式 数 据 分 布 不 均 的 问 题 Receiver 模 式 在 计 算 量 较 大 时 需 要 通 过 shuffle 将 数 据 分 发 到 多 个 节 点 计 算 占 用 Yarn 资 源 的 问 题 Receiver 模 式 需 要 消 耗 Yarn 的 计 算 资 源 人 工 控 制 offset 根 据 应 用 场 景 手 工 控 制 offset 更 加 简 单 和 安 全 下 游 系 统 采 用 容 忍 重 复 的 设 计 原 则 保 证 Streaming 程 序 可 以 随 时 重 启 运 行
GrowingIO 的 Spark 实 践 优 化 Spark Streaming 的 调 度 机 制 原 有 调 度 机 制 下 的 问 题 定 时 器 定 时 创 建 job, 无 视 当 前 是 否 有 Job 正 在 运 行 所 有 的 Job 进 行 统 一 的 流 量 控 制 当 限 速 较 大 时, 会 发 生 Job 的 积 压 当 限 速 较 小 时, 处 理 性 能 又 不 高 优 化 后 的 调 度 机 制 JobScheduler 中 每 次 创 建 Job 后, 等 待 Job 结 束 后 再 创 建 下 一 个 Job 优 化 后 不 会 再 有 Job 的 积 压, 限 速 可 以 设 置 的 相 对 激 进 一 些 推 荐 的 限 速 设 置 : 最 大 处 理 能 力 的 80% Spark 1.5 中 的 BackPressure
GrowingIO 的 Spark 实 践 Spark Server 多 个 离 线 应 用 共 享 资 源 支 持 任 务 优 先 级 设 定 Spark App1 Spark Server Spark App2 Spark App3 S D K Spark Context Spark Cluster Spark AppN
感 谢 聆 听