Skip to content

Component Implementation

Julien Cruz edited this page Jan 19, 2015 · 9 revisions

The StreamFlow delegation architecture makes integrating your existing Spouts and Bolts easy. The delegation architecture allows you to develop your Spouts and Bolts using the same native Storm libraries you already use making the transition of your current Storm components over to StreamFlow a relatively simple process. Although StreamFlow does not require you to import a StreamFlow specific API, there are a few simple libraries that can enhance your experience working with StreamFlow.

Guice Support

The first optional dependency is Google Guice, a dependency injection framework, which StreamFlow uses internally for injection of component properties. As you will see in the examples, the Guice @Inject annotation allow you to inject the user provided component properties into your Spouts and Bolts.

To include support for Guice in your framework project, add the following dependency to your pom.xml.

<dependency>
    <groupId>com.google.inject</groupId>
    <artifactId>guice</artifactId>
    <version>3.0</version>
    <scope>provided</scope>
</dependency>

Once the above dependency is added to the pom.xml, the @Inject annotations in your component implementation will be available.

Note: The Guice dependency injection is optional as all StreamFlow properties are also added to the Map config object passed to the prepare() and initialize() methods

SLF4J Support

The second optional dependency is the SLF4J logging framework which StreamFlow utilizes for capturing log output data. To alleviate the need to configure and manage your log files manually, StreamFlow can inject a preconfigured org.slf4j.Logger object for your convenience. The Logger is configured to output the log data using the settings specified in the Logger section of the streamflow.yml application configuration file. Using this configuration, StreamFlow will output the log data from your components to the directory specified by the logger.baseDir property using the pattern specified by the logger.formatPattern property. In the sample component implementation provided in the following sections, you will see sample code to inject the Logger and capture some text using the logging instance.

Important: While optional, the primary benefit of using the StreamFlow provided logger is that all log data collected by the logger is visible in the StreamFlow UI by selecting your topology in the topology builder and clicking the "View Log" button.

To include support for SLF4J in your framework project, add the following dependencies to your pom.xml.

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

Sample Component Implementation

This section will walk through a sample component implementation using a mock framework.yml and the associated component source code as reference. The example below is taken from the Twitter Framework included in the StreamFlow project. Some methods have been removed for the sake of brevity, however, the full source code is available if you would like to examine things in greater detail.

Sample framework.yml

  name: twitter-framework
  label: Twitter Framework
  version: ${project.version}
  description: Spouts and Bolts supporting Twitter functionality

  components: 
    - name: twitter-sample-spout
      label: Twitter Sample Spout
      type: storm-spout
      description: Utilizes Twitter Streaming API to stream of 1% Twitter data for analysis. Twitter OAuth credentials for you application are required for use.
      mainClass: streamflow.spout.twitter.TwitterSampleSpout
      icon: icons/twitter.png
      properties: 
        - name: oauth-consumer-key
          label: OAuth Consumer Key
          type: text
          description: Twitter OAuth Consumer Key
          defaultValue: 
          required: true
        - name: oauth-consumer-secret
          label: OAuth Consumer Secret
          type: text
          description: Twitter OAuth Consumer Secret
          defaultValue: 
          required: true
        - name: oauth-access-token
          label: OAuth Access Token
          type: text  
          description: Twitter OAuth Access Token
          defaultValue: 
          required: true
        - name: oauth-access-token-secret
          label: OAuth Access Token Secret
          type: text
          description: Twitter OAuth Access Token Secret
          defaultValue: 
          required: true
      outputs: 
        - key: default
          description: Twitter Status
          
serializations: 
  - typeClass: twitter4j.Status

Sample Storm Spout implementation

1.  public class TwitterSampleSpout extends BaseRichSpout {
2.
3.     private SpoutOutputCollector collector;
4.
5.     private Logger logger;
6.
7.     private String consumerKey;
8.     private String consumerSecret;
9.     private String accessToken;
10.    private String accessTokenSecret;
11.
12.    private final LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(100000);
13.    private TwitterStream twitterStream;
14.
15.
16.    @Inject
17.    public void setConsumerKey(@Named("oauth-consumer-key") String consumerKey) {
18.       this.consumerKey = consumerKey;
19.    }
20.
21.    @Inject
22.    public void setConsumerSecret(@Named("oauth-consumer-secret") String consumerSecret) {
23.       this.consumerSecret = consumerSecret;
24.    }
25.
26.    @Inject
27.    public void setAccessToken(@Named("oauth-access-token") String accessToken) {
28.       this.accessToken = accessToken;
29.    }
30.
31.    @Inject
32.    public void setAccessTokenSecret(@Named("oauth-access-token-secret") String accessTokenSecret) {
33.       this.accessTokenSecret = accessTokenSecret;
34.    }
35.
36.    @Inject
37.    public void setLogger(Logger logger){
38.       this.logger = logger;
39.    }
40.
41.    @Override
42.    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
43.       this.collector = collector;
44.
45.       logger.info("Twitter Sampler Started: Consumer Key = " + consumerKey
46.          + ", Consumer Secret = " + consumerSecret + ", Access Token = " + accessToken
47.          + ", Access Token Secret = " + accessTokenSecret);
48.
49.       // Build the twitter config to authenticate the requests
50.       ConfigurationBuilder twitterConfig = new ConfigurationBuilder()
51.          .setOAuthConsumerKey(consumerKey)
52.          .setOAuthConsumerSecret(consumerSecret)
53.          .setOAuthAccessToken(accessToken)
54.          .setOAuthAccessTokenSecret(accessTokenSecret)
55.
56.       // Status listener which handle the status events and add them to the queue
57.       StatusListener listener = new StatusListener() {
58.          @Override
59.          public void onStatus(Status status) {
60.             queue.offer(status);
61.          }
62.
63.             // ADDITIONAL METHODS OMITTED FOR SIMPLICITY
64.       };
65.
66.       TwitterStreamFactory twitterFactory = new TwitterStreamFactory(twitterConfig.build());
67.       twitterStream = twitterFactory.getInstance();
68.       twitterStream.addListener(listener);
69.       twitterStream.sample();
70.    }
71.
72.    @Override
73.    public void nextTuple() {
74.       Status status = queue.poll();
75.
76.       if (status == null) {
77.          Utils.sleep(50);
78.       } else {
79.          // Emit the twitter status as a JSON String
80.          collector.emit(new Values(status));
81.       }
82.    }
83.
84.    @Override
85.    public void close() {
86.       twitterStream.shutdown();
87.    }
88.
89.    @Override
90.    public void declareOutputFields(OutputFieldsDeclarer declarer) {
91.       declarer.declare(new Fields("tweet"));
92.    }
93. }

The purpose of the Twitter Sample Spout is to consume a steady stream of Twitter data using the Twitter streaming API. The code takes advantage of the Twitter4J open source library which simplifies the communication with Twitter's web services. In order to use the Twitter Sample Spout or any of the Twitter services in general, it requires registration on the Twitter Application Management website which provides the necessary OAuth credentials.

Component Property Access

In the above example, the framework.yml configuration defines four properties for the Twitter Sample Spout which are necessary for authentication with Twitter: oauth-consumer-key, oauth-consumer-secret, oauth-access-token, oauth-access-token-secret. The Twitter Sample Spout exposes these properties to allow the Spout to be generic and allow any user to provide their own credentials in their topology. Typically properties are added to allow users to make modifications to the behavior of the Spout or Bolt without needing to change internal constants or recompile the code.

The framework.yml example references a mainClass of streamflow.spout.twitter.TwitterSampleSpout for the Twitter Sample Spout. This means that the Spout implementation for the Twitter Sample Spout must be located in the streamflow.spout.twitter package and have a class name of streamflow.spout.twitter.TwitterSampleSpout.

Note: If you experience any Class Not Found exceptions, please check to make sure your mainClass property is valid and points to the right component.

On line 1 of the TwitterSampleSpout you will find that the Spout extends BaseRichSpout. As mentioned before, StreamFlow does not require you to extend any custom StreamFlow classes. Your code can be written and tested using standard Storm classes and when you are ready, you can transition things over to StreamFlow. The custom StreamFlow topology builder and delegate model make this possible as all component logic is dynamically loaded at runtime using a specialized class loader.

Lines 7 through 10 of the TwitterSampleSpout show the instance variables that line up with the four oauth properties specified in the configuration. In order to initialize these variables, you must either @Inject the values using setter/field annotations or using the config map in the open() method on line 42. Due to the runtime checking of the injected values and automatic conversion of String values primitive values, the injection method is preferred.

Lines 16 through 34 of the Twitter Sample Spout implementation shows the use of the @Named annotation to inject the values of the properties. When a topology is deployed using the Twitter Sample Spout, the instance variables will be populated with the values provided in the topology creation. For each property in the component configuration, StreamFlow adds an @Named value which uses the property name as the key. Line 17 shows the injection of the oauth-consumer-key using the @Named annotation method.

If you would prefer to not use injection, the config map provides a nice alternative. Each of the component properties are added to the config map when the component is deployed on the server. The key in the config map matches up with the property name from the framework.yml configuration file for easy access. The example below shows an example of how to use this feature in the open() or prepare() method of your implmentations.

String consumerSecret = config.get("oauth-consumer-secret");

Component Logging

If you enable support for SLF4J logging, it will greatly improve your ability to monitor your topologies while on the go. StreamFlow makes integration with the logging framework simple and painless. To add the StreamFlow configured logger to your component implementation, simply add the following code as shown in lines 36 through 39:

private org.slf4j.Logger logger;

@Inject
public void setLogger(Logger logger){
    this.logger = logger;
}

Once this code is added to your Spout or Bolt implementation, you are free to use the logger to write log data to the StreamFlow monitored logs. Lines 45 through 47 provide an example of how to use the logger inside your code to report on important events. The snippet of code below shows some additional uses of the logger:

logger.debug("This is a debug messageused for diagnostic info");
logger.warn("This is a warning message used to report non-critical errors");
logger.info("This is an info message used to report generic information");
logger.error("This is an error message used to report major errors");

Final Notes

This example demonstrates how values from the dynamically built jobs in the StreamFlow UI can be provided to the Spouts and Bolts during runtime. This feature is critical in allowing topologies to be built dynamically in the StreamFlow UI while allowing component configuration to be dynamically changed without the need for code recompilation. It is important to recognize that if you do not require property injection or logging that your native Spout or Bolt can be utilized in StreamFlow simply by added a framework.yml configuration with an entry for your component in the components section.