Skip to content

Commit

Permalink
Added ScrutR to make decorators easier
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Dec 8, 2022
1 parent b075b07 commit 7d9bd9e
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 66 deletions.
28 changes: 18 additions & 10 deletions Core.EventStoreDB/Repository/Config.cs
@@ -1,4 +1,5 @@
using Core.Aggregates;
using Core.OpenTelemetry;
using Core.OptimisticConcurrency;
using Microsoft.Extensions.DependencyInjection;

Expand All @@ -8,26 +9,33 @@ public static class Config
{
public static IServiceCollection AddEventStoreDBRepository<T>(
this IServiceCollection services,
bool withAppendScope = true
bool withAppendScope = true,
bool withTelemetry = true
) where T : class, IAggregate
{
services.AddScoped<EventStoreDBRepository<T>, EventStoreDBRepository<T>>();
services.AddScoped<IEventStoreDBRepository<T>, EventStoreDBRepository<T>>();

if (!withAppendScope)
if (withAppendScope)
{
services.AddScoped<IEventStoreDBRepository<T>, EventStoreDBRepository<T>>();
}
else
{
services.AddScoped<IEventStoreDBRepository<T>, EventStoreDBRepositoryWithETagDecorator<T>>(
sp => new EventStoreDBRepositoryWithETagDecorator<T>(
sp.GetRequiredService<EventStoreDBRepository<T>>(),
services.Decorate<IEventStoreDBRepository<T>>(
(inner, sp) => new EventStoreDBRepositoryWithETagDecorator<T>(
inner,
sp.GetRequiredService<IExpectedResourceVersionProvider>(),
sp.GetRequiredService<INextResourceVersionProvider>()
)
);
}

if (withTelemetry)
{
services.Decorate<IEventStoreDBRepository<T>>(
(inner, sp) => new EventStoreDBRepositoryWithTelemetryDecorator<T>(
inner,
sp.GetRequiredService<IActivityScope>()
)
);
}

return services;
}
}
Expand Down
75 changes: 27 additions & 48 deletions Core.EventStoreDB/Repository/EventStoreDBRepository.cs
Expand Up @@ -36,63 +36,42 @@ IActivityScope activityScope
cancellationToken
);

public Task<ulong> Add(T aggregate, CancellationToken token = default) =>
activityScope.Run($"{typeof(EventStoreDBRepository<T>).Name}/{nameof(Add)}",
async (activity, ct) =>
{
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
StreamState.NoStream,
GetEventsToStore(aggregate, TelemetryPropagator.GetPropagationContext(activity)),
cancellationToken: ct
).ConfigureAwait(false);
return result.NextExpectedStreamRevision.ToUInt64();
},
token
);
public async Task<ulong> Add(T aggregate, CancellationToken ct = default)
{
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
StreamState.NoStream,
GetEventsToStore(aggregate),
cancellationToken: ct
).ConfigureAwait(false);

public Task<ulong> Update(T aggregate, ulong? expectedRevision = null, CancellationToken token = default) =>
activityScope.Run($"{typeof(EventStoreDBRepository<T>).Name}/{nameof(Update)}",
async (activity, ct) =>
{
var eventsToAppend = GetEventsToStore(aggregate, TelemetryPropagator.GetPropagationContext(activity));
var nextVersion = expectedRevision ?? (ulong)(aggregate.Version - eventsToAppend.Count);
return result.NextExpectedStreamRevision.ToUInt64();
}

var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
nextVersion,
eventsToAppend,
cancellationToken: ct
).ConfigureAwait(false);
return result.NextExpectedStreamRevision.ToUInt64();
},
token
);
public async Task<ulong> Update(T aggregate, ulong? expectedRevision = null, CancellationToken ct = default)
{
var eventsToAppend = GetEventsToStore(aggregate);
var nextVersion = expectedRevision ?? (ulong)(aggregate.Version - eventsToAppend.Count);

public Task<ulong> Delete(T aggregate, ulong? expectedRevision = null, CancellationToken token = default) =>
activityScope.Run($"{typeof(EventStoreDBRepository<T>).Name}/{nameof(Delete)}",
async (activity, ct) =>
{
var eventsToAppend = GetEventsToStore(aggregate, TelemetryPropagator.GetPropagationContext(activity));
var nextVersion = expectedRevision ?? (ulong)(aggregate.Version - eventsToAppend.Count);
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
nextVersion,
eventsToAppend,
cancellationToken: ct
).ConfigureAwait(false);

var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
nextVersion,
eventsToAppend,
cancellationToken: ct
).ConfigureAwait(false);
return result.NextExpectedStreamRevision.ToUInt64();
},
token
);
return result.NextExpectedStreamRevision.ToUInt64();
}

public Task<ulong> Delete(T aggregate, ulong? expectedRevision = null, CancellationToken ct = default) =>
Update(aggregate, expectedRevision, ct);

private static List<EventData> GetEventsToStore(T aggregate, PropagationContext? propagationContext)
private static List<EventData> GetEventsToStore(T aggregate)
{
var events = aggregate.DequeueUncommittedEvents();

return events
.Select(@event => @event.ToJsonEventData(propagationContext))
.Select(@event => @event.ToJsonEventData(TelemetryPropagator.GetPropagationContext()))
.ToList();
}
}
@@ -0,0 +1,45 @@
using Core.Aggregates;
using Core.OpenTelemetry;
using Microsoft.Extensions.Logging;

namespace Core.EventStoreDB.Repository;

public class EventStoreDBRepositoryWithTelemetryDecorator<T>: IEventStoreDBRepository<T>
where T : class, IAggregate
{
private readonly IEventStoreDBRepository<T> inner;
private readonly IActivityScope activityScope;

public EventStoreDBRepositoryWithTelemetryDecorator(
IEventStoreDBRepository<T> inner,
IActivityScope activityScope
)
{
this.inner = inner;
this.activityScope = activityScope;
}

public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
inner.Find(id, cancellationToken);

public Task<ulong> Add(T aggregate, CancellationToken cancellationToken = default) =>
activityScope.Run($"EventStoreDBRepository/{nameof(Add)}",
(_, ct) => inner.Add(aggregate, ct),
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
cancellationToken
);

public Task<ulong> Update(T aggregate, ulong? expectedVersion = null, CancellationToken token = default) =>
activityScope.Run($"EventStoreDBRepository/{nameof(Update)}",
(_, ct) => inner.Update(aggregate, expectedVersion, ct),
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
token
);

public Task<ulong> Delete(T aggregate, ulong? expectedVersion = null, CancellationToken token = default) =>
activityScope.Run($"EventStoreDBRepository/{nameof(Delete)}",
(_, ct) => inner.Delete(aggregate, expectedVersion, ct),
new StartActivityOptions { Tags = { { TelemetryTags.Logic.Entity, typeof(T).Name } } },
token
);
}
14 changes: 6 additions & 8 deletions Core.Marten/Repository/Config.cs
Expand Up @@ -18,21 +18,19 @@ public static class Config
services.AddScoped<IMartenRepository<T>, MartenRepository<T>>();

if (withAppendScope)
{
services.AddScoped<IMartenRepository<T>, MartenRepositoryWithETagDecorator<T>>(
sp => new MartenRepositoryWithETagDecorator<T>(
sp.GetRequiredService<IMartenRepository<T>>(),
services.Decorate<IMartenRepository<T>>(
(inner, sp) => new MartenRepositoryWithETagDecorator<T>(
inner,
sp.GetRequiredService<IExpectedResourceVersionProvider>(),
sp.GetRequiredService<INextResourceVersionProvider>()
)
);
}

if (withTelemetry)
{
services.AddScoped<IMartenRepository<T>, MartenRepositoryWithTracingDecorator<T>>(
sp => new MartenRepositoryWithTracingDecorator<T>(
sp.GetRequiredService<IMartenRepository<T>>(),
services.Decorate<IMartenRepository<T>>(
(inner, sp) => new MartenRepositoryWithTracingDecorator<T>(
inner,
sp.GetRequiredService<IDocumentSession>(),
sp.GetRequiredService<IActivityScope>(),
sp.GetRequiredService<ILogger<MartenRepositoryWithTracingDecorator<T>>>()
Expand Down
1 change: 1 addition & 0 deletions Core/Core.csproj
Expand Up @@ -17,6 +17,7 @@
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.0.0-rc9.9" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.0.0-rc9.9" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.3.1" />
<PackageReference Include="Scrutor" Version="4.2.0" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 7d9bd9e

Please sign in to comment.