From d2caf9a007a137994e91ab709c87eb29fe32489b Mon Sep 17 00:00:00 2001 From: Eirik Tsarpalis Date: Fri, 18 May 2018 16:42:49 +0100 Subject: [PATCH] initial commit --- .gitattributes | 26 + .gitignore | 186 ++++++ LICENSE.txt | 24 + build.cmd | 13 + confluent-kafka-fsharp.sln | 58 ++ src/Confluent.Kafka.FSharp/AssemblyInfo.fs | 19 + .../Confluent.Kafka.FSharp.fsproj | 22 + src/Confluent.Kafka.FSharp/ConfluentKafka.fs | 560 ++++++++++++++++++ src/Confluent.Kafka.FSharp/Script.fsx | 3 + src/Confluent.Kafka.FSharp/paket.references | 6 + .../Confluent.Kafka.FSharp.Tests.fsproj | 28 + tests/Confluent.Kafka.FSharp.Tests/Main.fs | 8 + tests/Confluent.Kafka.FSharp.Tests/Tests.fs | 245 ++++++++ tests/Confluent.Kafka.FSharp.Tests/Utils.fs | 41 ++ .../paket.references | 6 + 15 files changed, 1245 insertions(+) create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 LICENSE.txt create mode 100644 build.cmd create mode 100644 confluent-kafka-fsharp.sln create mode 100644 src/Confluent.Kafka.FSharp/AssemblyInfo.fs create mode 100644 src/Confluent.Kafka.FSharp/Confluent.Kafka.FSharp.fsproj create mode 100644 src/Confluent.Kafka.FSharp/ConfluentKafka.fs create mode 100644 src/Confluent.Kafka.FSharp/Script.fsx create mode 100644 src/Confluent.Kafka.FSharp/paket.references create mode 100644 tests/Confluent.Kafka.FSharp.Tests/Confluent.Kafka.FSharp.Tests.fsproj create mode 100644 tests/Confluent.Kafka.FSharp.Tests/Main.fs create mode 100644 tests/Confluent.Kafka.FSharp.Tests/Tests.fs create mode 100644 tests/Confluent.Kafka.FSharp.Tests/Utils.fs create mode 100644 tests/Confluent.Kafka.FSharp.Tests/paket.references diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..7ff73f42 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,26 @@ +# Auto detect text files +* text=auto + +# Custom for Visual Studio +*.cs diff=csharp text=auto eol=lf +*.vb diff=csharp text=auto eol=lf +*.fs diff=csharp text=auto eol=lf +*.fsi diff=csharp text=auto eol=lf +*.fsx diff=csharp text=auto eol=lf +*.sln text eol=crlf merge=union +*.csproj merge=union +*.vbproj merge=union +*.fsproj merge=union +*.dbproj merge=union + +# Standard to msysgit +*.doc diff=astextplain +*.DOC diff=astextplain +*.docx diff=astextplain +*.DOCX diff=astextplain +*.dot diff=astextplain +*.DOT diff=astextplain +*.pdf diff=astextplain +*.PDF diff=astextplain +*.rtf diff=astextplain +*.RTF diff=astextplain diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..368bc72f --- /dev/null +++ b/.gitignore @@ -0,0 +1,186 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. + +# User-specific files +*.suo +*.user +*.sln.docstates + +# Xamarin Studio / monodevelop user-specific +*.userprefs +*.dll.mdb +*.exe.mdb + +# Build results + +[Dd]ebug/ +[Rr]elease/ +x64/ +build/ +[Bb]in/ +[Oo]bj/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +*_i.c +*_p.c +*.ilk +*.meta +*.obj +*.pch +*.pdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.log +*.scc + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opensdf +*.sdf +*.cachefile + +# Visual Studio profiler +*.psess +*.vsp +*.vspx + +# Other Visual Studio data +.vs/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# NCrunch +*.ncrunch* +.*crunch*.local.xml + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.Publish.xml + +# Enable nuget.exe in the .nuget folder (though normally executables are not tracked) +!.nuget/NuGet.exe + +# Windows Azure Build Output +csx +*.build.csdef + +# Windows Store app package directory +AppPackages/ + +# VSCode +.vscode/ + +# Others +sql/ +*.Cache +ClientBin/ +[Ss]tyle[Cc]op.* +~$* +*~ +*.dbmdl +*.[Pp]ublish.xml +*.pfx +*.publishsettings + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file to a newer +# Visual Studio version. Backup files are not needed, because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm + +# SQL Server files +App_Data/*.mdf +App_Data/*.ldf + + +#LightSwitch generated files +GeneratedArtifacts/ +_Pvt_Extensions/ +ModelManifest.xml + +# ========================= +# Windows detritus +# ========================= + +# Windows image file caches +Thumbs.db +ehthumbs.db + +# Folder config file +Desktop.ini + +# Recycle Bin used on file shares +$RECYCLE.BIN/ + +# Mac desktop service store files +.DS_Store + +# =================================================== +# Exclude F# project specific directories and files +# =================================================== + +# NuGet Packages Directory +packages/ + +# Test results produced by build +TestResults.xml + +# Nuget outputs +nuget/*.nupkg +release.cmd +release.sh +localpackages/ +paket-files +*.orig +.paket/paket.exe +docsrc/content/license.md +docsrc/content/release-notes.md +.fake +docsrc/tools/FSharp.Formatting.svclog diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 00000000..68a49daa --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,24 @@ +This is free and unencumbered software released into the public domain. + +Anyone is free to copy, modify, publish, use, compile, sell, or +distribute this software, either in source code form or as a compiled +binary, for any purpose, commercial or non-commercial, and by any +means. + +In jurisdictions that recognize copyright laws, the author or authors +of this software dedicate any and all copyright interest in the +software to the public domain. We make this dedication for the benefit +of the public at large and to the detriment of our heirs and +successors. We intend this dedication to be an overt act of +relinquishment in perpetuity of all present and future rights to this +software under copyright law. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. + +For more information, please refer to diff --git a/build.cmd b/build.cmd new file mode 100644 index 00000000..b2a87f12 --- /dev/null +++ b/build.cmd @@ -0,0 +1,13 @@ +@echo off +cls + +.paket\paket.exe restore +if errorlevel 1 ( + exit /b %errorlevel% +) + +IF NOT EXIST build.fsx ( + .paket\paket.exe update + packages\build\FAKE\tools\FAKE.exe init.fsx +) +packages\build\FAKE\tools\FAKE.exe build.fsx %* diff --git a/confluent-kafka-fsharp.sln b/confluent-kafka-fsharp.sln new file mode 100644 index 00000000..2d4a1789 --- /dev/null +++ b/confluent-kafka-fsharp.sln @@ -0,0 +1,58 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.27703.2000 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".paket", ".paket", "{63297B98-5CED-492C-A5B7-A5B4F73CF142}" + ProjectSection(SolutionItems) = preProject + paket.dependencies = paket.dependencies + paket.lock = paket.lock + EndProjectSection +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Confluent.Kafka.FSharp", "src\Confluent.Kafka.FSharp\Confluent.Kafka.FSharp.fsproj", "{6416ACF3-CF15-4081-9FB5-41FFAE60FA28}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "project", "project", "{BF60BC93-E09B-4E5F-9D85-95A519479D54}" + ProjectSection(SolutionItems) = preProject + .gitattributes = .gitattributes + .gitignore = .gitignore + .travis.yml = .travis.yml + appveyor.yml = appveyor.yml + build.cmd = build.cmd + build.fsx = build.fsx + build.sh = build.sh + LICENSE.txt = LICENSE.txt + README.md = README.md + RELEASE_NOTES.md = RELEASE_NOTES.md + EndProjectSection +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{ED8079DD-2B06-4030-9F0F-DC548F98E1C4}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Confluent.Kafka.FSharp.Tests", "tests\Confluent.Kafka.FSharp.Tests\Confluent.Kafka.FSharp.Tests.fsproj", "{3C1E132C-52DB-4837-88E3-A467F68E3E2A}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{F83AE0FC-2D62-4724-8F8A-8CEBCC420C51}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {6416ACF3-CF15-4081-9FB5-41FFAE60FA28}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6416ACF3-CF15-4081-9FB5-41FFAE60FA28}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6416ACF3-CF15-4081-9FB5-41FFAE60FA28}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6416ACF3-CF15-4081-9FB5-41FFAE60FA28}.Release|Any CPU.Build.0 = Release|Any CPU + {3C1E132C-52DB-4837-88E3-A467F68E3E2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3C1E132C-52DB-4837-88E3-A467F68E3E2A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3C1E132C-52DB-4837-88E3-A467F68E3E2A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3C1E132C-52DB-4837-88E3-A467F68E3E2A}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {6416ACF3-CF15-4081-9FB5-41FFAE60FA28} = {F83AE0FC-2D62-4724-8F8A-8CEBCC420C51} + {3C1E132C-52DB-4837-88E3-A467F68E3E2A} = {ED8079DD-2B06-4030-9F0F-DC548F98E1C4} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {BBF8BBE6-A5F9-4342-91CC-636FABE84B4F} + EndGlobalSection +EndGlobal diff --git a/src/Confluent.Kafka.FSharp/AssemblyInfo.fs b/src/Confluent.Kafka.FSharp/AssemblyInfo.fs new file mode 100644 index 00000000..3e0a7ebb --- /dev/null +++ b/src/Confluent.Kafka.FSharp/AssemblyInfo.fs @@ -0,0 +1,19 @@ +// Auto-Generated by FAKE; do not edit +namespace System +open System.Reflection + +[] +[] +[] +[] +[] +[] +do () + +module internal AssemblyVersionInformation = + let [] AssemblyTitle = "Confluent.Kafka.FSharp" + let [] AssemblyProduct = "confluent-kafka-fsharp" + let [] AssemblyDescription = "Lightweight F# wrapper for Confluent.Kafka" + let [] AssemblyVersion = "0.0.1" + let [] AssemblyFileVersion = "0.0.1" + let [] AssemblyConfiguration = "Release" diff --git a/src/Confluent.Kafka.FSharp/Confluent.Kafka.FSharp.fsproj b/src/Confluent.Kafka.FSharp/Confluent.Kafka.FSharp.fsproj new file mode 100644 index 00000000..0c33c668 --- /dev/null +++ b/src/Confluent.Kafka.FSharp/Confluent.Kafka.FSharp.fsproj @@ -0,0 +1,22 @@ + + + + netstandard2.0; net461 + AnyCPU + bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml + + + 5 + 52,1178 + --warnon:1182 + true + + + + + + + + + + \ No newline at end of file diff --git a/src/Confluent.Kafka.FSharp/ConfluentKafka.fs b/src/Confluent.Kafka.FSharp/ConfluentKafka.fs new file mode 100644 index 00000000..d3cf0e4a --- /dev/null +++ b/src/Confluent.Kafka.FSharp/ConfluentKafka.fs @@ -0,0 +1,560 @@ +namespace Jet.ConfluentKafka + +open System +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +open NLog +open Confluent.Kafka +open System.Reactive.Linq + +[] +module private Helpers = + + let logger = LogManager.GetLogger __SOURCE_FILE__ + let encoding = System.Text.Encoding.UTF8 + + type Logger with + static member Create name = LogManager.GetLogger name + + member inline ts.log (format, level) = + let inline trace (message:string) = ts.Log(level, message) + Printf.kprintf trace format + + member inline ts.log (message:string, level) = ts.Log(level, message) + member inline ts.info format = ts.log (format, LogLevel.Info) + member inline ts.warn format = ts.log (format, LogLevel.Warn) + member inline ts.error format = ts.log (format, LogLevel.Error) + member inline ts.verbose format = ts.log (format, LogLevel.Debug) + member inline ts.trace format = ts.log (format, LogLevel.Trace) + member inline ts.critical format = ts.log (format, LogLevel.Fatal) + + type Async with + static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> = + Async.FromContinuations <| fun (k,ek,_) -> + task.ContinueWith (fun (t:Task<'T>) -> + if t.IsFaulted then + let e = t.Exception + if e.InnerExceptions.Count = 1 then ek e.InnerExceptions.[0] + else ek e + elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled.")) + elif t.IsCompleted then k t.Result + else ek(Exception "invalid Task state!")) + |> ignore + +[] +type Acks = + | Zero + | One + | All + +[] +type Compression = + | None + | GZip + | Snappy + | LZ4 + +[] +type AutoOffsetReset = + | Earliest + | Latest + | None + +[] +type Partitioner = + | Random + | Consistent + | ConsistentRandom + +type KafkaConfiguration = KeyValuePair + +type ConfigKey<'T> = Key of id:string * render:('T -> obj) +with + member k.Id = let (Key(id,_)) = k in id + + static member (==>) (Key(id, render) : ConfigKey<'T>, value : 'T) = + match render value with + | null -> nullArg id + | :? string as str when String.IsNullOrWhiteSpace str -> nullArg id + | obj -> KeyValuePair(id, obj) + +[] +module Config = + + // A small DSL for interfacing with Kafka configuration + // c.f. https://kafka.apache.org/documentation/#configuration + + let private mkKey id render = Key(id, render >> box) + let private ms (t : TimeSpan) = int t.TotalMilliseconds + + // shared keys + let broker = mkKey "bootstrap.servers" (fun (u:Uri) -> string u) + let clientId = mkKey "client.id" id + let maxInFlight = mkKey "max.in.flight.requests.per.connection" id + let logConnectionClose = mkKey "log.connection.close" id + let apiVersionRequest = mkKey "api.version.request" id + let brokerVersionFallback = mkKey "broker.version.fallback" id + let rebalanceTimeout = mkKey "rebalance.timeout.ms" ms + + // producers + let linger = mkKey "linger.ms" ms + let acks = mkKey "acks" (function Acks.Zero -> 0 | Acks.One -> 1 | Acks.All -> -1) + let batchNumMessages = mkKey "batch.num.messages" id + let retries = mkKey "retries" id + let retryBackoff = mkKey "retry.backoff.ms" ms + let timeout = mkKey "request.timeout.ms" ms + let messageSendMaxRetries = mkKey "message.send.max.retries" id + let statisticsInterval = mkKey "statistics.interval.ms" ms + let compression = + mkKey "compression.type" + (function + | Compression.None -> "none" + | Compression.GZip -> "gzip" + | Compression.Snappy -> "snappy" + | Compression.LZ4 -> "lz4") + + let partitioner = + mkKey "partitioner" + (function + | Partitioner.Random -> "random" + | Partitioner.Consistent -> "consistent" + | Partitioner.ConsistentRandom -> "consistent_random") + + // consumers + let enableAutoCommit = mkKey "enable.auto.commit" id + let enableAutoOffsetStore = mkKey "enable.auto.offset.store" id + let autoCommitInterval = mkKey "auto.commit.interval.ms" ms + let groupId = mkKey "group.id" id + let fetchMaxBytes = mkKey "fetch.message.max.bytes" id + let fetchMinBytes = mkKey "fetch.min.bytes" id + let fetchMaxWait = mkKey "fetch.wait.max.ms" ms + let checkCrc = mkKey "check.crcs" id + let heartbeatInterval = mkKey "heartbeat.interval.ms" ms + let sessionTimeout = mkKey "session.timeout.ms" ms + let topicConfig = mkKey "default.topic.config" id : ConfigKey> + let autoOffsetReset = + mkKey "auto.offset.reset" + (function + | AutoOffsetReset.Earliest -> "earliest" + | AutoOffsetReset.Latest -> "lastest" + | AutoOffsetReset.None -> "none") + +// NB we deliberately wrap all types exposed by Confluent.Kafka +// to avoid needing to reference librdkafka everywehere +[] +type KafkaMessage internal (message : Message) = + member internal __.UnderlyingMessage = message + member __.Topic = message.Topic + member __.Partition = message.Partition + member __.Offset = message.Offset.Value + member __.Key = encoding.GetString message.Key + member __.Value = encoding.GetString message.Value + +type KafkaProducer private (producer : Producer, topic : string) = + + let d1 = producer.OnLog.Subscribe (fun m -> logger.info "producer_info|%s level=%d name=%s facility=%s" m.Message m.Level m.Name m.Facility) + let d2 = producer.OnError.Subscribe (fun e -> logger.error "producer_error|%s code=%O isBrokerError=%b" e.Reason e.Code e.IsBrokerError) + + let tryWrap (msg : Message) = + if msg.Error.HasError then + let errorMsg = sprintf "Error writing message topic=%s code=%O reason=%s" topic msg.Error.Code msg.Error.Reason + Choice2Of2(Exception errorMsg) + else + Choice1Of2(KafkaMessage msg) + + /// https://github.com/edenhill/librdkafka/wiki/Statistics + [] + member __.OnStatistics = producer.OnStatistics + member __.Topic = topic + member __.Produce(key : string, value : string) = async { + let keyBytes = match key with null -> [||] | _ -> encoding.GetBytes key + let valueBytes = encoding.GetBytes value + let! msg = + producer.ProduceAsync(topic, keyBytes, valueBytes) + |> Async.AwaitTaskCorrect + + match tryWrap msg with + | Choice1Of2 msg -> return msg + | Choice2Of2 e -> return raise e + } + + /// + /// Produces a batch of supplied key/value messages. Results are returned in order of writing. + /// + /// + /// Enable marshalling of data in returned messages. Defaults to false. + member __.ProduceBatch(keyValueBatch : seq, ?marshalDeliveryReportData : bool) = async { + match Seq.toArray keyValueBatch with + | [||] -> return [||] + | keyValueBatch -> + + let! ct = Async.CancellationToken + + let tcs = new TaskCompletionSource() + let numMessages = keyValueBatch.Length + let results = Array.zeroCreate numMessages + let numCompleted = ref 0 + + use _ = ct.Register(fun _ -> tcs.TrySetCanceled() |> ignore) + + let handler = + { new IDeliveryHandler with + member __.MarshalData = defaultArg marshalDeliveryReportData false + member __.HandleDeliveryReport m = + match tryWrap m with + | Choice2Of2 e -> + tcs.TrySetException e |> ignore + + | Choice1Of2 m -> + let i = Interlocked.Increment numCompleted + results.[i - 1] <- m + if i = numMessages then tcs.TrySetResult results |> ignore } + + + do for key,value in keyValueBatch do + let keyBytes = encoding.GetBytes key + let valueBytes = encoding.GetBytes value + producer.ProduceAsync(topic, + keyBytes, 0, keyBytes.Length, + valueBytes, 0, valueBytes.Length, + blockIfQueueFull = true, + deliveryHandler = handler) + + + return! Async.AwaitTaskCorrect tcs.Task + } + + interface IDisposable with member __.Dispose() = for d in [d1;d2;producer:>_] do d.Dispose() + + /// + /// Creates a Kafka producer instance with supplied configuration + /// + /// + /// + /// Message compression. Defaults to 'none'. + /// Number of retries. Defaults to 60. + /// Backoff interval. Defaults to 1 second. + /// Statistics Interval. Defaults to no stats. + /// Linger time. Defaults to 200 milliseconds. + /// Acks setting. Defaults to 'One'. + /// Partitioner setting. Defaults to 'consistent_random'. + /// A magical flag that attempts to provide compatibility for Kafunk consumers. Defalts to true. + /// Misc configuration parameter to be passed to the underlying CK producer. + static member Create(clientId : string, broker : Uri, topic : string, + ?compression, ?retries, ?retryBackoff, ?statisticsInterval, + ?linger, ?acks, ?partitioner, ?kafunkCompatibility : bool, ?miscConfig) = + if String.IsNullOrEmpty topic then nullArg "topic" + + let linger = defaultArg linger (TimeSpan.FromMilliseconds 200.) + let compression = defaultArg compression Compression.None + let retries = defaultArg retries 60 + let retryBackoff = defaultArg retryBackoff (TimeSpan.FromSeconds 1.) + let partitioner = defaultArg partitioner Partitioner.ConsistentRandom + let acks = defaultArg acks Acks.One + let miscConfig = defaultArg miscConfig [||] + let kafunkCompatibility = defaultArg kafunkCompatibility true + + let config = + [| + yield Config.clientId ==> clientId + yield Config.broker ==> broker + + match statisticsInterval with Some t -> yield Config.statisticsInterval ==> t | None -> () + yield Config.linger ==> linger + yield Config.compression ==> compression + yield Config.partitioner ==> partitioner + yield Config.retries ==> retries + yield Config.retryBackoff ==> retryBackoff + yield Config.acks ==> acks + + if kafunkCompatibility then + yield Config.apiVersionRequest ==> false + yield Config.brokerVersionFallback ==> "0.9.0" + + // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017 + yield Config.logConnectionClose ==> false + + yield! miscConfig + |] + + logger.info "Starting Kafka Producer on topic=%s broker=%O compression=%O acks=%O" topic broker compression acks + let producer = new Producer(config, manualPoll = false, disableDeliveryReports = false) + new KafkaProducer(producer, topic) + +type KafkaConsumerConfig = + { + /// Client identifier; should typically be superhero name + clientId : string + broker : Uri + topics : string list + /// Consumer group identifier + groupId : string + + autoOffsetReset : AutoOffsetReset + fetchMinBytes : int + fetchMaxBytes : int + retryBackoff : TimeSpan + retries : int + statisticsInterval : TimeSpan option + + miscConfig : KafkaConfiguration list + + /// Poll timeout used by the Confluent.Kafka consumer + pollTimeout : TimeSpan + /// Maximum number of messages to group per batch on consumer callbacks + maxBatchSize : int + /// Message batch linger time + maxBatchDelay : TimeSpan + /// Maximum total size of consumed messages in-memory before polling is throttled. + maxInFlightBytes : int64 + /// Consumed offsets commit interval + offsetCommitInterval : TimeSpan + } +with + /// + /// Creates a Kafka consumer configuration object. + /// + /// Should typically be superhero name. + /// + /// + /// Consumer group id. + /// Auto offset reset. Defaults to Earliest. + /// Fetch max bytes. Defaults to 100KB. + /// Fetch min bytes. Defaults to 10B. + /// Max retries. Defaults to 60. + /// Retry backoff interval. Defaults to 1 second. + /// Poll timeout used by the CK consumer. Defaults to 200ms. + /// Max number of messages to use in batched consumers. Defaults to 1000. + /// Max batch linger time. Defaults to 1 second. + /// Offset commit interval. Defaults to 2 seconds. + /// Maximum total size of consumed messages in-memory before polling is throttled. Defaults to 64MiB. + /// Misc configuration parameters to pass to the underlying CK consumer. + static member Create(clientId, broker, topics, groupId, + ?autoOffsetReset, ?fetchMaxBytes, ?fetchMinBytes, + ?retries, ?retryBackoff, ?statisticsInterval, + ?pollTimeout, ?maxBatchSize, ?maxBatchDelay, ?offsetCommitInterval, ?maxInFlightBytes, + ?miscConfiguration) = + { + clientId = clientId + broker = broker + topics = + match Seq.toList topics with + | [] -> invalidArg "topics" "must be non-empty collection" + | ts -> ts + + groupId = groupId + + miscConfig = match miscConfiguration with None -> [] | Some c -> Seq.toList c + + autoOffsetReset = defaultArg autoOffsetReset AutoOffsetReset.Earliest + fetchMaxBytes = defaultArg fetchMaxBytes 100000 + fetchMinBytes = defaultArg fetchMinBytes 10 // TODO check if sane default + retries = defaultArg retries 60 + retryBackoff = defaultArg retryBackoff (TimeSpan.FromSeconds 1.) + statisticsInterval = statisticsInterval + + pollTimeout = defaultArg pollTimeout (TimeSpan.FromMilliseconds 200.) + maxBatchSize = defaultArg maxBatchSize 1000 + maxBatchDelay = defaultArg maxBatchDelay (TimeSpan.FromMilliseconds 500.) + maxInFlightBytes = defaultArg maxInFlightBytes (64L * 1024L * 1024L) + offsetCommitInterval = defaultArg offsetCommitInterval (TimeSpan.FromSeconds 10.) + } + +module private ConsumerImpl = + + type KafkaConsumerConfig with + member c.ToKeyValuePairs() = + [| + yield Config.clientId ==> c.clientId + yield Config.broker ==> c.broker + yield Config.groupId ==> c.groupId + + yield Config.fetchMaxBytes ==> c.fetchMaxBytes + yield Config.fetchMinBytes ==> c.fetchMinBytes + yield Config.retries ==> c.retries + yield Config.retryBackoff ==> c.retryBackoff + match c.statisticsInterval with None -> () | Some t -> yield Config.statisticsInterval ==> t + + yield Config.enableAutoCommit ==> true + yield Config.enableAutoOffsetStore ==> false + yield Config.autoCommitInterval ==> c.offsetCommitInterval + + yield Config.topicConfig ==> + [ + Config.autoOffsetReset ==> AutoOffsetReset.Earliest + ] + + // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017 + yield Config.logConnectionClose ==> false + + yield! c.miscConfig + |] + + + /// used for calculating approximate message size in bytes + let getMessageSize (message : Message) = + let inline len (x:byte[]) = match x with null -> 0L | _ -> x.LongLength + 16L + len message.Key + len message.Value + + let getBatchOffset (batch : KafkaMessage[]) = + let maxOffset = batch |> Array.maxBy (fun m -> m.Offset) + maxOffset.UnderlyingMessage.TopicPartitionOffset + + let mkMessage (msg : Message) = + if msg.Error.HasError then + failwithf "error consuming message topic=%s partition=%d offset=%O code=%O reason=%s" + msg.Topic msg.Partition msg.Offset msg.Error.Code msg.Error.Reason + + KafkaMessage(msg) + + let mkBatchedMessage (msgs : IList) = + msgs |> Seq.map mkMessage |> Seq.toArray + + type MessageSizeCounter(maxInFlightBytes : int64) = + do if maxInFlightBytes < 1L then invalidArg "maxInFlightBytes" "must be positive value" + let mutable count = 0L + member __.Add(size : int64) = Interlocked.Add(&count, size) |> ignore + member __.IsThresholdReached = count > maxInFlightBytes + member __.Count = count + member __.Max = maxInFlightBytes + + type Consumer with + static member Create (config : KafkaConsumerConfig) = + let kvps = config.ToKeyValuePairs() + let consumer = new Consumer(kvps) + let _ = consumer.OnPartitionsAssigned.Subscribe(fun m -> consumer.Assign m) + let _ = consumer.OnPartitionsRevoked.Subscribe(fun m -> consumer.Unassign()) + consumer.Subscribe config.topics + consumer + + member c.StoreOffset(tpo : TopicPartitionOffset) = + c.StoreOffsets[| TopicPartitionOffset(tpo.Topic, tpo.Partition, Offset(tpo.Offset.Value + 1L)) |] + |> ignore + + member c.RunPoll(pollTimeout : TimeSpan, sizeCounter : MessageSizeCounter) = + let cts = new CancellationTokenSource() + let poll() = + while not cts.IsCancellationRequested do + while sizeCounter.IsThresholdReached do Thread.Sleep 50 + c.Poll(pollTimeout) + + let _ = Async.StartAsTask(async { poll() }) + { new IDisposable with member __.Dispose() = cts.Cancel() } + + member c.OnBatchedPartitionMessages(maxBatchSize : int, maxDelay : TimeSpan) = + c.OnMessage + .GroupBy(fun m -> m.TopicPartition) + .SelectMany(fun group -> group.Buffer(maxDelay, maxBatchSize)) + .Where(fun batch -> batch.Count > 0) + + member c.WithLogging() = + let fmtError (e : Error) = if e.HasError then sprintf " reason=%s code=%O isBrokerError=%b" e.Reason e.Code e.IsBrokerError else "" + let fmtTopicPartitions (topicPartitions : seq) = + topicPartitions + |> Seq.groupBy (fun p -> p.Topic) + |> Seq.map (fun (t,ps) -> sprintf "topic=%s|partitions=[%s]" t (ps |> Seq.map (fun p -> string p.Partition) |> String.concat "; ")) + + let fmtTopicPartitionOffsets (topicPartitionOffsets : seq) = + topicPartitionOffsets + |> Seq.groupBy (fun p -> p.Topic) + |> Seq.map (fun (t,ps) -> sprintf "topic=%s|offsets=[%s]" t (ps |> Seq.map (fun p -> sprintf "%d@%O" p.Partition p.Offset) |> String.concat "; ")) + + let fmtTopicPartitionOffsetErrors (topicPartitionOffsetErrors : seq) = + topicPartitionOffsetErrors + |> Seq.groupBy (fun p -> p.Topic) + |> Seq.map (fun (t,ps) -> sprintf "topic=%s|offsets=[%s]" t (ps |> Seq.map (fun p -> sprintf "%d@%O%s" p.Partition p.Offset (fmtError p.Error)) |> String.concat "; ")) + + let d1 = c.OnLog.Subscribe (fun m -> logger.info "consumer_info|%s level=%d name=%s facility=%s" m.Message m.Level m.Name m.Facility) + let d2 = c.OnError.Subscribe (fun e -> logger.error "consumer_error|%s" (fmtError e)) + let d3 = c.OnPartitionsAssigned.Subscribe (fun tps -> for fmt in fmtTopicPartitions tps do logger.info "consumer_partitions_assigned|%s" fmt) + let d4 = c.OnPartitionsRevoked.Subscribe (fun tps -> for fmt in fmtTopicPartitions tps do logger.info "consumer_partitions_revoked|%s" fmt) + let d5 = c.OnPartitionEOF.Buffer(TimeSpan.FromSeconds 5., 20).Subscribe(fun tpo -> for fmt in fmtTopicPartitionOffsets tpo do logger.verbose "consumer_partition_eof|%s" fmt) + let d6 = c.OnOffsetsCommitted.Subscribe (fun cos -> for fmt in fmtTopicPartitionOffsetErrors cos.Offsets do logger.info "consumer_committed_offsets|%s%s" fmt (fmtError cos.Error)) + { new IDisposable with member __.Dispose() = for d in [d1;d2;d3;d4;d5;d6] do d.Dispose() } + + let mkMessageConsumer (config : KafkaConsumerConfig) (ct : CancellationToken) (consumer : Consumer) (handler : KafkaMessage -> unit) = async { + let tcs = new TaskCompletionSource() + use _ = ct.Register(fun _ -> tcs.TrySetResult () |> ignore) + + use _ = consumer.WithLogging() + + let counter = new MessageSizeCounter(config.maxInFlightBytes) + let runHandler (msg : Message) = + try + let size = getMessageSize msg + counter.Add size + do handler(mkMessage msg) + consumer.StoreOffset msg.TopicPartitionOffset + counter.Add -size + with e -> + tcs.TrySetException e |> ignore + + use _ = consumer.OnMessage.Subscribe runHandler + use _ = consumer.RunPoll(config.pollTimeout, counter) + + // await for handler faults or external cancellation + do! Async.AwaitTaskCorrect tcs.Task + } + + let mkBatchedMessageConsumer (config : KafkaConsumerConfig) (ct : CancellationToken) (consumer : Consumer) (handler : KafkaMessage[] -> Async) = async { + let tcs = new TaskCompletionSource() + use cts = CancellationTokenSource.CreateLinkedTokenSource(ct) + use _ = ct.Register(fun _ -> tcs.TrySetResult () |> ignore) + + use _ = consumer.WithLogging() + + let counter = new MessageSizeCounter(config.maxInFlightBytes) + let runHandler batch = + let wrapper = async { + try + let size = batch |> Seq.sumBy getMessageSize + counter.Add size + let batch = mkBatchedMessage batch + do! handler batch + consumer.StoreOffset(getBatchOffset batch) + counter.Add -size + with e -> + tcs.TrySetException e |> ignore + } + + Async.StartImmediate(wrapper, cts.Token) + + use _ = consumer.OnBatchedPartitionMessages(config.maxBatchSize, config.maxBatchDelay).Subscribe runHandler + use _ = consumer.RunPoll(config.pollTimeout, counter) + + // await for handler faults or external cancellation + do! Async.AwaitTaskCorrect tcs.Task + } + +open ConsumerImpl + +type KafkaConsumer private (config : KafkaConsumerConfig, isBatchConsumer : bool, consumeTask : CancellationToken -> Consumer -> Async) = + let cts = new CancellationTokenSource() + let consumer = Consumer.Create config + let task = Async.StartAsTask(consumeTask cts.Token consumer) + + /// https://github.com/edenhill/librdkafka/wiki/Statistics + [] + member __.OnStatistics = consumer.OnStatistics + member __.Config = config + member __.IsBatchConsumer = isBatchConsumer + member __.Status = task.Status + member __.AwaitConsumer() = Async.AwaitTaskCorrect task + member __.Stop() = cts.Cancel() ; task.Result ; consumer.Dispose() + + interface IDisposable with member __.Dispose() = __.Stop() + + /// Starts a kafka consumer with provided configuration and message handler + static member Start (config : KafkaConsumerConfig) (handler : KafkaMessage -> unit) = + let mkConsumer c ct = ConsumerImpl.mkMessageConsumer config c ct handler + new KafkaConsumer(config, false, mkConsumer) + + /// Starts a kafka consumer with provider configuration and batch message handler. + /// Message batches are grouped by Kafka partition + static member StartBatched (config : KafkaConsumerConfig) (handler : KafkaMessage[] -> Async) = + if List.isEmpty config.topics then invalidArg "config" "must specify at least one topic" + logger.info "Starting Kafka consumer on topics=%A groupId=%s broker=%O autoOffsetReset=%O maxBatchSize=%O" config.topics config.groupId config.broker config.autoOffsetReset config.maxBatchSize + let mkConsumer c ct = ConsumerImpl.mkBatchedMessageConsumer config c ct handler + new KafkaConsumer(config, true, mkConsumer) \ No newline at end of file diff --git a/src/Confluent.Kafka.FSharp/Script.fsx b/src/Confluent.Kafka.FSharp/Script.fsx new file mode 100644 index 00000000..48e5e0e3 --- /dev/null +++ b/src/Confluent.Kafka.FSharp/Script.fsx @@ -0,0 +1,3 @@ +#r "bin/Debug/net461/Confluent.Kafka.FSharp.dll" + +open Jet.ConfluentKafka \ No newline at end of file diff --git a/src/Confluent.Kafka.FSharp/paket.references b/src/Confluent.Kafka.FSharp/paket.references new file mode 100644 index 00000000..a269651b --- /dev/null +++ b/src/Confluent.Kafka.FSharp/paket.references @@ -0,0 +1,6 @@ +FSharp.Core +System.ValueTuple + +Confluent.Kafka +System.Reactive +NLog \ No newline at end of file diff --git a/tests/Confluent.Kafka.FSharp.Tests/Confluent.Kafka.FSharp.Tests.fsproj b/tests/Confluent.Kafka.FSharp.Tests/Confluent.Kafka.FSharp.Tests.fsproj new file mode 100644 index 00000000..b8f1bc4e --- /dev/null +++ b/tests/Confluent.Kafka.FSharp.Tests/Confluent.Kafka.FSharp.Tests.fsproj @@ -0,0 +1,28 @@ + + + + net461; netcoreapp2.0 + AnyCPU + Exe + bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml + true + true + + + 5 + 52,1178 + --warnon:1182 + true + + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/Confluent.Kafka.FSharp.Tests/Main.fs b/tests/Confluent.Kafka.FSharp.Tests/Main.fs new file mode 100644 index 00000000..e4ae2ef8 --- /dev/null +++ b/tests/Confluent.Kafka.FSharp.Tests/Main.fs @@ -0,0 +1,8 @@ +module Jet.ConfluentKafka.Tests.Main + +#nowarn "988" // Main module of program is empty + +// required for netcoreapp builds +#if NETCOREAPP2_0 +[] let main _ = 0 +#endif \ No newline at end of file diff --git a/tests/Confluent.Kafka.FSharp.Tests/Tests.fs b/tests/Confluent.Kafka.FSharp.Tests/Tests.fs new file mode 100644 index 00000000..416ab2c1 --- /dev/null +++ b/tests/Confluent.Kafka.FSharp.Tests/Tests.fs @@ -0,0 +1,245 @@ +module Jet.ConfluentKafka.Tests.``Kafka Integration Tests`` + +open System +open System.Threading +open System.Collections.Concurrent +open System.Threading.Tasks +open Newtonsoft.Json +open NUnit.Framework + +open Jet.ConfluentKafka + +[] +module Helpers = + + type KafkaConsumer with + member c.StopAfter(delay : TimeSpan) = + Task.Delay(delay).ContinueWith(fun (_:Task) -> c.Stop()) |> ignore + + type TestMessage = { producerId : int ; messageId : int } + type ConsumedTestMessage = { consumerId : int ; message : KafkaMessage ; payload : TestMessage } + type ConsumerCallback = + | Message of (KafkaConsumer -> ConsumedTestMessage -> unit) + | Batched of (KafkaConsumer -> ConsumedTestMessage [] -> Async) + + let runProducers (broker : Uri) (topic : string) (numProducers : int) (messagesPerProducer : int) = async { + let runProducer (producerId : int) = async { + use producer = KafkaProducer.Create("panther", broker, topic) + + let! results = + [1 .. messagesPerProducer] + |> Seq.map (fun msgId -> + let key = string msgId + let value = JsonConvert.SerializeObject { producerId = producerId ; messageId = msgId } + key, value) + + |> Seq.chunkBySize 100 + |> Seq.map producer.ProduceBatch + |> Async.Parallel + + return Array.concat results + } + + return! Async.Parallel [for i in 1 .. numProducers -> runProducer i] + } + + let runConsumers (broker : Uri) (topic : string) (groupId : string) (numConsumers : int) (timeout : TimeSpan option) (callback : ConsumerCallback) = async { + let mkConsumer (consumerId : int) = async { + let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId) + let deserialize (msg : KafkaMessage) = { consumerId = consumerId ; message = msg ; payload = JsonConvert.DeserializeObject<_> msg.Value } + + // need to pass the consumer instance to the handler callback + // do a bit of cyclic dependency fixups + let consumerCell = ref None + let rec getConsumer() = + // avoid potential race conditions by polling + match !consumerCell with + | None -> Thread.SpinWait 20; getConsumer() + | Some c -> c + + let consumer = + match callback with + | Message handler -> KafkaConsumer.Start config (fun msg -> handler (getConsumer()) (deserialize msg)) + | Batched handler -> KafkaConsumer.StartBatched config (fun batch -> handler (getConsumer()) (Array.map deserialize batch)) + + consumerCell := Some consumer + + timeout |> Option.iter (fun t -> consumer.StopAfter t) + + do! consumer.AwaitConsumer() + } + + do! Async.Parallel [for i in 1 .. numConsumers -> mkConsumer i] |> Async.Ignore + } + + +let getKafkaTestBroker() = + match Environment.GetEnvironmentVariable "CONFLUENT_KAFKA_TEST_BROKER" with + | x when String.IsNullOrWhiteSpace x -> invalidOp "Test runner needs to specify CONFLUENT_KAFKA_TEST_BROKER environment variable" + | uri -> Uri uri + +[] +let ``ConfluentKafka producer-consumer basic roundtrip`` () = utask { + let broker = getKafkaTestBroker() + + let numProducers = 10 + let numConsumers = 10 + let messagesPerProducer = 1000 + + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + + let consumedBatches = new ConcurrentBag() + let consumerCallback = + Batched(fun consumer batch -> + async { + do consumedBatches.Add batch + let messageCount = consumedBatches |> Seq.sumBy Array.length + // signal cancellation if consumed items reaches expected size + if messageCount >= numProducers * messagesPerProducer then + consumer.Stop() + }) + + // Section: run the test + let producers = runProducers broker topic numProducers messagesPerProducer |> Async.Ignore + let consumers = runConsumers broker topic groupId numConsumers None consumerCallback + + do! + [ producers ; consumers ] + |> Async.Parallel + |> Async.Ignore + + // Section: assertion checks + let ``consumed batches should be non-empty`` = + consumedBatches + |> Seq.forall (not << Array.isEmpty) + + Assert.IsTrue(``consumed batches should be non-empty``, "consumed batches should all be non-empty") + + let ``batches should be grouped by partition`` = + consumedBatches + |> Seq.map (fun batch -> batch |> Seq.distinctBy (fun b -> b.message.Partition) |> Seq.length) + |> Seq.forall (fun numKeys -> numKeys = 1) + + Assert.IsTrue(``batches should be grouped by partition``, "batches should be grouped by partition") + + let allMessages = + consumedBatches + |> Seq.concat + |> Seq.toArray + + let ``all message keys should have expected value`` = + allMessages |> Array.forall (fun msg -> int msg.message.Key = msg.payload.messageId) + + Assert.IsTrue(``all message keys should have expected value``, "all message keys should have expected value") + + let ``should have consumed all expected messages`` = + allMessages + |> Array.groupBy (fun msg -> msg.payload.producerId) + |> Array.map (fun (_, gp) -> gp |> Array.distinctBy (fun msg -> msg.payload.messageId)) + |> Array.forall (fun gp -> gp.Length = messagesPerProducer) + + Assert.IsTrue(``should have consumed all expected messages``, "should have consumed all expected messages") +} + +[] +let ``ConfluentKafka consumer should have expected exception semantics`` () = utask { + let broker = getKafkaTestBroker() + + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + + let! _ = runProducers broker topic 1 10 // populate the topic with a few messages + + runConsumers broker topic groupId 1 None (Message (fun _ _ -> raise <| IndexOutOfRangeException())) + |> Assert.ThrowsAsync +} + +[] +let ``Given a topic different consumer group ids should be consuming the same message set`` () = utask { + let broker = getKafkaTestBroker() + let numMessages = 10 + + let topic = newId() // dev kafka topics are created and truncated automatically + + let! _ = runProducers broker topic 1 numMessages // populate the topic with a few messages + + let messageCount = ref 0 + let groupId1 = newId() + let! _ = + Message (fun c _ -> if Interlocked.Increment messageCount = numMessages then c.Stop()) + |> runConsumers broker topic groupId1 1 None + + Assert.AreEqual(numMessages, !messageCount) + + let messageCount = ref 0 + let groupId2 = newId() + let! _ = + Message (fun c _ -> if Interlocked.Increment messageCount = numMessages then c.Stop()) + |> runConsumers broker topic groupId2 1 None + + Assert.AreEqual(numMessages, !messageCount) +} + +[] +let ``Spawning a new consumer with same consumer group id should not receive new messages`` () = utask { + let broker = getKafkaTestBroker() + + let numMessages = 10 + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + + let! _ = runProducers broker topic 1 numMessages // populate the topic with a few messages + + // expected to read 10 messages from the first consumer + let messageCount = ref 0 + let! _ = + Message (fun c _ -> + if Interlocked.Increment messageCount = numMessages then + c.StopAfter(TimeSpan.FromSeconds 1.)) // cancel after 1 second to allow offsets to be committed) + |> runConsumers broker topic groupId 1 None + + Assert.AreEqual(numMessages, !messageCount) + + // expected to read no messages from the subsequent consumer + let messageCount = ref 0 + let! _ = + Message (fun c _ -> if Interlocked.Increment messageCount = numMessages then c.Stop()) + |> runConsumers broker topic groupId 1 (Some (TimeSpan.FromSeconds 10.)) + + Assert.AreEqual(0, !messageCount) +} + +[] +let ``Commited offsets should not result in missing messages`` () = utask { + let broker = getKafkaTestBroker() + + let numMessages = 10 + let topic = newId() // dev kafka topics are created and truncated automatically + let groupId = newId() + + let! _ = runProducers broker topic 1 numMessages // populate the topic with a few messages + + // expected to read 10 messages from the first consumer + let messageCount = ref 0 + let! _ = + Message (fun c _ -> + if Interlocked.Increment messageCount = numMessages then + c.StopAfter(TimeSpan.FromSeconds 1.)) // cancel after 1 second to allow offsets to be committed) + |> runConsumers broker topic groupId 1 None + + Assert.AreEqual(numMessages, !messageCount) + + let! _ = runProducers broker topic 1 numMessages // produce more messages + + // expected to read 10 messages from the subsequent consumer, + // this is to verify there are no off-by-one errors in how offsets are committed + let messageCount = ref 0 + let! _ = + Message (fun c _ -> + if Interlocked.Increment messageCount = numMessages then + c.StopAfter(TimeSpan.FromSeconds 1.)) // cancel after 1 second to allow offsets to be committed) + |> runConsumers broker topic groupId 1 None + + Assert.AreEqual(numMessages, !messageCount) +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.FSharp.Tests/Utils.fs b/tests/Confluent.Kafka.FSharp.Tests/Utils.fs new file mode 100644 index 00000000..c992b13b --- /dev/null +++ b/tests/Confluent.Kafka.FSharp.Tests/Utils.fs @@ -0,0 +1,41 @@ +[] +module Jet.ConfluentKafka.Tests.Utils + +open System.Threading +open System.Threading.Tasks +open NUnit.Framework + +let newId() = System.Guid.NewGuid().ToString("N") + +type Assert with + static member ThrowsAsync<'ExpectedExn, 'T when 'ExpectedExn :> exn> (workflow : Async<'T>) : unit = + let testDelegate = AsyncTestDelegate(fun () -> Async.StartAsTask workflow :> Task) + Assert.ThrowsAsync<'ExpectedExn>(testDelegate) |> ignore + +[] +type AsyncBuilderAbstract() = + member __.Zero() = async.Zero() + member __.Return t = async.Return t + member __.ReturnFrom t = async.ReturnFrom t + member __.Bind(f,g) = async.Bind(f,g) + member __.Combine(f,g) = async.Combine(f,g) + member __.Delay f = async.Delay f + member __.While(c,b) = async.While(c,b) + member __.For(xs,b) = async.For(xs,b) + member __.Using(d,b) = async.Using(d,b) + member __.TryWith(b,e) = async.TryWith(b,e) + member __.TryFinally(b,f) = async.TryFinally(b,f) + +type TaskBuilder(?ct : CancellationToken) = + inherit AsyncBuilderAbstract() + member __.Run f : Task<'T> = Async.StartAsTask(f, ?cancellationToken = ct) + +type UnitTaskBuilder() = + inherit AsyncBuilderAbstract() + member __.Run f : Task = Async.StartAsTask f :> _ + +/// Async builder variation that automatically runs top-level expression as task +let task = new TaskBuilder() + +/// Async builder variation that automatically runs top-level expression as untyped task +let utask = new UnitTaskBuilder() \ No newline at end of file diff --git a/tests/Confluent.Kafka.FSharp.Tests/paket.references b/tests/Confluent.Kafka.FSharp.Tests/paket.references new file mode 100644 index 00000000..9d949d3d --- /dev/null +++ b/tests/Confluent.Kafka.FSharp.Tests/paket.references @@ -0,0 +1,6 @@ +FSharp.Core +System.ValueTuple +NUnit +NUnit3TestAdapter +Newtonsoft.Json +Microsoft.NET.Test.Sdk \ No newline at end of file