From 81d7c87d3e52b06c8f1bdf670197587bf3a29179 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 3 Jun 2020 15:37:23 +0100 Subject: [PATCH] Cleanup eqxShipping (#71) --- CHANGELOG.md | 3 + Directory.Build.props | 2 +- Directory.Build.targets | 2 +- dotnet-templates.sln | 3 +- .../Domain/FinalizationProcessManager.fs | 1 + .../Domain/FinalizationTransaction.fs | 8 +-- equinox-shipping/Domain/Shipment.fs | 4 +- equinox-shipping/Watchdog/Handler.fs | 5 ++ equinox-shipping/Watchdog/Program.fs | 55 ++++++++++--------- 9 files changed, 47 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1c2bb453..a47f0e2db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added ### Changed + +- Cleanup `eqxShipping` [#71](https://github.com/jet/dotnet-templates/pull/71) + ### Removed ### Fixed diff --git a/Directory.Build.props b/Directory.Build.props index 25a9f9523..22417ed9c 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,4 +1,4 @@ - + @jet @bartelink contributors Jet.com diff --git a/Directory.Build.targets b/Directory.Build.targets index 990271990..13fa1ee4a 100644 --- a/Directory.Build.targets +++ b/Directory.Build.targets @@ -1,4 +1,4 @@ - + $(MinVerMajor).$(MinVerMinor).$(MinVerPatch)-pr.$(BUILD_PR) diff --git a/dotnet-templates.sln b/dotnet-templates.sln index 4c3c61e56..5859ea49e 100644 --- a/dotnet-templates.sln +++ b/dotnet-templates.sln @@ -80,10 +80,11 @@ EndProjectSection EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Templates", "src\Equinox.Templates\Equinox.Templates.fsproj", "{8C92B728-85A5-4231-863A-E4236E46CC36}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "equinox-shipping", "equinox-shipping", "{DAE9E2B9-EDA2-4064-B0CE-FD5294549374}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "eqxShipping", "eqxShipping", "{DAE9E2B9-EDA2-4064-B0CE-FD5294549374}" ProjectSection(SolutionItems) = preProject equinox-shipping\.template.config\template.json = equinox-shipping\.template.config\template.json equinox-shipping\README.md = equinox-shipping\README.md + equinox-shipping\Shipping.sln = equinox-shipping\Shipping.sln EndProjectSection EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "equinox-shipping\Domain\Domain.fsproj", "{7B96FCF8-0BB5-4494-A143-628882A6E50A}" diff --git a/equinox-shipping/Domain/FinalizationProcessManager.fs b/equinox-shipping/Domain/FinalizationProcessManager.fs index a73e0ee33..5e2a7d6e7 100644 --- a/equinox-shipping/Domain/FinalizationProcessManager.fs +++ b/equinox-shipping/Domain/FinalizationProcessManager.fs @@ -50,6 +50,7 @@ type Service // Caller should generate the TransactionId via a deterministic hash of the shipmentIds in order to ensure idempotency (and sharing of fate) of identical requests member __.TryFinalizeContainer(transactionId, containerId, shipmentIds) : Async = + if Array.isEmpty shipmentIds then invalidArg "shipmentIds" "must not be empty" let initialRequest = Events.FinalizationRequested {| container = containerId; shipments = shipmentIds |} execute transactionId (Some initialRequest) diff --git a/equinox-shipping/Domain/FinalizationTransaction.fs b/equinox-shipping/Domain/FinalizationTransaction.fs index 8adabeb48..b100635cb 100644 --- a/equinox-shipping/Domain/FinalizationTransaction.fs +++ b/equinox-shipping/Domain/FinalizationTransaction.fs @@ -16,7 +16,7 @@ module Events = | AssignmentCompleted /// Signifies all processing for the transaction has completed - the Watchdog looks for this event | Completed - | Snapshotted of State + | Snapshotted of {| state : State |} interface TypeShape.UnionContract.IUnionContract and [)>] @@ -32,7 +32,7 @@ module Events = /// Used by the Watchdog to infer whether a given event signifies that the processing has reached a terminal state let isTerminalEvent (encoded : FsCodec.ITimelineEvent<_>) = - encoded.EventType = "Completed" // TODO nameof("Completed") + encoded.EventType = "Completed" // TODO nameof(Completed) module Fold = @@ -57,13 +57,13 @@ module Fold = | State.Reverting _state, Events.Completed -> State.Completed {| success = false |} | State.Assigning state, Events.AssignmentCompleted -> State.Assigned {| container = state.container; shipments = state.shipments |} | State.Assigned _, Events.Completed -> State.Completed {| success = true |} - | _, Events.Snapshotted state -> state + | _, Events.Snapshotted state -> state.state // this shouldn't happen, but, if we did produce invalid events, we'll just ignore them | state, _ -> state let fold : State -> Events.Event seq -> State = Seq.fold evolve let isOrigin = function Events.Snapshotted _ -> true | _ -> false - let toSnapshot state = Events.Snapshotted state + let toSnapshot state = Events.Snapshotted {| state = state |} [] type Action = diff --git a/equinox-shipping/Domain/Shipment.fs b/equinox-shipping/Domain/Shipment.fs index 70cec0858..cbba0ba85 100644 --- a/equinox-shipping/Domain/Shipment.fs +++ b/equinox-shipping/Domain/Shipment.fs @@ -23,8 +23,8 @@ module Fold = let evolve (state: State) = function | Events.Reserved event -> { reservation = Some event.transaction; association = None } | Events.Revoked -> initial - | Events.Assigned event -> { state with association = Some event.container } - | Events.Snapshotted event -> { reservation = event.reservation; association = event.association } + | Events.Assigned event -> { state with association = Some event.container } + | Events.Snapshotted event -> { reservation = event.reservation; association = event.association } let fold : State -> Events.Event seq -> State = Seq.fold evolve let isOrigin = function Events.Snapshotted _ -> true | _ -> false diff --git a/equinox-shipping/Watchdog/Handler.fs b/equinox-shipping/Watchdog/Handler.fs index 55f5ec86e..108b7a2a4 100644 --- a/equinox-shipping/Watchdog/Handler.fs +++ b/equinox-shipping/Watchdog/Handler.fs @@ -10,6 +10,7 @@ type Stats(log, statsInterval, stateInterval) = inherit Propulsion.Streams.Projector.Stats(log, statsInterval, stateInterval) let mutable completed, deferred, failed, succeeded = 0, 0, 0, 0 + member val StatsInterval = statsInterval override __.HandleOk res = res |> function | Outcome.Completed -> completed <- completed + 1 @@ -25,6 +26,10 @@ type Stats(log, statsInterval, stateInterval) = open Shipping.Domain +let isRelevant = function + | FinalizationTransaction.Match _ -> true + | _ -> false + let handle (processingTimeout : TimeSpan) (driveTransaction : Shipping.Domain.TransactionId -> Async) diff --git a/equinox-shipping/Watchdog/Program.fs b/equinox-shipping/Watchdog/Program.fs index 3db09d4f7..d829f6e5d 100644 --- a/equinox-shipping/Watchdog/Program.fs +++ b/equinox-shipping/Watchdog/Program.fs @@ -66,17 +66,17 @@ module Args = | Some (Parameters.Cosmos cosmos) -> CosmosSourceArguments cosmos | _ -> raise (MissingArg "Must specify cosmos for Src") member x.SourceParams() = - let srcC = x.Source - let auxColl = - match srcC.LeaseContainer with - | None -> { database = srcC.Database; container = srcC.Container + "-aux" } - | Some sc -> { database = srcC.Database; container = sc } - Log.Information("Max read backlog: {maxReadAhead}", x.MaxReadAhead) - Log.Information("Processing Lease {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}", - x.ConsumerGroupName, auxColl.database, auxColl.container, srcC.MaxDocuments) - if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.") - srcC.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds)) - (srcC, (auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)) + let srcC = x.Source + let auxColl = + match srcC.LeaseContainer with + | None -> { database = srcC.Database; container = srcC.Container + "-aux" } + | Some sc -> { database = srcC.Database; container = sc } + Log.Information("Max read backlog: {maxReadAhead}", x.MaxReadAhead) + Log.Information("Processing Lease {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}", + x.ConsumerGroupName, auxColl.database, auxColl.container, srcC.MaxDocuments) + if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.") + srcC.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds)) + (srcC, (auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)) and [] CosmosSourceParameters = | [] FromTail | [] MaxDocuments of int @@ -198,6 +198,11 @@ module Logging = let [] AppName = "Watchdog" +let createSink log (processingTimeout, stats : Handler.Stats) (maxReadAhead, maxConcurrentStreams) driveTransaction + : Propulsion.ProjectorPipeline> = + let handle = Handler.handle processingTimeout driveTransaction + Propulsion.Streams.StreamsProjector.Start(log, maxReadAhead, maxConcurrentStreams, handle, stats, stats.StatsInterval) + open Shipping.Domain let createProcessManager maxDop (context, cache) = @@ -208,34 +213,30 @@ let createProcessManager maxDop (context, cache) = let build (args : Args.Arguments) = let (source, (aux, leaseId, startFromTail, maxDocuments, lagFrequency)) = args.SourceParams() - let monitoredDiscovery, monitored, monitoredConnector = source.BuildConnectionDetails() // Connect to the Target CosmosDB, wire to Process Manager - let cosmos = source.Cosmos - let (discovery, database, container, connector) = cosmos.BuildConnectionDetails() - let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously - let cache = Equinox.Cache(AppName, sizeMb=10) - let context = Equinox.Cosmos.Context(connection, database, container) - let processManager = createProcessManager args.ProcessManagerMaxDop (context, cache) + let processManager = + let (discovery, database, container, connector) = source.Cosmos.BuildConnectionDetails() + let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously + let cache = Equinox.Cache(AppName, sizeMb=10) + let context = Equinox.Cosmos.Context(connection, database, container) + createProcessManager args.ProcessManagerMaxDop (context, cache) - // Hook the Watchdog Handler to the Process Manager - let isRelevantToWatchdogHandler = function - | FinalizationTransaction.Match _ -> true - | _ -> false - let handle = Shipping.Watchdog.Handler.handle args.ProcessingTimeout processManager.Drive - let stats = Shipping.Watchdog.Handler.Stats(Log.Logger, statsInterval=args.StatsInterval, stateInterval=args.StateInterval) - let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats, args.StatsInterval) + let sink = + let stats = Handler.Stats(Serilog.Log.Logger, statsInterval=args.StatsInterval, stateInterval=args.StateInterval) + createSink Log.Logger (args.ProcessingTimeout, stats) (args.MaxReadAhead, args.MaxConcurrentStreams) processManager.Drive // Wire up the CFP to feed in the items let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq = docs |> Seq.collect EquinoxCosmosParser.enumStreamEvents - |> Seq.filter (fun e -> isRelevantToWatchdogHandler e.stream) + |> Seq.filter (fun e -> Handler.isRelevant e.stream) let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems) let runSource = + let monitoredDiscovery, monitored, monitoredConnector = source.BuildConnectionDetails() CosmosSource.Run(Log.Logger, monitoredConnector.CreateClient(AppName, monitoredDiscovery), monitored, aux, leaseId, startFromTail, createObserver, - ?maxDocuments = maxDocuments, ?lagReportFreq = lagFrequency) + ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency) sink, runSource let run args =