Skip to content

Commit

Permalink
Update Jet Infra deps
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 3, 2022
1 parent 0d0a2d7 commit 76ce151
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 123 deletions.
Expand Up @@ -18,11 +18,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="3.0.5" />
<PackageReference Include="Equinox.EventStore" Version="3.0.5" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.5" />
<PackageReference Include="FsCodec.SystemTextJson" Version="2.3.0-rc.2" />
<PackageReference Include="Propulsion" Version="2.12.0-rc.3" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.6" />
<PackageReference Include="Equinox.EventStore" Version="3.0.6" />
<PackageReference Include="Equinox.MemoryStore" Version="3.0.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="2.3.1" />
<PackageReference Include="Propulsion" Version="2.12.1" />
</ItemGroup>

</Project>
10 changes: 5 additions & 5 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/ApiClient.fs
Expand Up @@ -3,7 +3,7 @@ module ECommerce.FeedConsumer.ApiClient
open FSharp.UMX
open System.Net.Http

open FeedConsumerTemplate.Domain
open ECommerce.FeedConsumer.Domain

(* The feed presents a Tranche (series of epochs) per FC *)

Expand Down Expand Up @@ -34,7 +34,7 @@ type SliceDto = { closed : bool; tickets : ItemDto[]; position : TicketsCheckpoi
type Session(client: HttpClient) =

member _.Send(req : HttpRequestMessage) : Async<HttpResponseMessage> =
client.Send(req)
client.Send2(req)

type TicketsClient(session: Session) =

Expand All @@ -43,20 +43,20 @@ type TicketsClient(session: Session) =
member _.ActiveFcs() : Async<FcId[]> = async {
let request = HttpReq.get () |> HttpReq.withPath basePath
let! response = session.Send request
let! body = response |> HttpRes.deserializeOkJsonNet<TicketsTranchesDto>
let! body = response |> HttpRes.deserializeOkStj<TicketsTranchesDto>
return [| for f in body.activeEpochs -> f.fc |]
}

member _.ReadPage(fc : FcId, index : int) : Async<SliceDto> = async {
let request = HttpReq.post () |> HttpReq.withPathf "%s/%O/%d" basePath fc index
let! response = session.Send request
return! response |> HttpRes.deserializeOkJsonNet<SliceDto>
return! response |> HttpRes.deserializeOkStj<SliceDto>
}

member _.Poll(fc : FcId, checkpoint: TicketsCheckpoint) : Async<SliceDto> = async {
let request = HttpReq.create () |> HttpReq.withPathf "%s/%O/slice/%O" basePath fc checkpoint
let! response = session.Send request
return! response |> HttpRes.deserializeOkJsonNet<SliceDto>
return! response |> HttpRes.deserializeOkStj<SliceDto>
}

type Session with
Expand Down
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<WarningLevel>5</WarningLevel>
</PropertyGroup>

Expand All @@ -14,15 +14,9 @@
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.5" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.5" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.3" />
<PackageReference Include="Propulsion.Feed" Version="2.12.0-rc.3" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\ECommerce.Domain\ECommerce.Domain.fsproj" />
<ProjectReference Include="..\ECommerce.Infrastructure\ECommerce.Infrastructure.fsproj" />
</ItemGroup>

</Project>
98 changes: 8 additions & 90 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Infrastructure.fs
@@ -1,95 +1,12 @@
[<AutoOpen>]
module ECommerce.FeedConsumer.Infrastructure

open Serilog
open System
open System.Text

module Config =

let log = Log.ForContext("isMetric", true)

module EnvVar =

let tryGet varName : string option = Environment.GetEnvironmentVariable varName |> Option.ofObj

module Log =

/// Allow logging to filter out emission of log messages whose information is also surfaced as metrics
let isStoreMetrics e = Filters.Matching.WithProperty("isMetric").Invoke e

type Equinox.CosmosStore.CosmosStoreConnector with

member private x.LogConfiguration(connectionName, databaseId, containerId) =
let o = x.Options
let timeout, retries429, timeout429 = o.RequestTimeout, o.MaxRetryAttemptsOnRateLimitedRequests, o.MaxRetryWaitTimeOnRateLimitedRequests
Log.Information("CosmosDb {name} {mode} {endpointUri} timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
connectionName, o.ConnectionMode, x.Endpoint, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds)
Log.Information("CosmosDb {name} Database {database} Container {container}",
connectionName, databaseId, containerId)

/// Connect a CosmosStoreClient, including warming up
member x.ConnectStore(connectionName, databaseId, containerId) =
x.LogConfiguration(connectionName, databaseId, containerId)
Equinox.CosmosStore.CosmosStoreClient.Connect(x.CreateAndInitialize, databaseId, containerId)

module CosmosStoreContext =

/// Create with default packing and querying policies. Search for other `module CosmosStoreContext` impls for custom variations
let create (storeClient : Equinox.CosmosStore.CosmosStoreClient) =
let maxEvents = 256
Equinox.CosmosStore.CosmosStoreContext(storeClient, tipMaxEvents=maxEvents)

/// Equinox and Propulsion provide metrics as properties in log emissions
/// These helpers wire those to pass through virtual Log Sinks that expose them as Prometheus metrics.
module Sinks =

let tags appName = ["app", appName]

let equinoxMetricsOnly tags (l : LoggerConfiguration) =
l.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink())
.WriteTo.Sink(Equinox.CosmosStore.Prometheus.LogSink(tags))

let equinoxAndPropulsionConsumerMetrics tags group (l : LoggerConfiguration) =
l |> equinoxMetricsOnly tags
|> fun l -> l.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags, group))

let equinoxAndPropulsionFeedConsumerMetrics tags source (l : LoggerConfiguration) =
l |> equinoxAndPropulsionConsumerMetrics tags (Propulsion.Feed.SourceId.toString source)
|> fun l -> l.WriteTo.Sink(Propulsion.Feed.Prometheus.LogSink(tags))

let console (configuration : LoggerConfiguration) =
let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj} {NewLine}{Exception}"
configuration.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)

[<System.Runtime.CompilerServices.Extension>]
type Logging() =

[<System.Runtime.CompilerServices.Extension>]
static member Configure(configuration : LoggerConfiguration, ?verbose) =
configuration
.Destructure.FSharpTypes()
.Enrich.FromLogContext()
|> fun c -> if verbose = Some true then c.MinimumLevel.Debug() else c

[<System.Runtime.CompilerServices.Extension>]
static member private Sinks(configuration : LoggerConfiguration, configureMetricsSinks, configureConsoleSink, ?isMetric) =
let configure (a : Configuration.LoggerSinkConfiguration) : unit =
a.Logger(configureMetricsSinks >> ignore) |> ignore // unconditionally feed all log events to the metrics sinks
a.Logger(fun l -> // but filter what gets emitted to the console sink
let l = match isMetric with None -> l | Some predicate -> l.Filter.ByExcluding(Func<Serilog.Events.LogEvent, bool> predicate)
configureConsoleSink l |> ignore)
|> ignore
configuration.WriteTo.Async(bufferSize=65536, blockWhenFull=true, configure=Action<_> configure)

[<System.Runtime.CompilerServices.Extension>]
static member Sinks(configuration : LoggerConfiguration, configureMetricsSinks, verboseStore) =
configuration.Sinks(configureMetricsSinks, Sinks.console, ?isMetric = if verboseStore then None else Some Log.isStoreMetrics)

type Async with
/// Re-raise an exception so that the current stacktrace is preserved
static member Raise(e : #exn) : Async<'T> = Async.FromContinuations (fun (_,ec,_) -> ec e)

open System.Text

type StringBuilder with
member sb.Appendf fmt = Printf.ksprintf (ignore << sb.Append) fmt
member sb.Appendfn fmt = Printf.ksprintf (ignore << sb.AppendLine) fmt
Expand All @@ -99,6 +16,7 @@ type StringBuilder with
builder instance
instance.ToString()

open System
open System.Net
open System.Net.Http
open System.Runtime.Serialization
Expand Down Expand Up @@ -151,7 +69,7 @@ type HttpClient with
/// Drop-in replacement for HttpClient.SendAsync which addresses known timeout issues
/// </summary>
/// <param name="msg">HttpRequestMessage to be submitted.</param>
member client.Send(msg : HttpRequestMessage) = async {
member client.Send2(msg : HttpRequestMessage) = async {
let! ct = Async.CancellationToken
try return! client.SendAsync(msg, ct) |> Async.AwaitTask
// address https://github.com/dotnet/corefx/issues/20296
Expand Down Expand Up @@ -253,13 +171,13 @@ type HttpResponseMessage with

module HttpRes =

let codec = Config.EventCodec.forUnion<Event>
// let codec = ECommerce.Domain.Config.EventCodec.forUnion<Event>
let serdes = FsCodec.SystemTextJson.Serdes(FsCodec.SystemTextJson.Options.Create())

/// Deserialize body using default Json.Net profile - throw with content details if StatusCode is unexpected or decoding fails
let deserializeExpectedJsonNet<'t> expectedStatusCode (res : HttpResponseMessage) =
let deserializeExpectedStj<'t> expectedStatusCode (res : HttpResponseMessage) =
res.Interpret(expectedStatusCode, serdes.Deserialize<'t>)

/// Deserialize body using default Json.Net profile - throw with content details if StatusCode is not OK or decoding fails
let deserializeOkJsonNet<'t> =
deserializeExpectedJsonNet<'t> HttpStatusCode.OK
let deserializeOkStj<'t> =
deserializeExpectedStj<'t> HttpStatusCode.OK
@@ -1,7 +1,7 @@
module ECommerce.FeedConsumer.Ingester

open ECommerce.FeedConsumer.Domain
open System
open FeedConsumerTemplate.Domain

type Outcome = { added : int; notReady : int; dups : int }

Expand Down
11 changes: 7 additions & 4 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs
@@ -1,5 +1,7 @@
module ECommerce.FeedConsumer.Program

open ECommerce.Infrastructure // ConnectStore etc
open ECommerce.Domain // Config etc
open Serilog
open System

Expand Down Expand Up @@ -101,16 +103,17 @@ let build (args : Args.Arguments) =
let cache = Equinox.Cache (AppName, sizeMb = 10)
let context = args.Cosmos.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create

let log = Log.forGroup args.SourceId // needs to have a `group` tag for Propulsion.Streams Prometheus metrics
let sink =
let handle = Ingester.handle args.TicketsDop
let stats = Ingester.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
Propulsion.Streams.StreamsProjector.Start(Log.Logger, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval)
let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval)
Propulsion.Streams.StreamsProjector.Start(log, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval)
let pumpSource =
let checkpoints = Propulsion.Feed.ReaderCheckpoint.CosmosStore.create Config.log (context, cache)
let feed = ApiClient.TicketsFeed args.BaseUri
let source =
Propulsion.Feed.FeedSource(
Log.Logger, args.StatsInterval, args.SourceId, args.TailSleepInterval,
log, args.StatsInterval, args.SourceId, args.TailSleepInterval,
checkpoints, args.CheckpointInterval, feed.Poll, sink)
source.Pump feed.ReadTranches
sink, pumpSource
Expand All @@ -124,7 +127,7 @@ let run args = async {
[<EntryPoint>]
let main argv =
try let args = Args.parse EnvVar.tryGet argv
try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName) args.SourceId
try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName)
Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger()
try run args |> Async.RunSynchronously
with e when not (e :? MissingArg) -> Log.Fatal(e, "Exiting"); 2
Expand Down
Expand Up @@ -12,10 +12,11 @@
<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.5" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.6" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.3" />
<PackageReference Include="Propulsion.EventStore" Version="2.12.0-rc.3" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.1" />
<PackageReference Include="Propulsion.EventStore" Version="2.12.1" />
<PackageReference Include="Propulsion.Feed" Version="2.12.1" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down
Expand Up @@ -7,6 +7,7 @@ open System
module Log =

let isStoreMetrics x = Serilog.Filters.Matching.WithProperty("isMetric").Invoke x
let forGroup group = Log.ForContext("group", group)

module EnvVar =

Expand Down Expand Up @@ -65,14 +66,18 @@ module Sinks =
l.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink())
.WriteTo.Sink(Equinox.CosmosStore.Prometheus.LogSink(tags))

let equinoxAndPropulsionConsumerMetrics tags group (l : LoggerConfiguration) =
let equinoxAndPropulsionConsumerMetrics tags (l : LoggerConfiguration) =
l |> equinoxMetricsOnly tags
|> fun l -> l.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags, group))
|> fun l -> l.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags))

let equinoxAndPropulsionCosmosConsumerMetrics tags group (l : LoggerConfiguration) =
l |> equinoxAndPropulsionConsumerMetrics tags group
let equinoxAndPropulsionCosmosConsumerMetrics tags (l : LoggerConfiguration) =
l |> equinoxAndPropulsionConsumerMetrics tags
|> fun l -> l.WriteTo.Sink(Propulsion.CosmosStore.Prometheus.LogSink(tags))

let equinoxAndPropulsionFeedConsumerMetrics tags (l : LoggerConfiguration) =
l |> equinoxAndPropulsionConsumerMetrics tags
|> fun l -> l.WriteTo.Sink(Propulsion.Feed.Prometheus.LogSink(tags))

let console (configuration : LoggerConfiguration) =
let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj} {NewLine}{Exception}"
configuration.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t)
Expand Down

0 comments on commit 76ce151

Please sign in to comment.