Skip to content

Commit

Permalink
consumer decorator support
Browse files Browse the repository at this point in the history
  • Loading branch information
idantavor committed Jul 23, 2024
1 parent 961820c commit a2f758e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
13 changes: 8 additions & 5 deletions src/ketu/async/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,18 @@
close-out-chan? (:ketu.source/close-out-chan? opts)
^long close-consumer? (:ketu.source/close-consumer? opts)
consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts)
interceptor-fn (or (some-> (:ketu.source/consumer-interceptor opts)
(partial {:ketu.source/consumer consumer}))
identity)
decorator-fn (some-> (:ketu.source/consumer-decorator opts)
(partial {:ketu.source/consumer consumer}))

should-poll? (volatile! true)
abort-pending-put (async/chan)
done-putting (async/chan)

subscribe! (or (subscribe-fn opts) (assign-fn opts))
poll! (poll-fn consumer opts)
poll-impl (poll-fn consumer opts)
poll! (if (some? decorator-fn)
(partial decorator-fn poll-impl)
poll-impl)
->data (->data-fn opts)
put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put))

Expand All @@ -123,7 +126,7 @@
(subscribe! consumer)

(while @should-poll?
(let [records (interceptor-fn (poll!))]
(let [records (poll!)]
(run! put! records)))

(catch WakeupException e
Expand Down
2 changes: 1 addition & 1 deletion src/ketu/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
(s/def :ketu.source/close-out-chan? boolean?)
(s/def :ketu.source/close-consumer? boolean?)
(s/def :ketu.source/create-rebalance-listener-obj fn?)
(s/def :ketu.source/consumer-interceptor fn?)
(s/def :ketu.source/consumer-decorator fn?)
(s/def :ketu.source.assign/topic :ketu/topic)
(s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?))
(s/def :ketu.source/assign-single-topic-partitions
Expand Down
19 changes: 10 additions & 9 deletions test/ketu/async/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -212,19 +212,20 @@
(source/stop! source)
(.close ^AdminClient admin-client)))))

(deftest consumer-interceptor
(deftest consumer-decorator
(let [consumer-chan (async/chan 10)
result-chan (async/chan 100)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-interceptor (fn [{_consumer :ketu.source/consumer} records]
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-decorator (fn [{_consumer :ketu.source/consumer} poll-fn]
(let [records (poll-fn)]
(doseq [^ConsumerRecord record records]
(async/>!! result-chan (String. ^"[B" (.value record))))
records)}
records))}
source (source/source consumer-chan clicks-consumer-opts)
clicks-producer-opts {:name "clicks-producer"
:brokers (kafka-setup/get-bootstrap-servers)
Expand Down

0 comments on commit a2f758e

Please sign in to comment.