Running an instance of Spark and Cassandra
Dor Laor edited this page Apr 7, 2015
The following demonstrates running one node each of Spark and Cassandra on the same server. It uses Cassandra 2.1.2, Spark 1.1.1, and spark-cassandra-connector 1.1.2. None of these is the latest version, but this combination seems to work.
- Java: install the following packages:
sudo apt-get install openjdk-7-jre openjdk-7-jdk git
Validate with java -version
- Scala (Spark prerequisite)
wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
sudo mkdir /usr/local/src/scala
sudo cp scala-2.10.4.tgz /usr/local/src/scala
cd /usr/local/src/scala
sudo tar xvf /usr/local/src/scala/scala-2.10.4.tgz
cd ~
Add to ~/.bashrc
export SCALA_HOME=/usr/local/src/scala/scala-2.10.4
export PATH=$SCALA_HOME/bin:$PATH
. .bashrc
- Cassandra
echo "deb http://debian.datastax.com/community stable main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
curl -L http://debian.datastax.com/debian/repo_key | sudo apt-key add -
sudo apt-get update
sudo apt-get install cassandra=2.1.2
Source: https://www.datastax.com/documentation/cassandra/2.0/cassandra/install/installDeb_t.html
- Spark
wget http://www.us.apache.org/dist/spark/spark-1.1.1/spark-1.1.1.tgz
tar xvf spark-1.1.1.tgz
cd spark-1.1.1
sbt/sbt assembly
(Warning: this downloads most of the internet.)
- Spark Cassandra Connector
git clone https://github.com/datastax/spark-cassandra-connector.git
cd spark-cassandra-connector
git checkout b1.1
sbt/sbt assembly
- Provision Cassandra with cqlsh
cqlsh> CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
cqlsh> CREATE TABLE test.kv(key text PRIMARY KEY, value int);
cqlsh> INSERT INTO test.kv(key, value) VALUES ('key1', 1);
cqlsh> INSERT INTO test.kv(key, value) VALUES ('key2', 2);
- Running the Spark shell
cd spark-1.1.1
bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.1.2-SNAPSHOT.jar
Import the relevant libraries:
scala> sc.stop
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf
Setting the context:
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "your server ip")
scala> val sc = new SparkContext(conf)
Cassandra interaction examples:
scala> val rdd = sc.cassandraTable("test", "kv")
scala> println(rdd.first)
scala> println(rdd.count)
scala> println(rdd.map(_.getInt("value")).sum)
scala> val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
scala> collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
scala> val firstRow = rdd.first
scala> firstRow.columnNames
scala> firstRow.size
scala> firstRow.getInt("value")
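The REPL steps above can also be sketched as one standalone program. This is a minimal sketch, not from the original page: the master URL, app name, and connection host are placeholder assumptions, and it presumes the connector assembly jar built earlier is on the classpath and Cassandra is running locally.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object KvExample {
  def main(args: Array[String]): Unit = {
    // "local[*]" and the connection host are placeholders; adjust for your setup
    val conf = new SparkConf(true)
      .setMaster("local[*]")
      .setAppName("KvExample")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    // Read the test.kv table provisioned above with cqlsh
    val rdd = sc.cassandraTable("test", "kv")
    println(rdd.count)
    println(rdd.map(_.getInt("value")).sum)

    // Write two more rows back to the same table
    val extra = sc.parallelize(Seq(("key5", 5), ("key6", 6)))
    extra.saveToCassandra("test", "kv", SomeColumns("key", "value"))

    sc.stop()
  }
}
```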
Working with CassandraSQLContext
scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
scala> val cc = new CassandraSQLContext(sc)
scala> import org.apache.spark.sql.SchemaRDD
scala> val rdd: SchemaRDD = cc.sql("SELECT * FROM test.kv")
scala> println(rdd.first)
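Beyond SELECT *, the same CassandraSQLContext can evaluate simple aggregates. A hedged sketch, assuming the aggregate syntax supported by Spark SQL 1.1 and the test.kv rows inserted earlier:

```scala
scala> val sums: SchemaRDD = cc.sql("SELECT SUM(value) FROM test.kv")
scala> println(sums.first)
```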
Source: https://gist.github.com/erichgess/a02aefddd6231c91babb#file-cassandra-schemas