使用 Apache Spark 将数据写入 ElasticSearch ElasticSearch 是一个基于 Lucene 的搜索服务器 它提供了一个分布式多用户能力的全文搜索引擎, 基于 RESTful web 接口 Elasticsearch 是用 Java 开发的, 并作为 Apache 许可条款下的开放源码发布, 是当前流行的企业级搜索引擎 设计用于云计算中, 能够达到实时搜索, 稳定, 可靠, 快速, 安装使用方便 本文并不打算介绍 ElasticSearch 的概念, 安装部署等知识, 而是直接介绍如何使用 Apache S park 将数据写入到 ElasticSearch 中 本文使用的是类库是 elasticsearchhadoop, 其从 2.1 版本开始提供了内置支持 Apache Spark 的功能, 在使用 elasticsearchhadoop 之前, 我们需要引入依赖 : <dependency> <groupid>org.elasticsearch</groupid> <artifactid>elasticsearch-hadoop</artifactid> <version>2.3.4</version> </dependency> 为了方便, 本文直接在 spark-shell 中操作 ElasticSearch 在此之前, 我们需要在 $SPARK_HO ME/conf/spark-default.conf 文件中加入以下配置 : spark.es.nodes www.iteblog.com spark.es.port 9200 其中 spark.es.nodes 指定你 es 集群的机器列表, 但是不需要把你集群所有的节点都列在里面 ;spa rk.es.port 表示集群 HTTP 端口 之所以要加上 spark 前缀是因为 Spark 通过从文件里面或者命令行里面读取配置参数只会加载 spark 开头的, 其他的参数将会被忽略 之后 elasticsearchhadoop 会把 spark 前缀去掉 如果你直接将代码写入文件, 那么你可以在初始化 SparkContext 之前设置好 ElasticSearch 相关的参数, 如下 : import org.apache.spark.sparkconf val conf = new SparkConf().setAppName("iteblog").setMaster(master) conf.set("es.nodes", "www.iteblog.com") conf.set("es.port", "9200") 1 / 8
conf.set("es.index.auto.create", "true") 在写入数据之前, 先导入 org.elasticsearch.spark._ 包, 这将使得所有的 RDD 拥有 savetoes 方法 下面我将一一介绍将不同类型的数据写入 ElasticSearch 中 将 Map 对象写入 ElasticSearch scala> import org.elasticsearch.spark._ import org.elasticsearch.spark._ scala> val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) numbers: scala.collection.immutable.map[string,int] = Map(one -> 1, two -> 2, three -> 3) scala> val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran") airports: scala.collection.immutable.map[string,string] = Map(OTP -> Otopeni, SFO -> San Fran ) scala> sc.makerdd(seq(numbers, airports)).savetoes("iteblog/docs") 上面构建了两个 Map 对象, 然后将它们写入到 ElasticSearch 中 ; 其中 savetoes 里面参数的 iteblog 表示索引 (indexes), 而 docs 表示 type 然后我们可以通过下面 URL 查看 iteblog 这个 index 的属性 : curl -XGET https://www.iteblog.com:9200/iteblog "iteblog": "aliases":, "mappings": "docs": "properties": "SFO": "type": "string", "arrival": "type": "string", "one": "type": "long", 2 / 8
"three": "type": "long", "two": "type": "long", "settings": "index": "creation_date": "1470805957888", "uuid": "HNIcGZ69Tf6qX3XVccwKUg", "number_of_replicas": "1", "number_of_shards": "5", "version": "created": "2030499", "warmers": 同时使用下面 URL 搜索出所有的 documents: https://www.iteblog.com:9200/iteblog/docs/_search "took": 2, "timed_out": false, "_shards": "total": 5, "successful": 5, "failed": 0, "hits": "total": 2, "max_score": 1, "hits": [ "_index": "iteblog", "_type": "docs", 3 / 8
], "_id": "AVZy3d5sJfxPRwCjtWM9", "_score": 1, "_source": "one": 1, "two": 2, "three": 3 "_index": "iteblog", "_type": "docs", "_id": "AVZy3d5sJfxPRwCjtWM-", "_score": 1, "_source": "arrival": "Otopeni", "SFO": "San Fran" 将 case class 对象写入 ElasticSearch 我们还可以将 Scala 中的 case class 对象写入到 ElasticSearch;Java 中可以写入 JavaBean 对象, 如下 : scala> case class Trip(departure: String, arrival: String) defined class Trip scala> val upcomingtrip = Trip("OTP", "SFO") upcomingtrip: Trip = Trip(OTP,SFO) scala> val lastweektrip = Trip("MUC", "OTP") lastweektrip: Trip = Trip(MUC,OTP) scala> val rdd = sc.makerdd(seq(upcomingtrip, lastweektrip)) rdd: org.apache.spark.rdd.rdd[trip] = ParallelCollectionRDD[1] at makerdd at <console>:37 scala> rdd.savetoes("iteblog/class") 4 / 8
上面的代码片段将 upcomingtrip 和 lastweektrip 写入到名为 iteblog 的 _index 中,type 是 class 上面都是通过隐式转换才使得 rdd 拥有 savetoes 方法 elasticsearchhadoop 还提供显式方法来把 RDD 写入到 ElasticSearch 中, 如下 : scala> import org.elasticsearch.spark.rdd.esspark import org.elasticsearch.spark.rdd.esspark scala> val rdd = sc.makerdd(seq(upcomingtrip, lastweektrip)) rdd: org.apache.spark.rdd.rdd[trip] = ParallelCollectionRDD[0] at makerdd at <console>:34 scala> EsSpark.saveToEs(rdd, "spark/docs") 将 Json 字符串写入 ElasticSearch 我们可以直接将 Json 字符串写入到 ElasticSearch 中, 如下 : scala> val json1 = """"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"""" json1: String = "id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop" scala> val json2 = """"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"""" json2: String = "id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop" scala> sc.makerdd(seq(json1, json2)).savejsontoes("iteblog/json") 动态设置插入的 type 上面的示例都是将写入的 type 写死 有很多场景下同一个 Job 中有很多类型的数据, 我们希望一次就可以将不同的数据写入到不同的 type 中, 比如属于 book 的信息全部写入到 type 为 book 里面 ; 而属于 cd 的信息全部写入到 type 为 cd 里面 很高兴的是 elasticsearchhadoop 为我们提供了这个功能, 如下 : scala> val game = Map("media_type"->"game","title" -> "FF VI","year" -> "1994") game: scala.collection.immutable.map[string,string] = Map(media_type -> game, title -> FF VI, year -> 1994) scala> val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") book: scala.collection.immutable.map[string,string] = Map(media_type -> book, title -> Harry P otter, year -> 2010) scala> val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") 5 / 8
cd: scala.collection.immutable.map[string,string] = Map(media_type -> music, title -> Surfing W ith The Alien) scala> sc.makerdd(seq(game, book, cd)).savetoes("iteblog/media_type") type 是通过 media_type 通配符设置的, 这个在写入的时候可以获取到, 然后将不同类型的数据写入到不同的 type 中 自定义 id 在 ElasticSearch 中,_index/_type/_id 的组合可以唯一确定一个 Document 如果我们不指定 id 的话,ElasticSearch 将会自动为我们生产全局唯一的 id, 自动生成的 ID 有 20 个字符长如下 : "_index": "iteblog", "_type": "docs", "_id": "AVZy3d5sJfxPRwCjtWM-", "_score": 1, "_source": "arrival": "Otopeni", "SFO": "San Fran" 很显然, 这么长的字符串没啥意义, 而且也不便于我们记忆使用 不过我们可以在插入数据的时候手动指定 id 的值, 如下 : scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni") otp: scala.collection.immutable.map[string,string] = Map(iata -> OTP, name -> Otopeni) scala> val muc = Map("iata" -> "MUC", "name" -> "Munich") muc: scala.collection.immutable.map[string,string] = Map(iata -> MUC, name -> Munich) scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran") sfo: scala.collection.immutable.map[string,string] = Map(iata -> SFO, name -> San Fran) scala> val airportsrdd = sc.makerdd(seq((1, otp), (2, muc), (3, sfo))) scala> airportsrdd.savetoeswithmeta("iteblog/2015") 6 / 8
上面的 Seq((1, otp), (2, muc), (3, sfo)) 语句指定为各个对象指定了 id 值, 分别为 1 2 3 然后你可以通过 /iteblog/2015/1 URL 搜索到 otp 对象的值 我们还可以如下方式指定 id: scala> val json1 = """"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"""" json1: String = "id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop" scala> val json2 = """"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"""" json2: String = "id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop" scala> val rdd = sc.makerdd(seq(json1, json2)) scala> EsSpark.saveToEs(rdd, "iteblog/docs", Map("es.mapping.id" -> "id")) 上面通过 es.mapping.id 参数将对象中的 id 字段映射为每条记录的 id 自定义记录的元数据 我们甚至可以在写入数据的时候自定义记录的元数据, 如下 : scala> import org.elasticsearch.spark.rdd.metadata._ import org.elasticsearch.spark.rdd.metadata._ scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni") otp: scala.collection.immutable.map[string,string] = Map(iata -> OTP, name -> Otopeni) scala> val muc = Map("iata" -> "MUC", "name" -> "Munich") muc: scala.collection.immutable.map[string,string] = Map(iata -> MUC, name -> Munich) scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran") sfo: scala.collection.immutable.map[string,string] = Map(iata -> SFO, name -> San Fran) scala> val otpmeta = Map(ID -> 1, TTL -> "3h") scala> val mucmeta = Map(ID -> 2, VERSION -> "23") scala> val sfometa = Map(ID -> 3) 7 / 8
Powered by TCPDF (www.tcpdf.org) 使用 Apache Spark 将数据写入 ElasticSearch scala> val airportsrdd = sc.makerdd(seq((otpmeta, otp), (mucmeta, muc), (sfometa, sfo))) scala> airportsrdd.savetoeswithmeta("iteblog/2015") 上面代码片段分别为 otp muc 和 sfo 设置了不同的元数据, 这在很多场景下是非常有用的 好了不早了, 该洗洗睡, 后面我将介绍如何使用 Apache Spark 读取 ElasticSearch 中的数据 本博客文章除特别声明, 全部都是原创! 禁止个人和公司转载本文 谢谢理解 : 过往记忆 (https://www.iteblog.com/) 本文链接 : () 8 / 8