Skip to content

Commit

Permalink
Biting the bullet to switch to the provider model
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Mar 26, 2024
1 parent 25ad59e commit 2a22d8f
Show file tree
Hide file tree
Showing 23 changed files with 413 additions and 534 deletions.
66 changes: 34 additions & 32 deletions src/ServiceControl.Persistence.RavenDB/ErrorMessagesDataStore.cs
Expand Up @@ -25,7 +25,8 @@
using ServiceControl.Recoverability;

class ErrorMessagesDataStore(
IDocumentStore documentStore,
IRavenSessionProvider sessionProvider,
IRavenDocumentStoreProvider documentStoreProvider,
IBodyStorage bodyStorage,
ExpirationManager expirationManager)
: IErrorMessageDataStore
Expand All @@ -36,7 +37,7 @@ class ErrorMessagesDataStore(
bool includeSystemMessages
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Query<MessagesViewIndex.SortAndFilterOptions, MessagesViewIndex>()
.IncludeSystemMessagesWhere(includeSystemMessages)
.Statistics(out var stats)
Expand All @@ -57,7 +58,7 @@ bool includeSystemMessages
bool includeSystemMessages
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Query<MessagesViewIndex.SortAndFilterOptions, MessagesViewIndex>()
.IncludeSystemMessagesWhere(includeSystemMessages)
.Where(m => m.ReceivingEndpointName == endpointName)
Expand All @@ -80,7 +81,7 @@ bool includeSystemMessages
SortInfo sortInfo
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Query<MessagesViewIndex.SortAndFilterOptions, MessagesViewIndex>()
.Statistics(out var stats)
.Search(x => x.Query, searchKeyword)
Expand All @@ -102,7 +103,7 @@ SortInfo sortInfo
bool includeSystemMessages
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Query<MessagesViewIndex.SortAndFilterOptions, MessagesViewIndex>()
.Statistics(out var stats)
.Where(m => m.ConversationId == conversationId)
Expand All @@ -122,7 +123,7 @@ bool includeSystemMessages
SortInfo sortInfo
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Query<MessagesViewIndex.SortAndFilterOptions, MessagesViewIndex>()
.Statistics(out var stats)
.Search(x => x.Query, searchTerms)
Expand All @@ -138,7 +139,7 @@ SortInfo sortInfo

public async Task FailedMessageMarkAsArchived(string failedMessageId)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var failedMessage = await session.LoadAsync<FailedMessage>(FailedMessageIdGenerator.MakeDocumentId(failedMessageId));

if (failedMessage.Status != FailedMessageStatus.Archived)
Expand All @@ -153,30 +154,30 @@ public async Task FailedMessageMarkAsArchived(string failedMessageId)

public async Task<FailedMessage[]> FailedMessagesFetch(Guid[] ids)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var docIds = ids.Select(g => FailedMessageIdGenerator.MakeDocumentId(g.ToString()));
var results = await session.LoadAsync<FailedMessage>(docIds);
return results.Values.Where(x => x != null).ToArray();
}

public async Task StoreFailedErrorImport(FailedErrorImport failure)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
await session.StoreAsync(failure);

await session.SaveChangesAsync();
}

public Task<IEditFailedMessagesManager> CreateEditFailedMessageManager()
{
var session = documentStore.OpenAsyncSession();
var session = sessionProvider.OpenAsyncSession();
var manager = new EditFailedMessageManager(session, expirationManager);
return Task.FromResult((IEditFailedMessagesManager)manager);
}

public async Task<QueryResult<FailureGroupView>> GetFailureGroupView(string groupId, string status, string modified)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var document = await session.Advanced
.AsyncDocumentQuery<FailureGroupView, ArchivedGroupsViewIndex>()
.Statistics(out var stats)
Expand All @@ -190,7 +191,7 @@ public async Task<QueryResult<FailureGroupView>> GetFailureGroupView(string grou

public async Task<IList<FailureGroupView>> GetFailureGroupsByClassifier(string classifier)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var groups = session
.Query<FailureGroupView, ArchivedGroupsViewIndex>()
.Where(v => v.Type == classifier);
Expand All @@ -211,7 +212,7 @@ public async Task<IList<FailureGroupView>> GetFailureGroupsByClassifier(string c
SortInfo sortInfo
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Advanced
.AsyncDocumentQuery<FailedMessageViewIndex.SortAndFilterOptions, FailedMessageViewIndex>()
.Statistics(out var stats)
Expand All @@ -236,7 +237,7 @@ SortInfo sortInfo
string queueAddress
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var stats = await session.Advanced
.AsyncDocumentQuery<FailedMessageViewIndex.SortAndFilterOptions, FailedMessageViewIndex>()
.FilterByStatusWhere(status)
Expand All @@ -255,7 +256,7 @@ string queueAddress
SortInfo sortInfo
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Advanced
.AsyncDocumentQuery<FailedMessageViewIndex.SortAndFilterOptions, FailedMessageViewIndex>()
.Statistics(out var stats)
Expand All @@ -277,7 +278,7 @@ SortInfo sortInfo

public async Task<IDictionary<string, object>> ErrorsSummary()
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var facetResults = await session.Query<FailedMessage, FailedMessageFacetsIndex>()
.AggregateBy(new List<Facet>
{
Expand Down Expand Up @@ -311,22 +312,22 @@ SortInfo sortInfo

async Task<FailedMessage> ErrorByDocumentId(string documentId)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var message = await session.LoadAsync<FailedMessage>(documentId);
return message;
}

public Task<INotificationsManager> CreateNotificationsManager()
{
// the session manager manages the lifetime of the session
var manager = new NotificationsManager(documentStore.OpenAsyncSession());
var manager = new NotificationsManager(sessionProvider.OpenAsyncSession());

return Task.FromResult<INotificationsManager>(manager);
}

public async Task<FailedMessageView> ErrorLastBy(string failedMessageId)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var message = await session.LoadAsync<FailedMessage>(FailedMessageIdGenerator.MakeDocumentId(failedMessageId));
if (message == null)
{
Expand Down Expand Up @@ -387,7 +388,7 @@ static FailedMessageView Map(FailedMessage message, IAsyncDocumentSession sessio

public async Task EditComment(string groupId, string comment)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var groupComment =
await session.LoadAsync<GroupComment>(GroupComment.MakeId(groupId))
?? new GroupComment { Id = GroupComment.MakeId(groupId) };
Expand All @@ -400,7 +401,7 @@ await session.LoadAsync<GroupComment>(GroupComment.MakeId(groupId))

public async Task DeleteComment(string groupId)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
session.Delete(GroupComment.MakeId(groupId));
await session.SaveChangesAsync();
}
Expand All @@ -413,7 +414,7 @@ public async Task DeleteComment(string groupId)
PagingInfo pagingInfo
)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session.Advanced
.AsyncDocumentQuery<FailureGroupMessageView, FailedMessages_ByGroup>()
.Statistics(out var stats)
Expand All @@ -434,7 +435,7 @@ PagingInfo pagingInfo

public async Task<QueryStatsInfo> GetGroupErrorsCount(string groupId, string status, string modified)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var queryResult = await session.Advanced
.AsyncDocumentQuery<FailureGroupMessageView, FailedMessages_ByGroup>()
.WhereEquals(view => view.FailureGroupId, groupId)
Expand All @@ -447,7 +448,7 @@ public async Task<QueryStatsInfo> GetGroupErrorsCount(string groupId, string sta

public async Task<QueryResult<IList<FailureGroupView>>> GetGroup(string groupId, string status, string modified)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var queryResult = await session.Advanced
.AsyncDocumentQuery<FailureGroupView, FailureGroupsViewIndex>()
.Statistics(out var stats)
Expand All @@ -463,7 +464,7 @@ public async Task<bool> MarkMessageAsResolved(string failedMessageId)
{
var documentId = FailedMessageIdGenerator.MakeDocumentId(failedMessageId);

using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
session.Advanced.UseOptimisticConcurrency = true;

var failedMessage = await session.LoadAsync<FailedMessage>(documentId);
Expand All @@ -484,7 +485,7 @@ public async Task<bool> MarkMessageAsResolved(string failedMessageId)

public async Task ProcessPendingRetries(DateTime periodFrom, DateTime periodTo, string queueAddress, Func<string, Task> processCallback)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var prequery = session.Advanced
.AsyncDocumentQuery<FailedMessageViewIndex.SortAndFilterOptions, FailedMessageViewIndex>()
.WhereEquals("Status", (int)FailedMessageStatus.RetryIssued)
Expand Down Expand Up @@ -546,6 +547,7 @@ public async Task<string[]> UnArchiveMessagesByRange(DateTime from, DateTime to)
RetrieveDetails = true
});

var documentStore = documentStoreProvider.GetDocumentStore();
var operation = await documentStore.Operations.SendAsync(patch);

var result = await operation.WaitForCompletionAsync<BulkOperationResult>();
Expand All @@ -561,7 +563,7 @@ public async Task<string[]> UnArchiveMessages(IEnumerable<string> failedMessageI
{
Dictionary<string, FailedMessage> failedMessages;

using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
session.Advanced.UseOptimisticConcurrency = true;

var documentIds = failedMessageIds.Select(FailedMessageIdGenerator.MakeDocumentId);
Expand All @@ -585,7 +587,7 @@ public async Task<string[]> UnArchiveMessages(IEnumerable<string> failedMessageI

public async Task RevertRetry(string messageUniqueId)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var failedMessage = await session
.LoadAsync<FailedMessage>(FailedMessageIdGenerator.MakeDocumentId(messageUniqueId));
if (failedMessage != null)
Expand All @@ -605,13 +607,13 @@ public async Task RevertRetry(string messageUniqueId)

public async Task RemoveFailedMessageRetryDocument(string uniqueMessageId)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
await session.Advanced.RequestExecutor.ExecuteAsync(new DeleteDocumentCommand(FailedMessageRetry.MakeDocumentId(uniqueMessageId), null), session.Advanced.Context);
}

public async Task<string[]> GetRetryPendingMessages(DateTime from, DateTime to, string queueAddress)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
var query = session
.Query<FailedMessageViewIndex.SortAndFilterOptions, FailedMessageViewIndex>()
.Where(o => o.Status == FailedMessageStatus.RetryIssued && o.LastModified >= from.Ticks && o.LastModified <= to.Ticks && o.QueueAddress == queueAddress)
Expand Down Expand Up @@ -659,7 +661,7 @@ public async Task<byte[]> FetchFromFailedMessage(string uniqueMessageId)

public async Task StoreEventLogItem(EventLogItem logItem)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
await session.StoreAsync(logItem);

expirationManager.EnableExpiration(session, logItem);
Expand All @@ -669,7 +671,7 @@ public async Task StoreEventLogItem(EventLogItem logItem)

public async Task StoreFailedMessagesForTestsOnly(params FailedMessage[] failedMessages)
{
using var session = documentStore.OpenAsyncSession();
using var session = sessionProvider.OpenAsyncSession();
foreach (var message in failedMessages)
{
await session.StoreAsync(message);
Expand Down
35 changes: 12 additions & 23 deletions src/ServiceControl.Persistence.RavenDB/EventLogDataStore.cs
Expand Up @@ -6,37 +6,26 @@
using Persistence.Infrastructure;
using Raven.Client.Documents;

class EventLogDataStore : IEventLogDataStore
class EventLogDataStore(IRavenSessionProvider sessionProvider) : IEventLogDataStore
{
readonly IDocumentStore documentStore;

public EventLogDataStore(IDocumentStore documentStore)
{
this.documentStore = documentStore;
}

public async Task Add(EventLogItem logItem)
{
using (var session = documentStore.OpenAsyncSession())
{
await session.StoreAsync(logItem);
await session.SaveChangesAsync();
}
using var session = sessionProvider.OpenAsyncSession();
await session.StoreAsync(logItem);
await session.SaveChangesAsync();
}

public async Task<(IList<EventLogItem>, int, string)> GetEventLogItems(PagingInfo pagingInfo)
{
using (var session = documentStore.OpenAsyncSession())
{
var results = await session
.Query<EventLogItem>()
.Statistics(out var stats)
.OrderByDescending(p => p.RaisedAt)
.Paging(pagingInfo)
.ToListAsync();
using var session = sessionProvider.OpenAsyncSession();
var results = await session
.Query<EventLogItem>()
.Statistics(out var stats)
.OrderByDescending(p => p.RaisedAt)
.Paging(pagingInfo)
.ToListAsync();

return (results, stats.TotalResults, stats.ResultEtag.ToString());
}
return (results, stats.TotalResults, stats.ResultEtag.ToString());
}
}
}

0 comments on commit 2a22d8f

Please sign in to comment.