Skip to content
This repository has been archived by the owner on Oct 24, 2018. It is now read-only.

Commit

Permalink
Major code clean up and better testing
Browse files Browse the repository at this point in the history
Using Storm 0.9.2-incubating
Adding license/notice in missing places
Removing timestamp from F2SEvent
Created kryo-utils module
Created integration-test module and example topology
Unit tests only use test-impl
Better configuration objects and error messages
Using logback instead of log4j
  • Loading branch information
gcommeau committed Oct 21, 2014
1 parent 6e2d3fb commit 2a5d6ca
Show file tree
Hide file tree
Showing 157 changed files with 7,076 additions and 4,139 deletions.
12 changes: 11 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
/target
*~
.metadata/
.classpath
.project
.settings
.eclipse
.recommenders
bin
target
RemoteSystemsTempFiles
.DS_Store
4 changes: 4 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Flume2Storm connector
Copyright 2012-2014 Comcast Cable Communications Management, LLC

This product includes software developed at Comcast (http://www.comcast.com/).
18 changes: 8 additions & 10 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,27 @@
<description>Defines the core API for the Flume2Storm connector</description>

<properties>
<license.header.location>../license.header</license.header.location>
<commons-configuration.version>1.6</commons-configuration.version>
<commons-lang.version>3.3.1</commons-lang.version>
<f2s.basedir>${project.parent.basedir}</f2s.basedir>
<commons-codec.version>1.8</commons-codec.version>
</properties>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>${commons-configuration.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,58 @@
*/
package com.comcast.viper.flume2storm;

import org.apache.commons.configuration.Configuration;

/**
* Exception when the configuration of a Flume2Storm component is invalid
*/
@SuppressWarnings("javadoc")
public class F2SConfigurationException extends Exception {
private static final long serialVersionUID = 4941423443352427505L;

// TODO This kind of Strings should be externalized
public static enum Reason {
NOT_POSITIVE("It must be positive"),
NOT_STRICLY_POSITIVE("It must be strictly positive");

private final String message;

private Reason(final String message) {
this.message = message;
}

public String getMessage() {
return message;
}
/**
* @param config
* The configuration that contains the invalid parameter
* @param parameter
* The name of the configuration parameter that is invalid
* @param throwable
* The exception that occurred when setting the parameter's value
* @return The newly built F2SConfigurationException related to a specific
* invalid parameter
*/
public static F2SConfigurationException with(Configuration config, String parameter, Throwable throwable) {
return F2SConfigurationException.with(parameter, config.getProperty(parameter), throwable);
}

/**
* @param parameter
* The name of the configuration parameter that is invalid
* @param value
* The configured value of the parameter
* @param throwable
* The exception that occurred when setting the parameter's value
* @return The newly built F2SConfigurationException related to a specific
* invalid parameter
*/
public static F2SConfigurationException with(String parameter, Object value, Throwable throwable) {
return new F2SConfigurationException(buildErrorMessage(parameter, value, throwable.getMessage()), throwable);
return new F2SConfigurationException(buildErrorMessage(parameter, value, throwable), throwable);
}

public static F2SConfigurationException with(String parameter, Object value, Reason reason) {
return new F2SConfigurationException(buildErrorMessage(parameter, value, reason.getMessage()));
protected static final String buildErrorMessage(String parameter, Object value, Throwable throwable) {
return new StringBuilder("Configuration attribute \"").append(parameter).append("\" has invalid value (\"")
.append(value).append("\"). ").append(throwable.getClass().getSimpleName()).append(": ")
.append(throwable.getLocalizedMessage()).toString();
}

protected static final String buildErrorMessage(String parameter, Object value, String message) {
return new StringBuilder("Configuration attribute '").append(parameter).append("' has invalid value (")
.append(value.toString()).append("): ").append(message).toString();
/**
* @param parameter
* The name of the configuration parameter that is missing
* @return The newly built F2SConfigurationException related to a specific
* missing parameter
*/
public static F2SConfigurationException missing(String parameter) {
return new F2SConfigurationException(new StringBuilder("Configuration attribute \"").append(parameter)
.append("\" is required but not specified").toString());
}

public F2SConfigurationException() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@
*/
package com.comcast.viper.flume2storm.connection.parameters;

import com.comcast.viper.flume2storm.connection.receptor.EventReceptor;
import com.comcast.viper.flume2storm.connection.sender.EventSender;
import com.comcast.viper.flume2storm.location.ServiceProvider;

/**
* Interface of the configuration for a Flume2Storm connection (from an Event
* Receptor to an Event Sender).
* Interface of the configuration for a Flume2Storm connection (from an
* {@link EventReceptor} to an {@link EventSender}).
*/
public interface ConnectionParameters {
/**
* @return A unique identifier of the associated {@link EventSender} /
* {@link ServiceProvider} (hostname:port for instance)
*
*/
String getId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import com.comcast.viper.flume2storm.F2SConfigurationException;

/**
* Interface to build a connection parameter. It follows the abstract factory
* design pattern.
* Interface to build a {@link ConnectionParameters}. It follows the abstract
* factory design pattern. Implementation of this factory must have a
* no-argument constructor.
*
* @param <CP>
* The Connection Parameters class
* The {@link ConnectionParameters} class
*/
public interface ConnectionParametersFactory<CP extends ConnectionParameters> {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@

/**
* The client-side of the Flume2Storm connector, which receives the Flume2Storm
* events from the Event Sender.
* events from the {@link EventSender}.
*
* @param <CP>
* The Connection Parameters class
* The {@link ConnectionParameters} class
*/
public interface EventReceptor<CP extends ConnectionParameters> {
/**
Expand All @@ -47,10 +47,9 @@ public interface EventReceptor<CP extends ConnectionParameters> {
boolean stop();

/**
* @return True if the {@link EventReceptor} is connected to the
* {@link EventSender}
* @return Statistics related to this {@link EventReceptor}
*/
boolean isConnected();
EventReceptorStatsMBean getStats();

// TODO use listener instead
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import com.comcast.viper.flume2storm.connection.parameters.ConnectionParameters;

/**
* Interface to build an Event Receptor. It follows the abstract factory design
* pattern.
* Interface to build an {@link EventReceptor}. It follows the abstract factory
* design pattern. Implementation of this factory must have a no-argument
* constructor.
*
* @param <CP>
* The Connection Parameters class
* The {@link ConnectionParameters} class
*/
public interface EventReceptorFactory<CP extends ConnectionParameters> {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.comcast.viper.flume2storm.event.F2SEvent;

/**
* Listener for EventReceptor events.
* Listener for {@link EventReceptor} events.
*/
public interface EventReceptorListener {
/**
Expand Down

0 comments on commit 2a5d6ca

Please sign in to comment.