
Like any spout in a storm topology, a kafka spout is created to emit tuples into the topology to be processed further by bolts. As kafka cares little about the content of messages, a kafka spout emits the messages read from a topic as tuples with a single field named bytes, containing the byte array as it was read from the topic. In its current implementation, it operates by reading batches of messages from kafka and emitting these into the storm topology. Only once a batch has been processed completely are the consumed offsets committed to zookeeper (a limitation of the high level consumer API for kafka, expected to change with kafka 0.9).
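
As a minimal sketch (the bolt and its UTF-8 decoding are illustrative and not part of the project; only the single bytes field is provided by the spout), a bolt downstream of the spout could read the raw message like this:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrintMessageBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // the kafka spout emits a single field named "bytes" containing the raw message
        byte[] message = tuple.getBinaryByField("bytes");
        // interpret the payload as UTF-8 text for the sake of the example
        System.out.println(new String(message, java.nio.charset.StandardCharsets.UTF_8));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt only prints the message and emits nothing downstream
    }
}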

Maven

The project uses maven and is published to maven central. To use it in your local project, include the following dependency:

<dependency>
  <groupId>nl.minvenj.nfi.storm</groupId>
  <artifactId>kafka-spout</artifactId>
  <version>0.2</version>
</dependency>

To use the latest development state, clone the repository and run mvn install. Note the version in the checkout and depend on that version to use the build you just installed.

Example Usage

In its simplest form, the kafka spout can be used as follows:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("kafka", new KafkaSpout());
builder.setBolt("bolt", new MyBolt()).shuffleGrouping("kafka");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("my-topology", new Config(), builder.createTopology());

Configuration

Configuration of the spout and the underlying kafka consumer is done through storm itself. The spout uses the following configuration parameters:

  • kafka.spout.topic: the kafka topic to read messages from (default: storm);
  • kafka.spout.consumer.group: the kafka consumer group to register with zookeeper (default: kafka_spout);
  • kafka.spout.buffer.size.max: the maximum number of messages to buffer within a KafkaSpout instance (default: 1024);
  • kafka.spout.fail.handler: the 'failure policy' to use when tuples are failed by a bolt, see Failure Policy (default: reliable).

These configurations can be supplied to storm through topology configuration as follows:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("kafka", new KafkaSpout());
// (...) define the rest of the topology

// create a storm Config object
Config config = new Config();
// configure the kafka spout (keys are available as constants on ConfigUtils)
config.put("kafka.spout.topic", "a-topic");
// kafka consumer configuration, see below
config.put("kafka.zookeeper.connect", "zookeeper.example.net:2181");
config.put("kafka.consumer.timeout.ms", 100);

LocalCluster cluster = new LocalCluster();
// submit the topology with the configuration
cluster.submitTopology("my-topology", config, builder.createTopology());

Besides the configuration options specific to the spout, the configuration for the underlying kafka consumer is read from the configuration provided by storm as well. These options can be specified in one of two ways (illustrated after the list):

  • Configuration file as classpath resource: the value of configuration key kafka.config is read as a classpath resource, loaded as a java Properties object and passed to the kafka consumer (note that the resource is loaded when the spout is opened, so make sure the resource is available on the remote host);
  • Configuration parameters prefixed in storm config: keys prefixed with kafka. are read from the storm configuration and passed to the kafka consumer (this includes the kafka.spout keys, which kafka ignores).
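
For illustration, the two approaches could look as follows in a topology's configuration (the properties resource name is hypothetical; the prefixed keys follow the example above):

Config config = new Config();

// option 1: point kafka.config at a java Properties resource on the classpath,
// e.g. a file packaged into the topology jar (the name here is hypothetical)
config.put("kafka.config", "kafka-consumer.properties");

// option 2: prefix kafka consumer properties with "kafka." directly in the storm config
config.put("kafka.zookeeper.connect", "zookeeper.example.net:2181");
config.put("kafka.consumer.timeout.ms", 100);
config.put("kafka.auto.commit.enable", "false");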

Special actions are taken for some kafka consumer configuration parameters: zookeeper.connect, consumer.timeout.ms and auto.commit.enable. If zookeeper.connect is not provided by one of the above methods, storm's zookeeper configuration (Config.STORM_ZOOKEEPER_SERVERS and Config.STORM_ZOOKEEPER_PORT) is converted to a zookeeper.connect compatible string. consumer.timeout.ms is required to be present and greater than zero, as a kafka spout would otherwise block indefinitely while waiting for new messages, which interferes with storm's expectation that calls to nextTuple on spouts return quickly. Setting auto.commit.enable to true causes reliability problems when a spout is stopped mid-batch or crashes; the spout will disable it if it is not specified and refuse the configuration if it is enabled.

See kafka's consumer documentation for all configuration parameters available to kafka consumers.

Failure Policy

The project allows you to specify a failure policy, which controls whether a failed tuple should be replayed. Two implementations are provided by default: ReliableFailHandler and UnreliableFailHandler. The reliable failure policy replays a tuple when it fails in the topology; the unreliable failure policy ignores failed tuples.

The policy used by a spout is configured with the kafka.spout.fail.handler configuration option. The two default implementations are available by supplying either reliable or unreliable as the value, but the key also allows you to specify a fully qualified class name, which is loaded and instantiated when the spout is opened. A failure policy configured like this should implement nl.minvenj.nfi.storm.kafka.fail.FailHandler.
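
For example, either of the following configures the policy (the custom class name is hypothetical):

Config config = new Config();

// use the bundled unreliable policy: failed tuples are simply ignored
config.put("kafka.spout.fail.handler", "unreliable");

// or point the key at a custom implementation of nl.minvenj.nfi.storm.kafka.fail.FailHandler
config.put("kafka.spout.fail.handler", "com.example.storm.MyFailHandler");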

The current implementation of KafkaSpout still requires all tuples to be either acknowledged or failed before reading the next batch of messages from kafka. This limitation hurts performance for applications that don't care about failed tuples, so expect this to change in the future.
