Skip to content

Commit

Permalink
FeedSource polish (#180)
Browse files: browse the repository at this point in the history
  • Loading branch information
bartelink authored Nov 15, 2022
1 parent d3246c9 commit ef942d2
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 77 deletions.
1 change: 0 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
root = true


[*.fs]
indent_style = space
indent_size = 4
Expand Down
10 changes: 5 additions & 5 deletions src/Propulsion.Cosmos/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ChangeFeedObserverContext = { source : ContainerId; leasePrefix : string }
/// Provides F#-friendly wrapping to compose a ChangeFeedObserver from functions
type ChangeFeedObserver =
static member Create
( /// Base logger context; will be decorated with a `partitionId` property when passed to `assign`, `init` and `ingest`
( /// Base logger context; will be decorated with a `partition` property when passed to `assign`, `init` and `ingest`
log : ILogger,
/// Callback responsible for
/// - handling ingestion of a batch of documents (potentially offloading work to another control path)
Expand All @@ -43,24 +43,24 @@ type ChangeFeedObserver =
?dispose : unit -> unit) =
let mutable log = log
let _open (ctx : IChangeFeedObserverContext) = async {
log <- log.ForContext("partitionId",ctx.PartitionKeyRangeId)
log <- log.ForContext("partition", ctx.PartitionKeyRangeId)
let rangeId = int ctx.PartitionKeyRangeId
match init with
| Some f -> f log rangeId
| None -> ()
match assign with
| Some f -> return! f log rangeId
| None -> log.Information("Reader {partitionId} Assigned", ctx.PartitionKeyRangeId) }
| None -> log.Information("Reader {partition} Assigned", ctx.PartitionKeyRangeId) }
let _process (ctx, docs) = async {
try do! ingest log ctx docs |> Async.AwaitTaskCorrect
with e ->
log.Error(e, "Reader {partitionId} Handler Threw", ctx.PartitionKeyRangeId)
log.Error(e, "Reader {partition} Handler Threw", ctx.PartitionKeyRangeId)
do! Async.Raise e }
let _close (ctx : IChangeFeedObserverContext, reason) = async {
log.Warning "Closing" // Added to enable diagnosing underlying CFP issues; will be removed eventually
match revoke with
| Some f -> return! f log
| None -> log.Information("Reader {partitionId} Revoked {reason}", ctx.PartitionKeyRangeId, reason) }
| None -> log.Information("Reader {partition} Revoked {reason}", ctx.PartitionKeyRangeId, reason) }
{ new IChangeFeedObserver with
member _.OpenAsync ctx = Async.StartAsTask(_open ctx) :> _
member _.ProcessChangesAsync(ctx, docs, ct) = Async.StartAsTask(_process(ctx, docs), cancellationToken=ct) :> _
Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ type ChangeFeedProcessor =
return! observer.Ingest(ctx, checkpoint, changes) |> Async.AwaitTask
#endif
with e ->
log.Error(e, "Reader {processorName}/{partitionId} Handler Threw", processorName, context.LeaseToken)
log.Error(e, "Reader {processorName}/{partition} Handler Threw", processorName, context.LeaseToken)
do! Async.Raise e }
fun ctx chg chk ct -> Async.StartAsTask(aux ctx chg chk, cancellationToken = ct) :> Task
let acquireAsync leaseToken = log.Information("Reader {partitionId} Assigned", leaseTokenToPartitionId leaseToken); Task.CompletedTask
let releaseAsync leaseToken = log.Information("Reader {partitionId} Revoked", leaseTokenToPartitionId leaseToken); Task.CompletedTask
let acquireAsync leaseToken = log.Information("Reader {partition} Assigned", leaseTokenToPartitionId leaseToken); Task.CompletedTask
let releaseAsync leaseToken = log.Information("Reader {partition} Revoked", leaseTokenToPartitionId leaseToken); Task.CompletedTask
let notifyError =
notifyError
|> Option.defaultValue (fun i ex -> log.Error(ex, "Reader {partitionId} error", i))
|> Option.defaultValue (fun i ex -> log.Error(ex, "Reader {partition} error", i))
|> fun f -> fun leaseToken ex -> f (leaseTokenToPartitionId leaseToken) ex; Task.CompletedTask
monitored
.GetChangeFeedProcessorBuilderWithManualCheckpoint(processorName_, Container.ChangeFeedHandlerWithManualCheckpoint handler)
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type CosmosSource =
database = context.source.database; container = context.source.container; group = context.leasePrefix; rangeId = int ctx.PartitionKeyRangeId
token = epoch; latency = sw.Elapsed; rc = rc; age = age; docs = docs.Count
ingestLatency = pt.Elapsed; ingestQueued = cur }
(log |> Log.withMetric m).Information("Reader {partitionId} {token,9} age {age:dd\.hh\:mm\:ss} {count,4} docs {requestCharge,6:f1}RU {l,5:f1}s Wait {pausedS:f3}s Ahead {cur}/{max}",
(log |> Log.withMetric m).Information("Reader {partition} {token,9} age {age:dd\.hh\:mm\:ss} {count,4} docs {requestCharge,6:f1}RU {l,5:f1}s Wait {pausedS:f3}s Ahead {cur}/{max}",
ctx.PartitionKeyRangeId, epoch, age, docs.Count, rc, readS, postS, cur, max)
sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
}
Expand All @@ -87,7 +87,7 @@ type CosmosStoreSource =
database = ctx.source.Database.Id; container = ctx.source.Id; group = ctx.group; rangeId = int ctx.rangeId
token = ctx.epoch; latency = readElapsed; rc = ctx.requestCharge; age = age; docs = docs.Count
ingestLatency = pt.Elapsed; ingestQueued = cur }
(log |> Log.withMetric m).Information("Reader {partitionId} {token,9} age {age:dd\.hh\:mm\:ss} {count,4} docs {requestCharge,6:f1}RU {l,5:f1}s Wait {pausedS:f3}s Ahead {cur}/{max}",
(log |> Log.withMetric m).Information("Reader {partition} {token,9} age {age:dd\.hh\:mm\:ss} {count,4} docs {requestCharge,6:f1}RU {l,5:f1}s Wait {pausedS:f3}s Ahead {cur}/{max}",
ctx.rangeId, ctx.epoch, age, docs.Count, ctx.requestCharge, readElapsed.TotalSeconds, pt.ElapsedSeconds, cur, max)
sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
}
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore.Lambda/SqsNotificationBatch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ open System.Collections.Generic
type SqsNotificationBatch(event : SQSEvent) =
let inputs = [|
for r in event.Records ->
let trancheId = r.MessageAttributes["TrancheId"].StringValue |> Propulsion.Feed.TrancheId.parse
let trancheId = r.MessageAttributes["Tranche"].StringValue |> Propulsion.Feed.TrancheId.parse
let position = r.MessageAttributes["Position"].StringValue |> int64 |> Propulsion.Feed.Position.parse
struct (trancheId, position, r.MessageId) |]

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore.Notifier/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ let private mkRequest topicArn messages =
let req = PublishBatchRequest(TopicArn = topicArn)
messages |> Seq.iteri (fun i struct (trancheId, pos) ->
let e = PublishBatchRequestEntry(Id = string i, Subject = trancheId, Message = pos, MessageGroupId = trancheId, MessageDeduplicationId = trancheId + pos)
e.MessageAttributes.Add("TrancheId", MessageAttributeValue(StringValue = trancheId, DataType="String"))
e.MessageAttributes.Add("Tranche", MessageAttributeValue(StringValue = trancheId, DataType="String"))
e.MessageAttributes.Add("Position", MessageAttributeValue(StringValue = pos, DataType="String"))
req.PublishBatchRequestEntries.Add(e))
req
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/DynamoStoreIndexer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, epochBytesCutoff,
let epochs = AppendsEpoch.Config.create storeLog (epochBytesCutoff, maxVersion, maxStreams) (context, cache)
let index = AppendsIndex.Config.create storeLog (context, cache)
let createIngester trancheId =
let log = log.ForContext("trancheId", trancheId)
let log = log.ForContext("tranche", trancheId)
let readIngestionEpoch () = index.ReadIngestionEpochId trancheId
let markIngestionEpoch epochId = index.MarkIngestionEpochId(trancheId, epochId)
let ingest (eid, items) = epochs.Ingest(trancheId, eid, items)
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ module private Impl =
all.Length, chosenEvents, totalEvents, streamEvents
let largeEnoughToLog = streamEvents.Count > batchCutoff
if largeEnoughToLog then
log.Information("DynamoStoreSource {sourceId}/{trancheId}/{epochId}@{offset} {mode:l} {totalChanges} changes {loadingS}/{totalS} streams {loadingE}/{totalE} events",
log.Information("DynamoStoreSource {source}/{tranche}/{epochId}@{offset} {mode:l} {totalChanges} changes {loadingS}/{totalS} streams {loadingE}/{totalE} events",
sourceId, string tid, string epochId, offset, (if hydrating then "Hydrating" else "Feeding"), totalChanges, streamEvents.Count, totalStreams, chosenEvents, totalEvents)

let buffer, cache = ResizeArray<AppendsEpoch.Events.StreamSpan>(), System.Collections.Concurrent.ConcurrentDictionary()
Expand Down Expand Up @@ -95,7 +95,7 @@ module private Impl =
| loadedNow when prevLoaded <> loadedNow ->
prevLoaded <- loadedNow
let eventsLoaded = cache.Values |> Seq.sumBy Array.length
log.Information("DynamoStoreSource {sourceId}/{trancheId}/{epochId}@{offset}/{totalChanges} {result} {batch} {events}e Loaded {loadedS}/{loadingS}s {loadedE}/{loadingE}e",
log.Information("DynamoStoreSource {source}/{tranche}/{epochId}@{offset}/{totalChanges} {result} {batch} {events}e Loaded {loadedS}/{loadingS}s {loadedE}/{loadingE}e",
sourceId, string tid, string epochId, Option.toNullable i, version, "Hydrated", batchIndex, len, cache.Count, streamEvents.Count, eventsLoaded, chosenEvents)
| _ -> ()
batchIndex <- batchIndex + 1
Expand Down
28 changes: 11 additions & 17 deletions src/Propulsion.Feed/FeedReader.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Propulsion.Feed.Core

open System.Threading
open FSharp.Control
open Propulsion // Async.Raise
open Propulsion.Feed
Expand Down Expand Up @@ -38,7 +37,7 @@ module Log =
[<AutoOpen>]
module private Impl =

type Stats(log : ILogger, statsInterval : TimeSpan, source : SourceId, tranche : TrancheId, renderPos : Position -> string) =
type Stats(partition : int, source : SourceId, tranche : TrancheId, renderPos : Position -> string) =

let mutable batchLastPosition = Position.parse -1L
let mutable batchCaughtUp = false
Expand All @@ -50,17 +49,17 @@ module private Impl =

let mutable lastCommittedPosition = Position.parse -1L

let report () =
member _.Dump(log : ILogger) =
let p pos = match pos with p when p = Position.parse -1L -> Nullable() | x -> Nullable x
let m = Log.Metric.Read {
source = source; tranche = tranche
token = p batchLastPosition; latency = readLatency; pages = recentPagesRead; items = recentEvents
ingestLatency = ingestLatency; ingestQueued = currentBatches }
let readS, postS = readLatency.TotalSeconds, ingestLatency.TotalSeconds
let inline r pos = match pos with p when p = Position.parse -1L -> null | x -> renderPos x
(log |> Log.withMetric m).Information(
"Reader {source:l}/{tranche:l} Tail {caughtUp} Position {readPosition} Committed {lastCommittedPosition} Pages {pagesRead} Empty {pagesEmpty} Events {events} | Recent {l:f1}s Pages {recentPagesRead} Empty {recentPagesEmpty} Events {recentEvents} | Wait {pausedS:f1}s Ahead {cur}/{max}",
source, tranche, batchCaughtUp, r batchLastPosition, r lastCommittedPosition, pagesRead, pagesEmpty, events, readS, recentPagesRead, recentPagesEmpty, recentEvents, postS, currentBatches, maxBatches)
(Log.withMetric m log).ForContext("tail", batchCaughtUp).Information(
"Reader {partition} {state} Position {readPosition} Committed {lastCommittedPosition} Pages {pagesRead} Empty {pagesEmpty} Events {events} | Recent {l:f1}s Pages {recentPagesRead} Empty {recentPagesEmpty} Events {recentEvents} | Wait {pausedS:f1}s Ahead {cur}/{max}",
partition, (if batchCaughtUp then "Tail" else "Busy"), r batchLastPosition, r lastCommittedPosition, pagesRead, pagesEmpty, events, readS, recentPagesRead, recentPagesEmpty, recentEvents, postS, currentBatches, maxBatches)
readLatency <- TimeSpan.Zero; ingestLatency <- TimeSpan.Zero;
recentPagesRead <- 0; recentEvents <- 0; recentPagesEmpty <- 0

Expand All @@ -84,13 +83,8 @@ module private Impl =
currentBatches <- cur
maxBatches <- max

member _.Pump(ct : CancellationToken) = task {
while not ct.IsCancellationRequested do
do! Task.Delay(TimeSpan.toMs statsInterval, ct)
report () }

type FeedReader
( log : ILogger, sourceId, trancheId, statsInterval : TimeSpan,
( log : ILogger, partition, source, tranche,
// Walk all content in the source. Responsible for managing exceptions, retries and backoff.
// Implementation is expected to inject an appropriate sleep based on the supplied `Position`
// Processing loop will abort if an exception is yielded
Expand Down Expand Up @@ -118,11 +112,10 @@ type FeedReader
// Stop processing when the crawl function yields a Batch that isTail. Default false.
?stopAtTail) =

let log = log.ForContext("source", sourceId).ForContext("tranche", trancheId)
let stats = Stats(log, statsInterval, sourceId, trancheId, renderPos)
let stats = Stats(partition, source, tranche, renderPos)

let commit position = async {
try do! commitCheckpoint (sourceId, trancheId, position)
try do! commitCheckpoint (source, tranche, position)
stats.UpdateCommittedPosition(position)
log.Debug("Committed checkpoint {position}", position)
with e ->
Expand All @@ -142,12 +135,13 @@ type FeedReader
let! struct (cur, max) = submitBatch { isTail = batch.isTail; epoch = epoch; checkpoint = commit batch.checkpoint; items = streamEvents; onCompletion = ignore }
stats.UpdateCurMax(ingestTimer.Elapsed, cur, max) }

member _.Log = log
member _.DumpStats() = stats.Dump(log)

member _.Pump(initialPosition : Position) = async {
log.Debug("Starting reading stream from position {initialPosition}", renderPos initialPosition)
stats.UpdateCommittedPosition(initialPosition)
// Commence reporting stats until such time as we quit pumping
let! ct = Async.CancellationToken
Task.start (fun () -> stats.Pump ct)
let mutable currentPos, lastWasTail = initialPosition, false
while not (ct.IsCancellationRequested || (lastWasTail && defaultArg stopAtTail false)) do
for readLatency, batch in crawl (lastWasTail, currentPos) do
Expand Down
Loading

0 comments on commit ef942d2

Please sign in to comment.