/
EventStoreDBRepository.cs
77 lines (64 loc) · 2.61 KB
/
EventStoreDBRepository.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
using Core.Aggregates;
using Core.Events;
using Core.EventStoreDB.Events;
using Core.EventStoreDB.Serialization;
using Core.OpenTelemetry;
using EventStore.Client;
using OpenTelemetry.Context.Propagation;
namespace Core.EventStoreDB.Repository;
public interface IEventStoreDBRepository<T> where T : class, IAggregate
{
Task<T?> Find(Guid id, CancellationToken cancellationToken);
Task<ulong> Add(T aggregate, CancellationToken ct = default);
Task<ulong> Update(T aggregate, ulong? expectedRevision = null, CancellationToken ct = default);
Task<ulong> Delete(T aggregate, ulong? expectedRevision = null, CancellationToken ct = default);
}
public class EventStoreDBRepository<T>: IEventStoreDBRepository<T> where T : class, IAggregate
{
private readonly EventStoreClient eventStore;
private readonly IActivityScope activityScope;
public EventStoreDBRepository(
EventStoreClient eventStore,
IActivityScope activityScope
)
{
this.eventStore = eventStore;
this.activityScope = activityScope;
}
public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
eventStore.AggregateStream<T>(
id,
cancellationToken
);
public async Task<ulong> Add(T aggregate, CancellationToken ct = default)
{
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
StreamState.NoStream,
GetEventsToStore(aggregate),
cancellationToken: ct
).ConfigureAwait(false);
return result.NextExpectedStreamRevision.ToUInt64();
}
public async Task<ulong> Update(T aggregate, ulong? expectedRevision = null, CancellationToken ct = default)
{
var eventsToAppend = GetEventsToStore(aggregate);
var nextVersion = expectedRevision ?? (ulong)(aggregate.Version - eventsToAppend.Count);
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(aggregate.Id),
nextVersion,
eventsToAppend,
cancellationToken: ct
).ConfigureAwait(false);
return result.NextExpectedStreamRevision.ToUInt64();
}
public Task<ulong> Delete(T aggregate, ulong? expectedRevision = null, CancellationToken ct = default) =>
Update(aggregate, expectedRevision, ct);
private static List<EventData> GetEventsToStore(T aggregate)
{
var events = aggregate.DequeueUncommittedEvents();
return events
.Select(@event => @event.ToJsonEventData(TelemetryPropagator.GetPropagationContext()))
.ToList();
}
}