Provide example of creating Kafka topics #139
Comments
We use a [...] The following two classes show how to get the configured topics from Silverback.

internal class ConsumedKafkaTopicsProvider : IKafkaTopicsProvider
{
    private readonly IBrokerCollection _brokers;

    public ConsumedKafkaTopicsProvider(IBrokerCollection brokers)
    {
        _brokers = brokers;
    }

    public IList<string> GetKafkaTopics()
    {
        var kafkaBroker = _brokers.OfType<KafkaBroker>().FirstOrDefault();
        if (kafkaBroker == null)
        {
            return Array.Empty<string>();
        }

        return kafkaBroker.Consumers.OfType<KafkaConsumer>().Select(c => c.Endpoint.Name).ToList();
    }
}

internal class ProducedKafkaTopicsProvider : IKafkaTopicsProvider
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IOutboundRoutingConfiguration _routingConfiguration;

    public ProducedKafkaTopicsProvider(IServiceProvider serviceProvider, IOutboundRoutingConfiguration routingConfiguration)
    {
        _serviceProvider = serviceProvider;
        _routingConfiguration = routingConfiguration;
    }

    public IList<string> GetKafkaTopics()
    {
        return _routingConfiguration.Routes
            .SelectMany(r => r.GetOutboundRouter(_serviceProvider).Endpoints)
            .OfType<KafkaProducerEndpoint>()
            .Select(e => e.Name)
            .ToList();
    }
}

The class [...]

internal class KafkaTopicsCreator : IKafkaTopicsCreator
{
    private readonly IEnumerable<IKafkaTopicsProvider> _topicsProviders;
    private readonly IAdminClient _adminClient;
    private readonly ILogger<KafkaTopicsCreator> _logger;

    public KafkaTopicsCreator(IEnumerable<IKafkaTopicsProvider> topicsProviders, IAdminClient adminClient, ILogger<KafkaTopicsCreator> logger)
    {
        _topicsProviders = topicsProviders;
        _adminClient = adminClient;
        _logger = logger;
    }

    public async Task CreateTopicsAsync(KafkaTopicCreationSettings settings)
    {
        IList<string> topics = _topicsProviders
            .SelectMany(p => p.GetKafkaTopics())
            .Distinct(StringComparer.InvariantCulture)
            .ToList();
        if (!topics.Any())
        {
            return;
        }

        _logger.ExecutingCreateTopics(topics);
        try
        {
            await _adminClient.CreateTopicsAsync(topics.Select(t => new TopicSpecification
            {
                Name = t,
                NumPartitions = settings.Partitions,
                ReplicationFactor = settings.ReplicationFactor
            }));
            _logger.ExecutedCreateTopics();
        }
        catch (CreateTopicsException ex)
        {
            // Swallow the exception only if every topic was either created or already existed.
            if (ex.Results.All(r =>
                (r.Error.IsError && r.Error.Code == ErrorCode.TopicAlreadyExists) || (!r.Error.IsError)))
            {
                _logger.AllTopicsAlreadyExist();
                return;
            }

            throw;
        }
    }
}

We provide this functionality as a library and hence defined the necessary registrations in an extension method.

public static ISilverbackBuilder CreateConfiguredKafkaTopicsIfEnabled(this ISilverbackBuilder builder, IConfiguration configuration)
{
    var kafkaConnectionSettings = new KafkaConnectionSettings();
    configuration.Bind(KafkaConnectionSettings.Name, kafkaConnectionSettings);
    if (kafkaConnectionSettings.KafkaTopicCreation.CreateKafkaTopics)
    {
        builder.Services.AddSingleton(_ =>
        {
            var clientConfig = KafkaConfigFactory.CreateKafkaClientConfig(kafkaConnectionSettings);
            return new AdminClientBuilder(clientConfig).Build();
        });
        builder.Services.AddSingleton<IKafkaTopicsProvider, ConsumedKafkaTopicsProvider>();
        builder.Services.AddSingleton<IKafkaTopicsProvider, ProducedKafkaTopicsProvider>();
        builder.Services.AddSingleton<IKafkaTopicsCreator, KafkaTopicsCreator>();
        builder.AddSingletonBrokerCallbackHandler<TopicsCreationCallbackHandler>();
    }

    return builder;
}

You can use it like this:

services.AddSilverback()
    .WithConnectionToMessageBroker(builder => builder.AddMockedKafka())
    .CreateConfiguredKafkaTopicsIfEnabled(Configuration)
    .AddEndpointsConfigurator<EndpointsConfigurator>();

@BEagle1984 @meggima, I see no reason to keep this in our own library. Move it to Silverback?

Disclaimer: This code is sponsored by Swiss Post :)
The admin API is not covered (not abstracted, not proxied) by Silverback. Internally I use it to get the topic metadata and the [...]

That being said I see nothing wrong in using the [...]

This kind of initialization should be done at startup though, not in the controller's constructor. You could either trigger it from the [...]

If I understood correctly, you would like to customize the topic parameters, so the approach proposed by @msallin, automatically looping into the configured topics and creating them all with the same/default settings, wouldn't suit you well. Correct?

@msallin, @meggima: we surely could move that helper into Silverback (it should be slightly modified since it relies on the [...])
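To illustrate the "do it at startup, not in the controller" advice, here is a minimal sketch of a hosted service that creates a single topic with explicit options through the Confluent.Kafka admin client. It is not part of Silverback and not the code discussed in this thread: the class name `OrdersTopicInitializer`, the topic name, the broker address and the `retention.ms` value are all assumptions chosen for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service: creates one topic with explicit settings at startup.
public sealed class OrdersTopicInitializer : IHostedService
{
    // Assumed broker address, for illustration only.
    private const string BootstrapServers = "localhost:9092";

    // Assumed topic options; adjust to the actual requirements.
    public static TopicSpecification BuildSpecification() =>
        new TopicSpecification
        {
            Name = "orders",
            NumPartitions = 3,
            ReplicationFactor = 1,
            Configs = new Dictionary<string, string>
            {
                ["retention.ms"] = "86400000", // one day
            },
        };

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var config = new AdminClientConfig { BootstrapServers = BootstrapServers };
        using var adminClient = new AdminClientBuilder(config).Build();

        try
        {
            await adminClient.CreateTopicsAsync(new[] { BuildSpecification() });
        }
        catch (CreateTopicsException ex)
            when (ex.Results.All(r =>
                r.Error.Code == ErrorCode.NoError ||
                r.Error.Code == ErrorCode.TopicAlreadyExists))
        {
            // The topic already exists, so there is nothing left to do.
        }
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```

It would be registered with `services.AddHostedService<OrdersTopicInitializer>()`. Hosted services start in registration order, so registering it early keeps topic creation ahead of anything that publishes.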
@CeeJayCee, what would you expect from Silverback to help you with a cleaner implementation?
@msallin thank you, this works perfectly!

@BEagle1984 I fully understand not wanting to wrap [...]

It would be nice if Silverback provided a configuration option to create the topic with options if it doesn't exist. Maybe in the [...]

However, @msallin's example will work for my use-case. Thanks to you both 👍
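One possible way to combine both wishes (looping over the configured topics, yet controlling the options of individual topics) is to let a per-topic override win over shared defaults. The sketch below assumes the Confluent.Kafka package; `TopicSpecifications`, `Build` and the `overrides` dictionary are hypothetical names and do not exist in Silverback.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka.Admin;

// Hypothetical helper: builds one TopicSpecification per distinct topic name.
public static class TopicSpecifications
{
    public static IReadOnlyList<TopicSpecification> Build(
        IEnumerable<string> topics,
        IReadOnlyDictionary<string, TopicSpecification> overrides,
        int defaultPartitions,
        short defaultReplicationFactor) =>
        topics
            .Distinct(StringComparer.Ordinal)
            .Select(topic => overrides.TryGetValue(topic, out var custom)
                ? custom // explicit options for this topic
                : new TopicSpecification
                {
                    Name = topic,
                    NumPartitions = defaultPartitions,
                    ReplicationFactor = defaultReplicationFactor,
                })
            .ToList();
}
```

The resulting list can be passed directly to `IAdminClient.CreateTopicsAsync`, so the rest of a creator class like the one above would stay unchanged.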
I'm considering this for a future release. 👍 (BTW contributions are always welcome! 😉)
Just take into account our use case, which might be common. We need a setting to turn creation of topics off, because it's only used locally and in different kinds of tests. Not on prod.
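The settings types behind such an on/off switch are not shown in this thread. A hypothetical shape, consistent with the property names used in the sample code (`KafkaConnectionSettings.Name`, `KafkaTopicCreation.CreateKafkaTopics`, `Partitions`, `ReplicationFactor`), might look like the sketch below. The section name `"KafkaConnection"` and all default values are assumptions; the binding relies on the Microsoft.Extensions.Configuration packages.

```csharp
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical settings types; property names follow the sample, defaults are assumed.
public sealed class KafkaTopicCreationSettings
{
    public bool CreateKafkaTopics { get; set; } // defaults to false: creation is off unless enabled
    public int Partitions { get; set; } = 1;
    public short ReplicationFactor { get; set; } = 1;
}

public sealed class KafkaConnectionSettings
{
    public const string Name = "KafkaConnection"; // assumed configuration section name

    public KafkaTopicCreationSettings KafkaTopicCreation { get; set; } =
        new KafkaTopicCreationSettings();
}

public static class SettingsDemo
{
    // Binds the settings from an in-memory configuration, as a test or local setup might do.
    public static KafkaConnectionSettings Load()
    {
        var configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string>
            {
                ["KafkaConnection:KafkaTopicCreation:CreateKafkaTopics"] = "true",
                ["KafkaConnection:KafkaTopicCreation:Partitions"] = "3",
            })
            .Build();

        var settings = new KafkaConnectionSettings();
        configuration.Bind(KafkaConnectionSettings.Name, settings);
        return settings;
    }
}
```

With a shape like this, production configuration simply omits the flag (or sets it to `false`), while local and test configurations switch it on.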
I have just started investigating Silverback to use with Kafka. Apologies for the simple question:
What is the recommended pattern for creating and configuring topics in Kafka when using Silverback?
I have a basic .NET Core project using a simple Controller. The Controller takes an IEventPublisher in the constructor and uses this to publish messages when an HTTP endpoint is called.
The topic is created automatically, but I would like control over the creation options.
I have been able to take an IConfluentAdminClientBuilder in the constructor of my controller, and use this to build an IAdminClient which can then call CreateTopicsAsync(). This doesn't feel right!
Is there a better recommended way?
Thanks :-)