Skip to content

Commit

Permalink
Remove halboy and vent from tests
Browse files Browse the repository at this point in the history
to simplify them
  • Loading branch information
phutchin committed Jan 25, 2024
1 parent 680c233 commit 6b90666
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 64 deletions.
4 changes: 1 addition & 3 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
[org.apache.curator/curator-test "5.0.0"]
[com.impossibl.pgjdbc-ng/pgjdbc-ng "0.8.4"]
[hikari-cp "2.12.0"]
[b-social/jason "0.1.5"]
[b-social/vent "0.6.5"]
[halboy "5.1.0"]]
[b-social/jason "0.1.5"]]
:eftest {:multithread? false}}}

:cloverage
Expand Down
23 changes: 9 additions & 14 deletions test/kafka_event_processor/processing_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
[kafka-event-processor.test-support.postgres.database :as database]
[kafka-event-processor.test-support.system :as system]
[kafka-event-processor.test-support.conditional-execution :refer [do-until]]
[halboy.resource :as hal]
[kafka-event-processor.utils.generators :as generators]
[halboy.json :as hal-json]
[jason.convenience :refer [->wire-json]]
[kafka-event-processor.test-support.kafka.producer :as producer])
(:import [kafka_event_processor.processor.protocols EventHandler]))

Expand All @@ -26,32 +23,30 @@
^EventHandler event-handler (:event-handler @test-system)
event-id (generators/uuid)
message "I am an event"
event (-> (hal/new-resource)
(hal/add-href :self (str "http://localhost/event/" event-id))
(hal/add-property :id event-id)
(hal/add-property :message message)
(hal/add-property :type :test)
hal-json/resource->map
->wire-json)]
event {:id event-id
:message message
:type :test}]
(producer/publish-messages kafka "test"
[(producer/create-message event)])

(let [read-events (do-until
(fn [] @events-atom)
{:matcher #(= 1 (count %))
:timeout 60000})
event (first read-events)]
event (first read-events)
event-properties (:payload event)]
(is (= 1 (count read-events)))
(is (= event-id (:id event)))
(is (= message (:message event))))
(is (= event-id (:id event-properties)))
(is (= message (:message event-properties)))
(is (= (:topic event) "test")))

(let [read-cursors (do-until
(fn [] @(.atom event-handler))
{:matcher #(= 1 (count %))
:timeout 60000})
cursor (first read-cursors)]
(is (= 1 (count read-cursors)))
(is (= event-id (:event-id cursor)))
(is (= event-id (get-in cursor [:payload :id])))
(is (= "test" (:topic cursor)))
(is (= :main (:processor cursor)))
(is (int? (:partition cursor))))))))
8 changes: 2 additions & 6 deletions test/kafka_event_processor/test_support/kafka/producer.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
(ns kafka-event-processor.test-support.kafka.producer
(:require
[halboy.resource :as hal]
[halboy.json :as hal-json]
[jason.convenience :as json]

[kafka-event-processor.utils.properties :as properties])
(:import
Expand All @@ -21,10 +20,7 @@
overrides)))

(defn create-message [payload]
(-> (hal/new-resource)
(hal/add-property :schema nil)
(hal/add-property :payload payload)
(hal-json/resource->json)))
(json/->wire-json payload))

(defn publish-messages
[{:keys [broker-host broker-port]} topic messages]
Expand Down
47 changes: 6 additions & 41 deletions test/kafka_event_processor/test_support/system.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@
[kafka-event-processor.test-support.database :as db]
[kafka-event-processor.kafka.system :as kafka-system]
[kafka-event-processor.processor.system :as processors]
[vent.core :as vent]
[kafka-event-processor.test-support.kafka.combined :as kafka]
[freeport.core :refer [get-free-port!]]
[vent.hal :as vent-hal]
[halboy.resource :as hal]
[halboy.json :as hal-json]
[kafka-event-processor.processor.protocols :refer [EventHandler]]
[clojure.string :as string]))
[jason.convenience :as json]
[kafka-event-processor.processor.protocols :refer [EventHandler]]))

(defn with-system-lifecycle [system-atom]
(fn [f]
Expand All @@ -22,55 +17,25 @@
(finally
(reset! system-atom (component/stop-system @system-atom))))))

(defn test-act []
(vent/action [{:keys [id message atom]}]
(swap! atom conj {:id id :message message})))

(defn test-gather [channel-and-payload]
(vent/gatherer []
(let [event (:payload channel-and-payload)]
{:id (hal/get-property event :id)
:message (hal/get-property event :message)})))

(vent/defruleset all
(vent/options
:event-type-fn (vent-hal/event-type-property :type))
(vent/from-channel :test
(vent/on-type :test
(vent/gather test-gather)
(vent/act test-act))))

(defn- event-resource->id-from-href
[event-resource href]
(let [href (hal/get-href event-resource href)]
(when-not (nil? href) (last (string/split href #"/")))))

(defn- event-resource->id
"Get the id from the event resource"
[event-resource]
(event-resource->id-from-href event-resource :self))

(deftype AtomEventHandler
[atom]
EventHandler
(extract-payload
[this event]
(-> event
(hal-json/json->resource)
(hal/get-property :payload)
(hal-json/json->resource)))
(json/<-wire-json event))
(processable?
[this processor event event-context]
true)
(on-event
[this processor {:keys [topic payload]} _]
(vent/react-to all {:channel topic :payload payload} processor))
[this processor event _]
(swap! (:atom processor) conj event))
(on-complete
[this processor {:keys [topic partition payload]} {:keys [event-processor]}]
(swap! atom conj {:processor event-processor
:topic topic
:partition partition
:event-id (event-resource->id payload)})))
:payload payload})))

(defn new-system
([] (new-system {}))
Expand Down

0 comments on commit 6b90666

Please sign in to comment.