Usage
Using the Flume2Storm connector assumes working Flume, Storm and ZooKeeper clusters. Refer to their respective documentation for help with setting up these clusters.
First, decide which implementations of the location service and of the connection framework to use. By default, `dynamic-location-service` and `kryonet-flume2storm` are used. The same implementations must be chosen on the Flume side and on the Storm side.
To install the `storm-sink`, copy the appropriate jar files into the Flume plug-in directory on the Flume node: the location service implementation, the connection framework implementation and the `storm-sink` jar itself.
For instance, to use the default implementations of Flume2Storm version 2.0.0, copy the following 3 jars:

- `storm-sink-2.0.0.jar`
- `dynamic-location-service-2.0.0.jar`
- `kryonet-flume2storm-2.0.0.jar`
The default Flume plug-in directory is `$FLUME_HOME/plugins.d`. Create the appropriate directory structure by doing the following (preferably as the Flume user):
```shell
$ # Make sure $FLUME_HOME is defined properly
$ mkdir -p $FLUME_HOME/plugins.d/flume2storm/lib
$ # Copy the jar files (here, from the current directory)
$ cp storm-sink-2.0.0.jar dynamic-location-service-2.0.0.jar kryonet-flume2storm-2.0.0.jar $FLUME_HOME/plugins.d/flume2storm/lib/
$ ls -R $FLUME_HOME/plugins.d/
/usr/lib/flume-ng/plugins.d/:
flume2storm

/usr/lib/flume-ng/plugins.d/flume2storm:
lib

/usr/lib/flume-ng/plugins.d/flume2storm/lib:
dynamic-location-service-2.0.0.jar  kryonet-flume2storm-2.0.0.jar  storm-sink-2.0.0.jar
```
Note: to use the `static-location-service`, replace `dynamic-location-service-2.0.0.jar` with `static-location-service-2.0.0.jar`.
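Before restarting the agent, it can be worth confirming that the jars actually landed in the plug-in directory. The following Java sketch assumes the `plugins.d/flume2storm/lib` layout shown above and the 2.0.0 default jar names; `PluginDirCheck` is a hypothetical helper, not part of Flume2Storm.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: checks that the Flume2Storm jars are in the Flume plug-in directory. */
public class PluginDirCheck {
  // Default implementations for Flume2Storm 2.0.0, as listed in this guide.
  static final List<String> REQUIRED_JARS = List.of(
      "storm-sink-2.0.0.jar",
      "dynamic-location-service-2.0.0.jar",
      "kryonet-flume2storm-2.0.0.jar");

  /** Returns the required jars that are absent from the given set of file names. */
  static List<String> missingJars(Set<String> presentFiles, List<String> required) {
    List<String> missing = new ArrayList<>();
    for (String jar : required) {
      if (!presentFiles.contains(jar)) {
        missing.add(jar);
      }
    }
    return missing;
  }

  /** Lists the names of the entries found directly in a directory. */
  static Set<String> listFileNames(Path dir) throws IOException {
    Set<String> names = new HashSet<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
      for (Path p : stream) {
        names.add(p.getFileName().toString());
      }
    }
    return names;
  }

  public static void main(String[] args) throws IOException {
    String flumeHome = System.getenv("FLUME_HOME");
    if (flumeHome == null) {
      System.err.println("FLUME_HOME is not defined");
      return;
    }
    Path libDir = Paths.get(flumeHome, "plugins.d", "flume2storm", "lib");
    if (!Files.isDirectory(libDir)) {
      System.err.println("Missing directory: " + libDir);
      return;
    }
    List<String> missing = missingJars(listFileNames(libDir), REQUIRED_JARS);
    System.out.println(missing.isEmpty() ? "All jars present" : "Missing: " + missing);
  }
}
```

When the static location service is used, the corresponding entry in `REQUIRED_JARS` would change to `static-location-service-2.0.0.jar`.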
Next, configure Flume to use the `storm-sink`. This configuration depends on the implementations selected. The common, basic configuration is the following.

Assuming that `agent` is the name of the Flume agent and `stormSink` is the name chosen for the sink, all the following parameters, for both the sink itself and its implementations, must be prefixed with `agent.sinks.stormSink.`.
Parameter Name | Default Value | Description |
---|---|---|
type | `com.comcast.viper.flume2storm.sink.StormSink` | The component type name |
batch-size | 100 | The maximum number of events in a batch |
location.service.factory | `com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory` | The class name of the location service factory |
service.provider.serialization | `com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization` | The class name of the service provider serialization class |
event.sender.factory | `com.comcast.viper.flume2storm.connection.sender.KryoNetEventSenderFactory` | The class name of the event sender factory |
connection.parameters.factory | `com.comcast.viper.flume2storm.connection.parameters.KryoNetConnectionParametersFactory` | The class name of the connection parameters factory |
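To illustrate how the prefix rule and the defaults above fit together, here is a Java sketch that pulls the `storm-sink` parameters out of a flat set of agent properties and falls back to the documented default when a key is absent. It uses plain `java.util.Properties` rather than Flume's own configuration classes, and `StormSinkParams` is a hypothetical name; the actual `StormSink` may resolve its parameters differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical sketch: extracts the storm-sink parameters and applies documented defaults. */
public class StormSinkParams {
  static final String PREFIX = "agent.sinks.stormSink.";

  // Default values copied from the table above.
  static final Map<String, String> DEFAULTS = new LinkedHashMap<>();

  static {
    DEFAULTS.put("type", "com.comcast.viper.flume2storm.sink.StormSink");
    DEFAULTS.put("batch-size", "100");
    DEFAULTS.put("location.service.factory",
        "com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory");
    DEFAULTS.put("service.provider.serialization",
        "com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization");
    DEFAULTS.put("event.sender.factory",
        "com.comcast.viper.flume2storm.connection.sender.KryoNetEventSenderFactory");
    DEFAULTS.put("connection.parameters.factory",
        "com.comcast.viper.flume2storm.connection.parameters.KryoNetConnectionParametersFactory");
  }

  /** Returns the entries whose key starts with prefix, with the prefix removed. */
  static Map<String, String> subset(Properties props, String prefix) {
    Map<String, String> result = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), props.getProperty(key));
      }
    }
    return result;
  }

  /** Returns the configured value, or the documented default when the key is absent. */
  static String get(Map<String, String> sinkParams, String key) {
    return sinkParams.getOrDefault(key, DEFAULTS.get(key));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("agent.sinks.stormSink.batch-size", "500");
    props.setProperty("agent.sinks.stormSink.kryonet.connection.parameters.port", "63547");
    props.setProperty("agent.sources.src1.type", "avro"); // another component: ignored

    Map<String, String> sinkParams = subset(props, PREFIX);
    int batchSize = Integer.parseInt(get(sinkParams, "batch-size"));
    System.out.println("batch-size = " + batchSize);
    System.out.println("event.sender.factory = " + get(sinkParams, "event.sender.factory"));
    System.out.println("sink parameters: " + sinkParams);
  }
}
```

The same `subset` call, applied one level down with `kryonet.connection.parameters.`, would isolate the KryoNet connection parameters described further on.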
The remaining parameters depend on the implementations selected.
When the dynamic location service is used, the following parameters must be prefixed with `dynamic.location.service.`:
Parameter Name | Default Value | Description |
---|---|---|
base.path | /services | The ZooKeeper path in which to store the service registration |
service.name | flume2storm | The name of the service, used to group all the `ServiceProvider` instances in ZooKeeper |
connection.string | localhost:2181 | The ZooKeeper connection string |
session.timeout | 30000 | The ZooKeeper session timeout (in milliseconds). This is only a requested value: the actual session timeout is negotiated between the ZooKeeper client and the ZooKeeper server |
connection.timeout | 10000 | The ZooKeeper connection timeout (in milliseconds): the maximum time allowed for the ZooKeeper client to fully establish the connection with the ZooKeeper server |
reconnection.delay.in.ms | 10000 | The time, in milliseconds, to wait before trying to reconnect to the ZooKeeper server after a session has expired |
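The `session.timeout` note can be made concrete. A ZooKeeper server clamps the requested timeout to the range [`minSessionTimeout`, `maxSessionTimeout`], which default to twice and twenty times `tickTime` when they are not configured. The Java sketch below reproduces that rule, assuming the server uses these defaults; `SessionTimeoutNegotiation` is a hypothetical name, and `tickTime = 2000` is the value from ZooKeeper's sample configuration, not something Flume2Storm sets.

```java
/** Hypothetical sketch of how a ZooKeeper server settles the session timeout. */
public class SessionTimeoutNegotiation {
  /**
   * Clamps the client's requested timeout to [min, max]. A non-positive min or max
   * stands for "not configured", in which case 2 * tickTime and 20 * tickTime are used.
   */
  static int negotiate(int requestedMs, int tickTimeMs, int minSessionTimeoutMs, int maxSessionTimeoutMs) {
    int min = minSessionTimeoutMs > 0 ? minSessionTimeoutMs : 2 * tickTimeMs;
    int max = maxSessionTimeoutMs > 0 ? maxSessionTimeoutMs : 20 * tickTimeMs;
    return Math.max(min, Math.min(max, requestedMs));
  }

  public static void main(String[] args) {
    int tickTime = 2000; // assumed server tickTime, in milliseconds
    System.out.println(negotiate(30000, tickTime, -1, -1)); // 30000: within [4000, 40000]
    System.out.println(negotiate(2000, tickTime, -1, -1)); // 4000: raised to 2 * tickTime
    System.out.println(negotiate(60000, tickTime, -1, -1)); // 40000: capped at 20 * tickTime
  }
}
```

With such a server, the default of 30000 ms is granted unchanged, whereas a requested value of 2000 ms would be raised to 4000 ms.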
When the static location service is used, the following parameters must be prefixed with `static.location.service.`:
Parameter Name | Default Value | Description |
---|---|---|
service.providers | N/A | List of service provider identifiers, space-separated |
service.providers.base | service.providers | The base name (i.e. parameter prefix) for the service provider attributes |
configuration.loader.class | N/A | The class name of the `ServiceProviderConfigurationLoader`, which loads a service provider from an Apache Configuration |
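The static location service expects every provider to be spelled out in the configuration. The sketch below parses them using the key layout from the `flume-spout` example in this guide (`service.providers=sp1Id sp2Id`, then `<base>.<id>.address` and `<base>.<id>.port`). In Flume2Storm itself, per-provider parsing is delegated to the class named by `configuration.loader.class`, so the `address`/`port` keys, `StaticProviders` and `Provider` are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical sketch of reading a static list of service providers. */
public class StaticProviders {
  /** Hypothetical holder for one service provider's identifier, address and port. */
  static final class Provider {
    final String id;
    final String address;
    final int port;

    Provider(String id, String address, int port) {
      this.id = id;
      this.address = address;
      this.port = port;
    }

    @Override
    public String toString() {
      return id + "=" + address + ":" + port;
    }
  }

  /**
   * Reads "service.providers" (space-separated ids), then, for each id,
   * "<base>.<id>.address" and "<base>.<id>.port", where base defaults to
   * "service.providers". All keys are relative to the given prefix.
   */
  static List<Provider> parse(Properties props, String prefix) {
    String base = props.getProperty(prefix + "service.providers.base", "service.providers");
    String ids = props.getProperty(prefix + "service.providers", "").trim();
    List<Provider> providers = new ArrayList<>();
    if (ids.isEmpty()) {
      return providers;
    }
    for (String id : ids.split("\\s+")) {
      String address = props.getProperty(prefix + base + "." + id + ".address");
      String port = props.getProperty(prefix + base + "." + id + ".port");
      if (address == null || port == null) {
        throw new IllegalArgumentException("Incomplete configuration for provider " + id);
      }
      providers.add(new Provider(id, address, Integer.parseInt(port.trim())));
    }
    return providers;
  }

  public static void main(String[] args) {
    String prefix = "static.location.service.";
    Properties props = new Properties();
    props.setProperty(prefix + "service.providers", "sp1Id sp2Id");
    props.setProperty(prefix + "service.providers.sp1Id.address", "localhost");
    props.setProperty(prefix + "service.providers.sp1Id.port", "49182");
    props.setProperty(prefix + "service.providers.sp2Id.address", "localhost");
    props.setProperty(prefix + "service.providers.sp2Id.port", "49183");
    System.out.println(parse(props, prefix));
  }
}
```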
Two sets of parameters configure the `kryonet-flume2storm` connection implementation:

- The first set holds the KryoNet connection parameters, which allow the event receptor to connect to the event sender (i.e. the `storm-sink`). These parameters must be prefixed with `kryonet.connection.parameters.`:
Parameter Name | Default Value | Description |
---|---|---|
address | Host IP address or `localhost` | Address (or host name) to use for the KryoNet event sender. By default, the host IP address is looked up using `InetAddress.getLocalHost()`; if this fails, `localhost` is used. |
port | 7000 | Port to use for the KryoNet event sender |
object.buffer.size | 3024 | KryoNet's object buffer size in bytes, which must be large enough to serialize any object. See the KryoNet documentation for more details. |
write.buffer.size | 3024000 | KryoNet's write buffer size in bytes. See the KryoNet documentation for more details. |
- The second set configures the event sender itself and is independent of its clients. These parameters must be prefixed with `kryonet.`:
Parameter Name | Default Value | Description |
---|---|---|
connection.timeout.in.ms | 10000 | The timeout for a connection attempt, in milliseconds |
retry.sleep.delay.in.ms | 50 | The time, in milliseconds, to wait before the server (i.e. EventSender) tries to resend an event to the same client (i.e. EventReceptor) |
reconnection.delay.in.ms | 2000 | The time, in milliseconds, to wait before the client (i.e. EventReceptor) tries to reconnect to the server (i.e. EventSender) |
max.retries | 3 | The maximum number of times to attempt to send an event before considering it as failed |
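The default `address` and `port` from the first table can be sketched with `java.net.InetAddress` alone. `KryoNetAddressDefault` is a hypothetical helper that treats a blank value as "not configured"; the port range check is an added assumption, and the actual connection parameters class may behave differently.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical sketch of the default KryoNet event sender address and port. */
public class KryoNetAddressDefault {
  /**
   * Returns the configured address when one is set; otherwise the local host's IP
   * address from InetAddress.getLocalHost(), or "localhost" if that lookup fails.
   */
  static String resolveAddress(String configured) {
    if (configured != null && !configured.trim().isEmpty()) {
      return configured.trim();
    }
    try {
      return InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException e) {
      return "localhost";
    }
  }

  /** Returns the configured port, or the documented default of 7000 (range check assumed). */
  static int resolvePort(String configured) {
    int port = (configured == null || configured.trim().isEmpty())
        ? 7000
        : Integer.parseInt(configured.trim());
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Invalid port: " + port);
    }
    return port;
  }

  public static void main(String[] args) {
    System.out.println(resolveAddress(null) + ":" + resolvePort(null));
    System.out.println(resolveAddress("10.0.0.5") + ":" + resolvePort("63547"));
  }
}
```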
For example, the following configuration sets up the `storm-sink` with the default implementations:

```properties
# Basic storm-sink configuration
agent.sinks.stormSink.type=com.comcast.viper.flume2storm.sink.StormSink
agent.sinks.stormSink.location.service.factory=com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory
agent.sinks.stormSink.service.provider.serialization=com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization
agent.sinks.stormSink.event.sender.factory=com.comcast.viper.flume2storm.connection.sender.KryoNetEventSenderFactory
agent.sinks.stormSink.connection.parameters.factory=com.comcast.viper.flume2storm.connection.parameters.KryoNetConnectionParametersFactory
# Connection parameters configuration
agent.sinks.stormSink.kryonet.connection.parameters.port=63547
# Generic KryoNet parameters
agent.sinks.stormSink.kryonet.connection.timeout.in.ms=500
agent.sinks.stormSink.kryonet.reconnection.delay.in.ms=1000
# Location service configuration
agent.sinks.stormSink.dynamic.location.service.connection.string=127.0.0.1:2000
agent.sinks.stormSink.dynamic.location.service.service.name=ut
agent.sinks.stormSink.dynamic.location.service.reconnection.delay.in.ms=1000
agent.sinks.stormSink.dynamic.location.service.connection.timeout=500
agent.sinks.stormSink.dynamic.location.service.session.timeout=2000
agent.sinks.stormSink.dynamic.location.service.base.path=/unitTest
```
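This example leaves `max.retries` and `retry.sleep.delay.in.ms` at their defaults (3 and 50 ms). The sketch below shows one way these two settings could interact, reading `max.retries` literally as the total number of send attempts per event. The `BooleanSupplier` send callback is a hypothetical stand-in for the actual KryoNet send, and the real `EventSender` logic may differ.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the max.retries / retry.sleep.delay.in.ms interplay. */
public class RetrySketch {
  /**
   * Attempts to send an event at most maxRetries times, sleeping retrySleepDelayMs
   * between attempts. Returns true as soon as an attempt succeeds, false once the
   * event is considered failed (or if the thread is interrupted while waiting).
   */
  static boolean sendWithRetries(BooleanSupplier attemptSend, int maxRetries, long retrySleepDelayMs) {
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
      if (attemptSend.getAsBoolean()) {
        return true;
      }
      if (attempt < maxRetries) {
        try {
          Thread.sleep(retrySleepDelayMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt and give up
          return false;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Fails twice, then succeeds: fits within the default max.retries of 3.
    boolean sent = sendWithRetries(() -> ++calls[0] >= 3, 3, 50);
    System.out.println(sent + " after " + calls[0] + " attempts");
  }
}
```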
On the Storm side, like the `storm-sink`, the `flume-spout` does not include implementations of the location service and connection framework. The same default implementations apply: `dynamic-location-service` and `kryonet-flume2storm`.
For example, to use the default implementations of Flume2Storm version 2.0.0, the following 3 jars are needed:

- `flume-spout-2.0.0.jar`
- `dynamic-location-service-2.0.0.jar`
- `kryonet-flume2storm-2.0.0.jar`
Either package the jars with the topology, or copy them into Storm's library directory. To do the former with Maven, add the following dependencies (the `flume2storm.version` property, e.g. `2.0.0`, must be defined in the POM):
```xml
<dependency>
  <groupId>com.comcast.viper.flume2storm</groupId>
  <artifactId>flume-spout</artifactId>
  <version>${flume2storm.version}</version>
</dependency>
<dependency>
  <groupId>com.comcast.viper.flume2storm</groupId>
  <artifactId>kryonet-flume2storm</artifactId>
  <version>${flume2storm.version}</version>
</dependency>
<dependency>
  <groupId>com.comcast.viper.flume2storm</groupId>
  <artifactId>dynamic-location-service</artifactId>
  <version>${flume2storm.version}</version>
</dependency>
```
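However the jars are deployed, a missing one would likely only surface at runtime, when a factory class cannot be loaded. A quick Java sketch, assuming the default spout-side class names from this guide, that reports which classes are not on the classpath; `ClasspathCheck` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: reports Flume2Storm classes that cannot be loaded. */
public class ClasspathCheck {
  // Default spout-side class names, taken from the configuration table in this guide.
  static final List<String> SPOUT_CLASSES = List.of(
      "com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory",
      "com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization",
      "com.comcast.viper.flume2storm.connection.receptor.KryoNetEventReceptorFactory");

  /** Returns the class names that cannot be loaded (classes are not initialized). */
  static List<String> missingClasses(List<String> classNames) {
    List<String> missing = new ArrayList<>();
    ClassLoader loader = ClasspathCheck.class.getClassLoader();
    for (String name : classNames) {
      try {
        Class.forName(name, false, loader);
      } catch (ClassNotFoundException | LinkageError e) {
        missing.add(name); // absent, or present but with a missing dependency
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    List<String> missing = missingClasses(SPOUT_CLASSES);
    if (missing.isEmpty()) {
      System.out.println("All Flume2Storm spout-side classes are loadable");
    } else {
      System.out.println("Not on the classpath: " + missing);
    }
  }
}
```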
As with the `storm-sink`, the configuration depends on the implementations selected. The basic configuration is:
Parameter Name | Default Value | Description |
---|---|---|
location.service.factory | `com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory` | The class name of the location service factory |
service.provider.serialization | `com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization` | The class name of the service provider serialization |
event.receptor.factory | `com.comcast.viper.flume2storm.connection.receptor.KryoNetEventReceptorFactory` | The class name of the event receptor factory |
Since the location service and connection framework implementations are the same on the Flume side and on the Storm side, their configuration is exactly the same. Note, however, that the Flume prefix (`agent.sinks.stormSink.`) must NOT be specified.
For example, the following `flume-spout` configuration uses the static location service with two service providers:

```properties
# Basic flume-spout configuration
location.service.factory=com.comcast.viper.flume2storm.location.StaticLocationServiceFactory
service.provider.serialization=com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization
event.receptor.factory=com.comcast.viper.flume2storm.connection.receptor.KryoNetEventReceptorFactory
# Generic KryoNet parameters
kryonet.connection.timeout.in.ms=500
kryonet.reconnection.delay.in.ms=1000
# Location service configuration
static.location.service.configuration.loader.class=com.comcast.viper.flume2storm.IntegrationTest$KryoNetServiceProvidersLoader
static.location.service.service.providers=sp1Id sp2Id
static.location.service.service.providers.sp1Id.address=localhost
static.location.service.service.providers.sp1Id.port=49182
static.location.service.service.providers.sp2Id.address=localhost
static.location.service.service.providers.sp2Id.port=49183
```
Note that `KryoNetServiceProvidersLoader` will not work outside of the module's own tests: it is an integration-test class, and test classes are not accessible outside of the module. This configuration only serves as an example of how to use the static location service.
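Because the location service and connection framework must match on both sides, a deployment script could compare the two configurations. The sketch below assumes both are available as `Properties`, strips the Flume prefix from the sink side, and reports the shared keys whose values differ. The choice of `SHARED_KEYS` and the class name are assumptions for illustration; a key left at its default on one side but set explicitly on the other is reported as a difference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical sketch: checks that the storm-sink and flume-spout agree on shared settings. */
public class SinkSpoutConsistency {
  static final String FLUME_PREFIX = "agent.sinks.stormSink.";
  static final List<String> SHARED_KEYS = List.of(
      "location.service.factory", "service.provider.serialization");

  /** Returns a description of each shared key whose sink and spout values differ. */
  static List<String> mismatches(Properties sinkProps, Properties spoutProps, List<String> keys) {
    List<String> problems = new ArrayList<>();
    for (String key : keys) {
      String sinkValue = sinkProps.getProperty(FLUME_PREFIX + key);
      String spoutValue = spoutProps.getProperty(key); // no Flume prefix on the Storm side
      boolean same = (sinkValue == null) ? spoutValue == null : sinkValue.equals(spoutValue);
      if (!same) {
        problems.add(key + ": sink=" + sinkValue + ", spout=" + spoutValue);
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    Properties sink = new Properties();
    sink.setProperty(FLUME_PREFIX + "location.service.factory",
        "com.comcast.viper.flume2storm.location.DynamicLocationServiceFactory");
    sink.setProperty(FLUME_PREFIX + "service.provider.serialization",
        "com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization");

    Properties spout = new Properties();
    spout.setProperty("location.service.factory",
        "com.comcast.viper.flume2storm.location.StaticLocationServiceFactory");
    spout.setProperty("service.provider.serialization",
        "com.comcast.viper.flume2storm.location.KryoNetServiceProviderSerialization");

    // Prints one line: the location service factories differ.
    mismatches(sink, spout, SHARED_KEYS).forEach(System.out::println);
  }
}
```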
The `storm-sink` maintains standard sink statistics through an `org.apache.flume.instrumentation.SinkCounter`, which Flume exposes by default (see the monitoring section of the Flume User Guide). More specifically, it maintains an accurate count of the batches processed (complete, empty, and underflow) and of the events drained (attempted and successful).
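Flume registers each component's counter as an MBean. Assuming its usual naming scheme, the counter for a sink named `stormSink` is `org.apache.flume.sink:type=stormSink`, and the counts above are exposed as attributes such as `BatchCompleteCount`, `EventDrainAttemptCount` and `EventDrainSuccessCount`. The Java sketch below reads them from code running in the same JVM as the agent; `SinkCounterReader` is hypothetical, and a remote reader would go through a JMX connector instead.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical sketch: reads Flume SinkCounter attributes over JMX. */
public class SinkCounterReader {
  /** Assumed ObjectName under which Flume registers the counter of the given sink. */
  static ObjectName sinkCounterName(String sinkName) {
    try {
      return new ObjectName("org.apache.flume.sink:type=" + sinkName);
    } catch (MalformedObjectNameException e) {
      throw new IllegalArgumentException("Invalid sink name: " + sinkName, e);
    }
  }

  /** Reads a numeric counter attribute, e.g. "EventDrainSuccessCount". */
  static long readCounter(MBeanServerConnection conn, String sinkName, String attribute) throws Exception {
    Object value = conn.getAttribute(sinkCounterName(sinkName), attribute);
    return ((Number) value).longValue();
  }

  public static void main(String[] args) throws Exception {
    MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
    String sink = "stormSink";
    if (!conn.isRegistered(sinkCounterName(sink))) {
      System.out.println("No counter registered for " + sink + " in this JVM");
      return;
    }
    long attempted = readCounter(conn, sink, "EventDrainAttemptCount");
    long drained = readCounter(conn, sink, "EventDrainSuccessCount");
    long complete = readCounter(conn, sink, "BatchCompleteCount");
    System.out.printf("attempted=%d drained=%d completeBatches=%d%n", attempted, drained, complete);
  }
}
```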
The `flume-spout` does not expose any statistics directly. However, relevant statistics can be gathered from some of the components it contains.
The event receptor and event sender interfaces define methods that return a stats object for their respective components, following the JMX convention: an MBean interface and an implementation class with the same name, minus the `MBean` suffix.
For instance, `com.comcast.viper.flume2storm.connection.receptor.EventReceptorStatsMBean` defines the methods available to gather information about the event receptor instance, and `com.comcast.viper.flume2storm.connection.receptor.EventReceptorStats` provides a thread-safe implementation of this interface.
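The JMX convention described above pairs an interface named `XxxMBean` with a class named `Xxx`, and exposes each getter as a read-only attribute. The following sketch shows that pattern with thread-safe `AtomicLong` counters. `ReceptorStats`, its methods and the `flume2storm.example:type=ReceptorStats` ObjectName are hypothetical and do not reproduce the actual `EventReceptorStats` API.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Hypothetical sketch of the "XxxMBean interface + Xxx class" JMX pattern. */
public class ReceptorStatsSketch {
  /** Management interface: each getter becomes a read-only JMX attribute. */
  public interface ReceptorStatsMBean {
    long getEventsIn();

    long getEventsOut();
  }

  /** Thread-safe implementation: counters may be updated from several threads. */
  public static class ReceptorStats implements ReceptorStatsMBean {
    private final AtomicLong eventsIn = new AtomicLong();
    private final AtomicLong eventsOut = new AtomicLong();

    public void incrEventsIn(long n) {
      eventsIn.addAndGet(n);
    }

    public void incrEventsOut(long n) {
      eventsOut.addAndGet(n);
    }

    @Override
    public long getEventsIn() {
      return eventsIn.get();
    }

    @Override
    public long getEventsOut() {
      return eventsOut.get();
    }
  }

  /** Registers stats under an example ObjectName, reads one attribute, then unregisters. */
  static Object readViaJmx(ReceptorStats stats, String attribute) {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    try {
      ObjectName name = new ObjectName("flume2storm.example:type=ReceptorStats");
      server.registerMBean(stats, name);
      try {
        return server.getAttribute(name, attribute);
      } finally {
        server.unregisterMBean(name);
      }
    } catch (Exception e) {
      // JMX reports registration and lookup failures as checked exceptions.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ReceptorStats stats = new ReceptorStats();
    stats.incrEventsIn(10);
    stats.incrEventsOut(7);
    System.out.println(readViaJmx(stats, "EventsIn")); // 10
  }
}
```

Keeping the counters in atomics lets the receptor threads update them without locking while a JMX client reads them concurrently.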