From 87025471a9c9d7ddee1a56f44c182d4665334cf0 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Mon, 10 Feb 2025 12:03:21 +0000
Subject: [PATCH] FS3579 cleanup

---
 equinox-shipping/Watchdog/Handler.fs       |  3 +--
 feed-consumer/Ingester.fs                  |  3 +--
 periodic-ingester/Ingester.fs              |  3 +--
 propulsion-consumer/Consumer.fsproj        |  2 +-
 propulsion-hotel/Reactor/Handler.fs        |  3 +--
 propulsion-indexer/App/Args.fs             | 16 ++++++++--------
 propulsion-indexer/App/CosmosDumpSource.fs |  2 +-
 propulsion-indexer/App/Infrastructure.fs   |  2 +-
 propulsion-indexer/Domain/Store.fs         |  2 +-
 propulsion-indexer/Indexer/Program.fs      | 10 +++++-----
 propulsion-indexer/Indexer/Snapshotter.fs  |  2 +-
 propulsion-reactor/Reactor.fsproj          |  2 +-
 .../SummaryConsumer.fsproj                 |  2 +-
 13 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/equinox-shipping/Watchdog/Handler.fs b/equinox-shipping/Watchdog/Handler.fs
index 50443a62c..b9108680c 100644
--- a/equinox-shipping/Watchdog/Handler.fs
+++ b/equinox-shipping/Watchdog/Handler.fs
@@ -50,8 +50,7 @@ let handle
     | TransactionWatchdog.Stuck ->
         let! success = driveTransaction transId
         return Outcome.Resolved success, Propulsion.Sinks.Events.next events
-    | other ->
-        return failwithf "Span from unexpected category %A" other }
+    | sn, _xs -> return failwith $"Span from unexpected category %s{FsCodec.StreamName.toString sn}" }
 
 type Factory private () =
diff --git a/feed-consumer/Ingester.fs b/feed-consumer/Ingester.fs
index 96af5001b..034efe38c 100644
--- a/feed-consumer/Ingester.fs
+++ b/feed-consumer/Ingester.fs
@@ -57,8 +57,7 @@ let handle maxDop stream events = async {
         let! added = Async.Parallel(maybeAdd, maxDegreeOfParallelism = maxDop)
         let outcome = { added = Seq.length added; notReady = results.Length - ready.Length; dups = results.Length - ticketIds.Length }
         return outcome, Propulsion.Sinks.Events.index events + ticketIds.LongLength
-    | x -> return failwithf "Unexpected stream %O" x
-}
+    | sn, _ -> return failwith $"Unexpected stream %s{FsCodec.StreamName.toString sn}" }
 
 type Factory private () =
diff --git a/periodic-ingester/Ingester.fs b/periodic-ingester/Ingester.fs
index cd7752690..41b436241 100644
--- a/periodic-ingester/Ingester.fs
+++ b/periodic-ingester/Ingester.fs
@@ -50,8 +50,7 @@ let handle stream events = async {
     | PipelineEvent.TicketEvents (ticketId, items) ->
         // TODO : Ingest the data
         return IngestionOutcome.Unchanged, Propulsion.Sinks.Events.next events
-    | x -> return failwithf "Unexpected stream %O" x
-}
+    | sn, _ -> return failwith $"Unexpected stream %s{FsCodec.StreamName.toString sn}" }
 
 type Factory private () =
diff --git a/propulsion-consumer/Consumer.fsproj b/propulsion-consumer/Consumer.fsproj
index 50ee9d782..446f1c975 100644
--- a/propulsion-consumer/Consumer.fsproj
+++ b/propulsion-consumer/Consumer.fsproj
@@ -17,7 +17,7 @@
-
+
diff --git a/propulsion-hotel/Reactor/Handler.fs b/propulsion-hotel/Reactor/Handler.fs
index e7aee0737..0495732c5 100644
--- a/propulsion-hotel/Reactor/Handler.fs
+++ b/propulsion-hotel/Reactor/Handler.fs
@@ -51,8 +51,7 @@ let private handle (processor: GroupCheckoutProcess.Service) stream _events = async {
         // the change feed. In those cases, Propulsion will drop any incoming events that would represent duplication of processing,
         // (and not even invoke the Handler unless one or more of the feed events are beyond the write position)
         return outcome, ver'
-    | other ->
-        return failwithf $"Span from unexpected category %s{FsCodec.StreamName.toString other}" }
+    | sn -> return failwith $"Span from unexpected category %s{FsCodec.StreamName.toString sn}" }
 
 let private createService store =
     let stays = GuestStay.Factory.create store
diff --git a/propulsion-indexer/App/Args.fs b/propulsion-indexer/App/Args.fs
index 3f1c8f174..bd27591bd 100644
--- a/propulsion-indexer/App/Args.fs
+++ b/propulsion-indexer/App/Args.fs
@@ -31,10 +31,10 @@ type [] CosmosParameters =
     member p.Usage = p |> function
         | Verbose -> "request Verbose Logging from Store. Default: off"
         | ConnectionMode _ -> "override the connection mode. Default: Direct."
-        | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable ${CONNECTION} specified)"
-        | Database _ -> $"specify a database name for store. (optional if environment variable ${DATABASE} specified)"
-        | Container _ -> $"specify a container name for store. (optional if environment variable ${CONTAINER} specified)"
-        | Views _ -> $"specify a views Container name for Cosmos views. (optional if environment variable ${VIEWS} specified)"
+        | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable $%s{CONNECTION} specified)"
+        | Database _ -> $"specify a database name for store. (optional if environment variable $%s{DATABASE} specified)"
+        | Container _ -> $"specify a container name for store. (optional if environment variable $%s{CONTAINER} specified)"
+        | Views _ -> $"specify a views Container name for Cosmos views. (optional if environment variable $%s{VIEWS} specified)"
         | Timeout _ -> "specify operation timeout in seconds. Default: 5."
         | Retries _ -> "specify operation retries. Default: 1."
         | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
@@ -73,10 +73,10 @@ type [] CosmosSourceParameters =
     member p.Usage = p |> function
         | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off"
        | ConnectionMode _ -> "override the connection mode. Default: Direct."
-        | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {CONNECTION} specified)"
-        | Database _ -> $"specify a database name for store. (optional if environment variable {DATABASE} specified)"
-        | Container _ -> $"specify a container name for store. (optional if environment variable {CONTAINER} specified)"
-        | Views _ -> $"specify a container name for views container. (optional if environment variable {VIEWS} specified)"
+        | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable %s{CONNECTION} specified)"
+        | Database _ -> $"specify a database name for store. (optional if environment variable %s{DATABASE} specified)"
+        | Container _ -> $"specify a container name for store. (optional if environment variable %s{CONTAINER} specified)"
+        | Views _ -> $"specify a container name for views container. (optional if environment variable %s{VIEWS} specified)"
         | Timeout _ -> "specify operation timeout in seconds. Default: 5."
         | Retries _ -> "specify operation retries. Default: 1."
         | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
diff --git a/propulsion-indexer/App/CosmosDumpSource.fs b/propulsion-indexer/App/CosmosDumpSource.fs
index 7d9cac362..9cebb17cf 100644
--- a/propulsion-indexer/App/CosmosDumpSource.fs
+++ b/propulsion-indexer/App/CosmosDumpSource.fs
@@ -23,7 +23,7 @@ type [] CosmosDumpSource private () =
             try let items = if isEof then Array.empty else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray
                 struct (TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Batch<_>))
-            with e -> raise <| exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) }
+            with e -> raise <| exn($"File Parse error on L%d{lineNo}: '%s{line.Substring(0, 200)}'", e) }
         let source =
             let checkpointStore = Equinox.MemoryStore.VolatileStore()
             let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.FromMinutes 1) checkpointStore
diff --git a/propulsion-indexer/App/Infrastructure.fs b/propulsion-indexer/App/Infrastructure.fs
index 7037b1b56..fd996f390 100644
--- a/propulsion-indexer/App/Infrastructure.fs
+++ b/propulsion-indexer/App/Infrastructure.fs
@@ -111,7 +111,7 @@ type Equinox.CosmosStore.CosmosStoreConnector with
         let contexts =
             client.CreateContext(role, databaseId, containerId, tipMaxEvents = 256, queryMaxItems = 500),
             // In general, the views container won't write events. We also know we generally won't attach a CFP, so we keep events in tip
-            client.CreateContext($"{role}(Views)", databaseId, viewsContainerId, tipMaxEvents = 128),
+            client.CreateContext($"%s{role}(Views)", databaseId, viewsContainerId, tipMaxEvents = 128),
             // NOTE the tip limits for this connection are set to be effectively infinite in order to ensure that writes never trigger calving from the tip
             client.CreateContext("snapshotUpdater", databaseId, containerId, tipMaxEvents = 1024, tipMaxJsonLength = 1024 * 1024, skipLog = not (logSnapshotConfig = Some true))
diff --git a/propulsion-indexer/Domain/Store.fs b/propulsion-indexer/Domain/Store.fs
index 820c69f14..334db26e7 100644
--- a/propulsion-indexer/Domain/Store.fs
+++ b/propulsion-indexer/Domain/Store.fs
@@ -72,7 +72,7 @@ module Ingester =
         for x in inputs do
             if x.Index >= c.Version then // NOTE source and target need to have 1:1 matching event indexes, or things would be much more complex
                 match inputCodec.Decode x with
-                | ValueNone -> failwith $"Unknown EventType {x.EventType} at index {x.Index}"
+                | ValueNone -> failwith $"Unknown EventType %s{x.EventType} at index %d{x.Index}"
                 | ValueSome d -> struct (x, d) |] // So we require all source events to exactly one event in the target
 
 type Service<'id, 'e, 's, 'f> internal (codec: IEventCodec<'e, 'f, unit>, resolve: 'id -> Equinox.Decider, State>) =
diff --git a/propulsion-indexer/Indexer/Program.fs b/propulsion-indexer/Indexer/Program.fs
index 99d1b2a0d..ba07b653b 100644
--- a/propulsion-indexer/Indexer/Program.fs
+++ b/propulsion-indexer/Indexer/Program.fs
@@ -65,7 +65,7 @@ module Args =
         let incIndexes = p.Contains IncIdx
         let allowEts, denyEts = p.GetResults IncEvent, p.GetResults ExcEvent
         let isPlain = Seq.forall (fun x -> Char.IsLetterOrDigit x || x = '_')
-        let asRe = Seq.map (fun x -> if isPlain x then $"^{x}$" else x)
+        let asRe = Seq.map (fun x -> if isPlain x then $"^%s{x}$" else x)
         let (|Filter|) exprs =
             let values, pats = List.partition isPlain exprs
             let valuesContains = let set = System.Collections.Generic.HashSet(values) in set.Contains
@@ -155,7 +155,7 @@ module Args =
             | Choice2Of2 f, Index _ ->
                 let! contexts = f.Connect()
                 return Choice2Of3 (f.Filepath, (f.Skip, f.Trunc), store contexts)
-            | Choice2Of2 _, (Summarize _ | Snapshot _ as x) -> return x |> failwithf "unexpected %A"
+            | Choice2Of2 _, (Summarize _ | Snapshot _ as x) -> return failwith $"unexpected %A{x}"
             | Choice1Of2 c, action ->
                 let lsc = match action with Snapshot _ -> true | _ -> false
                 let! contexts, monitored, leases = c.ConnectWithFeed(lsc = lsc)
@@ -287,12 +287,12 @@ let build (args: Args.Arguments) = async {
            | Args.Action.SummarizeFile _ -> summarize ()
            | Args.Action.Sync a -> sync a
            | Args.Action.Export a -> export a
-           | x -> x |> failwithf "unexpected %A"
+           | x -> failwith $"unexpected %A{x}"
        | Choice2Of3 (filePath, skipTrunc, store) -> // Index from file to store (no change feed involved)
            return mkFileSource filePath skipTrunc <|| match args.Action with
                | Args.Action.Index _ -> index store
-               | x -> x |> failwithf "unexpected %A"
+               | x -> failwith $"unexpected %A{x}"
        | Choice3Of3 (monitored, leases, (startFromTail, maxItems, tailSleepInterval, _lagFrequency), store) -> // normal case - consume from change feed, write to store
            let parseFeedDoc, sink =
                match args.Action with
@@ -301,7 +301,7 @@ let build (args: Args.Arguments) = async {
                | Args.Action.Snapshot _ -> snapshot store
                | Args.Action.Sync a -> sync a
                | Args.Action.Export a -> export a
-               | Args.Action.SummarizeFile _ as x -> x |> failwithf "unexpected %A"
+               | Args.Action.SummarizeFile _ as x -> failwith $"unexpected %A{x}"
            let source =
                Propulsion.CosmosStore.CosmosStoreSource(
                    Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink,
diff --git a/propulsion-indexer/Indexer/Snapshotter.fs b/propulsion-indexer/Indexer/Snapshotter.fs
index 98c551b53..e5b51fd98 100644
--- a/propulsion-indexer/Indexer/Snapshotter.fs
+++ b/propulsion-indexer/Indexer/Snapshotter.fs
@@ -38,7 +38,7 @@ let handle todo
     let! res, pos' =
         match stream with
         | Todo.Reactions.For id -> todo id
-        | sn -> failwith $"Unexpected category %A{sn}"
+        | sn -> failwith $"Unexpected category %s{FsCodec.StreamName.toString sn}"
     // a) if the tryUpdate saw a version beyond what (Propulsion.Sinks.Events.nextIndex events) would suggest, then we pass that information out
     //    in order to have the scheduler drop all events until we meet an event that signifies we may need to re-update
     // b) the fact that we use the same Microsoft.Azure.Cosmos.CosmosClient for the Change Feed and the Equinox-based Services means we are guaranteed
diff --git a/propulsion-reactor/Reactor.fsproj b/propulsion-reactor/Reactor.fsproj
index fab4a4294..f2e0726b7 100644
--- a/propulsion-reactor/Reactor.fsproj
+++ b/propulsion-reactor/Reactor.fsproj
@@ -42,7 +42,7 @@
-
+
diff --git a/propulsion-summary-consumer/SummaryConsumer.fsproj b/propulsion-summary-consumer/SummaryConsumer.fsproj
index 6e29dc831..79fc072d1 100644
--- a/propulsion-summary-consumer/SummaryConsumer.fsproj
+++ b/propulsion-summary-consumer/SummaryConsumer.fsproj
@@ -20,7 +20,7 @@
-
+
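
The recurring edit in the patch above replaces failwithf with a separate "%A"/"%O" format string (or an interpolated string with untyped {x} holes) by failwith over an interpolated string with typed holes (%s{...}, %d{...}, or %A{...} where structured output is still wanted), rendering stream names explicitly via FsCodec.StreamName.toString. A minimal F# sketch of why the typed holes are preferred follows; the eventType/index values are hypothetical, chosen only to mirror the shape of the messages in the patch, not code from the templates:

    // Sketch only: hypothetical values, not taken from the patch
    let eventType, index = "Snapshotted", 42L

    // Untyped holes accept any value; a wrong argument only shows up as odd text at runtime
    let loose = $"Unknown EventType {eventType} at index {index}"

    // Typed holes are checked at compile time: %s{...} requires a string and %d{...} an integer,
    // which is why StreamName values are passed through FsCodec.StreamName.toString before %s{...}
    let strict = $"Unknown EventType %s{eventType} at index %d{index}"

Both strings render identically here; the difference is that a mismatched argument in the typed form is a compile error rather than a misleading failure message.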