Skip to content

Commit

Permalink
Fixed KafkaProducer to throw exception instead of infinitely trying t…
Browse files Browse the repository at this point in the history
…o publish message
  • Loading branch information
oskardudycz committed Apr 24, 2024
1 parent 9946912 commit 28eb184
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 49 deletions.
12 changes: 6 additions & 6 deletions Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs
Expand Up @@ -68,13 +68,13 @@ await foreach (var @event in subscription)
await HandleEvent(@event, ct).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
catch (RpcException rpcException) when (rpcException is { StatusCode: StatusCode.Cancelled } ||
rpcException.InnerException is ObjectDisposedException)
{
logger.LogWarning("Subscription was canceled.");
}
catch (ObjectDisposedException)
{
logger.LogWarning("Subscription was canceled by the user.");
logger.LogWarning(
"Subscription to all '{SubscriptionId}' dropped by client",
SubscriptionId
);
}
catch (Exception ex)
{
Expand Down
27 changes: 11 additions & 16 deletions Core.Kafka/Producers/KafkaProducer.cs
Expand Up @@ -11,23 +11,15 @@

namespace Core.Kafka.Producers;

public class KafkaProducer: IExternalEventProducer
public class KafkaProducer(
IConfiguration configuration,
IActivityScope activityScope,
ILogger<KafkaProducer> logger
): IExternalEventProducer
{
private readonly IActivityScope activityScope;
private readonly ILogger<KafkaProducer> logger;
private readonly KafkaProducerConfig config;
private readonly KafkaProducerConfig config = configuration.GetKafkaProducerConfig();

public KafkaProducer(
IConfiguration configuration,
IActivityScope activityScope,
ILogger<KafkaProducer> logger
)
{
this.activityScope = activityScope;
this.logger = logger;
// get configuration from appsettings.json
config = configuration.GetKafkaProducerConfig();
}
// get configuration from appsettings.json

public async Task Publish(IEventEnvelope @event, CancellationToken token)
{
Expand All @@ -39,14 +31,17 @@ public async Task Publish(IEventEnvelope @event, CancellationToken token)
using var p = new ProducerBuilder<string, string>(config.ProducerConfig).Build();
// publish event to kafka topic taken from config
using var cts =
new CancellationTokenSource(TimeSpan.FromMilliseconds(config.ProducerTimeoutInMs ?? 1000));
await p.ProduceAsync(config.Topic,
new Message<string, string>
{
// store event type name in message Key
Key = @event.Data.GetType().Name,
// serialize event to message Value
Value = @event.ToJson(new PropagationContextJsonConverter())
}, ct).ConfigureAwait(false);
}, cts.Token).ConfigureAwait(false);
},
new StartActivityOptions
{
Expand Down
1 change: 1 addition & 0 deletions Core.Kafka/Producers/KafkaProducerConfig.cs
Expand Up @@ -8,6 +8,7 @@ public class KafkaProducerConfig
{
public ProducerConfig? ProducerConfig { get; set; }
public string Topic { get; set; } = default!;
public int? ProducerTimeoutInMs { get; set; }
}

public static class KafkaProducerConfigExtensions
Expand Down
4 changes: 4 additions & 0 deletions Core.Marten/MartenConfig.cs
Expand Up @@ -88,6 +88,10 @@ public static class MartenConfigExtensions
nonPublicMembersStorage: NonPublicMembersStorage.All
);

options.Projections.Errors.SkipApplyErrors = false;
options.Projections.Errors.SkipSerializationErrors = false;
options.Projections.Errors.SkipUnknownEvents = false;

options.Projections.Add(
new MartenSubscription(
new[]
Expand Down
10 changes: 3 additions & 7 deletions Marten.Integration.Tests/Commands/MartenAsyncCommandBusTests.cs
Expand Up @@ -17,7 +17,7 @@

namespace Marten.Integration.Tests.Commands;

public class MartenAsyncCommandBusTests: MartenTest
public class MartenAsyncCommandBusTests(MartenFixture fixture): MartenTest(fixture.PostgreSqlContainer, true)
{
[Fact]
public async Task CommandIsStoredInMartenAndForwardedToCommandHandler()
Expand Down Expand Up @@ -80,20 +80,16 @@ public override async Task InitializeAsync()
var serviceProvider = services.BuildServiceProvider();
var session = serviceProvider.GetRequiredService<IDocumentSession>();

asyncDaemon = serviceProvider.GetRequiredService<ProjectionCoordinator>();
asyncDaemon = serviceProvider.GetRequiredService<IProjectionCoordinator>();

martenAsyncCommandBus = new MartenAsyncCommandBus(session);
}

private MartenAsyncCommandBus martenAsyncCommandBus = default!;
private readonly List<Guid> userIds = new();
private readonly EventListener eventListener = new();
private ProjectionCoordinator asyncDaemon = default!;
private IProjectionCoordinator asyncDaemon = default!;
private readonly CancellationToken ct = new CancellationTokenSource().Token;

public MartenAsyncCommandBusTests(MartenFixture fixture) : base(fixture.PostgreSqlContainer, true)
{
}
}

public record AddUser(Guid UserId, string? Sth = default);
Expand Down
Expand Up @@ -6,17 +6,13 @@

namespace Helpdesk.Api.Core.Kafka;

public class KafkaProducer: IProjection
public class KafkaProducer(IConfiguration configuration): IProjection
{
private const string DefaultConfigKey = "KafkaProducer";

private readonly KafkaProducerConfig config;

public KafkaProducer(IConfiguration configuration)
{
config = configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
throw new InvalidOperationException();
}
private readonly KafkaProducerConfig config =
configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
throw new InvalidOperationException();

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
Expand All @@ -36,14 +32,16 @@ private async Task Publish(object @event, CancellationToken ct)
{
using var producer = new ProducerBuilder<string, string>(config.ProducerConfig).Build();

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(config.ProducerTimeoutInMs ?? 1000));

await producer.ProduceAsync(config.Topic,
new Message<string, string>
{
// store event type name in message Key
Key = @event.GetType().Name,
// serialize event to message Value
Value = JsonSerializer.Serialize(@event)
}, ct).ConfigureAwait(false);
}, cts.Token).ConfigureAwait(false);
}
catch (Exception exc)
{
Expand All @@ -57,4 +55,6 @@ public class KafkaProducerConfig
{
public ProducerConfig? ProducerConfig { get; set; }
public string? Topic { get; set; }

public int? ProducerTimeoutInMs { get; set; }
}
Expand Up @@ -45,9 +45,9 @@ DateTimeOffset now
: [];

[AggregateHandler]
public static Events Handle(TimeoutIncidentsBatchResolution command, IncidentsBatchResolution batch,
public static Events Handle(TimeoutIncidentsBatchResolution command, IncidentsBatchResolution? batch,
DateTimeOffset now) =>
batch.Status != ResolutionStatus.Pending
batch != null && batch.Status != ResolutionStatus.Pending
? [new IncidentsBatchResolutionResolutionTimedOut(command.IncidentsBatchResolutionId, batch.Incidents, now)]
: [];

Expand Down
9 changes: 9 additions & 0 deletions Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs
Expand Up @@ -50,6 +50,14 @@
casing: Casing.CamelCase
);
options.Projections.Errors.SkipApplyErrors = false;
options.Projections.Errors.SkipSerializationErrors = false;
options.Projections.Errors.SkipUnknownEvents = false;
options.Projections.RebuildErrors.SkipApplyErrors = false;
options.Projections.RebuildErrors.SkipSerializationErrors = false;
options.Projections.RebuildErrors.SkipUnknownEvents = false;
options.Projections.Add(new KafkaProducer(builder.Configuration), ProjectionLifecycle.Async);
options.Projections.Add(
new SignalRProducer((IHubContext)sp.GetRequiredService<IHubContext<IncidentsHub>>()),
Expand All @@ -58,6 +66,7 @@
options.ConfigureIncidents();
options.DisableNpgsqlLogging = true;
return options;
})
.OptimizeArtifactWorkflow(TypeLoadMode.Static)
Expand Down
18 changes: 9 additions & 9 deletions Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs
Expand Up @@ -6,17 +6,13 @@

namespace Helpdesk.Api.Core.Kafka;

public class KafkaProducer: IProjection
public class KafkaProducer(IConfiguration configuration): IProjection
{
private const string DefaultConfigKey = "KafkaProducer";

private readonly KafkaProducerConfig config;

public KafkaProducer(IConfiguration configuration)
{
config = configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
throw new InvalidOperationException();
}
private readonly KafkaProducerConfig config =
configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
throw new InvalidOperationException();

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
Expand All @@ -36,14 +32,16 @@ private async Task Publish(object @event, CancellationToken ct)
{
using var producer = new ProducerBuilder<string, string>(config.ProducerConfig).Build();

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(config.ProducerTimeoutInMs ?? 1000));

await producer.ProduceAsync(config.Topic,
new Message<string, string>
{
// store event type name in message Key
Key = @event.GetType().Name,
// serialize event to message Value
Value = JsonSerializer.Serialize(@event)
}, ct).ConfigureAwait(false);
}, cts.Token).ConfigureAwait(false);
}
catch (Exception exc)
{
Expand All @@ -57,4 +55,6 @@ public class KafkaProducerConfig
{
public ProducerConfig? ProducerConfig { get; set; }
public string? Topic { get; set; }

public int? ProducerTimeoutInMs { get; set; }
}
4 changes: 4 additions & 0 deletions Sample/Helpdesk/Helpdesk.Api/Program.cs
Expand Up @@ -44,6 +44,10 @@
options.UseSystemTextJsonForSerialization(EnumStorage.AsString);
options.Projections.Errors.SkipApplyErrors = false;
options.Projections.Errors.SkipSerializationErrors = false;
options.Projections.Errors.SkipUnknownEvents = false;
options.Projections.LiveStreamAggregation<Incident>();
options.Projections.Add<IncidentHistoryTransformation>(ProjectionLifecycle.Inline);
options.Projections.Add<IncidentDetailsProjection>(ProjectionLifecycle.Inline);
Expand Down

0 comments on commit 28eb184

Please sign in to comment.