Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer decorator protocol #19

Merged
merged 6 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [[2.0.0]] (https://github.com/AppsFlyer/ketu/pull/19) - 2024-09-03
### Changed
- consumer decorator API breaking change - use ConsumerDecorator protocol instead of `consumer-decorator` function.

## [[1.1.0]](https://github.com/AppsFlyer/ketu/pull/18) - 2024-07-29

### Added
Expand Down
30 changes: 17 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
A Clojure Apache Kafka client with core.async api

```clojure
[com.appsflyer/ketu "1.1.0"]
[com.appsflyer/ketu "2.0.0"]
```

## Features
Expand Down Expand Up @@ -78,12 +78,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options

| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> Iterable<ConsumerRecord>` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:<br/>consumer-context: {:ketu.source/consumer consumer}<br/>pool-fn: fn [] -> Iterable<ConsumerRecord> <br/>Returns an iterable collection of consumerRecord.<br/>The decorator should call the poll-fn on behalf of the consumer source.<br/> |
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) |

#### Producer-sink options

Expand Down Expand Up @@ -151,6 +150,7 @@ The decorator processes all immediately available commands in the commands-chan,
(ns consumer-decorator-example
(:require [clojure.core.async :as async]
[ketu.async.source :as source]
[ketu.decorators.consumer.protocol :refer [ConsumerDecorator]]
[ketu.async.sink :as sink]))

(let [commands-chan (async/chan 10)
Expand All @@ -161,12 +161,16 @@ The decorator processes all immediately available commands in the commands-chan,
:group-id "example"
:value-type :string
:shape :value
:ketu.source/consumer-decorator (fn [consumer-ctx poll-fn]
(loop []
(when-let [command (async/poll! commands-chan)]
(command consumer-ctx)
(recur)))
(poll-fn))}
:ketu.source/consumer-decorator (reify ConsumerDecorator
(poll! [consumer-ctx poll-fn]
(loop []
(when-let [command (async/poll! commands-chan)]
(command consumer-ctx)
(recur)))
(poll-fn))
(validate [this opts]
;custom validation logic of the consumer options can be added here
true))}
source (source/source consumer-chan consumer-opts)

producer-chan (async/chan 10)
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.appsflyer/ketu "1.1.0"
(defproject com.appsflyer/ketu "2.0.0-SNAPSHOT"
:description "Clojure Apache Kafka client with core.async api"
:url "https://github.com/AppsFlyer/ketu"
:license {:name "Apache License, Version 2.0"
Expand Down
9 changes: 3 additions & 6 deletions src/ketu/async/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[ketu.clients.consumer :as consumer]
[ketu.shape.consumer :as shape]
[ketu.spec]
[ketu.decorators.consumer.decorator :as consumer-decorator]
[ketu.util.log :as log])
(:import (java.time Duration)
(org.apache.kafka.clients.consumer Consumer)
Expand Down Expand Up @@ -104,16 +105,12 @@
^long close-consumer? (:ketu.source/close-consumer? opts)
consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts)
should-poll? (volatile! true)
decorator-fn (some-> (:ketu.source/consumer-decorator opts)
(partial {:ketu.source/consumer consumer}))

abort-pending-put (async/chan)
done-putting (async/chan)

subscribe! (or (subscribe-fn opts) (assign-fn opts))
poll-impl (poll-fn consumer should-poll? opts)
poll! (if (some? decorator-fn)
(partial decorator-fn poll-impl)
poll! (if (some? (:ketu.source/consumer-decorator opts))
(consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts)
poll-impl)
->data (->data-fn opts)
put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put))
Expand Down
11 changes: 11 additions & 0 deletions src/ketu/decorators/consumer/decorator.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(ns ketu.decorators.consumer.decorator
(:require [ketu.decorators.consumer.protocol :as cdp]))

(defn- valid? [consumer-decorator consumer-opts]
(when (not (cdp/valid? consumer-decorator consumer-opts))
(throw (Exception. "Consumer decorator validation failed"))))

(defn decorate-poll-fn
[consumer-ctx poll-fn {:keys [ketu.source/consumer-decorator] :as consumer-opts}]
(valid? consumer-decorator consumer-opts)
#(cdp/poll! consumer-decorator consumer-ctx poll-fn))
17 changes: 17 additions & 0 deletions src/ketu/decorators/consumer/protocol.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
(ns ketu.decorators.consumer.protocol)

(defprotocol ConsumerDecorator
"Consumer decorator provides a way to extend the consumer source functionality.
The decorator runs in the context of the polling thread and allows custom control on the internal consumer instance"
(poll! [this consumer-ctx poll-fn]
"Decorates the internal consumer poll loop.
- Parameters:
- `consumer-ctx`: A map containing the consumer context, typically {:ketu.source/consumer consumer}.
- `poll-fn`: A function with no arguments that returns an Iterable of ConsumerRecord.
- Returns: An iterable collection of ConsumerRecord.
- The decorator should call the `poll-fn` on behalf of the consumer source.")
(valid? [this consumer-opts]
"Validates the consumer options according to the decorator logic.
- Parameters:
- `consumer-opts`: A map of consumer options to be validated.
- Returns: true if the consumer options are valid according to the decorator logic, false otherwise."))
7 changes: 4 additions & 3 deletions src/ketu/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
(:require [clojure.set]
[clojure.spec.alpha :as s]
[clojure.string]
[expound.alpha :as expound]
[clojure.core.async.impl.protocols])
[ketu.decorators.consumer.protocol]
[expound.alpha :as expound])
(:import (java.util.regex Pattern)
(ketu.decorators.consumer.protocol ConsumerDecorator)
(org.apache.kafka.clients.producer Callback)
(org.apache.kafka.common.serialization Deserializer Serializer)))

Expand All @@ -28,7 +29,7 @@
(s/def :ketu.source/close-out-chan? boolean?)
(s/def :ketu.source/close-consumer? boolean?)
(s/def :ketu.source/create-rebalance-listener-obj fn?)
(s/def :ketu.source/consumer-decorator fn?)
(s/def :ketu.source/consumer-decorator #(instance? ConsumerDecorator %))
(s/def :ketu.source.assign/topic :ketu/topic)
(s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?))
(s/def :ketu.source/assign-single-topic-partitions
Expand Down
86 changes: 53 additions & 33 deletions test/ketu/async/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[clojure.core.async :as async]
[ketu.clients.consumer :as consumer]
[ketu.clients.producer :as producer]
[ketu.decorators.consumer.protocol :refer [ConsumerDecorator]]
idantavor marked this conversation as resolved.
Show resolved Hide resolved
[ketu.async.source :as source]
[ketu.async.sink :as sink]
[ketu.test.kafka-setup :as kafka-setup])
Expand Down Expand Up @@ -213,37 +214,56 @@
(.close ^AdminClient admin-client)))))

(deftest consumer-decorator
(let [consumer-chan (async/chan 10)
result-chan (async/chan 100)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-decorator (fn [{_consumer :ketu.source/consumer} poll-fn]
(let [records (poll-fn)]
(doseq [^ConsumerRecord record records]
(async/>!! result-chan (String. ^"[B" (.value record))))
records))}
source (source/source consumer-chan clicks-consumer-opts)
clicks-producer-opts {:name "clicks-producer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:key-type :string
:internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}
:shape [:vector :key :value]}
producer-chan (async/chan 10)
sink (sink/sink producer-chan clicks-producer-opts)
input-values #{"1" "2" "3"}]
(try
(doseq [value input-values]
(async/>!! producer-chan ["1" value]))
(is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan)))))
(is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan)))))
(finally
(Thread/sleep 2000)
(source/stop! source)
(async/close! producer-chan)
(sink/stop! sink)))))
(testing "consumer decorator functionality"
(let [consumer-chan (async/chan 10)
result-chan (async/chan 100)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-decorator (reify ConsumerDecorator
(poll! [_ {_consumer :ketu.source/consumer} poll-fn]
(let [records (poll-fn)]
(doseq [^ConsumerRecord record records]
(async/>!! result-chan (String. ^"[B" (.value record))))
records))
(valid? [_ _]
true))}
source (source/source consumer-chan clicks-consumer-opts)
clicks-producer-opts {:name "clicks-producer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:key-type :string
:internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}
:shape [:vector :key :value]}
producer-chan (async/chan 10)
sink (sink/sink producer-chan clicks-producer-opts)
input-values #{"1" "2" "3"}]
(try
(doseq [value input-values]
(async/>!! producer-chan ["1" value]))
(is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan)))))
(is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan)))))
(finally
(Thread/sleep 2000)
(source/stop! source)
(async/close! producer-chan)
(sink/stop! sink))))

(testing "consumer decorator validation failure"
(let [consumer-chan (async/chan 10)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-decorator (reify ConsumerDecorator
(poll! [_ _ _]
nil)
(valid? [_ _]
false))}]
(is (thrown-with-msg? Exception #"Consumer decorator validation failed"
(source/source consumer-chan clicks-consumer-opts)))))))
Loading