diff --git a/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs b/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs index 8eb2ac83c..04fc8be6e 100644 --- a/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs +++ b/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs @@ -68,13 +68,13 @@ await foreach (var @event in subscription) await HandleEvent(@event, ct).ConfigureAwait(false); } } - catch (OperationCanceledException) + catch (RpcException rpcException) when (rpcException is { StatusCode: StatusCode.Cancelled } || + rpcException.InnerException is ObjectDisposedException) { - logger.LogWarning("Subscription was canceled."); - } - catch (ObjectDisposedException) - { - logger.LogWarning("Subscription was canceled by the user."); + logger.LogWarning( + "Subscription to all '{SubscriptionId}' dropped by client", + SubscriptionId + ); } catch (Exception ex) { diff --git a/Core.Kafka/Producers/KafkaProducer.cs b/Core.Kafka/Producers/KafkaProducer.cs index ef93ba612..deebdfb11 100644 --- a/Core.Kafka/Producers/KafkaProducer.cs +++ b/Core.Kafka/Producers/KafkaProducer.cs @@ -11,23 +11,15 @@ namespace Core.Kafka.Producers; -public class KafkaProducer: IExternalEventProducer +public class KafkaProducer( + IConfiguration configuration, + IActivityScope activityScope, + ILogger logger +): IExternalEventProducer { - private readonly IActivityScope activityScope; - private readonly ILogger logger; - private readonly KafkaProducerConfig config; + private readonly KafkaProducerConfig config = configuration.GetKafkaProducerConfig(); - public KafkaProducer( - IConfiguration configuration, - IActivityScope activityScope, - ILogger logger - ) - { - this.activityScope = activityScope; - this.logger = logger; - // get configuration from appsettings.json - config = configuration.GetKafkaProducerConfig(); - } + // get configuration from appsettings.json public async Task Publish(IEventEnvelope @event, CancellationToken token) { @@ -39,6 +31,9 @@ public async Task Publish(IEventEnvelope @event, CancellationToken token) using var p = new ProducerBuilder(config.ProducerConfig).Build(); // publish event to kafka topic taken from config + using var cts = + new CancellationTokenSource(TimeSpan.FromMilliseconds(config.ProducerTimeoutInMs ?? 1000)); + await p.ProduceAsync(config.Topic, new Message { @@ -46,7 +41,7 @@ public async Task Publish(IEventEnvelope @event, CancellationToken token) Key = @event.Data.GetType().Name, // serialize event to message Value Value = @event.ToJson(new PropagationContextJsonConverter()) - }, ct).ConfigureAwait(false); + }, cts.Token).ConfigureAwait(false); }, new StartActivityOptions { diff --git a/Core.Kafka/Producers/KafkaProducerConfig.cs b/Core.Kafka/Producers/KafkaProducerConfig.cs index 39d0f36eb..ee016d2a8 100644 --- a/Core.Kafka/Producers/KafkaProducerConfig.cs +++ b/Core.Kafka/Producers/KafkaProducerConfig.cs @@ -8,6 +8,7 @@ public class KafkaProducerConfig { public ProducerConfig? ProducerConfig { get; set; } public string Topic { get; set; } = default!; + public int? ProducerTimeoutInMs { get; set; } } public static class KafkaProducerConfigExtensions diff --git a/Core.Marten/MartenConfig.cs b/Core.Marten/MartenConfig.cs index a2ffd509c..9114b9fb6 100644 --- a/Core.Marten/MartenConfig.cs +++ b/Core.Marten/MartenConfig.cs @@ -88,6 +88,10 @@ public static class MartenConfigExtensions nonPublicMembersStorage: NonPublicMembersStorage.All ); + options.Projections.Errors.SkipApplyErrors = false; + options.Projections.Errors.SkipSerializationErrors = false; + options.Projections.Errors.SkipUnknownEvents = false; + options.Projections.Add( new MartenSubscription( new[] diff --git a/Marten.Integration.Tests/Commands/MartenAsyncCommandBusTests.cs b/Marten.Integration.Tests/Commands/MartenAsyncCommandBusTests.cs index 409d083a0..0faca21bd 100644 --- a/Marten.Integration.Tests/Commands/MartenAsyncCommandBusTests.cs +++ b/Marten.Integration.Tests/Commands/MartenAsyncCommandBusTests.cs @@ -17,7 +17,7 @@ namespace Marten.Integration.Tests.Commands; -public class MartenAsyncCommandBusTests: MartenTest +public class MartenAsyncCommandBusTests(MartenFixture fixture): MartenTest(fixture.PostgreSqlContainer, true) { [Fact] public async Task CommandIsStoredInMartenAndForwardedToCommandHandler() @@ -80,7 +80,7 @@ public override async Task InitializeAsync() var serviceProvider = services.BuildServiceProvider(); var session = serviceProvider.GetRequiredService(); - asyncDaemon = serviceProvider.GetRequiredService(); + asyncDaemon = serviceProvider.GetRequiredService(); martenAsyncCommandBus = new MartenAsyncCommandBus(session); } @@ -88,12 +88,8 @@ public override async Task InitializeAsync() private MartenAsyncCommandBus martenAsyncCommandBus = default!; private readonly List userIds = new(); private readonly EventListener eventListener = new(); - private ProjectionCoordinator asyncDaemon = default!; + private IProjectionCoordinator asyncDaemon = default!; private readonly CancellationToken ct = new CancellationTokenSource().Token; - - public MartenAsyncCommandBusTests(MartenFixture fixture) : base(fixture.PostgreSqlContainer, true) - { - } } public record AddUser(Guid UserId, string? Sth = default); diff --git a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs index d4c28773e..2674bc1d7 100644 --- a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs +++ b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs @@ -6,17 +6,13 @@ namespace Helpdesk.Api.Core.Kafka; -public class KafkaProducer: IProjection +public class KafkaProducer(IConfiguration configuration): IProjection { private const string DefaultConfigKey = "KafkaProducer"; - private readonly KafkaProducerConfig config; - - public KafkaProducer(IConfiguration configuration) - { - config = configuration.GetRequiredSection(DefaultConfigKey).Get() ?? - throw new InvalidOperationException(); - } + private readonly KafkaProducerConfig config = + configuration.GetRequiredSection(DefaultConfigKey).Get() ?? + throw new InvalidOperationException(); public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList streamsActions, CancellationToken ct) @@ -36,6 +32,8 @@ private async Task Publish(object @event, CancellationToken ct) { using var producer = new ProducerBuilder(config.ProducerConfig).Build(); + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(config.ProducerTimeoutInMs ?? 1000)); + await producer.ProduceAsync(config.Topic, new Message { @@ -43,7 +41,7 @@ private async Task Publish(object @event, CancellationToken ct) Key = @event.GetType().Name, // serialize event to message Value Value = JsonSerializer.Serialize(@event) - }, ct).ConfigureAwait(false); + }, cts.Token).ConfigureAwait(false); } catch (Exception exc) { @@ -57,4 +55,6 @@ public class KafkaProducerConfig { public ProducerConfig? ProducerConfig { get; set; } public string? Topic { get; set; } + + public int? ProducerTimeoutInMs { get; set; } } diff --git a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Incidents/ResolutionBatch/IncidentsBatchResolutionHandler.cs b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Incidents/ResolutionBatch/IncidentsBatchResolutionHandler.cs index 20879bd90..1ce8116b6 100644 --- a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Incidents/ResolutionBatch/IncidentsBatchResolutionHandler.cs +++ b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Incidents/ResolutionBatch/IncidentsBatchResolutionHandler.cs @@ -45,9 +45,9 @@ DateTimeOffset now : []; [AggregateHandler] - public static Events Handle(TimeoutIncidentsBatchResolution command, IncidentsBatchResolution batch, + public static Events Handle(TimeoutIncidentsBatchResolution command, IncidentsBatchResolution? batch, DateTimeOffset now) => - batch.Status != ResolutionStatus.Pending + batch != null && batch.Status != ResolutionStatus.Pending ? [new IncidentsBatchResolutionResolutionTimedOut(command.IncidentsBatchResolutionId, batch.Incidents, now)] : []; diff --git a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs index 931036f7d..0f355fbbf 100644 --- a/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs +++ b/Sample/Helpdesk.Wolverine/Helpdesk.Api/Program.cs @@ -50,6 +50,14 @@ casing: Casing.CamelCase ); + options.Projections.Errors.SkipApplyErrors = false; + options.Projections.Errors.SkipSerializationErrors = false; + options.Projections.Errors.SkipUnknownEvents = false; + + options.Projections.RebuildErrors.SkipApplyErrors = false; + options.Projections.RebuildErrors.SkipSerializationErrors = false; + options.Projections.RebuildErrors.SkipUnknownEvents = false; + options.Projections.Add(new KafkaProducer(builder.Configuration), ProjectionLifecycle.Async); options.Projections.Add( new SignalRProducer((IHubContext)sp.GetRequiredService>()), @@ -58,6 +66,7 @@ options.ConfigureIncidents(); + options.DisableNpgsqlLogging = true; return options; }) .OptimizeArtifactWorkflow(TypeLoadMode.Static) diff --git a/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs b/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs index d4c28773e..2674bc1d7 100644 --- a/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs +++ b/Sample/Helpdesk/Helpdesk.Api/Core/Kafka/KafkaProducer.cs @@ -6,17 +6,13 @@ namespace Helpdesk.Api.Core.Kafka; -public class KafkaProducer: IProjection +public class KafkaProducer(IConfiguration configuration): IProjection { private const string DefaultConfigKey = "KafkaProducer"; - private readonly KafkaProducerConfig config; - - public KafkaProducer(IConfiguration configuration) - { - config = configuration.GetRequiredSection(DefaultConfigKey).Get() ?? - throw new InvalidOperationException(); - } + private readonly KafkaProducerConfig config = + configuration.GetRequiredSection(DefaultConfigKey).Get() ?? + throw new InvalidOperationException(); public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList streamsActions, CancellationToken ct) @@ -36,6 +32,8 @@ private async Task Publish(object @event, CancellationToken ct) { using var producer = new ProducerBuilder(config.ProducerConfig).Build(); + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(config.ProducerTimeoutInMs ?? 1000)); + await producer.ProduceAsync(config.Topic, new Message { @@ -43,7 +41,7 @@ private async Task Publish(object @event, CancellationToken ct) Key = @event.GetType().Name, // serialize event to message Value Value = JsonSerializer.Serialize(@event) - }, ct).ConfigureAwait(false); + }, cts.Token).ConfigureAwait(false); } catch (Exception exc) { @@ -57,4 +55,6 @@ public class KafkaProducerConfig { public ProducerConfig? ProducerConfig { get; set; } public string? Topic { get; set; } + + public int? ProducerTimeoutInMs { get; set; } } diff --git a/Sample/Helpdesk/Helpdesk.Api/Program.cs b/Sample/Helpdesk/Helpdesk.Api/Program.cs index 6b2f9ee18..42f395e45 100644 --- a/Sample/Helpdesk/Helpdesk.Api/Program.cs +++ b/Sample/Helpdesk/Helpdesk.Api/Program.cs @@ -44,6 +44,10 @@ options.UseSystemTextJsonForSerialization(EnumStorage.AsString); + options.Projections.Errors.SkipApplyErrors = false; + options.Projections.Errors.SkipSerializationErrors = false; + options.Projections.Errors.SkipUnknownEvents = false; + options.Projections.LiveStreamAggregation(); options.Projections.Add(ProjectionLifecycle.Inline); options.Projections.Add(ProjectionLifecycle.Inline);