Skip to content

Commit

Permalink
Diagnostic improvements and bulk insert time config setting (#4049)
Browse files Browse the repository at this point in the history
* Add exception logging to DomainEvents

* Add NServiceBus reference

* Add debug logging to EndpointInstanceMonitor

* Add configurable bulk insert timeout

---------

Co-authored-by: WilliamBZA <WilliamBZA@users.noreply.github.com>
  • Loading branch information
2 people authored and mikeminutillo committed Apr 5, 2024
1 parent bb8de64 commit 762674c
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 8 deletions.
Expand Up @@ -11,14 +11,16 @@ public class DatabaseConfiguration
TimeSpan auditRetentionPeriod,
int maxBodySizeToStore,
int minimumStorageLeftRequiredForIngestion,
ServerConfiguration serverConfiguration)
ServerConfiguration serverConfiguration,
TimeSpan bulkInsertCommitTimeout)
{
Name = name;
ExpirationProcessTimerInSeconds = expirationProcessTimerInSeconds;
EnableFullTextSearch = enableFullTextSearch;
AuditRetentionPeriod = auditRetentionPeriod;
MaxBodySizeToStore = maxBodySizeToStore;
ServerConfiguration = serverConfiguration;
BulkInsertCommitTimeout = bulkInsertCommitTimeout;
MinimumStorageLeftRequiredForIngestion = minimumStorageLeftRequiredForIngestion;
}

Expand All @@ -37,5 +39,7 @@ public class DatabaseConfiguration
public int MaxBodySizeToStore { get; }

public int MinimumStorageLeftRequiredForIngestion { get; internal set; } //Setting for ATT only

public TimeSpan BulkInsertCommitTimeout { get; }
}
}
Expand Up @@ -14,6 +14,7 @@ public class RavenPersistenceConfiguration : IPersistenceConfiguration
public const string LogPathKey = "LogPath";
public const string RavenDbLogLevelKey = "RavenDBLogLevel";
public const string MinimumStorageLeftRequiredForIngestionKey = "MinimumStorageLeftRequiredForIngestion";
public const string BulkInsertCommitTimeoutInSecondsKey = "BulkInsertCommitTimeoutInSeconds";

public IEnumerable<string> ConfigurationKeys => new[]{
DatabaseNameKey,
Expand All @@ -23,7 +24,8 @@ public class RavenPersistenceConfiguration : IPersistenceConfiguration
ExpirationProcessTimerInSecondsKey,
LogPathKey,
RavenDbLogLevelKey,
MinimumStorageLeftRequiredForIngestionKey
MinimumStorageLeftRequiredForIngestionKey,
BulkInsertCommitTimeoutInSecondsKey
};

public string Name => "RavenDB";
Expand Down Expand Up @@ -98,14 +100,17 @@ internal static DatabaseConfiguration GetDatabaseConfiguration(PersistenceSettin

var expirationProcessTimerInSeconds = GetExpirationProcessTimerInSeconds(settings);

var bulkInsertTimeout = TimeSpan.FromSeconds(GetBulkInsertCommitTimeout(settings));

return new DatabaseConfiguration(
databaseName,
expirationProcessTimerInSeconds,
settings.EnableFullTextSearchOnBodies,
settings.AuditRetentionPeriod,
settings.MaxBodySizeToStore,
minimumStorageLeftRequiredForIngestion,
serverConfiguration);
serverConfiguration,
bulkInsertTimeout);
}

static int GetExpirationProcessTimerInSeconds(PersistenceSettings settings)
Expand All @@ -132,8 +137,33 @@ static int GetExpirationProcessTimerInSeconds(PersistenceSettings settings)
return expirationProcessTimerInSeconds;
}

static int GetBulkInsertCommitTimeout(PersistenceSettings settings)
{
var bulkInsertCommitTimeoutInSeconds = BulkInsertCommitTimeoutInSecondsDefault;

if (settings.PersisterSpecificSettings.TryGetValue(BulkInsertCommitTimeoutInSecondsKey, out var bulkInsertCommitTimeoutString))
{
bulkInsertCommitTimeoutInSeconds = int.Parse(bulkInsertCommitTimeoutString);
}

if (bulkInsertCommitTimeoutInSeconds < 0)
{
Logger.Error($"BulkInsertCommitTimeout cannot be negative. Defaulting to {BulkInsertCommitTimeoutInSecondsDefault}");
return BulkInsertCommitTimeoutInSecondsDefault;
}

if (bulkInsertCommitTimeoutInSeconds > TimeSpan.FromHours(1).TotalSeconds)
{
Logger.Error($"BulkInsertCommitTimeout cannot be larger than {TimeSpan.FromHours(1).TotalSeconds}. Defaulting to {BulkInsertCommitTimeoutInSecondsDefault}");
return BulkInsertCommitTimeoutInSecondsDefault;
}

return bulkInsertCommitTimeoutInSeconds;
}

static readonly ILog Logger = LogManager.GetLogger(typeof(RavenPersistenceConfiguration));

const int ExpirationProcessTimerInSecondsDefault = 600;
const int BulkInsertCommitTimeoutInSecondsDefault = 60;
}
}
Expand Up @@ -20,7 +20,7 @@ class RavenAuditIngestionUnitOfWorkFactory : IAuditIngestionUnitOfWorkFactory

public IAuditIngestionUnitOfWork StartNew(int batchSize)
{
var timedCancellationSource = new CancellationTokenSource(TimeSpan.FromMinutes(1));
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
var bulkInsert = documentStoreProvider.GetDocumentStore()
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);

Expand Down
Expand Up @@ -27,7 +27,7 @@ public static async Task<EmbeddedDatabase> GetInstance(CancellationToken cancell
var logsMode = "Operations";
var serverUrl = $"http://localhost:{PortUtility.FindAvailablePort(33334)}";

embeddedDatabase = EmbeddedDatabase.Start(new DatabaseConfiguration("audit", 60, true, TimeSpan.FromMinutes(5), 120000, 5, new ServerConfiguration(dbPath, serverUrl, logPath, logsMode)));
embeddedDatabase = EmbeddedDatabase.Start(new DatabaseConfiguration("audit", 60, true, TimeSpan.FromMinutes(5), 120000, 5, new ServerConfiguration(dbPath, serverUrl, logPath, logsMode), TimeSpan.FromSeconds(60)));

//make sure that the database is up
while (true)
Expand Down
25 changes: 22 additions & 3 deletions src/ServiceControl.DomainEvents/DomainEvents.cs
Expand Up @@ -3,9 +3,12 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Logging;

public class DomainEvents : IDomainEvents
{
static readonly ILog Log = LogManager.GetLogger<DomainEvents>();

readonly IServiceProvider serviceProvider;
public DomainEvents(IServiceProvider serviceProvider) => this.serviceProvider = serviceProvider;

Expand All @@ -14,15 +17,31 @@ public class DomainEvents : IDomainEvents
var handlers = serviceProvider.GetServices<IDomainHandler<T>>();
foreach (var handler in handlers)
{
await handler.Handle(domainEvent)
.ConfigureAwait(false);
try
{
await handler.Handle(domainEvent)
.ConfigureAwait(false);
}
catch (Exception e)
{
Log.Error($"Unexpected error publishing domain event {typeof(T)}", e);
throw;
}
}

var ieventHandlers = serviceProvider.GetServices<IDomainHandler<IDomainEvent>>();
foreach (var handler in ieventHandlers)
{
await handler.Handle(domainEvent)
try
{
await handler.Handle(domainEvent)
.ConfigureAwait(false);
}
catch (Exception e)
{
Log.Error($"Unexpected error publishing domain event {typeof(T)}", e);
throw;
}
}
}
}
Expand Down
Expand Up @@ -6,6 +6,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="NServiceBus" />
</ItemGroup>

</Project>
5 changes: 5 additions & 0 deletions src/ServiceControl/Monitoring/EndpointInstanceMonitor.cs
Expand Up @@ -5,6 +5,8 @@ namespace ServiceControl.Monitoring
using Contracts.HeartbeatMonitoring;
using EndpointControl.Contracts;
using Infrastructure.DomainEvents;
using NLog.Fluent;
using NServiceBus.Logging;
using ServiceControl.Operations;
using ServiceControl.Persistence;

Expand Down Expand Up @@ -38,6 +40,7 @@ public async Task UpdateStatus(HeartbeatStatus newStatus, DateTime? latestTimest
if (newStatus != status)
{
await RaiseStateChangeEvents(newStatus, latestTimestamp);
Log.DebugFormat("Endpoint {0} status updated from {1} to {2}", Id.LogicalName, status, newStatus);
}

lastSeen = latestTimestamp;
Expand Down Expand Up @@ -132,6 +135,8 @@ public KnownEndpointsView GetKnownView()
};
}

static readonly ILog Log = LogManager.GetLogger<EndpointInstanceMonitor>();

IDomainEvents domainEvents;
DateTime? lastSeen;
HeartbeatStatus status;
Expand Down

0 comments on commit 762674c

Please sign in to comment.