Streaming data from Kafka to S3 using Kafka Connect

This uses Docker Compose to run the Kafka Connect worker.
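
The docker-compose.yml in this folder is the definitive reference for the stack, but the pieces the steps below rely on look roughly like the sketch here: the Connect worker installs the S3 sink connector plugin at startup, and picks up AWS credentials either from the mounted aws_credentials file or from (commented-out) environment variables. The image tag, plugin version, and mount path in this sketch are assumptions; check the actual file.

    kafka-connect:
      image: confluentinc/cp-kafka-connect-base:5.5.0   # image/tag assumed
      ports:
        - 8083:8083
      # ...Connect worker environment settings omitted...
      volumes:
        - ./aws_credentials:/root/.aws/credentials       # assumed mount path
      # environment:
      #   AWS_ACCESS_KEY_ID: <your access key id>
      #   AWS_SECRET_ACCESS_KEY: <your secret access key>
      command:
        - bash
        - -c
        - |
          echo "Installing connector plugins"
          confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest
          /etc/confluent/docker/run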

  1. Create the S3 bucket and make a note of the region

  2. Obtain your access key pair

  3. Update aws_credentials with your access key pair (an example of the file format follows these steps)

    • Alternatively, uncomment the environment lines for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY and set the values here instead

  4. Bring the Docker Compose stack up

    docker-compose up -d
  5. Make sure everything is up and running

    $ docker-compose ps
         Name                  Command               State                    Ports
    ---------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run   Up             0.0.0.0:9092->9092/tcp
    kafka-connect     bash -c #                   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
                      echo "Installing ...
    ksqldb            /usr/bin/docker/run         Up             0.0.0.0:8088->8088/tcp
    schema-registry   /etc/confluent/docker/run   Up             0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run   Up             2181/tcp, 2888/tcp, 3888/tcp
  6. Create the Sink connector (a status check example follows these steps)

    curl -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-voluble/config \
        -d '
    {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "tasks.max": "1",
        "topics": "cats",
        "s3.region": "us-east-1",
        "s3.bucket.name": "rmoff-voluble-test",
        "flush.size": "65536",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "schema.compatibility": "NONE",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "transforms": "AddMetadata",
        "transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.AddMetadata.offset.field": "_offset",
        "transforms.AddMetadata.partition.field": "_partition"
    }
    '

    Things to customise for your environment:

    • topics : the source topic(s) you want to send to S3

    • key.converter : match the serialisation of your source data

    • value.converter : match the serialisation of your source data (a JSON example follows these steps)

    • transforms : remove this if you don't want the partition and offset added to each message
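
For steps 1-3: the bucket can be created with the AWS CLI, and aws_credentials uses the standard AWS shared-credentials file format. The bucket name, region, and placeholder key values here are examples only.

    aws s3 mb s3://rmoff-voluble-test --region us-east-1

aws_credentials:

    [default]
    aws_access_key_id=<your access key id>
    aws_secret_access_key=<your secret access key>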

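On the converter settings: the config above writes Avro (format.class is AvroFormat) and relies on the worker's default value converter. If the source topic instead held schema-less JSON, the equivalent lines would look something like this sketch, with the output format switched to JSON as well:

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",

With schema-less JSON there is no schema available for Avro output, which is why format.class changes along with the converter.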

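Once the PUT returns, the connector's state can be checked through the Kafka Connect REST API; the connector and its single task (tasks.max is 1) should both report RUNNING:

    curl -s http://localhost:8083/connectors/sink-s3-voluble/status
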
If you want to create the data generator and view the data in ksqlDB:

docker exec -it ksqldb ksql http://ksqldb:8088
CREATE SOURCE CONNECTOR s WITH (
  'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',

  'genkp.owners.with' = '#{Internet.uuid}',
  'genv.owners.name.with' = '#{Name.full_name}',
  'genv.owners.creditCardNumber.with' = '#{Finance.credit_card}',

  'genk.cats.name.with' = '#{FunnyName.name}',
  'genv.cats.owner.matching' = 'owners.key',

  'genk.diets.catName.matching' = 'cats.key.name',
  'genv.diets.dish.with' = '#{Food.vegetables}',
  'genv.diets.measurement.with' = '#{Food.measurements}',
  'genv.diets.size.with' = '#{Food.measurement_sizes}',

  'genk.adopters.name.sometimes.with' = '#{Name.full_name}',
  'genk.adopters.name.sometimes.matching' = 'adopters.key.name',
  'genv.adopters.jobTitle.with' = '#{Job.title}',
  'attrk.adopters.name.matching.rate' = '0.05',
  'topic.adopters.tombstone.rate' = '0.10',

  'global.history.records.max' = '100000'
);
SHOW TOPICS;
PRINT cats;
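
To confirm that data is reaching S3, list the bucket (AWS CLI shown for illustration; the bucket name matches the example config above). With the DefaultPartitioner the connector writes objects under topics/<topic>/partition=<n>/, and because flush.size is 65536 an object only appears once that many records have accumulated for a partition:

    aws s3 ls s3://rmoff-voluble-test/topics/cats/ --recursive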

References