Skip to content

Commit

Permalink
Wiring updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 22, 2022
1 parent 7477f2a commit 7626601
Show file tree
Hide file tree
Showing 10 changed files with 559 additions and 446 deletions.
30 changes: 16 additions & 14 deletions Sample/ECommerce.Equinox/ECommerce.Api/Program.fs
Expand Up @@ -6,10 +6,8 @@ open Microsoft.Extensions.DependencyInjection
open Serilog
open System

exception MissingArg of message : string with override this.Message = this.message

type Configuration(tryGet) =
inherit App.Configuration(tryGet)
inherit Args.Configuration(tryGet)

let [<Literal>] AppName = "ECommerce.Web"

Expand All @@ -20,25 +18,28 @@ module Args =
[<NoEquality; NoComparison>]
type Parameters =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<CliPrefix(CliPrefix.None); Last>] Cosmos of ParseResults<App.CosmosParameters>
| [<CliPrefix(CliPrefix.None); Last>] Dynamo of ParseResults<App.DynamoParameters>
| [<CliPrefix(CliPrefix.None); Last>] Esdb of ParseResults<App.EsdbParameters>
| [<AltCommandLine "-p"; Unique>] PrometheusPort of int
| [<CliPrefix(CliPrefix.None); Last>] Cosmos of ParseResults<Args.Cosmos.Parameters>
| [<CliPrefix(CliPrefix.None); Last>] Dynamo of ParseResults<Args.Dynamo.Parameters>
| [<CliPrefix(CliPrefix.None); Last>] Esdb of ParseResults<Args.Esdb.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose _ -> "request verbose logging."
| PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off (optional if environment variable PROMETHEUS_PORT specified)"
| Cosmos _ -> "specify CosmosDB input parameters"
| Dynamo _ -> "specify DynamoDB input parameters"
| Esdb _ -> "specify EventStore input parameters"
and [<NoComparison; NoEquality>] StoreArguments = Cosmos of App.CosmosArguments | Dynamo of App.DynamoArguments | Esdb of App.EsdbArguments
and [<NoComparison; NoEquality>] StoreArguments = Cosmos of Args.Cosmos.Arguments | Dynamo of Args.Dynamo.Arguments | Esdb of Args.Esdb.Arguments
and [<RequireQualifiedAccess>]
Arguments(c : Configuration, a : ParseResults<Parameters>) =
member val Verbose = a.Contains Parameters.Verbose
member val Verbose = a.Contains Verbose
member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort)
member val Store : StoreArguments =
match a.TryGetSubCommand() with
| Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (App.CosmosArguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (App.DynamoArguments (c, dynamo))
| Some (Parameters.Esdb es) -> StoreArguments.Esdb (App.EsdbArguments (c, es))
| _ -> App.missingArg "Must specify one of cosmos, dynamo or esdb for store"
| Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo))
| Some (Parameters.Esdb es) -> StoreArguments.Esdb (Args.Esdb.Arguments (c, es))
| _ -> Args.missingArg "Must specify one of cosmos, dynamo or esdb for store"
member x.Connect() =
let cache = Equinox.Cache (AppName, sizeMb = 10)
match x.Store with
Expand Down Expand Up @@ -72,6 +73,7 @@ type Logging() =
c .MinimumLevel.Debug()
.MinimumLevel.Override("Microsoft.AspNetCore", Serilog.Events.LogEventLevel.Warning)
.WriteTo.Sink(Equinox.CosmosStore.Prometheus.LogSink(customTags))
.WriteTo.Sink(Equinox.DynamoStore.Prometheus.LogSink(customTags))
.Enrich.FromLogContext()
.WriteTo.Console()

Expand All @@ -94,8 +96,8 @@ let main argv =
let metrics = Sinks.tags AppName |> Sinks.equinoxMetricsOnly
try Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.StoreVerbose).CreateLogger()
try run args; 0
with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2
with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
with MissingArg msg -> eprintfn $"%s{msg}"; 1
with Args.MissingArg msg -> eprintfn $"%s{msg}"; 1
| :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1
| e -> eprintfn $"Exception %s{e.Message}"; 1
7 changes: 4 additions & 3 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Ingester.fs
Expand Up @@ -6,7 +6,7 @@ open System
type Outcome = { added : int; notReady : int; dups : int }

/// Gathers stats based on the outcome of each Span processed for periodic emission
type Stats(log, statsInterval, stateInterval) =
type Stats(log, statsInterval, stateInterval, logExternalStats) =
inherit Propulsion.Streams.Stats<Outcome>(log, statsInterval, stateInterval)

let mutable added, notReady, dups = 0, 0, 0
Expand All @@ -20,10 +20,11 @@ type Stats(log, statsInterval, stateInterval) =
log.Information(exn, "Unhandled")

override _.DumpStats() =
base.DumpStats()
if added <> 0 || notReady <> 0 || dups <> 0 then
log.Information(" Added {added} Not Yet Shipped {notReady} Duplicates {dups}", added, notReady, dups)
log.Information("👉Added {added} Not Yet Shipped {notReady} Duplicates {dups}", added, notReady, dups)
added <- 0; notReady <- 0; dups <- 0
base.DumpStats()
logExternalStats log

module PipelineEvent =

Expand Down
41 changes: 22 additions & 19 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs
Expand Up @@ -6,7 +6,7 @@ open Serilog
open System

type Configuration(tryGet) =
inherit App.Configuration(tryGet)
inherit Args.Configuration(tryGet)

member _.BaseUri = base.get "API_BASE_URI"
member _.Group = base.get "API_CONSUMER_GROUP"
Expand All @@ -19,6 +19,7 @@ module Args =
[<NoEquality; NoComparison>]
type Parameters =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<AltCommandLine "-p"; Unique>] PrometheusPort of int

| [<AltCommandLine "-g"; Unique>] Group of string
| [<AltCommandLine "-f"; Unique>] BaseUri of string
Expand All @@ -27,12 +28,13 @@ module Args =
| [<AltCommandLine "-w"; Unique>] FcsDop of int
| [<AltCommandLine "-t"; Unique>] TicketsDop of int

| [<CliPrefix(CliPrefix.None); Unique; Last>] Cosmos of ParseResults<App.CosmosParameters>
| [<CliPrefix(CliPrefix.None); Unique; Last>] Dynamo of ParseResults<App.DynamoParameters>
| [<CliPrefix(CliPrefix.None); Last>] Esdb of ParseResults<App.EsdbParameters>
| [<CliPrefix(CliPrefix.None); Unique; Last>] Cosmos of ParseResults<Args.Cosmos.Parameters>
| [<CliPrefix(CliPrefix.None); Unique; Last>] Dynamo of ParseResults<Args.Dynamo.Parameters>
| [<CliPrefix(CliPrefix.None); Last>] Esdb of ParseResults<Args.Esdb.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose _ -> "request verbose logging."
| PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off (optional if environment variable PROMETHEUS_PORT specified)"
| Group _ -> "specify Api Consumer Group Id. (optional if environment variable API_CONSUMER_GROUP specified)"
| BaseUri _ -> "specify Api endpoint. (optional if environment variable API_BASE_URI specified)"
| MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 8."
Expand All @@ -43,9 +45,10 @@ module Args =
| Esdb _ -> "specify EventStore input parameters"

[<NoComparison; NoEquality>]
type StoreArguments = Cosmos of App.CosmosArguments | Dynamo of App.DynamoArguments | Esdb of App.EsdbArguments
type StoreArguments = Cosmos of Args.Cosmos.Arguments | Dynamo of Args.Dynamo.Arguments | Esdb of Args.Esdb.Arguments
type Arguments(c : Configuration, a : ParseResults<Parameters>) =
member val Verbose = a.Contains Parameters.Verbose
member val Verbose = a.Contains Verbose
member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort)
member val SourceId = a.TryGetResult Group |> Option.defaultWith (fun () -> c.Group) |> Propulsion.Feed.SourceId.parse
member val BaseUri = a.TryGetResult BaseUri |> Option.defaultWith (fun () -> c.BaseUri) |> Uri
member val MaxReadAhead = a.GetResult(MaxReadAhead,8)
Expand All @@ -59,23 +62,23 @@ module Args =
member val CacheSizeMb = 10
member val Store : StoreArguments =
match a.TryGetSubCommand() with
| Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (App.CosmosArguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (App.DynamoArguments (c, dynamo))
| Some (Parameters.Esdb es) -> StoreArguments.Esdb (App.EsdbArguments (c, es))
| _ -> App.missingArg "Must specify one of cosmos, dynamo or esdb for store"
| Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo))
| Some (Parameters.Esdb es) -> StoreArguments.Esdb (Args.Esdb.Arguments (c, es))
| _ -> Args.missingArg "Must specify one of cosmos, dynamo or esdb for store"
member x.Connect() =
let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb)
match x.Store with
| StoreArguments.Cosmos ca ->
let context = ca.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
Config.Store.Cosmos (context, cache)
Config.Store.Cosmos (context, cache), Equinox.CosmosStore.Core.Log.InternalMetrics.dump
| StoreArguments.Dynamo da ->
let context = da.Connect() |> DynamoStoreContext.create
Config.Store.Dynamo (context, cache)
Config.Store.Dynamo (context, cache), Equinox.DynamoStore.Core.Log.InternalMetrics.dump
| StoreArguments.Esdb ea ->
let context = ea.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create
Config.Store.Esdb (context, cache)
member x.CheckpointerIn(store) : Propulsion.Feed.IFeedCheckpointStore =
Config.Store.Esdb (context, cache), Equinox.EventStoreDb.Log.InternalMetrics.dump
member x.CheckpointStore(store) : Propulsion.Feed.IFeedCheckpointStore =
match store with
| Config.Store.Cosmos (context, cache) ->
Propulsion.Feed.ReaderCheckpoint.CosmosStore.create Config.log (x.ConsumerGroupName, x.CheckpointInterval) (context, cache)
Expand All @@ -100,15 +103,15 @@ module Args =
Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv)

let build (args : Args.Arguments) =
let store = args.Connect()
let store, dumpMetrics = args.Connect()

let log = Log.forGroup args.SourceId // needs to have a `group` tag for Propulsion.Streams Prometheus metrics
let sink =
let handle = Ingester.handle args.TicketsDop
let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval)
let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = dumpMetrics)
Propulsion.Streams.StreamsProjector.Start(log, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval)
let pumpSource =
let checkpoints = args.CheckpointerIn(store)
let checkpoints = args.CheckpointStore(store)
let feed = ApiClient.TicketsFeed args.BaseUri
let source =
Propulsion.Feed.FeedSource(
Expand All @@ -129,8 +132,8 @@ let main argv =
try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName)
Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.StoreVerbose).CreateLogger()
try run args |> Async.RunSynchronously
with e when not (e :? App.MissingArg) -> Log.Fatal(e, "Exiting"); 2
with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
with App.MissingArg msg -> eprintfn "%s" msg; 1
with Args.MissingArg msg -> eprintfn "%s" msg; 1
| :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
| e -> eprintf "Exception %s" e.Message; 1
Expand Up @@ -16,8 +16,8 @@
<PackageReference Include="Equinox.DynamoStore.Prometheus" Version="4.0.0-beta.4.2" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.13.0-beta.2" />
<PackageReference Include="Propulsion.EventStoreDb" Version="2.13.0-2.13.0-beta.2" />
<PackageReference Include="Propulsion.DynamoStore" Version="2.13.0-2.13.0-beta.2" />
<PackageReference Include="Propulsion.EventStoreDb" Version="2.13.0-beta.2" />
<PackageReference Include="Propulsion.DynamoStore" Version="2.13.0-beta.2" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down

0 comments on commit 7626601

Please sign in to comment.