Update to FsKafka/FsKafka0 1.4.2 (#67)
bartelink authored May 13, 2020
1 parent 17e3857 commit edfb9be
Showing 23 changed files with 60 additions and 1,024 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -10,7 +10,14 @@ The `Unreleased` section name is replaced by the expected version of next release

### Added
### Changed

- `Kafka`: Targets [`FsKafka`/`FsKafka0` v `1.4.2`](https://github.com/jet/FsKafka/blob/master/CHANGELOG.md#1.4.2) [#64](https://github.com/jet/propulsion/pull/64)

### Removed

- `Propulsion.Kafka0`: Some `Propulsion.Kafka0`-namespaced shimming elements are now found in the `FsKafka` namespace in `FsKafka0` [#64](https://github.com/jet/propulsion/pull/64)
- `Propulsion.Kafka`: `KafkaMonitor` is now found in the `FsKafka` namespace in `FsKafka`/`FsKafka0` (NOTE: integration tests continue to live in this repo) [#64](https://github.com/jet/propulsion/pull/64)

### Fixed

- `Kafka`: Change buffer grouping to include `Topic` alongside `PartitionId` - the existing implementation did not guarantee marking progress when consuming from more than one Topic concurrently [#63](https://github.com/jet/propulsion/pull/63)
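
As an editorial illustration of the grouping fix (not part of the committed diff): the batch key now includes the `Topic` alongside the partition id, so progress is marked per `(Topic, Partition)` pair when consuming several topics at once. The `batchKey` helper below is hypothetical; `Binding.partitionId` and the `ConsumeResult` members are taken from the sources in this commit.

```fsharp
// Hypothetical helper sketching the fixed grouping key: the Topic now
// participates in the key, where previously the partition id alone was used.
open Confluent.Kafka

let batchKey (result : ConsumeResult<string, string>) : string * int =
    result.Topic, Binding.partitionId result
```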
2 changes: 1 addition & 1 deletion Directory.Build.props
@@ -1,4 +1,4 @@
<Project ToolsVersion="15.0">
<Project>
<PropertyGroup>
<Authors>@jet @bartelink @eiriktsarpalis and contributors</Authors>
<Company>Jet.com</Company>
2 changes: 1 addition & 1 deletion Directory.Build.targets
@@ -1,4 +1,4 @@
<Project ToolsVersion="15.0">
<Project>
<Target Name="ComputePackageVersion" AfterTargets="MinVer" Condition=" '$(BUILD_PR)' != '' AND '$(BUILD_PR)' != '%24(SYSTEM.PULLREQUEST.PULLREQUESTNUMBER)' ">
<PropertyGroup>
<PackageVersion>$(MinVerMajor).$(MinVerMinor).$(MinVerPatch)-pr.$(BUILD_PR)</PackageVersion>
4 changes: 2 additions & 2 deletions README.md
@@ -11,8 +11,8 @@ The components within this repository are delivered as a multi-targeted Nuget pa
- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion including `ParallelProjector`, `StreamsProjector`. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics`, `Serilog`
- `Propulsion.Cosmos` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/) Provides bindings to Azure CosmosDb a) writing to `Equinox.Cosmos` :- `CosmosSink` b) reading from CosmosDb's changefeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) :- `CosmosSource`. [Depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDB.ChangeFeedProcessor`, `Serilog`
- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`, `Serilog`
- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. Implements a `KafkaMonitor` that can log status information based on [Burrow](https://github.com/linkedin/Burrow). [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v ` = 1.4.1`, `Serilog`
- `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but targets older `Confluent.Kafka`/`librdkafka` version for for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`, `Serilog`
- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v ` = 1.4.2`, `Serilog`
- `Propulsion.Kafka0` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka0.svg)](https://www.nuget.org/packages/Propulsion.Kafka0/). Same functionality/purpose as `Propulsion.Kafka` but uses `FsKafka0` instead of `FsKafka` in order to target an older `Confluent.Kafka`/`librdkafka` version pairing for interoperability with systems that have a hard dependency on that. [Depends](https://www.fuget.org/packages/Propulsion.Kafka0) on `Confluent.Kafka [0.11.3]`, `librdkafka.redist [0.11.4]`, `Serilog`

The ubiquitous `Serilog` dependency is solely on the core module, not any sinks, i.e. you configure it to emit to `NLog` etc.
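
As an editorial sketch (not part of this commit): the parallel consumption entry point surfaced by `Propulsion.Kafka` in the package list above can be wired up roughly as below. The `ParallelConsumer.Start` shape is taken from `Consumers.fs` later in this diff; the `KafkaConsumerConfig.Create` argument list, the broker/topic/group values, and the pipeline's `AwaitCompletion` member are assumptions to be checked against FsKafka/Propulsion 1.4.2.

```fsharp
// Minimal consumption sketch; see hedges in the lead-in above.
open Propulsion.Kafka
open Serilog
open System.Collections.Generic

let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
// Assumed Create overload and sample values; consult the FsKafka docs for the real signature
let config = KafkaConsumerConfig.Create("sampleApp", "localhost:9092", ["topic0"], "sampleGroup")
// Handler invoked (with up to maxDop in flight) per received message
let handle (kv : KeyValuePair<string, string>) = async {
    log.Information("Consumed {key} ({length} bytes)", kv.Key, kv.Value.Length) }
let pipeline = ParallelConsumer.Start(log, config, maxDop = 64, handle = handle)
pipeline.AwaitCompletion() |> Async.RunSynchronously
```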

17 changes: 2 additions & 15 deletions azure-pipelines.yml
@@ -1,15 +1,14 @@
name: $(Rev:r)

# Summary:
# Linux: Tests with netcoreapp3.1 using docker Kafka instance
# net461 on Mono is not working, no investigation why as yet, but local run validates it
# Linux: Tests using docker Kafka instance (note test suite does not run net461 as CK on mono is not supported)
# Windows: Builds and emits nupkg as artifacts
# MacOS: Builds only

jobs:
- job: Windows
pool:
vmImage: 'vs2017-win2016'
vmImage: 'windows-latest'
steps:
- script: dotnet pack build.proj
displayName: dotnet pack build.proj
@@ -24,12 +23,6 @@ jobs:
pool:
vmImage: 'ubuntu-latest'
steps:
- task: UseDotNet@2
displayName: 'Install .NET Core sdk'
inputs:
packageType: sdk
version: 3.1.101
installationPath: $(Agent.ToolsDirectory)/dotnet
- script: |
docker network create kafka-net
docker run -d --name zookeeper --network kafka-net --publish 2181:2181 zookeeper:3.4
@@ -52,11 +45,5 @@ jobs:
pool:
vmImage: 'macOS-latest'
steps:
- task: UseDotNet@2
displayName: 'Install .NET Core sdk'
inputs:
packageType: sdk
version: 3.1.101
installationPath: $(Agent.ToolsDirectory)/dotnet
- script: dotnet pack build.proj
displayName: dotnet pack build.proj
2 changes: 1 addition & 1 deletion build.proj
@@ -1,4 +1,4 @@
<Project ToolsVersion="15.0">
<Project>

<Import Project="Directory.Build.props" />

2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj
@@ -6,7 +6,7 @@
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
<GenerateDocumentationFile Condition=" '$(Configuration)' == 'Release' ">true</GenerateDocumentationFile>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
@@ -6,7 +6,7 @@
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
<GenerateDocumentationFile Condition=" '$(Configuration)' == 'Release' ">true</GenerateDocumentationFile>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
@@ -6,13 +6,9 @@ open Serilog
open System
open System.Collections.Generic

module Bindings =
let mapMessage (x : ConsumeResult<string,string>) = x.Message
module Binding =
let mapConsumeResult (x : ConsumeResult<string,string>) = KeyValuePair(x.Message.Key,x.Message.Value)
let inline partitionId (x : ConsumeResult<_,_>) = let p = x.Partition in p.Value
let topicPartition (topic : string) (partition : int) = TopicPartition(topic, Partition partition)
let partitionValue (partition : Partition) = let p = partition in p.Value
let offsetUnset = Offset.Unset
let makeTopicPartition (topic : string) (partition : int) = TopicPartition(topic, Partition partition)
let createConsumer log config : IConsumer<string,string> * (unit -> unit) =
let consumer = ConsumerBuilder.WithLogging(log, config)
consumer, consumer.Close
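
An editorial usage sketch of the renamed `Binding` surface above (values illustrative; only members visible in this hunk are used):

```fsharp
// Exercises the Binding helpers shown in the hunk above
open Confluent.Kafka
open System.Collections.Generic

let tp : TopicPartition = Binding.makeTopicPartition "topic0" 0
let unset : Offset = Binding.offsetUnset
let inspect (result : ConsumeResult<string, string>) =
    let kv : KeyValuePair<string, string> = Binding.mapConsumeResult result
    let partition : int = Binding.partitionId result
    kv, partition
```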
18 changes: 9 additions & 9 deletions src/Propulsion.Kafka/Consumers.fs
@@ -70,10 +70,10 @@ type KafkaIngestionEngine<'Info>
let mkSubmission topicPartition span : Submission.SubmissionBatch<'S, 'M> =
let checkpoint () =
counter.Delta(-span.reservation) // counterbalance Delta(+) per ingest, below
Bindings.storeOffset log consumer span.highWaterMark
Binding.storeOffset log consumer span.highWaterMark
{ source = topicPartition; onCompletion = checkpoint; messages = span.messages.ToArray() }
let ingest result =
let message = Bindings.mapMessage result
let message = Binding.message result
let sz = approximateMessageBytes message
counter.Delta(+sz) // counterbalanced by Delta(-) in checkpoint(), below
intervalMsgs <- intervalMsgs + 1L
@@ -111,7 +111,7 @@
submit()
maybeLogStats()
| false, Some intervalRemainder ->
Binding.tryConsume log consumer intervalRemainder ingest
finally
submit () // We don't want to leak our reservations against the counter and want to pass on messages we ingested
dumpStats () // Unconditional logging when completing
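
For orientation (editorial sketch): the `Delta` calls above implement a byte-reservation scheme against `Core.InFlightMessageCounter`, whose constructor appears in `ConsumerPipeline` below; the concrete byte values in this sketch are assumptions.

```fsharp
// Reserve on ingest, release on checkpoint; min/max in-flight byte bounds
// mirror the ConsumerPipeline wiring shown below
let counter = Core.InFlightMessageCounter(log, 16_777_216L, 67_108_864L)
counter.Delta(+1024L)  // reservation recorded when a message is ingested
// ... message flows through submission/processing ...
counter.Delta(-1024L)  // counterbalanced when the batch checkpoint fires
```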
@@ -134,7 +134,7 @@ type ConsumerPipeline private (inner : IConsumer<string, string>, task : Task<un
float config.Buffering.maxInFlightBytes / 1024. / 1024. / 1024., maxDelay.TotalSeconds, maxItems)
let limiterLog = log.ForContext(Serilog.Core.Constants.SourceContextPropertyName, Core.Constants.messageCounterSourceContext)
let limiter = Core.InFlightMessageCounter(limiterLog, config.Buffering.minInFlightBytes, config.Buffering.maxInFlightBytes)
let consumer, closeConsumer = Bindings.createConsumer log config.Inner // teardown is managed by ingester.Pump()
let consumer, closeConsumer = Binding.createConsumer log config.Inner // teardown is managed by ingester.Pump()
consumer.Subscribe config.Topics
let ingester = KafkaIngestionEngine<'M>(log, limiter, consumer, closeConsumer, mapResult, submit, maxItems, maxDelay, statsInterval=statsInterval)
let cts = new CancellationTokenSource()
@@ -203,7 +203,7 @@ type ParallelConsumer private () =
static member Start
( log : ILogger, config : KafkaConsumerConfig, maxDop, handle : KeyValuePair<string, string> -> Async<unit>,
?maxSubmissionsPerPartition, ?pumpInterval, ?statsInterval, ?logExternalStats) =
ParallelConsumer.Start<KeyValuePair<string, string>>(log, config, maxDop, Bindings.mapConsumeResult, handle >> Async.Catch,
ParallelConsumer.Start<KeyValuePair<string, string>>(log, config, maxDop, Binding.mapConsumeResult, handle >> Async.Catch,
?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval, ?statsInterval=statsInterval, ?logExternalStats=logExternalStats)

type EventMetrics = Streams.EventMetrics
@@ -299,7 +299,7 @@ module Core =
stats : Streams.Scheduling.Stats<EventMetrics * 'Outcome, EventMetrics * exn>, statsInterval,
?maximizeOffsetWriting, ?maxSubmissionsPerPartition, ?pumpInterval, ?logExternalState, ?idleDelay)=
StreamsConsumer.Start<KeyValuePair<string, string>, 'Outcome>(
log, config, Bindings.mapConsumeResult, keyValueToStreamEvents, prepare, handle, maxDop,
log, config, Binding.mapConsumeResult, keyValueToStreamEvents, prepare, handle, maxDop,
stats, statsInterval=statsInterval,
?maxSubmissionsPerPartition=maxSubmissionsPerPartition,
?pumpInterval=pumpInterval,
@@ -316,7 +316,7 @@
stats : Streams.Scheduling.Stats<EventMetrics * 'Outcome, EventMetrics * exn>, statsInterval,
?maximizeOffsetWriting, ?maxSubmissionsPerPartition, ?pumpInterval, ?logExternalState, ?idleDelay, ?maxBatches) =
StreamsConsumer.Start<KeyValuePair<string, string>, 'Outcome>(
log, config, Bindings.mapConsumeResult, keyValueToStreamEvents, handle, maxDop,
log, config, Binding.mapConsumeResult, keyValueToStreamEvents, handle, maxDop,
stats, statsInterval,
?maxSubmissionsPerPartition=maxSubmissionsPerPartition,
?pumpInterval=pumpInterval,
@@ -359,7 +359,7 @@ type StreamNameSequenceGenerator() =
member __.ConsumeResultToStreamEvent(toStreamName : ConsumeResult<_, _> -> StreamName)
: ConsumeResult<string, string> -> Propulsion.Streams.StreamEvent<byte[]> seq =
let toDataAndContext (result : ConsumeResult<string, string>) =
let message = Bindings.mapMessage result
let message = Binding.message result
System.Text.Encoding.UTF8.GetBytes message.Value, null
__.ConsumeResultToStreamEvent(toStreamName, toDataAndContext)

@@ -382,7 +382,7 @@
member __.ConsumeResultToStreamEvent(toDataAndContext : ConsumeResult<_, _> -> byte[] * obj, ?defaultCategory)
: ConsumeResult<string, string> -> Propulsion.Streams.StreamEvent<byte[]> seq =
let toStreamName (result : ConsumeResult<string, string>) =
let message = Bindings.mapMessage result
let message = Binding.message result
Core.parseMessageKey (defaultArg defaultCategory "") message.Key
let toTimelineEvent (result : ConsumeResult<string, string>, index) =
let data, context = toDataAndContext result
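
An editorial sketch of wiring the overload above (not part of the diff): `FsCodec.StreamName.create` is assumed as the stream-name factory; `Binding.message` appears elsewhere in this file.

```fsharp
// Maps each ConsumeResult to StreamEvents, deriving the stream name from the message key
let gen = StreamNameSequenceGenerator()
let toStreamName (result : ConsumeResult<string, string>) =
    FsCodec.StreamName.create "Topic" (Binding.message result).Key
let toStreamEvents : ConsumeResult<string, string> -> Propulsion.Streams.StreamEvent<byte[]> seq =
    gen.ConsumeResultToStreamEvent(toStreamName)
```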
