From a2f758eabd2367b154c7dce0b087fa4a5a774c54 Mon Sep 17 00:00:00 2001 From: Idan Tavor Date: Tue, 23 Jul 2024 17:18:43 +0300 Subject: [PATCH] consumer decorator support --- src/ketu/async/source.clj | 13 ++++++++----- src/ketu/spec.clj | 2 +- test/ketu/async/integration_test.clj | 19 ++++++++++--------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 3af56d4..55fc6b9 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -102,15 +102,18 @@ close-out-chan? (:ketu.source/close-out-chan? opts) ^long close-consumer? (:ketu.source/close-consumer? opts) consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts) - interceptor-fn (or (some-> (:ketu.source/consumer-interceptor opts) - (partial {:ketu.source/consumer consumer})) - identity) + decorator-fn (some-> (:ketu.source/consumer-decorator opts) + (partial {:ketu.source/consumer consumer})) + should-poll? (volatile! true) abort-pending-put (async/chan) done-putting (async/chan) subscribe! (or (subscribe-fn opts) (assign-fn opts)) - poll! (poll-fn consumer opts) + poll-impl (poll-fn consumer opts) + poll! (if (some? decorator-fn) + (partial decorator-fn poll-impl) + poll-impl) ->data (->data-fn opts) put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) @@ -123,7 +126,7 @@ (subscribe! consumer) (while @should-poll? - (let [records (interceptor-fn (poll!))] + (let [records (poll!)] (run! put! records))) (catch WakeupException e diff --git a/src/ketu/spec.clj b/src/ketu/spec.clj index fc31d8c..4cfce42 100644 --- a/src/ketu/spec.clj +++ b/src/ketu/spec.clj @@ -28,7 +28,7 @@ (s/def :ketu.source/close-out-chan? boolean?) (s/def :ketu.source/close-consumer? boolean?) (s/def :ketu.source/create-rebalance-listener-obj fn?) -(s/def :ketu.source/consumer-interceptor fn?) +(s/def :ketu.source/consumer-decorator fn?) (s/def :ketu.source.assign/topic :ketu/topic) (s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?)) (s/def :ketu.source/assign-single-topic-partitions diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index 067b952..bec34a5 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -212,19 +212,20 @@ (source/stop! source) (.close ^AdminClient admin-client))))) -(deftest consumer-interceptor +(deftest consumer-decorator (let [consumer-chan (async/chan 10) result-chan (async/chan 100) - clicks-consumer-opts {:name "clicks-consumer" - :brokers (kafka-setup/get-bootstrap-servers) - :topic "clicks" - :group-id "clicks-test-consumer" - :auto-offset-reset "earliest" - :shape :value - :ketu.source/consumer-interceptor (fn [{_consumer :ketu.source/consumer} records] + clicks-consumer-opts {:name "clicks-consumer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :group-id "clicks-test-consumer" + :auto-offset-reset "earliest" + :shape :value + :ketu.source/consumer-decorator (fn [{_consumer :ketu.source/consumer} poll-fn] + (let [records (poll-fn)] (doseq [^ConsumerRecord record records] (async/>!! result-chan (String. ^"[B" (.value record)))) - records)} + records))} source (source/source consumer-chan clicks-consumer-opts) clicks-producer-opts {:name "clicks-producer" :brokers (kafka-setup/get-bootstrap-servers)