Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More small todos #4013

Merged
merged 11 commits into from Mar 18, 2024
Expand Up @@ -5,40 +5,26 @@
using System.Threading.Tasks;
using ServiceControl.Audit.Auditing;

class InMemoryFailedAuditStorage : IFailedAuditStorage
class InMemoryFailedAuditStorage(InMemoryAuditDataStore dataStore) : IFailedAuditStorage
{
public InMemoryFailedAuditStorage(InMemoryAuditDataStore dataStore) => this.dataStore = dataStore;

public async Task ProcessFailedMessages(Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage, CancellationToken cancellationToken)
{
foreach (var failedMessage in dataStore.failedAuditImports)
{
FailedTransportMessage transportMessage = failedMessage.Message;

await onMessage(transportMessage, (_) => { return Task.CompletedTask; }, cancellationToken);
await onMessage(transportMessage, _ => Task.CompletedTask, cancellationToken);
}

dataStore.failedAuditImports.Clear();
}

public Task Store(dynamic failure)
{
dataStore.failedAuditImports.Add(failure);

return Task.CompletedTask;
}

public Task SaveFailedAuditImport(FailedAuditImport message)
{
dataStore.failedAuditImports.Add(message);
return Task.CompletedTask;
}

public Task<int> GetFailedAuditsCount()
{
return Task.FromResult(dataStore.failedAuditImports.Count);
}

InMemoryAuditDataStore dataStore;
public Task<int> GetFailedAuditsCount() => Task.FromResult(dataStore.failedAuditImports.Count);
}
}
Expand Up @@ -10,79 +10,56 @@
using Raven.Client.Documents.Commands;
using Raven.Client.Documents.Commands.Batches;

class RavenFailedAuditStorage : IFailedAuditStorage
class RavenFailedAuditStorage(IRavenSessionProvider sessionProvider) : IFailedAuditStorage
{
public RavenFailedAuditStorage(IRavenSessionProvider sessionProvider)
{
this.sessionProvider = sessionProvider;
}

public async Task Store(dynamic failure)
{
using (var session = sessionProvider.OpenSession())
{
await session.StoreAsync(failure);

await session.SaveChangesAsync();
}
}

public async Task SaveFailedAuditImport(FailedAuditImport message)
{
using (var session = sessionProvider.OpenSession())
{
await session.StoreAsync(message);
await session.SaveChangesAsync();
}
using var session = sessionProvider.OpenSession();
await session.StoreAsync(message);
await session.SaveChangesAsync();
}

public async Task ProcessFailedMessages(
Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage,
CancellationToken cancellationToken)
{
using (var session = sessionProvider.OpenSession())
{
var query = session.Query<FailedAuditImport, FailedAuditImportIndex>();
using var session = sessionProvider.OpenSession();
var query = session.Query<FailedAuditImport, FailedAuditImportIndex>();

IAsyncEnumerator<StreamResult<FailedAuditImport>> stream = default;
try
IAsyncEnumerator<StreamResult<FailedAuditImport>> stream = default;
try
{
stream = await session.Advanced.StreamAsync(query, cancellationToken);
while (!cancellationToken.IsCancellationRequested &&
await stream.MoveNextAsync())
{
stream = await session.Advanced.StreamAsync(query, cancellationToken);
while (!cancellationToken.IsCancellationRequested &&
await stream.MoveNextAsync())
{
FailedTransportMessage transportMessage = stream.Current.Document.Message;
var localSession = session;

await onMessage(transportMessage, (token) =>
{
localSession.Advanced.Defer(
new DeleteCommandData(stream.Current.Id, stream.Current.ChangeVector));
return Task.CompletedTask;
}, cancellationToken);
}
FailedTransportMessage transportMessage = stream.Current.Document.Message;
var localSession = session;

await session.SaveChangesAsync(cancellationToken);
await onMessage(transportMessage, (token) =>
{
localSession.Advanced.Defer(
new DeleteCommandData(stream.Current.Id, stream.Current.ChangeVector));
return Task.CompletedTask;
}, cancellationToken);
}
finally

await session.SaveChangesAsync(cancellationToken);
}
finally
{
if (stream != null)
{
if (stream != null)
{
await stream.DisposeAsync();
}
await stream.DisposeAsync();
}
}
}

public async Task<int> GetFailedAuditsCount()
{
using (var session = sessionProvider.OpenSession())
{
return await session.Query<FailedAuditImport, FailedAuditImportIndex>()
.CountAsync();
}
using var session = sessionProvider.OpenSession();
return await session.Query<FailedAuditImport, FailedAuditImportIndex>()
.CountAsync();
}

readonly IRavenSessionProvider sessionProvider;
}
}
Expand Up @@ -2,19 +2,10 @@
{
using Raven.Client.Documents.Session;

class RavenSessionProvider : IRavenSessionProvider
class RavenSessionProvider(IRavenDocumentStoreProvider documentStoreProvider) : IRavenSessionProvider
{
public RavenSessionProvider(IRavenDocumentStoreProvider documentStoreProvider)
{
this.documentStoreProvider = documentStoreProvider;
}

public IAsyncDocumentSession OpenSession()
{
return documentStoreProvider.GetDocumentStore()
public IAsyncDocumentSession OpenSession() =>
documentStoreProvider.GetDocumentStore()
.OpenAsyncSession();
}

readonly IRavenDocumentStoreProvider documentStoreProvider;
}
}
2 changes: 0 additions & 2 deletions src/ServiceControl.Audit.Persistence/IFailedAuditStorage.cs
Expand Up @@ -7,8 +7,6 @@

public interface IFailedAuditStorage
{
Task Store(dynamic failure);

Task SaveFailedAuditImport(FailedAuditImport message);

Task ProcessFailedMessages(
Expand Down
10 changes: 1 addition & 9 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Expand Up @@ -53,7 +53,7 @@ class AuditIngestion : IHostedService
FullMode = BoundedChannelFullMode.Wait
});

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, loggingSettings, FailedMessageFactory, OnCriticalError);
errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, loggingSettings, OnCriticalError);

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

Expand All @@ -74,14 +74,6 @@ public async Task StopAsync(CancellationToken cancellationToken)
}
}

FailedAuditImport FailedMessageFactory(FailedTransportMessage msg)
{
return new FailedAuditImport
{
Message = msg
};
}

Task OnCriticalError(string failure, Exception exception)
{
logger.Fatal($"OnCriticalError. '{failure}'", exception);
Expand Down
40 changes: 20 additions & 20 deletions src/ServiceControl.Audit/Auditing/AuditIngestionFaultPolicy.cs
Expand Up @@ -13,16 +13,14 @@

class AuditIngestionFaultPolicy
{
IFailedAuditStorage failedAuditStorage;
string logPath;
Func<FailedTransportMessage, object> messageBuilder;
ImportFailureCircuitBreaker failureCircuitBreaker;
readonly IFailedAuditStorage failedAuditStorage;
readonly string logPath;
readonly ImportFailureCircuitBreaker failureCircuitBreaker;

public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func<FailedTransportMessage, object> messageBuilder, Func<string, Exception, Task> onCriticalError)
public AuditIngestionFaultPolicy(IFailedAuditStorage failedAuditStorage, LoggingSettings settings, Func<string, Exception, Task> onCriticalError)
{
logPath = Path.Combine(settings.LogPath, @"FailedImports\Audit");
this.failedAuditStorage = failedAuditStorage;
this.messageBuilder = messageBuilder;

failureCircuitBreaker = new ImportFailureCircuitBreaker(onCriticalError);

Expand All @@ -43,17 +41,24 @@ public async Task<ErrorHandleResult> OnError(ErrorContext errorContext, Cancella

Task StoreFailedMessageDocument(ErrorContext errorContext, CancellationToken cancellationToken)
{
var failure = (dynamic)messageBuilder(new FailedTransportMessage
var failure = new FailedAuditImport
{
Id = errorContext.Message.MessageId,
Headers = errorContext.Message.Headers,
Body = errorContext.Message.Body.ToArray() //TODO Can this be adjusted?
});
Id = Guid.NewGuid().ToString(),
Message = new FailedTransportMessage
{
Id = errorContext.Message.MessageId,
Headers = errorContext.Message.Headers,
// At the moment we are taking a defensive copy of the body to avoid issues with the message body
// buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB
// handles byte[] to ReadOnlyMemory<byte> conversion we might be able to remove this.
Body = errorContext.Message.Body.ToArray()
}
};

return Handle(errorContext.Exception, failure, cancellationToken);
}

async Task Handle(Exception exception, dynamic failure, CancellationToken cancellationToken)
async Task Handle(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken)
{
try
{
Expand All @@ -65,19 +70,14 @@ async Task Handle(Exception exception, dynamic failure, CancellationToken cancel
}
}

#pragma warning disable IDE0060
async Task DoLogging(Exception exception, dynamic failure, CancellationToken cancellationToken)
#pragma warning restore IDE0060
async Task DoLogging(Exception exception, FailedAuditImport failure, CancellationToken cancellationToken)
{
var id = Guid.NewGuid().ToString();
failure.Id = id;

// Write to storage
await failedAuditStorage.Store(failure);
await failedAuditStorage.SaveFailedAuditImport(failure);

// Write to Log Path
var filePath = Path.Combine(logPath, failure.Id + ".txt");
File.WriteAllText(filePath, exception.ToFriendlyString());
await File.WriteAllTextAsync(filePath, exception.ToFriendlyString(), cancellationToken);

// Write to Event Log
WriteEvent("A message import has failed. A log file has been written to " + filePath);
Expand Down
Expand Up @@ -31,7 +31,6 @@ public void OnResultExecuting(ResultExecutingContext context)

if (ifNoneMatch || ifNotModifiedSince)
{
// TODO previously we were creating a brand new response, now we're adding more headers than before
context.Result = new StatusCodeResult((int)HttpStatusCode.NotModified);
}
}
Expand Down
@@ -1,5 +1,5 @@
{
"settings": {
"Settings": {
"Heartbeats": {
"Enabled": true,
"HeartbeatsQueue": "Particular.ServiceControl",
Expand All @@ -20,5 +20,5 @@
"CustomChecksQueue": "Particular.ServiceControl"
}
},
"errors": []
"Errors": []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't these changes breaking the PlatformConnection plugin? (I have not checked it myself, yet)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the commit message ;)

Connection Details parsing is case insensitive with PropertyNameCaseInsensitive = true in the platform connector package

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I missed that.

}
Expand Up @@ -66,7 +66,6 @@ public async Task Initialize(RunDescriptor run)
}, auditEndpointConfiguration =>
{
var scanner = auditEndpointConfiguration.AssemblyScanner();
// TODO Maybe we need to find a more robust way to do this. For example excluding assemblies we find by convention
var excludedAssemblies = new[]
{
"ServiceControl.Persistence.RavenDB.dll",
Expand Down Expand Up @@ -100,7 +99,6 @@ public async Task Initialize(RunDescriptor run)
primaryEndpointConfiguration =>
{
var scanner = primaryEndpointConfiguration.AssemblyScanner();
// TODO Maybe we need to find a more robust way to do this. For example excluding assemblies we find by convention
var excludedAssemblies = new[]
{
"ServiceControl.Persistence.RavenDB.dll",
Expand Down
7 changes: 5 additions & 2 deletions src/ServiceControl/Connection/ConnectionController.cs
@@ -1,5 +1,7 @@
namespace ServiceControl.Connection
{
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
Expand All @@ -13,11 +15,12 @@ public class ConnectionController(IPlatformConnectionBuilder builder) : Controll
public async Task<IActionResult> GetConnectionDetails()
{
var platformConnectionDetails = await builder.BuildPlatformConnection();
// TODO why do these properties need to be lower cased?
var connectionDetails = new { settings = platformConnectionDetails.ToDictionary(), errors = platformConnectionDetails.Errors };
var connectionDetails = new ConnectionDetails(platformConnectionDetails.ToDictionary(), platformConnectionDetails.Errors);
// by default snake case is used for serialization so we take care of explicitly serializing here
var content = JsonSerializer.Serialize(connectionDetails);
return Content(content, "application/json");
}

public record ConnectionDetails(IDictionary<string, object> Settings, ConcurrentBag<string> Errors);
}
}
Expand Up @@ -12,7 +12,6 @@ public void OnResultExecuting(ResultExecutingContext context)
return;
}

// TODO do we even need to do this
var response = context.HttpContext.Response;
if (!response.Headers.ContainsKey("Cache-Control"))
{
Expand Down
Expand Up @@ -37,7 +37,6 @@ public void OnResultExecuting(ResultExecutingContext context)

if (ifNoneMatch || ifNotModifiedSince)
{
// TODO previously we were creating a brand new response, now we're adding more headers than before
context.Result = new StatusCodeResult((int)HttpStatusCode.NotModified);
}
}
Expand Down