Skip to content

Commit

Permalink
Added ElasticSearch left fold projections
Browse files Browse the repository at this point in the history
Cleaned EventStoreDB README links
Minor updates in other places
  • Loading branch information
oskardudycz committed Jul 14, 2021
1 parent e624f4b commit aa65a44
Show file tree
Hide file tree
Showing 20 changed files with 169 additions and 68 deletions.
32 changes: 32 additions & 0 deletions Core.ElasticSearch/Config.cs
@@ -0,0 +1,32 @@
using System;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Nest;

namespace Core.ElasticSearch
{
public class ElasticSearchConfig
{
public string Url { get; set; } = default!;
public string DefaultIndex { get; set; } = default!;
}

public static class ElasticSearchConfigExtensions
{
private const string DefaultConfigKey = "ElasticSearch";
public static void AddElasticsearch(
this IServiceCollection services, IConfiguration configuration, Action<ConnectionSettings>? config = null)
{
var elasticSearchConfig = configuration.GetSection(DefaultConfigKey).Get<ElasticSearchConfig>();

var settings = new ConnectionSettings(new Uri(elasticSearchConfig.Url))
.DefaultIndex(elasticSearchConfig.DefaultIndex);

config?.Invoke(settings);

var client = new ElasticClient(settings);

services.AddSingleton<IElasticClient>(client);
}
}
}
1 change: 1 addition & 0 deletions Core.ElasticSearch/Core.ElasticSearch.csproj
Expand Up @@ -10,6 +10,7 @@
<ItemGroup>
<PackageReference Include="NEST" Version="7.13.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
Expand Down
26 changes: 0 additions & 26 deletions Core.ElasticSearch/ElasticSearchConfig.cs

This file was deleted.

40 changes: 40 additions & 0 deletions Core.ElasticSearch/Indices/IndexNameMapper.cs
@@ -0,0 +1,40 @@
using System;
using System.Collections.Concurrent;
using System.Linq;

namespace Core.ElasticSearch.Indices
{
public class IndexNameMapper
{
private static readonly IndexNameMapper Instance = new();

private readonly ConcurrentDictionary<Type, string> typeNameMap = new();

public static void AddCustomMap<TStream>(string mappedStreamName) =>
AddCustomMap(typeof(TStream), mappedStreamName);

public static void AddCustomMap(Type streamType, string mappedStreamName)
{
Instance.typeNameMap.AddOrUpdate(streamType, mappedStreamName, (_, _) => mappedStreamName);
}

public static string ToIndexPrefix<TStream>() => ToIndexPrefix(typeof(TStream));

public static string ToIndexPrefix(Type streamType) => Instance.typeNameMap.GetOrAdd(streamType, (_) =>
{
var modulePrefix = streamType.Namespace!.Split(".").First();
return $"{modulePrefix}-{streamType.Name}".ToLower();
});

public static string ToIndexName<TStream>(object? tenantId = null) =>
ToIndexName(typeof(TStream));

public static string ToIndexName(Type streamType, object? tenantId = null)
{
var tenantPrefix = tenantId != null ? $"{tenantId}-" : "";

return $"{tenantPrefix}{ToIndexPrefix(streamType)}".ToLower();
}

}
}
62 changes: 62 additions & 0 deletions Core.ElasticSearch/Projections/ElasticSearchProjection.cs
@@ -0,0 +1,62 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Core.ElasticSearch.Indices;
using Core.Events;
using Core.Projections;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using Nest;

namespace Core.ElasticSearch.Projections
{
public class ElasticSearchProjection<TEvent, TView> : IEventHandler<TEvent>
where TView : class, IProjection
where TEvent : IEvent
{
private readonly IElasticClient elasticClient;
private readonly Func<TEvent, string> getId;

public ElasticSearchProjection(
IElasticClient elasticClient,
Func<TEvent, string> getId
)
{
this.elasticClient = elasticClient ?? throw new ArgumentNullException(nameof(elasticClient));
this.getId = getId ?? throw new ArgumentNullException(nameof(getId));
}

public async Task Handle(TEvent @event, CancellationToken ct)
{
string id = getId(@event);

var entity = (await elasticClient.GetAsync<TView>(id, ct: ct))?.Source
?? (TView) Activator.CreateInstance(typeof(TView), true)!;

entity.When(@event);

var result = await elasticClient.UpdateAsync<TView>(id,
u => u.Doc(entity).Upsert(entity).Index(IndexNameMapper.ToIndexName<TView>()),
ct
);
}
}

public static class ElasticSearchProjectionConfig
{
public static IServiceCollection Project<TEvent, TView>(this IServiceCollection services,
Func<TEvent, string> getId)
where TView : class, IProjection
where TEvent : IEvent
{
services.AddTransient<INotificationHandler<TEvent>>(sp =>
{
var session = sp.GetRequiredService<IElasticClient>();
return new ElasticSearchProjection<TEvent, TView>(session, getId);
});

return services;
}
}
}
15 changes: 8 additions & 7 deletions Core.ElasticSearch/Repository/ElasticSearchRepository.cs
@@ -1,19 +1,20 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Core.Aggregates;
using Core.ElasticSearch.Indices;
using Core.Events;
using Core.Repositories;
using Nest;
using IAggregate = Core.Aggregates.IAggregate;

namespace Core.ElasticSearch.Repository
{
public class ElasticSearchRepository<T>: IRepository<T> where T : class, IAggregate, new()
public class ElasticSearchRepository<T>: Repositories.IRepository<T> where T : class, IAggregate, new()
{
private readonly Nest.IElasticClient elasticClient;
private readonly IElasticClient elasticClient;
private readonly IEventBus eventBus;

public ElasticSearchRepository(
Nest.IElasticClient elasticClient,
IElasticClient elasticClient,
IEventBus eventBus
)
{
Expand All @@ -29,12 +30,12 @@ IEventBus eventBus

public Task Add(T aggregate, CancellationToken cancellationToken)
{
return elasticClient.IndexAsync(aggregate, i => i.Id(aggregate.Id), cancellationToken);
return elasticClient.IndexAsync(aggregate, i => i.Id(aggregate.Id).Index(IndexNameMapper.ToIndexName<T>()), cancellationToken);
}

public Task Update(T aggregate, CancellationToken cancellationToken)
{
return elasticClient.UpdateAsync<T>(aggregate.Id, i => i.Doc(aggregate), cancellationToken);
return elasticClient.UpdateAsync<T>(aggregate.Id, i => i.Doc(aggregate).Index(IndexNameMapper.ToIndexName<T>()), cancellationToken);
}

public Task Delete(T aggregate, CancellationToken cancellationToken)
Expand Down
1 change: 0 additions & 1 deletion Core.EventStoreDB/Config.cs
@@ -1,6 +1,5 @@
using System;
using Core.EventStoreDB.Subscriptions;
using Core.Subscriptions;
using EventStore.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
Expand Down
1 change: 0 additions & 1 deletion Core.EventStoreDB/Serialization/EventStoreDBSerializer.cs
@@ -1,6 +1,5 @@
using System.Text;
using Core.Events;
using Core.Reflection;
using EventStore.Client;
using Newtonsoft.Json;

Expand Down
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using Core.Events;
using Core.EventStoreDB.Serialization;
using Core.Subscriptions;
using EventStore.Client;

namespace Core.EventStoreDB.Subscriptions
Expand Down
@@ -1,7 +1,7 @@
using System.Threading;
using System.Threading.Tasks;

namespace Core.Subscriptions
namespace Core.EventStoreDB.Subscriptions
{
public interface ISubscriptionCheckpointRepository
{
Expand Down
Expand Up @@ -2,7 +2,7 @@
using System.Threading;
using System.Threading.Tasks;

namespace Core.Subscriptions
namespace Core.EventStoreDB.Subscriptions
{
public class InMemorySubscriptionCheckpointRepository: ISubscriptionCheckpointRepository
{
Expand Down
@@ -1,18 +1,13 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Core.Events;
using Core.Events.External;
using Core.EventStoreDB.Serialization;
using Core.Reflection;
using Core.Subscriptions;
using Core.Threading;
using EventStore.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Core.EventStoreDB.Subscriptions
{
Expand Down
4 changes: 2 additions & 2 deletions Core.Testing/TestContext.cs
Expand Up @@ -105,8 +105,8 @@ public async Task PublishInternalEvent(IEvent @event)

public void Dispose()
{
server?.Dispose();
Client?.Dispose();
server.Dispose();
Client.Dispose();
}

public IReadOnlyCollection<TEvent> PublishedInternalEventsOfType<TEvent>()
Expand Down
Expand Up @@ -11,7 +11,7 @@

namespace Carts.Tests.Carts.InitializingCart
{
public class InitializeCardCommandHandlerTests
public class InitializeCartCommandHandlerTests
{
[Fact]
public async Task ForInitCardCommand_ShouldAddNewCart()
Expand Down
Expand Up @@ -7,7 +7,6 @@
using Core.Exceptions;
using Core.Queries;
using EventStore.Client;
using Marten;

namespace Carts.Carts.GettingCartAtVersion
{
Expand Down
30 changes: 15 additions & 15 deletions Sample/EventStoreDB/ECommerce/README.md
Expand Up @@ -21,7 +21,7 @@ Sample is showing the typical flow of the Event Sourcing app with [EventStoreDB]
- Login: `admin@pgadmin.org`, Password: `admin`
- To connect to server Use host: `postgres`, user: `postgres`, password: `Password12!`
4. Open, build and run `ECommerce.sln` solution.
- Swagger should be available at: http://localhost:5000/index.html
- Swagger should be available at: http://localhost:5000/index.html


## Overview
Expand All @@ -31,35 +31,35 @@ It uses:
- Stores events from Aggregate method results to EventStoreDB,
- Builds read models using [Subscription to `$all`](https://developers.eventstore.com/clients/grpc/subscribing-to-streams/#subscribing-to-all).
- Read models are stored as [Marten](https://martendb.io/) documents.
- App has Swagger and predefined [docker-compose](https://github.com/oskardudycz/EventSourcing.NetCore/pull/49/files#diff-bd9579f0d00fbcbca25416ada9698a7f38fdd91b710c1651e0849d56843a6b45) to run and play with samples.
- App has Swagger and predefined [docker-compose](./docker/docker-compose.yml) to run and play with samples.

## Write Model

- Most of the write model infrastructure was reused from other samples,
- Added new project `Core.EventStoreDB` for specific EventStoreDB code,
- Added [EventStoreDBRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Repository/EventStoreDBRepository.cs) repository to load and store aggregate state,
- Added separate [IProjection](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Projections/IProjection.cs) interface to handle the same way stream aggregation and materialised projections,
- Thanks to that added dedicated [AggregateStream](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Events/AggregateStreamExtensions.cs#L12) method for stream aggregation
- Added [EventStoreDBRepository](./Core/Core.EventStoreDB/Repository/EventStoreDBRepository.cs) repository to load and store aggregate state,
- Added separate [IProjection](./Core/Core/Projections/IProjection.cs) interface to handle the same way stream aggregation and materialised projections,
- Thanks to that added dedicated [AggregateStream](./Core/Core.EventStoreDB/Events/AggregateStreamExtensions.cs#L12) method for stream aggregation
- See [sample Aggregate](./Carts/Carts/Carts/Cart.cs)

## Read Model
- Read models are rebuilt with eventual consistency using subscribe to all EventStoreDB feature,
- Added hosted service [SubscribeToAllBackgroundWorker](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped.
- Added [ISubscriptionCheckpointRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Subscriptions/ISubscriptionCheckpointRepository.cs) for handling Subscription checkpointing.
- Added checkpointing to EventStoreDB stream with [EventStoreDBSubscriptionCheckpointRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs) and dummy in-memory checkpointer [InMemorySubscriptionCheckpointRepository](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Subscriptions/InMemorySubscriptionCheckpointRepository.cs),
- Added [MartenExternalProjection](https://github.com/oskardudycz/EventSourcing.NetCore/pull/49/files#diff-6d8dadf8ab81a9441836a5403632ef3616a1dc42788b5feae1c56a4f2321d4eeR12) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. Another (e.g. ElasticSearch, EntityFramework) can be implemented the same way.
- Added hosted service [SubscribeToAllBackgroundWorker](./Core/Core.EventStoreDB/Subscriptions/SubscribeToAllBackgroundWorker.cs) to handle subscribing to all. It handles checkpointing and simple retries if the connection was dropped.
- Added [ISubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/ISubscriptionCheckpointRepository.cs) for handling Subscription checkpointing.
- Added checkpointing to EventStoreDB stream with [EventStoreDBSubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionCheckpointRepository.cs) and dummy in-memory checkpointer [InMemorySubscriptionCheckpointRepository](./Core/Core.EventStoreDB/Subscriptions/InMemorySubscriptionCheckpointRepository.cs),
- Added [MartenExternalProjection](./Core/Core.Marten/ExternalProjections/MartenExternalProjection.cs) as a sample how to project with [`left-fold`](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) into external storage. Another (e.g. ElasticSearch, EntityFramework) can be implemented the same way.

## Tests
- Added sample of unit testing in [`Carts.Tests`](./Carts/Carts.Tests):
- [Aggregate unit tests](./Carts/Carts.Tests/Carts/InitCartTests.cs)
- [Command handler unit tests](./Carts/Carts.Tests/Carts/CommandHandlers/InitCardCommandHandlerTests.cs)
- [Aggregate unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartTests.cs)
- [Command handler unit tests](./Carts/Carts.Tests/Carts/InitializingCart/InitializeCartCommandHandlerTests.cs)
- Added sample of integration testing in [`Carts.Api.Tests`](./Carts/Carts.Api.Tests)
- [API integration tests](./Carts/Carts.Api.Tests/Carts/InitCartTests.cs)
- [API integration tests](./Carts/Carts.Api.Tests/Carts/InitializingCart/InitializeCartTests.cs)

## Other
- Added [EventTypeMapper](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Events/EventTypeMapper.cs) class to allow both convention-based mapping (by the .NET type name) and custom to handle event versioning,
- Added [StreamNameMapper](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core/Events/StreamNameMapper.cs) class for convention-based id (and optional tenant) mapping based on the stream type and module,
- IoC [registration helpers for EventStoreDB configuration](https://github.com/oskardudycz/EventSourcing.NetCore/blob/main/Core.EventStoreDB/Config.cs),
- Added [EventTypeMapper](./Core/Core/Events/EventTypeMapper.cs) class to allow both convention-based mapping (by the .NET type name) and custom to handle event versioning,
- Added [StreamNameMapper](./Core/Core/Events/StreamNameMapper.cs) class for convention-based id (and optional tenant) mapping based on the stream type and module,
- IoC [registration helpers for EventStoreDB configuration](./Core/Core.EventStoreDB/Config.cs),


## Trivia
Expand Down
6 changes: 3 additions & 3 deletions Sample/MeetingsManagement/MeetingsSearch.Api/appsettings.json
Expand Up @@ -6,9 +6,9 @@
},
"AllowedHosts": "*",

"elasticsearch": {
"index": "meetings",
"url": "http://localhost:9200/"
"Elasticsearch": {
"DefaultIndex": "meetings",
"Url": "http://localhost:9200/"
},

"KafkaConsumer": {
Expand Down
4 changes: 3 additions & 1 deletion Sample/MeetingsManagement/docker/docker-compose.yml
Expand Up @@ -29,7 +29,9 @@ services:
networks:
- esnetwork
kibana:
image: docker.elastic.co/kibana/kibana:6.3.0
image: docker.elastic.co/kibana/kibana:7.13.3
volumes:
- ./kibana.yml:/usr/share/kibana/config/kibana.yml
ports:
- "5601:5601"
networks:
Expand Down
@@ -1,5 +1,4 @@
using System;
using Tickets.Reservations;
using Tickets.Reservations.NumberGeneration;

namespace Tickets.Tests.Stubs.Reservations
Expand Down
Expand Up @@ -3,7 +3,6 @@
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Headers;
using Microsoft.Extensions.Primitives;
using Microsoft.Net.Http.Headers;

Expand Down

0 comments on commit aa65a44

Please sign in to comment.