
Running an instance of Spark and Cassandra


The following demonstrates running one node each of Spark and Cassandra on the same server. It uses Cassandra 2.1.2, Spark 1.1.1, and spark-cassandra-connector 1.1.2. None of these is the latest version, but this combination is known to work.

Prerequisites

  • Java Install the following packages:

sudo apt-get install openjdk-7-jre openjdk-7-jdk git

Validate with java -version

  • Scala (Spark prerequisites)
wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
sudo mkdir /usr/local/src/scala
sudo cp scala-2.10.4.tgz /usr/local/src/scala
cd /usr/local/src/scala
sudo tar xvf /usr/local/src/scala/scala-2.10.4.tgz

cd ~
Add the following lines to ~/.bashrc:
export SCALA_HOME=/usr/local/src/scala/scala-2.10.4
export PATH=$SCALA_HOME/bin:$PATH
. .bashrc
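
To confirm the new PATH picks up Scala, run a quick check (it should report Scala 2.10.4):

scala -version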

  • Cassandra

echo "deb http://debian.datastax.com/community stable main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
curl -L http://debian.datastax.com/debian/repo_key | sudo apt-key add -
sudo apt-get update
sudo apt-get install cassandra=2.1.2

Source: https://www.datastax.com/documentation/cassandra/2.0/cassandra/install/installDeb_t.html
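
The Debian package normally starts the service automatically; the following is a suggested sanity check (the local node should show as UN, i.e. Up/Normal):

sudo service cassandra start   # only if it is not already running
nodetool status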

  • Spark
wget http://www.us.apache.org/dist/spark/spark-1.1.1/spark-1.1.1.tgz 
tar xvf spark-1.1.1.tgz
cd spark-1.1.1
sbt/sbt assembly

(Warning: this downloads most of the internet.)
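
As an optional smoke test of the build, the bundled Pi example can be run in local mode (assuming the examples assembly was built as part of sbt/sbt assembly, which is the default):

bin/run-example SparkPi 10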

  • Spark Cassandra Connector
git clone https://github.com/datastax/spark-cassandra-connector.git
cd spark-cassandra-connector
git checkout b1.1
sbt/sbt assembly
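
The build should produce an assembly jar under the Java module's target directory; this is the jar that the spark-shell invocation below points at:

ls spark-cassandra-connector-java/target/scala-2.10/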
  • Provision Cassandra with cqlsh
cqlsh> CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
cqlsh> CREATE TABLE test.kv(key text PRIMARY KEY, value int);
cqlsh> INSERT INTO test.kv(key, value) VALUES ('key1', 1);
cqlsh> INSERT INTO test.kv(key, value) VALUES ('key2', 2);
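
Optionally confirm the rows are in place before reading them from Spark:

cqlsh> SELECT * FROM test.kv;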
  • Running Spark CLI
cd ~/spark-1.1.1
bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.1.2-SNAPSHOT.jar 

Stop the default Spark context (a new one with the Cassandra connection settings is created below) and import the relevant libraries:

scala> sc.stop
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf

Set up a Spark context pointing at the Cassandra node, then run the interaction examples:

scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "your server ip")
scala> val sc = new SparkContext(conf);
scala> val rdd = sc.cassandraTable("test", "kv")

scala> println(rdd.first)
scala> println(rdd.count)
scala> println(rdd.map(_.getInt("value")).sum)
scala> val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
scala> collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
scala> val firstRow = rdd.first
scala> firstRow.columnNames
scala> firstRow.size
scala> firstRow.getInt("value")
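
The connector can also push a filter down to Cassandra rather than scanning the whole table. A minimal sketch against the same test.kv table, filtering on the partition key:

scala> val filtered = sc.cassandraTable("test", "kv").where("key = ?", "key1")
scala> filtered.collect.foreach(println)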

Working with CassandraSQLContext

scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
scala> val cc = new CassandraSQLContext(sc)
scala> import org.apache.spark.sql.SchemaRDD;
scala> val rdd: SchemaRDD = cc.sql("SELECT * from test.kv");
scala> println(rdd.first)
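
CassandraSQLContext also accepts projections and predicates in plain SQL; a small sketch, assuming the same test.kv data:

scala> cc.sql("SELECT key, value FROM test.kv WHERE value > 1").collect.foreach(println)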

A more complete, close-to-real-life example is available at:

https://gist.github.com/erichgess/a02aefddd6231c91babb#file-cassandra-schemas
