
This repository contains a Kafka Connect sink connector for copying data from Apache Kafka into IBM Cloud Object Storage


ibm-messaging/kafka-connect-ibmcos-sink


kafka-connect-ibmcos-sink

Kafka Connect Sink Connector for IBM Cloud Object Storage.

Using this connector, you can:

  • Copy data from a Kafka topic into IBM Cloud Object Storage.
  • Flexibly batch together multiple Kafka records into a single object for efficient retrieval from object storage.
  • Preserve ordering of Kafka data. The order of records within each Kafka partition is maintained as the data is written into object storage.
  • Transfer data exactly once (subject to some restrictions, see Exactly once delivery).

Building the connector

To build the connector, you must have the following installed:

  • git
  • Gradle
  • Java (a JDK)

Clone the repository with the following command:

git clone https://github.com/ibm-messaging/kafka-connect-ibmcos-sink

Change directory into the kafka-connect-ibmcos-sink directory:

cd kafka-connect-ibmcos-sink

Build the connector using Gradle:

gradle shadowJar

Configuration

  • cos.api.key (required) - API key used to connect to the Cloud Object Storage service instance.

  • cos.bucket.location (required) - Location of the Cloud Object Storage service bucket, for example: eu-gb.

  • cos.bucket.name (required) - Name of the Cloud Object Storage service bucket to write data into.

  • cos.bucket.resiliency (required) - Resiliency of the Cloud Object Storage bucket. Must be one of: cross-region, regional, or single-site.

  • cos.endpoint.visibility (optional) - Specify public to connect to the Cloud Object Storage service over the public internet, or private to connect from a connector running inside the IBM Cloud network, for example from an IBM Cloud Kubernetes Service cluster. The default is public.

  • cos.object.deadline.seconds (optional) - The number of seconds (as measured by wall clock time for the Connect task instance) between reading the first record from Kafka and writing all of the records read so far into a Cloud Object Storage object. This is useful when there can be long pauses between records being produced to a topic, because it ensures that any record received by this connector is written into object storage within the specified period of time.

  • cos.object.interval.seconds (optional) - The number of seconds (as measured by the timestamps in Kafka records) between reading the first record from Kafka and writing all of the records read so far into a Cloud Object Storage object.

  • cos.object.records (optional) - The maximum number of Kafka records to combine into an object.

  • cos.object.record.delimiter.nl (optional) - If set to true (default false), records within a single object are separated by a newline character.

  • cos.service.crn (required) - CRN for the Cloud Object Storage service instance.

  • cos.endpoints.url (optional) - Endpoints URL for the Cloud Object Storage instance. Only set this in environments where a non-default set of endpoints is required.

  • cos.writer.format (optional) - Determines the output format of files written to COS. Can be "json" (default) or "parquet". It is recommended to set cos.object.record.delimiter.nl to "true" when JSON format is chosen. With Parquet the delimiter setting is silently ignored.

  • cos.writer.schema.uri (required if Parquet output format is selected) - Points to the Avro schema of the records to be written in the Parquet file. Currently only the file URI scheme is supported.

  • cos.writer.parquet.buffer.size (optional) - Specifies the size, in bytes, of the Parquet output stream buffer. The value must be large enough to accommodate the entire batch of records as determined by cos.object.deadline.seconds, cos.object.interval.seconds, and cos.object.records. Default: 26214400 (25 MiB).

  • cos.writer.parquet.write.mode (optional) - Can be "create" (default) or "overwrite". If "create" is specified and the file already exists, an exception is raised.

  • cos.writer.parquet.compression.codec (optional) - Can be one of the following values:
    "none", "uncompressed", "snappy" (default), "gzip", "lzo", "brotli", "lz4", "zstd".

  • cos.writer.parquet.row.group.size (optional) - Determines the Parquet file row group size in bytes. Default: 536870912 (512 MiB).

  • cos.writer.parquet.page.size (optional) - Determines the Parquet file page size in bytes. Default: 65536 (64 KiB).

  • cos.writer.parquet.enable.dictionary (optional) - Set to "false" to disable column dictionaries. Default: true.

  • value.converter.cos.writer.schema.uri (required if Parquet output format is selected) - Must be set to the same value as cos.writer.schema.uri.

Note that while the configuration properties cos.object.deadline.seconds, cos.object.interval.seconds, and cos.object.records are all listed as optional, at least one of them must be set to a non-default value.
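As a sketch of how these properties fit together, the following shell snippet writes a minimal cos-sink.properties file for JSON output. It assumes the standard Kafka Connect sink properties name, connector.class, tasks.max and topics; the connector name cos-sink, the topic mytopic, the bucket location and resiliency, and every value in angle brackets are placeholders. Copy the real connector.class value from config/cos-sink.properties in this repository rather than relying on this example.

```shell
# Hypothetical example: write a minimal connector configuration file.
# Values in <angle brackets> are placeholders to replace before use.
cat > cos-sink.properties <<'EOF'
name=cos-sink
connector.class=<connector class from config/cos-sink.properties>
tasks.max=1
topics=mytopic

# Connection details for the Cloud Object Storage instance and bucket.
cos.api.key=<API key>
cos.service.crn=<service instance CRN>
cos.bucket.name=<bucket name>
cos.bucket.location=eu-gb
cos.bucket.resiliency=regional

# Write an object every 100 records or 60 seconds, whichever comes first.
cos.object.records=100
cos.object.deadline.seconds=60

# Recommended when using the default JSON output format.
cos.object.record.delimiter.nl=true
EOF
```

Because cos.object.deadline.seconds is set, this configuration does not give exactly once delivery; remove that line and rely on cos.object.records or cos.object.interval.seconds if you need it.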

Object Names

The connector uses the following scheme to name Cloud Object Storage objects:

topic/partition/firstoffset-lastoffset

Both firstoffset and lastoffset refer to the Kafka offsets of the records included in this object and are padded to 16 characters using zeros.

For example:

mytopic/1/0000000000017805-0000000000017809

means this object contains 5 Kafka records, offsets 17805 to 17809, from partition 1 of mytopic.

When Parquet output format is selected the suffix .parquet is added to the object name.
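The naming scheme can be reproduced with printf, which is handy when listing or scripting against a bucket. cos_object_name is a hypothetical helper written for illustration, not part of the connector; it zero-pads both offsets to 16 digits and appends .parquet when the Parquet format is requested.

```shell
# Hypothetical helper: build an object name of the form
# topic/partition/firstoffset-lastoffset, offsets zero-padded to 16 digits.
cos_object_name() {
  local topic=$1 partition=$2 first=$3 last=$4 format=${5:-json}
  local name
  name=$(printf '%s/%s/%016d-%016d' "$topic" "$partition" "$first" "$last")
  # Parquet output adds a .parquet suffix to the object name.
  if [ "$format" = "parquet" ]; then
    name="${name}.parquet"
  fi
  printf '%s\n' "$name"
}

cos_object_name mytopic 1 17805 17809          # mytopic/1/0000000000017805-0000000000017809
cos_object_name mytopic 1 17805 17809 parquet  # mytopic/1/0000000000017805-0000000000017809.parquet
```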

Combining multiple Kafka records into an object

Typically, Kafka records are much smaller than the maximum size of an object storage object. While it is possible to create an object for each Kafka record, this is usually not an efficient way to use Cloud Object Storage. This connector offers three different controls for deciding how much Kafka data is combined into an object:

  • cos.object.records
  • cos.object.interval.seconds
  • cos.object.deadline.seconds

At least one of these configuration properties must be specified. If more than one property is specified then an object is written at the point the first of these limits is reached.

For example, given the following configuration:

cos.object.records=100
cos.object.deadline.seconds=60

Assuming that at least one Kafka record is available to be written, an object is created either 60 seconds after the first record in the batch was read, or as soon as 100 Kafka records have been received, whichever happens first.
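The "first limit reached wins" rule can be sketched as a small shell function. should_write_object is a hypothetical helper, not the connector's implementation; it only models cos.object.records and cos.object.deadline.seconds, treats 0 as "not set", and ignores cos.object.interval.seconds, which is measured using record timestamps rather than elapsed time.

```shell
# Hypothetical sketch. Arguments:
#   $1 records buffered so far
#   $2 seconds since the first buffered record was read
#   $3 cos.object.records (0 = not set)
#   $4 cos.object.deadline.seconds (0 = not set)
# Returns 0 if an object should be written now, 1 to keep buffering.
should_write_object() {
  local count=$1 elapsed=$2 max_records=$3 deadline=$4
  if [ "$count" -eq 0 ]; then
    return 1    # nothing buffered, nothing to write
  fi
  if [ "$max_records" -gt 0 ] && [ "$count" -ge "$max_records" ]; then
    return 0    # record limit reached first
  fi
  if [ "$deadline" -gt 0 ] && [ "$elapsed" -ge "$deadline" ]; then
    return 0    # deadline reached first
  fi
  return 1
}

should_write_object 100 12 100 60 && echo "write: record limit reached"
should_write_object 7 60 100 60 && echo "write: deadline reached"
should_write_object 7 30 100 60 || echo "keep buffering"
```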

Exactly once delivery

This connector can be used to provide exactly once delivery of Kafka records into object storage. When used like this, data is copied without loss or duplication.

Exactly once delivery requires that Kafka records are combined into objects in a completely deterministic way. That is, given the same stream of Kafka records, the connector must always group the same records into each Cloud Object Storage object.

Using either the cos.object.records property or the cos.object.interval.seconds property (or both together) results in deterministic grouping of Kafka records into objects. However, the cos.object.deadline.seconds property cannot be used for exactly once delivery, because the grouping of Kafka records into objects then depends on how quickly the system hosting the connector processes records.
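To illustrate why count-based grouping is deterministic, the following sketch lists the object boundaries produced by cos.object.records alone. record_count_boundaries is a hypothetical helper; it assumes contiguous offsets (no gaps from compaction or transaction markers) and that, after a restart, the task resumes from the first offset not yet written to an object. Under those assumptions the ranges depend only on the offsets, so replaying the same records yields the same objects, whereas a deadline-based cut would also depend on timing.

```shell
# Hypothetical sketch: print the offset range of each complete object
# for offsets first..last when only cos.object.records is set.
# A trailing partial batch stays buffered until more records arrive.
record_count_boundaries() {
  local first=$1 last=$2 per_object=$3
  local start=$first
  while [ $((start + per_object - 1)) -le "$last" ]; do
    printf '%d-%d\n' "$start" $((start + per_object - 1))
    start=$((start + per_object))
  done
}

record_count_boundaries 17800 17811 5
# 17800-17804
# 17805-17809
```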

Provisioning an IBM Cloud Object Storage Service instance

To use this connector you must provision an instance of the IBM Cloud Object Storage service. Once it is provisioned, navigate to the Service Credentials tab of your instance to retrieve the values required to configure the connector. You also need to create a bucket.

Running the connector

To run the connector, you must have:

  • The JAR from building the connector
  • A properties file containing the configuration for the connector
  • Apache Kafka 1.1.0 or later, either standalone or included as part of an offering such as IBM Event Streams

The connector can be run in a Kafka Connect worker in either standalone (single process) or distributed mode.

Standalone Mode

You need two configuration files, one for the configuration that applies to all of the connectors such as the Kafka bootstrap servers, and another for the configuration specific to the IBM Cloud Object Storage sink connector such as the connection information for your Cloud Object Storage service. For the former, the Kafka distribution includes a file called connect-standalone.properties that you can use as a starting point. For the latter, you can use config/cos-sink.properties in this repository after replacing all placeholders.

To run the connector in standalone mode from the directory into which you installed Apache Kafka, you use a command like this:

bin/connect-standalone.sh connect-standalone.properties cos-sink.properties

Distributed Mode

You need an instance of Kafka Connect running in distributed mode. The Kafka distribution includes a file called connect-distributed.properties that you can use as a starting point.

To start the COS connector, you can use config/cos-sink.json in this repository after replacing all placeholders, and use a command like this:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data "@./config/cos-sink.json" 
