Skip to content

stormaref/KafkaStorm

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

54 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

KafkaStorm

Simple .net client for Kafka based on Confluent.Kafka

Features

  • Create queue for messages that couldn't be send
  • Concurrent consumers
  • Producing messages concurrently

Installation

Using package manager:

Install-Package KafkaStorm -Version 8.1.0

Usage/Examples

Setup

using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;

builder.Services.AddKafkaStorm(factory =>
{
    factory.AddProducer(prf =>
    {
        prf.ConfigProducer(new ProducerConfig
        {
            BootstrapServers = host
        });

        prf.InMemoryQueue();

        prf.SetQueueLimit(65536);
    });

    // Use this line for starting producer queue:
    factory.StartProducerHostedService();

    factory.AddConsumers(crf =>
    {
        crf.AddConsumer<HelloConsumer, HelloEvent>(new ConsumerConfig
        {
            BootstrapServers = "localhost:29092",
            GroupId = "TestGroup"
        }, "topicName");
    });
});

It's the same ConsumerConfig as Confluent.Kafka

New Feature 🎉

Adding consumers is even easier now:

using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;

builder.Services.AddKafkaStorm(factory =>
{
    factory.AddConsumers(crf =>
    {
        var config = new ConsumerConfig { BootstrapServers = "localhost:29092", GroupId = "TestGroup" };
            
        //This line can add all consumers in the assembly with their according messages automatically
        crf.AddConsumersFromAssembly(Assembly.GetExecutingAssembly(), config);
    });
});

Just make sure that messages you want to be consumed automatically, implement IMessage interface

Consuming

using KafkaStorm.Interfaces;  
using Microsoft.Extensions.Logging;

public class HelloConsumer : IConsumer<HelloEvent>  
{  
  private readonly ILogger<HelloConsumer> _logger;  
  
  public HelloConsumer(ILogger<HelloConsumer> logger)  
 {
	 _logger = logger;  
 }  
  public async Task Handle(HelloEvent @event, CancellationToken cancellationToken)  
 {  
	 _logger.LogDebug("Message Received");  
 }}

Event

Your event (message) can be any class like this:

public class HelloEvent  
{  
  public HelloEvent(DateTime time)  
 {
	 Message = "Hello";  
	 Time = time;  
 }  
 
  public string Message { get; }  
  public DateTime Time { get; }  
}

Attention: if your class contains a property with Interface type it may cause exception while deserializing JSON

Producing

Just use IProducer like a service (initialize it with constructor):

using KafkaStorm.Interfaces;

private readonly IProducer _producer;  
  
public TestService(IProducer producer)  
{  
	_producer = producer;  
}
  • Produce with queue

_producer.Produce(new HelloEvent(DateTime.Now), "topicName");
  • Produce right now

await _producer.ProduceNowAsync(new HelloEvent(DateTime.Now), "topicName");

Author

@stormaref

Related

Here are some related projects

Confluent's .NET Client