
Couchbase Kafka Connector

Welcome to the official Couchbase Kafka connector! It provides functionality to direct a stream of events from Couchbase Server to Kafka.

You can read the quickstart guide below or consult the documentation here: http://developer.couchbase.com/documentation/server/4.1/connectors/kafka-2.0/kafka-intro.html

The issue tracker can be found at https://issues.couchbase.com/browse/KAFKAC

Quickstart

A sample build.gradle:

apply plugin: 'java'

repositories {
    mavenCentral()
    maven { url "http://files.couchbase.com/maven2" }
    mavenLocal()
}

dependencies {
    compile(group: 'com.couchbase.client', name: 'kafka-connector', version: '2.0.0')
}

Using the library is straightforward. Suppose we want to receive every modification from Couchbase Server and send only the document body to Kafka (by default, the connector serializes the document body and metadata to JSON). To do that, define a filter class that allows only instances of MutationMessage to pass through:

package example;

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.filter.Filter;

public class SampleFilter implements Filter {
    @Override
    public boolean pass(final DCPEvent dcpEvent) {
        return dcpEvent.message() instanceof MutationMessage;
    }
}

You also need an encoder class, which takes the document value and converts it to a byte array:

package example;

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.coder.AbstractEncoder;
import kafka.utils.VerifiableProperties;

public class SampleEncoder extends AbstractEncoder {
    public SampleEncoder(final VerifiableProperties properties) {
        super(properties);
    }

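    @Override
    public byte[] toBytes(final DCPEvent dcpEvent) {
        // SampleFilter lets only MutationMessage instances through, so the cast
        // is safe when this encoder is configured together with that filter.
        MutationMessage message = (MutationMessage) dcpEvent.message();
        // Decode the content as UTF-8 and encode with the same charset,
        // instead of relying on the platform default used by getBytes().
        return message.content().toString(CharsetUtil.UTF_8).getBytes(CharsetUtil.UTF_8);
    }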
}
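An encoder's output should not depend on where the connector happens to run. `String.getBytes()` without arguments uses the JVM's default charset, so naming the charset explicitly keeps the bytes identical across machines. The following is a minimal standalone sketch using only the JDK; the class name `Utf8BytesSketch` and its `encode` helper are illustrative and not part of the connector.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper, not part of the connector API.
class Utf8BytesSketch {
    // Encodes a document body with an explicit charset so the result
    // does not depend on the JVM's default encoding.
    static byte[] encode(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00e9 is a non-ASCII character that needs two bytes in UTF-8.
        String body = "{\"name\":\"Jos\u00e9\"}";
        byte[] bytes = encode(body);
        System.out.println(body.length() + " chars -> " + bytes.length + " bytes");
        // Decoding with the same charset restores the original text.
        System.out.println(new String(bytes, StandardCharsets.UTF_8).equals(body));
    }
}
```

The same idea applies inside an encoder's toBytes method: pick one charset and use it for both decoding the buffer and producing the byte array.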

That is essentially all you need to set up a Couchbase-Kafka bridge:

package example;

import com.couchbase.kafka.CouchbaseKafkaConnector;
import com.couchbase.kafka.CouchbaseKafkaEnvironment;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;

public class Example {
    public static void main(String[] args) {
        DefaultCouchbaseKafkaEnvironment.Builder builder =
                (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment
                        .builder()
                        .kafkaFilterClass("example.SampleFilter")
                        .kafkaValueSerializerClass("example.SampleEncoder")
                        .kafkaTopic("default")
                        .kafkaZookeeperAddress("kafka1.vagrant")
                        .couchbaseNodes("couchbase1.vagrant")
                        .couchbaseBucket("default")
                        .dcpEnabled(true);
        CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(builder.build());
        connector.run();
    }
}
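To make the roles of the filter and encoder classes concrete, here is a simplified standalone model of the flow: each event is offered to the filter, and only events that pass are handed to the encoder. This is a sketch under assumptions, not the connector's internal code. `Event`, `EventFilter`, `EventEncoder`, and `process` are hypothetical stand-ins for DCPEvent, Filter, the encoder, and the connector's event loop, and publishing to Kafka is left out entirely.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified model only; none of these types belong to the connector.
class PipelineSketch {
    // Hypothetical stand-in for a DCP event: a kind plus a document body.
    static final class Event {
        final String kind;
        final String body;
        Event(String kind, String body) {
            this.kind = kind;
            this.body = body;
        }
    }

    // Hypothetical counterparts of the filter and encoder roles.
    interface EventFilter { boolean pass(Event event); }
    interface EventEncoder { byte[] toBytes(Event event); }

    // Keeps only the events the filter accepts and encodes each of them.
    static List<byte[]> process(List<Event> events, EventFilter filter, EventEncoder encoder) {
        List<byte[]> payloads = new ArrayList<>();
        for (Event event : events) {
            if (filter.pass(event)) {
                payloads.add(encoder.toBytes(event));
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("mutation", "{\"a\":1}"));
        events.add(new Event("remove", ""));
        events.add(new Event("mutation", "{\"b\":2}"));

        List<byte[]> payloads = process(
                events,
                e -> "mutation".equals(e.kind),                 // plays the filter role
                e -> e.body.getBytes(StandardCharsets.UTF_8));  // plays the encoder role

        // Only the two mutation bodies reach this point.
        for (byte[] payload : payloads) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

Because the encoder only ever sees events the filter accepted, an encoder written for one message type can cast without checking, as long as it is paired with a matching filter.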

It is also possible to start from a known state or to watch a limited set of partitions. The example below streams only partition 115, starting from the beginning (see also the currentState() and loadState() helpers).

ConnectorState startState = connector.startState(115);
ConnectorState endState = connector.endState(115);
connector.run(startState, endState);

The couchbase1.vagrant and kafka1.vagrant addresses above are the locations of Couchbase Server and Kafka respectively, which can be set up easily using the provisioning scripts in the env/ directory. Navigate there and run vagrant up. The Vagrant scripts use Ansible (installation guide).

License

Copyright 2015 Couchbase Inc.

Licensed under the Apache License, Version 2.0.

See the Apache 2.0 license.
