Skip to content

Commit

Permalink
suuport consumer interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
idantavor committed Jul 21, 2024
1 parent 613ecd3 commit 91697cc
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
6 changes: 4 additions & 2 deletions src/ketu/async/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@
^long close-consumer? (:ketu.source/close-consumer? opts)
consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts)
commands-chan (:ketu.source/consumer-commands-chan opts)

interceptor-fn (or (some-> (:ketu.source/consumer-interceptor opts)
(partial consumer))
identity)
should-poll? (volatile! true)
abort-pending-put (async/chan)
done-putting (async/chan)
Expand All @@ -129,7 +131,7 @@

(while @should-poll?
(optionally-execute-commands!)
(let [records (poll!)]
(let [records (interceptor-fn (poll!))]
(run! put! records)))

(catch WakeupException e
Expand Down
5 changes: 3 additions & 2 deletions src/ketu/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
(s/def :ketu.source/close-consumer? boolean?)
(s/def :ketu.source/create-rebalance-listener-obj fn?)
(s/def :ketu.source/consumer-commands-chan #(extends? clojure.core.async.impl.protocols/ReadPort (type %)))

(s/def :ketu.source/consumer-interceptor fn?)
(s/def :ketu.source.assign/topic :ketu/topic)
(s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?))
(s/def :ketu.source/assign-single-topic-partitions
Expand Down Expand Up @@ -79,7 +79,8 @@
:ketu.source/consumer-thread-timeout-ms
:ketu.source/close-out-chan?
:ketu.source/close-consumer?
:ketu.source/consumer-commands-chan]))
:ketu.source/consumer-commands-chan
:ketu.source/consumer-interceptors]))

(s/def :ketu.apache.producer/config map?)
(s/def :ketu.sink/sender-threads-num pos-int?)
Expand Down
37 changes: 36 additions & 1 deletion test/ketu/async/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
(:import (java.time Duration)
(org.apache.kafka.common PartitionInfo TopicPartition)
(org.apache.kafka.clients.producer RecordMetadata)
(org.apache.kafka.clients.consumer Consumer)
(org.apache.kafka.clients.consumer Consumer ConsumerRecord)
(org.apache.kafka.clients.admin AdminClient NewTopic)
(java.util.concurrent TimeUnit)))

Expand Down Expand Up @@ -246,3 +246,38 @@
(finally
(Thread/sleep 2000)
(source/stop! source)))))

(deftest consumer-interceptor
(let [consumer-chan (async/chan 10)
result-chan (async/chan 100)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-interceptor (fn [_consumer records]
(doseq [^ConsumerRecord record records]
(async/>!! result-chan (String. ^"[B" (.value record))))
records)}
source (source/source consumer-chan clicks-consumer-opts)
clicks-producer-opts {:name "clicks-producer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:key-type :string
:internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}
:shape [:vector :key :value]}
producer-chan (async/chan 10)
sink (sink/sink producer-chan clicks-producer-opts)
input-values #{"1" "2" "3"}]
(try
(doseq [value input-values]
(async/>!! producer-chan ["1" value]))
(is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan)))))
(is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan)))))
(finally
(Thread/sleep 2000)
(source/stop! source)
(async/close! producer-chan)
(sink/stop! sink)))))

0 comments on commit 91697cc

Please sign in to comment.