Skip to content

Commit

Permalink
Consumer decorator protocol (#19)
Browse files Browse the repository at this point in the history
* Consumer decorator protocol
  • Loading branch information
idantavor authored Sep 5, 2024
1 parent 862c79f commit 156e37d
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 56 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [[2.0.0]] (https://github.com/AppsFlyer/ketu/pull/19) - 2024-09-03
### Changed
- consumer decorator API breaking change - use ConsumerDecorator protocol instead of `consumer-decorator` function.

## [[1.1.0]](https://github.com/AppsFlyer/ketu/pull/18) - 2024-07-29

### Added
Expand Down
30 changes: 17 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
A Clojure Apache Kafka client with core.async api

```clojure
[com.appsflyer/ketu "1.1.0"]
[com.appsflyer/ketu "2.0.0"]
```

## Features
Expand Down Expand Up @@ -78,12 +78,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options

| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> Iterable<ConsumerRecord>` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:<br/>consumer-context: {:ketu.source/consumer consumer}<br/>pool-fn: fn [] -> Iterable<ConsumerRecord> <br/>Returns an iterable collection of consumerRecord.<br/>The decorator should call the poll-fn on behalf of the consumer source.<br/> |
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) |

#### Producer-sink options

Expand Down Expand Up @@ -151,6 +150,7 @@ The decorator processes all immediately available commands in the commands-chan,
(ns consumer-decorator-example
(:require [clojure.core.async :as async]
[ketu.async.source :as source]
[ketu.decorators.consumer.protocol :refer [ConsumerDecorator]]
[ketu.async.sink :as sink]))

(let [commands-chan (async/chan 10)
Expand All @@ -161,12 +161,16 @@ The decorator processes all immediately available commands in the commands-chan,
:group-id "example"
:value-type :string
:shape :value
:ketu.source/consumer-decorator (fn [consumer-ctx poll-fn]
(loop []
(when-let [command (async/poll! commands-chan)]
(command consumer-ctx)
(recur)))
(poll-fn))}
:ketu.source/consumer-decorator (reify ConsumerDecorator
(poll! [consumer-ctx poll-fn]
(loop []
(when-let [command (async/poll! commands-chan)]
(command consumer-ctx)
(recur)))
(poll-fn))
(validate [this opts]
;custom validation logic of the consumer options can be added here
true))}
source (source/source consumer-chan consumer-opts)

producer-chan (async/chan 10)
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject com.appsflyer/ketu "1.1.0"
(defproject com.appsflyer/ketu "2.0.0-SNAPSHOT"
:description "Clojure Apache Kafka client with core.async api"
:url "https://github.com/AppsFlyer/ketu"
:license {:name "Apache License, Version 2.0"
Expand Down
9 changes: 3 additions & 6 deletions src/ketu/async/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[ketu.clients.consumer :as consumer]
[ketu.shape.consumer :as shape]
[ketu.spec]
[ketu.decorators.consumer.decorator :as consumer-decorator]
[ketu.util.log :as log])
(:import (java.time Duration)
(org.apache.kafka.clients.consumer Consumer)
Expand Down Expand Up @@ -104,16 +105,12 @@
^long close-consumer? (:ketu.source/close-consumer? opts)
consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts)
should-poll? (volatile! true)
decorator-fn (some-> (:ketu.source/consumer-decorator opts)
(partial {:ketu.source/consumer consumer}))

abort-pending-put (async/chan)
done-putting (async/chan)

subscribe! (or (subscribe-fn opts) (assign-fn opts))
poll-impl (poll-fn consumer should-poll? opts)
poll! (if (some? decorator-fn)
(partial decorator-fn poll-impl)
poll! (if (some? (:ketu.source/consumer-decorator opts))
(consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts)
poll-impl)
->data (->data-fn opts)
put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put))
Expand Down
11 changes: 11 additions & 0 deletions src/ketu/decorators/consumer/decorator.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(ns ketu.decorators.consumer.decorator
(:require [ketu.decorators.consumer.protocol :as cdp]))

(defn- valid? [consumer-decorator consumer-opts]
(when (not (cdp/valid? consumer-decorator consumer-opts))
(throw (Exception. "Consumer decorator validation failed"))))

(defn decorate-poll-fn
[consumer-ctx poll-fn {:keys [ketu.source/consumer-decorator] :as consumer-opts}]
(valid? consumer-decorator consumer-opts)
#(cdp/poll! consumer-decorator consumer-ctx poll-fn))
17 changes: 17 additions & 0 deletions src/ketu/decorators/consumer/protocol.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
(ns ketu.decorators.consumer.protocol)

(defprotocol ConsumerDecorator
"Consumer decorator provides a way to extend the consumer source functionality.
The decorator runs in the context of the polling thread and allows custom control on the internal consumer instance"
(poll! [this consumer-ctx poll-fn]
"Decorates the internal consumer poll loop.
- Parameters:
- `consumer-ctx`: A map containing the consumer context, typically {:ketu.source/consumer consumer}.
- `poll-fn`: A function with no arguments that returns an Iterable of ConsumerRecord.
- Returns: An iterable collection of ConsumerRecord.
- The decorator should call the `poll-fn` on behalf of the consumer source.")
(valid? [this consumer-opts]
"Validates the consumer options according to the decorator logic.
- Parameters:
- `consumer-opts`: A map of consumer options to be validated.
- Returns: true if the consumer options are valid according to the decorator logic, false otherwise."))
7 changes: 4 additions & 3 deletions src/ketu/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
(:require [clojure.set]
[clojure.spec.alpha :as s]
[clojure.string]
[expound.alpha :as expound]
[clojure.core.async.impl.protocols])
[ketu.decorators.consumer.protocol]
[expound.alpha :as expound])
(:import (java.util.regex Pattern)
(ketu.decorators.consumer.protocol ConsumerDecorator)
(org.apache.kafka.clients.producer Callback)
(org.apache.kafka.common.serialization Deserializer Serializer)))

Expand All @@ -28,7 +29,7 @@
(s/def :ketu.source/close-out-chan? boolean?)
(s/def :ketu.source/close-consumer? boolean?)
(s/def :ketu.source/create-rebalance-listener-obj fn?)
(s/def :ketu.source/consumer-decorator fn?)
(s/def :ketu.source/consumer-decorator #(instance? ConsumerDecorator %))
(s/def :ketu.source.assign/topic :ketu/topic)
(s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?))
(s/def :ketu.source/assign-single-topic-partitions
Expand Down
86 changes: 53 additions & 33 deletions test/ketu/async/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[clojure.core.async :as async]
[ketu.clients.consumer :as consumer]
[ketu.clients.producer :as producer]
[ketu.decorators.consumer.protocol :refer [ConsumerDecorator]]
[ketu.async.source :as source]
[ketu.async.sink :as sink]
[ketu.test.kafka-setup :as kafka-setup])
Expand Down Expand Up @@ -213,37 +214,56 @@
(.close ^AdminClient admin-client)))))

(deftest consumer-decorator
(let [consumer-chan (async/chan 10)
result-chan (async/chan 100)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-decorator (fn [{_consumer :ketu.source/consumer} poll-fn]
(let [records (poll-fn)]
(doseq [^ConsumerRecord record records]
(async/>!! result-chan (String. ^"[B" (.value record))))
records))}
source (source/source consumer-chan clicks-consumer-opts)
clicks-producer-opts {:name "clicks-producer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:key-type :string
:internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}
:shape [:vector :key :value]}
producer-chan (async/chan 10)
sink (sink/sink producer-chan clicks-producer-opts)
input-values #{"1" "2" "3"}]
(try
(doseq [value input-values]
(async/>!! producer-chan ["1" value]))
(is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan)))))
(is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan)))))
(finally
(Thread/sleep 2000)
(source/stop! source)
(async/close! producer-chan)
(sink/stop! sink)))))
(testing "consumer decorator functionality"
(let [consumer-chan (async/chan 10)
result-chan (async/chan 100)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-decorator (reify ConsumerDecorator
(poll! [_ {_consumer :ketu.source/consumer} poll-fn]
(let [records (poll-fn)]
(doseq [^ConsumerRecord record records]
(async/>!! result-chan (String. ^"[B" (.value record))))
records))
(valid? [_ _]
true))}
source (source/source consumer-chan clicks-consumer-opts)
clicks-producer-opts {:name "clicks-producer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:key-type :string
:internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}
:shape [:vector :key :value]}
producer-chan (async/chan 10)
sink (sink/sink producer-chan clicks-producer-opts)
input-values #{"1" "2" "3"}]
(try
(doseq [value input-values]
(async/>!! producer-chan ["1" value]))
(is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan)))))
(is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan)))))
(finally
(Thread/sleep 2000)
(source/stop! source)
(async/close! producer-chan)
(sink/stop! sink))))

(testing "consumer decorator validation failure"
(let [consumer-chan (async/chan 10)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-decorator (reify ConsumerDecorator
(poll! [_ _ _]
nil)
(valid? [_ _]
false))}]
(is (thrown-with-msg? Exception #"Consumer decorator validation failed"
(source/source consumer-chan clicks-consumer-opts)))))))

0 comments on commit 156e37d

Please sign in to comment.