Spark 编程基础 (Python 厦门大学计算机科学系版 ) 厦门大学计算机科学系林子雨 2019 版 ziyulin@xmu.edu.cn Spark 编程基础 (Python 版 ) 教材官网 : http://dblab.xmu.edu.cn/post/spark-python/ 温馨提示 : 编辑幻灯片母版, 可以修改每页 PPT 的厦大校徽和底部文字 第 5 章 Spark SQL (PPT 版本号 :2019 年春季学期 ) 林子雨厦门大学计算机科学系 扫一扫访问教材官网 E-mail: ziyulin@xmu.edu.cn 主页 : http://dblab.xmu.edu.cn/post/linziyu
课程教材 Spark 入门教程 (Python 版 ) http://dblab.xmu.edu.cn/blog/1709-2/ 纸质教材预期在 2019 年夏天上市销售 扫一扫访问在线教程 本书以 Python 作为开发 Spark 应用程序的编程语言, 系统介绍了 Spark 编程的基础知识 全书共 8 章, 内容包括大数据技术概述 Spark 的设计与运行原理 Spark 环境搭建和使用方法 RDD 编程 Spark SQL Spark Streaming Structured Streaming Spark MLlib 等 本书每个章节都安排了入门级的编程实践操作, 以便读者更好地学习和掌握 Spark 编程方法 本书官网免费提供了全套的在线教学资源, 包括讲义 PPT 习题 源代码 软件 数据集 授课视频 上机实验指南等
提纲 5.1 Spark SQL 简介 5.2 DataFrame 概述 5.3 DataFrame 的创建 5.4 DataFrame 的保存 5.5 DataFrame 的常用操作 5.6 从 RDD 转换得到 DataFrame 5.7 使用 Spark SQL 读写数据库 百度搜索厦门大学数据库实验室网站访问平台
5.1 Spark SQL 简介 5.1.1 从 Shark 说起 5.1.2 Spark SQL 设计 5.1.3 为什么推出 Spark SQL
5.1.1 从 Shark 说起 Hive: SQL-on-Hadoop 输入 将 SQL 转换成抽象语法树 Parser Semantic Analyzer Logical Plan Generator Logical Optimizer Physical Plan Generator 将抽象语法树转换成查询块 将查询块转换成逻辑查询计划 重写逻辑查询计划 将逻辑计划转成物理计划 Physical Optimizer 选择最佳的优化查询策略 输出 Hive 中 SQL 查询的 MapReduce 作业转化过程
5.1.1 从 Shark 说起 Shark 即 Hive on Spark, 为了实现与 Hive 兼容,Shark 在 HiveQL 方面重用了 Hive 中 HiveQL 的解析 逻辑执行计划翻译 执行计划优化等逻辑, 可以近似认为仅将物理执行计划从 MapReduce 作业替换成了 Spark 作业, 通过 Hive 的 HiveQL 解析, 把 HiveQL 翻译成 Spark 上的 RDD 操作
5.1.1 从 Shark 说起 Shark 的出现, 使得 SQL-on-Hadoop 的性能比 Hive 有了 10-100 倍的提高
5.1.1 从 Shark 说起 Shark 的设计导致了两个问题 : 一是执行计划优化完全依赖于 Hive, 不方便添加新的优化策略 二是因为 Spark 是线程级并行, 而 MapReduce 是进程级并行, 因此,Spark 在兼容 Hive 的实现上存在线程安全问题, 导致 Shark 不得不使用另外一套独立维护的打了补丁的 Hive 源码分支
5.1.1 从 Shark 说起 2014 年 6 月 1 日 Shark 项目和 Spark SQL 项目的主持人 Reynold Xin 宣布 : 停止对 Shark 的开发, 团队将所有资源放在 Spark SQL 项目上, 至此, Shark 的发展画上了句号, 但也因此发展出两个分支 :Spark SQL 和 Hive on Spark Spark SQL 作为 Spark 生态的一员继续发展, 而不再受限于 Hive, 只是兼容 Hive Hive on Spark 是一个 Hive 的发展计划, 该计划将 Spark 作为 Hive 的底层引擎之一, 也就是说, Hive 将不再受限于一个引擎, 可以采用 Map-Reduce Tez Spark 等引擎
5.1.2 Spark SQL 设计 Spark SQL 在 Hive 兼容层面仅依赖 HiveQL 解析 Hive 元数据, 也就是说, 从 HQL 被解析成抽象语法树 (AST) 起, 就全部由 Spark SQL 接管了 Spark SQL 执行计划生成和优化都由 Catalyst( 函数式关系查询优化框架 ) 负责 Client CLI JDBC Driver Cache Mgr. Metastore SQL Parser catalyst Physical Plan Execution Spark HDFS 图 Spark SQL 架构
5.1.2 Spark SQL 设计 Spark SQL 增加了 DataFrame( 即带有 Schema 信息的 RDD), 使用户可以在 Spark SQL 中执行 SQL 语句, 数据既可以来自 RDD, 也可以是 Hive HDFS Cassandra 等外部数据源, 还可以是 JSON 格式的数据 Spark SQL 目前支持 Scala Java Python 三种语言, 支持 SQL-92 规范 Scala Python Java HiveQL SQL-92 Spark SQL (DataFrame) HDFS Hive Cassandra JSON 图 Spark SQL 支持的数据格式和编程语言
5.1.3 为什么推出 Spark SQL
5.1.3 为什么推出 Spark SQL
5.1.3 为什么推出 Spark SQL 关系数据库已经很流行 关系数据库在大数据时代已经不能满足要求 首先, 用户需要从不同数据源执行各种操作, 包括结构化 半结构化和非结构化数据 其次, 用户需要执行高级分析, 比如机器学习和图像处理 在实际大数据应用中, 经常需要融合关系查询和复杂分析算法 ( 比如机器学习或图像处理 ), 但是, 缺少这样的系统 Spark SQL 填补了这个鸿沟 : 首先, 可以提供 DataFrame API, 可以对内部和外部各种数据源执行各种关系型操作 其次, 可以支持大数据中的大量数据源和数据分析算法 Spark SQL 可以融合 : 传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力
5.2 DataFrame 概述 DataFrame 的推出, 让 Spark 具备了处理大规模结构化数据的能力, 不仅比原有的 RDD 转化方式更加简单易用, 而且获得了更高的计算性能 Spark 能够轻松实现从 MySQL 到 DataFrame 的转化, 并且支持 SQL 查询 图 DataFrame 与 RDD 的区别 RDD 是分布式的 Java 对象的集合, 但是, 对象内部结构对于 RDD 而言却是不可知的 DataFrame 是一种以 RDD 为基础的分布式数据集, 提供了详细的结构信息
5.3 DataFrame 的创建 从 Spark2.0 以上版本开始,Spark 使用全新的 SparkSession 接口替代 Spark1.6 中的 SQLContext 及 HiveContext 接口来实现其对数据加载 转换 处理等功能 SparkSession 实现了 SQLContext 及 HiveContext 所有功能 SparkSession 支持从不同的数据源加载数据, 并把数据转换成 DataFrame, 并且支持把 DataFrame 转换成 SQLContext 自身中的表, 然后使用 SQL 语句来操作数据 SparkSession 亦提供了 HiveQL 以及其他依赖于 Hive 的功能的支持 可以通过如下语句创建一个 SparkSession 对象 : from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() 实际上, 在启动进入 pyspark 以后,pyspark 就默认提供了一个 SparkContext 对象 ( 名称为 sc) 和一个 SparkSession 对象 ( 名称为 spark)
5.3 DataFrame 的创建 在创建 DataFrame 时, 可以使用 spark.read 操作, 从不同类型的文件中加载数据创建 DataFrame, 例如 : spark.read.text("people.txt"): 读取文本文件 people.txt 创建 DataFrame spark.read.json("people.json"): 读取 people.json 文件创建 DataFrame; 在读取本地文件或 HDFS 文件时, 要注意给出正确的文件路径 spark.read.parquet( people.parquet ): 读取 people.parquet 文件创建 DataFrame
5.3 DataFrame 的创建 或者也可以使用如下格式的语句 : spark.read.format("text").load("people.txt"): 读取文本文件 people.json 创建 DataFrame; spark.read.format("json").load("people.json"): 读取 JSON 文件 people.json 创建 DataFrame; spark.read.format("parquet").load("people.parquet"): 读取 Parquet 文件 people.parquet 创建 DataFrame
5.3 DataFrame 的创建 一个实例 在 /usr/local/spark/examples/src/main/resources/ 这个目录下, 这个目录下有两个样例数据 people.json 和 people.txt people.json 文件的内容如下 : {"name":"michael"} {"name":"andy", "age":30} {"name":"justin", "age":19} people.txt 文件的内容如下 : Michael, 29 Andy, 30 Justin, 19
5.3 DataFrame 的创建 >>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json") >>> df.show() +----+-------+ age name +----+-------+ null Michael 30 Andy 19 Justin +----+-------+
5.4 DataFrame 的保存 可以使用 spark.write 操作, 把一个 DataFrame 保存成不同格式的文件, 例如, 把一个名称为 df 的 DataFrame 保存到不同格式文件中, 方法如下 : df.write.text("people.txt") df.write.json("people.json ) df.write.parquet("people.parquet ) 或者也可以使用如下格式的语句 : df.write.format("text").save("people.txt") df.write.format("json").save("people.json") df.write.format ("parquet").save("people.parquet")
5.4 DataFrame 的保存 下面从示例文件 people.json 中创建一个 DataFrame, 名称为 peopledf, 把 peopledf 保存到另外一个 JSON 文件中, 然后, 再从 peopledf 中选取一个列 ( 即 name 列 ), 把该列数据保存到一个文本文件中 >>> peopledf = spark.read.format("json").\... load("file:///usr/local/spark/examples/src/main/resources/people.json") >>> peopledf.select("name", "age").write.format("json").\... save("file:///usr/local/spark/mycode/sparksql/newpeople.json") >>> peopledf.select("name").write.format("text").\... save("file:///usr/local/spark/mycode/sparksql/newpeople.txt") 会新生成一个名称为 newpeople.json 的目录 ( 不是文件 ) 和一个名称为 newpeople.txt 的目录 ( 不是文件 ) part-00000-3db90180-ec7c-4291-ad05-df8e45c77f4d.json _SUCCESS
5.5 DataFrame 的常用操作 可以执行一些常用的 DataFrame 操作 >>> df=spark.read.json( people.json ) printschema() select()
5.5 DataFrame 的常用操作 filter() groupby()
5.5 DataFrame 的常用操作 sort()
5.6 从 RDD 转换得到 DataFrame 5.6.1 利用反射机制推断 RDD 模式 5.6.2 使用编程方式定义 RDD 模式
5.6.1 利用反射机制推断 RDD 模式 在 /usr/local/spark/examples/src/main/resources/ 目录下, 有个 Spark 安装时自带的样例数据 people.txt, 其内容如下 : Michael, 29 Andy, 30 Justin, 19 现在要把 people.txt 加载到内存中生成一个 DataFrame, 并查询其中的数据
5.6.1 利用反射机制推断 RDD 模式 >>> from pyspark.sql import Row >>> people = spark.sparkcontext.\... textfile("file:///usr/local/spark/examples/src/main/resources/people.txt").\... map(lambda line: line.split(",")).\... map(lambda p: Row(name=p[0], age=int(p[1]))) >>> schemapeople = spark.createdataframe(people) # 必须注册为临时表才能供下面的查询使用 >>> schemapeople.createorreplacetempview("people") >>> personsdf = spark.sql("select name,age from people where age > 20") #DataFrame 中的每个元素都是一行记录, 包含 name 和 age 两个字段, 分别用 p.name 和 p.age 来获取值 >>> personsrdd=personsdf.rdd.map(lambda p:"name: "+p.name+ ","+"Age: "+str(p.age)) >>> personsrdd.foreach(print) Name: Michael,Age: 29 Name: Andy,Age: 30
5.6.2 使用编程方式定义 RDD 模式 当无法提前获知数据结构时, 就需要采用编程方式定义 RDD 模式 比如, 现在需要通过编程方式把 people.txt 加载进来生成 DataFrame, 并完成 SQL 查询 第 1 步 : 制作 表头 name age 第 2 步 : 制作 表中的记录 Michael 29 Andy 30 Justin 19 第 3 步 : 把 表头 和 表中的记录 拼装在一起 name age Michael 29 Andy 30 Justin 19 图通过编程方式定义 RDD 模式的实现过程
5.6.2 使用编程方式定义 RDD 模式 >>> from pyspark.sql.types import * >>> from pyspark.sql import Row # 下面生成 表头 >>> schemastring = "name age" >>> fields = [StructField(field_name, StringType(), True) for field_name in schemastring.split(" ")] >>> schema = StructType(fields) # 下面生成 表中的记录 >>> lines = spark.sparkcontext.\... textfile("file:///usr/local/spark/examples/src/main/resources/people.txt") >>> parts = lines.map(lambda x: x.split(",")) >>> people = parts.map(lambda p: Row(p[0], p[1].strip())) # 下面把 表头 和 表中的记录 拼装在一起 >>> schemapeople = spark.createdataframe(people, schema) 剩余代码见下一页
5.6.2 使用编程方式定义 RDD 模式 # 注册一个临时表供下面查询使用 >>> schemapeople.createorreplacetempview("people") >>> results = spark.sql("select name,age FROM people") >>> results.show() +-------+---+ name age +-------+---+ Michael 29 Andy 30 Justin 19 +-------+---+
5.7 使用 Spark SQL 读写数据库 Spark SQL 可以支持 Parquet JSON Hive 等数据源, 并且可以通过 JDBC 连接外部数据源 5.7.1 准备工作 5.7.2 读取 MySQL 数据库中的数据 5.7.3 向 MySQL 数据库写入数据
5.7.1 准备工作 请参考厦门大学数据库实验室博客教程 Ubuntu 安装 MySQL, 在 Linux 系统中安装好 MySQL 数据库教程地址 : http://dblab.xmu.edu.cn/blog/install-mysql/ 平台每年访问量超过 100 万次
5.7.1 准备工作 在 Linux 中启动 MySQL 数据库 $ service mysql start $ mysql -u root -p # 屏幕会提示你输入密码 输入下面 SQL 语句完成数据库和表的创建 : mysql> create database spark; mysql> use spark; mysql> create table student (id int(4), name char(20), gender char(4), age int(4)); mysql> insert into student values(1,'xueqian','f',23); mysql> insert into student values(2,'weiliang','m',24); mysql> select * from student;
5.7.1 准备工作 下载 MySQL 的 JDBC 驱动程序, 比如 mysql-connectorjava-5.1.40.tar.gz 把该驱动程序拷贝到 spark 的安装目录 /usr/local/spark/jars 下 启动 pyspark $ cd /usr/local/spark $./bin/pyspark
5.7.2 读取 MySQL 数据库中的数据 执行以下命令连接数据库, 读取数据, 并显示 : >>> jdbcdf = spark.read \.format("jdbc") \.option("driver","com.mysql.jdbc.driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "student") \.option("user", "root") \.option("password", "123456") \.load() >>> jdbcdf.show() +---+--------+------+---+ id name gender age +---+--------+------+---+ 1 Xueqian F 23 2 Weiliang M 24 +---+--------+------+---+
5.7.3 向 MySQL 数据库写入数据 在 MySQL 数据库中创建了一个名称为 spark 的数据库, 并创建了一个名称为 student 的表创建后, 查看一下数据库内容 :
5.7.3 向 MySQL 数据库写入数据 现在开始编写程序, 往 spark.student 表中插入两条记录 #!/usr/bin/env python3 from pyspark.sql import Row from pyspark.sql.types import * from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() # 下面设置模式信息 schema = StructType([StructField("id", IntegerType(), True), \ StructField("name", StringType(), True), \ StructField("gender", StringType(), True), \ StructField("age", IntegerType(), True)])
5.7.3 向 MySQL 数据库写入数据 # 下面设置两条数据, 表示两个学生的信息 studentrdd = spark \.sparkcontext \.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]) \.map(lambda x:x.split(" ")) # 下面创建 Row 对象, 每个 Row 对象都是 rowrdd 中的一行 rowrdd = studentrdd.map(lambda p:row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip()))) # 建立起 Row 对象和模式之间的对应关系, 也就是把数据和模式对应起来 studentdf = spark.createdataframe(rowrdd, schema) # 写入数据库 prop = {} prop['user'] = 'root' prop['password'] = '123456' prop['driver'] = "com.mysql.jdbc.driver" studentdf.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
5.7.3 向 MySQL 数据库写入数据 可以看一下效果, 看看 MySQL 数据库中的 spark.student 表发生了什么变化 mysql> select * from student; +------+-----------+--------+------+ id name gender age +------+-----------+--------+------+ 1 Xueqian F 23 2 Weiliang M 24 3 Rongcheng M 26 4 Guanhua M 27 +------+-----------+--------+------+ 4 rows in set (0.00 sec)
附录 A: 主讲教师林子雨简介 主讲教师 : 林子雨 单位 : 厦门大学计算机科学系 E-mail: ziyulin@xmu.edu.cn 个人网页 :http://dblab.xmu.edu.cn/post/linziyu 数据库实验室网站 :http://dblab.xmu.edu.cn 扫一扫访问个人主页 林子雨, 男,1978 年出生, 博士 ( 毕业于北京大学 ), 现为厦门大学计算机科学系助理教授 ( 讲师 ), 曾任厦门大学信息科学与技术学院院长助理 晋江市发展和改革局副局长 中国计算机学会数据库专业委员会委员, 中国计算机学会信息系统专业委员会委员 国内高校首个 数字教师 提出者和建设者, 厦门大学数据库实验室负责人, 厦门大学云计算与大数据研究中心主要建设者和骨干成员,2013 年度和 2017 年度厦门大学教学类奖教金获得者, 荣获 2017 年福建省精品在线开放课程 2018 年厦门大学高等教育成果特等奖 2018 年福建省高等教育教学成果二等奖 2018 年国家精品在线开放课程 主要研究方向为数据库 数据仓库 数据挖掘 大数据 云计算和物联网, 并以第一作者身份在 软件学报 计算机学报 和 计算机研究与发展 等国家重点期刊以及国际学术会议上发表多篇学术论文 作为项目负责人主持的科研项目包括 1 项国家自然科学青年基金项目 (No.61303004) 1 项福建省自然科学青年基金项目 (No.2013J05099) 和 1 项中央高校基本科研业务费项目 (No.2011121049), 主持的教改课题包括 1 项 2016 年福建省教改课题和 1 项 2016 年教育部产学协作育人项目, 同时, 作为课题负责人完成了国家发改委城市信息化重大课题 国家物联网重大应用示范工程区域试点泉州市工作方案 2015 泉州市互联网经济调研等课题 中国高校首个 数字教师 提出者和建设者,2009 年至今, 数字教师 大平台累计向网络免费发布超过 500 万字高价值的研究和教学资料, 累计网络访问量超过 500 万次 打造了中国高校大数据教学知名品牌, 编著出版了中国高校第一本系统介绍大数据知识的专业教材 大数据技术原理与应用, 并成为京东 当当网等网店畅销书籍 ; 建设了国内高校首个大数据课程公共服务平台, 为教师教学和学生学习大数据课程提供全方位 一站式服务, 年访问量超过 100 万次
附录 B: 大数据学习路线图 大数据学习路线图访问地址 :http://dblab.xmu.edu.cn/post/10164/
附录 C: 大数据技术原理与应用 教材 大数据技术原理与应用 概念 存储 处理 分析与应用 ( 第 2 版 ), 由厦门大学计算机科学系林子雨博士编著, 是国内高校第一本系统介绍大数据知识的专业教材 人民邮电出版社 ISBN:978-7-115-44330-4 定价 :49.80 元 全书共有 15 章, 系统地论述了大数据的基本概念 大数据处理架构 Hadoop 分布式文件系统 HDFS 分布式数据库 HBase NoSQL 数据库 云数据库 分布式并行编程模型 MapReduce Spark 流计算 图计算 数据可视化以及大数据在互联网 生物医学和物流等各个领域的应用 在 Hadoop HDFS HBase 和 MapReduce 等重要章节, 安排了入门级的实践操作, 让读者更好地学习和掌握大数据关键技术 扫一扫访问教材官网 本书可以作为高等院校计算机专业 信息管理等相关专业的大数据课程教材, 也可供相关技术人员参考 学习 培训之用 欢迎访问 大数据技术原理与应用 概念 存储 处理 分析与应用 教材官方网站 : http://dblab.xmu.edu.cn/post/bigdata
附录 D: 大数据基础编程 实验和案例教程 本书是与 大数据技术原理与应用 ( 第 2 版 ) 教材配套的唯一指定实验指导书 清华大学出版社 ISBN:978-7-302-47209-4 定价 :59 元 步步引导, 循序渐进, 详尽的安装指南为顺利搭建大数据实验环境铺平道路 深入浅出, 去粗取精, 丰富的代码实例帮助快速掌握大数据基础编程方法 精心设计, 巧妙融合, 五套大数据实验题目促进理论与编程知识的消化和吸收 结合理论, 联系实际, 大数据课程综合实验案例精彩呈现大数据分析全流程
附录 E: Spark 编程基础 (Scala 版 ) Spark 编程基础 (Scala 版 ) 厦门大学林子雨, 赖永炫, 陶继平编著 披荆斩棘, 在大数据丛林中开辟学习捷径填沟削坎, 为快速学习 Spark 技术铺平道路深入浅出, 有效降低 Spark 技术学习门槛资源全面, 构建全方位一站式在线服务体系 人民邮电出版社出版发行,ISBN:978-7-115-48816-9 教材官网 :http://dblab.xmu.edu.cn/post/spark/ 本书以 Scala 作为开发 Spark 应用程序的编程语言, 系统介绍了 Spark 编程的基础知识 全书共 8 章, 内容包括大数据技术概述 Scala 语言基础 Spark 的设计与运行原理 Spark 环境搭建和使用方法 RDD 编程 Spark SQL Spark Streaming Spark MLlib 等 本书每个章节都安排了入门级的编程实践操作, 以便读者更好地学习和掌握 Spark 编程方法 本书官网免费提供了全套的在线教学资源, 包括讲义 PPT 习题 源代码 软件 数据集 授课视频 上机实验指南等
附录 F: 高校大数据课程公共服务平台 http://dblab.xmu.edu.cn/post/bigdata-teaching-platform/ 扫一扫访问平台主页 扫一扫观看 3 分钟 FLASH 动画宣传片
Department of Computer Science, Xiamen University, 2019