/
KafkaProducer.cs
60 lines (50 loc) · 1.96 KB
/
KafkaProducer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
using System.Text.Json;
using Confluent.Kafka;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
namespace Helpdesk.Api.Core.Kafka;
/// <summary>
/// Marten projection that forwards the data of every event in the processed stream actions
/// to a Kafka topic as a JSON message.
/// </summary>
/// <remarks>
/// Settings are bound from the <c>KafkaProducer</c> configuration section into
/// <see cref="KafkaProducerConfig"/>. Only the asynchronous path is implemented;
/// the synchronous <c>Apply</c> always throws.
/// </remarks>
public class KafkaProducer(IConfiguration configuration): IProjection
{
    private const string DefaultConfigKey = "KafkaProducer";

    // Per-message timeout used when KafkaProducerConfig.ProducerTimeoutInMs is not configured.
    private const int DefaultProducerTimeoutInMs = 1000;

    private readonly KafkaProducerConfig config =
        configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
        throw new InvalidOperationException(
            $"Configuration section '{DefaultConfigKey}' could not be bound to {nameof(KafkaProducerConfig)}.");

    /// <summary>
    /// Publishes the data of every event contained in <paramref name="streamsActions"/> to Kafka,
    /// one message per event, in enumeration order.
    /// </summary>
    /// <param name="operations">Document operations of the current batch (not used by this projection).</param>
    /// <param name="streamsActions">Stream actions whose events should be published.</param>
    /// <param name="ct">Token that cancels publishing of the batch.</param>
    /// <remarks>
    /// Any failure is written to the console and rethrown, so the caller still observes the exception.
    /// </remarks>
    public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
        CancellationToken ct)
    {
        var events = streamsActions.SelectMany(streamAction => streamAction.Events).ToList();
        if (events.Count == 0)
        {
            // Nothing to publish: do not pay for building a producer.
            return;
        }

        try
        {
            // One producer per batch instead of one per event: building a producer is costly,
            // and a single instance can send any number of messages.
            using var producer = new ProducerBuilder<string, string>(config.ProducerConfig).Build();

            foreach (var @event in events)
            {
                await Publish(producer, @event.Data, ct);
            }
        }
        catch (Exception exc)
        {
            Console.WriteLine(exc.Message);
            throw;
        }
    }

    /// <summary>
    /// Synchronous application is not supported by this projection.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public void Apply(IDocumentOperations operations, IReadOnlyList<StreamAction> streams) =>
        throw new NotImplementedException("Producer should be only used in the AsyncDaemon");

    /// <summary>
    /// Sends a single event to the configured topic and waits for the produce call to complete.
    /// </summary>
    /// <param name="producer">Producer used to send the message.</param>
    /// <param name="event">Event payload; its type name becomes the message key.</param>
    /// <param name="ct">Caller's cancellation token, combined with the per-message timeout.</param>
    private async Task Publish(IProducer<string, string> producer, object @event, CancellationToken ct)
    {
        // Link the caller's token with the timeout so the produce call is cancelled
        // by whichever comes first: caller cancellation or the configured timeout.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        cts.CancelAfter(TimeSpan.FromMilliseconds(config.ProducerTimeoutInMs ?? DefaultProducerTimeoutInMs));

        await producer.ProduceAsync(config.Topic,
            new Message<string, string>
            {
                // store event type name in message Key
                Key = @event.GetType().Name,
                // serialize event to message Value
                Value = JsonSerializer.Serialize(@event)
            }, cts.Token).ConfigureAwait(false);
    }
}
/// <summary>
/// Settings consumed by <see cref="KafkaProducer"/>, bound from application configuration.
/// </summary>
public class KafkaProducerConfig
{
    /// <summary>Name of the Kafka topic that messages are produced to.</summary>
    public string? Topic
    {
        get;
        set;
    }

    /// <summary>
    /// Timeout, in milliseconds, applied to each produce call;
    /// the producer falls back to 1000 ms when this is <c>null</c>.
    /// </summary>
    public int? ProducerTimeoutInMs
    {
        get;
        set;
    }

    /// <summary>Confluent.Kafka client settings handed to the producer builder.</summary>
    public ProducerConfig? ProducerConfig
    {
        get;
        set;
    }
}