Reading Data from Hive with Spark SQL

The main goal of Spark SQL is to let users run SQL on Spark. Its data sources can be RDDs as well as external sources such as Parquet, Hive, and JSON. One branch of Spark SQL is Spark on Hive, which reuses Hive's HQL parsing, logical-plan translation, and plan optimization; you can roughly think of it as replacing only the physical execution plan, turning MapReduce jobs into Spark jobs. This article describes how to read existing Hive data through Spark SQL.

Note, however, that the pre-built Spark assembly does not support Hive. If you want to use Hive from Spark, you must rebuild Spark with the -Phive option, like so:

[iteblog@www.iteblog.com spark]$ ./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.2.0 -Phive

After the build finishes, three additional jars appear under $SPARK_HOME/lib: datanucleus-api-jdo-3.2.6.jar, datanucleus-core-3.2.10.jar, and datanucleus-rdbms-3.2.9.jar, all of which are required by Hive. The steps are described below.

1. Preparing the environment

To let Spark connect to an existing Hive data warehouse, copy Hive's hive-site.xml into Spark's conf directory; through this configuration file Spark can locate Hive's metastore and the data it stores. If the Hive metastore lives in MySQL, you also need the MySQL JDBC driver, for example mysql-connector-java-5.1.22-bin.jar.

2. Starting spark-shell

With the environment ready, we use spark-shell for convenience to show how Spark SQL reads data from Hive. Start it with the following command:

[iteblog@www.iteblog.com spark]$ bin/spark-shell --master yarn-client --jars lib/mysql-connector-java-5.1.22-bin.jar
...
15/08/27 18:21:25 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
...
15/08/27 18:21:30 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
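If you would rather work from a standalone application than from spark-shell, the same Hive-aware context can be created by hand. Below is a minimal sketch, assuming Spark 1.x with the spark-hive module on the classpath and hive-site.xml reachable from it; the object name HiveReadExample is made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveReadExample {
  def main(args: Array[String]): Unit = {
    // The same SQLContext-with-Hive-support that spark-shell builds for us,
    // created by hand; hive-site.xml must be visible on the classpath.
    val conf = new SparkConf().setAppName("HiveReadExample")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // Any table registered in the Hive metastore is now visible.
    sqlContext.sql("SHOW TABLES").collect().foreach(println)

    sc.stop()
  }
}
```

Submit it with spark-submit the same way as any other Spark application, passing the MySQL driver jar with --jars as shown above.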
When spark-shell starts, it first requests resources from the ResourceManager, and it also initializes SparkContext and SQLContext instances. The sqlContext object is actually an instance of HiveContext, and it is the entry point into Spark SQL. Next, let's read some data from Hive:

scala> sqlContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS ewaplog (key STRING, value STRING) STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/user/iteblog/ewaplog'")
res0: org.apache.spark.sql.DataFrame = [result: string]

scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/data/test.lzo' INTO TABLE ewaplog")
res1: org.apache.spark.sql.DataFrame = [result: string]

scala> sqlContext.sql("FROM ewaplog SELECT key, value").collect().foreach(println)
[12,wyp]
[23,ry]
[12,wyp]
[23,ry]

We first created the ewaplog table, then loaded data into it, and finally queried it. As you can see, all the SQL is exactly the same as in Hive; it just runs on Spark. When executing SQL, the hiveql parser is used by default. You can switch to Spark SQL's built-in sql parser via the spark.sql.dialect parameter, but the hiveql parser supports more syntax as well as Hive UDFs, so in most cases the hiveql parser is recommended.

If you hit an error like the following while creating the HiveContext:

15/11/20 16:20:07 WARN metadata.Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
    at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
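For reference, switching the parser mentioned above, and querying the table without a SQL string, can be sketched as follows. This is a sketch assuming the Spark 1.x SQLContext API and the ewaplog table created in the example:

```scala
// Switch to Spark SQL's built-in parser (not recommended for Hive workloads).
sqlContext.setConf("spark.sql.dialect", "sql")

// Switch back to the default HiveQL parser.
sqlContext.setConf("spark.sql.dialect", "hiveql")

// The same query, written with the DataFrame API instead of a SQL string:
sqlContext.table("ewaplog").select("key", "value").collect().foreach(println)
```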
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:167)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
    at $line4.$read$$iwC$$iwC.<init>(<console>:9)
    at $line4.$read$$iwC.<init>(<console>:18)
    at $line4.$read.<init>(<console>:20)
    at $line4.$read$.<init>(<console>:24)
    at $line4.$read$.<clinit>(<console>)
    at $line4.$eval$.<init>(<console>:7)
    at $line4.$eval$.<clinit>(<console>)
    at $line4.$eval.$print(<console>)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:132)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
    at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
    at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:159)
    at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:108)
    at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
    ... 59 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
    ... 65 more
Caused by: MetaException(message:Version information not found in metastore. )
    at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:6664)
    at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:6645)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
    at com.sun.proxy.$Proxy15.verifySchema(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:572)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
    ... 70 more
15/11/20 16:20:07 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/11/20 16:20:07 INFO metastore.ObjectStore: ObjectStore, initialize called
15/11/20 16:20:07 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
15/11/20 16:20:07 INFO metastore.ObjectStore: Initialized ObjectStore
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
    at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:167)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
    at $iwC$$iwC.<init>(<console>:9)
    at $iwC.<init>(<console>:18)
    at <init>(<console>:20)
    at .<init>(<console>:24)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:132)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
    at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
    at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:159)
    at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:108)
    at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    ... 56 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
    ... 62 more
Caused by: MetaException(message:Version information not found in metastore. )
    at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:6664)
    at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:6645)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
    at com.sun.proxy.$Proxy15.verifySchema(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:572)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
    ... 67 more

check whether the machines in your Hadoop cluster can actually connect to the MySQL metastore database. Since the root cause here is MetaException(message:Version information not found in metastore.), it may also be worth checking the hive.metastore.schema.verification setting in hive-site.xml: when schema verification is enabled but no version information has been recorded in the metastore, this exact error is raised.

Unless otherwise stated, all articles on this blog are original! When reposting, please credit the source: reposted from 过往记忆 (https://www.iteblog.com/). Link to this article: ()