Skip to content

Commit

Permalink
Changed custom IProjections like KafkaProducer, SignalRProducer, Elas…
Browse files Browse the repository at this point in the history
…ticProjection to subscriptions
  • Loading branch information
oskardudycz committed May 2, 2024
1 parent 935b79a commit 65ce4fb
Show file tree
Hide file tree
Showing 15 changed files with 247 additions and 258 deletions.
28 changes: 5 additions & 23 deletions Core.Marten/MartenConfig.cs
Expand Up @@ -57,25 +57,25 @@ public static class MartenConfigExtensions
{
services
.AddScoped<IIdGenerator, MartenIdGenerator>()
.AddMarten(sp => SetStoreOptions(sp, martenConfig, configureOptions))
.AddMarten(options => SetStoreOptions(options, martenConfig, configureOptions))
.UseLightweightSessions()
.ApplyAllDatabaseChangesOnStartup()
//.OptimizeArtifactWorkflow()
.AddAsyncDaemon(martenConfig.DaemonMode);
.AddAsyncDaemon(martenConfig.DaemonMode)
.AddSubscriptionWithServices<MartenEventPublisher>(ServiceLifetime.Scoped);

if (useExternalBus)
services.AddMartenAsyncCommandBus();

return services;
}

private static StoreOptions SetStoreOptions(
IServiceProvider serviceProvider,
private static void SetStoreOptions(
StoreOptions options,
MartenConfig martenConfig,
Action<StoreOptions>? configureOptions = null
)
{
var options = new StoreOptions();
options.Connection(martenConfig.ConnectionString);
options.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate;

Expand All @@ -92,22 +92,6 @@ public static class MartenConfigExtensions
options.Projections.Errors.SkipSerializationErrors = false;
options.Projections.Errors.SkipUnknownEvents = false;

options.Projections.Add(
new MartenSubscription(
new[]
{
new MartenEventPublisher(
serviceProvider,
serviceProvider.GetRequiredService<IActivityScope>(),
serviceProvider.GetRequiredService<ILogger<MartenEventPublisher>>()
)
},
serviceProvider.GetRequiredService<ILogger<MartenSubscription>>()
),
ProjectionLifecycle.Async,
"MartenSubscription"
);

if (martenConfig.UseMetadata)
{
options.Events.MetadataConfig.CausationIdEnabled = true;
Expand All @@ -116,7 +100,5 @@ public static class MartenConfigExtensions
}

configureOptions?.Invoke(options);

return options;
}
}
97 changes: 53 additions & 44 deletions Core.Marten/Subscriptions/MartenEventPublisher.cs
Expand Up @@ -2,62 +2,71 @@
using Core.OpenTelemetry;
using Marten;
using Marten.Events;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Events.Projections;
using Marten.Subscriptions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace Core.Marten.Subscriptions;

public class MartenEventPublisher: IMartenEventsConsumer
public class MartenEventPublisher(
IServiceProvider serviceProvider,
IActivityScope activityScope,
ILogger<MartenEventPublisher> logger
): SubscriptionBase
{
private readonly IServiceProvider serviceProvider;
private readonly IActivityScope activityScope;
private readonly ILogger<MartenEventPublisher> logger;

public MartenEventPublisher(
IServiceProvider serviceProvider,
IActivityScope activityScope,
ILogger<MartenEventPublisher> logger
)
{
this.serviceProvider = serviceProvider;
this.activityScope = activityScope;
this.logger = logger;
}

public async Task ConsumeAsync(
IDocumentOperations documentOperations,
IReadOnlyList<StreamAction> streamActions,
CancellationToken cancellationToken
public override async Task<IChangeListener> ProcessEventsAsync(
EventRange eventRange,
ISubscriptionController subscriptionController,
IDocumentOperations operations,
CancellationToken token
)
{
foreach (var @event in streamActions.SelectMany(streamAction => streamAction.Events))
var lastProcessed = eventRange.SequenceFloor;
try
{
var parentContext =
TelemetryPropagator.Extract(@event.Headers, ExtractTraceContextFromEventMetadata);
foreach (var @event in eventRange.Events)
{
var parentContext =
TelemetryPropagator.Extract(@event.Headers, ExtractTraceContextFromEventMetadata);

await activityScope.Run($"{nameof(MartenEventPublisher)}/{nameof(ProcessEventsAsync)}",
async (_, ct) =>
{
using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
var eventMetadata = new EventMetadata(
@event.Id.ToString(),
(ulong)@event.Version,
(ulong)@event.Sequence,
parentContext
);
await activityScope.Run($"{nameof(MartenEventPublisher)}/{nameof(ConsumeAsync)}",
async (_, ct) =>
{
using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
await eventBus.Publish(EventEnvelope.From(@event.Data, eventMetadata), ct)
.ConfigureAwait(false);
var eventMetadata = new EventMetadata(
@event.Id.ToString(),
(ulong)@event.Version,
(ulong)@event.Sequence,
parentContext
);
// TODO: you can also differentiate based on the exception
// await controller.RecordDeadLetterEventAsync(e, ex);
},
new StartActivityOptions
{
Tags = { { TelemetryTags.EventHandling.Event, @event.Data.GetType() } },
Parent = parentContext.ActivityContext
},
token
).ConfigureAwait(false);
}

await eventBus.Publish(EventEnvelope.From(@event.Data, eventMetadata), ct)
.ConfigureAwait(false);
},
new StartActivityOptions
{
Tags = { { TelemetryTags.EventHandling.Event, @event.Data.GetType() } },
Parent = parentContext.ActivityContext
},
cancellationToken
).ConfigureAwait(false);
return NullChangeListener.Instance;
}
catch (Exception exc)
{
logger.LogError("Error while processing Marten Subscription: {ExceptionMessage}", exc.Message);
await subscriptionController.ReportCriticalFailureAsync(exc, lastProcessed).ConfigureAwait(false);
throw;
}
}

Expand Down
56 changes: 0 additions & 56 deletions Core.Marten/Subscriptions/MartenSubscription.cs

This file was deleted.

23 changes: 6 additions & 17 deletions Core/Commands/InMemoryCommandBus.cs
Expand Up @@ -5,29 +5,18 @@

namespace Core.Commands;

public class InMemoryCommandBus: ICommandBus
public class InMemoryCommandBus(
IServiceProvider serviceProvider,
IActivityScope activityScope,
IAsyncPolicy retryPolicy
): ICommandBus
{
private readonly IServiceProvider serviceProvider;
private readonly AsyncPolicy retryPolicy;
private readonly IActivityScope activityScope;

public InMemoryCommandBus(
IServiceProvider serviceProvider,
IActivityScope activityScope,
AsyncPolicy retryPolicy
)
{
this.serviceProvider = serviceProvider;
this.retryPolicy = retryPolicy;
this.activityScope = activityScope;
}

public async Task Send<TCommand>(TCommand command, CancellationToken ct = default)
where TCommand : notnull
{
var wasHandled = await TrySend(command, ct).ConfigureAwait(true);

if(!wasHandled)
if (!wasHandled)
throw new InvalidOperationException($"Unable to find handler for command '{command.GetType().Name}'");
}

Expand Down
26 changes: 18 additions & 8 deletions Sample/Helpdesk.Wolverine/Helpdesk.Api/Core/Kafka/KafkaProducer.cs
Expand Up @@ -2,31 +2,38 @@
using Confluent.Kafka;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Subscriptions;

namespace Helpdesk.Api.Core.Kafka;

public class KafkaProducer(IConfiguration configuration): IProjection
public class KafkaProducer(IConfiguration configuration): SubscriptionBase
{
private const string DefaultConfigKey = "KafkaProducer";

private readonly KafkaProducerConfig config =
configuration.GetRequiredSection(DefaultConfigKey).Get<KafkaProducerConfig>() ??
throw new InvalidOperationException();

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
public override async Task<IChangeListener> ProcessEventsAsync(
EventRange eventRange,
ISubscriptionController subscriptionController,
IDocumentOperations operations,
CancellationToken ct
)
{
foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events))
foreach (var @event in eventRange.Events)
{
await Publish(@event.Data, ct);
await Publish(subscriptionController, @event, ct);
}
return NullChangeListener.Instance;
}

public void Apply(IDocumentOperations operations, IReadOnlyList<StreamAction> streams) =>
throw new NotImplementedException("Producer should be only used in the AsyncDaemon");

private async Task Publish(object @event, CancellationToken ct)
private async Task Publish(ISubscriptionController subscriptionController, IEvent @event, CancellationToken ct)
{
try
{
Expand All @@ -40,11 +47,14 @@ private async Task Publish(object @event, CancellationToken ct)
// store event type name in message Key
Key = @event.GetType().Name,
// serialize event to message Value
Value = JsonSerializer.Serialize(@event)
Value = JsonSerializer.Serialize(@event.Data)
}, cts.Token).ConfigureAwait(false);
}
catch (Exception exc)
{
await subscriptionController.ReportCriticalFailureAsync(exc, @event.Sequence);
// TODO: you can also differentiate based on the exception
// await subscriptionController.RecordDeadLetterEventAsync(@event, exc);
Console.WriteLine(exc.Message);
throw;
}
Expand Down
@@ -1,27 +1,33 @@
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Internals;
using Marten.Subscriptions;
using Microsoft.AspNetCore.SignalR;

namespace Helpdesk.Api.Core.SignalR;

public class SignalRProducer: IProjection
public class SignalRProducer<THub>(IHubContext<THub> hubContext): SubscriptionBase where THub : Hub
{
private readonly IHubContext hubContext;

public SignalRProducer(IHubContext hubContext) =>
this.hubContext = hubContext;

public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<StreamAction> streamsActions,
CancellationToken ct)
public override async Task<IChangeListener> ProcessEventsAsync(
EventRange eventRange,
ISubscriptionController subscriptionController,
IDocumentOperations operations,
CancellationToken ct
)
{
foreach (var @event in streamsActions.SelectMany(streamAction => streamAction.Events))
foreach (var @event in eventRange.Events)
{
await hubContext.Clients.All.SendAsync(@event.EventTypeName, @event.Data, ct);
try
{
await hubContext.Clients.All.SendAsync(@event.EventTypeName, @event.Data, ct);
}
catch (Exception exc)
{
// this is fine to put event to dead letter queue, as it's just SignalR notification
await subscriptionController.RecordDeadLetterEventAsync(@event, exc);
}
}
}

public void Apply(IDocumentOperations operations, IReadOnlyList<StreamAction> streams) =>
throw new NotImplementedException("Producer should be only used in the AsyncDaemon");
return NullChangeListener.Instance;
}
}

0 comments on commit 65ce4fb

Please sign in to comment.