Skip to content

Commit

Permalink
Added Marten Outbox Pattern with custom Projection (a.k.a Subscriptio…
Browse files Browse the repository at this point in the history
…ns). Plugged it into samples removing calling EventBus from repository, by that getting at-least once processing guarantee.

Updated Marten to latest v5 alpha
Disabled API tests parallelisation until better test setup is provided
  • Loading branch information
oskardudycz committed Mar 3, 2022
1 parent f69ae84 commit dbe5e3b
Show file tree
Hide file tree
Showing 52 changed files with 230 additions and 123 deletions.
2 changes: 1 addition & 1 deletion CQRS.Tests/CQRS.Tests.csproj
Expand Up @@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="MediatR" Version="10.0.1" />
<PackageReference Include="SharpTestsEx" Version="2.0.0" />
Expand Down
1 change: 1 addition & 0 deletions Core.Api.Testing/ApiFixture.cs
Expand Up @@ -48,6 +48,7 @@ public virtual Task DisposeAsync()
return Task.CompletedTask;
}

// TODO: Add Poly here
public async Task<HttpResponseMessage> Get(string path = "", int maxNumberOfRetries = 0,
int retryIntervalInMs = 1000, Func<HttpResponseMessage, ValueTask<bool>>? check = null)
{
Expand Down
2 changes: 1 addition & 1 deletion Core.ElasticSearch/Indices/IndexNameMapper.cs
Expand Up @@ -18,7 +18,7 @@ public static void AddCustomMap(Type streamType, string mappedStreamName)

public static string ToIndexPrefix<TStream>() => ToIndexPrefix(typeof(TStream));

public static string ToIndexPrefix(Type streamType) => Instance.typeNameMap.GetOrAdd(streamType, (_) =>
public static string ToIndexPrefix(Type streamType) => Instance.typeNameMap.GetOrAdd(streamType, _ =>
{
var modulePrefix = streamType.Namespace!.Split(".").First();
return $"{modulePrefix}-{streamType.Name}".ToLower();
Expand Down
Expand Up @@ -23,7 +23,7 @@ public class EventStoreDBSubscriptionToAllOptions

public class EventStoreDBSubscriptionToAll
{
private readonly INoMediatorEventBus noMediatorEventBus;
private readonly INoMediatorEventBus eventBus;
private readonly EventStoreClient eventStoreClient;
private readonly ISubscriptionCheckpointRepository checkpointRepository;
private readonly ILogger<EventStoreDBSubscriptionToAll> logger;
Expand All @@ -34,12 +34,12 @@ public class EventStoreDBSubscriptionToAll

public EventStoreDBSubscriptionToAll(
EventStoreClient eventStoreClient,
INoMediatorEventBus noMediatorEventBus,
INoMediatorEventBus eventBus,
ISubscriptionCheckpointRepository checkpointRepository,
ILogger<EventStoreDBSubscriptionToAll> logger
)
{
this.noMediatorEventBus = noMediatorEventBus ?? throw new ArgumentNullException(nameof(noMediatorEventBus));
this.eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
this.eventStoreClient = eventStoreClient ?? throw new ArgumentNullException(nameof(eventStoreClient));
this.checkpointRepository =
checkpointRepository ?? throw new ArgumentNullException(nameof(checkpointRepository));
Expand Down Expand Up @@ -98,7 +98,7 @@ public async Task SubscribeToAll(EventStoreDBSubscriptionToAllOptions subscripti
}

// publish event to internal event bus
await noMediatorEventBus.Publish(streamEvent, ct);
await eventBus.Publish(streamEvent, ct);

await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.CommitPosition, ct);
}
Expand Down
2 changes: 1 addition & 1 deletion Core.Kafka/Producers/KafkaProducer.cs
Expand Up @@ -26,7 +26,7 @@ public async Task Publish(IExternalEvent @event, CancellationToken cancellationT
using var p = new ProducerBuilder<string, string>(config.ProducerConfig).Build();
await Task.Yield();
// publish event to kafka topic taken from config
var result = await p.ProduceAsync(config.Topic,
await p.ProduceAsync(config.Topic,
new Message<string, string>
{
// store event type name in message Key
Expand Down
56 changes: 22 additions & 34 deletions Core.Marten/Config.cs
@@ -1,9 +1,13 @@
using Baseline.ImTools;
using Core.Events;
using Core.Ids;
using Core.Marten.Ids;
using Core.Marten.OptimisticConcurrency;
using Core.Marten.Subscriptions;
using Core.Threading;
using Marten;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Weasel.Core;
Expand Down Expand Up @@ -37,45 +41,21 @@ public static class MartenConfigExtensions
.AddScoped<IIdGenerator, MartenIdGenerator>()
.AddScoped<MartenOptimisticConcurrencyScope, MartenOptimisticConcurrencyScope>()
.AddScoped<MartenExpectedStreamVersionProvider, MartenExpectedStreamVersionProvider>()
.AddScoped<MartenNextStreamVersionProvider, MartenNextStreamVersionProvider>();

var documentStore = services
.AddMarten(options =>
{
SetStoreOptions(options, martenConfig, configureOptions);
})
.AddAsyncDaemon(DaemonMode.Solo)
.InitializeStore();

SetupSchema(documentStore, martenConfig, 1);
.AddScoped<MartenNextStreamVersionProvider, MartenNextStreamVersionProvider>()
.AddMarten(sp => SetStoreOptions(sp, martenConfig, configureOptions))
.ApplyAllDatabaseChangesOnStartup()
.AddAsyncDaemon(DaemonMode.Solo);

return services;
}

private static void SetupSchema(IDocumentStore documentStore, Config martenConfig, int retryLeft = 1)
{
try
{
if (martenConfig.ShouldRecreateDatabase)
documentStore.Advanced.Clean.CompletelyRemoveAll();

using (NoSynchronizationContextScope.Enter())
{
documentStore.Schema.ApplyAllConfiguredChangesToDatabaseAsync().Wait();
}
}
catch
{
if (retryLeft == 0) throw;

Thread.Sleep(1000);
SetupSchema(documentStore, martenConfig, --retryLeft);
}
}

private static void SetStoreOptions(StoreOptions options, Config config,
Action<StoreOptions>? configureOptions = null)
private static StoreOptions SetStoreOptions(
IServiceProvider serviceProvider,
Config config,
Action<StoreOptions>? configureOptions = null
)
{
var options = new StoreOptions();
options.Connection(config.ConnectionString);
options.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate;

Expand All @@ -88,6 +68,14 @@ private static void SetupSchema(IDocumentStore documentStore, Config martenConfi
nonPublicMembersStorage: NonPublicMembersStorage.All
);

options.Projections.Add(
new MartenSubscription(new[] { new MartenEventPublisher(serviceProvider) }),
ProjectionLifecycle.Async,
"MartenSubscription"
);

configureOptions?.Invoke(options);

return options;
}
}
2 changes: 1 addition & 1 deletion Core.Marten/Core.Marten.csproj
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="MediatR" Version="10.0.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />
Expand Down
7 changes: 1 addition & 6 deletions Core.Marten/Repository/MartenRepository.cs
Expand Up @@ -15,15 +15,12 @@ public interface IMartenRepository<T> where T : class, IAggregate
public class MartenRepository<T>: IMartenRepository<T> where T : class, IAggregate
{
private readonly IDocumentSession documentSession;
private readonly IEventBus eventBus;

public MartenRepository(
IDocumentSession documentSession,
IEventBus eventBus
IDocumentSession documentSession
)
{
this.documentSession = documentSession;
this.eventBus = eventBus;
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
Expand All @@ -39,7 +36,6 @@ public async Task<long> Add(T aggregate, CancellationToken cancellationToken)
);

await documentSession.SaveChangesAsync(cancellationToken);
await eventBus.Publish(events, cancellationToken);

return events.Length;
}
Expand All @@ -59,7 +55,6 @@ public async Task<long> Update(T aggregate, long? expectedVersion = null, Cancel
);

await documentSession.SaveChangesAsync(cancellationToken);
await eventBus.Publish(events, cancellationToken);

return nextVersion;
}
Expand Down
42 changes: 42 additions & 0 deletions Core.Marten/Subscriptions/MartenEventPublisher.cs
@@ -0,0 +1,42 @@
using Core.Events;
using Marten;
using Marten.Events;
using Microsoft.Extensions.DependencyInjection;
using IEvent = Core.Events.IEvent;

namespace Core.Marten.Subscriptions;

public class MartenEventPublisher: IMartenEventsConsumer
{
private readonly IServiceProvider serviceProvider;

public MartenEventPublisher(
IServiceProvider serviceProvider
)
{
this.serviceProvider = serviceProvider;
}

public async Task ConsumeAsync(IDocumentOperations documentOperations, IReadOnlyList<StreamAction> streamActions,
CancellationToken ct)
{
foreach (var @event in streamActions.SelectMany(streamAction => streamAction.Events))
{
// TODO: align all handlers to use StreamEvent
// var streamEvent = new StreamEvent(
// @event.Data,
// new EventMetadata(
// (ulong)@event.Version,
// (ulong)@event.Sequence
// )
// );

using var scope = serviceProvider.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();

if (@event.Data is not IEvent mappedEvent) continue;

await eventBus.Publish(mappedEvent, ct);
}
}
}
43 changes: 43 additions & 0 deletions Core.Marten/Subscriptions/MartenSubscription.cs
@@ -0,0 +1,43 @@
using Marten;
using Marten.Events;
using Marten.Events.Projections;

namespace Core.Marten.Subscriptions;

public class MartenSubscription: IProjection
{
private readonly IEnumerable<IMartenEventsConsumer> consumers;

public MartenSubscription(IEnumerable<IMartenEventsConsumer> consumers)
{
this.consumers = consumers;
}

public void Apply(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams
) =>
throw new NotImplementedException("Subscriptions should work only in the async scope");

public async Task ApplyAsync(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams,
CancellationToken ct
)
{
foreach (var consumer in consumers)
{
await consumer.ConsumeAsync(operations, streams, ct);
}
}
}


public interface IMartenEventsConsumer
{
Task ConsumeAsync(
IDocumentOperations documentOperations,
IReadOnlyList<StreamAction> streamActions,
CancellationToken ct
);
}
38 changes: 35 additions & 3 deletions Core.Testing/ApiFixture.cs
@@ -1,8 +1,10 @@
using System.Linq.Expressions;
using Core.Api.Testing;
using Core.Commands;
using Core.Events;
using Core.Events.External;
using Core.Requests;
using FluentAssertions;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using IEventBus = Core.Events.IEventBus;
Expand All @@ -16,7 +18,7 @@ public abstract class ApiWithEventsFixture<TStartup>: ApiFixture<TStartup> where
private readonly DummyExternalCommandBus externalCommandBus = new();

public override TestContext CreateTestContext() =>
new TestContext<TStartup>(GetConfiguration, (services) =>
new TestContext<TStartup>(GetConfiguration, services =>
{
SetupServices?.Invoke(services);
services.AddSingleton(eventsLog);
Expand Down Expand Up @@ -46,10 +48,40 @@ public async Task PublishInternalEvent(IEvent @event, CancellationToken ct = def
await eventBus.Publish(@event, ct);
}

public IReadOnlyCollection<TEvent> PublishedInternalEventsOfType<TEvent>()
public IReadOnlyCollection<TEvent> PublishedInternalEventsOfType<TEvent>() =>
eventsLog.PublishedEvents.OfType<TEvent>().ToList();

// TODO: Add Poly here
public async Task ShouldPublishInternalEventOfType<TEvent>(
Expression<Func<TEvent, bool>> predicate,
int maxNumberOfRetries = 5,
int retryIntervalInMs = 1000)
{
return eventsLog.PublishedEvents.OfType<TEvent>().ToList();
var retryCount = maxNumberOfRetries;
var finished = false;

do
{
try
{
PublishedInternalEventsOfType<TEvent>().Should()
.HaveCount(1)
.And.Contain(predicate);

finished = true;
}
catch
{
if (retryCount == 0)
throw;
}

await Task.Delay(retryIntervalInMs);
retryCount--;
} while (!finished);
}


}

public abstract class ApiWithEventsFixture: ApiFixture
Expand Down
2 changes: 1 addition & 1 deletion Core.Tests/Core.Tests.csproj
Expand Up @@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="Microsoft.DotNet.InternalAbstractions" Version="1.0.500-preview2-1-003177" />
<PackageReference Include="SharpTestsEx" Version="2.0.0" />
Expand Down
8 changes: 3 additions & 5 deletions Core.WebApi/Tracing/Correlation/CorrelationIdMiddleware.cs
Expand Up @@ -15,16 +15,14 @@ public class CorrelationIdMiddleware

private readonly RequestDelegate next;
private readonly ILogger<CorrelationIdMiddleware> logger;
private readonly Func<CorrelationId> correlationIdFactory;

public CorrelationIdMiddleware(RequestDelegate next, ILogger<CorrelationIdMiddleware> logger, Func<CorrelationId> correlationIdFactory)
public CorrelationIdMiddleware(RequestDelegate next, ILogger<CorrelationIdMiddleware> logger)
{
this.next = next ?? throw new ArgumentNullException(nameof(next));
this.logger = logger;
this.correlationIdFactory = correlationIdFactory;
}

public async Task Invoke(HttpContext context)
public async Task Invoke(HttpContext context, Func<CorrelationId> correlationIdFactory)
{
// get correlation id from header or generate a new one
context.TraceIdentifier =
Expand Down Expand Up @@ -54,8 +52,8 @@ public static class CorrelationIdMiddlewareConfig
{
public static IServiceCollection AddCorrelationIdMiddleware(this IServiceCollection services)
{
services.TryAddScoped<ICorrelationIdFactory, GuidCorrelationIdFactory>();
services.TryAddScoped<Func<CorrelationId>>(sp => sp.GetRequiredService<ICorrelationIdFactory>().New);
services.TryAddScoped<ICorrelationIdFactory, GuidCorrelationIdFactory>();

return services;
}
Expand Down
Expand Up @@ -12,7 +12,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.5.1" />
<PackageReference Include="Marten" Version="5.0.0-alpha.5" />
<PackageReference Include="Marten" Version="5.0.0-alpha.6" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="MediatR" Version="10.0.1" />
<PackageReference Include="Microsoft.DotNet.InternalAbstractions" Version="1.0.500-preview2-1-003177" />
Expand Down

0 comments on commit dbe5e3b

Please sign in to comment.