Skip to content

Commit

Permalink
FS3579 cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 10, 2025
1 parent 62d9324 commit 8702547
Show file tree
Hide file tree
Showing 13 changed files with 24 additions and 28 deletions.
3 changes: 1 addition & 2 deletions equinox-shipping/Watchdog/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ let handle
| TransactionWatchdog.Stuck ->
let! success = driveTransaction transId
return Outcome.Resolved success, Propulsion.Sinks.Events.next events
| other ->
return failwithf "Span from unexpected category %A" other }
| sn, _xs -> return failwith $"Span from unexpected category %s{FsCodec.StreamName.toString sn}" }

type Factory private () =

Expand Down
3 changes: 1 addition & 2 deletions feed-consumer/Ingester.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ let handle maxDop stream events = async {
let! added = Async.Parallel(maybeAdd, maxDegreeOfParallelism = maxDop)
let outcome = { added = Seq.length added; notReady = results.Length - ready.Length; dups = results.Length - ticketIds.Length }
return outcome, Propulsion.Sinks.Events.index events + ticketIds.LongLength
| x -> return failwithf "Unexpected stream %O" x
}
| sn, _ -> return failwith $"Unexpected stream %s{FsCodec.StreamName.toString sn}" }

type Factory private () =

Expand Down
3 changes: 1 addition & 2 deletions periodic-ingester/Ingester.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ let handle stream events = async {
| PipelineEvent.TicketEvents (ticketId, items) ->
// TODO : Ingest the data
return IngestionOutcome.Unchanged, Propulsion.Sinks.Events.next events
| x -> return failwithf "Unexpected stream %O" x
}
| sn, _ -> return failwith $"Unexpected stream %s{FsCodec.StreamName.toString sn}" }

type Factory private () =

Expand Down
2 changes: 1 addition & 1 deletion propulsion-consumer/Consumer.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PackageReference Include="Argu" Version="6.1.4" />
<!-- TODO remove when moving off 3.1.0-rc.4-->
<PackageReference Include="FsCodec" Version="3.1.0-rc.4" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.1.0-rc.4" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.1.0-rc.5" />
<PackageReference Include="Propulsion.Kafka" Version="3.0.0-rc.14.10" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
</ItemGroup>
Expand Down
3 changes: 1 addition & 2 deletions propulsion-hotel/Reactor/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ let private handle (processor: GroupCheckoutProcess.Service) stream _events = as
// the change feed. In those cases, Propulsion will drop any incoming events that would represent duplication of processing,
// (and not even invoke the Handler unless one or more of the feed events are beyond the write position)
return outcome, ver'
| other ->
return failwithf $"Span from unexpected category %s{FsCodec.StreamName.toString other}" }
| sn -> return failwith $"Span from unexpected category %s{FsCodec.StreamName.toString sn}" }

let private createService store =
let stays = GuestStay.Factory.create store
Expand Down
16 changes: 8 additions & 8 deletions propulsion-indexer/App/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ type [<NoEquality; NoComparison>] CosmosParameters =
member p.Usage = p |> function
| Verbose -> "request Verbose Logging from Store. Default: off"
| ConnectionMode _ -> "override the connection mode. Default: Direct."
| Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable ${CONNECTION} specified)"
| Database _ -> $"specify a database name for store. (optional if environment variable ${DATABASE} specified)"
| Container _ -> $"specify a container name for store. (optional if environment variable ${CONTAINER} specified)"
| Views _ -> $"specify a views Container name for Cosmos views. (optional if environment variable ${VIEWS} specified)"
| Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable $%s{CONNECTION} specified)"
| Database _ -> $"specify a database name for store. (optional if environment variable $%s{DATABASE} specified)"
| Container _ -> $"specify a container name for store. (optional if environment variable $%s{CONTAINER} specified)"
| Views _ -> $"specify a views Container name for Cosmos views. (optional if environment variable $%s{VIEWS} specified)"
| Timeout _ -> "specify operation timeout in seconds. Default: 5."
| Retries _ -> "specify operation retries. Default: 1."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
Expand Down Expand Up @@ -73,10 +73,10 @@ type [<NoEquality; NoComparison>] CosmosSourceParameters =
member p.Usage = p |> function
| Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off"
| ConnectionMode _ -> "override the connection mode. Default: Direct."
| Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {CONNECTION} specified)"
| Database _ -> $"specify a database name for store. (optional if environment variable {DATABASE} specified)"
| Container _ -> $"specify a container name for store. (optional if environment variable {CONTAINER} specified)"
| Views _ -> $"specify a container name for views container. (optional if environment variable {VIEWS} specified)"
| Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable %s{CONNECTION} specified)"
| Database _ -> $"specify a database name for store. (optional if environment variable %s{DATABASE} specified)"
| Container _ -> $"specify a container name for store. (optional if environment variable %s{CONTAINER} specified)"
| Views _ -> $"specify a container name for views container. (optional if environment variable %s{VIEWS} specified)"
| Timeout _ -> "specify operation timeout in seconds. Default: 5."
| Retries _ -> "specify operation retries. Default: 1."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
Expand Down
2 changes: 1 addition & 1 deletion propulsion-indexer/App/CosmosDumpSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type [<Sealed; AbstractClass>] CosmosDumpSource private () =
try let items = if isEof then Array.empty
else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray
struct (TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Batch<_>))
with e -> raise <| exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) }
with e -> raise <| exn($"File Parse error on L%d{lineNo}: '%s{line.Substring(0, 200)}'", e) }
let source =
let checkpointStore = Equinox.MemoryStore.VolatileStore()
let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.FromMinutes 1) checkpointStore
Expand Down
2 changes: 1 addition & 1 deletion propulsion-indexer/App/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Equinox.CosmosStore.CosmosStoreConnector with
let contexts =
client.CreateContext(role, databaseId, containerId, tipMaxEvents = 256, queryMaxItems = 500),
// In general, the views container won't write events. We also know we generally won't attach a CFP, so we keep events in tip
client.CreateContext($"{role}(Views)", databaseId, viewsContainerId, tipMaxEvents = 128),
client.CreateContext($"%s{role}(Views)", databaseId, viewsContainerId, tipMaxEvents = 128),
// NOTE the tip limits for this connection are set to be effectively infinite in order to ensure that writes never trigger calving from the tip
client.CreateContext("snapshotUpdater", databaseId, containerId, tipMaxEvents = 1024, tipMaxJsonLength = 1024 * 1024,
skipLog = not (logSnapshotConfig = Some true))
Expand Down
2 changes: 1 addition & 1 deletion propulsion-indexer/Domain/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ module Ingester =
for x in inputs do
if x.Index >= c.Version then // NOTE source and target need to have 1:1 matching event indexes, or things would be much more complex
match inputCodec.Decode x with
| ValueNone -> failwith $"Unknown EventType {x.EventType} at index {x.Index}"
| ValueNone -> failwith $"Unknown EventType %s{x.EventType} at index %d{x.Index}"
| ValueSome d -> struct (x, d) |] // So we require all source events to exactly one event in the target

type Service<'id, 'e, 's, 'f> internal (codec: IEventCodec<'e, 'f, unit>, resolve: 'id -> Equinox.Decider<Event<'e, 'f>, State>) =
Expand Down
10 changes: 5 additions & 5 deletions propulsion-indexer/Indexer/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ module Args =
let incIndexes = p.Contains IncIdx
let allowEts, denyEts = p.GetResults IncEvent, p.GetResults ExcEvent
let isPlain = Seq.forall (fun x -> Char.IsLetterOrDigit x || x = '_')
let asRe = Seq.map (fun x -> if isPlain x then $"^{x}$" else x)
let asRe = Seq.map (fun x -> if isPlain x then $"^%s{x}$" else x)
let (|Filter|) exprs =
let values, pats = List.partition isPlain exprs
let valuesContains = let set = System.Collections.Generic.HashSet(values) in set.Contains
Expand Down Expand Up @@ -155,7 +155,7 @@ module Args =
| Choice2Of2 f, Index _ ->
let! contexts = f.Connect()
return Choice2Of3 (f.Filepath, (f.Skip, f.Trunc), store contexts)
| Choice2Of2 _, (Summarize _ | Snapshot _ as x) -> return x |> failwithf "unexpected %A"
| Choice2Of2 _, (Summarize _ | Snapshot _ as x) -> return failwith $"unexpected %A{x}"
| Choice1Of2 c, action ->
let lsc = match action with Snapshot _ -> true | _ -> false
let! contexts, monitored, leases = c.ConnectWithFeed(lsc = lsc)
Expand Down Expand Up @@ -287,12 +287,12 @@ let build (args: Args.Arguments) = async {
| Args.Action.SummarizeFile _ -> summarize ()
| Args.Action.Sync a -> sync a
| Args.Action.Export a -> export a
| x -> x |> failwithf "unexpected %A"
| x -> failwith $"unexpected %A{x}"
| Choice2Of3 (filePath, skipTrunc, store) -> // Index from file to store (no change feed involved)
return mkFileSource filePath skipTrunc <||
match args.Action with
| Args.Action.Index _ -> index store
| x -> x |> failwithf "unexpected %A"
| x -> failwith $"unexpected %A{x}"
| Choice3Of3 (monitored, leases, (startFromTail, maxItems, tailSleepInterval, _lagFrequency), store) -> // normal case - consume from change feed, write to store
let parseFeedDoc, sink =
match args.Action with
Expand All @@ -301,7 +301,7 @@ let build (args: Args.Arguments) = async {
| Args.Action.Snapshot _ -> snapshot store
| Args.Action.Sync a -> sync a
| Args.Action.Export a -> export a
| Args.Action.SummarizeFile _ as x -> x |> failwithf "unexpected %A"
| Args.Action.SummarizeFile _ as x -> failwith $"unexpected %A{x}"
let source =
Propulsion.CosmosStore.CosmosStoreSource(
Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink,
Expand Down
2 changes: 1 addition & 1 deletion propulsion-indexer/Indexer/Snapshotter.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let handle todo
let! res, pos' =
match stream with
| Todo.Reactions.For id -> todo id
| sn -> failwith $"Unexpected category %A{sn}"
| sn -> failwith $"Unexpected category %s{FsCodec.StreamName.toString sn}"
// a) if the tryUpdate saw a version beyond what (Propulsion.Sinks.Events.nextIndex events) would suggest, then we pass that information out
// in order to have the scheduler drop all events until we meet an event that signifies we may need to re-update
// b) the fact that we use the same Microsoft.Azure.Cosmos.CosmosClient for the Change Feed and the Equinox-based Services means we are guaranteed
Expand Down
2 changes: 1 addition & 1 deletion propulsion-reactor/Reactor.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<PackageReference Include="Propulsion.SqlStreamStore" Version="3.0.0-rc.14.10" />
<!-- TODO remove when moving off 3.1.0-rc.4-->
<PackageReference Include="FsCodec" Version="3.1.0-rc.4" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.1.0-rc.4" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.1.0-rc.5" />
<!--#if (kafka || sourceKafka) -->
<PackageReference Include="Propulsion.Kafka" Version="3.0.0-rc.14.10" />
<!--#endif-->
Expand Down
2 changes: 1 addition & 1 deletion propulsion-summary-consumer/SummaryConsumer.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-rc.1" />
<!-- TODO remove when moving off 3.1.0-rc.4-->
<PackageReference Include="FsCodec" Version="3.1.0-rc.4" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.1.0-rc.4" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.1.0-rc.5" />
<PackageReference Include="Propulsion.Kafka" Version="3.0.0-rc.14.10" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
</ItemGroup>
Expand Down

0 comments on commit 8702547

Please sign in to comment.