Skip to content

Commit

Permalink
Cleanup eqxShipping (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jun 3, 2020
1 parent 765bf47 commit 81d7c87
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- Cleanup `eqxShipping` [#71](https://github.com/jet/dotnet-templates/pull/71)

### Removed
### Fixed

Expand Down
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project ToolsVersion="15.0">
<Project>
<PropertyGroup>
<Authors>@jet @bartelink contributors</Authors>
<Company>Jet.com</Company>
Expand Down
2 changes: 1 addition & 1 deletion Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project ToolsVersion="15.0">
<Project>
<Target Name="ComputePackageVersion" AfterTargets="MinVer" Condition=" '$(BUILD_PR)' != '' AND '$(BUILD_PR)' != '%24(SYSTEM.PULLREQUEST.PULLREQUESTNUMBER)' ">
<PropertyGroup>
<PackageVersion>$(MinVerMajor).$(MinVerMinor).$(MinVerPatch)-pr.$(BUILD_PR)</PackageVersion>
Expand Down
3 changes: 2 additions & 1 deletion dotnet-templates.sln
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ EndProjectSection
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Templates", "src\Equinox.Templates\Equinox.Templates.fsproj", "{8C92B728-85A5-4231-863A-E4236E46CC36}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "equinox-shipping", "equinox-shipping", "{DAE9E2B9-EDA2-4064-B0CE-FD5294549374}"
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "eqxShipping", "eqxShipping", "{DAE9E2B9-EDA2-4064-B0CE-FD5294549374}"
ProjectSection(SolutionItems) = preProject
equinox-shipping\.template.config\template.json = equinox-shipping\.template.config\template.json
equinox-shipping\README.md = equinox-shipping\README.md
equinox-shipping\Shipping.sln = equinox-shipping\Shipping.sln
EndProjectSection
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "equinox-shipping\Domain\Domain.fsproj", "{7B96FCF8-0BB5-4494-A143-628882A6E50A}"
Expand Down
1 change: 1 addition & 0 deletions equinox-shipping/Domain/FinalizationProcessManager.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Service

// Caller should generate the TransactionId via a deterministic hash of the shipmentIds in order to ensure idempotency (and sharing of fate) of identical requests
member __.TryFinalizeContainer(transactionId, containerId, shipmentIds) : Async<bool> =
if Array.isEmpty shipmentIds then invalidArg "shipmentIds" "must not be empty"
let initialRequest = Events.FinalizationRequested {| container = containerId; shipments = shipmentIds |}
execute transactionId (Some initialRequest)

Expand Down
8 changes: 4 additions & 4 deletions equinox-shipping/Domain/FinalizationTransaction.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module Events =
| AssignmentCompleted
/// Signifies all processing for the transaction has completed - the Watchdog looks for this event
| Completed
| Snapshotted of State
| Snapshotted of {| state : State |}
interface TypeShape.UnionContract.IUnionContract

and [<Newtonsoft.Json.JsonConverter(typeof<FsCodec.NewtonsoftJson.UnionConverter>)>]
Expand All @@ -32,7 +32,7 @@ module Events =

/// Used by the Watchdog to infer whether a given event signifies that the processing has reached a terminal state
let isTerminalEvent (encoded : FsCodec.ITimelineEvent<_>) =
encoded.EventType = "Completed" // TODO nameof("Completed")
encoded.EventType = "Completed" // TODO nameof(Completed)

module Fold =

Expand All @@ -57,13 +57,13 @@ module Fold =
| State.Reverting _state, Events.Completed -> State.Completed {| success = false |}
| State.Assigning state, Events.AssignmentCompleted -> State.Assigned {| container = state.container; shipments = state.shipments |}
| State.Assigned _, Events.Completed -> State.Completed {| success = true |}
| _, Events.Snapshotted state -> state
| _, Events.Snapshotted state -> state.state
// this shouldn't happen, but, if we did produce invalid events, we'll just ignore them
| state, _ -> state
let fold : State -> Events.Event seq -> State = Seq.fold evolve

let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let toSnapshot state = Events.Snapshotted state
let toSnapshot state = Events.Snapshotted {| state = state |}

[<RequireQualifiedAccess>]
type Action =
Expand Down
4 changes: 2 additions & 2 deletions equinox-shipping/Domain/Shipment.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ module Fold =
let evolve (state: State) = function
| Events.Reserved event -> { reservation = Some event.transaction; association = None }
| Events.Revoked -> initial
| Events.Assigned event -> { state with association = Some event.container }
| Events.Snapshotted event -> { reservation = event.reservation; association = event.association }
| Events.Assigned event -> { state with association = Some event.container }
| Events.Snapshotted event -> { reservation = event.reservation; association = event.association }
let fold : State -> Events.Event seq -> State = Seq.fold evolve

let isOrigin = function Events.Snapshotted _ -> true | _ -> false
Expand Down
5 changes: 5 additions & 0 deletions equinox-shipping/Watchdog/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Stats(log, statsInterval, stateInterval) =
inherit Propulsion.Streams.Projector.Stats<Outcome>(log, statsInterval, stateInterval)

let mutable completed, deferred, failed, succeeded = 0, 0, 0, 0
member val StatsInterval = statsInterval

override __.HandleOk res = res |> function
| Outcome.Completed -> completed <- completed + 1
Expand All @@ -25,6 +26,10 @@ type Stats(log, statsInterval, stateInterval) =

open Shipping.Domain

let isRelevant = function
| FinalizationTransaction.Match _ -> true
| _ -> false

let handle
(processingTimeout : TimeSpan)
(driveTransaction : Shipping.Domain.TransactionId -> Async<bool>)
Expand Down
55 changes: 28 additions & 27 deletions equinox-shipping/Watchdog/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ module Args =
| Some (Parameters.Cosmos cosmos) -> CosmosSourceArguments cosmos
| _ -> raise (MissingArg "Must specify cosmos for Src")
member x.SourceParams() =
let srcC = x.Source
let auxColl =
match srcC.LeaseContainer with
| None -> { database = srcC.Database; container = srcC.Container + "-aux" }
| Some sc -> { database = srcC.Database; container = sc }
Log.Information("Max read backlog: {maxReadAhead}", x.MaxReadAhead)
Log.Information("Processing Lease {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, auxColl.database, auxColl.container, srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds))
(srcC, (auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency))
let srcC = x.Source
let auxColl =
match srcC.LeaseContainer with
| None -> { database = srcC.Database; container = srcC.Container + "-aux" }
| Some sc -> { database = srcC.Database; container = sc }
Log.Information("Max read backlog: {maxReadAhead}", x.MaxReadAhead)
Log.Information("Processing Lease {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, auxColl.database, auxColl.container, srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds))
(srcC, (auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency))
and [<NoEquality; NoComparison>] CosmosSourceParameters =
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-md"; Unique>] MaxDocuments of int
Expand Down Expand Up @@ -198,6 +198,11 @@ module Logging =

let [<Literal>] AppName = "Watchdog"

let createSink log (processingTimeout, stats : Handler.Stats) (maxReadAhead, maxConcurrentStreams) driveTransaction
: Propulsion.ProjectorPipeline<Propulsion.Ingestion.Ingester<_, _>> =
let handle = Handler.handle processingTimeout driveTransaction
Propulsion.Streams.StreamsProjector.Start(log, maxReadAhead, maxConcurrentStreams, handle, stats, stats.StatsInterval)

open Shipping.Domain

let createProcessManager maxDop (context, cache) =
Expand All @@ -208,34 +213,30 @@ let createProcessManager maxDop (context, cache) =

let build (args : Args.Arguments) =
let (source, (aux, leaseId, startFromTail, maxDocuments, lagFrequency)) = args.SourceParams()
let monitoredDiscovery, monitored, monitoredConnector = source.BuildConnectionDetails()

// Connect to the Target CosmosDB, wire to Process Manager
let cosmos = source.Cosmos
let (discovery, database, container, connector) = cosmos.BuildConnectionDetails()
let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously
let cache = Equinox.Cache(AppName, sizeMb=10)
let context = Equinox.Cosmos.Context(connection, database, container)
let processManager = createProcessManager args.ProcessManagerMaxDop (context, cache)
let processManager =
let (discovery, database, container, connector) = source.Cosmos.BuildConnectionDetails()
let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously
let cache = Equinox.Cache(AppName, sizeMb=10)
let context = Equinox.Cosmos.Context(connection, database, container)
createProcessManager args.ProcessManagerMaxDop (context, cache)

// Hook the Watchdog Handler to the Process Manager
let isRelevantToWatchdogHandler = function
| FinalizationTransaction.Match _ -> true
| _ -> false
let handle = Shipping.Watchdog.Handler.handle args.ProcessingTimeout processManager.Drive
let stats = Shipping.Watchdog.Handler.Stats(Log.Logger, statsInterval=args.StatsInterval, stateInterval=args.StateInterval)
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats, args.StatsInterval)
let sink =
let stats = Handler.Stats(Serilog.Log.Logger, statsInterval=args.StatsInterval, stateInterval=args.StateInterval)
createSink Log.Logger (args.ProcessingTimeout, stats) (args.MaxReadAhead, args.MaxConcurrentStreams) processManager.Drive

// Wire up the CFP to feed in the items
let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq =
docs
|> Seq.collect EquinoxCosmosParser.enumStreamEvents
|> Seq.filter (fun e -> isRelevantToWatchdogHandler e.stream)
|> Seq.filter (fun e -> Handler.isRelevant e.stream)
let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems)
let runSource =
let monitoredDiscovery, monitored, monitoredConnector = source.BuildConnectionDetails()
CosmosSource.Run(Log.Logger, monitoredConnector.CreateClient(AppName, monitoredDiscovery), monitored, aux,
leaseId, startFromTail, createObserver,
?maxDocuments = maxDocuments, ?lagReportFreq = lagFrequency)
?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
sink, runSource

let run args =
Expand Down

0 comments on commit 81d7c87

Please sign in to comment.