Skip to content

Commit

Permalink
Fixed ESDB subscription to all
Browse files Browse the repository at this point in the history
Brought back Newtonsoft as default serialiser. 
Adjusted marten test settings to fix isolation
  • Loading branch information
oskardudycz committed Apr 24, 2024
1 parent c7572c7 commit 9946912
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 97 deletions.
115 changes: 23 additions & 92 deletions Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs
Expand Up @@ -23,36 +23,20 @@ public class EventStoreDBSubscriptionToAllOptions
public bool IgnoreDeserializationErrors { get; set; } = true;
}

public class EventStoreDBSubscriptionToAll
public class EventStoreDBSubscriptionToAll(
EventStoreClient eventStoreClient,
EventTypeMapper eventTypeMapper,
IEventBus eventBus,
ISubscriptionCheckpointRepository checkpointRepository,
IActivityScope activityScope,
ILogger<EventStoreDBSubscriptionToAll> logger
)
{
private readonly IEventBus eventBus;
private readonly EventStoreClient eventStoreClient;
private readonly EventTypeMapper eventTypeMapper;
private readonly ISubscriptionCheckpointRepository checkpointRepository;
private readonly IActivityScope activityScope;
private readonly ILogger<EventStoreDBSubscriptionToAll> logger;
private EventStoreDBSubscriptionToAllOptions subscriptionOptions = default!;
private string SubscriptionId => subscriptionOptions.SubscriptionId;
private readonly object resubscribeLock = new();
private CancellationToken cancellationToken;

public EventStoreDBSubscriptionToAll(
EventStoreClient eventStoreClient,
EventTypeMapper eventTypeMapper,
IEventBus eventBus,
ISubscriptionCheckpointRepository checkpointRepository,
IActivityScope activityScope,
ILogger<EventStoreDBSubscriptionToAll> logger
)
{
this.eventBus = eventBus;
this.eventStoreClient = eventStoreClient;
this.eventTypeMapper = eventTypeMapper;
this.checkpointRepository = checkpointRepository;
this.activityScope = activityScope;
this.logger = logger;
}

public async Task SubscribeToAll(EventStoreDBSubscriptionToAllOptions subscriptionOptions, CancellationToken ct)
{
// see: https://github.com/dotnet/runtime/issues/36063
Expand Down Expand Up @@ -84,11 +68,24 @@ await foreach (var @event in subscription)
await HandleEvent(@event, ct).ConfigureAwait(false);
}
}
catch (Exception ex)
catch (OperationCanceledException)
{
HandleDrop(ex);
logger.LogWarning("Subscription was canceled.");
}
catch (ObjectDisposedException)
{
logger.LogWarning("Subscription was canceled by the user.");
}
catch (Exception ex)
{
logger.LogWarning("Subscription was dropped: {ex}", ex);

// Sleep between reconnections to not flood the database or not kill the CPU with infinite loop
// Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time
Thread.Sleep(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000));

await SubscribeToAll(this.subscriptionOptions, ct).ConfigureAwait(false);
}
}

private async Task HandleEvent(
Expand Down Expand Up @@ -147,72 +144,6 @@ await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.Co
}
}

private void HandleDrop(Exception? exception)
{
if (exception is RpcException { StatusCode: StatusCode.Cancelled })
{
logger.LogWarning(
"Subscription to all '{SubscriptionId}' dropped by client",
SubscriptionId
);

return;
}

logger.LogError(
exception,
"Subscription to all '{SubscriptionId}' dropped with '{StatusCode}'",
SubscriptionId,
(exception as RpcException)?.StatusCode ?? StatusCode.Unknown
);


Resubscribe();
}

private void Resubscribe()
{
// You may consider adding a max resubscribe count if you want to fail process
// instead of retrying until database is up
while (true)
{
var resubscribed = false;
try
{
Monitor.Enter(resubscribeLock);

// No synchronization context is needed to disable synchronization context.
// That enables running asynchronous method not causing deadlocks.
// As this is a background process then we don't need to have async context here.
using (NoSynchronizationContextScope.Enter())
{
#pragma warning disable VSTHRD002
SubscribeToAll(subscriptionOptions, cancellationToken).Wait(cancellationToken);
#pragma warning restore VSTHRD002
}

resubscribed = true;
}
catch (Exception exception)
{
logger.LogWarning(exception,
"Failed to resubscribe to all '{SubscriptionId}' dropped with '{ExceptionMessage}{ExceptionStackTrace}'",
SubscriptionId, exception.Message, exception.StackTrace);
}
finally
{
Monitor.Exit(resubscribeLock);
}

if (resubscribed)
break;

// Sleep between reconnections to not flood the database or not kill the CPU with infinite loop
// Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time
Thread.Sleep(1000 + new Random((int)DateTime.UtcNow.Ticks).Next(1000));
}
}

private bool IsEventWithEmptyData(ResolvedEvent resolvedEvent)
{
if (resolvedEvent.Event.Data.Length != 0) return false;
Expand Down
5 changes: 4 additions & 1 deletion Core.Marten/MartenConfig.cs
Expand Up @@ -83,7 +83,10 @@ public static class MartenConfigExtensions
options.Events.DatabaseSchemaName = schemaName ?? martenConfig.WriteModelSchema;
options.DatabaseSchemaName = schemaName ?? martenConfig.ReadModelSchema;

options.UseSystemTextJsonForSerialization(EnumStorage.AsString);
options.UseNewtonsoftForSerialization(
EnumStorage.AsString,
nonPublicMembersStorage: NonPublicMembersStorage.All
);

options.Projections.Add(
new MartenSubscription(
Expand Down
Expand Up @@ -11,7 +11,7 @@
},
"ReadModel_Marten": {
"ConnectionString": "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'",
"WriteModelSchema": "carts_management_write",
"ReadModelSchema": "carts_management_read"
"WriteModelSchema": "esdb_carts_management_write",
"ReadModelSchema": "esdb_carts_management_read"
}
}
Expand Up @@ -13,7 +13,7 @@
},
"ReadModel_Marten": {
"ConnectionString": "PORT = 5432; HOST = localhost; TIMEOUT = 15; POOLING = True; MINPOOLSIZE = 1; MAXPOOLSIZE = 100; COMMANDTIMEOUT = 20; DATABASE = 'postgres'; PASSWORD = 'Password12!'; USER ID = 'postgres'",
"WriteModelSchema": "carts_management_write",
"ReadModelSchema": "carts_management_read"
"WriteModelSchema": "esdb_carts_management_write",
"ReadModelSchema": "esdb_carts_management_read"
}
}

0 comments on commit 9946912

Please sign in to comment.