Provide example of creating Kafka topics #139

Open
CeeJayCee opened this issue Aug 19, 2021 · 6 comments
Labels
discussion, enhancement (New feature or request)

Comments

@CeeJayCee

CeeJayCee commented Aug 19, 2021

I have just started investigating Silverback to use with Kafka. Apologies for the simple question:

What is the recommended pattern for creating and configuring topics in Kafka when using Silverback?

I have a basic .NET Core project using a simple Controller. The Controller takes an IEventPublisher in the constructor and uses it to publish messages when an HTTP endpoint is called.

The topic is created automatically, but I would like control over the creation options.

I have been able to take an IConfluentAdminClientBuilder in the constructor of my controller, and use this to build an IAdminClient which can then call CreateTopicsAsync(). This doesn't feel right!

Is there a better recommended way?

Thanks :-)

@msallin
Collaborator

msallin commented Aug 19, 2021

We use a BrokerCallbackHandler to do this.

The following two classes show how to get the configured topics from Silverback.

internal class ConsumedKafkaTopicsProvider : IKafkaTopicsProvider
{
    private readonly IBrokerCollection _brokers;

    public ConsumedKafkaTopicsProvider(IBrokerCollection brokers)
    {
        _brokers = brokers;
    }

    public IList<string> GetKafkaTopics()
    {
        var kafkaBroker = _brokers.OfType<KafkaBroker>().FirstOrDefault();

        if (kafkaBroker == null)
        {
            return Array.Empty<string>();
        }

        return kafkaBroker.Consumers.OfType<KafkaConsumer>().Select(c => c.Endpoint.Name).ToList();
    }
}
internal class ProducedKafkaTopicsProvider : IKafkaTopicsProvider
{
    private readonly IServiceProvider _serviceProvider;

    private readonly IOutboundRoutingConfiguration _routingConfiguration;

    public ProducedKafkaTopicsProvider(IServiceProvider serviceProvider, IOutboundRoutingConfiguration routingConfiguration)
    {
        _serviceProvider = serviceProvider;
        _routingConfiguration = routingConfiguration;
    }

    public IList<string> GetKafkaTopics()
    {
        return _routingConfiguration.Routes
            .SelectMany(r => r.GetOutboundRouter(_serviceProvider).Endpoints)
            .OfType<KafkaProducerEndpoint>()
            .Select(e => e.Name)
            .ToList();
    }
}

The class KafkaTopicsCreator gets the topics from all registered IKafkaTopicsProvider and uses the IAdminClient to create them.

internal class KafkaTopicsCreator : IKafkaTopicsCreator
{
    private readonly IEnumerable<IKafkaTopicsProvider> _topicsProviders;
    private readonly IAdminClient _adminClient;
    private readonly ILogger<KafkaTopicsCreator> _logger;

    public KafkaTopicsCreator(IEnumerable<IKafkaTopicsProvider> topicsProviders, IAdminClient adminClient, ILogger<KafkaTopicsCreator> logger)
    {
        _topicsProviders = topicsProviders;
        _adminClient = adminClient;
        _logger = logger;
    }

    public async Task CreateTopicsAsync(KafkaTopicCreationSettings settings)
    {
        IList<string> topics = _topicsProviders
            .SelectMany(p => p.GetKafkaTopics())
            .Distinct(StringComparer.InvariantCulture)
            .ToList();

        if (!topics.Any())
        {
            return;
        }

        _logger.ExecutingCreateTopics(topics);

        try
        {
            await _adminClient.CreateTopicsAsync(topics.Select(t => new TopicSpecification { Name = t, NumPartitions = settings.Partitions, ReplicationFactor = settings.ReplicationFactor }));
            _logger.ExecutedCreateTopics();
        }
        catch (CreateTopicsException ex)
        {
            if (ex.Results.All(r =>
                (r.Error.IsError && r.Error.Code == ErrorCode.TopicAlreadyExists) || (!r.Error.IsError)))
            {
                _logger.AllTopicsAlreadyExist();
                return;
            }

            throw;
        }
    }
}
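
The snippets above reference a few types that are not shown in this thread. A minimal sketch of what they might look like, inferred only from how they are used above (the member names come from that code; the declarations themselves and the default values are assumptions, not the actual library source):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical reconstruction: inferred from usage in this thread,
// not copied from the real library.
public interface IKafkaTopicsProvider
{
    // Returns the names of the topics this provider knows about.
    IList<string> GetKafkaTopics();
}

public interface IKafkaTopicsCreator
{
    // Creates all topics reported by the registered providers.
    Task CreateTopicsAsync(KafkaTopicCreationSettings settings);
}

public class KafkaTopicCreationSettings
{
    // Master switch; false by default so nothing is created unless opted in.
    public bool CreateKafkaTopics { get; set; }

    // Passed to TopicSpecification.NumPartitions (an int). Default is assumed.
    public int Partitions { get; set; } = 1;

    // Passed to TopicSpecification.ReplicationFactor (a short). Default is assumed.
    public short ReplicationFactor { get; set; } = 1;
}
```

Keeping the switch off by default would mean a missing configuration section never results in topics being created by accident.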

We provide this functionality as a library and hence defined the necessary registrations in an extension method.

public static ISilverbackBuilder CreateConfiguredKafkaTopicsIfEnabled(this ISilverbackBuilder builder, IConfiguration configuration)
{
    var kafkaConnectionSettings = new KafkaConnectionSettings();
    configuration.Bind(KafkaConnectionSettings.Name, kafkaConnectionSettings);

    if (kafkaConnectionSettings.KafkaTopicCreation.CreateKafkaTopics)
    {
        builder.Services.AddSingleton(_ =>
        {
            var clientConfig = KafkaConfigFactory.CreateKafkaClientConfig(kafkaConnectionSettings);
            return new AdminClientBuilder(clientConfig).Build();
        });

        builder.Services.AddSingleton<IKafkaTopicsProvider, ConsumedKafkaTopicsProvider>();
        builder.Services.AddSingleton<IKafkaTopicsProvider, ProducedKafkaTopicsProvider>();
        builder.Services.AddSingleton<IKafkaTopicsCreator, KafkaTopicsCreator>();
        builder.AddSingletonBrokerCallbackHandler<TopicsCreationCallbackHandler>();
    }

    return builder;
}
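
The TopicsCreationCallbackHandler registered above is not shown in this thread. A rough sketch of how it could look, assuming Silverback's IEndpointsConfiguredCallback exposes a single OnEndpointsConfiguredAsync method (check this against the Silverback version you use) and assuming a KafkaTopicCreationSettings instance is available from the container, which the extension method above does not register:

```csharp
using System.Threading.Tasks;
using Silverback.Messaging.Broker.Callbacks;

// Hypothetical sketch, not the original implementation.
// Assumption: the callback fires once Silverback has configured its endpoints,
// so the topic providers can already enumerate producers and consumers.
internal class TopicsCreationCallbackHandler : IEndpointsConfiguredCallback
{
    private readonly IKafkaTopicsCreator _topicsCreator;
    private readonly KafkaTopicCreationSettings _settings;

    public TopicsCreationCallbackHandler(
        IKafkaTopicsCreator topicsCreator,
        KafkaTopicCreationSettings settings) // assumed to be registered in DI
    {
        _topicsCreator = topicsCreator;
        _settings = settings;
    }

    // Delegates to the creator; exceptions propagate to the caller.
    public Task OnEndpointsConfiguredAsync() =>
        _topicsCreator.CreateTopicsAsync(_settings);
}
```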

You can use it like this:

services.AddSilverback()
    .WithConnectionToMessageBroker(builder => builder.AddMockedKafka())
    .CreateConfiguredKafkaTopicsIfEnabled(Configuration)
    .AddEndpointsConfigurator<EndpointsConfigurator>();

@BEagle1984 @meggima, I see no reason to keep this in our own library. Should we move it to Silverback?

Disclaimer: This code is sponsored by Swiss Post :)

@BEagle1984
Owner

The admin API is not covered (not abstracted, not proxied) by Silverback. Internally I use it to get the topic metadata, and the IConfluentAdminClientBuilder was added explicitly for that and originally intended for internal use only.
Its main purpose is to allow the tests to replace the actual AdminClient with a mock. I currently only mock GetMetadata though. All other methods will throw a NotSupportedException, which probably isn't ideal in your case.

That being said, I see nothing wrong with using the AdminClient to create the topics, and I honestly don't see the point in abstracting or wrapping it in Silverback, since it is very Kafka-specific. Plus, a wrapper wouldn't add any value beyond proxying the calls to the underlying AdminClient; it would only add the overhead of keeping up with changes in the Confluent library.

This kind of initialization should be done at startup though, not in the controller's constructor. You could trigger it from the Startup.Configure method, from within an IHostedService, or by binding to a Silverback callback as suggested by @msallin (even though that might not be relevant or necessary in your case).
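
As an illustration of the IHostedService option, here is a minimal sketch that uses only the Confluent.Kafka admin API and Microsoft.Extensions.Hosting. The class name, topic name and settings are hypothetical, and an IAdminClient is assumed to be registered in the container:

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service: creates the topics once at application startup.
public class KafkaTopicsInitializer : IHostedService
{
    private readonly IAdminClient _adminClient;

    public KafkaTopicsInitializer(IAdminClient adminClient) => _adminClient = adminClient;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var topics = new[]
        {
            // Placeholder values, for illustration only.
            new TopicSpecification { Name = "my-topic", NumPartitions = 3, ReplicationFactor = 1 }
        };

        try
        {
            await _adminClient.CreateTopicsAsync(topics);
        }
        catch (CreateTopicsException ex)
            when (ex.Results.All(r => !r.Error.IsError || r.Error.Code == ErrorCode.TopicAlreadyExists))
        {
            // Every reported failure is "topic already exists": nothing to do.
            // Any other failure does not match the filter and propagates.
        }
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```

Registering it with services.AddHostedService<KafkaTopicsInitializer>(), alongside a singleton IAdminClient, would run it once when the host starts.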

If I understood correctly, you would like to customize the topic parameters, so the approach proposed by @msallin, which automatically loops over the configured topics and creates them all with the same default settings, wouldn't suit you well. Correct?
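
For reference, per-topic settings can be expressed directly with Confluent's TopicSpecification, which also accepts topic-level configuration entries through its Configs dictionary. A small sketch, with hypothetical topic names and values:

```csharp
using System.Collections.Generic;
using Confluent.Kafka.Admin;

public static class TopicSpecifications
{
    // Hypothetical topics and settings, for illustration only.
    public static IReadOnlyList<TopicSpecification> Build() => new[]
    {
        new TopicSpecification
        {
            Name = "orders",
            NumPartitions = 6,
            ReplicationFactor = 3,
            Configs = new Dictionary<string, string>
            {
                ["retention.ms"] = "604800000", // 7 days in milliseconds
                ["cleanup.policy"] = "delete"
            }
        },
        new TopicSpecification
        {
            Name = "customers-snapshot",
            NumPartitions = 1,
            ReplicationFactor = 3,
            Configs = new Dictionary<string, string>
            {
                ["cleanup.policy"] = "compact" // keep the latest record per key
            }
        }
    };
}
```

The resulting list can be passed as-is to IAdminClient.CreateTopicsAsync.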

@msallin, @meggima: we surely could move that helper into Silverback (it would have to be slightly modified, since it relies on the KafkaConnectionSettings, which are Swiss Post specific, but as I said, I already deal with the creation of the AdminClient in Silverback, so it wouldn't be an issue at all). I wonder if we should offer some hooks to configure the topics (and how).
Note that this approach implies some assumptions, such as that the user you authenticate with at runtime actually has the permissions needed to create topics (which it probably shouldn't have; some people would prefer to use a different user for this, much like many do to run database migrations).
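
One way to keep topic creation on a separate, more privileged user is to build a dedicated admin client with its own credentials. A sketch assuming SASL/PLAIN over TLS (the factory name is hypothetical, and the security settings depend entirely on how your cluster is configured):

```csharp
using Confluent.Kafka;

public static class AdminClientFactory
{
    // Builds an admin client that authenticates with dedicated credentials,
    // separate from the ones the producers and consumers use at runtime.
    public static IAdminClient Create(string bootstrapServers, string adminUser, string adminPassword)
    {
        var config = new AdminClientConfig
        {
            BootstrapServers = bootstrapServers,
            SecurityProtocol = SecurityProtocol.SaslSsl, // assumption: cluster uses SASL over TLS
            SaslMechanism = SaslMechanism.Plain,         // assumption: PLAIN mechanism
            SaslUsername = adminUser,
            SaslPassword = adminPassword
        };

        return new AdminClientBuilder(config).Build();
    }
}
```

The returned client should be disposed once the topics have been created, so that the privileged connection does not stay open for the lifetime of the application.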

@BEagle1984
Owner

@CeeJayCee, what would you expect from Silverback to help you with a cleaner implementation?

@CeeJayCee
Author

@msallin thank you, this works perfectly!

@BEagle1984 I fully understand not wanting to wrap IAdminClient; it could become a maintenance nightmare.

It would be nice if Silverback provided a configuration option to create the topic with options if it doesn't exist. Maybe in the IEndpointsConfigurator?

However, @msallin's example will work for my use-case.

Thanks to you both 👍

@BEagle1984
Owner

> It would be nice if Silverback provided a configuration option to create the topic with options if it doesn't exist. Maybe in the IEndpointsConfigurator?

I'm considering this for a future release. 👍

(BTW contributions are always welcome! 😉)

BEagle1984 added the enhancement (New feature or request) label Aug 19, 2021
@msallin
Collaborator

msallin commented Aug 19, 2021

Just take into account our use case, which might be a common one. We need a setting to turn the creation of topics off, because we only use it locally and in different kinds of tests, not in prod.
