
Using cassandra #1

Open · renderit opened this issue Jun 15, 2016 · 6 comments

Comments

renderit commented Jun 15, 2016

@soniclavier Hey, great work on the code and the blog! I was trying the storm-kafka code. I'm interested in trying kafka -> storm -> cassandra (or hadoop). Could you tell me which code blocks need to be commented out if I don't want to use MongoDB or Solr? All the Java files in the bolt folder and the Topology file?
I have Storm installed and on my PATH, but when I run:

ubuntu@ip-172-31-1-172:~/Downloads/hadoop_datascience/stormkafka$ storm jar target/stormkafka-0.0.1-SNAPSHOT.jar com.vishnu.storm.Topology vis remote
Running: /usr/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/usr/local/storm -Dstorm.log.dir=/usr/local/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /usr/local/storm/lib/clojure-1.7.0.jar:/usr/local/storm/lib/log4j-core-2.1.jar:/usr/local/storm/lib/servlet-api-2.5.jar:/usr/local/storm/lib/log4j-api-2.1.jar:/usr/local/storm/lib/log4j-slf4j-impl-2.1.jar:/usr/local/storm/lib/slf4j-api-1.7.7.jar:/usr/local/storm/lib/minlog-1.3.0.jar:/usr/local/storm/lib/reflectasm-1.10.1.jar:/usr/local/storm/lib/asm-5.0.3.jar:/usr/local/storm/lib/objenesis-2.1.jar:/usr/local/storm/lib/storm-core-1.0.1.jar:/usr/local/storm/lib/log4j-over-slf4j-1.6.6.jar:/usr/local/storm/lib/storm-rename-hack-1.0.1.jar:/usr/local/storm/lib/kryo-3.0.3.jar:/usr/local/storm/lib/disruptor-3.3.2.jar:target/stormkafka-0.0.1-SNAPSHOT.jar:/usr/local/storm/conf:/usr/local/storm/bin -Dstorm.jar=target/stormkafka-0.0.1-SNAPSHOT.jar com.vishnu.storm.Topology vis remote
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: backtype/storm/topology/IRichSpout
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: backtype.storm.topology.IRichSpout
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

(screenshot of the error attached, 2016-06-15 3:55 PM)

Also where do I define the 'type', like if I want to use only mongo?
renderit changed the title from "Error running mvn" to "Using cassandra" on Jun 15, 2016
soniclavier (Owner) commented

If you don't need MongoDB and Solr, you can remove the SolrBolt.java and MongodbBolt.java files; you might also have to change a few lines in BoltBuilder, since BoltBuilder depends on these classes.

Or you can keep those files as is and remove the binding in the Topology.java file where these bolts are added to the topology.

I haven't tried Cassandra integration but you can check this:
http://storm.apache.org/releases/1.0.0/storm-cassandra.html
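
For reference, here is a rough sketch of how a Cassandra writer bolt might be wired into this topology, based on that storm-cassandra page. I haven't run this; the keyspace, table, field names, and stream id are placeholders, and the exact API should be checked against the linked docs:

    // Sketch only -- verify against the storm-cassandra docs linked above.
    // Assumes a storm-cassandra dependency in pom.xml, a Cassandra table
    // my_keyspace.events(id, payload), and a hypothetical "cassandra-stream"
    // emitted by SinkTypeBolt with fields ("id", "payload").
    import org.apache.storm.cassandra.bolt.CassandraWriterBolt;
    import static org.apache.storm.cassandra.DynamicStatementBuilder.*;

    // inside submitTopology(), next to the other bolts:
    CassandraWriterBolt cassandraBolt = new CassandraWriterBolt(
            async(
                simpleQuery("INSERT INTO my_keyspace.events (id, payload) VALUES (?, ?);")
                    .with(fields("id", "payload"))
            ));

    // Cassandra cluster coordinates go into the topology Config
    conf.put("cassandra.keyspace", "my_keyspace");
    conf.put("cassandra.nodes", "localhost");
    conf.put("cassandra.port", 9042);

    builder.setBolt("cassandra-bolt", cassandraBolt, 1)
           .shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), "cassandra-stream");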


soniclavier commented Jun 18, 2016

Also, the code I have was written for Storm 0.10.0, and the current version is 1.0.1.

To make it work for 1.0.1 you have to do the following:

  1. change the versions of all storm dependencies in pom.xml to 1.0.1
  2. change the package imports in most of the Java files from backtype. to org.apache.storm. (see the example below)
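
For example, an import that compiles against 0.10.x is relocated like this on 1.0.x (same class, new package):

    // Storm 0.10.x:
    // import backtype.storm.topology.base.BaseRichSpout;

    // Storm 1.0.x:
    import org.apache.storm.topology.base.BaseRichSpout;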


renderit commented Jun 19, 2016

@soniclavier Excellent! Thanks. If I want to use HDFS but not Mongo, do I still need to remove the Mongo and Solr files? Parts where I updated pom.xml:

    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.1</version>
    <scope>provided</scope>
    <exclusions>

    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.1</version>
    <exclusions>

    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.10.0-beta1</version>

    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>0.10.0</version>

I changed all backtype imports to org.apache.storm.
My Topology looks like this (trying only the sink bolt for now):

    package com.vishnu.storm;

    import java.util.Properties;

    //import org.apache.storm.hdfs.bolt.HdfsBolt;

    import com.vishnu.storm.bolt.BoltBuilder;
    //import com.vishnu.storm.bolt.MongodbBolt;
    import com.vishnu.storm.bolt.SinkTypeBolt;
    //import com.vishnu.storm.bolt.SolrBolt;
    import com.vishnu.storm.spout.SpoutBuilder;

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;

    public class Topology {

        public Properties configs;
        public BoltBuilder boltBuilder;
        public SpoutBuilder spoutBuilder;
        public static final String SOLR_STREAM = "solr-stream";
        public static final String HDFS_STREAM = "hdfs-stream";
        public static final String MONGODB_STREAM = "mongodb-stream";

        public Topology(String configFile) throws Exception {
            configs = new Properties();
            try {
                configs.load(Topology.class.getResourceAsStream("/default_config.properties"));
                boltBuilder = new BoltBuilder(configs);
                spoutBuilder = new SpoutBuilder(configs);
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(0);
            }
        }

        private void submitTopology() throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            KafkaSpout kafkaSpout = spoutBuilder.buildKafkaSpout();
            SinkTypeBolt sinkTypeBolt = boltBuilder.buildSinkTypeBolt();
            //SolrBolt solrBolt = boltBuilder.buildSolrBolt();
            //HdfsBolt hdfsBolt = boltBuilder.buildHdfsBolt();
            //MongodbBolt mongoBolt = boltBuilder.buildMongodbBolt();

            //set the kafkaSpout to topology
            //parallelism-hint for kafkaSpout - defines number of executors/threads to be spawn per container
            int kafkaSpoutCount = Integer.parseInt(configs.getProperty(Keys.KAFKA_SPOUT_COUNT));
            builder.setSpout(configs.getProperty(Keys.KAFKA_SPOUT_ID), kafkaSpout, kafkaSpoutCount);

            //set the sinktype bolt
            int sinkBoltCount = Integer.parseInt(configs.getProperty(Keys.SINK_BOLT_COUNT));
            builder.setBolt(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), sinkTypeBolt, sinkBoltCount).shuffleGrouping(configs.getProperty(Keys.KAFKA_SPOUT_ID));

            //set the solr bolt
            //int solrBoltCount = Integer.parseInt(configs.getProperty(Keys.SOLR_BOLT_COUNT));
            //builder.setBolt(configs.getProperty(Keys.SOLR_BOLT_ID), solrBolt, solrBoltCount).shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), SOLR_STREAM);

            //set the hdfs bolt
            //int hdfsBoltCount = Integer.parseInt(configs.getProperty(Keys.HDFS_BOLT_COUNT));
            //builder.setBolt(configs.getProperty(Keys.HDFS_BOLT_ID), hdfsBolt, hdfsBoltCount).shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), HDFS_STREAM);

            //set the mongodb bolt
            //int mongoBoltCount = Integer.parseInt(configs.getProperty(Keys.MONGO_BOLT_COUNT));
            //builder.setBolt(configs.getProperty(Keys.MONGO_BOLT_ID), mongoBolt, mongoBoltCount).shuffleGrouping(configs.getProperty(Keys.SINK_TYPE_BOLT_ID), MONGODB_STREAM);

            Config conf = new Config();
            //conf.put("solr.zookeeper.hosts",configs.getProperty(Keys.SOLR_ZOOKEEPER_HOSTS));

            String topologyName = configs.getProperty(Keys.TOPOLOGY_NAME);
            //Defines how many worker processes have to be created for the topology in the cluster.
            conf.setNumWorkers(4);
            StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        }

        public static void main(String[] args) throws Exception {
            String configFile;
            if (args.length == 0) {
                System.out.println("Missing input : config file location, using default");
                configFile = "default_config.properties";
            } else {
                configFile = args[0];
            }

            Topology ingestionTopology = new Topology(configFile);
            ingestionTopology.submitTopology();
        }
    }

Error I am getting:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project stormkafka: Compilation failure: Compilation failure:
[ERROR] /home/ubuntu/Downloads/hadoop_datascience/stormkafka/src/main/java/com/vishnu/storm/spout/SpoutBuilder.java:[30,36] cannot access backtype.storm.spout.MultiScheme
[ERROR] class file for backtype.storm.spout.MultiScheme not found
[ERROR] /home/ubuntu/Downloads/hadoop_datascience/stormkafka/src/main/java/com/vishnu/storm/Topology.java:[52,24] cannot access backtype.storm.topology.base.BaseRichSpout
[ERROR] class file for backtype.storm.topology.base.BaseRichSpout not found

My SpoutBuilder:

import org.apache.storm.spout.RawScheme;
import org.apache.storm.spout.SchemeAsMultiScheme;


public class SpoutBuilder {

        public Properties configs = null;

        public SpoutBuilder(Properties configs) {
                this.configs = configs;
        }
        public KafkaSpout buildKafkaSpout() {
                BrokerHosts hosts = new ZkHosts(configs.getProperty(Keys.KAFKA_ZOOKEEPER));
                String topic = configs.getProperty(Keys.KAFKA_TOPIC);
                String zkRoot = configs.getProperty(Keys.KAFKA_ZKROOT);
                String groupId = configs.getProperty(Keys.KAFKA_CONSUMERGROUP);
                SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, groupId);
                spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
                KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
                return kafkaSpout;
        }
}

soniclavier (Owner) commented

pom.xml should have the following.

         <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>1.0.1</version>
        </dependency>
  • Yes, remove the Mongo and Solr files if you are not using them.
  • Regarding the error, double-check that you have changed backtype. to org.apache.storm (from the error log it looks like you still have some imports from the backtype package); with storm-kafka 1.0.1 the Kafka spout imports would look like the sketch below.
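
For instance, with storm-kafka 1.0.1 the spout-related imports in SpoutBuilder would look roughly like this (a sketch; verify against the 1.0.1 artifacts, and the same relocation applies to the import storm.kafka.KafkaSpout in Topology.java):

    import java.util.Properties;

    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;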


renderit commented Jun 19, 2016

@soniclavier I was having too many issues, so I downgraded to 0.10.1; that way I won't have to change the imports, right? It compiles fine and creates the jar (I think it works now), but the sink bolt should be printing the Kafka output in the terminal, correct? (Or maybe not, since Storm just says 'topology submitted'.)
Anyway, one last thing I wanted to ask: since I am passing JSON from Kafka, where should I parse it? In the spout? I saw this — is this passing messages as-is?

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
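
(For context: StringScheme hands each Kafka message to the topology as a single string field named "str", so the JSON is usually parsed in a downstream bolt rather than in the spout. A minimal sketch, not code from this repo — the org.json dependency and the "type" attribute are illustrative assumptions; imports are shown for Storm 1.0.x, on 0.10.x they would be the backtype equivalents:)

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.json.JSONObject;

    public class JsonParseBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String raw = tuple.getString(0);        // StringScheme emits a single field, "str"
            JSONObject json = new JSONObject(raw);  // parse the JSON payload here, in the bolt
            collector.emit(new Values(json.optString("type"), raw));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("type", "payload"));
        }
    }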

P.S. Why, in the UI, is the spout showing up as 'kafka-spout' instead of 'SpoutBuilder'? I don't see 'kafka-spout' in the code.


sudcha commented Oct 14, 2016

Hi,
When I tried to execute the code below on Spark 2.0.0, I got the error below. Can you please help me out with this? Is it an issue with the data types from the CSV/DataFrame?

I executed the same code you have written.


Original Code -

val predictionAndLabels = result.map { row =>
(row.get(0).asInstanceOf[Double],row.get(1).asInstanceOf[Double])
}
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())

Modified Code -

val predictionAndLabels = model.transform(test).rdd.map { row =>
(row.get(0).asInstanceOf[Double], row.get(1).asInstanceOf[Double])
}

val metrics = new BinaryClassificationMetrics(predictionAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())

I get the error when I call metrics.areaUnderROC():

16/10/14 19:19:09 ERROR TaskSetManager: Task 0 in stage 130.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 130.0 failed 1 times, most recent failure: Lost task 0.0 in stage 130.0 (TID 132, localhost): java.lang.ClassCastException

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4$lzycompute(BinaryClassificationMetrics.scala:190)
at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4(BinaryClassificationMetrics.scala:144)
at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:146)
at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:146)
at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:221)
at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:85)
at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:96)
... 54 elided
Caused by: java.lang.ClassCastException
