tags | projects | ||||
---|---|---|---|---|---|
|
|
This guide walks you through the process of using Apache Spark and Akka-http to build an application that stores data in and retrieves it from Couchbase, a document-based database.
You will build a REST api that will run on your development machine. You will access it through your browser at an address like :
http://localhost:8080/
With this REST Api, you’ll be able to perform the basic CRUD (Create,Read,Update,Delete) operations.
-
Create Route: http://localhost:8080/insert/name/shivansh/email/shiv4nsh@gmail.com
-
UpdateViaKeyValuePair Route: http://localhost:8080/updateViaKV/name/shivansh/email/shivansh@knoldus.com/id/user::267316c9-75a2-49d2-ae25-791d82b3d5fa
-
Retrieve Via Document id: http://localhost:8080/getViaKV/id/user::267316c9-75a2-49d2-ae25-791d82b3d5fa
-
Retrieve via View: http://localhost:8080/getViaView/name/shivansh
-
Retrieve via N1QL: http://localhost:8080/getViaN1Ql/name/shivansh
-
Delete via Document id: http://localhost:8080/delete/id/user::267316c9-75a2-49d2-ae25-791d82b3d5fa
In this demonstration, you’ll be interacting with information about people: their name and their email. The Couchbase database will contain one bucket named "userBucket" that contains people. Each person document will be represented in JSON:
You will be using Apache Spark , Akka-http and Couchbase Spark Connectorfor this project.
-
15-30 minutes.
-
Lightbend Activator(follow the instructions to install activator and run your first activator application, if you have this activator thtn you do not need the sbt and the scala code editor as it is already provided with it.)
-
Couchbase Server 4.5+ (follow the instructions to install Couchbase and create a bucket - this guide assumes you will have it running locally)
With your project set up, you can install and launch Couchbase.
For whatever operating system you are using, instructions and downloads can be found at How to install Couchbase?.
After you install Couchbase, launch it. You should see a webpage opening in your default browser allowing you to setup Couchbase
The following code will rely on Apache Spark , spark-couchbase connector and akka-http so make sure you add the correct dependency.Your build.sbt should look like this:
name := "spark-couchbase-akka-http-starter-kit"
version := "1.0"
scalaVersion := "2.11.7"
organization := "com.knoldus"
val akkaV = "2.4.5"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "1.6.1",
"com.typesafe.akka" %% "akka-http-core" % akkaV,
"com.typesafe.akka" %% "akka-http-experimental" % akkaV,
"com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
"org.scalatest" %% "scalatest" % "2.2.6" % "test",
"com.couchbase.client" %% "spark-connector" % "1.1.0"
)
assembleArtifact in packageScala := false // We don't need the Scala library, Spark already includes it
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
fork in run := true
For making the assembly we have to specify the Merge strategy and we do not ned the scala dependency because spark already includes it.
After following the steps above, we’re ready to code CRUD operations in Couchbase
We will list all the important steps with the headings hence you can see it below.
In this we will add all the functions related to the Couchbase interactions and will initialize the spark context with couchbase configurations.
//loading the Configuration file
val config = ConfigFactory.load("application.conf")
val couchbaseUrl = config.getString("couchbase.url")
val bucketName = config.getString("couchbase.bucketName")
val bucketPassword = config.getString("couchbase.bucketPassword")
// setting the spark configuration
val sparkConf: SparkConf = new SparkConf()
.setAppName("spark-akka-http-couchbase-starter-kit").setMaster("local")
.set("com.couchbase.nodes", couchbaseUrl).set(s"com.couchbase.bucket.$bucketName", bucketPassword)
//initializing the spark Context
val sc = new SparkContext(sparkConf)
This function will return true if the document is successfully saved and if not it will return false.
def persistOrUpdate(documentId: String, jsonObject: JsonObject): Boolean = {
val jsonDocument = JsonDocument.create(documentId, jsonObject)
val savedData = sc.parallelize(Seq(jsonDocument))
Try(savedData.saveToCouchbase()).toOption.fold(false)(x => true)
}
Couchbase has an idea of document "revisions", which you can read about in the main documentation. If you examine the document after this update, you can see the revision number has increased by one.
There are three ways of retrieving the document.We will cover all three here.
val NIQLQUERY = s"SELECT * FROM `$bucketName` WHERE name LIKE"
val VIEWNAME = "emailtoName"
val DDOCNAME = "userddoc"
//Reirieving document via N1ql query
def getViaN1Ql(name: String): Option[Array[String]] = {
val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(NIQLQUERY + s"'$name%'")).collect()).toOption
n1qlRDD.map(_.map(a => a.value.toString))
}
//Retrieving data via Couchbase View.
def getViaView(name: String): Option[Array[String]] = {
val viewRDDData = Try(sc.couchbaseView(ViewQuery.from(DDOCNAME, VIEWNAME).startKey(name)).collect()).toOption
viewRDDData.map(_.map(a => a.value.toString))
}
//Retrieving data via Couchbase Document Id (Key Value Pair)
def getViaKV(listOfDocumentIds: String): Option[Array[String]] = {
val idAsRDD = sc.parallelize(listOfDocumentIds.split(","))
Try(idAsRDD.couchbaseGet[JsonDocument]().map(_.content.toString).collect).toOption
}
Deleting a document is a single, straightforward call.
def getNIQLDeleteQuery(documentId: String) =s"""DELETE FROM $bucketName p USE KEYS "$documentId" RETURNING p"""
def deleteViaId(documentID: String): Option[Array[String]] = {
val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(getNIQLDeleteQuery(documentID))).collect()).toOption
n1qlRDD.map(_.map(a => a.value.toString))
}
implicit val system:ActorSystem
implicit val materializer:ActorMaterializer
val logger = Logging(system, getClass)
// Default Exception Handler
implicit def myExceptionHandler =
ExceptionHandler {
case e: ArithmeticException =>
extractUri { uri =>
complete(HttpResponse(StatusCodes.InternalServerError, entity = s"Data is not persisted and something went wrong"))
}
}
Here is a sample code for writing a single route
val sparkRoutes: Route = {
get {
path("insert" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
complete {
val documentId = "user::" + UUID.randomUUID().toString
try {
val jsonObject = JsonObject.create().put("name", name).put("email", email)
val isPersisted = persistOrUpdate(documentId, jsonObject)
isPersisted match {
case true => HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
case false => HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
}
} catch {
case ex: Throwable =>
logger.error(ex, ex.getMessage)
HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
}
}
}
}
You can find all the routes in the file SparkServices.scala located in src/main/scala/com/knoldus/couchbaseServices/routes/SparkServices.scala
Read the application.conf file for the configurations of server port and start the server.
class StartSparkServer(implicit val system: ActorSystem,
implicit val materializer: ActorMaterializer) extends SparkService {
def startServer(address: String, port: Int) = {
Http().bindAndHandle(sparkRoutes, address, port)
}
}
object StartApplication extends App {
StartApp
}
object StartApp {
implicit val system: ActorSystem = ActorSystem("Spark-Couchbase-Service")
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val server = new StartSparkServer()
val config = server.config
val serverUrl = config.getString("http.interface")
val port = config.getInt("http.port")
server.startServer(serverUrl, port)
}
At this point, the code is ready to run. We haven’t added any code to display contents of documents. You may wish to add logging statements and examine output on the console, or even step through the application with the debugger to see the results.
You can run the application using the following command inisde the code directory
sbt run
and if you are using the activator then use
activator run