Skip to content

Commit

Permalink
More small todos (#4013)
Browse files Browse the repository at this point in the history
* It should be fine to return more headers

* Remove cancellation token TODO

* Remove workaround that is no longer needed with NSB v9

* Get rid of weird abstraction that adds no value

* Readonly

* For now add a comment until we know more from RavenDB.

* It should be fine to return more headers

* This problem existed before and we didn't really make it worse. Moving on

* Switch to content result and explicit header and status for now (SP adjusted)

* Connection Details parsing is case insensitive with PropertyNameCaseInsensitive = true in the platform connector package

* Honestly our caching approach probably requires an overhaul anyway
  • Loading branch information
danielmarbach committed Mar 18, 2024
1 parent 9d647e2 commit d854f29
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 139 deletions.
Expand Up @@ -5,40 +5,26 @@
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing;

class InMemoryFailedAuditStorage : IFailedAuditStorage
class InMemoryFailedAuditStorage(InMemoryAuditDataStore dataStore) : IFailedAuditStorage
{
public InMemoryFailedAuditStorage(InMemoryAuditDataStore dataStore) => this.dataStore = dataStore;

public async Task ProcessFailedMessages(Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage, CancellationToken cancellationToken)
{
foreach (var failedMessage in dataStore.failedAuditImports)
{
FailedTransportMessage transportMessage = failedMessage.Message;

await onMessage(transportMessage, (_) => { return Task.CompletedTask; }, cancellationToken);
await onMessage(transportMessage, _ => Task.CompletedTask, cancellationToken);
}

dataStore.failedAuditImports.Clear();
}

public Task Store(dynamic failure)
{
dataStore.failedAuditImports.Add(failure);

return Task.CompletedTask;
}

public Task SaveFailedAuditImport(FailedAuditImport message)
{
dataStore.failedAuditImports.Add(message);
return Task.CompletedTask;
}

public Task<int> GetFailedAuditsCount()
{
return Task.FromResult(dataStore.failedAuditImports.Count);
}

InMemoryAuditDataStore dataStore;
public Task<int> GetFailedAuditsCount() => Task.FromResult(dataStore.failedAuditImports.Count);
}
}
Expand Up @@ -10,79 +10,56 @@
using Raven.Client.Documents.Commands;
using Raven.Client.Documents.Commands.Batches;

class RavenFailedAuditStorage : IFailedAuditStorage
class RavenFailedAuditStorage(IRavenSessionProvider sessionProvider) : IFailedAuditStorage
{
public RavenFailedAuditStorage(IRavenSessionProvider sessionProvider)
{
this.sessionProvider = sessionProvider;
}

public async Task Store(dynamic failure)
{
using (var session = sessionProvider.OpenSession())
{
await session.StoreAsync(failure);

await session.SaveChangesAsync();
}
}

public async Task SaveFailedAuditImport(FailedAuditImport message)
{
using (var session = sessionProvider.OpenSession())
{
await session.StoreAsync(message);
await session.SaveChangesAsync();
}
using var session = sessionProvider.OpenSession();
await session.StoreAsync(message);
await session.SaveChangesAsync();
}

public async Task ProcessFailedMessages(
Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage,
CancellationToken cancellationToken)
{
using (var session = sessionProvider.OpenSession())
{
var query = session.Query<FailedAuditImport, FailedAuditImportIndex>();
using var session = sessionProvider.OpenSession();
var query = session.Query<FailedAuditImport, FailedAuditImportIndex>();

IAsyncEnumerator<StreamResult<FailedAuditImport>> stream = default;
try
IAsyncEnumerator<StreamResult<FailedAuditImport>> stream = default;
try
{
stream = await session.Advanced.StreamAsync(query, cancellationToken);
while (!cancellationToken.IsCancellationRequested &&
await stream.MoveNextAsync())
{
stream = await session.Advanced.StreamAsync(query, cancellationToken);
while (!cancellationToken.IsCancellationRequested &&
await stream.MoveNextAsync())
{
FailedTransportMessage transportMessage = stream.Current.Document.Message;
var localSession = session;

await onMessage(transportMessage, (token) =>
{
localSession.Advanced.Defer(
new DeleteCommandData(stream.Current.Id, stream.Current.ChangeVector));
return Task.CompletedTask;
}, cancellationToken);
}
FailedTransportMessage transportMessage = stream.Current.Document.Message;
var localSession = session;

await session.SaveChangesAsync(cancellationToken);
await onMessage(transportMessage, (token) =>
{
localSession.Advanced.Defer(
new DeleteCommandData(stream.Current.Id, stream.Current.ChangeVector));
return Task.CompletedTask;
}, cancellationToken);
}
finally

await session.SaveChangesAsync(cancellationToken);
}
finally
{
if (stream != null)
{
if (stream != null)
{
await stream.DisposeAsync();
}
await stream.DisposeAsync();
}
}
}

public async Task<int> GetFailedAuditsCount()
{
using (var session = sessionProvider.OpenSession())
{
return await session.Query<FailedAuditImport, FailedAuditImportIndex>()
.CountAsync();
}
using var session = sessionProvider.OpenSession();
return await session.Query<FailedAuditImport, FailedAuditImportIndex>()
.CountAsync();
}

readonly IRavenSessionProvider sessionProvider;
}
}
Expand Up @@ -2,19 +2,10 @@
{
using Raven.Client.Documents.Session;

class RavenSessionProvider : IRavenSessionProvider
class RavenSessionProvider(IRavenDocumentStoreProvider documentStoreProvider) : IRavenSessionProvider
{
public RavenSessionProvider(IRavenDocumentStoreProvider documentStoreProvider)
{
this.documentStoreProvider = documentStoreProvider;
}

public IAsyncDocumentSession OpenSession()
{
return documentStoreProvider.GetDocumentStore()
public IAsyncDocumentSession OpenSession() =>
documentStoreProvider.GetDocumentStore()
.OpenAsyncSession();
}

readonly IRavenDocumentStoreProvider documentStoreProvider;
}
}
2 changes: 0 additions & 2 deletions src/ServiceControl.Audit.Persistence/IFailedAuditStorage.cs
Expand Up @@ -7,8 +7,6 @@

public interface IFailedAuditStorage
{
Task Store(dynamic failure);

Task SaveFailedAuditImport(FailedAuditImport message);

Task ProcessFailedMessages(
Expand Down
10 changes: 1 addition & 9 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Expand Up @@ -53,7 +53,7 @@ class AuditIngestion : IHostedService
FullMode = BoundedChannelFullMode.Wait
});

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, loggingSettings, FailedMessageFactory, OnCriticalError);
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, loggingSettings, OnCriticalError);

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

Expand All @@ -74,14 +74,6 @@ public async Task StopAsync(CancellationToken cancellationToken)
}
}

FailedAuditImport FailedMessageFactory(FailedTransportMessage msg)
{
return new FailedAuditImport
{
Message = msg
};
}

Task OnCriticalError(string failure, Exception exception)
{
logger.Fatal($"OnCriticalError. '{failure}'", exception);
Expand Down
40 changes: 20 additions & 20 deletions src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs
Expand Up @@ -13,16 +13,14 @@

class AuditIngestionFaultPolicy
{
IFailedAuditStorage failedAuditStorage;
string logPath;
Func<FailedTransportMessage, object> messageBuilder;
ImportFailureCircuitBreaker failureCircuitBreaker;
readonly IFailedAuditStorage failedAuditStorage;
readonly string logPath;
readonly ImportFailureCircuitBreaker failureCircuitBreaker;

public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func<FailedTransportMessage, object> messageBuilder, Func<string, Exception, Task> onCriticalError)
public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func<string, Exception, Task> onCriticalError)
{
logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit");
this.failedAuditStorage = failedAuditStorage;
this.messageBuilder = messageBuilder;

failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError);

Expand All @@ -43,17 +41,24 @@ public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, Cancella

Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken)
{
var failure = (dynamic)messageBuilder(new FailedTransportMessage
var failure = new FailedAuditImport
{
Id = errorContext.Message.MessageId,
Headers = errorContext.Message.Headers,
Body = errorContext.Message.Body.ToArray() //TODO Can this be adjusted?
});
Id = Guid.NewGuid().ToString(),
Message = new FailedTransportMessage
{
Id = errorContext.Message.MessageId,
Headers = errorContext.Message.Headers,
// At the moment we are taking a defensive copy of the body to avoid issues with the message body
// buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB
// handles byte[] to ReadOnlyMemory<byte> conversion we might be able to remove this.
Body = errorContext.Message.Body.ToArray()
}
};

return Handle(errorContext.Exception, failure, cancellationToken);
}

async Task Handle(Exception exception, dynamic failure, CancellationToken cancellationToken)
async Task Handle(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken)
{
try
{
Expand All @@ -65,19 +70,14 @@ async Task Handle(Exception exception, dynamic failure, CancellationToken cancel
}
}

#pragma warning disable IDE0060
async Task DoLogging(Exception exception, dynamic failure, CancellationToken cancellationToken)
#pragma warning restore IDE0060
async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken)
{
var id = Guid.NewGuid().ToString();
failure.Id = id;

// Write to storage
await failedAuditStorage.Store(failure);
await failedAuditStorage.SaveFailedAuditImport(failure);

// Write to Log Path
var filePath = Path.Combine(logPath, failure.Id + ".txt");
File.WriteAllText(filePath, exception.ToFriendlyString());
await File.WriteAllTextAsync(filePath, exception.ToFriendlyString(), cancellationToken);

// Write to Event Log
WriteEvent("A message import has failed. A log file has been written to " + filePath);
Expand Down
Expand Up @@ -31,7 +31,6 @@ public void OnResultExecuting(ResultExecutingContext context)

if (ifNoneMatch || ifNotModifiedSince)
{
// TODO previously we were creating a brand new response, now we're adding more headers than before
context.Result = new StatusCodeResult((int)HttpStatusCode.NotModified);
}
}
Expand Down
@@ -1,5 +1,5 @@
{
"settings": {
"Settings": {
"Heartbeats": {
"Enabled": true,
"HeartbeatsQueue": "Particular.ServiceControl",
Expand All @@ -20,5 +20,5 @@
"CustomChecksQueue": "Particular.ServiceControl"
}
},
"errors": []
"Errors": []
}
Expand Up @@ -66,7 +66,6 @@ public async Task Initialize(RunDescriptor run)
}, auditEndpointConfiguration =>
{
var scanner = auditEndpointConfiguration.AssemblyScanner();
// TODO Maybe we need to find a more robust way to do this. For example excluding assemblies we find by convention
var excludedAssemblies = new[]
{
"ServiceControl.Persistence.RavenDB.dll",
Expand Down Expand Up @@ -100,7 +99,6 @@ public async Task Initialize(RunDescriptor run)
primaryEndpointConfiguration =>
{
var scanner = primaryEndpointConfiguration.AssemblyScanner();
// TODO Maybe we need to find a more robust way to do this. For example excluding assemblies we find by convention
var excludedAssemblies = new[]
{
"ServiceControl.Persistence.RavenDB.dll",
Expand Down
7 changes: 5 additions & 2 deletions src/ServiceControl/Connection/ConnectionController.cs
@@ -1,5 +1,7 @@
namespace ServiceControl.Connection
{
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
Expand All @@ -13,11 +15,12 @@ public class ConnectionController(IPlatformConnectionBuilder builder) : Controll
public async Task<IActionResult> GetConnectionDetails()
{
var platformConnectionDetails = await builder.BuildPlatformConnection();
// TODO why do these properties need to be lower cased?
var connectionDetails = new { settings = platformConnectionDetails.ToDictionary(), errors = platformConnectionDetails.Errors };
var connectionDetails = new ConnectionDetails(platformConnectionDetails.ToDictionary(), platformConnectionDetails.Errors);
// by default snake case is used for serialization so we take care of explicitly serializing here
var content = JsonSerializer.Serialize(connectionDetails);
return Content(content, "application/json");
}

public record ConnectionDetails(IDictionary<string, object> Settings, ConcurrentBag<string> Errors);
}
}
Expand Up @@ -12,7 +12,6 @@ public void OnResultExecuting(ResultExecutingContext context)
return;
}

// TODO do we even need to do this
var response = context.HttpContext.Response;
if (!response.Headers.ContainsKey("Cache-Control"))
{
Expand Down
Expand Up @@ -37,7 +37,6 @@ public void OnResultExecuting(ResultExecutingContext context)

if (ifNoneMatch || ifNotModifiedSince)
{
// TODO previously we were creating a brand new response, now we're adding more headers than before
context.Result = new StatusCodeResult((int)HttpStatusCode.NotModified);
}
}
Expand Down

0 comments on commit d854f29

Please sign in to comment.