概论 SparkR 是一个 R 语言包, 它提供了轻量级的方式使得可以在 R 语言中使用 Apache Spark 在 Spark 1.4 中,SparkR 实现了分布式的 data frame, 支持类似查询 过滤以及聚合的操作 ( 类似于 R 中的 data frames:dplyr), 但是这个可以操作大规模的数据集 SparkR DataFrames DataFrame 是数据组织成一个带有列名称的分布式数据集 在概念上和关系型数据库中的表类似, 或者和 R 语言中的 data frame 类似, 但是这个提供了很多的优化措施 构造 DataFrame 的方式有很多 : 可以通过结构化文件中构造 ; 可以通过 Hive 中的表构造 ; 可以通过外部数据库构造或者是通过现有 R 的 data frame 构造等等 从 SparkContext 和 SQLContext 开始 SparkContext 是 SparkR 的切入点, 它使得你的 R 程序和 Spark 集群互通 你可以通过 sparkr.i nit 来构建 SparkContext, 然后可以传入类似于应用程序名称的选项给它 如果想使用 DataFrame s, 我们得创建 SQLContext, 这个可以通过 SparkContext 来构造 如果你使用 SparkR shell, SQLContext 和 SparkContext 会自动地构建好 sc <- sparkr.init() sqlcontext <- sparkrsql.init(sc) 1 / 6
如果想及时了解 Spark Hadoop 或者 Hbase 相关的文章, 欢迎关注微信公共帐号 :iteblog_hadoop 创建 DataFrames 如果有 SQLContext 实例, 那么应用程序就可以通过本地的 R data frame( 或者是 Hive 表 ; 或者是其他数据源 ) 来创建 DataFrames 下面将详细地介绍 通过本地 data frame 构造 最简单地创建 DataFrames 是将 R 的 data frame 转换成 SparkR DataFrames, 我们可以通过 createdataframe 来创建, 并传入本地 R 的 data frame 以此来创建 SparkR DataFrames, 下面例子就是这种方法 : df <- createdataframe(sqlcontext, faithful) # Displays the content of the DataFrame to stdout head(df) ## eruptions waiting ##1 3.600 79 ##2 1.800 54 ##3 3.333 74 通过 Data Sources 构造 通过 DataFrame 接口,SparkR 支持操作多种数据源, 本节将介绍如何通过 Data Sources 提供的方法来加载和保存数据 你可以阅读 Spark 2 / 6
SQL 编程指南来了解更多的 options 选项. Data Sources 中创建 DataFrames 的一般方法是使用 read.df, 这个方法需要传入 SQLContext, 需要加载的文件路径以及数据源的类型 SparkR 内置支持读取 JSON 和 Parquet 文件, 而且通过 S park Packages 你可以读取很多类型的数据, 比如 CSV 和 Avro 文件 下面是介绍如何 JSON 文件, 注意, 这里使用的文件不是典型的 JSON 文件 每行文件必须包含一个分隔符 自包含有效的 JSON 对象 : people <- read.df(sqlcontext, "./examples/src/main/resources/people.json", "json") head(people) ## age name ##1 NA Michael ##2 30 Andy ##3 19 Justin # SparkR automatically infers the schema from the JSON file printschema(people) # root # -- age: integer (nullable = true) # -- name: string (nullable = true) Data sources API 还可以将 DataFrames 保存成多种的文件格式, 比如我们可以通过 write.df 将上面的 DataFrame 保存成 Parquet 文件 : write.df(people, path="people.parquet", source="parquet", mode="overwrite") 通过 Hive tables 构造 我们也可以通过 Hive 表来创建 SparkR DataFrames, 为了达到这个目的, 我们需要创建 HiveContext, 因为我们可以通过它来访问 Hive MetaStore 中的表 注意,Spark 内置就对 Hive 提供了支持,SQLContext 和 HiveContext 的区别可以参见 SQL 编程指南 # sc is an existing SparkContext. hivecontext <- sparkrhive.init(sc) sql(hivecontext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sql(hivecontext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TAB LE src") 3 / 6
# Queries can be expressed in HiveQL. results <- hivecontext.sql("from src SELECT key, value") # results is now a DataFrame head(results) ## key value ## 1 238 val_238 ## 2 86 val_86 ## 3 311 val_311 DataFrame 的相关操作 SparkR DataFrames 中提供了大量操作结构化数据的函数, 这里仅仅列出其中一小部分, 详细的 API 可以参见 SparkR 编程的 API 文档 选择行和列 # Create the DataFrame df <- createdataframe(sqlcontext, faithful) # Get basic information about the DataFrame df ## DataFrame[eruptions:double, waiting:double] # Select only the "eruptions" column head(select(df, df$eruptions)) ## eruptions ##1 3.600 ##2 1.800 ##3 3.333 # You can also pass in column name as strings head(select(df, "eruptions")) # Filter the DataFrame to only retain rows with wait times shorter than 50 mins head(filter(df, df$waiting < 50)) ## eruptions waiting ##1 1.750 47 ##2 1.750 47 ##3 1.867 48 4 / 6
Grouping 和 Aggregation # We use the `n` operator to count the number of times each waiting time appears head(summarize(groupby(df, df$waiting), count = n(df$waiting))) ## waiting count ##1 81 13 ##2 60 6 ##3 68 1 # We can also sort the output from the aggregation to get the most common waiting times waiting_counts <- summarize(groupby(df, df$waiting), count = n(df$waiting)) head(arrange(waiting_counts, desc(waiting_counts$count))) ## waiting count ##1 78 15 ##2 83 14 ##3 81 13 列上面的操作 SparkR 提供了大量的函数用于直接对列进行数据处理的操作 # Convert waiting time from hours to seconds. # Note that we can assign this to a new column in the same DataFrame df$waiting_secs <- df$waiting * 60 head(df) ## eruptions waiting waiting_secs ##1 3.600 79 4740 ##2 1.800 54 3240 ##3 3.333 74 4440 在 SparkR 中运行 SQL 查询 SparkR DataFrame 也可以在 Spark SQL 中注册成临时表 将 DataFrame 注册成表可以允许我们在数据集上运行 SQL 查询 sql 函数可以使得我们直接运行 SQL 查询, 而且返回的结构是 DataFra me # Load a JSON file 5 / 6
Powered by TCPDF (www.tcpdf.org) people <- read.df(sqlcontext, "./examples/src/main/resources/people.json", "json") # Register this DataFrame as a table. registertemptable(people, "people") # SQL statements can be run by using the sql method teenagers <- sql(sqlcontext, "SELECT name FROM people WHERE age >= 13 AND age <= 19") head(teenagers) ## name ##1 Justin 本文翻译自 SparkR 官方文档, 这个是 Spark 1.4 中才有的, 不过他还没发布, 不过可以看这里 http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc4-docs/sparkr.html 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 6 / 6