使用Cassandra和Spark 2.0实现Rest API服务

Similar documents
Spark读取Hbase中的数据

使用MapReduce读取XML文件

通过Hive将数据写入到ElasticSearch

Flume-ng与Mysql整合开发

Apache CarbonData集群模式使用指南

Hive:用Java代码通过JDBC连接Hiveserver

Guava学习之Resources

Hadoop&Spark解决二次排序问题(Hadoop篇)

使用Spark SQL读取Hive上的数据

新・解きながら学ぶJava

在Spring中使用Kafka:Producer篇

如何在 Apache Hive 中解析 Json 数组

使用Hive读取ElasticSearch中的数据

Spark 2.0介绍:在Spark SQL中定义查询优化规则

Kafka客户端是如何找到 leader 分区的


Hadoop元数据合并异常及解决方法

使用Apache Spark将数据写入ElasticSearch

韶关:神奇丹霞

(TestFailure) JUnit Framework AssertionFailedError JUnit Composite TestSuite Test TestSuite run() run() JUnit

Partition Key: 字 符 串 类 型, 表 示 当 前 Entity 的 分 区 信 息 这 个 Property 对 于 Table Service 自 动 纵 向 和 横 向 扩 展 至 关 重 要 Row Key: 字 符 串 类 型, 在 给 定 Partition Key 的

哼, 你 們 不 回 答 又 怎 麼 樣? 不 管 是 多 大 來 頭, 現 在 都 被 血 魔 吞 噬 無 蹤 了 你 們 幾 個 真 是 太 過 分, 我 不 犯 你 們, 你 們 卻 一 天 到 晚 來 挑 釁 我 教 尊 冷 笑 著 說 道 嗚, 大 人 土 地 大 姐 跪 下 來, 流 下

Microsoft Word - 01.DOC

Guava学习之CharSequenceReader

SparkR(R on Spark)编程指南

三种方法实现Hadoop(MapReduce)全局排序(1)

伊春:醉人林都

1 1 大概思路 创建 WebAPI 创建 CrossMainController 并编写 Nuget 安装 microsoft.aspnet.webapi.cors 跨域设置路由 编写 Jquery EasyUI 界面 运行效果 2 创建 WebAPI 创建 WebAPI, 新建 -> 项目 ->

WebSphere Studio Application Developer IBM Portal Toolkit... 2/21 1. WebSphere Portal Portal WebSphere Application Server stopserver.bat -configfile..

关林:武圣陵寝

泰山:五岳独尊

untitled

国内26省市新能源汽车推广规划已出台

EJB-Programming-4-cn.doc

Flink快速上手之Scala API使用

前言 C# C# C# C C# C# C# C# C# microservices C# More Effective C# More Effective C# C# C# C# Effective C# 50 C# C# 7 Effective vii

untitled

北戴河:海阔天空

chp6.ppt

EJB-Programming-3.PDF

六种使用Linux命令发送带附件的邮件

Java java.lang.math Java Java.util.Random : ArithmeticException int zero = 0; try { int i= 72 / zero ; }catch (ArithmeticException e ) { // } 0,

1

Java

西岭雪山滑雪场

untitled

使用Apache Beam读写HDFS上的文件

59 1 CSpace 2 CSpace CSpace URL CSpace 1 CSpace URL 2 Lucene 3 ID 4 ID Web 1. 2 CSpace LireSolr 3 LireSolr 3 Web LireSolr ID

Flink on YARN部署快速入门指南

Flink快速上手(QuickStart)

Chapter 9: Objects and Classes

附件三

2 WF 1 T I P WF WF WF WF WF WF WF WF 2.1 WF WF WF WF WF WF

没 有 多 余 的 Contruol 或 Action 了 原 来 Domain 层 被 服 务 层 Service layer 遮 挡, 在 右 边 图 中, 则 Domain 层 直 接 暴 露 给 前 台 了, 没 有 被 遮 挡, 裸 露 了 这 样 一 步 到 位 实 现 领 域 模 型

PowerPoint 演示文稿

untitled

概述

南華大學數位論文

自定义Spark Streaming接收器(Receivers)

1.JasperReport ireport JasperReport ireport JDK JDK JDK JDK ant ant...6

Chapter 9: Objects and Classes

ext-web-auth-wlc.pdf

epub 61-2

江门:中国第一侨乡

Microsoft Word - 第3章.doc

是 喔, 就 是 那 個 在 BBS 醫 療 版 跟 你 嗆 聲, 自 稱 有 三 十 多 年 推 拿 經 驗 的 大 叔 嗎? 一 個 看 來 頗 為 清 秀 的 女 生 問 道, 她 語 氣 中 略 感 訝 異 是 啊, 什 麼 推 拿 按 摩 有 多 好, 還 要 人 生 病 盡 量 不 要

ex

<4D F736F F D20AC4FBDBDA4FBB67DA96CAABA2DA743A67EAFC5AAA95FA7B9BD5A5F2E646F63>

PowerPoint 簡報

在 ongodb 中实现强事务

教育扩张能改善收入分配差距吗?——来自CHNS2006年数据的证据

山水文化,市井人家——以湖州邱城小镇的概念性规划为例

团 学 要 闻 我 校 召 开 共 青 团 五 届 九 次 全 委 ( 扩 大 ) 会 议 3 月 17 日, 我 校 共 青 团 五 届 九 次 全 委 ( 扩 大 ) 会 议 在 行 政 办 公 楼 五 楼 会 议 室 举 行, 校 团 委 委 员 各 院 ( 系 ) 团 委 书 记 校 学 生

詞 彙 表 編 號 詞 彙 描 述 1 預 約 人 資 料 中 文 姓 名 英 文 姓 名 身 份 證 字 號 預 約 人 電 話 性 別 2 付 款 資 料 信 用 卡 別 信 用 卡 號 信 用 卡 有 效 日 期 3 住 房 條 件 入 住 日 期 退 房 日 期 人 數 房 間 數 量 入

Kubenetes 系列列公开课 2 每周四晚 8 点档 1. Kubernetes 初探 2. 上 手 Kubernetes 3. Kubernetes 的资源调度 4. Kubernetes 的运 行行时 5. Kubernetes 的 网络管理理 6. Kubernetes 的存储管理理 7.

09 (File Processes) (mkdir) 9-3 (createnewfile) 9-4 (write) 9-5 (read) 9-6 (deletefile) 9-7 (deletedir) (Exercises)

"+handlermethod.getbean().getclass().getname()); public void aftercompletion(httpservletrequest req, HttpServletResponse resp, Object handler, Excepti

获取 Access Token access_token 是接口的全局唯一票据, 接入方调用各接口时都需使用 access_token 开发者需要进行妥善保存 access_token 的存储至少要保留 512 个字符空间 access_token 的有效期目前为 2 个小时, 需定时刷新, 重复

2 Requirements Documentation Doc Name Doc No. 1.0 Version No. Total Page Generated By V6.0.0 Generated Date 2011/10/21 Checked By Checked Date 2011/10

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

目 录 第 五 部 分 第 六 部 分 第 七 部 分 第 八 部 分 投 标 邀 请 投 标 人 须 知 附 表 评 标 方 法 和 评 分 细 则 项 目 需 求 和 技 术 方 案 要 求 1

RunPC2_.doc

Ubuntu和CentOS如何配置SSH使得无密码登陆

1-1 database columnrow record field 不 DBMS Access Paradox SQL Server Linux MySQL Oracle IBM Informix IBM DB2 Sybase 1-2

エスポラージュ株式会社 住所 : 東京都江東区大島 東急ドエルアルス大島 HP: ******************* * 关于 Java 测试试题 ******

案例分享产品文档

目 录 1. 前 言 为 什 么 要 用 ESB 技 术 发 展 业 务 需 求 IT 需 求 IT 与 业 务 一 致 性 要 求 TongIntegrator ESB v5 简 介..

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

主程式 : public class Main3Activity extends AppCompatActivity { ListView listview; // 先整理資料來源,listitem.xml 需要傳入三種資料 : 圖片 狗狗名字 狗狗生日 // 狗狗圖片 int[] pic =new

杭师大党字〔2011〕15号中共杭州师范大学委员会关于进一步加强和改进发展党员工作的意见

<4D F736F F D A67EAF64BEC7BCFABEC7AAF7C2B2B3B95FA5FEB3A1AAA95F2D31312E31362E646F63>

得 依 法 召 集 股 東 臨 時 會 第 十 一 條 : 股 東 常 會 之 召 集 應 於 開 會 三 十 日 前, 股 東 臨 時 會 之 召 集 應 於 開 會 十 五 日 前, 將 開 會 日 期 地 點 及 召 集 事 由 通 知 各 股 東 並 公 告 之 第 十 二 條 : 本 公

同 時, 那 些 百 萬 富 翁 們 正 乘 坐 着 私 家 噴 射 機 駛 往 歐 洲, 甘 願 花 大 把 的 鈔 票 接 受 替 代 療 法 並 且 重 獲 了 健 康 替 代 療 法 總 是 很 靈 嗎? 不, 當 然 不 是 在 這 世 界 上 没 有 盡 善 盡 美 的 事 物 但 是

<4D F736F F D B2C431A6B8A4A4A4DFA8C6B0C8B77CC4B3ACF6BFFD E646F63>

untitled

高校发展动态

行 业 市 场 研 究 属 于 企 业 战 略 研 究 范 畴, 作 为 当 前 应 用 最 为 广 泛 的 咨 询 服 务, 其 研 究 报 告 形 式 呈 现, 通 常 包 含 以 下 内 容 : 一 份 专 业 的 行 业 研 究 报 告, 注 重 指 导 企 业 或 投 资 者 了 解 该

untitled

1: public class MyOutputStream implements AutoCloseable { 3: public void close() throws IOException { 4: throw new IOException(); 5: } 6:

untitled

Transcription:

使用 Cassandra 和 Spark 2.0 实现 Rest API 服务 在这篇文章中, 我将介绍如何在 Spark 中使用 Akkahttp 并结合 Cassandra 实现 REST 服务, 在这个系统中 Cassandra 用于数据的存储 我们已经见识到 Spark 的威力, 如果和 Cassandra 正确地结合可以实现更强大的系统 我们先创建一个 build.sbt 文件, 内容如下 : name := "cassandra-spark-akka-http-starter-kit" version := "1.0" scalaversion := "2.11.8" organization := "com.iteblog" val akkav = "2.4.5" librarydependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.0", "org.apache.spark" % "spark-sql_2.11" % "2.0.0", "com.typesafe.akka" %% "akka-http-core" % akkav, "com.typesafe.akka" %% "akka-http-experimental" % akkav, "com.typesafe.akka" %% "akka-http-testkit" % akkav % "test", "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkav, "org.scalatest" %% "scalatest" % "2.2.6" % "test", "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3", "net.liftweb" % "lift-json_2.11" % "2.6.2" ) assembleartifact in assemblypackagescala := false assemblymergestrategy in assembly := { case m if m.tolowercase.endswith("manifest.mf") => MergeStrategy.discard case m if m.tolowercase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first ivyscala := ivyscala.value map { _.copy(overridescalaversion = true) fork in run := true 1 / 6

上面我们把 assembleartifact in assemblypackagescala 设置为 false, 因为 Spark 已经包含了 Scala library, 所以我们不需要再包含了 样本类 User 定义 User 累仅仅包含 id 名字以及 Email 等信息, 定义如下 : package com.iteblog.domain case class User(id: String, name: String, email: String) 数据访问层 下面代码片段是数据访问层的实现 : package com.iteblog.factories import com.iteblog.domain.user import com.typesafe.config.configfactory import org.apache.spark.sparkconf import org.apache.spark.sql.sparksession import com.datastax.spark.connector._ import scala.util.try trait DatabaseAccess { import Context._ def create(user: User): Boolean = Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tablename)).tooption.isdefined def retrieve(id: String): Option[Array[User]] = Try(sc.cassandraTable[User](keyspace, tablena me).where(s"id='$id'").collect()).tooption object DatabaseAccess extends DatabaseAccess 2 / 6

object Context { val config = ConfigFactory.load() val url = config.getstring("cassandra.url") val sparkconf: SparkConf = new SparkConf().setAppName("Saprk-cassandra-akka-restexample").setMaster("local[4]").set("spark.cassandra.connection.host", url) val spark = SparkSession.builder().config(sparkConf).getOrCreate() val sc = spark.sparkcontext val keyspace = config.getstring("cassandra.keyspace") val tablename = config.getstring("cassandra.tablename") 服务层 下面是路由文件的实现代码 : package com.iteblog.routes import java.util.uuid import akka.actor.actorsystem import akka.event.logging import akka.http.scaladsl.model._ import akka.http.scaladsl.server.directives._ import akka.http.scaladsl.server.{exceptionhandler, Route import akka.stream.actormaterializer import com.iteblog.domain.user import com.iteblog.factories.databaseaccess import net.liftweb.json._ import java.util.date import net.liftweb.json.extraction._ trait SparkService extends DatabaseAccess { implicit val system:actorsystem implicit val materializer:actormaterializer val logger = Logging(system, getclass) implicit def myexceptionhandler = 3 / 6

ExceptionHandler { case e: ArithmeticException => extracturi { uri => complete(httpresponse(statuscodes.internalservererror, entity = s"data is not persisted and something went wrong")) implicit val formats: Formats = new DefaultFormats { outer => override val typehintfieldname = "type" override val typehints = ShortTypeHints(List(classOf[String], classof[date])) val sparkroutes: Route = { get { path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) => complete { val documentid = "user::" + UUID.randomUUID().toString try { val user = User(documentId,name,email) val ispersisted = create(user) if (ispersisted) { HttpResponse(StatusCodes.Created, entity = s"data is successfully persisted with id $d ocumentid") else { HttpResponse(StatusCodes.InternalServerError, entity = s"error found for id : $docum entid") catch { case ex: Throwable => logger.error(ex, ex.getmessage) HttpResponse(StatusCodes.InternalServerError, entity = s"error found for id : $docum entid") ~ path("retrieve" / "id" / Segment) { (listofids: String) => get { complete { try { val idasrdd: Option[Array[User]] = retrieve(listofids) idasrdd match { case Some(data) => HttpResponse(StatusCodes.OK, entity = data.headoption.fold("")(x => compact(render(decompose(x))))) 4 / 6

case None => HttpResponse(StatusCodes.InternalServerError, entity = s"data is not fet ched and something went wrong") catch { case ex: Throwable => logger.error(ex, ex.getmessage) HttpResponse(StatusCodes.InternalServerError, entity = s"error found for ids : $listofi ds") 服务启动 现在我们需要编写一个用于启动服务的类, 其主要目的是启动一个 HTTP 服务, 这样可以供用户调用, 如下 : package com.iteblog import akka.actor.actorsystem import akka.http.scaladsl.http import akka.stream.actormaterializer import com.iteblog.routes.sparkservice import com.iteblog.factories.context class StartSparkServer(implicit val system: ActorSystem, implicit val materializer: ActorMaterializer) extends SparkService { def startserver(address: String, port: Int) = { Http().bindAndHandle(sparkRoutes, address, port) object StartApplication extends App { StartApp object StartApp { implicit val system: ActorSystem = ActorSystem("Spark-Couchbase-Service") implicit val executor = system.dispatcher 5 / 6

Powered by TCPDF (www.tcpdf.org) 使用 Cassandra 和 Spark 2.0 实现 Rest API 服务 implicit val materializer = ActorMaterializer() val server = new StartSparkServer() val config = Context.config val serverurl = config.getstring("http.interface") val port = config.getint("http.port") server.startserver(serverurl, port) 本博客文章除特别声明, 全部都是原创! 转载本文请加上 : 转载自过往记忆 (https://www.iteblog.com/) 本文链接 : () 6 / 6