diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ca7086c..763402d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,10 +52,10 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226) - `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match`Propulsion.SqlStreamStore` API rather than`Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139) -- `Propulsion.Feed`: Moved implementations into main `Propulsion` library. While this adds a `FSharp.Control.TaskSeq` dependency, it makes maintenance and navigation easier +- `Propulsion.Feed`: Moved implementations into main `Propulsion` library. While this adds a `FSharp.Control.TaskSeq` dependency, it makes maintenance and navigation easier [#265](https://github.com/jet/propulsion/pull/265) - `Propulsion.Feed`,`Kafka`: Replaced `Async` with `task` for supervision [#158](https://github.com/jet/propulsion/pull/158), [#159](https://github.com/jet/propulsion/pull/159) - `Propulsion.Kafka`: Target `FsCodec.NewtonsoftJson` v `3.0.0` [#139](https://github.com/jet/propulsion/pull/139) -- `Propulsion.Prometheus`: Extracted `Propulsion.Prometheus` and `Propulsion.Feed.Prometheus` in order to remove `Prometheus` dependency from core package +- `Propulsion.Prometheus`: Extracted `Propulsion.Prometheus` and `Propulsion.Feed.Prometheus` in order to remove `Prometheus` dependency from core package [#265](https://github.com/jet/propulsion/pull/265) - `Propulsion.Tool`: `project` renamed to `sync`; sources now have a `from` prefix [#252](https://github.com/jet/propulsion/pull/252) ### Removed diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index 0ab5f3e7..f71e24f3 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -32,7 +32,7 @@ module private Impl = | Exceptions.ProvisionedThroughputExceeded when not force -> () | e -> storeLog.Warning(e, "DynamoStoreSource commit failure") - let mkBatch position isTail items: Propulsion.Feed.Core.Batch = + let mkBatch position isTail items: Propulsion.Feed.Batch = { items = items; checkpoint = position; isTail = isTail } let sliceBatch epochId offset items = mkBatch (Checkpoint.positionOfEpochAndOffset epochId offset) false items diff --git a/src/Propulsion.EventStoreDb/EventStoreSource.fs b/src/Propulsion.EventStoreDb/EventStoreSource.fs index ae4918fb..54af4191 100644 --- a/src/Propulsion.EventStoreDb/EventStoreSource.fs +++ b/src/Propulsion.EventStoreDb/EventStoreSource.fs @@ -17,7 +17,7 @@ module private Impl = let pos = let p = pos |> Propulsion.Feed.Position.toInt64 |> uint64 in Position(p, p) let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, withData, cancellationToken = ct) let! batch = res |> TaskSeq.map _.Event |> TaskSeq.toArrayAsync - return ({ checkpoint = checkpointPos batch; items = toItems streamFilter batch; isTail = batch.LongLength <> batchSize }: Propulsion.Feed.Core.Batch<_>) } + return ({ checkpoint = checkpointPos batch; items = toItems streamFilter batch; isTail = batch.LongLength <> batchSize }: Propulsion.Feed.Batch<_>) } // @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk let private chunk (pos: Position) = uint64 pos.CommitPosition >>> 28 diff --git a/src/Propulsion.MessageDb/MessageDbSource.fs b/src/Propulsion.MessageDb/MessageDbSource.fs index 81c93554..e502e1c6 100644 --- a/src/Propulsion.MessageDb/MessageDbSource.fs +++ b/src/Propulsion.MessageDb/MessageDbSource.fs @@ -53,7 +53,7 @@ module Internal = let sn = reader.GetString(6) |> FsCodec.StreamName.parse struct (sn, event) - member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct): Task> = task { + member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct): Task> = task { use! conn = connect ct use command = GetCategoryMessages.prepareCommand conn category fromPositionInclusive batchSize @@ -61,7 +61,7 @@ module Internal = let events = [| while reader.Read() do parseRow reader |] let checkpoint = match Array.tryLast events with Some (_, ev) -> unbox ev.Context | None -> fromPositionInclusive - return ({ checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 }: Core.Batch<_>) } + return ({ checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 }: Batch<_>) } member _.TryReadCategoryLastVersion(category: TrancheId, ct): Task = task { use! conn = connect ct @@ -70,7 +70,7 @@ module Internal = use! reader = command.ExecuteReaderAsync(ct) return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone } - let internal readBatch batchSize (store: MessageDbCategoryClient) struct (category, pos, ct): Task> = + let internal readBatch batchSize (store: MessageDbCategoryClient) struct (category, pos, ct): Task> = let positionInclusive = Position.toInt64 pos store.ReadCategoryMessages(category, positionInclusive, batchSize, ct) diff --git a/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs b/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs index 224bbe6e..d00403eb 100644 --- a/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs +++ b/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs @@ -19,7 +19,7 @@ module private Impl = if streamFilter sn then Some struct (sn, msg) else None) let! items = if not withData then task { return filtered |> Seq.map (toStreamEvent null) |> Array.ofSeq } else filtered |> Seq.map readWithDataAsStreamEvent |> Propulsion.Internal.Task.sequential ct - return ({ checkpoint = Propulsion.Feed.Position.parse page.NextPosition; items = items; isTail = page.IsEnd }: Propulsion.Feed.Core.Batch<_>) } + return ({ checkpoint = Propulsion.Feed.Position.parse page.NextPosition; items = items; isTail = page.IsEnd }: Propulsion.Feed.Batch<_>) } let readTailPositionForTranche (store: SqlStreamStore.IStreamStore) _trancheId ct = task { let! lastEventPos = store.ReadHeadPosition(ct) diff --git a/src/Propulsion/Feed.fs b/src/Propulsion/Feed.fs index a6a0ff26..028c6278 100644 --- a/src/Propulsion/Feed.fs +++ b/src/Propulsion/Feed.fs @@ -33,3 +33,12 @@ type IFeedCheckpointStore = /// Determines the starting position, and checkpointing frequency for a given tranche abstract member Start: source: SourceId * tranche: TrancheId * establishOrigin: Func> option * ct: CancellationToken -> Task abstract member Commit: source: SourceId * tranche: TrancheId * pos: Position * CancellationToken -> Task + +[] +type Batch<'F> = + { items: Propulsion.Streams.StreamEvent<'F>[] + /// Next computed read position (inclusive). Checkpoint stores treat absence of a value as `Position.initial` (= `0`) + checkpoint: Position + /// Indicates whether the end of a feed has been reached (a batch being empty does not necessarily imply that) + /// Implies tail sleep delay. May trigger completion of `Monitor.AwaitCompletion` + isTail: bool } diff --git a/src/Propulsion/FeedReader.fs b/src/Propulsion/FeedReader.fs index 70062151..86b21126 100644 --- a/src/Propulsion/FeedReader.fs +++ b/src/Propulsion/FeedReader.fs @@ -5,16 +5,6 @@ open Propulsion.Feed open Propulsion.Internal open Serilog open System -open System.Collections.Generic - -[] -type Batch<'F> = - { items: Propulsion.Streams.StreamEvent<'F>[] - /// Next computed read position (inclusive). Checkpoint stores treat absence of a value as `Position.initial` (= `0`) - checkpoint: Position - /// Indicates whether the end of a feed has been reached (a batch being empty does not necessarily imply that) - /// Implies tail sleep delay. May trigger completion of `Monitor.AwaitCompletion` - isTail: bool } module internal TimelineEvent = diff --git a/src/Propulsion/FeedSource.fs b/src/Propulsion/FeedSource.fs index 41e45514..834e99d6 100644 --- a/src/Propulsion/FeedSource.fs +++ b/src/Propulsion/FeedSource.fs @@ -5,7 +5,6 @@ open Propulsion open Propulsion.Feed open Propulsion.Internal open System -open System.Collections.Generic /// Drives reading and checkpointing for a set of feeds (tranches) of a custom source feed type FeedSourceBase internal @@ -169,7 +168,7 @@ type FeedSource let readTs = Stopwatch.timestamp () let! page = readPage.Invoke(trancheId, pos, ct) let items' = page.items |> Array.map (fun x -> struct (streamName, x)) - yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail }: Core.Batch<_>)) } + yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail }: Batch<_>)) } member internal _.Pump(readTranches: Func>, readPage: Func>>, ct): Task = diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs index 5c6acc83..05fbc4d5 100644 --- a/src/Propulsion/Internal.fs +++ b/src/Propulsion/Internal.fs @@ -90,6 +90,7 @@ module Exception = let [] (|Log|_|) log (e: exn) = log e; ValueNone type CancellationToken = System.Threading.CancellationToken +type IAsyncEnumerable<'T> = System.Collections.Generic.IAsyncEnumerable<'T> type Task = System.Threading.Tasks.Task type Task<'T> = System.Threading.Tasks.Task<'T> open System.Threading.Tasks @@ -156,9 +157,7 @@ module Task = parallel_ 1 ct xs let parallelUnlimited ct xs: Task<'t []> = parallel_ 0 ct xs - let inline ignore<'T> (a: Task<'T>): Task = task { - let! _ = a - return () } + let inline ignore<'T> (a: Task<'T>): Task = task { let! _ = a in return () } let ofUnitTask (x: Task): Task = task { return! x } let periodically (f: CancellationToken -> Task) interval (ct: CancellationToken) = task { let t = new System.Threading.PeriodicTimer(interval) // no use as ct will Dispose diff --git a/src/Propulsion/JsonSource.fs b/src/Propulsion/JsonSource.fs index b595b788..ba15d46a 100644 --- a/src/Propulsion/JsonSource.fs +++ b/src/Propulsion/JsonSource.fs @@ -37,7 +37,7 @@ type [] JsonSource private () = let lineNo = int64 i + 1L try let items = if isEof then Array.empty else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray - struct (System.TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Core.Batch<_>)) + struct (System.TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Batch<_>)) with e -> raise <| exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) } let source = SinglePassFeedSource(log, statsInterval, sourceId, crawl, checkpoints, sink, string) source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] }) diff --git a/src/Propulsion/PeriodicSource.fs b/src/Propulsion/PeriodicSource.fs index b332d102..38e730d5 100644 --- a/src/Propulsion/PeriodicSource.fs +++ b/src/Propulsion/PeriodicSource.fs @@ -7,7 +7,6 @@ namespace Propulsion.Feed open FSharp.Control // taskSeq open Propulsion.Internal open System -open System.Collections.Generic /// Int64.MaxValue = 9223372036854775807 /// ([datetimeoffset]::FromUnixTimeSeconds(9223372036854775807 / 1000000000)) is in 2262 @@ -55,7 +54,7 @@ type PeriodicSource defaultArg renderPos DateTimeOffsetPosition.render, defaultArg shutdownTimeout (TimeSpan.seconds 5)) // We don't want to checkpoint for real until we know the scheduler has handled the full set of pages in the crawl. - let crawlInternal (read: Func<_, IAsyncEnumerable>) trancheId (_wasLast, position) ct: IAsyncEnumerable)> = taskSeq { + let crawlInternal (read: Func<_, IAsyncEnumerable>) trancheId (_wasLast, position) ct: IAsyncEnumerable)> = taskSeq { let startDate = DateTimeOffsetPosition.getDateTimeOffset position let dueDate = startDate + refreshInterval match dueDate - DateTimeOffset.UtcNow with @@ -83,14 +82,14 @@ type PeriodicSource let items = Array.zeroCreate ready buffer.CopyTo(0, items, 0, ready) buffer.RemoveRange(0, ready) - yield struct (elapsed, ({ items = items; checkpoint = position; isTail = false }: Core.Batch<_>)) + yield struct (elapsed, ({ items = items; checkpoint = position; isTail = false }: Batch<_>)) elapsed <- TimeSpan.Zero | _ -> () let items, checkpoint = match buffer.ToArray() with | [||] as noItems -> noItems, basePosition | finalItem -> finalItem, let struct (_s, e) = Array.last finalItem in e |> Core.TimelineEvent.toCheckpointPosition - yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true }: Core.Batch<_>) } + yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true }: Batch<_>) } member internal _.Pump(readTranches: Func>, // The TaskSeq is expected to manage its own resilience strategy (retries etc).
diff --git a/src/Propulsion/SinglePassFeedSource.fs b/src/Propulsion/SinglePassFeedSource.fs index c74312dc..960c47eb 100644 --- a/src/Propulsion/SinglePassFeedSource.fs +++ b/src/Propulsion/SinglePassFeedSource.fs @@ -1,9 +1,7 @@ namespace Propulsion.Feed -open Propulsion.Feed.Core +open Propulsion.Internal open System -open System.Collections.Generic -open System.Threading /// Drives reading from the Source, stopping when the Tail of each of the Tranches has been reached type SinglePassFeedSource @@ -12,7 +10,8 @@ type SinglePassFeedSource crawl: Func)>>, checkpoints: IFeedCheckpointStore, sink: Propulsion.Sinks.SinkPipeline, ?renderPos, ?logReadFailure, ?readFailureSleepInterval, ?logCommitFailure) = - inherit TailingFeedSource(log, statsInterval, sourceId, (*tailSleepInterval*)TimeSpan.Zero, checkpoints, (*establishOrigin*)None, sink, defaultArg renderPos string, + inherit Propulsion.Feed.Core.TailingFeedSource( + log, statsInterval, sourceId, (*tailSleepInterval*)TimeSpan.Zero, checkpoints, (*establishOrigin*)None, sink, defaultArg renderPos string, crawl, ?logReadFailure = logReadFailure, ?readFailureSleepInterval = readFailureSleepInterval, ?logCommitFailure = logCommitFailure, readersStopAtTail = true) diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 2e65573c..a08dba66 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -6,7 +6,6 @@ open Propulsion.Internal open Propulsion.MessageDb open Swensen.Unquote open System -open System.Collections.Generic open System.Diagnostics open Xunit @@ -76,7 +75,7 @@ let ``It processes events for a category`` () = task { let! checkpoints = makeCheckpoints consumerGroup let stats = stats log let mutable stop = ignore - let handled = HashSet<_>() + let handled = System.Collections.Generic.HashSet<_>() let handle stream (events: Propulsion.Sinks.Event[]) _ct = task { lock handled (fun _ -> for evt in events do diff --git a/tests/Propulsion.Tests/SinkHealthTests.fs b/tests/Propulsion.Tests/SinkHealthTests.fs index c9d19fb2..ebce386c 100644 --- a/tests/Propulsion.Tests/SinkHealthTests.fs +++ b/tests/Propulsion.Tests/SinkHealthTests.fs @@ -40,7 +40,7 @@ type Scenario(testOutput) = [| sid "a-ok", mk 0 "EventType" failingSid, mk 0 "EventType" stuckSid, mk 0 "EventType" |] - let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = items; isTail = true; checkpoint = Unchecked.defaultof<_> }: Core.Batch<_>)) + let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = items; isTail = true; checkpoint = Unchecked.defaultof<_> }: Batch<_>)) let extractHealthCheckExn (ex: Choice<_, exn>) = trap <@ match ex with @@ -49,7 +49,7 @@ type Scenario(testOutput) = [] let run () = async { - let source = Propulsion.Feed.SinglePassFeedSource(log, TimeSpan.FromSeconds 5, SourceId.parse "sid", crawl, checkpoints, sink, string) + let source = SinglePassFeedSource(log, TimeSpan.FromSeconds 5, SourceId.parse "sid", crawl, checkpoints, sink, string) let src = source.Start(fun _ct -> task { return [| TrancheId.parse "tid" |] }) let! monEx = src.Monitor.AwaitCompletion(propagationDelay = TimeSpan.FromSeconds 1, awaitFullyCaughtUp = true) |> Propulsion.Internal.Async.ofTask |> Async.Catch let me = extractHealthCheckExn monEx diff --git a/tests/Propulsion.Tests/SourceTests.fs b/tests/Propulsion.Tests/SourceTests.fs index ce97c4ff..995cd29b 100644 --- a/tests/Propulsion.Tests/SourceTests.fs +++ b/tests/Propulsion.Tests/SourceTests.fs @@ -22,7 +22,7 @@ type Scenario(testOutput) = [] let ``TailingFeedSource Stop / AwaitCompletion semantics`` () = task { - let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Core.Batch<_>)) + let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Batch<_>)) let source = Propulsion.Feed.Core.TailingFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", TimeSpan.FromMinutes 1, checkpoints, (*establishOrigin*)None, sink, string, crawl) use src = source.Start(fun ct -> source.Pump((fun _ -> task { return [| TrancheId.parse "tid" |] }), ct)) @@ -38,8 +38,8 @@ type Scenario(testOutput) = [] let SinglePassFeedSource withWait = async { - let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Core.Batch<_>)) - let source = Propulsion.Feed.SinglePassFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", crawl, checkpoints, sink, string) + let crawl _ _ _ = TaskSeq.singleton <| struct (TimeSpan.FromSeconds 0.1, ({ items = Array.empty; isTail = true; checkpoint = Unchecked.defaultof<_> }: Batch<_>)) + let source = SinglePassFeedSource(log, TimeSpan.FromMinutes 1, SourceId.parse "sid", crawl, checkpoints, sink, string) use src = source.Start(fun _ct -> task { return [| TrancheId.parse "tid" |] }) // SinglePassFeedSource completion includes Waiting for Completion of all Batches on all Tranches and Flushing of Checkpoints // Hence waiting with the Monitor is not actually necessary (though it provides progress logging which otherwise would be less thorough)