This repository has been archived by the owner on Oct 24, 2018. It is now read-only.
Gabriel Commeau edited this page Nov 12, 2014 · 1 revision

Usage and Configuration

Using the Flume2Storm connector assumes working Flume, Storm and ZooKeeper clusters; refer to their respective documentation for help with setting them up. First, decide which implementations of the location service and connection framework to use. By default, dynamic-location-service and kryonet-flume2storm are used. The chosen implementations must be the same on the Flume side and on the Storm side.

1. Flume: Installing the storm-sink

1.1. File Installation

To install the storm-sink, copy the appropriate jar files into the Flume plug-in directory on the Flume node: the storm-sink jar, the location service implementation and the connection framework implementation. For instance, to use the default implementations of Flume2Storm version 2.0.0, copy the following 3 jars:

  • storm-sink-2.0.0.jar
  • dynamic-location-service-2.0.0.jar
  • kryonet-flume2storm-2.0.0.jar

The default Flume plug-in directory is $FLUME_HOME/plugins.d. Create the appropriate directory structure by doing the following (preferably as the Flume user):

$ # Make sure $FLUME_HOME is defined properly
$ mkdir -p $FLUME_HOME/plugins.d/flume2storm/lib
$ # Copy the jar files
$ ls -R $FLUME_HOME/plugins.d/
/usr/lib/flume-ng/plugins.d/:
flume2storm

/usr/lib/flume-ng/plugins.d/flume2storm:
lib

/usr/lib/flume-ng/plugins.d/flume2storm/lib:
dynamic-location-service-2.0.0.jar  kryonet-flume2storm-2.0.0.jar  storm-sink-2.0.0.jar

Note: To use the static-location-service, replace the dynamic-location-service-2.0.0.jar with the static-location-service-2.0.0.jar.

1.2. Configuration

1.2.1. Configuring storm-sink

Configure Flume to use the storm-sink. Part of this configuration depends on the implementations in use. Assuming that agent is the name of the Flume agent and stormSink is the name chosen for the sink, all of the following parameters, for both the sink and its implementations, must be prefixed with agent.sinks.stormSink.

The common, basic configuration is the following:

| Parameter Name | Default Value | Description |
| --- | --- | --- |
| type | com.comcast.viper.flume2storm.sink.StormSink | The component type name |
| batch-size | 100 | The maximum number of events in a batch |
| location.service.factory | com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory | The class name of the location service factory |
| service.provider.serialization | com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization | The class name of the service provider serialization class |
| event.sender.factory | com.comcast.viper.flume2storm.connection.sender.KryoNetEventSenderFactory | The class name of the event sender factory |
| connection.parameters.factory | com.comcast.viper.flume2storm.connection.parameters.KryoNetConnectionParametersFactory | The class name of the connection parameters factory |

The rest of the attributes depend on the implementations to be used.
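To make the prefixing rule concrete, the following Java sketch composes full Flume property keys from an agent name, a sink name and a parameter name. The class and method names (SinkKeySketch, sinkKey, example) are hypothetical and not part of Flume2Storm; only java.util.Properties from the standard library is used.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flume2Storm: illustrates key prefixing only.
class SinkKeySketch {

    // Builds "<agent>.sinks.<sink>.<parameter>", the form used for sink settings.
    static String sinkKey(String agent, String sink, String parameter) {
        return agent + ".sinks." + sink + "." + parameter;
    }

    // Builds a small configuration using the agent and sink names assumed on this page.
    static Properties example() {
        Properties props = new Properties();
        props.setProperty(sinkKey("agent", "stormSink", "type"),
                "com.comcast.viper.flume2storm.sink.StormSink");
        props.setProperty(sinkKey("agent", "stormSink", "batch-size"), "100");
        // Implementation parameters keep their own prefix after the sink prefix.
        props.setProperty(
                sinkKey("agent", "stormSink", "dynamic.location.service.connection.string"),
                "localhost:2181");
        return props;
    }

    public static void main(String[] args) {
        example().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Note that an implementation parameter ends up with two prefixes: the sink prefix first, then the implementation's own prefix.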

1.2.2. Configuring dynamic-location-service

The following parameters must be prefixed with dynamic.location.service.:

| Parameter Name | Default Value | Description |
| --- | --- | --- |
| base.path | /services | The ZooKeeper path in which to store the service registration |
| service.name | flume2storm | The name of the service that is used to group all the ServiceProvider instances in ZooKeeper |
| connection.string | localhost:2181 | The ZooKeeper connection string |
| session.timeout | 30000 | The ZooKeeper session timeout (in milliseconds). Note: this is merely a suggestion; the actual session timeout is negotiated between the ZooKeeper client and the ZooKeeper server |
| connection.timeout | 10000 | The ZooKeeper connection timeout (in milliseconds). This is the maximum time allowed for the ZooKeeper client to fully establish the connection with the ZooKeeper server |
| reconnection.delay.in.ms | 10000 | The reconnection delay in milliseconds. Indicates the time to wait before trying to reconnect to the ZooKeeper server when a session has expired |
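The note on session.timeout can be illustrated with a short Java sketch. It assumes the ZooKeeper server runs with its documented defaults, where the minimum and maximum session timeouts are 2 and 20 times the server's tickTime. The class and method names are hypothetical, and the real negotiation happens inside ZooKeeper itself.

```java
// Hypothetical illustration of how a requested session timeout may be adjusted.
// Assumes the ZooKeeper server defaults: min = 2 * tickTime, max = 20 * tickTime.
class SessionTimeoutSketch {

    // Clamps the requested timeout into the range the server is assumed to accept.
    static int negotiated(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 2000; // assumed server tickTime, in milliseconds
        System.out.println(negotiated(30000, tickTime)); // within bounds, kept as requested
        System.out.println(negotiated(2000, tickTime));  // below the minimum, raised
        System.out.println(negotiated(60000, tickTime)); // above the maximum, lowered
    }
}
```

Under these assumptions, a very small session.timeout value may be raised by the server, so the effective timeout can differ from the configured one.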

1.2.3. Configuring static-location-service

The following parameters must be prefixed with static.location.service.:

| Parameter Name | Default Value | Description |
| --- | --- | --- |
| service.providers | N/A | List of service provider identifiers, space-separated |
| service.providers.base | service.providers | The base name (i.e. parameter prefix) for the service provider attributes |
| configuration.loader.class | N/A | The class name of the ServiceProviderConfigurationLoader, which loads a service provider from an Apache Configuration |

1.2.4. Configuring kryonet-flume2storm

There are two sets of parameters that should be configured to use the kryonet-flume2storm connection implementation:

  • The first set of parameters relates to the KryoNet connection parameters, which enable the event receptor to connect to the event sender (i.e. the storm-sink). The following parameters must be prefixed with kryonet.connection.parameters.:

| Parameter Name | Default Value | Description |
| --- | --- | --- |
| address | Host IP address or localhost | Address (or host name) to use for the KryoNet event sender. The default behavior is to look up the host IP address using InetAddress.getLocalHost(). If this fails, localhost is used as the default. |
| port | 7000 | Port to use for the KryoNet event sender |
| object.buffer.size | 3024 | KryoNet's object buffer size in bytes, which must be large enough to serialize any object. See the KryoNet documentation for more details. |
| write.buffer.size | 3024000 | KryoNet's write buffer size in bytes. See the KryoNet documentation for more details. |

  • The second set of parameters configures the event sender and is independent of its clients. The following parameters must be prefixed with kryonet.:

| Parameter Name | Default Value | Description |
| --- | --- | --- |
| connection.timeout.in.ms | 10000 | The timeout for a connection attempt, in milliseconds |
| retry.sleep.delay.in.ms | 50 | The time, in milliseconds, to wait before the server (i.e. EventSender) tries to resend an event to the same client (i.e. EventReceptor) |
| reconnection.delay.in.ms | 2000 | The time, in milliseconds, to wait before the client (i.e. EventReceptor) tries to reconnect to the server (i.e. EventSender) |
| max.retries | 3 | The maximum number of times to attempt to send an event before considering it as failed |
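Two of the behaviors described above lend themselves to a short Java sketch: the default resolution of address, and the interplay of max.retries and retry.sleep.delay.in.ms. This is a minimal illustration, not the kryonet-flume2storm implementation. The class and method names are hypothetical, and reading max.retries as a total number of attempts is an assumption based on the parameter description.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.BooleanSupplier;

// Hypothetical sketch, not the Flume2Storm implementation.
class KryoNetDefaultsSketch {

    // Mirrors the documented default for "address": host IP, or "localhost" on failure.
    static String defaultAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "localhost";
        }
    }

    // Attempts to send up to maxRetries times, sleeping retrySleepMs between attempts.
    // Returns true on the first success, false if every attempt failed or the
    // thread was interrupted while waiting.
    static boolean sendWithRetries(BooleanSupplier attempt, int maxRetries, long retrySleepMs) {
        for (int i = 1; i <= maxRetries; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i < maxRetries) {
                try {
                    Thread.sleep(retrySleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("address default: " + defaultAddress());
        int[] calls = {0};
        // Simulated sender that only succeeds on its third attempt.
        boolean sent = sendWithRetries(() -> ++calls[0] == 3, 3, 50);
        System.out.println("sent=" + sent + " after " + calls[0] + " attempts");
    }
}
```

With the default values (3 attempts, 50 ms apart), an event that cannot be delivered would be considered failed after roughly 100 ms of waiting, under the assumptions stated above.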

1.2.5. Example of Using dynamic-location-service and kryonet-flume2storm

# Basic storm-sink configuration
agent.sinks.stormSink.type=com.comcast.viper.flume2storm.sink.StormSink
agent.sinks.stormSink.location.service.factory=com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory
agent.sinks.stormSink.service.provider.serialization=com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization
agent.sinks.stormSink.event.sender.factory=com.comcast.viper.flume2storm.connection.sender.KryoNetEventSenderFactory
agent.sinks.stormSink.connection.parameters.factory=com.comcast.viper.flume2storm.connection.parameters.KryoNetConnectionParametersFactory

# Connection parameters configuration
agent.sinks.stormSink.kryonet.connection.parameters.port=63547

# Generic KryoNet parameters
agent.sinks.stormSink.kryonet.connection.timeout.in.ms=500
agent.sinks.stormSink.kryonet.reconnection.delay.in.ms=1000

# Location service configuration
agent.sinks.stormSink.dynamic.location.service.connection.string=127.0.0.1:2000
agent.sinks.stormSink.dynamic.location.service.service.name=ut
agent.sinks.stormSink.dynamic.location.service.reconnection.delay.in.ms=1000
agent.sinks.stormSink.dynamic.location.service.connection.timeout=500
agent.sinks.stormSink.dynamic.location.service.session.timeout=2000
agent.sinks.stormSink.dynamic.location.service.base.path=/unitTest

2. Storm: Using the flume-spout

2.1. File Installation

Like the storm-sink, the flume-spout does not bundle implementations of the location service and connection framework; the defaults are again dynamic-location-service and kryonet-flume2storm.
For example, to use the default implementations of Flume2Storm version 2.0.0, the following 3 jars are needed:

  • flume-spout-2.0.0.jar
  • dynamic-location-service-2.0.0.jar
  • kryonet-flume2storm-2.0.0.jar

Either include the jars in the topology, or copy the jars into Storm's library directory. To do the former with Maven, add the following dependencies:

<dependency>
        <groupId>com.comcast.viper.flume2storm</groupId>
        <artifactId>flume-spout</artifactId>
        <version>${flume2storm.version}</version>
</dependency>
<dependency>
        <groupId>com.comcast.viper.flume2storm</groupId>
        <artifactId>kryonet-flume2storm</artifactId>
        <version>${flume2storm.version}</version>
</dependency>
<dependency>
        <groupId>com.comcast.viper.flume2storm</groupId>
        <artifactId>dynamic-location-service</artifactId>
        <version>${flume2storm.version}</version>
</dependency>

2.2. Configuration

2.2.1. Configuring flume-spout

As with the storm-sink, the configuration depends on the implementations selected. The basic configuration is:

| Parameter Name | Default Value | Description |
| --- | --- | --- |
| location.service.factory | com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory | The class name of the location service factory |
| service.provider.serialization | com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization | The class name of the service provider serialization |
| event.receptor.factory | com.comcast.viper.flume2storm.connection.receptor.KryoNetEventReceptorFactory | The class name of the event receptor factory |

2.2.2. Configuring the Implementations used by the Topology

Since the implementations of the location service and the connection framework are the same between Flume and Storm topologies, the configuration is exactly the same. Note, however, that the Flume prefix (agent.sinks.stormSink.) must NOT be specified.
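As an illustration of this rule, the following Java sketch derives Storm-side keys from Flume-side ones by dropping the sink prefix. The helper is hypothetical and not part of Flume2Storm; it relies only on java.util.Properties, and the channel entry in the demo is an invented example of an unrelated Flume setting.

```java
import java.util.Properties;

// Hypothetical helper: derives Storm-side settings from Flume-side ones by
// dropping the sink prefix. Not part of Flume2Storm.
class PrefixStripSketch {

    // Keeps only the keys starting with the prefix and removes the prefix from them.
    static Properties stripPrefix(Properties flumeProps, String prefix) {
        Properties result = new Properties();
        for (String key : flumeProps.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.setProperty(key.substring(prefix.length()), flumeProps.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties flume = new Properties();
        flume.setProperty("agent.sinks.stormSink.kryonet.connection.timeout.in.ms", "500");
        flume.setProperty("agent.sinks.stormSink.dynamic.location.service.base.path", "/unitTest");
        flume.setProperty("agent.channels.memChannel.type", "memory"); // unrelated, ignored
        stripPrefix(flume, "agent.sinks.stormSink.")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A real topology would also need to drop the sink-only parameters (such as type, batch-size and event.sender.factory) and add the spout-specific event.receptor.factory.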

2.2.3. Example of Using static-location-service and kryonet-flume2storm

# Basic flume-spout configuration
location.service.factory=com.comcast.viper.flume2storm.location.StaticLocationServiceFactory
service.provider.serialization=com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization
event.receptor.factory=com.comcast.viper.flume2storm.connection.receptor.KryoNetEventReceptorFactory

# Generic KryoNet parameters
kryonet.connection.timeout.in.ms=500
kryonet.reconnection.delay.in.ms=1000

# Location service configuration
static.location.service.configuration.loader.class=com.comcast.viper.flume2storm.IntegrationTest$KryoNetServiceProvidersLoader
static.location.service.service.providers=sp1Id sp2Id
static.location.service.service.providers.sp1Id.address=localhost
static.location.service.service.providers.sp1Id.port=49182
static.location.service.service.providers.sp2Id.address=localhost
static.location.service.service.providers.sp2Id.port=49183

Note that KryoNetServiceProvidersLoader will not work in topologies outside of the module, since the integration-test and test classes are not accessible there; it only serves as an example of how to use a configuration loader.
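The layout of the static service provider keys in this example can be sketched in Java as follows. This is a hypothetical reader, not the ServiceProviderConfigurationLoader API; it assumes the address and port attributes shown in the example and uses only standard library classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of reading the static service provider list; the real
// loading is done by a ServiceProviderConfigurationLoader implementation.
class StaticProvidersSketch {

    // Returns one "id=address:port" entry per identifier listed in service.providers.
    static List<String> endpoints(Properties props) {
        String prefix = "static.location.service.";
        // Base name for the provider attributes; defaults to "service.providers".
        String base = props.getProperty(prefix + "service.providers.base", "service.providers");
        List<String> result = new ArrayList<>();
        String ids = props.getProperty(prefix + "service.providers", "").trim();
        if (ids.isEmpty()) {
            return result;
        }
        for (String id : ids.split("\\s+")) { // identifiers are space-separated
            String address = props.getProperty(prefix + base + "." + id + ".address");
            String port = props.getProperty(prefix + base + "." + id + ".port");
            result.add(id + "=" + address + ":" + port);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("static.location.service.service.providers", "sp1Id sp2Id");
        props.setProperty("static.location.service.service.providers.sp1Id.address", "localhost");
        props.setProperty("static.location.service.service.providers.sp1Id.port", "49182");
        props.setProperty("static.location.service.service.providers.sp2Id.address", "localhost");
        props.setProperty("static.location.service.service.providers.sp2Id.port", "49183");
        endpoints(props).forEach(System.out::println);
    }
}
```

Each identifier in service.providers selects one group of attributes under the base name, which is why sp1Id and sp2Id appear both in the list and in the attribute keys.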

Monitoring

Storm-sink

The storm-sink maintains standard sink information via an org.apache.flume.instrumentation.SinkCounter, which Flume exposes by default (see the Flume user guide's section on monitoring). More specifically, it maintains an accurate count of the number of batches processed (complete, empty, and under-flow), as well as the number of events drained (attempted and successful).
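The batch categories can be illustrated with a small Java sketch. It follows the convention commonly used by Flume's stock sinks (no events means empty, fewer events than batch-size means under-flow, otherwise complete); whether the storm-sink classifies batches exactly this way is an assumption. The class below is a hypothetical stand-in and does not use the SinkCounter API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the counters kept by a SinkCounter; it follows the
// convention of Flume's stock sinks and is not the storm-sink code.
class BatchCountersSketch {
    final AtomicLong batchEmpty = new AtomicLong();
    final AtomicLong batchUnderflow = new AtomicLong();
    final AtomicLong batchComplete = new AtomicLong();
    final AtomicLong drainAttempt = new AtomicLong();
    final AtomicLong drainSuccess = new AtomicLong();

    // Records one processed batch: "taken" events were read from the channel,
    // of which "delivered" were sent successfully.
    void record(int taken, int delivered, int batchSize) {
        if (taken == 0) {
            batchEmpty.incrementAndGet();
        } else if (taken < batchSize) {
            batchUnderflow.incrementAndGet();
        } else {
            batchComplete.incrementAndGet();
        }
        drainAttempt.addAndGet(taken);
        drainSuccess.addAndGet(delivered);
    }

    public static void main(String[] args) {
        BatchCountersSketch counters = new BatchCountersSketch();
        counters.record(0, 0, 100);    // empty batch
        counters.record(40, 40, 100);  // under-flow batch
        counters.record(100, 90, 100); // complete batch, 10 events not delivered
        System.out.println("empty=" + counters.batchEmpty + " underflow=" + counters.batchUnderflow
                + " complete=" + counters.batchComplete + " attempted=" + counters.drainAttempt
                + " succeeded=" + counters.drainSuccess);
    }
}
```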

Flume-spout

The flume-spout does not expose any stats information directly. However, relevant information can be gathered about some of the components it contains.

Other components

The event receptor and event sender interfaces define methods to access the stats object of their respective components, in a JMX fashion: an MBean interface paired with an implementation class of the same base name. For instance, com.comcast.viper.flume2storm.connection.receptor.EventReceptorStatsMBean defines the methods available to gather information about the event receptor instance, and com.comcast.viper.flume2storm.connection.receptor.EventReceptorStats provides a thread-safe implementation of this interface.
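The naming pattern described above can be sketched in Java as follows. The types and methods here (ReceptorStatsSketchMBean, ReceptorStatsSketch, getNbEventsIn, incrEventsIn) are hypothetical and only mimic the pattern; they are not the Flume2Storm EventReceptorStats classes, whose actual methods are not listed on this page.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names: these mimic the MBean/implementation naming pattern only
// and are not the Flume2Storm stats classes.
class StatsPatternSketch {

    // Read-only view, named <Implementation>MBean as the JMX convention expects.
    public interface ReceptorStatsSketchMBean {
        long getNbEventsIn();
    }

    // Thread-safe implementation sharing the interface's base name.
    public static class ReceptorStatsSketch implements ReceptorStatsSketchMBean {
        private final AtomicLong nbEventsIn = new AtomicLong();

        public void incrEventsIn() {
            nbEventsIn.incrementAndGet();
        }

        @Override
        public long getNbEventsIn() {
            return nbEventsIn.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReceptorStatsSketch stats = new ReceptorStatsSketch();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            // Each worker simulates receiving 1000 events concurrently.
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    stats.incrEventsIn();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println("events in: " + stats.getNbEventsIn());
    }
}
```

Backing the counter with an AtomicLong is one simple way to make such an implementation safe to update from several threads while a monitoring client reads it.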