diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fe0331..9873af7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [[2.0.0]] (https://github.com/AppsFlyer/ketu/pull/19) - 2024-09-03 +### Changed +- consumer decorator API breaking change - use ConsumerDecorator protocol instead of `consumer-decorator` function. + ## [[1.1.0]](https://github.com/AppsFlyer/ketu/pull/18) - 2024-07-29 ### Added diff --git a/README.md b/README.md index 8899605..4313097 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A Clojure Apache Kafka client with core.async api ```clojure -[com.appsflyer/ketu "1.1.0"] +[com.appsflyer/ketu "2.0.0"] ``` ## Features @@ -78,12 +78,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options - -| Key | Type | Req? | Notes | -|---------------------------------|-----------------------------------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> Iterable` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> Iterable
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| +| Key | Type | Req? | Notes | +|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | #### Producer-sink options @@ -151,6 +150,7 @@ The decorator processes all immediately available commands in the commands-chan, (ns consumer-decorator-example (:require [clojure.core.async :as async] [ketu.async.source :as source] + [ketu.decorators.consumer.protocol :refer [ConsumerDecorator]] [ketu.async.sink :as sink])) (let [commands-chan (async/chan 10) @@ -161,12 +161,16 @@ The decorator processes all immediately available commands in the commands-chan, :group-id "example" :value-type :string :shape :value - :ketu.source/consumer-decorator (fn [consumer-ctx poll-fn] - (loop [] - (when-let [command (async/poll! commands-chan)] - (command consumer-ctx) - (recur))) - (poll-fn))} + :ketu.source/consumer-decorator (reify ConsumerDecorator + (poll! [consumer-ctx poll-fn] + (loop [] + (when-let [command (async/poll! commands-chan)] + (command consumer-ctx) + (recur))) + (poll-fn)) + (validate [this opts] + ;custom validation logic of the consumer options can be added here + true))} source (source/source consumer-chan consumer-opts) producer-chan (async/chan 10) diff --git a/project.clj b/project.clj index 65604aa..439b94f 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/ketu "1.1.0" +(defproject com.appsflyer/ketu "2.0.0-SNAPSHOT" :description "Clojure Apache Kafka client with core.async api" :url "https://github.com/AppsFlyer/ketu" :license {:name "Apache License, Version 2.0" diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 5918b6b..051db46 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -4,6 +4,7 @@ [ketu.clients.consumer :as consumer] [ketu.shape.consumer :as shape] [ketu.spec] + [ketu.decorators.consumer.decorator :as consumer-decorator] [ketu.util.log :as log]) (:import (java.time Duration) (org.apache.kafka.clients.consumer Consumer) @@ -104,16 +105,12 @@ ^long close-consumer? (:ketu.source/close-consumer? opts) consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts) should-poll? (volatile! true) - decorator-fn (some-> (:ketu.source/consumer-decorator opts) - (partial {:ketu.source/consumer consumer})) - abort-pending-put (async/chan) done-putting (async/chan) - subscribe! (or (subscribe-fn opts) (assign-fn opts)) poll-impl (poll-fn consumer should-poll? opts) - poll! (if (some? decorator-fn) - (partial decorator-fn poll-impl) + poll! (if (some? (:ketu.source/consumer-decorator opts)) + (consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts) poll-impl) ->data (->data-fn opts) put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) diff --git a/src/ketu/decorators/consumer/decorator.clj b/src/ketu/decorators/consumer/decorator.clj new file mode 100644 index 0000000..25e8cff --- /dev/null +++ b/src/ketu/decorators/consumer/decorator.clj @@ -0,0 +1,11 @@ +(ns ketu.decorators.consumer.decorator + (:require [ketu.decorators.consumer.protocol :as cdp])) + +(defn- valid? [consumer-decorator consumer-opts] + (when (not (cdp/valid? consumer-decorator consumer-opts)) + (throw (Exception. "Consumer decorator validation failed")))) + +(defn decorate-poll-fn + [consumer-ctx poll-fn {:keys [ketu.source/consumer-decorator] :as consumer-opts}] + (valid? consumer-decorator consumer-opts) + #(cdp/poll! consumer-decorator consumer-ctx poll-fn)) diff --git a/src/ketu/decorators/consumer/protocol.clj b/src/ketu/decorators/consumer/protocol.clj new file mode 100644 index 0000000..95aad2e --- /dev/null +++ b/src/ketu/decorators/consumer/protocol.clj @@ -0,0 +1,17 @@ +(ns ketu.decorators.consumer.protocol) + +(defprotocol ConsumerDecorator + "Consumer decorator provides a way to extend the consumer source functionality. + The decorator runs in the context of the polling thread and allows custom control on the internal consumer instance" + (poll! [this consumer-ctx poll-fn] + "Decorates the internal consumer poll loop. + - Parameters: + - `consumer-ctx`: A map containing the consumer context, typically {:ketu.source/consumer consumer}. + - `poll-fn`: A function with no arguments that returns an Iterable of ConsumerRecord. + - Returns: An iterable collection of ConsumerRecord. + - The decorator should call the `poll-fn` on behalf of the consumer source.") + (valid? [this consumer-opts] + "Validates the consumer options according to the decorator logic. + - Parameters: + - `consumer-opts`: A map of consumer options to be validated. + - Returns: true if the consumer options are valid according to the decorator logic, false otherwise.")) diff --git a/src/ketu/spec.clj b/src/ketu/spec.clj index abe78a3..2311364 100644 --- a/src/ketu/spec.clj +++ b/src/ketu/spec.clj @@ -2,9 +2,10 @@ (:require [clojure.set] [clojure.spec.alpha :as s] [clojure.string] - [expound.alpha :as expound] - [clojure.core.async.impl.protocols]) + [ketu.decorators.consumer.protocol] + [expound.alpha :as expound]) (:import (java.util.regex Pattern) + (ketu.decorators.consumer.protocol ConsumerDecorator) (org.apache.kafka.clients.producer Callback) (org.apache.kafka.common.serialization Deserializer Serializer))) @@ -28,7 +29,7 @@ (s/def :ketu.source/close-out-chan? boolean?) (s/def :ketu.source/close-consumer? boolean?) (s/def :ketu.source/create-rebalance-listener-obj fn?) -(s/def :ketu.source/consumer-decorator fn?) +(s/def :ketu.source/consumer-decorator #(instance? ConsumerDecorator %)) (s/def :ketu.source.assign/topic :ketu/topic) (s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?)) (s/def :ketu.source/assign-single-topic-partitions diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index bec34a5..99f294e 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -6,6 +6,7 @@ [clojure.core.async :as async] [ketu.clients.consumer :as consumer] [ketu.clients.producer :as producer] + [ketu.decorators.consumer.protocol :refer [ConsumerDecorator]] [ketu.async.source :as source] [ketu.async.sink :as sink] [ketu.test.kafka-setup :as kafka-setup]) @@ -213,37 +214,56 @@ (.close ^AdminClient admin-client))))) (deftest consumer-decorator - (let [consumer-chan (async/chan 10) - result-chan (async/chan 100) - clicks-consumer-opts {:name "clicks-consumer" - :brokers (kafka-setup/get-bootstrap-servers) - :topic "clicks" - :group-id "clicks-test-consumer" - :auto-offset-reset "earliest" - :shape :value - :ketu.source/consumer-decorator (fn [{_consumer :ketu.source/consumer} poll-fn] - (let [records (poll-fn)] - (doseq [^ConsumerRecord record records] - (async/>!! result-chan (String. ^"[B" (.value record)))) - records))} - source (source/source consumer-chan clicks-consumer-opts) - clicks-producer-opts {:name "clicks-producer" - :brokers (kafka-setup/get-bootstrap-servers) - :topic "clicks" - :key-type :string - :internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"} - :shape [:vector :key :value]} - producer-chan (async/chan 10) - sink (sink/sink producer-chan clicks-producer-opts) - input-values #{"1" "2" "3"}] - (try - (doseq [value input-values] - (async/>!! producer-chan ["1" value])) - (is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan))))) - (is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan))))) - (finally - (Thread/sleep 2000) - (source/stop! source) - (async/close! producer-chan) - (sink/stop! sink))))) + (testing "consumer decorator functionality" + (let [consumer-chan (async/chan 10) + result-chan (async/chan 100) + clicks-consumer-opts {:name "clicks-consumer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :group-id "clicks-test-consumer" + :auto-offset-reset "earliest" + :shape :value + :ketu.source/consumer-decorator (reify ConsumerDecorator + (poll! [_ {_consumer :ketu.source/consumer} poll-fn] + (let [records (poll-fn)] + (doseq [^ConsumerRecord record records] + (async/>!! result-chan (String. ^"[B" (.value record)))) + records)) + (valid? [_ _] + true))} + source (source/source consumer-chan clicks-consumer-opts) + clicks-producer-opts {:name "clicks-producer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :key-type :string + :internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"} + :shape [:vector :key :value]} + producer-chan (async/chan 10) + sink (sink/sink producer-chan clicks-producer-opts) + input-values #{"1" "2" "3"}] + (try + (doseq [value input-values] + (async/>!! producer-chan ["1" value])) + (is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan))))) + (is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan))))) + (finally + (Thread/sleep 2000) + (source/stop! source) + (async/close! producer-chan) + (sink/stop! sink)))) + (testing "consumer decorator validation failure" + (let [consumer-chan (async/chan 10) + clicks-consumer-opts {:name "clicks-consumer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :group-id "clicks-test-consumer" + :auto-offset-reset "earliest" + :shape :value + :ketu.source/consumer-decorator (reify ConsumerDecorator + (poll! [_ _ _] + nil) + (valid? [_ _] + false))}] + (is (thrown-with-msg? Exception #"Consumer decorator validation failed" + (source/source consumer-chan clicks-consumer-opts)))))))