spark-couchbase-akka-http-starter-kit

Tags: couchbase, spark, akka-http

This guide walks you through the process of using Apache Spark and Akka-http to build an application that stores data in and retrieves it from Couchbase, a document-based database.

What you’ll build

You will build a REST API that will run on your development machine. You will access it through your browser at an address like:

http://localhost:8080/

With this REST API, you'll be able to perform the basic CRUD (Create, Read, Update, Delete) operations.

In this demonstration, you’ll be interacting with information about people: their name and their email. The Couchbase database will contain one bucket named "userBucket" that contains people. Each person document will be represented in JSON:
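A person document might look like this (the field values are illustrative):

    {
      "name": "John Doe",
      "email": "john.doe@example.com"
    }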

You will be using Apache Spark, Akka-http, and the Couchbase Spark Connector for this project.

What you’ll need

  • 15-30 minutes.

  • A Scala code editor

  • sbt

  • Lightbend Activator (follow the instructions to install Activator and run your first Activator application; if you have Activator, you do not need sbt or a Scala code editor, since both come bundled with it)

  • Couchbase Server 4.5+ (follow the instructions to install Couchbase and create a bucket - this guide assumes you will have it running locally)

Setup

Install and launch Couchbase

With your project set up, you can install and launch Couchbase.

For whatever operating system you are using, instructions and downloads can be found at "How to install Couchbase?".

After you install Couchbase, launch it. You should see a page open in your default browser that lets you set up Couchbase.

Setting up build.sbt

The following code relies on Apache Spark, the Couchbase Spark Connector, and Akka-http, so make sure you add the correct dependencies. Your build.sbt should look like this:

    name := "spark-couchbase-akka-http-starter-kit"
    version := "1.0"
    scalaVersion := "2.11.7"
    organization := "com.knoldus"
    val akkaV = "2.4.5"
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.11" % "1.6.1",
      "com.typesafe.akka" %% "akka-http-core" % akkaV,
      "com.typesafe.akka" %% "akka-http-experimental" % akkaV,
      "com.typesafe.akka" %% "akka-http-testkit" % akkaV % "test",
      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaV,
      "org.scalatest"     %% "scalatest" % "2.2.6" % "test",
      "com.couchbase.client" %% "spark-connector" % "1.1.0"
    )
    assembleArtifact in packageScala := false // We don't need the Scala library, Spark already includes it
    mergeStrategy in assembly := {
      case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
      case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
      case "reference.conf" => MergeStrategy.concat
      case _ => MergeStrategy.first
    }
    ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
    fork in run := true

To build the assembly we have to specify a merge strategy, and we do not need to bundle the Scala library because Spark already includes it.
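The assembly settings above come from the sbt-assembly plugin, so it must be declared in project/plugins.sbt. A minimal sketch, assuming the 0.11.x plugin series (whose old-style keys such as mergeStrategy in assembly match the build file above):

    // project/plugins.sbt -- the plugin version is an assumption; pick one compatible with your sbt release
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")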

Code

After following the steps above, we're ready to code the CRUD operations for Couchbase.

All the important steps are listed under the headings below.

Creating a separate DatabaseFactory file

In this file we will add all the functions related to Couchbase interactions and initialize the Spark context with the Couchbase configuration.

Creating the Context

import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

// loading the configuration file
val config = ConfigFactory.load("application.conf")
val couchbaseUrl = config.getString("couchbase.url")
val bucketName = config.getString("couchbase.bucketName")
val bucketPassword = config.getString("couchbase.bucketPassword")
// setting the Spark configuration with the Couchbase nodes and bucket credentials
val sparkConf: SparkConf = new SparkConf()
  .setAppName("spark-akka-http-couchbase-starter-kit").setMaster("local")
  .set("com.couchbase.nodes", couchbaseUrl).set(s"com.couchbase.bucket.$bucketName", bucketPassword)
// initializing the Spark context
val sc = new SparkContext(sparkConf)
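For completeness, a minimal sketch of the matching application.conf entries; the values below are assumptions for a local setup (the bucket name follows the "userBucket" example above):

    couchbase {
      url = "127.0.0.1"
      bucketName = "userBucket"
      bucketPassword = ""
    }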

Creating or Updating a document

This function returns true if the document is saved successfully and false otherwise.

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._ // brings saveToCouchbase into scope
import scala.util.Try

def persistOrUpdate(documentId: String, jsonObject: JsonObject): Boolean = {
  val jsonDocument = JsonDocument.create(documentId, jsonObject)
  val savedData = sc.parallelize(Seq(jsonDocument))
  Try(savedData.saveToCouchbase()).isSuccess // true when the write succeeds
}

Couchbase has a notion of document "revisions", which you can read about in the main documentation. If you examine the document after this update, you will see that the revision number has increased by one.
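For example, a call might look like this (the document id and field values are illustrative):

    val jsonObject = JsonObject.create().put("name", "John Doe").put("email", "john.doe@example.com")
    val saved = persistOrUpdate("user::1234", jsonObject) // true on success, false otherwise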

Reading a document

There are three ways of retrieving documents. We will cover all three here.

import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.client.java.view.ViewQuery

val NIQLQUERY = s"SELECT * FROM `$bucketName` WHERE name LIKE "
val VIEWNAME = "emailtoName"
val DDOCNAME = "userddoc"

// Retrieving documents via a N1QL query
def getViaN1Ql(name: String): Option[Array[String]] = {
  val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(NIQLQUERY + s"'$name%'")).collect()).toOption
  n1qlRDD.map(_.map(a => a.value.toString))
}

// Retrieving data via a Couchbase view
def getViaView(name: String): Option[Array[String]] = {
  val viewRDDData = Try(sc.couchbaseView(ViewQuery.from(DDOCNAME, VIEWNAME).startKey(name)).collect()).toOption
  viewRDDData.map(_.map(a => a.value.toString))
}

// Retrieving data via Couchbase document ids (key-value lookups)
def getViaKV(listOfDocumentIds: String): Option[Array[String]] = {
  val idAsRDD = sc.parallelize(listOfDocumentIds.split(","))
  Try(idAsRDD.couchbaseGet[JsonDocument]().map(_.content.toString).collect()).toOption
}
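For example, the three lookups might be invoked like this (the arguments are illustrative):

    val byName = getViaN1Ql("John")          // users whose name starts with "John"
    val byView = getViaView("John")          // via the emailtoName view
    val byIds  = getViaKV("user::1,user::2") // comma-separated document ids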

Deleting a document

Deleting a document is a single, straightforward call.

def getNIQLDeleteQuery(documentId: String) =
  s"""DELETE FROM `$bucketName` p USE KEYS "$documentId" RETURNING p"""

def deleteViaId(documentID: String): Option[Array[String]] = {
  val n1qlRDD = Try(sc.couchbaseQuery(N1qlQuery.simple(getNIQLDeleteQuery(documentID))).collect()).toOption
  n1qlRDD.map(_.map(a => a.value.toString))
}

Writing the Akka-http Routes

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.ExceptionHandler
import akka.stream.ActorMaterializer

// abstract members, to be provided by the class that mixes in this trait
implicit val system: ActorSystem
implicit val materializer: ActorMaterializer
val logger = Logging(system, getClass)

// Default exception handler: converts uncaught exceptions into a 500 response
implicit def myExceptionHandler =
  ExceptionHandler {
    case e: ArithmeticException =>
      extractUri { uri =>
        complete(HttpResponse(StatusCodes.InternalServerError, entity = "Data is not persisted and something went wrong"))
      }
  }

Here is sample code for a single route:

import java.util.UUID
import akka.http.scaladsl.server.Route

val sparkRoutes: Route = {
  get {
    path("insert" / "name" / Segment / "email" / Segment) { (name: String, email: String) =>
      complete {
        val documentId = "user::" + UUID.randomUUID().toString
        try {
          val jsonObject = JsonObject.create().put("name", name).put("email", email)
          val isPersisted = persistOrUpdate(documentId, jsonObject)
          if (isPersisted) HttpResponse(StatusCodes.Created, entity = s"Data is successfully persisted with id $documentId")
          else HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
        } catch {
          case ex: Throwable =>
            logger.error(ex, ex.getMessage)
            HttpResponse(StatusCodes.InternalServerError, entity = s"Error found for id : $documentId")
        }
      }
    }
  }
}

You can find all the routes in the file SparkServices.scala located in src/main/scala/com/knoldus/couchbaseServices/routes/SparkServices.scala
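Assuming the server is running with the default configuration on localhost:8080, you can exercise this route from your browser at an address like the following (the name and email values are illustrative):

http://localhost:8080/insert/name/John/email/john@example.com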

Start the Akka-http server

Read the server interface and port configuration from the application.conf file and start the server.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

class StartSparkServer(implicit val system: ActorSystem,
                       implicit val materializer: ActorMaterializer) extends SparkService {
  def startServer(address: String, port: Int) = {
    Http().bindAndHandle(sparkRoutes, address, port)
  }
}

object StartApplication extends App {
  StartApp
}

object StartApp {
  implicit val system: ActorSystem = ActorSystem("Spark-Couchbase-Service")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()
  val server = new StartSparkServer()
  val config = server.config
  val serverUrl = config.getString("http.interface")
  val port = config.getInt("http.port")
  server.startServer(serverUrl, port)
}
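A minimal sketch of the http section of application.conf that StartApp reads; the values are assumptions (port 8080 matches the address used earlier):

    http {
      interface = "0.0.0.0"
      port = 8080
    }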

Run

At this point, the code is ready to run. We haven’t added any code to display contents of documents. You may wish to add logging statements and examine output on the console, or even step through the application with the debugger to see the results.

You can run the application with the following command inside the project directory:

sbt run

and if you are using Activator, then use

activator run

Summary

Congratulations! You set up a Couchbase server and wrote a simple Spark-Couchbase-Akka-http application that stores documents in Couchbase and provides a basic REST API.
