Spark 读取 Hbase 中的数据 Spark 和 Flume-ng 整合, 可以参见本博客 : Spark 和 Flume-ng 整合 使用 Spark 读取 HBase 中的数据 如果想及时了解 Spark Hadoop 或者 Hbase 相关的文章, 欢迎关注微信公共帐号 :iteblog_hadoop 大家可能都知道很熟悉 Spark 的两种常见的数据读取方式 ( 存放到 RDD 中 ):(1) 调用 pa rallelize 函数直接从集合中获取数据, 并存入 RDD 中 ;Java 版本如下 : JavaRDD<Integer> myrdd = sc.parallelize(arrays.aslist(1,2,3)); Scala 版本如下 : val myrdd= sc.parallelize(list(1,2,3)) 这种方式很简单, 很容易就可以将一个集合中的数据变成 RDD 的初始化值 ; 更常见的是 (2 ) 从文本中读取数据到 RDD 中, 这个文本可以是纯文本文件 可以是 sequence 文件 ; 可以存放在本地 (file://) 可以存放在 HDFS(hdfs://) 上, 还可以存放在 S3 上 其实对文件来说,Spark 支持 Hadoop 所支持的所有文件类型和文件存放位置 Java 版如下 : 1 / 5
import org.apache.spark.sparkconf; import org.apache.spark.api.java.javardd; import org.apache.spark.api.java.javasparkcontext; SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); sc.addfile("wyp.data"); JavaRDD<String> lines = sc.textfile(sparkfiles.get("wyp.data")); Scala 版本如下 : import org.apache.spark.sparkcontext import org.apache.spark.sparkconf val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) sc.addfile("spam.data") val infile = sc.textfile(sparkfiles.get("spam.data")) 在实际情况下, 我们需要的数据可能不是简单的存放在 HDFS 文本中, 我们需要的数据可能就存放在 Hbase 中, 那么我们如何用 Spark 来读取 Hbase 中的数据呢? 本文的所有测试是基于 Hadoo p 2.2.0 Hbase 0.98.2 Spark 0.9.1, 不同版本可能代码的编写有点不同 本文只是简单地用 Spa rk 来读取 Hbase 中的数据, 如果需要对 Hbase 进行更强的操作, 本文可能不能帮你 话不多说,S park 操作 Hbase 的核心的 Java 版本代码如下 : import org.apache.hadoop.conf.configuration; import org.apache.hadoop.hbase.hbaseconfiguration; import org.apache.hadoop.hbase.client.result; import org.apache.hadoop.hbase.client.scan; import org.apache.hadoop.hbase.io.immutablebyteswritable; import org.apache.hadoop.hbase.mapreduce.tableinputformat; import org.apache.hadoop.hbase.protobuf.protobufutil; import org.apache.hadoop.hbase.protobuf.generated.clientprotos; import org.apache.hadoop.hbase.util.base64; import org.apache.hadoop.hbase.util.bytes; import org.apache.spark.api.java.javapairrdd; import org.apache.spark.api.java.javasparkcontext; 2 / 5
JavaSparkContext sc = new JavaSparkContext(master, "hbasetest", System.getenv("SPARK_HOME"), System.getenv("JARS")); Configuration conf = HBaseConfiguration.create(); Scan scan = new Scan(); scan.addfamily(bytes.tobytes("cf")); scan.addcolumn(bytes.tobytes("cf"), Bytes.toBytes("airName")); try { String tablename = "flight_wap_order_log"; conf.set(tableinputformat.input_table, tablename); ClientProtos.Scan proto = ProtobufUtil.toScan(scan); String ScanToString = Base64.encodeBytes(proto.toByteArray()); conf.set(tableinputformat.scan, ScanToString); JavaPairRDD<ImmutableBytesWritable, Result> myrdd = sc.newapihadooprdd(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); catch (Exception e) { e.printstacktrace(); 这样本段代码段是从 Hbase 表名为 flight_wap_order_log 的数据库中读取 cf 列簇上的 airname 一列的数据, 这样我们就可以对 myrdd 进行相应的操作 : System.out.println(myRDD.count()); 本段代码需要在 pom.xml 文件加入以下依赖 : 3 / 5
<groupid>org.apache.spark</groupid> <artifactid>spark-core_2.10</artifactid> <version>0.9.1</version> <artifactid>hbase</artifactid> <artifactid>hbase-client</artifactid> <artifactid>hbase-common</artifactid> <artifactid>hbase-server</artifactid> Scala 版如下 : import org.apache.spark._ import org.apache.spark.rdd.newhadooprdd import org.apache.hadoop.hbase.{hbaseconfiguration, HTableDescriptor import org.apache.hadoop.hbase.client.hbaseadmin import org.apache.hadoop.hbase.mapreduce.tableinputformat 4 / 5
Powered by TCPDF (www.tcpdf.org) Spark 读取 Hbase 中的数据 object HBaseTest { def main(args: Array[String]) { val sc = new SparkContext(args(0), "HBaseTest", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) val conf = HBaseConfiguration.create() conf.set(tableinputformat.input_table, args(1)) val hbaserdd = sc.newapihadooprdd(conf, classof[tableinputformat], classof[org.apache.hadoop.hbase.io.immutablebyteswritable], classof[org.apache.hadoop.hbase.client.result]) hbaserdd.count() System.exit(0) 我们需要在加入如下依赖 : librarydependencies ++= Seq( "org.apache.spark" % "spark-core_2.10" % "0.9.1", "org.apache.hbase" % "hbase" % "0.98.2-hadoop2", "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2", "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2", "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2" ) 在测试的时候, 需要配置好 Hbase Hadoop 环境, 否则程序会出现问题, 特别是让程序找到 Hbase-site.xml 配置文件 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 5 / 5