Skip to content

Commit

Permalink
Persistence changes (#4024)
Browse files Browse the repository at this point in the history
* Guard against accidentally restarting the server while we are shutting down

* Use IHostApplicationLifetime to detect ServiceControl is shutting down and RavenDB doesn't need to be restarted (#4025)

* Use IHostApplicationLifetime to detect it's shutting down

* Mock IHostApplicationLifetime for tests

* Align audit embedded database, simplify DI integration
Get rid of MockHostApplicationLifetime

Co-authored-by: Mauro Servienti <mauro.servienti@gmail.com>

* Simplify and align persistence seam further

* Fix setup not having application lifetime available

* Switch the audit setup command to use a host and make the installer a hosted service

* Move persistence config into a dedicated AddServiceControlAuditInstallers extension (#4029)

* Cleanup audit store

* Inline the lifecycle concept into the persistence

* Align name with primary

* Attempt to convert the primary seam (WIP)

* Biting the bullet to switch to the provider model

* Using default builder

* Internal

* Switch to system json (but I wonder why we need to write to the test context)

* Fix the persistence tests for now with a hack

* Fix the source name

* Tweaks

* Make IHostApplicationLifetime mandatory

* Remove unused interfaces

* Remove IPersistenceLifecycle

* Align audit interface with primary

---------

Co-authored-by: Mauro Servienti <mauro.servienti@gmail.com>
Co-authored-by: Brandon Ording <bording@gmail.com>
  • Loading branch information
3 people committed Mar 27, 2024
1 parent 0363a0f commit df9a6cf
Show file tree
Hide file tree
Showing 91 changed files with 1,054 additions and 1,496 deletions.
Expand Up @@ -5,6 +5,7 @@
using Infrastructure.WebApi;
using Microsoft.AspNetCore.Mvc;
using Operations;
using Persistence.RavenDB;
using Raven.Client.Documents;

public class FailedErrorsCountReponse
Expand All @@ -14,14 +15,14 @@ public class FailedErrorsCountReponse

[ApiController]
[Route("api")]
public class FailedErrorsController(IDocumentStore store, ImportFailedErrors failedErrors)
public class FailedErrorsController(IRavenSessionProvider sessionProvider, ImportFailedErrors failedErrors)
: ControllerBase
{
[Route("failederrors/count")]
[HttpGet]
public async Task<FailedErrorsCountReponse> GetFailedErrorsCount()
{
using var session = store.OpenAsyncSession();
using var session = sessionProvider.OpenSession();
var query =
session.Query<FailedErrorImport, FailedErrorImportIndex>().Statistics(out var stats);

Expand Down
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using Infrastructure.WebApi;
using Microsoft.AspNetCore.Mvc;
using Persistence.RavenDB;
using Raven.Client.Documents;
using ServiceControl.Recoverability;

Expand All @@ -13,13 +14,13 @@ public class FailedMessageRetriesCountReponse

[ApiController]
[Route("api")]
public class FailedMessageRetriesController(IDocumentStore store) : ControllerBase
public class FailedMessageRetriesController(IRavenSessionProvider sessionProvider) : ControllerBase
{
[Route("failedmessageretries/count")]
[HttpGet]
public async Task<FailedMessageRetriesCountReponse> GetFailedMessageRetriesCount()
{
using var session = store.OpenAsyncSession();
using var session = sessionProvider.OpenSession();
await session.Query<FailedMessageRetry>().Statistics(out var stats).ToListAsync();

Response.WithEtag(stats.ResultEtag.ToString());
Expand Down

This file was deleted.

Expand Up @@ -5,9 +5,7 @@ namespace ServiceControl.AcceptanceTests.RavenDB.Shared;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.TestHost;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using ServiceBus.Management.Infrastructure.Settings;
using TestSupport;

static class WebApplicationBuilderExtensions
{
Expand All @@ -18,7 +16,6 @@ public static void AddServiceControlTesting(this WebApplicationBuilder hostBuild
hostBuilder.Logging.AddScenarioContextLogging();

hostBuilder.WebHost.UseTestServer(options => options.BaseAddress = new Uri(settings.RootUrl));
hostBuilder.Services.AddSingleton<IHostLifetime, TestServerHostLifetime>();

// This facilitates receiving the test server anywhere where DI is available
hostBuilder.Services.AddSingleton(provider => (TestServer)provider.GetRequiredService<IServer>());
Expand Down
Expand Up @@ -4,24 +4,20 @@
using Microsoft.Extensions.DependencyInjection;
using UnitOfWork;

public class InMemoryPersistence : IPersistence
public class InMemoryPersistence(PersistenceSettings persistenceSettings) : IPersistence
{
public InMemoryPersistence(PersistenceSettings persistenceSettings) => settings = persistenceSettings;

public IPersistenceLifecycle Configure(IServiceCollection serviceCollection)
public void AddPersistence(IServiceCollection services)
{
serviceCollection.AddSingleton(settings);
serviceCollection.AddSingleton<InMemoryAuditDataStore>();
serviceCollection.AddSingleton<IAuditDataStore>(sp => sp.GetRequiredService<InMemoryAuditDataStore>());
serviceCollection.AddSingleton<IBodyStorage, InMemoryAttachmentsBodyStorage>();
serviceCollection.AddSingleton<IFailedAuditStorage, InMemoryFailedAuditStorage>();
serviceCollection.AddSingleton<IAuditIngestionUnitOfWorkFactory, InMemoryAuditIngestionUnitOfWorkFactory>();

return new InMemoryPersistenceLifecycle();
services.AddSingleton(persistenceSettings);
services.AddSingleton<InMemoryAuditDataStore>();
services.AddSingleton<IAuditDataStore>(sp => sp.GetRequiredService<InMemoryAuditDataStore>());
services.AddSingleton<IBodyStorage, InMemoryAttachmentsBodyStorage>();
services.AddSingleton<IFailedAuditStorage, InMemoryFailedAuditStorage>();
services.AddSingleton<IAuditIngestionUnitOfWorkFactory, InMemoryAuditIngestionUnitOfWorkFactory>();
}

public IPersistenceInstaller CreateInstaller() => new InMemoryPersistenceInstaller();

readonly PersistenceSettings settings;
public void AddInstaller(IServiceCollection services)
{
}
}
}

This file was deleted.

This file was deleted.

Expand Up @@ -15,13 +15,8 @@
using ServiceControl.Audit.Persistence.RavenDB.Indexes;
using ServiceControl.SagaAudit;

class DatabaseSetup
class DatabaseSetup(DatabaseConfiguration configuration)
{
public DatabaseSetup(DatabaseConfiguration configuration)
{
this.configuration = configuration;
}

public async Task Execute(IDocumentStore documentStore, CancellationToken cancellationToken)
{
try
Expand Down Expand Up @@ -86,7 +81,5 @@ public static async Task DeleteLegacySagaDetailsIndex(IDocumentStore documentSto
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex"), cancellationToken);
}
}

readonly DatabaseConfiguration configuration;
}
}
107 changes: 71 additions & 36 deletions src/ServiceControl.Audit.Persistence.RavenDB/EmbeddedDatabase.cs
@@ -1,4 +1,6 @@
namespace ServiceControl.Audit.Persistence.RavenDB
#nullable enable

namespace ServiceControl.Audit.Persistence.RavenDB
{
using System;
using System.Diagnostics;
Expand All @@ -8,43 +10,50 @@
using System.Threading;
using System.Threading.Tasks;
using ByteSizeLib;
using Microsoft.Extensions.Hosting;
using NServiceBus.Logging;
using Raven.Client.Documents;
using Raven.Client.Documents.Conventions;
using Raven.Client.ServerWide.Operations;
using Raven.Embedded;

public class EmbeddedDatabase(DatabaseConfiguration configuration) : IDisposable
public sealed class EmbeddedDatabase : IDisposable
{
public string ServerUrl { get; } = configuration.ServerConfiguration.ServerUrl;
EmbeddedDatabase(DatabaseConfiguration configuration, IHostApplicationLifetime lifetime)
{
this.configuration = configuration;
ServerUrl = configuration.ServerConfiguration.ServerUrl;
shutdownTokenSourceRegistration = shutdownTokenSource.Token.Register(() => isStopping = true);
applicationStoppingRegistration = lifetime.ApplicationStopping.Register(() => isStopping = true);
}

public string ServerUrl { get; private set; }

static (string LicenseFileName, string ServerDirectory) GetRavenLicenseFileNameAndServerDirectory()
{
var assembly = Assembly.GetExecutingAssembly();
var assemblyDirectory = Path.GetDirectoryName(assembly.Location);

var licenseFileName = "RavenLicense.json";
var ravenLicense = Path.Combine(assemblyDirectory, licenseFileName);
var serverDirectory = Path.Combine(assemblyDirectory, "RavenDBServer");
var ravenLicense = Path.Combine(assemblyDirectory!, licenseFileName);
var serverDirectory = Path.Combine(assemblyDirectory!, "RavenDBServer");

if (File.Exists(ravenLicense))
{
return (ravenLicense, serverDirectory);
}
else
{
var assemblyName = Path.GetFileName(assembly.Location);
throw new Exception($"RavenDB license not found. Make sure the RavenDB license file '{licenseFileName}' is stored in the same directory as {assemblyName}.");
}

var assemblyName = Path.GetFileName(assembly.Location);
throw new Exception($"RavenDB license not found. Make sure the RavenDB license file '{licenseFileName}' is stored in the same directory as {assemblyName}.");
}

public static EmbeddedDatabase Start(DatabaseConfiguration databaseConfiguration)
public static EmbeddedDatabase Start(DatabaseConfiguration databaseConfiguration, IHostApplicationLifetime lifetime)
{
var licenseFileNameAndServerDirectory = GetRavenLicenseFileNameAndServerDirectory();

var nugetPackagesPath = Path.Combine(databaseConfiguration.ServerConfiguration.DbPath, "Packages", "NuGet");

logger.InfoFormat("Loading RavenDB license from {0}", licenseFileNameAndServerDirectory.LicenseFileName);
Logger.InfoFormat("Loading RavenDB license from {0}", licenseFileNameAndServerDirectory.LicenseFileName);
var serverOptions = new ServerOptions
{
CommandLineArgs =
Expand All @@ -66,59 +75,69 @@ public static EmbeddedDatabase Start(DatabaseConfiguration databaseConfiguration
serverOptions.ServerDirectory = licenseFileNameAndServerDirectory.ServerDirectory;
}

var embeddedDatabase = new EmbeddedDatabase(databaseConfiguration);

var embeddedDatabase = new EmbeddedDatabase(databaseConfiguration, lifetime);
embeddedDatabase.Start(serverOptions);

return embeddedDatabase;
}

void Start(ServerOptions serverOptions)
{
EmbeddedServer.Instance.ServerProcessExited += (sender, args) =>
{
if (sender is Process process && process.HasExited && process.ExitCode != 0)
{
logger.Warn($"RavenDB server process exited unexpectedly with exitCode: {process.ExitCode}. Process will be restarted.");
restartRequired = true;
}
};

EmbeddedServer.Instance.ServerProcessExited += OnServerProcessExited;
EmbeddedServer.Instance.StartServer(serverOptions);

_ = Task.Run(async () =>
{
while (!shutdownTokenSource.IsCancellationRequested)
while (!isStopping)
{
try
{
await Task.Delay(delayBetweenRestarts, shutdownTokenSource.Token);
if (restartRequired)
if (!restartRequired)
{
logger.Info("Restarting RavenDB server process");
continue;
}
await EmbeddedServer.Instance.RestartServerAsync();
restartRequired = false;
Logger.Info("Restarting RavenDB server process");
logger.Info("RavenDB server process restarted successfully.");
}
await EmbeddedServer.Instance.RestartServerAsync();
restartRequired = false;
Logger.Info("RavenDB server process restarted successfully.");
}
catch (OperationCanceledException)
{
//no-op
}
catch (Exception e)
{
logger.Fatal($"RavenDB server restart failed. Restart will be retried in {delayBetweenRestarts}.", e);
Logger.Fatal($"RavenDB server restart failed. Restart will be retried in {delayBetweenRestarts}.", e);
}
}
});

RecordStartup();
}

void OnServerProcessExited(object? sender, ServerProcessExitedEventArgs _)
{
if (isStopping)
{
return;
}

restartRequired = true;
if (sender is Process process)
{
Logger.Warn($"RavenDB server process exited unexpectedly with exitCode: {process.ExitCode}. Process will be restarted.");
}
else
{
Logger.Warn($"RavenDB server process exited unexpectedly. Process will be restarted.");
}
}

public async Task<IDocumentStore> Connect(CancellationToken cancellationToken)
{
var dbOptions = new DatabaseOptions(configuration.Name)
Expand Down Expand Up @@ -150,8 +169,19 @@ public async Task DeleteDatabase(string dbName)

public void Dispose()
{
if (disposed)
{
return;
}

EmbeddedServer.Instance.ServerProcessExited -= OnServerProcessExited;

shutdownTokenSource.Cancel();
EmbeddedServer.Instance?.Dispose();
EmbeddedServer.Instance.Dispose();
shutdownTokenSource.Dispose();
applicationStoppingRegistration.Dispose();
shutdownTokenSourceRegistration.Dispose();
disposed = true;
}

void RecordStartup()
Expand All @@ -167,7 +197,7 @@ void RecordStartup()
RavenDB Logging Level: {configuration.ServerConfiguration.LogsMode}
-------------------------------------------------------------";

logger.Info(startupMessage);
Logger.Info(startupMessage);
}

long DataSize()
Expand Down Expand Up @@ -224,10 +254,15 @@ static long DirSize(DirectoryInfo d)
return size;
}

CancellationTokenSource shutdownTokenSource = new();
bool disposed;
bool restartRequired;
bool isStopping;
readonly CancellationTokenSource shutdownTokenSource = new();
readonly DatabaseConfiguration configuration;
readonly CancellationTokenRegistration applicationStoppingRegistration;
readonly CancellationTokenRegistration shutdownTokenSourceRegistration;

static TimeSpan delayBetweenRestarts = TimeSpan.FromSeconds(60);
static readonly ILog logger = LogManager.GetLogger<EmbeddedDatabase>();
static readonly ILog Logger = LogManager.GetLogger<EmbeddedDatabase>();
}
}
@@ -1,6 +1,10 @@
namespace ServiceControl.Audit.Persistence.RavenDB
{
interface IRavenPersistenceLifecycle : IPersistenceLifecycle, IRavenDocumentStoreProvider
using System.Threading;
using System.Threading.Tasks;

interface IRavenPersistenceLifecycle
{
Task Initialize(CancellationToken cancellationToken = default);
}
}

0 comments on commit df9a6cf

Please sign in to comment.