使用 Cassandra 和 Spark 2.0 实现 Rest API 服务

在这篇文章中，我将介绍如何在 Spark 中使用 Akka-http 并结合 Cassandra 实现 REST 服务。在这个系统中，Cassandra 用于数据的存储。我们已经见识到 Spark 的威力，如果和 Cassandra 正确地结合，可以实现更强大的系统。我们先创建一个 build.sbt 文件，内容如下：

name := "cassandra-spark-akka-http-starter-kit"

version := "1.0"

scalaVersion := "2.11.8"

organization := "com.iteblog"

val akkaV = "2.4.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  "com.typesafe.akka" %% "akka-http-core" % akkaV,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
  "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
  "org.scalatest" %% "scalatest" % "2.2.6" % "test",
  "com.datastax.spark" % "spark-cassandra-connector_2.11" % "2.0.0-M3",
  "net.liftweb" % "lift-json_2.11" % "2.6.2"
)

assembleArtifact in assemblyPackageScala := false

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }

fork in run := true
上面我们把 assembleArtifact in assemblyPackageScala 设置为 false，因为 Spark 已经包含了 Scala library，所以我们不需要再包含了。

样本类 User 定义

User 类仅仅包含 id、名字以及 Email 等信息，定义如下：

package com.iteblog.domain

case class User(id: String, name: String, email: String)

数据访问层

下面代码片段是数据访问层的实现：

package com.iteblog.factories

import com.iteblog.domain.User
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

import scala.util.Try

trait DatabaseAccess {

  import Context._

  def create(user: User): Boolean =
    Try(sc.parallelize(Seq(user)).saveToCassandra(keyspace, tablename)).toOption.isDefined

  def retrieve(id: String): Option[Array[User]] =
    Try(sc.cassandraTable[User](keyspace, tablename).where(s"id='$id'").collect()).toOption
}

object DatabaseAccess extends DatabaseAccess
object Context {
  val config = ConfigFactory.load()
  val url = config.getString("cassandra.url")
  val sparkConf: SparkConf = new SparkConf()
    .setAppName("Spark-cassandra-akka-rest-example")
    .setMaster("local[4]")
    .set("spark.cassandra.connection.host", url)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val sc = spark.sparkContext
  val keyspace = config.getString("cassandra.keyspace")
  val tablename = config.getString("cassandra.tablename")
}

服务层

下面是路由文件的实现代码：

package com.iteblog.routes

import java.util.UUID

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.ActorMaterializer
import com.iteblog.domain.User
import com.iteblog.factories.DatabaseAccess
import net.liftweb.json._
import java.util.Date
import net.liftweb.json.Extraction._

trait SparkService extends DatabaseAccess {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer
  val logger = Logging(system, getClass)

  implicit def myExceptionHandler =
    ExceptionHandler {
      case e: ArithmeticException =>
        extractUri { uri =>
          complete(HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not persisted and something went wrong"))
        }
    }

  implicit val formats: Formats = new DefaultFormats {
    outer =>
    override val typeHintFieldName = "type"
    override val typeHints = ShortTypeHints(List(classOf[String], classOf[Date]))
  }

  val sparkRoutes: Route = {
    get {
      path("create" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
        complete {
          val documentId = "user::" + UUID.randomUUID().toString
          try {
            val user = User(documentId, name, email)
            val isPersisted = create(user)
            if (isPersisted) {
              HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
            } else {
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
            }
          } catch {
            case ex: Throwable =>
              logger.error(ex, ex.getMessage)
              HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
          }
        }
      }
    } ~
      path("retrieve" / "id" / Segment) { (listOfIds: String) =>
        get {
          complete {
            try {
              val idAsRDD: Option[Array[User]] = retrieve(listOfIds)
              idAsRDD match {
                case Some(data) =>
                  HttpResponse(StatusCodes.OK, entity = data.headOption.fold("")(x => compact(render(decompose(x)))))
                case None =>
                  HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not fetched and something went wrong")
              }
            } catch {
              case ex: Throwable =>
                logger.error(ex, ex.getMessage)
                HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for ids : $listOfIds")
            }
          }
        }
      }
  }
}

服务启动

现在我们需要编写一个用于启动服务的类，其主要目的是启动一个 HTTP 服务，这样可以供用户调用，如下：

package com.iteblog

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.iteblog.routes.SparkService
import com.iteblog.factories.Context

class StartSparkServer(implicit val system: ActorSystem,
                       implicit val materializer: ActorMaterializer) extends SparkService {
  def startServer(address: String, port: Int) = {
    Http().bindAndHandle(sparkRoutes, address, port)
  }
}

object StartApplication extends App {
  StartApp
}

object StartApp {
  implicit val system: ActorSystem = ActorSystem("Spark-Couchbase-Service")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()
  val server = new StartSparkServer()
  val config = Context.config
  val serverUrl = config.getString("http.interface")
  val port = config.getInt("http.port")
  server.startServer(serverUrl, port)
}

本博客文章除特别声明，全部都是原创！转载本文请加上：转载自过往记忆（https://www.iteblog.com/）

本文链接：()