/
KafkaConsumer.cs
128 lines (112 loc) · 4.8 KB
/
KafkaConsumer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
using System.Diagnostics;
using Confluent.Kafka;
using Core.Events;
using Core.Events.External;
using Core.Kafka.Events;
using Core.Kafka.Producers;
using Core.OpenTelemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using static Core.Extensions.DictionaryExtensions;
namespace Core.Kafka.Consumers;
public class KafkaConsumer: IExternalEventConsumer
{
private readonly KafkaConsumerConfig config;
private readonly IEventBus eventBus;
private readonly IActivityScope activityScope;
private readonly ILogger<KafkaConsumer> logger;
public KafkaConsumer(
IConfiguration configuration,
IEventBus eventBus,
IActivityScope activityScope,
ILogger<KafkaConsumer> logger
)
{
if (configuration == null)
throw new ArgumentNullException(nameof(configuration));
this.eventBus = eventBus;
this.activityScope = activityScope;
this.logger = logger;
// get configuration from appsettings.json
config = configuration.GetKafkaConsumerConfig();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
logger.LogInformation("Kafka consumer started");
// create consumer
using var consumer = new ConsumerBuilder<string, string>(config.ConsumerConfig).Build();
// subscribe to Kafka topics (taken from config)
consumer.Subscribe(config.Topics);
try
{
// keep consumer working until it get signal that it should be shuted down
while (!cancellationToken.IsCancellationRequested)
{
// consume event from Kafka
await ConsumeNextEvent(consumer, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception e)
{
logger.LogError("Error consuming Kafka message: {Message} {StackTrace}",e.Message, e.StackTrace);
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumer.Close();
}
}
private async Task ConsumeNextEvent(IConsumer<string, string> consumer, CancellationToken token)
{
try
{
//lol ^_^ - remove this hack when this GH issue is solved: https://github.com/dotnet/extensions/issues/2149#issuecomment-518709751
await Task.Yield();
// wait for the upcoming message, consume it when arrives
var message = consumer.Consume(token);
// get event type from name stored in message.Key
var eventEnvelope = message.ToEventEnvelope();
if (eventEnvelope == null)
{
// That can happen if we're sharing database between modules.
// If we're subscribing to all and not filtering out events from other modules,
// then we might get events that are from other module and we might not be able to deserialize them.
// In that case it's safe to ignore deserialization error.
// You may add more sophisticated logic checking if it should be ignored or not.
logger.LogWarning("Couldn't deserialize event of type: {EventType}", message.Message.Key);
if (!config.IgnoreDeserializationErrors)
throw new InvalidOperationException(
$"Unable to deserialize event {message.Message.Key}"
);
return;
}
await activityScope.Run($"{nameof(KafkaConsumer)}/{nameof(ConsumeNextEvent)}",
async (_, ct) =>
{
// publish event to internal event bus
await eventBus.Publish(eventEnvelope, ct).ConfigureAwait(false);
consumer.Commit();
},
new StartActivityOptions
{
Tags = Merge(
TelemetryTags.Messaging.Kafka.ConsumerTags(
config.ConsumerConfig.GroupId,
message.Topic,
message.Message.Key,
message.Partition.Value.ToString(),
config.ConsumerConfig.GroupId
),
new Dictionary<string, object?>
{
{ TelemetryTags.EventHandling.Event, eventEnvelope.Data.GetType() }
}),
Parent = eventEnvelope.Metadata.PropagationContext?.ActivityContext,
Kind = ActivityKind.Consumer
},
token
).ConfigureAwait(false);
}
catch (Exception e)
{
logger.LogError("Error producing Kafka message: {Message} {StackTrace}",e.Message, e.StackTrace);
}
}
}