eo-cqrs/eo-kafka

This nice logo was made by @l3r8yJ

Project architect: @h1alexbel

EO Kafka: producers and consumers for working with the Apache Kafka message broker.

Read the blog posts about EO-Kafka: "Kafka Producers and Consumers for Elegant Microservices",
and "EO-Kafka with Spring", which explains how to connect EO-Kafka with Spring.

Motivation. We are not happy with Spring Kafka because it is procedural rather than object-oriented. eo-kafka offers almost exactly the same functionality, but through objects.

Principles. These are the design principles behind eo-kafka.

How to use. All you need is this (get the latest version here):

Maven:

<dependency>
  <groupId>io.github.eo-cqrs</groupId>
  <artifactId>eo-kafka</artifactId>
</dependency>

To use it with Spring Boot:

<dependency>
  <groupId>io.github.eo-cqrs</groupId>
  <artifactId>eo-kafka</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Gradle:

dependencies {
    implementation 'io.github.eo-cqrs:eo-kafka:<version>'
}

Messages

To create a Kafka message with a topic, key, and value:

final Message<String, String> msg = new Tkv<>("test.topic", "test-k", "test-v");

To create a Kafka message with a partition:

final Message<String, String> msg = 
  new WithPartition<>(
    0,
    new Tkv<>(
      "test.topic",
      "test-k",
      "test-v"
    )
  );

To create a Kafka message with a timestamp:

final Message<String, String> msg =
  new Timestamped<>(
    tmstmp,
    new WithPartition<>(
      partition,
      new Tkv<>(
        topic,
        key,
        value
      )
    )
  );
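The snippets above build one message by wrapping a plain topic/key/value object in small decorators. A minimal sketch of that composition idea in plain Java follows. The types `Msg`, `Basic`, `OnPartition`, and `At` are hypothetical stand-ins written for this illustration; they are not eo-kafka's classes and say nothing about how the library implements `Tkv`, `WithPartition`, or `Timestamped`.

```java
import java.util.Optional;

// Hypothetical stand-in for a message; NOT eo-kafka's Message interface.
interface Msg {
    String topic();
    String key();
    String value();
    Optional<Integer> partition();
    Optional<Long> timestamp();
}

// Plays the role of Tkv: topic, key and value, nothing else.
final class Basic implements Msg {
    private final String topic;
    private final String key;
    private final String value;

    Basic(final String topic, final String key, final String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    @Override public String topic() { return this.topic; }
    @Override public String key() { return this.key; }
    @Override public String value() { return this.value; }
    @Override public Optional<Integer> partition() { return Optional.empty(); }
    @Override public Optional<Long> timestamp() { return Optional.empty(); }
}

// Plays the role of WithPartition: adds a partition, delegates the rest.
final class OnPartition implements Msg {
    private final int part;
    private final Msg origin;

    OnPartition(final int part, final Msg origin) {
        this.part = part;
        this.origin = origin;
    }

    @Override public String topic() { return this.origin.topic(); }
    @Override public String key() { return this.origin.key(); }
    @Override public String value() { return this.origin.value(); }
    @Override public Optional<Integer> partition() { return Optional.of(this.part); }
    @Override public Optional<Long> timestamp() { return this.origin.timestamp(); }
}

// Plays the role of Timestamped: adds a timestamp, delegates the rest.
final class At implements Msg {
    private final long millis;
    private final Msg origin;

    At(final long millis, final Msg origin) {
        this.millis = millis;
        this.origin = origin;
    }

    @Override public String topic() { return this.origin.topic(); }
    @Override public String key() { return this.origin.key(); }
    @Override public String value() { return this.origin.value(); }
    @Override public Optional<Integer> partition() { return this.origin.partition(); }
    @Override public Optional<Long> timestamp() { return Optional.of(this.millis); }
}
```

In this sketch each wrapper contributes exactly one attribute and leaves the wrapped message untouched, so `new At(ts, new OnPartition(0, new Basic(...)))` reads the same way as the nested constructors shown above.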

Producers

To create a Kafka producer, you can wrap the original KafkaProducer:

final KafkaProducer<String, String> origin = ...;
final Producer<String, String> producer = new KfProducer<>(origin);

Or construct it with KfFlexible:

final Producer<String, String> producer =
  new KfProducer<>(
    new KfFlexible<>(
      new KfProducerParams(
        new KfParams(
          new BootstrapServers("localhost:9092"),
          new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
          new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
        )
      )
    )
  );

Or create it from an XML file:

final Producer<String, String> producer =
  new KfProducer<>(
    new KfXmlFlexible<String, String>(
      "producer.xml" // file with producer config
    )
  );

Your XML file, located in the resources directory, should look like this:

<producer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <keySerializer>org.apache.kafka.common.serialization.StringSerializer</keySerializer>
  <valueSerializer>org.apache.kafka.common.serialization.StringSerializer</valueSerializer>
</producer>

Since version 0.4.6, you can create a producer from a JSON file:

final Producer<String, String> producer =
  new KfProducer<>(
    new KfJsonFlexible<String, String>(
      "producer.json" // file with producer config
    )
  );

Your JSON file, located in the resources directory, should look like this:

{
  "bootstrapServers": "localhost:9092",
  "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
  "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer"
}

Since version 0.5.6, you can create a producer from a YAML file:

final Producer<String, String> producer =
  new KfProducer<>(
    new KfYamlProducerSettings<>(
      "producer.yaml"
    )
  );

Your YAML file, located in the resources directory, should look like this:

bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
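The YAML keys above are the Kafka property names with dots replaced by hyphens (see the Configs section). As an illustration of that mapping only, here is a minimal sketch that turns such a flat YAML file into `java.util.Properties` keyed by the Kafka names. The `FlatYaml` helper is hypothetical, handles only flat `key: value` lines, and is not how eo-kafka parses its settings.

```java
import java.util.Properties;

// Hypothetical helper, written for illustration; NOT part of eo-kafka.
final class FlatYaml {

    private FlatYaml() {
    }

    // Converts flat "key: value" YAML lines into Kafka-style properties,
    // e.g. "bootstrap-servers: localhost:9092" -> bootstrap.servers=localhost:9092
    static Properties toKafkaProps(final String yaml) {
        final Properties props = new Properties();
        for (final String line : yaml.split("\n")) {
            final String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            // Split on the FIRST colon only: values such as host:port contain colons too.
            final int colon = trimmed.indexOf(':');
            if (colon < 0) {
                continue; // not a key/value line
            }
            final String key = trimmed.substring(0, colon).trim().replace('-', '.');
            String value = trimmed.substring(colon + 1).trim();
            // Drop surrounding double quotes, as in: group-id: "1"
            if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                value = value.substring(1, value.length() - 1);
            }
            props.setProperty(key, value);
        }
        return props;
    }
}
```

A `Properties` object in this shape is what the plain Kafka client constructors accept, which is why the YAML keys can stay so close to the original property names.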

To send a message:

try (final Producer<String, String> producer = ...) {
  producer.send(
    new WithPartition<>(
      0,
      new Tkv<>(
        "xyz.topic",
        "key",
        "message"
      )
    )
  );
} catch (final Exception e) {
  throw new IllegalStateException(e);
}

You can also create KfCallback, a Kafka producer with asynchronous callback support:

final Producer<String, String> producer =
  new KfCallback<>(
    new KfFlexible<>(
      new KfProducerParams(
        new KfParams(
          // producer params
        )
      )
    ),
    new Callback() {
      @Override
      public void onCompletion(final RecordMetadata meta, final Exception ex) {
        // logic
      }
    }
  );

Consumers

To create a Kafka consumer, you can wrap the original KafkaConsumer:

final KafkaConsumer<String, String> origin = ...;
final Consumer<String, String> consumer = new KfConsumer<>(origin);

Using KfFlexible:

final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfFlexible<>(
      new KfConsumerParams(
        new KfParams(
          new BootstrapServers("localhost:9092"),
          new GroupId("1"),
          new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
          new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
        )
      )
    )
  );

Or use the XML file approach:

final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfXmlFlexible<String, String>("consumer.xml")
  );

Again, the XML file should be located in the resources directory and look like this:

<consumer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <groupId>1</groupId>
  <keyDeserializer>org.apache.kafka.common.serialization.StringDeserializer</keyDeserializer>
  <valueDeserializer>org.apache.kafka.common.serialization.StringDeserializer</valueDeserializer>
</consumer>

Since version 0.4.6, you can create a consumer from a JSON file:

final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfJsonFlexible<String, String>(
      "consumer.json" // file with consumer config
    )
  );

Your JSON file, located in the resources directory, should look like this:

{
 "bootstrapServers": "localhost:9092",
 "groupId": "1",
 "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
 "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}

Since version 0.5.6, you can create a consumer from a YAML file:

final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfYamlConsumerSettings<>(
      "consumer.yaml"
    )
  );

Your YAML file, located in the resources directory, should look like this:

bootstrap-servers: localhost:9092
group-id: "1"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

To consume messages:

try (
  final Consumer<String, String> consumer =
    new KfConsumer<>(
      new KfFlexible<>(
        new KfConsumerParams(
          new KfParams(
            new BootstrapServers(this.servers),
            new GroupId("1"),
            new AutoOffsetReset("earliest"),
            new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
            new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
          )
        )
      )
    )
) {
  // You must be subscribed to a topic to iterate over its data:
  //   consumer.subscribe(new ListOf<>("orders-saga-init"));
  // or call #records(topic, duration), which subscribes to the topic you provide.
  final ConsumerRecords<String, String> records =
    consumer.records("orders-saga-init", Duration.ofSeconds(5L));
}

You can also subscribe with a ConsumerRebalanceListener:

consumer.subscribe(
  new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
    }
    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
    }
  },
  "<your topic>"
);

Finally, you can unsubscribe:

consumer.unsubscribe();

Fakes

To mock eo-kafka, you can use the existing fake objects from the io.github.eocqrs.kafka.fake package. They look like the normal ones, but instead of talking to a real Kafka broker, they manipulate an in-memory XML document.

FkBroker

final FkBroker broker = new InXml(
   new Synchronized(
     new InFile(
       "consumer-test", "<broker/>"
     )
   )
);

It creates an in-memory XML document with the following structure:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
  <topics/>
  <subs/>
</broker>

You can create a topic inside the broker:

broker.with(new TopicDirs("fake.topic").value());

Under the hood, the XML will be modified to:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
  <topics>
    <topic>
      <name>fake.topic</name>
      <datasets/>
    </topic>
  </topics>
  <subs/>
</broker>
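To make "manipulating an in-memory XML document" concrete, the sketch below reproduces the two states shown above with the JDK's built-in DOM API. `XmlBrokerSketch` is a hypothetical class written for this illustration; it is not `FkBroker`, `InXml`, or `TopicDirs`, and the real fake broker may be implemented quite differently.

```java
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical illustration only; NOT eo-kafka's FkBroker.
final class XmlBrokerSketch {

    private final Document doc;

    // Starts from <broker><topics/><subs/></broker>.
    XmlBrokerSketch() {
        try {
            this.doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .newDocument();
        } catch (final ParserConfigurationException ex) {
            throw new IllegalStateException(ex);
        }
        final Element broker = this.doc.createElement("broker");
        broker.appendChild(this.doc.createElement("topics"));
        broker.appendChild(this.doc.createElement("subs"));
        this.doc.appendChild(broker);
    }

    // Appends <topic><name>NAME</name><datasets/></topic> under <topics>.
    void withTopic(final String name) {
        final Element topic = this.doc.createElement("topic");
        final Element label = this.doc.createElement("name");
        label.setTextContent(name);
        topic.appendChild(label);
        topic.appendChild(this.doc.createElement("datasets"));
        this.doc.getElementsByTagName("topics").item(0).appendChild(topic);
    }

    // Names of all topics currently in the document, in insertion order.
    List<String> topicNames() {
        final NodeList names = this.doc.getElementsByTagName("name");
        final List<String> out = new ArrayList<>();
        for (int idx = 0; idx < names.getLength(); idx++) {
            out.add(names.item(idx).getTextContent());
        }
        return out;
    }
}
```

Because everything lives in one document, a test can inspect the broker state directly after producing or consuming, without a running Kafka instance.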

FkProducer

final Producer<String, String> producer =
    new FkProducer<>(
      UUID.randomUUID(),
      broker
);

FkConsumer

final Consumer<Object, String> consumer =
    new FkConsumer(
      UUID.randomUUID(),
      broker
);

Example with Fakes

final String topic = "test";
final Consumer<Object, String> consumer =
   new FkConsumer(UUID.randomUUID(),
     this.broker
       .with(new TopicDirs(topic).value())
   );
final Producer<String, String> producer =
   new FkProducer<>(UUID.randomUUID(), this.broker);
producer.send(
  new WithPartition<>(
      0,
      new Tkv<>(
        topic,
        "test1",
        "test-data-1"
      )
    )
);
producer.send(
  new WithPartition<>(
      0,
      new Tkv<>(
        topic,
        "test2",
        "test-data-2"
      )
    )
);
producer.send(
  new WithPartition<>(
      0,
      new Tkv<>(
        topic,
        "test3",
        "test-data-3"
      )
    )
);
final ConsumerRecords<Object, String> records =
   consumer.records(topic, Duration.ofSeconds(1L));
final List<String> datasets = new ListOf<>();
records.forEach(rec -> datasets.add(rec.value()));
MatcherAssert.assertThat(
   "First datasets in right format",
   datasets,
   Matchers.contains("test-data-1", "test-data-2", "test-data-3")
);

Like production producers and consumers, fake ones should also be closed once you are done with them:

fake.close();

Under the hood, the XML document will look like this:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<broker>
  <topics>
    <topic>
      <name>test</name>
      <datasets>
        <dataset>
          <partition>0</partition>
          <key>test1</key>
          <value>test-data-1</value>
          <seen>true</seen>
        </dataset>
        <dataset>
          <partition>0</partition>
          <key>test2</key>
          <value>test-data-2</value>
          <seen>true</seen>
        </dataset>
        <dataset>
          <partition>0</partition>
          <key>test3</key>
          <value>test-data-3</value>
          <seen>true</seen>
        </dataset>
      </datasets>
    </topic>
  </topics>
  <subs>
    <sub>
      <topic>test</topic>
      <consumer>aa4a2008-764b-4e19-9368-8250df4bea38</consumer>
    </sub>
  </subs>
</broker>

As of version 0.3.5, eo-kafka supports only String values in FkConsumer.

Configs

| Kafka Property | eo-kafka API | XML/JSON tag | YAML |
|---|---|---|---|
| bootstrap.servers | BootstrapServers | bootstrapServers | bootstrap-servers |
| key.serializer | KeySerializer | keySerializer | key-serializer |
| value.serializer | ValueSerializer | valueSerializer | value-serializer |
| key.deserializer | KeyDeserializer | keyDeserializer | key-deserializer |
| value.deserializer | ValueDeserializer | valueDeserializer | value-deserializer |
| group.id | GroupId | groupId | group-id |
| auto.offset.reset | AutoOffsetReset | autoOffsetReset | auto-offset-reset |
| client.id | ClientId | clientId | client-id |
| client.rack | ClientRack | clientRack | client-rack |
| acks | Acks | acks | acks |
| security.protocol | SecurityProtocol | securityProtocol | security-protocol |
| sasl.jaas.config | SaslJaasConfig | saslJaasConfig | sasl-jaas-config |
| sasl.mechanism | SaslMechanism | saslMechanism | sasl-mechanism |
| batch.size | BatchSize | batchSize | batch-size |
| buffer.memory | BufferMemory | bufferMemory | buffer-memory |
| linger.ms | LingerMs | lingerMs | linger-ms |
| retries | Retries | retries | retries |
| retry.backoff.ms | RetryBackoffMs | retryBackoffMs | retry-backoff-ms |
| compression.type | CompressionType | compressionType | compression-type |
| partition.assignment.strategy | PartitionAssignmentStrategy | partitionAssignmentStrategy | partition-assignment-strategy |
| max.poll.records | MaxPollRecords | maxPollRecords | max-poll-records |
| max.poll.interval.ms | MaxPollIntervalMs | maxPollIntervalMs | max-poll-interval-ms |
| heartbeat.interval.ms | HeartbeatIntervalMs | heartbeatIntervalMs | heartbeat-interval-ms |
| enable.auto.commit | EnableAutoCommit | enableAutoCommit | enable-auto-commit |
| session.timeout.ms | SessionTimeoutMs | sessionTimeoutMs | session-timeout-ms |
| max.partition.fetch.bytes | MaxPartitionFetchBytes | maxPartitionFetchBytes | max-partition-fetch-bytes |
| fetch.max.wait.ms | FetchMaxWaitMs | fetchMaxWaitMs | fetch-max-wait-ms |
| fetch.min.bytes | FetchMinBytes | fetchMinBytes | fetch-min-bytes |
| fetch.max.bytes | FetchMaxBytes | fetchMaxBytes | fetch-max-bytes |
| send.buffer.bytes | SendBufferBytes | sendBufferBytes | send-buffer-bytes |
| receive.buffer.bytes | ReceiveBufferBytes | receiveBufferBytes | receive-buffer-bytes |
| max.block.ms | MaxBlockMs | maxBlockMs | max-block-ms |
| max.request.size | MaxRqSize | maxRequestSize | max-request-size |
| group.instance.id | GroupInstanceId | groupInstanceId | group-instance-id |
| max.in.flight.requests.per.connection | MaxInFlightRq | maxInFlightRequestsPerConnection | max-in-flight-requests-per-connection |
| delivery.timeout.ms | DeliveryTimeoutMs | deliveryTimeoutMs | delivery-timeout-ms |
| enable.idempotence | EnableIdempotence | enableIdempotence | enable-idempotence |
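The XML/JSON tag and YAML columns follow a regular pattern: the tag is the Kafka property name in camelCase, and the YAML key is the same name with dots replaced by hyphens. The sketch below derives both from a Kafka property name. `PropertyNames` is a hypothetical helper for illustration, not part of eo-kafka. The eo-kafka API column is deliberately not derived, since some class names are abbreviated (for example `MaxRqSize` and `MaxInFlightRq`).

```java
// Hypothetical helper, written for illustration; NOT part of eo-kafka.
final class PropertyNames {

    private PropertyNames() {
    }

    // "bootstrap.servers" -> "bootstrapServers" (XML/JSON tag column)
    static String camel(final String property) {
        final String[] parts = property.split("\\.");
        final StringBuilder out = new StringBuilder(parts[0]);
        for (int idx = 1; idx < parts.length; idx++) {
            if (parts[idx].isEmpty()) {
                continue; // tolerate accidental double dots
            }
            out.append(Character.toUpperCase(parts[idx].charAt(0)))
                .append(parts[idx].substring(1));
        }
        return out.toString();
    }

    // "bootstrap.servers" -> "bootstrap-servers" (YAML column)
    static String kebab(final String property) {
        return property.replace('.', '-');
    }
}
```

For instance, `PropertyNames.camel("max.poll.interval.ms")` yields `maxPollIntervalMs` and `PropertyNames.kebab("max.poll.interval.ms")` yields `max-poll-interval-ms`.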

How to Contribute

Fork the repository, make changes, and send us a pull request. We will review your changes and apply them to the master branch shortly, provided they don't violate our quality standards. To avoid frustration, please run the full Maven build before sending us your pull request:

$ mvn clean install

You will need Maven 3.8.7+ and Java 17+.

If you want to contribute to the next release version of eo-kafka, please check the project board.

Our rultor image for CI/CD.