The purpose of this repository is to explain step by step how to build producer and consumer applications that interact using Confluent Kafka.

p-brito/kafka-dotnet

kafka-dotnet

The purpose of this repository is to explain step by step how to build producer and consumer applications that interact using Confluent Kafka.

Create a cluster

Getting started

  • If you don't have Docker installed, please install it first.

  • To run the Confluent Platform locally, download the Confluent Platform all-in-one Docker Compose file.

  • Then run the command docker-compose up -d in the folder that contains that file. This starts the Confluent Platform; it might take a few minutes.

  • Go to http://localhost:9021 and create a topic.

  • You are ready! If not, click here for more details.

Confluent Kafka

Producing a message using Confluent Kafka.

  • To instantiate a producer you need a ProducerConfig. In this example, we will only define the BootstrapServers but you can define the BatchSize, CompressionType, and many others.
ProducerConfig config = new()
{
    BootstrapServers = this.Options.Server
};
  • Building the IProducer.
using IProducer<string, TMessage> producer = new ProducerBuilder<string, TMessage>(config).Build();
  • Producing a message.
string eventId = Guid.NewGuid().ToString();

Message<string, TMessage> @event = new()
{
    Key = eventId,
    Value = msg,
};

await producer.ProduceAsync(topic, @event, cancellationToken).ConfigureAwait(false);
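
The steps above can be put together as a minimal sketch. It assumes the Confluent.Kafka NuGet package, a broker reachable at localhost:9092, and a topic named myTopic. The class name, the BatchSize and CompressionType values, and the 30-second timeout are illustrative assumptions rather than settings taken from this repository. The value type is string so that the client's built-in serializers apply; a custom TMessage would also need a value serializer registered on the builder.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProducerSketch
{
    public static async Task Main()
    {
        // Assumed values: a local broker and illustrative tuning options.
        ProducerConfig config = new()
        {
            BootstrapServers = "localhost:9092",
            BatchSize = 32 * 1024,                  // maximum batch size in bytes
            CompressionType = CompressionType.Gzip  // compress batches before sending
        };

        // Give up waiting for the delivery report after 30 seconds (assumed timeout).
        using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));

        // string keys and values use the client's built-in UTF-8 serializers.
        using IProducer<string, string> producer =
            new ProducerBuilder<string, string>(config).Build();

        Message<string, string> @event = new()
        {
            Key = Guid.NewGuid().ToString(),
            Value = "Hello from the producer sketch"
        };

        try
        {
            // ProduceAsync completes once the broker acknowledges the message.
            DeliveryResult<string, string> result =
                await producer.ProduceAsync("myTopic", @event, cts.Token).ConfigureAwait(false);

            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> ex)
        {
            // The broker or the client rejected the message.
            Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Gave up waiting for the delivery report.");
        }
    }
}
```

Awaiting ProduceAsync for each message keeps the example simple, at the cost of throughput; it is a reasonable choice for a demo.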

Consuming a message using Confluent Kafka.

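  • To instantiate a consumer you need a ConsumerConfig. In this example, we define the BootstrapServers, the GroupId, and the AutoOffsetReset, but you can define many others. AutoOffsetReset is set to Earliest, which means that when the consumer group has no committed offset for a partition, consumption starts from the earliest available offset. Because the GroupId below contains a new Guid, every run starts a new group and therefore reads the topic from the beginning.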
ConsumerConfig config = new()
{
    BootstrapServers = this.Options.Server,
    GroupId = $"{topic}-{Guid.NewGuid()}",
    AutoOffsetReset = AutoOffsetReset.Earliest
};
  • Building the IConsumer.
using IConsumer<string, TMessage> consumer = new ConsumerBuilder<string, TMessage>(config).Build();
  • Subscribing to a topic.
consumer.Subscribe(topic);
  • Consuming a message.
var message = consumer.Consume(cancellationToken);
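
A single Consume call returns one message, so a consumer usually calls it in a loop. The sketch below shows one way to do that and to leave the group cleanly on shutdown. It assumes the Confluent.Kafka NuGet package, a broker at localhost:9092, and a topic named myTopic. The class name, the fixed GroupId, and the Ctrl+C handling are illustrative assumptions, not code from this repository.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class ConsumerSketch
{
    public static void Main()
    {
        // Assumed values: a local broker and a fixed, illustrative group id.
        ConsumerConfig config = new()
        {
            BootstrapServers = "localhost:9092",
            GroupId = "consumer-sketch",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using CancellationTokenSource cts = new();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // keep the process alive so the consumer can close cleanly
            cts.Cancel();
        };

        using IConsumer<string, string> consumer =
            new ConsumerBuilder<string, string>(config).Build();

        consumer.Subscribe("myTopic");

        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    ConsumeResult<string, string> result = consumer.Consume(cts.Token);
                    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
                }
                catch (ConsumeException ex)
                {
                    // Report the error and keep polling.
                    Console.WriteLine($"Consume error: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested; fall through to Close.
        }
        finally
        {
            // Leaves the consumer group and commits final offsets.
            consumer.Close();
        }
    }
}
```

Calling Close before disposing lets the group rebalance immediately instead of waiting for the session timeout.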

Kafka service

This is a wrapper around the Confluent Kafka client; the goal is to simplify its use.

The IKafkaService has two methods:

  • ProduceAsync wraps all the logic shown above for producing messages to a topic.
  • ConsumeAsync wraps all the logic shown above for consuming messages from a topic.
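
As a rough orientation, the two methods could be declared as below. This is a hypothetical sketch inferred only from how the DEMO section of this README calls the service: the parameter names, generic constraints, and the delegate shape are assumptions, and the repository's actual declarations may differ.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical delegate shape: matches a handler such as Task HandleAsync(string @event).
public delegate Task MessageHandlerDelegate<TMessage>(TMessage message);

// Hypothetical interface shape, inferred from the demo calls only.
public interface IKafkaService
{
    // Produces a single message to the given topic.
    Task ProduceAsync<TMessage>(TMessage msg, string topic, CancellationToken cancellationToken);

    // Consumes messages from the given topic and passes each one to the handler.
    Task ConsumeAsync<TMessage>(string topic, MessageHandlerDelegate<TMessage> handler, CancellationToken cancellationToken);
}
```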

KafkaServiceOptions gathers all the configuration that the producer and the consumer need.

In your application add the service as follows:

services.Configure<KafkaServiceOptions>(this.Configuration.GetSection(nameof(KafkaServiceOptions)));

services.AddKafkaService();

Set the KafkaServiceOptions in your appsettings.json file as follows:

{
  "KafkaServiceOptions": {
    "Server": "localhost:9092",
    "Topics": [ "myTopic" ]
  }
}
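
To illustrate how such a section binds to an options object, here is a small sketch. The KafkaServiceOptions class shown is a hypothetical stand-in whose properties simply mirror the JSON keys above; the class in this repository may look different. The sketch assumes the Microsoft.Extensions.Configuration and Microsoft.Extensions.Configuration.Binder packages, and uses an in-memory collection in place of appsettings.json so that it runs without any file.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical options class whose properties mirror the JSON keys above.
public class KafkaServiceOptions
{
    public string Server { get; set; } = string.Empty;
    public List<string> Topics { get; set; } = new();
}

public static class OptionsSketch
{
    public static void Main()
    {
        // In-memory stand-in for appsettings.json; ':' separates nesting levels
        // and array elements are addressed by index.
        Dictionary<string, string?> settings = new()
        {
            ["KafkaServiceOptions:Server"] = "localhost:9092",
            ["KafkaServiceOptions:Topics:0"] = "myTopic"
        };

        IConfiguration configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(settings)
            .Build();

        // Bind the section to the options class by matching property names.
        KafkaServiceOptions? options = configuration
            .GetSection(nameof(KafkaServiceOptions))
            .Get<KafkaServiceOptions>();

        Console.WriteLine($"Server: {options?.Server}");
        Console.WriteLine($"Topics: {string.Join(", ", options?.Topics ?? new List<string>())}");
    }
}
```

Using nameof(KafkaServiceOptions) as the section name, as the registration code above does, keeps the JSON key and the class name in sync.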

DEMO

Producing messages using the Kafka service wrapper.

try
{
    int count = 1;

    while (!stoppingToken.IsCancellationRequested)
    {
        await this.kafkaService.ProduceAsync($"Hi! I'm the event number {count}", "myTopic", stoppingToken).ConfigureAwait(false);

        count++;
    }
}
catch (Exception ex)
{
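    // Pass the exception itself so the stack trace is logged, and use a message template.
    this.Logger.LogError(ex, "Exception: {ExceptionType} | Message: {Message}", ex.GetType().FullName, ex.Message);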
}

Consuming messages using the Kafka service wrapper.

try
{
    MessageHandlerDelegate<string> handler = this.HandleAsync;

    await this.kafkaService.ConsumeAsync<string>("myTopic", handler, stoppingToken).ConfigureAwait(false);
}
catch (Exception ex)
{
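    // Pass the exception itself so the stack trace is logged, and use a message template.
    this.Logger.LogError(ex, "Exception: {ExceptionType} | Message: {Message}", ex.GetType().FullName, ex.Message);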
}

Notice that a handler is passed; every consumed message is delivered to this method.

private Task HandleAsync(string @event)
{
    this.Logger.LogInformation("Event received: {Event}", @event);

    return Task.CompletedTask;
}

Note that this is a demo; many aspects are not covered here.
