From edfb9be18a88d139110873a873514c6e124b5d74 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 13 May 2020 17:36:13 +0100 Subject: [PATCH] Update to FsKafka/FsKafka0 1.4.2 (#67) --- CHANGELOG.md | 7 + Directory.Build.props | 2 +- Directory.Build.targets | 2 +- README.md | 4 +- azure-pipelines.yml | 17 +- build.proj | 2 +- .../Propulsion.Cosmos.fsproj | 2 +- .../Propulsion.EventStore.fsproj | 2 +- .../{Bindings.fs => Binding.fs} | 8 +- src/Propulsion.Kafka/Consumers.fs | 18 +- src/Propulsion.Kafka/Monitor.fs | 365 ------------------ src/Propulsion.Kafka/Propulsion.Kafka.fsproj | 7 +- .../{Bindings.fs => Binding.fs} | 8 +- src/Propulsion.Kafka0/ConfluentKafka1Shims.fs | 141 ------- src/Propulsion.Kafka0/FsKafkaShims.fs | 336 ---------------- .../Propulsion.Kafka0.fsproj | 10 +- src/Propulsion/Propulsion.fsproj | 2 +- .../ConsumersIntegration.fs | 24 +- .../MonitorIntegration.fs | 4 +- .../MonitorTests.fs | 111 ------ .../Propulsion.Kafka.Integration.fsproj | 1 - .../Propulsion.Kafka0.Integration.fsproj | 1 - tools/Propulsion.Tool/Program.fs | 10 + 23 files changed, 60 insertions(+), 1024 deletions(-) rename src/Propulsion.Kafka/{Bindings.fs => Binding.fs} (73%) delete mode 100644 src/Propulsion.Kafka/Monitor.fs rename src/Propulsion.Kafka0/{Bindings.fs => Binding.fs} (79%) delete mode 100644 src/Propulsion.Kafka0/ConfluentKafka1Shims.fs delete mode 100644 src/Propulsion.Kafka0/FsKafkaShims.fs delete mode 100644 tests/Propulsion.Kafka.Integration/MonitorTests.fs diff --git a/CHANGELOG.md b/CHANGELOG.md index b48fdc68..9afafff9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,14 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added ### Changed + +- `Kafka`: Targets [`FsKafka`/`FsKafka0` v `1.4.2`](https://github.com/jet/FsKafka/blob/master/CHANGELOG.md#1.4.2) [#64](https://github.com/jet/propulsion/pull/64) + ### Removed + +- `Propulsion.Kafka0` Some `Propulsion.Kafka0`-namespaced shimming elements are now found in the `FsKafka` namespace in `FsKafka0` [#64](https://github.com/jet/propulsion/pull/64) +- `Propulsion.Kafka`: `KafkaMonitor` is now found in the `FsKafka` namespace in `FsKafka`/FsKafka0` (NOTE: integration tests continue to live in this repo) [#64](https://github.com/jet/propulsion/pull/64) + ### Fixed - `Kafka`: Change buffer grouping to include `Topic` alongside `PartitionId` - existing implementation did not guarantee marking progress where consuming from more than one Topic concurrently [#63](https://github.com/jet/propulsion/pull/63) diff --git a/Directory.Build.props b/Directory.Build.props index b9bb2a91..d20cd9e8 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,4 +1,4 @@ - + @jet @bartelink @eiriktsarpalis and contributors Jet.com diff --git a/Directory.Build.targets b/Directory.Build.targets index cd0335a9..9998a4c6 100644 --- a/Directory.Build.targets +++ b/Directory.Build.targets @@ -1,4 +1,4 @@ - + $(MinVerMajor).$(MinVerMinor).$(MinVerPatch)-pr.$(BUILD_PR) diff --git a/README.md b/README.md index 0aef1f27..e137cb69 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,8 @@ The components within this repository are delivered as a multi-targeted Nuget pa - `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion including `ParallelProjector`, `StreamsProjector`. 
[Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics`, `Serilog` - `Propulsion.Cosmos` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/) Provides bindings to Azure CosmosDb a) writing to `Equinox.Cosmos` :- `CosmosSink` b) reading from CosmosDb's changefeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) :- `CosmosSource`. [Depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDB.ChangeFeedProcessor`, `Serilog` - `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`, `Serilog` -- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. Implements a `KafkaMonitor` that can log status information based on [Burrow](https://github.com/linkedin/Burrow). [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v ` = 1.4.1`, `Serilog` -- `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but targets older `Confluent.Kafka`/`librdkafka` version for for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`, `Serilog` +- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v ` = 1.4.2`, `Serilog` +- `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but uses `FsKafka0` instead of `FsKafka` in order to target an older `Confluent.Kafka`/`librdkafka` version pairing for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`, `Serilog` The ubiquitous `Serilog` dependency is solely on the core module, not any sinks, i.e. you configure to emit to `NLog` etc. 
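The README entries above describe the consumer surface at a high level; the sketch below is a minimal, hypothetical wiring of `Propulsion.Kafka.ParallelConsumer` against the FsKafka 1.4.2-based `KafkaConsumerConfig`, mirroring the `Create(..., AutoOffsetReset.Earliest, ...)` call shape and the `ParallelConsumer.Start(log, config, maxDop, handle, ...)` overload that appear later in this patch. The client id, broker address, topic and group names are placeholders, not values taken from this repo.

```fsharp
// Minimal sketch, not part of this patch: a ParallelConsumer fed by the updated
// KafkaConsumerConfig.Create signature (autoOffsetReset is now a positional argument).
open System
open System.Collections.Generic
open Confluent.Kafka
open FsKafka
open Propulsion.Kafka
open Serilog

let log = LoggerConfiguration().CreateLogger() // add sinks (e.g. Console) in a real host

let config =
    KafkaConsumerConfig.Create(
        "sample-client", "localhost:9092", ["sample-topic"], "sample-group", // placeholder values
        AutoOffsetReset.Earliest, statisticsInterval = TimeSpan.FromSeconds 60.)

// This overload maps each ConsumeResult to a KeyValuePair<string, string> via Binding.mapConsumeResult
let handle (item : KeyValuePair<string, string>) : Async<unit> = async {
    log.Information("Handled {key} ({length} chars)", item.Key, item.Value.Length) }

let pipeline = ParallelConsumer.Start(log, config, 8 (* maxDop *), handle)
// pipeline.Stop() can be invoked from a handler or supervising task, as the integration tests below do
```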
diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 3f6aac26..43c6d6bc 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -1,15 +1,14 @@ name: $(Rev:r) # Summary: -# Linux: Tests with netcoreapp3.1 using docker Kafka instance -# net461 on Mono is not working, no investigation why as yet, but local run validates it +# Linux: Tests using docker Kafka instance (note test suite does not run net461 as CK on mono is not supported) # Windows: Builds and emits nupkg as artifacts # MacOS: Builds only jobs: - job: Windows pool: - vmImage: 'vs2017-win2016' + vmImage: 'windows-latest' steps: - script: dotnet pack build.proj displayName: dotnet pack build.proj @@ -24,12 +23,6 @@ jobs: pool: vmImage: 'ubuntu-latest' steps: - - task: UseDotNet@2 - displayName: 'Install .NET Core sdk' - inputs: - packageType: sdk - version: 3.1.101 - installationPath: $(Agent.ToolsDirectory)/dotnet - script: | docker network create kafka-net docker run -d --name zookeeper --network kafka-net --publish 2181:2181 zookeeper:3.4 @@ -52,11 +45,5 @@ jobs: pool: vmImage: 'macOS-latest' steps: - - task: UseDotNet@2 - displayName: 'Install .NET Core sdk' - inputs: - packageType: sdk - version: 3.1.101 - installationPath: $(Agent.ToolsDirectory)/dotnet - script: dotnet pack build.proj displayName: dotnet pack build.proj \ No newline at end of file diff --git a/build.proj b/build.proj index 8f12b78c..b50a4e88 100644 --- a/build.proj +++ b/build.proj @@ -1,4 +1,4 @@ - + diff --git a/src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj b/src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj index efa760cc..84fe1ccc 100644 --- a/src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj +++ b/src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj @@ -6,7 +6,7 @@ false true true - true + true diff --git a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj index 9cd17f01..8bc8b7c2 100644 --- a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj +++ b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj @@ -6,7 +6,7 @@ false true true - true + true diff --git a/src/Propulsion.Kafka/Bindings.fs b/src/Propulsion.Kafka/Binding.fs similarity index 73% rename from src/Propulsion.Kafka/Bindings.fs rename to src/Propulsion.Kafka/Binding.fs index 18dc5f9c..215f4305 100644 --- a/src/Propulsion.Kafka/Bindings.fs +++ b/src/Propulsion.Kafka/Binding.fs @@ -6,13 +6,9 @@ open Serilog open System open System.Collections.Generic -module Bindings = - let mapMessage (x : ConsumeResult) = x.Message +module Binding = let mapConsumeResult (x : ConsumeResult) = KeyValuePair(x.Message.Key,x.Message.Value) - let inline partitionId (x : ConsumeResult<_,_>) = let p = x.Partition in p.Value - let topicPartition (topic : string) (partition : int) = TopicPartition(topic, Partition partition) - let partitionValue (partition : Partition) = let p = partition in p.Value - let offsetUnset = Offset.Unset + let makeTopicPartition (topic : string) (partition : int) = TopicPartition(topic, Partition partition) let createConsumer log config : IConsumer * (unit -> unit) = let consumer = ConsumerBuilder.WithLogging(log, config) consumer, consumer.Close diff --git a/src/Propulsion.Kafka/Consumers.fs b/src/Propulsion.Kafka/Consumers.fs index dc6b7401..d6fc52ab 100644 --- a/src/Propulsion.Kafka/Consumers.fs +++ b/src/Propulsion.Kafka/Consumers.fs @@ -70,10 +70,10 @@ type KafkaIngestionEngine<'Info> let mkSubmission topicPartition span : Submission.SubmissionBatch<'S, 'M> = let checkpoint () = counter.Delta(-span.reservation) // 
counterbalance Delta(+) per ingest, below - Bindings.storeOffset log consumer span.highWaterMark + Binding.storeOffset log consumer span.highWaterMark { source = topicPartition; onCompletion = checkpoint; messages = span.messages.ToArray() } let ingest result = - let message = Bindings.mapMessage result + let message = Binding.message result let sz = approximateMessageBytes message counter.Delta(+sz) // counterbalanced by Delta(-) in checkpoint(), below intervalMsgs <- intervalMsgs + 1L @@ -111,7 +111,7 @@ type KafkaIngestionEngine<'Info> submit() maybeLogStats() | false, Some intervalRemainder -> - Bindings.tryConsume log consumer intervalRemainder ingest + Binding.tryConsume log consumer intervalRemainder ingest finally submit () // We don't want to leak our reservations against the counter and want to pass of messages we ingested dumpStats () // Unconditional logging when completing @@ -134,7 +134,7 @@ type ConsumerPipeline private (inner : IConsumer, task : Task(log, limiter, consumer, closeConsumer, mapResult, submit, maxItems, maxDelay, statsInterval=statsInterval) let cts = new CancellationTokenSource() @@ -203,7 +203,7 @@ type ParallelConsumer private () = static member Start ( log : ILogger, config : KafkaConsumerConfig, maxDop, handle : KeyValuePair -> Async, ?maxSubmissionsPerPartition, ?pumpInterval, ?statsInterval, ?logExternalStats) = - ParallelConsumer.Start>(log, config, maxDop, Bindings.mapConsumeResult, handle >> Async.Catch, + ParallelConsumer.Start>(log, config, maxDop, Binding.mapConsumeResult, handle >> Async.Catch, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval, ?statsInterval=statsInterval, ?logExternalStats=logExternalStats) type EventMetrics = Streams.EventMetrics @@ -299,7 +299,7 @@ module Core = stats : Streams.Scheduling.Stats, statsInterval, ?maximizeOffsetWriting, ?maxSubmissionsPerPartition, ?pumpInterval, ?logExternalState, ?idleDelay)= StreamsConsumer.Start, 'Outcome>( - log, config, Bindings.mapConsumeResult, keyValueToStreamEvents, prepare, handle, maxDop, + log, config, Binding.mapConsumeResult, keyValueToStreamEvents, prepare, handle, maxDop, stats, statsInterval=statsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval, @@ -316,7 +316,7 @@ module Core = stats : Streams.Scheduling.Stats, statsInterval, ?maximizeOffsetWriting, ?maxSubmissionsPerPartition, ?pumpInterval, ?logExternalState, ?idleDelay, ?maxBatches) = StreamsConsumer.Start, 'Outcome>( - log, config, Bindings.mapConsumeResult, keyValueToStreamEvents, handle, maxDop, + log, config, Binding.mapConsumeResult, keyValueToStreamEvents, handle, maxDop, stats, statsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval, @@ -359,7 +359,7 @@ type StreamNameSequenceGenerator() = member __.ConsumeResultToStreamEvent(toStreamName : ConsumeResult<_, _> -> StreamName) : ConsumeResult -> Propulsion.Streams.StreamEvent seq = let toDataAndContext (result : ConsumeResult) = - let message = Bindings.mapMessage result + let message = Binding.message result System.Text.Encoding.UTF8.GetBytes message.Value, null __.ConsumeResultToStreamEvent(toStreamName, toDataAndContext) @@ -382,7 +382,7 @@ type StreamNameSequenceGenerator() = member __.ConsumeResultToStreamEvent(toDataAndContext : ConsumeResult<_, _> -> byte[] * obj, ?defaultCategory) : ConsumeResult -> Propulsion.Streams.StreamEvent seq = let toStreamName (result : ConsumeResult) = - let message = Bindings.mapMessage result + let message = 
Binding.message result Core.parseMessageKey (defaultArg defaultCategory "") message.Key let toTimelineEvent (result : ConsumeResult, index) = let data, context = toDataAndContext result diff --git a/src/Propulsion.Kafka/Monitor.fs b/src/Propulsion.Kafka/Monitor.fs deleted file mode 100644 index 7c49b281..00000000 --- a/src/Propulsion.Kafka/Monitor.fs +++ /dev/null @@ -1,365 +0,0 @@ -// Implements a watchdog that can be used to have a service self-detect stalled consumers and/or consistently growing lags -// Adapted from https://github.com/linkedin/Burrow by @jgardella -namespace Propulsion.Kafka - -open Confluent.Kafka -open FsKafka -open Serilog -open System -open System.Diagnostics - -type PartitionResult = - | OkReachedZero // check 1 - | WarningLagIncreasing // check 3 - | ErrorPartitionStalled of lag: int64 // check 2 - | Healthy - -module MonitorImpl = -#if NET461 - module Array = - let head xs = Seq.head xs - let last xs = Seq.last xs -#endif - module private Map = - let mergeChoice (f : 'a -> Choice<'b * 'c, 'b, 'c> -> 'd) (map1 : Map<'a, 'b>) (map2 : Map<'a, 'c>) : Map<'a, 'd> = - Set.union (map1 |> Seq.map (fun k -> k.Key) |> set) (map2 |> Seq.map (fun k -> k.Key) |> set) - |> Seq.map (fun k -> - match Map.tryFind k map1, Map.tryFind k map2 with - | Some b, Some c -> k, f k (Choice1Of3 (b,c)) - | Some b, None -> k, f k (Choice2Of3 b) - | None, Some c -> k, f k (Choice3Of3 c) - | None, None -> failwith "invalid state") - |> Map.ofSeq - - /// Progress information for a consumer in a group. - type [] private ConsumerProgressInfo = - { /// The consumer group id. - group : string - - /// The name of the kafka topic. - topic : string - - /// Progress info for each partition. - partitions : ConsumerPartitionProgressInfo[] - - /// The total lag across all partitions. - totalLag : int64 - - /// The minimum lead across all partitions. - minLead : int64 } - /// Progress information for a consumer in a group, for a specific topic-partition. - and [] private ConsumerPartitionProgressInfo = - { /// The partition id within the topic. - partition : int - - /// The consumer's current offset. - consumerOffset : Offset - - /// The offset at the current start of the topic. - earliestOffset : Offset - - /// The offset at the current end of the topic. - highWatermarkOffset : Offset - - /// The distance between the high watermark offset and the consumer offset. - lag : int64 - - /// The distance between the consumer offset and the earliest offset. - lead : int64 - - /// The number of messages in the partition. - messageCount : int64 } - - /// Operations for providing consumer progress information. - module private ConsumerInfo = - - /// Returns consumer progress information. - /// Note that this does not join the group as a consumer instance - let progress (timeout : TimeSpan) (consumer : IConsumer<'k,'v>) (topic : string) (ps : int[]) = async { - let topicPartitions = ps |> Seq.map (Bindings.topicPartition topic) - - let sw = System.Diagnostics.Stopwatch.StartNew() - let committedOffsets = - consumer.Committed(topicPartitions, timeout) - |> Seq.sortBy(fun e -> Bindings.partitionValue e.Partition) - |> Seq.map(fun e -> Bindings.partitionValue e.Partition, e) - |> Map.ofSeq - - let timeout = let elapsed = sw.Elapsed in if elapsed > timeout then TimeSpan.Zero else timeout - elapsed - let! 
watermarkOffsets = - topicPartitions - |> Seq.map(fun tp -> async { - return Bindings.partitionValue tp.Partition, consumer.QueryWatermarkOffsets(tp, timeout)} ) - |> Async.Parallel - let watermarkOffsets = watermarkOffsets |> Map.ofArray - - let partitions = - (watermarkOffsets, committedOffsets) - ||> Map.mergeChoice (fun p -> function - | Choice1Of3 (hwo,cOffset) -> - let e,l,o = (let v = hwo.Low in v.Value),(let v = hwo.High in v.Value),let v = cOffset.Offset in v.Value - // Consumer offset of (Invalid Offset -1001) indicates that no consumer offset is present. In this case, we should calculate lag as the high water mark minus earliest offset - let lag, lead = - match o with - | offset when offset = let v = Bindings.offsetUnset in v.Value -> l - e, 0L - | _ -> l - o, o - e - { partition = p ; consumerOffset = cOffset.Offset ; earliestOffset = hwo.Low ; highWatermarkOffset = hwo.High ; lag = lag ; lead = lead ; messageCount = l - e } - | Choice2Of3 hwo -> - // in the event there is no consumer offset present, lag should be calculated as high watermark minus earliest - // this prevents artifically high lags for partitions with no consumer offsets - let e,l = (let v = hwo.Low in v.Value),let v = hwo.High in v.Value - { partition = p ; consumerOffset = Bindings.offsetUnset; earliestOffset = hwo.Low ; highWatermarkOffset = hwo.High ; lag = l - e ; lead = 0L ; messageCount = l - e } - //failwithf "unable to find consumer offset for topic=%s partition=%i" topic p - | Choice3Of3 o -> - let invalid = Bindings.offsetUnset - { partition = p ; consumerOffset = o.Offset ; earliestOffset = invalid ; highWatermarkOffset = invalid ; lag = invalid.Value ; lead = invalid.Value ; messageCount = -1L }) - |> Seq.map (fun kvp -> kvp.Value) - |> Seq.toArray - - return { - topic = topic ; group = consumer.Name ; partitions = partitions - totalLag = partitions |> Seq.sumBy (fun p -> p.lag) - minLead = - if partitions.Length > 0 then - partitions |> Seq.map (fun p -> p.lead) |> Seq.min - else let v = Bindings.offsetUnset in v.Value } } - - type PartitionInfo = - { partition : int - consumerOffset : OffsetValue - earliestOffset : OffsetValue - highWatermarkOffset : OffsetValue - lag : int64 } - - [] - type Window = Window of PartitionInfo [] - - let private toPartitionInfo (info : ConsumerPartitionProgressInfo) = { - partition = info.partition - consumerOffset = OffsetValue.ofOffset info.consumerOffset - earliestOffset = OffsetValue.ofOffset info.earliestOffset - highWatermarkOffset = OffsetValue.ofOffset info.highWatermarkOffset - lag = info.lag } - let private createPartitionInfoList (info : ConsumerProgressInfo) = - Window (Array.map toPartitionInfo info.partitions) - - // Naive insert and copy out buffer - type private RingBuffer<'A> (capacity : int) = - let buffer : 'A [] = Array.zeroCreate capacity - let mutable head,tail,size = 0,-1,0 - - member __.TryCopyFull() = - if size <> capacity then None - else - let arr = Array.zeroCreate size - let mutable i = head - for x = 0 to size - 1 do - arr.[x] <- buffer.[i % capacity] - i <- i + 1 - Some arr - - member __.Add(x : 'A) = - tail <- (tail + 1) % capacity - buffer.[tail] <- x - if (size < capacity) then - size <- size + 1 - else - head <- (head + 1) % capacity - - member __.Clear() = - head <- 0 - tail <- -1 - size <- 0 - - module Rules = - - // Rules taken from https://github.com/linkedin/Burrow - // Rule 1: If over the stored period, the lag is ever zero for the partition, the period is OK - // Rule 2: If the consumer offset does not change, and the lag is 
non-zero, it's an error (partition is stalled) - // Rule 3: If the consumer offsets are moving, but the lag is consistently increasing, it's a warning (consumer is slow) - - // The following rules are not implementable given our poll based implementation - they should also not be needed - // Rule 4: If the difference between now and the lastPartition offset timestamp is greater than the difference between the lastPartition and firstPartition offset timestamps, the - // consumer has stopped committing offsets for that partition (error), unless - // Rule 5: If the lag is -1, this is a special value that means there is no broker offset yet. Consider it good (will get caught in the next refresh of topics) - - // If lag is ever zero in the window, no other checks needed - let checkRule1 (partitionInfoWindow : PartitionInfo []) = - partitionInfoWindow |> Array.exists (fun i -> i.lag = 0L) - - // If there is lag, the offsets should be progressing in window - let checkRule2 (partitionInfoWindow : PartitionInfo []) = - let offsetsIndicateLag (firstConsumerOffset : OffsetValue) (lastConsumerOffset : OffsetValue) = - match (firstConsumerOffset, lastConsumerOffset) with - | Valid validFirst, Valid validLast -> - validLast - validFirst <= 0L - | Unset, Valid _ -> - // Partition got its initial offset value this window, check again next window. - false - | Valid _, Unset -> - // Partition somehow lost its offset in this window, something's probably wrong. - true - | Unset, Unset -> - // Partition has invalid offsets for the entire window, there may be lag. - true - - let firstWindowPartitions = partitionInfoWindow |> Array.head - let lastWindowPartitions = partitionInfoWindow |> Array.last - - let checkPartitionForLag (firstWindowPartition : PartitionInfo) (lastWindowPartition : PartitionInfo) = - match lastWindowPartition.lag with - | 0L -> None - | lastPartitionLag when offsetsIndicateLag firstWindowPartition.consumerOffset lastWindowPartition.consumerOffset -> - if lastWindowPartition.partition <> firstWindowPartition.partition then failwithf "Partitions did not match in rule2" - Some lastPartitionLag - | _ -> None - - checkPartitionForLag firstWindowPartitions lastWindowPartitions - - // Has the lag reduced between steps in the window - let checkRule3 (partitionInfoWindow : PartitionInfo []) = - let lagDecreasing = - partitionInfoWindow - |> Seq.pairwise - |> Seq.exists (fun (prev, curr) -> curr.lag < prev.lag) - - not lagDecreasing - - let checkRulesForPartition (partitionInfoWindow : PartitionInfo []) = - if checkRule1 partitionInfoWindow then OkReachedZero else - - match checkRule2 partitionInfoWindow with - | Some lag -> - ErrorPartitionStalled lag - | None when checkRule3 partitionInfoWindow -> - WarningLagIncreasing - | _ -> - Healthy - - let checkRulesForAllPartitions (windows : Window []) = - windows - |> Seq.collect (fun (Window partitionInfo) -> partitionInfo) - |> Seq.groupBy (fun p -> p.partition) - |> Seq.map (fun (p, info) -> p, checkRulesForPartition (Array.ofSeq info)) - - let private queryConsumerProgress intervalMs (consumer : IConsumer<'k,'v>) (topic : string) = async { - let partitionIds = [| for t in consumer.Assignment do if t.Topic = topic then yield Bindings.partitionValue t.Partition |] - let! 
r = ConsumerInfo.progress intervalMs consumer topic partitionIds - return createPartitionInfoList r } - - let run (consumer : IConsumer<'k,'v> ) (interval,windowSize,failResetCount) (topic : string) (group : string) (onQuery,onCheckFailed,onStatus) = - let getAssignedPartitions () = seq { for x in consumer.Assignment do if x.Topic = topic then yield Bindings.partitionValue x.Partition } - let buffer = new RingBuffer<_>(windowSize) - let validateAssignments = - let mutable assignments = getAssignedPartitions() |> set - fun () -> - let current = getAssignedPartitions() |> set - if current <> assignments then - buffer.Clear() - assignments <- current - assignments.Count <> 0 - - let checkConsumerProgress () = async { - let! res = queryConsumerProgress interval consumer topic - onQuery res - buffer.Add res - match buffer.TryCopyFull() with - | None -> () - | Some ci -> - let states = Rules.checkRulesForAllPartitions ci |> List.ofSeq - onStatus topic group states } - - let rec loop failCount = async { - let sw = Stopwatch.StartNew() - let! failCount = async { - try if validateAssignments () then - do! checkConsumerProgress() - return 0 - with exn -> - let count' = failCount + 1 - // If it's been too long since we've successfully obtained a reading, discard preceding values to avoid false positives e.g. re stalled consumers - if count' = failResetCount then - buffer.Clear() - onCheckFailed count' exn - return count' - } - match sw.Elapsed with - | e when e < interval -> - let rem = interval-e - do! Async.Sleep (int rem.TotalMilliseconds) - | _ -> () - return! loop failCount } - loop 0 - - module Logging = - - let logResults (log : ILogger) topic group (partitionResults : (int * PartitionResult) seq) = - let cat = function - | OkReachedZero | Healthy -> Choice1Of3 () - | ErrorPartitionStalled _lag -> Choice2Of3 () - | WarningLagIncreasing -> Choice3Of3 () - match partitionResults |> Seq.groupBy (snd >> cat) |> List.ofSeq with - | [ Choice1Of3 (), _ ] -> log.Information("Monitoring... {topic}/{group} Healthy", topic, group) - | errs -> - for res in errs do - match res with - | Choice1Of3 (), _ -> () - | Choice2Of3 (), errs -> - let lag = function (partitionId, ErrorPartitionStalled lag) -> Some (partitionId,lag) | x -> failwithf "mismapped %A" x - log.Error("Monitoring... {topic}/{group} Stalled with backlogs on {@stalled} [(partition,lag)]", topic, group, errs |> Seq.choose lag) - | Choice3Of3 (), warns -> - log.Warning("Monitoring... {topic}/{group} Growing lags on {@partitionIds}", topic, group, warns |> Seq.map fst) - - let logLatest (logger : ILogger) (topic : string) (consumerGroup : string) (Window partitionInfos) = - let partitionOffsets = - partitionInfos - |> Seq.sortBy (fun p -> p.partition) - |> Seq.map (fun p -> p.partition, p.highWatermarkOffset, p.consumerOffset) - - let aggregateLag = partitionInfos |> Seq.sumBy (fun p -> p.lag) - - logger.Information("Monitoring... {topic}/{consumerGroup} lag {lag} offsets {offsets}", - topic, consumerGroup, aggregateLag, partitionOffsets) - - let logFailure (log : ILogger) (topic : string) (group : string) failCount exn = - log.Warning(exn, "Monitoring... 
{topic}/{group} Exception # {failCount}", topic, group, failCount) - -/// Used to manage a set of bacground tasks that perdically (based on `interval`) grab the broker's recorded high/low watermarks -/// and then map that to a per-partition status for each partition that the consumer being observed has been assigned -type KafkaMonitor<'k,'v> - ( log : ILogger, - /// Interval between checks of high/low watermarks. Default 30s - ?interval, - /// Number if readings per partition to use in order to make inferences. Default 10 (at default interval of 30s, implies a 5m window). - ?windowSize, - /// Number of failed calls to broker that should trigger discarding of buffered readings in order to avoid false positives. Default 3. - ?failResetCount) = - let failResetCount = defaultArg failResetCount 3 - let interval = defaultArg interval (TimeSpan.FromSeconds 30.) - let windowSize = defaultArg windowSize 10 - let onStatus, onCheckFailed = new Event(), new Event() - - /// Periodically supplies the status for all assigned partitions (whenever we've gathered `windowSize` of readings) - /// Subscriber can e.g. use this to force a consumer restart if no progress is being made - [] member __.OnStatus = onStatus.Publish - - /// Raised whenever call to broker to ascertain watermarks has failed - /// Subscriber can e.g. raise an alert if enough consecutive failures have occurred - [] member __.OnCheckFailed = onCheckFailed.Publish - - // One of these runs per topic - member private __.Pump(consumer, topic, group) = - let onQuery res = - MonitorImpl.Logging.logLatest log topic group res - let onStatus topic group xs = - MonitorImpl.Logging.logResults log topic group xs - onStatus.Trigger(topic, xs) - let onCheckFailed count exn = - MonitorImpl.Logging.logFailure log topic group count exn - onCheckFailed.Trigger(topic, count, exn) - MonitorImpl.run consumer (interval,windowSize,failResetCount) topic group (onQuery,onCheckFailed,onStatus) - - /// Commences a child task per subscribed topic that will ob - member __.StartAsChild(target : IConsumer<'k,'v>, group) = async { - for topic in target.Subscription do - Async.Start(__.Pump(target, topic, group)) } diff --git a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj index 00138d25..afbbf73f 100644 --- a/src/Propulsion.Kafka/Propulsion.Kafka.fsproj +++ b/src/Propulsion.Kafka/Propulsion.Kafka.fsproj @@ -6,15 +6,14 @@ false true true - true + true - + - @@ -27,7 +26,7 @@ - + diff --git a/src/Propulsion.Kafka0/Bindings.fs b/src/Propulsion.Kafka0/Binding.fs similarity index 79% rename from src/Propulsion.Kafka0/Bindings.fs rename to src/Propulsion.Kafka0/Binding.fs index 4f4623a4..d6329eb5 100644 --- a/src/Propulsion.Kafka0/Bindings.fs +++ b/src/Propulsion.Kafka0/Binding.fs @@ -9,13 +9,9 @@ open System.Collections.Generic type IConsumer<'K,'V> = Consumer<'K,'V> type ConsumeResult<'K,'V> = Message<'K,'V> -module Bindings = - let mapMessage : ConsumeResult<_,_> -> Message<_,_> = id +module Binding = let mapConsumeResult (x : ConsumeResult) = KeyValuePair(x.Key,x.Value) - let inline partitionId (x : ConsumeResult<_,_>) = x.Partition - let inline topicPartition (topic : string) (partition : int) = TopicPartition(topic, partition) - let partitionValue = id - let offsetUnset = Offset.Invalid + let inline makeTopicPartition (topic : string) (partition : int) = TopicPartition(topic, partition) let createConsumer log config : IConsumer * (unit -> unit) = ConsumerBuilder.WithLogging(log, config) let inline storeOffset (log : ILogger) 
(consumer : IConsumer<_,_>) (highWaterMark : ConsumeResult) = diff --git a/src/Propulsion.Kafka0/ConfluentKafka1Shims.fs b/src/Propulsion.Kafka0/ConfluentKafka1Shims.fs deleted file mode 100644 index b777489c..00000000 --- a/src/Propulsion.Kafka0/ConfluentKafka1Shims.fs +++ /dev/null @@ -1,141 +0,0 @@ -namespace FsKafka - -open System -open System.Collections.Generic - -[] -module Types = - [] - type CompressionType = None | GZip | Snappy | Lz4 - - [] - type Acks = Zero | Leader | All - - [] - type Partitioner = Random | Consistent | ConsistentRandom - - [] - type AutoOffsetReset = Earliest | Latest | None - -// Stand-ins for stuff presented in Confluent.Kafka v1 -namespace Propulsion.Kafka0.Confluent.Kafka - -open FsKafka -open System -open System.Collections.Generic - -[] -module Config = - - [] - type ConfigKey<'T> = Key of id : string * render : ('T -> obj) with - member k.Id = let (Key(id,_)) = k in id - - static member (==>) (Key(id, render) : ConfigKey<'T>, value : 'T) = - match render value with - | null -> nullArg id - | :? string as str when String.IsNullOrWhiteSpace str -> nullArg id - | obj -> KeyValuePair(id, obj) - - let private mkKey id render = Key(id, render >> box) - - (* shared keys applying to producers and consumers alike *) - - let bootstrapServers = mkKey "bootstrap.servers" id - let clientId = mkKey "client.id" id - let logConnectionClose = mkKey "log.connection.close" id - let maxInFlight = mkKey "max.in.flight.requests.per.connection" id - let retryBackoff = mkKey "retry.backoff.ms" id - let socketKeepAlive = mkKey "socket.keepalive.enable" id - let statisticsInterval = mkKey "statistics.interval.ms" id - - /// Config keys applying to Producers - module Producer = - let acks = mkKey "acks" (function Acks.Zero -> 0 | Acks.Leader -> 1 | Acks.All -> -1) - // NOTE CK 0.11.4 adds a "compression.type" alias - we use "compression.codec" as 0.11.3 will otherwise throw - let compression = mkKey "compression.codec" (function CompressionType.None -> "none" | CompressionType.GZip -> "gzip" | CompressionType.Snappy -> "snappy" | CompressionType.Lz4 -> "lz4") - let linger = mkKey "linger.ms" id - let messageSendRetries = mkKey "message.send.max.retries" id - let partitioner = mkKey "partitioner" (function Partitioner.Random -> "random" | Partitioner.Consistent -> "consistent" | Partitioner.ConsistentRandom -> "consistent_random") - - /// Config keys applying to Consumers - module Consumer = - let autoCommitInterval = mkKey "auto.commit.interval.ms" id - let autoOffsetReset = mkKey "auto.offset.reset" (function AutoOffsetReset.Earliest -> "earliest" | AutoOffsetReset.Latest -> "latest" | AutoOffsetReset.None -> "none") - let enableAutoCommit = mkKey "enable.auto.commit" id - let enableAutoOffsetStore = mkKey "enable.auto.offset.store" id - let groupId = mkKey "group.id" id - let fetchMaxBytes = mkKey "fetch.message.max.bytes" id - let fetchMinBytes = mkKey "fetch.min.bytes" id - -[] -module private NullableHelpers = - let (|Null|HasValue|) (x:Nullable<'T>) = - if x.HasValue then HasValue x.Value - else Null - -type ProducerConfig() = - let vals = Dictionary() - let set key value = vals.[key] <- box value - - member __.Set(key, value) = set key value - - member val ClientId = null with get, set - member val BootstrapServers = null with get, set - member val RetryBackoffMs = Nullable() with get, set - member val MessageSendMaxRetries = Nullable() with get, set - member val Acks = Nullable() with get, set - member val SocketKeepaliveEnable = Nullable() with get, set - member val 
LogConnectionClose = Nullable() with get, set - member val MaxInFlight = Nullable() with get, set - member val LingerMs = Nullable() with get, set - member val Partitioner = Nullable() with get, set - member val CompressionType = Nullable() with get, set - member val StatisticsIntervalMs = Nullable() with get, set - - member __.Render() : KeyValuePair[] = - [| match __.ClientId with null -> () | v -> yield Config.clientId ==> v - match __.BootstrapServers with null -> () | v -> yield Config.bootstrapServers ==> v - match __.RetryBackoffMs with Null -> () | HasValue v -> yield Config.retryBackoff ==> v - match __.MessageSendMaxRetries with Null -> () | HasValue v -> yield Config.Producer.messageSendRetries ==> v - match __.Acks with Null -> () | HasValue v -> yield Config.Producer.acks ==> v - match __.SocketKeepaliveEnable with Null -> () | HasValue v -> yield Config.socketKeepAlive ==> v - match __.LogConnectionClose with Null -> () | HasValue v -> yield Config.logConnectionClose ==> v - match __.MaxInFlight with Null -> () | HasValue v -> yield Config.maxInFlight ==> v - match __.LingerMs with Null -> () | HasValue v -> yield Config.Producer.linger ==> v - match __.Partitioner with Null -> () | HasValue v -> yield Config.Producer.partitioner ==> v - match __.CompressionType with Null -> () | HasValue v -> yield Config.Producer.compression ==> v - match __.StatisticsIntervalMs with Null -> () | HasValue v -> yield Config.statisticsInterval ==> v - yield! vals |] - -type ConsumerConfig() = - let vals = Dictionary() - let set key value = vals.[key] <- box value - - member __.Set(key, value) = set key value - - member val ClientId = null with get, set - member val BootstrapServers = null with get, set - member val GroupId = null with get, set - member val AutoOffsetReset = Nullable() with get, set - member val FetchMaxBytes = Nullable() with get, set - member val EnableAutoCommit = Nullable() with get, set - member val EnableAutoOffsetStore = Nullable() with get, set - member val LogConnectionClose = Nullable() with get, set - member val FetchMinBytes = Nullable() with get, set - member val StatisticsIntervalMs = Nullable() with get, set - member val AutoCommitIntervalMs = Nullable() with get, set - - member __.Render() : KeyValuePair[] = - [| match __.ClientId with null -> () | v -> yield Config.clientId ==> v - match __.BootstrapServers with null -> () | v -> yield Config.bootstrapServers ==> v - match __.GroupId with null -> () | v -> yield Config.Consumer.groupId ==> v - match __.AutoOffsetReset with Null -> () | HasValue v -> yield Config.Consumer.autoOffsetReset ==> v - match __.FetchMaxBytes with Null -> () | HasValue v -> yield Config.Consumer.fetchMaxBytes ==> v - match __.LogConnectionClose with Null -> () | HasValue v -> yield Config.logConnectionClose ==> v - match __.EnableAutoCommit with Null -> () | HasValue v -> yield Config.Consumer.enableAutoCommit ==> v - match __.EnableAutoOffsetStore with Null -> () | HasValue v -> yield Config.Consumer.enableAutoOffsetStore ==> v - match __.FetchMinBytes with Null -> () | HasValue v -> yield Config.Consumer.fetchMinBytes ==> v - match __.AutoCommitIntervalMs with Null -> () | HasValue v -> yield Config.Consumer.autoCommitInterval ==> v - match __.StatisticsIntervalMs with Null -> () | HasValue v -> yield Config.statisticsInterval ==> v - yield! 
vals |] diff --git a/src/Propulsion.Kafka0/FsKafkaShims.fs b/src/Propulsion.Kafka0/FsKafkaShims.fs deleted file mode 100644 index e044a3e7..00000000 --- a/src/Propulsion.Kafka0/FsKafkaShims.fs +++ /dev/null @@ -1,336 +0,0 @@ -// Shims for stuff that's present in FsKafka 1.x -namespace FsKafka - -open Confluent.Kafka -open Newtonsoft.Json -open Newtonsoft.Json.Linq -open Propulsion.Kafka.Internal // Async Helpers -open Propulsion.Kafka0.Confluent.Kafka -open Serilog -open System -open System.Threading -open System.Threading.Tasks -open System.Collections.Generic - -// Cloned from FsKafka master branch -module Core = - [] - type ConsumerBufferingConfig = { minInFlightBytes : int64; maxInFlightBytes : int64; maxBatchSize : int; maxBatchDelay : TimeSpan } - - module Constants = - let messageCounterSourceContext = "FsKafka.Core.InFlightMessageCounter" - - type InFlightMessageCounter(log : ILogger, minInFlightBytes : int64, maxInFlightBytes : int64) = - do if minInFlightBytes < 1L then invalidArg "minInFlightBytes" "must be positive value" - if maxInFlightBytes < 1L then invalidArg "maxInFlightBytes" "must be positive value" - if minInFlightBytes > maxInFlightBytes then invalidArg "maxInFlightBytes" "must be greater than minInFlightBytes" - - let mutable inFlightBytes = 0L - - member __.InFlightMb = float inFlightBytes / 1024. / 1024. - member __.Delta(numBytes : int64) = Interlocked.Add(&inFlightBytes, numBytes) |> ignore - member __.IsOverLimitNow() = Volatile.Read(&inFlightBytes) > maxInFlightBytes - member __.AwaitThreshold busyWork = - if __.IsOverLimitNow() then - log.Information("Consuming... breached in-flight message threshold (now ~{max:n0}B), quiescing until it drops to < ~{min:n1}GB", - inFlightBytes, float minInFlightBytes / 1024. / 1024. / 1024.) - while Volatile.Read(&inFlightBytes) > minInFlightBytes do - busyWork () - log.Verbose "Consumer resuming polling" - -module Config = - let validateBrokerUri (u : Uri) = - if not u.IsAbsoluteUri then invalidArg "broker" "should be of 'host:port' format" - if String.IsNullOrEmpty u.Authority then - // handle a corner case in which Uri instances are erroneously putting the hostname in the `scheme` field. - if System.Text.RegularExpressions.Regex.IsMatch(string u, "^\S+:[0-9]+$") then string u - else invalidArg "broker" "should be of 'host:port' format" - - else u.Authority - -/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specfic settings -[] -type KafkaProducerConfig private (inner, bootstrapServers) = - member __.Inner : ProducerConfig = inner - member __.BootstrapServers = bootstrapServers - - member __.Acks = let v = inner.Acks in v.Value - member __.MaxInFlight = let v = inner.MaxInFlight in v.Value - member __.Compression = let v = inner.CompressionType in v.GetValueOrDefault(CompressionType.None) - - /// Creates and wraps a Confluent.Kafka ProducerConfig with the specified settings - static member Create - ( clientId : string, bootstrapServers, acks, - /// Message compression. Defaults to None. - ?compression, - /// Maximum in-flight requests. Default: 1_000_000. - /// NB <> 1 implies potential reordering of writes should a batch fail and then succeed in a subsequent retry - ?maxInFlight, - /// Time to wait for other items to be produced before sending a batch. Default: 0ms - /// NB the linger setting alone does provide any hard guarantees; see BatchedProducer.CreateWithConfigOverrides - ?linger : TimeSpan, - /// Number of retries. Confluent.Kafka default: 2. 
Default: 60. - ?retries, - /// Backoff interval. Confluent.Kafka default: 100ms. Default: 1s. - ?retryBackoff, - /// Statistics Interval. Default: no stats. - ?statisticsInterval, - /// Confluent.Kafka default: false. Defaults to true. - ?socketKeepAlive, - /// Partition algorithm. Default: `ConsistentRandom`. - ?partitioner, - ?config : IDictionary, - /// Miscellaneous configuration parameters to be passed to the underlying Confluent.Kafka producer configuration. - ?custom, - /// Postprocesses the ProducerConfig after the rest of the rules have been applied - ?customize) = - let c = - ProducerConfig( - ClientId = clientId, BootstrapServers = bootstrapServers, - RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms - MessageSendMaxRetries = Nullable (defaultArg retries 60), // default 2 - Acks = Nullable acks, - SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), // default: false - LogConnectionClose = Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017 - MaxInFlight = Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000 - config |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v)) - linger |> Option.iter (fun x -> c.LingerMs <- Nullable (int x.TotalMilliseconds)) // default 0 - partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x) - compression |> Option.iter (fun x -> c.CompressionType <- Nullable x) - statisticsInterval |> Option.iter (fun x -> c.StatisticsIntervalMs <- Nullable (int x.TotalMilliseconds)) - custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v)) - customize |> Option.iter (fun f -> f c) - KafkaProducerConfig(c, bootstrapServers) - -[] -module Impl = - let encoding = System.Text.Encoding.UTF8 - let mkSerializer() = new Confluent.Kafka.Serialization.StringSerializer(encoding) - let mkDeserializer() = new Confluent.Kafka.Serialization.StringDeserializer(encoding) - -/// Creates and wraps a Confluent.Kafka Producer with the supplied configuration -type KafkaProducer private (inner : Producer, topic : string, unsub) = - member __.Inner = inner - member __.Topic = topic - - interface IDisposable with member __.Dispose() = unsub(); inner.Dispose() - - /// Produces a single item, yielding a response upon completion/failure of the ack - /// - /// There's no assurance of ordering [without dropping `maxInFlight` down to `1` and annihilating throughput]. - /// Thus its critical to ensure you don't submit another message for the same key until you've had a success / failure response from the call. - member __.ProduceAsync(key, value) : Async>= async { - let! res = inner.ProduceAsync(topic, key = key, ``val`` = value) |> Async.AwaitTaskCorrect - // Propulsion.Kafka.Producer duplicates this check, but this one should remain for consistency with Confluent.Kafka v1 - if res.Error.HasError then return failwithf "ProduceAsync error %O" res.Error - return res } - - static member Create(log : ILogger, config : KafkaProducerConfig, topic : string): KafkaProducer = - if String.IsNullOrEmpty topic then nullArg "topic" - log.Information("Producing... {bootstrapServers} / {topic} compression={compression} maxInFlight={maxInFlight} acks={acks}", - config.BootstrapServers, topic, config.Compression, config.MaxInFlight, config.Acks) - let p = new Producer(config.Inner.Render(), mkSerializer (), mkSerializer()) - let d1 = p.OnLog.Subscribe(fun m -> log.Information("Producing... 
{message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility)) - let d2 = p.OnError.Subscribe(fun e -> log.Error("Producing... {reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError)) - new KafkaProducer(p, topic, fun () -> for x in [d1;d2] do x.Dispose()) - -type IProducer<'K,'V> = Confluent.Kafka.Producer<'K,'V> -type DeliveryReport<'K,'V> = Confluent.Kafka.Message<'K,'V> - -type BatchedProducer private (log: ILogger, inner : IProducer, topic : string) = - member __.Inner = inner - member __.Topic = topic - - interface IDisposable with member __.Dispose() = inner.Dispose() - - /// Produces a batch of supplied key/value messages. Results are returned in order of writing (which may vary from order of submission). - /// - /// 1. if there is an immediate local config issue - /// 2. upon receipt of the first failed `DeliveryReport` (NB without waiting for any further reports, which can potentially leave some results in doubt should a 'batch' get split) - /// - /// Note that the delivery and/or write order may vary from the supplied order unless `maxInFlight` is 1 (which massively constrains throughput). - /// Thus it's important to note that supplying >1 item into the queue bearing the same key without maxInFlight=1 risks them being written out of order onto the topic. - member __.ProduceBatch(keyValueBatch : (string * string)[]) = async { - if Array.isEmpty keyValueBatch then return [||] else - - let! ct = Async.CancellationToken - - let tcs = new TaskCompletionSource[]>() - let numMessages = keyValueBatch.Length - let numMessages = keyValueBatch.Length - let results = Array.zeroCreate> numMessages - let numCompleted = ref 0 - - use _ = ct.Register(fun _ -> tcs.TrySetCanceled() |> ignore) - - let handler (m : DeliveryReport) = - if m.Error.HasError then - let errorMsg = exn (sprintf "Error on message topic=%s code=%O reason=%s" m.Topic m.Error.Code m.Error.Reason) - tcs.TrySetException errorMsg |> ignore - else - let i = Interlocked.Increment numCompleted - results.[i - 1] <- m - if i = numMessages then tcs.TrySetResult results |> ignore - let handler' = - { new IDeliveryHandler with - member __.MarshalData = false - member __.HandleDeliveryReport m = handler m } - for key,value in keyValueBatch do - inner.ProduceAsync(topic, key, value, blockIfQueueFull = true, deliveryHandler = handler') - log.Debug("Produced {count}",!numCompleted) - return! Async.AwaitTaskCorrect tcs.Task } - - /// Creates and wraps a Confluent.Kafka Producer that affords a batched production mode. - /// The default settings represent a best effort at providing batched, ordered delivery semantics - /// NB See caveats on the `ProduceBatch` API for further detail as to the semantics - static member CreateWithConfigOverrides - ( log : ILogger, config : KafkaProducerConfig, topic : string, - /// Default: 1 - /// NB Having a <> 1 value for maxInFlight runs two risks due to the intrinsic lack of - /// batching mechanisms within the Confluent.Kafka client: - /// 1) items within the initial 'batch' can get written out of order in the face of timeouts and/or retries - /// 2) items beyond the linger period may enter a separate batch, which can potentially get scheduled for transmission out of order - ?maxInFlight, - /// Having a non-zero linger is critical to items getting into the correct groupings - /// (even if it of itself does not guarantee anything based on Kafka's guarantees). 
Default: 100ms - ?linger: TimeSpan) : BatchedProducer = - let lingerMs = match linger with Some x -> int x.TotalMilliseconds | None -> 100 - log.Information("Producing... Using batch Mode with linger={lingerMs}", lingerMs) - config.Inner.LingerMs <- Nullable lingerMs - config.Inner.MaxInFlight <- Nullable (defaultArg maxInFlight 1) - let inner = KafkaProducer.Create(log, config, topic) - new BatchedProducer(log, inner.Inner, topic) - -/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specfic settings -[] -type KafkaConsumerConfig = private { inner: ConsumerConfig; topics: string list; buffering: Core.ConsumerBufferingConfig } with - member __.Buffering = __.buffering - member __.Inner = __.inner - member __.Topics = __.topics - - /// Builds a Kafka Consumer Config suitable for KafkaConsumer.Start* - static member Create - ( /// Identify this consumer in logs etc - clientId, bootstrapServers, topics, - /// Consumer group identifier. - groupId, - /// Specifies handling when Consumer Group does not yet have an offset recorded. Confluent.Kafka default: start from Latest. Default: start from Earliest. - ?autoOffsetReset, - /// Default 100kB. - ?fetchMaxBytes, - /// Minimum number of bytes to wait for (subject to timeout with default of 100ms). Default 1B. - ?fetchMinBytes, - /// Stats reporting interval for the consumer. Default: no reporting. - ?statisticsInterval : TimeSpan, - ?config : IDictionary, - /// Misc configuration parameter to be passed to the underlying CK consumer. - ?custom, - /// Postprocesses the ConsumerConfig after the rest of the rules have been applied - ?customize, - ?autoCommitInterval, - - (* Client-side batching / limiting of reading ahead to constrain memory consumption *) - - /// Minimum total size of consumed messages in-memory for the consumer to attempt to fill. Default 2/3 of maxInFlightBytes. - ?minInFlightBytes, - /// Maximum total size of consumed messages in-memory before broker polling is throttled. Default 24MiB. - ?maxInFlightBytes, - /// Message batch linger time. Default 500ms. - ?maxBatchDelay, - /// Maximum number of messages to group per batch on consumer callbacks for BatchedConsumer. Default 1000. 
- ?maxBatchSize) = - let maxInFlightBytes = defaultArg maxInFlightBytes (16L * 1024L * 1024L) - let minInFlightBytes = defaultArg minInFlightBytes (maxInFlightBytes * 2L / 3L) - let fetchMaxBytes = defaultArg fetchMaxBytes 100_000 - let c = - ConsumerConfig( - ClientId = clientId, BootstrapServers = bootstrapServers, GroupId = groupId, - AutoOffsetReset = Nullable (defaultArg autoOffsetReset AutoOffsetReset.Earliest), // default: latest - FetchMaxBytes = Nullable fetchMaxBytes, // default: 524_288_000 - EnableAutoCommit = Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call - EnableAutoOffsetStore = Nullable false, // explicit calls to StoreOffset are the only things that effect progression in offsets - LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017 - config |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v)) - fetchMinBytes |> Option.iter (fun x -> c.FetchMinBytes <- x) // Fetch waits for this amount of data for up to FetchWaitMaxMs (100) - statisticsInterval |> Option.iter (fun x -> c.StatisticsIntervalMs <- Nullable <| int x.TotalMilliseconds) - autoCommitInterval |> Option.iter (fun x -> c.AutoCommitIntervalMs <- Nullable <| int x.TotalMilliseconds) - custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v)) - customize |> Option.iter unit> (fun f -> f c) - { inner = c - topics = match Seq.toList topics with [] -> invalidArg "topics" "must be non-empty collection" | ts -> ts - buffering = { - maxBatchDelay = defaultArg maxBatchDelay (TimeSpan.FromMilliseconds 500.); maxBatchSize = defaultArg maxBatchSize 1000 - minInFlightBytes = minInFlightBytes; maxInFlightBytes = maxInFlightBytes } } - -// Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md -type KafkaPartitionMetrics = - { partition: int - [] - fetchState: string - [] - nextOffset: int64 - [] - storedOffset: int64 - [] - committedOffset: int64 - [] - loOffset: int64 - [] - hiOffset: int64 - [] - consumerLag: int64 } - -type OffsetValue = - | Unset - | Valid of value: int64 - override this.ToString() = - match this with - | Unset -> "Unset" - | Valid value -> value.ToString() -module OffsetValue = - let ofOffset (offset : Offset) = - match offset.Value with - | _ when offset = Offset.Invalid -> Unset - | valid -> Valid valid - -type ConsumerBuilder = - static member private WithLogging(log: ILogger, c : Consumer<_,_>, ?onRevoke) = - let d1 = c.OnLog.Subscribe(fun m -> - log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility)) - let d2 = c.OnError.Subscribe(fun e -> - log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError)) - let d3 = c.OnPartitionsAssigned.Subscribe(fun tps -> - for topic,partitions in tps |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> p.Partition |]) do - log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions) - c.Assign tps) - let d4 = c.OnPartitionsRevoked.Subscribe(fun tps -> - for topic,partitions in tps |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> p.Partition |]) do - log.Information("Consuming... Revoked {topic:l} {partitions}", topic, partitions) - c.Unassign () - onRevoke |> Option.iter (fun f -> f tps)) - let d5 = c.OnPartitionEOF.Subscribe(fun tpo -> - log.Verbose("Consuming... 
EOF {topic} partition={partition} offset={offset}", tpo.Topic, tpo.Partition, let o = tpo.Offset in o.Value)) - let d6 = c.OnOffsetsCommitted.Subscribe(fun cos -> - for t,ps in cos.Offsets |> Seq.groupBy (fun p -> p.Topic) do - let o = seq { for p in ps -> p.Partition, OffsetValue.ofOffset p.Offset(*, fmtError p.Error*) } - let e = cos.Error - if not e.HasError then log.Information("Consuming... Committed {topic} {offsets}", t, o) - else log.Warning("Consuming... Committed {topic} {offsets} reason={error} code={code} isBrokerError={isBrokerError}", t, o, e.Reason, e.Code, e.IsBrokerError)) - let d7 = c.OnStatistics.Subscribe(fun json -> - let stats = JToken.Parse json - for t in stats.Item("topics").Children() do - if t.HasValues && c.Subscription |> Seq.exists (fun ct -> ct = t.First.Item("topic").ToString()) then - let topic, partitions = let tm = t.First in tm.Item("topic").ToString(), tm.Item("partitions").Children() - let metrics = [| - for tm in partitions do - if tm.HasValues then - let kpm = tm.First.ToObject() - if kpm.partition <> -1 then - yield kpm |] - let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag) - log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics)) - fun () -> for d in [d1;d2;d3;d4;d5;d6;d7] do d.Dispose() - static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) = - let consumer = new Consumer<_,_>(config.Render(), mkDeserializer(), mkDeserializer()) - let unsubLog = ConsumerBuilder.WithLogging(log, consumer, ?onRevoke = onRevoke) - consumer, unsubLog diff --git a/src/Propulsion.Kafka0/Propulsion.Kafka0.fsproj b/src/Propulsion.Kafka0/Propulsion.Kafka0.fsproj index 6b47f243..45338709 100644 --- a/src/Propulsion.Kafka0/Propulsion.Kafka0.fsproj +++ b/src/Propulsion.Kafka0/Propulsion.Kafka0.fsproj @@ -6,7 +6,7 @@ false true true - true + true KAFKA0 $(DefineConstants);NET461 @@ -14,11 +14,8 @@ - - - + - @@ -30,9 +27,8 @@ - - + diff --git a/src/Propulsion/Propulsion.fsproj b/src/Propulsion/Propulsion.fsproj index b23eb225..528517de 100644 --- a/src/Propulsion/Propulsion.fsproj +++ b/src/Propulsion/Propulsion.fsproj @@ -6,7 +6,7 @@ false true true - true + true NET461 diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index a3ded3e1..e8860ce9 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -63,8 +63,8 @@ module Helpers = type TestMeta = { key : string; value : string; partition : int; offset : int64 } let mapParallelConsumeResultToKeyValuePair (x : ConsumeResult<_, _>) : KeyValuePair = - let m = Bindings.mapMessage x - KeyValuePair(m.Key, JsonConvert.SerializeObject { key = m.Key; value = m.Value; partition = Bindings.partitionId x; offset = let o = x.Offset in o.Value }) + let m = Binding.message x + KeyValuePair(m.Key, JsonConvert.SerializeObject { key = m.Key; value = m.Value; partition = Binding.partitionValue x.Partition; offset = let o = x.Offset in o.Value }) type TestMessage = { producerId : int ; messageId : int } type ConsumedTestMessage = { consumerId : int ; meta : TestMeta; payload : TestMessage } type ConsumerCallback = ConsumerPipeline -> ConsumedTestMessage -> Async @@ -176,9 +176,9 @@ module Helpers = } let mapStreamConsumeResultToDataAndContext (x: ConsumeResult<_,string>) : byte[] * obj = - let m = Bindings.mapMessage x + let m = Binding.message x System.Text.Encoding.UTF8.GetBytes(m.Value), - box { 
key = m.Key; value = m.Value; partition = Bindings.partitionId x; offset = let o = x.Offset in o.Value } + box { key = m.Key; value = m.Value; partition = Binding.partitionValue x.Partition; offset = let o = x.Offset in o.Value } let runConsumersStream log (config : KafkaConsumerConfig) (numConsumers : int) (timeout : TimeSpan option) (handler : ConsumerCallback) = async { let mkConsumer (consumerId : int) = async { @@ -243,7 +243,7 @@ and ParallelConsumer(testOutputHelper) = do! __.RunProducers(log, bootstrapServers, topic, 1, 10) // populate the topic with a few messages - let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId) + let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, AutoOffsetReset.Earliest) let! r = Async.Catch <| __.RunConsumers(log, config, 1, (fun _ _ -> async { return raise <|IndexOutOfRangeException() })) test <@ match r with @@ -265,7 +265,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche #if DEBUG let numConsumers = 10 #else - // TODO debug why this is happy locally buy not on the AzureDevOps CI Rig + // TODO debug why this is happy locally but not on the AzureDevOps CI Rig let numConsumers = 1 #endif let messagesPerProducer = 1000 @@ -287,7 +287,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche // Section: run the test let producers = __.RunProducers(log, bootstrapServers, topic, numProducers, messagesPerProducer) - let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, statisticsInterval=TimeSpan.FromSeconds 5.) + let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, AutoOffsetReset.Earliest, statisticsInterval=TimeSpan.FromSeconds 5.) let consumers = __.RunConsumers(log, config, numConsumers, consumerCallback) let! _ = Async.Parallel [ producers ; consumers ] @@ -330,7 +330,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche let messageCount = ref 0 let groupId1 = newId() - let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId1) + let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId1, AutoOffsetReset.Earliest) do! __.RunConsumers(log, config, 1, (fun c _m -> async { if Interlocked.Increment(messageCount) >= numMessages then c.Stop() })) @@ -338,7 +338,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche let messageCount = ref 0 let groupId2 = newId() - let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId2) + let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId2, AutoOffsetReset.Earliest) do! __.RunConsumers(log, config, 1, (fun c _m -> async { if Interlocked.Increment(messageCount) >= numMessages then c.Stop() })) @@ -350,7 +350,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche let numMessages = 10 let topic = newId() // dev kafka topics are created and truncated automatically let groupId = newId() - let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, autoCommitInterval=TimeSpan.FromSeconds 1.) + let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, AutoOffsetReset.Earliest, autoCommitInterval=TimeSpan.FromSeconds 1.) do! 
__.RunProducers(log, bootstrapServers, topic, 1, numMessages) // populate the topic with a few messages @@ -378,7 +378,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche let numMessages = 10 let topic = newId() // dev kafka topics are created and truncated automatically let groupId = newId() - let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId) + let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, AutoOffsetReset.Earliest) do! __.RunProducers(log, bootstrapServers, topic, 1, numMessages) // populate the topic with a few messages @@ -413,7 +413,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche let maxBatchSize = 20 let topic = newId() // dev kafka topics are created and truncated automatically let groupId = newId() - let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, maxBatchSize=maxBatchSize) + let config = KafkaConsumerConfig.Create("panther", bootstrapServers, [topic], groupId, AutoOffsetReset.Earliest, maxBatchSize=maxBatchSize) // Produce messages in the topic do! __.RunProducers(log, bootstrapServers, topic, 1, numMessages) diff --git a/tests/Propulsion.Kafka.Integration/MonitorIntegration.fs b/tests/Propulsion.Kafka.Integration/MonitorIntegration.fs index 1ea634de..72270bf2 100644 --- a/tests/Propulsion.Kafka.Integration/MonitorIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/MonitorIntegration.fs @@ -15,7 +15,7 @@ let mkProducer log broker topic = // test config creates topics with 4 partitions let testPartitionCount = 4 let createConsumerConfig broker topic groupId = - KafkaConsumerConfig.Create("tiger", broker, [topic], groupId, maxBatchSize = 1) + KafkaConsumerConfig.Create("tiger", broker, [topic], groupId, AutoOffsetReset.Earliest, maxBatchSize = 1) let startConsumerFromConfig log config handler = let handler' r = async { do! handler r @@ -37,7 +37,7 @@ let onlyConsumeFirstBatchHandler = let observedPartitions = System.Collections.Concurrent.ConcurrentDictionary() fun (item : ConsumeResult) -> async { // make first handle succeed to ensure consumer has offsets - let partitionId = Bindings.partitionValue item.Partition + let partitionId = Binding.partitionValue item.Partition if not <| observedPartitions.TryAdd(partitionId,()) then do! Async.Sleep Int32.MaxValue } type TimeoutGuard(?maxMinutes) = diff --git a/tests/Propulsion.Kafka.Integration/MonitorTests.fs b/tests/Propulsion.Kafka.Integration/MonitorTests.fs deleted file mode 100644 index 239da66d..00000000 --- a/tests/Propulsion.Kafka.Integration/MonitorTests.fs +++ /dev/null @@ -1,111 +0,0 @@ -module Propulsion.Kafka.Integration.MonitorTests - -open FsKafka -open Propulsion.Kafka -open Propulsion.Kafka.MonitorImpl -open Swensen.Unquote -open System.Diagnostics -open Xunit - -let partitionInfo partition consumerOffset lag : PartitionInfo = - { partition = partition - consumerOffset = Valid consumerOffset - earliestOffset = Valid 0L - highWatermarkOffset = Valid 0L - lag = lag } - -let consumerInfo consumerOffset lag = - [| partitionInfo 0 consumerOffset lag // custom partition - partitionInfo 1 consumerOffset 0L // non-lagging partition - |] |> Window - -// Tests based on https://github.com/linkedin/Burrow/wiki/Consumer-Lag-Evaluation-Rules#examples -// In all cases, we include a partition which has no lag, to ensure that the absence of lag on one -// partition does not cause the monitor to ignore lag on another partition. 
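The call sites above capture the two consumer-facing changes in this diff: `KafkaConsumerConfig.Create` now receives the `AutoOffsetReset` value explicitly (the removed Kafka0 shim defaulted it to `Earliest`), and the helpers previously exposed as `Bindings.mapMessage`/`Bindings.partitionId` are now `Binding.message`/`Binding.partitionValue`. The sketch below is assembled only from those call sites and is illustrative rather than part of the change; the `open` lines and the `describe` helper are assumptions, and `Binding` is used here exactly as the integration tests use it (its visibility outside the test projects is not guaranteed).

```fsharp
// Minimal sketch, assuming the FsKafka/FsKafka0 1.4.2 surface exercised by the tests above.
// The opens and the `describe` helper are illustrative assumptions, not Propulsion API.
open System
open Confluent.Kafka      // AutoOffsetReset, ConsumeResult
open FsKafka              // KafkaConsumerConfig
open Propulsion.Kafka     // Binding (formerly Bindings)

let mkConfig bootstrapServers topic groupId =
    // AutoOffsetReset is now passed explicitly rather than defaulted by a local shim
    KafkaConsumerConfig.Create(
        "panther", bootstrapServers, [topic], groupId, AutoOffsetReset.Earliest,
        maxBatchSize = 1000, statisticsInterval = TimeSpan.FromSeconds 5.)

// Reads the same fields the integration tests above record for each consumed message
let describe (x : ConsumeResult<string, string>) =
    let m = Binding.message x                           // was: Bindings.mapMessage x
    let partition = Binding.partitionValue x.Partition  // was: Bindings.partitionId x
    let offset = let o = x.Offset in o.Value
    sprintf "key=%s partition=%d offset=%d value=%s" m.Key partition offset m.Value
```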
- -let checkRules = Array.ofSeq << Rules.checkRulesForAllPartitions - -[] -let ``No errors because rule 1 is met`` () = - let consumerInfos = [| - consumerInfo 10L 0L - consumerInfo 20L 0L - consumerInfo 30L 0L - consumerInfo 40L 0L - consumerInfo 50L 0L - consumerInfo 60L 0L - consumerInfo 70L 1L - consumerInfo 80L 3L - consumerInfo 90L 5L - consumerInfo 100L 5L - |] - - let result = checkRules consumerInfos - - test - <@ result = - [| 0, OkReachedZero - 1, OkReachedZero |] @> - -[] -let ``Error because rule 2 is violated`` () = - let consumerInfos = [| - consumerInfo 10L 1L - consumerInfo 10L 1L - consumerInfo 10L 1L - consumerInfo 10L 1L - consumerInfo 10L 1L - consumerInfo 10L 2L - consumerInfo 10L 2L - consumerInfo 10L 2L - consumerInfo 10L 3L - consumerInfo 10L 3L - |] - - let result = checkRules consumerInfos - - test <@ result = - [| 0, ErrorPartitionStalled 3L - 1, OkReachedZero |] @> - -[] -let ``Error because rule 3 is violated`` () = - let consumerInfos = [| - consumerInfo 10L 1L - consumerInfo 20L 1L - consumerInfo 30L 1L - consumerInfo 40L 1L - consumerInfo 50L 1L - consumerInfo 60L 2L - consumerInfo 70L 2L - consumerInfo 80L 2L - consumerInfo 90L 3L - consumerInfo 100L 3L - |] - - let result = checkRules consumerInfos - - test <@ result = - [| 0, WarningLagIncreasing - 1, OkReachedZero |] @> - -[] -let ``No error because rule 3 is not violated`` () = - let consumerInfos = [| - consumerInfo 10L 5L - consumerInfo 20L 3L - consumerInfo 30L 5L - consumerInfo 40L 2L - consumerInfo 50L 1L - consumerInfo 60L 1L - consumerInfo 70L 2L - consumerInfo 80L 1L - consumerInfo 90L 4L - consumerInfo 100L 6L - |] - - let result = checkRules consumerInfos - - test <@ result = - [| 0, Healthy - 1, OkReachedZero |] @> diff --git a/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj b/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj index fd515acb..1435018d 100644 --- a/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj +++ b/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj @@ -9,7 +9,6 @@ - diff --git a/tests/Propulsion.Kafka0.Integration/Propulsion.Kafka0.Integration.fsproj b/tests/Propulsion.Kafka0.Integration/Propulsion.Kafka0.Integration.fsproj index d128afa4..1d61c871 100644 --- a/tests/Propulsion.Kafka0.Integration/Propulsion.Kafka0.Integration.fsproj +++ b/tests/Propulsion.Kafka0.Integration/Propulsion.Kafka0.Integration.fsproj @@ -10,7 +10,6 @@ - diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index a5f532e8..a705b638 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -14,6 +14,16 @@ open System open System.Collections.Generic open System.Diagnostics +module Config = + let validateBrokerUri (broker : Uri) = + if not broker.IsAbsoluteUri then invalidArg "broker" "should be of 'host:port' format" + if String.IsNullOrEmpty broker.Authority then + // handle a corner case in which Uri instances are erroneously putting the hostname in the `scheme` field. + if System.Text.RegularExpressions.Regex.IsMatch(string broker, "^\S+:[0-9]+$") then string broker + else invalidArg "broker" "should be of 'host:port' format" + + else broker.Authority + exception MissingArg of string let private getEnvVarForArgumentOrThrow varName argName =
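The `Config.validateBrokerUri` helper added to `Propulsion.Tool` above normalizes a broker argument to a bare `host:port` string: `System.Uri` parses `"host:port"` input by placing the host in the scheme and leaving `Authority` empty, so the regex branch returns the original string, while a fully-qualified URI is reduced to its `Authority`. Below is a self-contained copy of that helper plus a usage sketch; the expected results in the comments are inferred from those branches, not taken from tests in this diff.

```fsharp
// Copy of the helper added above (the original is private to the tool), with inferred outputs.
open System
open System.Text.RegularExpressions

let validateBrokerUri (broker : Uri) =
    if not broker.IsAbsoluteUri then invalidArg "broker" "should be of 'host:port' format"
    if String.IsNullOrEmpty broker.Authority then
        // corner case: Uri parses "host:port" with the hostname in the scheme field
        if Regex.IsMatch(string broker, @"^\S+:[0-9]+$") then string broker
        else invalidArg "broker" "should be of 'host:port' format"
    else broker.Authority

printfn "%s" (validateBrokerUri (Uri "kafka.local:9092"))        // inferred: kafka.local:9092
printfn "%s" (validateBrokerUri (Uri "tcp://kafka.local:9092"))  // inferred: kafka.local:9092
// validateBrokerUri (Uri("kafka.local", UriKind.Relative))      // inferred: ArgumentException
```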