使用Apache Spark将数据写入ElasticSearch

Similar documents
通过Hive将数据写入到ElasticSearch

使用Cassandra和Spark 2.0实现Rest API服务

Apache CarbonData集群模式使用指南

Spark读取Hbase中的数据

使用MapReduce读取XML文件

在Spring中使用Kafka:Producer篇

Guava学习之Resources

使用Spark SQL读取Hive上的数据

Spark 2.0介绍:在Spark SQL中定义查询优化规则

Hive:用Java代码通过JDBC连接Hiveserver

59 1 CSpace 2 CSpace CSpace URL CSpace 1 CSpace URL 2 Lucene 3 ID 4 ID Web 1. 2 CSpace LireSolr 3 LireSolr 3 Web LireSolr ID

韶关:神奇丹霞

SparkR(R on Spark)编程指南

六种使用Linux命令发送带附件的邮件

哼, 你 們 不 回 答 又 怎 麼 樣? 不 管 是 多 大 來 頭, 現 在 都 被 血 魔 吞 噬 無 蹤 了 你 們 幾 個 真 是 太 過 分, 我 不 犯 你 們, 你 們 卻 一 天 到 晚 來 挑 釁 我 教 尊 冷 笑 著 說 道 嗚, 大 人 土 地 大 姐 跪 下 來, 流 下

Flume-ng与Mysql整合开发

伊春:醉人林都

三种方法实现Hadoop(MapReduce)全局排序(1)

PowerPoint 演示文稿

Kafka客户端是如何找到 leader 分区的

Hadoop元数据合并异常及解决方法

关林:武圣陵寝

泰山:五岳独尊

国内26省市新能源汽车推广规划已出台

北戴河:海阔天空

帝国CMS下在PHP文件中调用数据库类执行SQL语句实例


西岭雪山滑雪场

SDK 概要 使用 Maven 的用户可以从 Maven 库中搜索 "odps-sdk" 获取不同版本的 Java SDK: 包名 odps-sdk-core odps-sdk-commons odps-sdk-udf odps-sdk-mapred odps-sdk-graph 描述 ODPS 基

将 MySQL 的全量数据导入到 Apache Solr 中

Flink快速上手(QuickStart)

第 06 期 李祥池 : 基于 ELK 和 Spark Streaming 的日志分析系统设计与实现 1 日志 1.1 日志定义 IT 1.2 日志处理方案演进 v1.0 v2.0 Hadoop Storm Spark Hadoop/Storm/Spark v3.0 TB Splunk ELK SI

据传-蚂蚁金服350亿美元开约A股IPO,马云身价又要暴涨

Hive几种数据导入方式

Kubenetes 系列列公开课 2 每周四晚 8 点档 1. Kubernetes 初探 2. 上 手 Kubernetes 3. Kubernetes 的资源调度 4. Kubernetes 的运 行行时 5. Kubernetes 的 网络管理理 6. Kubernetes 的存储管理理 7.

团 学 要 闻 我 校 召 开 共 青 团 五 届 九 次 全 委 ( 扩 大 ) 会 议 3 月 17 日, 我 校 共 青 团 五 届 九 次 全 委 ( 扩 大 ) 会 议 在 行 政 办 公 楼 五 楼 会 议 室 举 行, 校 团 委 委 员 各 院 ( 系 ) 团 委 书 记 校 学 生

01

Hadoop 2.2.0安装和配置lzo

江门:中国第一侨乡

南華大學數位論文

是 喔, 就 是 那 個 在 BBS 醫 療 版 跟 你 嗆 聲, 自 稱 有 三 十 多 年 推 拿 經 驗 的 大 叔 嗎? 一 個 看 來 頗 為 清 秀 的 女 生 問 道, 她 語 氣 中 略 感 訝 異 是 啊, 什 麼 推 拿 按 摩 有 多 好, 還 要 人 生 病 盡 量 不 要

案例分享产品文档

胃癌早诊早治技术方案.doc


目 录 1 足 阳 明 胃 经 足 太 阴 脾 经 手 少 阴 心 经 手 太 阳 小 肠 经 足 太 阳 膀 胱 经 : 足 少 阴 肾 经 手 厥 阴 心 包 经 手 少 阳 三 焦 经... 10

<4D F736F F D203132A4EBAB50BE50B0EAAFE8ADF4B4B5B946ACD3AB61B8B9A661A4A4AEFCB66CBDFC E646F63>


Fun Time (1) What happens in memory? 1 i n t i ; 2 s h o r t j ; 3 double k ; 4 char c = a ; 5 i = 3; j = 2; 6 k = i j ; H.-T. Lin (NTU CSIE) Referenc

電機工程系認可證照清單 /7/1

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

菩提道次第廣論

路 上 沒 說 話, 車 子 被 爸 離 去 後 開 走 了, 沒 什 麼 變, 除 了 一 股 淡 淡 的 香 味, 我 不 太 習 慣, 像 空 氣 中 的 粉 塵, 左 飄 右 飄, 光 中 飛 舞 我 沒 提, 看 車 窗 外, 外 面 不 太 有 趣, 我 只 是 沒 事 幹, 我 們 本

繁 華 國 小 101 學 年 母 親 節 感 恩 惜 福 - 跳 蚤 市 場 暨 科 學 闖 關 遊 戲 親 子 活 動 實 施 計 畫 一 依 據 : 本 校 101 學 年 度 校 務 計 畫 及 行 事 曆 二 目 的 : 1. 培 養 學 生 感 恩 惜 物 知 福 惜 福 的 節 儉 觀

台 中 市 北 屯 區 東 山 里 橫 坑 9 林 志 明 巷 89-5 菜 豆 菜 大 漿 果 菜 豆 菜 大 漿 果 小 漿 果 核 果 柑 桔 無 陳 錦 生 新 竹 市 香 山 區


育儿小故事(四)

大连软~1

教育扩张能改善收入分配差距吗?——来自CHNS2006年数据的证据

山水文化,市井人家——以湖州邱城小镇的概念性规划为例

ABOUT ME AGENDA 唐建法 / TJ MongoDB 高级方案架构师 MongoDB 中文社区联合发起人 Spark 介绍 Spark 和 MongoDB 案例演示

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

ElasticSearch系列文章:集群操作

安 徽 蓝 皮 书 社 会 域 遍 及 社 会 生 活 各 个 方 面, 如 社 会 福 利 慈 善 公 益 社 会 救 助 扶 贫 济 困 精 神 健 康 残 障 康 复 矫 治 帮 扶 犯 罪 预 防 应 急 事 件 等, 服 务 人 群 包 括 儿 童 青 少 年 妇 女 老 人 及 残 疾

Microsoft Word - RAP CHI.doc

赔 偿 ), 保 险 公 司 在 其 承 保 范 围 内 承 担 赔 偿 责 任 ;2 案 件 受 理 费 由 四 被 告 承 担 为 支 持 其 诉 讼 主 张, 原 告 江 明 相 在 举 证 期 限 内 向 本 院 提 供 了 下 列 证 据 材 料 供 法 庭 组 织 质 证 : 1 鉴 定


<4D F736F F D20D6D0CEC4B7A88C57B454CABF8C57CEBBD593CEC4D28EB9A0>

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

Flink快速上手之Scala API使用

週次

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

Microsoft Word - 01.DOC

EJB-Programming-3.PDF

jsp

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

Apache Spark 2.4 新增内置函数和高阶函数使用介绍

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

2014年全国体操锦标赛

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

雲端 Cloud Computing 技術指南 運算 應用 平台與架構 10/04/15 11:55:46 INFO 10/04/15 11:55:53 INFO 10/04/15 11:55:56 INFO 10/04/15 11:56:05 INFO 10/04/15 11:56:07 INFO

张家口:塞外明珠

Elasticsearch课件.key

Ubuntu和CentOS如何配置SSH使得无密码登陆

在Fedora上部署Hadoop2.2.0伪分布式平台

金山词霸的教程_金山软件介绍

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

Microsoft Word 级第二专业学士学位培养计划.doc

PDF 入不思議解脫境界普賢行願品 ... 《大方廣佛華嚴經卷第四十》

近四年网络工程专业培养方案.doc

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

}; "P2VTKNvTAnYNwBrqXbgxRSFQs6FTEhNJ", " " string imagedata; if(0!= read_image("a.jpg",imagedata)) { return -1; } string rsp; ytopen_sdk m_sd

208 中 南 大 学 学 报 ( 社 会 科 学 版 ) 2013 年 第 19 卷 第 6 期 节 目 录 上 卷 一 所 载 篇 名, 乃 总 目 录 中 篇 名 之 误, 正 文 卷 一 收 录 篇 名 为 月 支 使 者 玄 觉 杜 凝 妻 灌 国 婴 女 独 狐 及 吕 卿 均 五 篇

ebook

! " # $ % & (( %) "*+,- &.(/-) & ( 0 & 1! % " % # % & & $ % "/()%!"# (( (02-03 /(((.1/.2( 4 //). /$0 3)0%. /1/%-2 (( ) / ((0 // "*+,- &.(/-) & ( 0 & 1

XXXXXXXX

目 錄 版 次 變 更 記 錄... 2 原 始 程 式 碼 類 型 之 使 用 手 冊... 3 一 安 裝 軟 體 套 件 事 前 準 備... 3 二 編 譯 流 程 說 明

三种方法实现Hadoop(MapReduce)全局排序(2)

Transcription:

使用 Apache Spark 将数据写入 ElasticSearch ElasticSearch 是一个基于 Lucene 的搜索服务器 它提供了一个分布式多用户能力的全文搜索引擎, 基于 RESTful web 接口 Elasticsearch 是用 Java 开发的, 并作为 Apache 许可条款下的开放源码发布, 是当前流行的企业级搜索引擎 设计用于云计算中, 能够达到实时搜索, 稳定, 可靠, 快速, 安装使用方便 本文并不打算介绍 ElasticSearch 的概念, 安装部署等知识, 而是直接介绍如何使用 Apache S park 将数据写入到 ElasticSearch 中 本文使用的是类库是 elasticsearchhadoop, 其从 2.1 版本开始提供了内置支持 Apache Spark 的功能, 在使用 elasticsearchhadoop 之前, 我们需要引入依赖 : <dependency> <groupid>org.elasticsearch</groupid> <artifactid>elasticsearch-hadoop</artifactid> <version>2.3.4</version> </dependency> 为了方便, 本文直接在 spark-shell 中操作 ElasticSearch 在此之前, 我们需要在 $SPARK_HO ME/conf/spark-default.conf 文件中加入以下配置 : spark.es.nodes www.iteblog.com spark.es.port 9200 其中 spark.es.nodes 指定你 es 集群的机器列表, 但是不需要把你集群所有的节点都列在里面 ;spa rk.es.port 表示集群 HTTP 端口 之所以要加上 spark 前缀是因为 Spark 通过从文件里面或者命令行里面读取配置参数只会加载 spark 开头的, 其他的参数将会被忽略 之后 elasticsearchhadoop 会把 spark 前缀去掉 如果你直接将代码写入文件, 那么你可以在初始化 SparkContext 之前设置好 ElasticSearch 相关的参数, 如下 : import org.apache.spark.sparkconf val conf = new SparkConf().setAppName("iteblog").setMaster(master) conf.set("es.nodes", "www.iteblog.com") conf.set("es.port", "9200") 1 / 8

conf.set("es.index.auto.create", "true") 在写入数据之前, 先导入 org.elasticsearch.spark._ 包, 这将使得所有的 RDD 拥有 savetoes 方法 下面我将一一介绍将不同类型的数据写入 ElasticSearch 中 将 Map 对象写入 ElasticSearch scala> import org.elasticsearch.spark._ import org.elasticsearch.spark._ scala> val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) numbers: scala.collection.immutable.map[string,int] = Map(one -> 1, two -> 2, three -> 3) scala> val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran") airports: scala.collection.immutable.map[string,string] = Map(OTP -> Otopeni, SFO -> San Fran ) scala> sc.makerdd(seq(numbers, airports)).savetoes("iteblog/docs") 上面构建了两个 Map 对象, 然后将它们写入到 ElasticSearch 中 ; 其中 savetoes 里面参数的 iteblog 表示索引 (indexes), 而 docs 表示 type 然后我们可以通过下面 URL 查看 iteblog 这个 index 的属性 : curl -XGET https://www.iteblog.com:9200/iteblog "iteblog": "aliases":, "mappings": "docs": "properties": "SFO": "type": "string", "arrival": "type": "string", "one": "type": "long", 2 / 8

"three": "type": "long", "two": "type": "long", "settings": "index": "creation_date": "1470805957888", "uuid": "HNIcGZ69Tf6qX3XVccwKUg", "number_of_replicas": "1", "number_of_shards": "5", "version": "created": "2030499", "warmers": 同时使用下面 URL 搜索出所有的 documents: https://www.iteblog.com:9200/iteblog/docs/_search "took": 2, "timed_out": false, "_shards": "total": 5, "successful": 5, "failed": 0, "hits": "total": 2, "max_score": 1, "hits": [ "_index": "iteblog", "_type": "docs", 3 / 8

], "_id": "AVZy3d5sJfxPRwCjtWM9", "_score": 1, "_source": "one": 1, "two": 2, "three": 3 "_index": "iteblog", "_type": "docs", "_id": "AVZy3d5sJfxPRwCjtWM-", "_score": 1, "_source": "arrival": "Otopeni", "SFO": "San Fran" 将 case class 对象写入 ElasticSearch 我们还可以将 Scala 中的 case class 对象写入到 ElasticSearch;Java 中可以写入 JavaBean 对象, 如下 : scala> case class Trip(departure: String, arrival: String) defined class Trip scala> val upcomingtrip = Trip("OTP", "SFO") upcomingtrip: Trip = Trip(OTP,SFO) scala> val lastweektrip = Trip("MUC", "OTP") lastweektrip: Trip = Trip(MUC,OTP) scala> val rdd = sc.makerdd(seq(upcomingtrip, lastweektrip)) rdd: org.apache.spark.rdd.rdd[trip] = ParallelCollectionRDD[1] at makerdd at <console>:37 scala> rdd.savetoes("iteblog/class") 4 / 8

上面的代码片段将 upcomingtrip 和 lastweektrip 写入到名为 iteblog 的 _index 中,type 是 class 上面都是通过隐式转换才使得 rdd 拥有 savetoes 方法 elasticsearchhadoop 还提供显式方法来把 RDD 写入到 ElasticSearch 中, 如下 : scala> import org.elasticsearch.spark.rdd.esspark import org.elasticsearch.spark.rdd.esspark scala> val rdd = sc.makerdd(seq(upcomingtrip, lastweektrip)) rdd: org.apache.spark.rdd.rdd[trip] = ParallelCollectionRDD[0] at makerdd at <console>:34 scala> EsSpark.saveToEs(rdd, "spark/docs") 将 Json 字符串写入 ElasticSearch 我们可以直接将 Json 字符串写入到 ElasticSearch 中, 如下 : scala> val json1 = """"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"""" json1: String = "id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop" scala> val json2 = """"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"""" json2: String = "id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop" scala> sc.makerdd(seq(json1, json2)).savejsontoes("iteblog/json") 动态设置插入的 type 上面的示例都是将写入的 type 写死 有很多场景下同一个 Job 中有很多类型的数据, 我们希望一次就可以将不同的数据写入到不同的 type 中, 比如属于 book 的信息全部写入到 type 为 book 里面 ; 而属于 cd 的信息全部写入到 type 为 cd 里面 很高兴的是 elasticsearchhadoop 为我们提供了这个功能, 如下 : scala> val game = Map("media_type"->"game","title" -> "FF VI","year" -> "1994") game: scala.collection.immutable.map[string,string] = Map(media_type -> game, title -> FF VI, year -> 1994) scala> val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") book: scala.collection.immutable.map[string,string] = Map(media_type -> book, title -> Harry P otter, year -> 2010) scala> val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") 5 / 8

cd: scala.collection.immutable.map[string,string] = Map(media_type -> music, title -> Surfing W ith The Alien) scala> sc.makerdd(seq(game, book, cd)).savetoes("iteblog/media_type") type 是通过 media_type 通配符设置的, 这个在写入的时候可以获取到, 然后将不同类型的数据写入到不同的 type 中 自定义 id 在 ElasticSearch 中,_index/_type/_id 的组合可以唯一确定一个 Document 如果我们不指定 id 的话,ElasticSearch 将会自动为我们生产全局唯一的 id, 自动生成的 ID 有 20 个字符长如下 : "_index": "iteblog", "_type": "docs", "_id": "AVZy3d5sJfxPRwCjtWM-", "_score": 1, "_source": "arrival": "Otopeni", "SFO": "San Fran" 很显然, 这么长的字符串没啥意义, 而且也不便于我们记忆使用 不过我们可以在插入数据的时候手动指定 id 的值, 如下 : scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni") otp: scala.collection.immutable.map[string,string] = Map(iata -> OTP, name -> Otopeni) scala> val muc = Map("iata" -> "MUC", "name" -> "Munich") muc: scala.collection.immutable.map[string,string] = Map(iata -> MUC, name -> Munich) scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran") sfo: scala.collection.immutable.map[string,string] = Map(iata -> SFO, name -> San Fran) scala> val airportsrdd = sc.makerdd(seq((1, otp), (2, muc), (3, sfo))) scala> airportsrdd.savetoeswithmeta("iteblog/2015") 6 / 8

上面的 Seq((1, otp), (2, muc), (3, sfo)) 语句指定为各个对象指定了 id 值, 分别为 1 2 3 然后你可以通过 /iteblog/2015/1 URL 搜索到 otp 对象的值 我们还可以如下方式指定 id: scala> val json1 = """"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"""" json1: String = "id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop" scala> val json2 = """"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"""" json2: String = "id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop" scala> val rdd = sc.makerdd(seq(json1, json2)) scala> EsSpark.saveToEs(rdd, "iteblog/docs", Map("es.mapping.id" -> "id")) 上面通过 es.mapping.id 参数将对象中的 id 字段映射为每条记录的 id 自定义记录的元数据 我们甚至可以在写入数据的时候自定义记录的元数据, 如下 : scala> import org.elasticsearch.spark.rdd.metadata._ import org.elasticsearch.spark.rdd.metadata._ scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni") otp: scala.collection.immutable.map[string,string] = Map(iata -> OTP, name -> Otopeni) scala> val muc = Map("iata" -> "MUC", "name" -> "Munich") muc: scala.collection.immutable.map[string,string] = Map(iata -> MUC, name -> Munich) scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran") sfo: scala.collection.immutable.map[string,string] = Map(iata -> SFO, name -> San Fran) scala> val otpmeta = Map(ID -> 1, TTL -> "3h") scala> val mucmeta = Map(ID -> 2, VERSION -> "23") scala> val sfometa = Map(ID -> 3) 7 / 8

Powered by TCPDF (www.tcpdf.org) 使用 Apache Spark 将数据写入 ElasticSearch scala> val airportsrdd = sc.makerdd(seq((otpmeta, otp), (mucmeta, muc), (sfometa, sfo))) scala> airportsrdd.savetoeswithmeta("iteblog/2015") 上面代码片段分别为 otp muc 和 sfo 设置了不同的元数据, 这在很多场景下是非常有用的 好了不早了, 该洗洗睡, 后面我将介绍如何使用 Apache Spark 读取 ElasticSearch 中的数据 本博客文章除特别声明, 全部都是原创! 禁止个人和公司转载本文 谢谢理解 : 过往记忆 (https://www.iteblog.com/) 本文链接 : () 8 / 8