From fcee23b0db5a63db49ce893782fbfcea50f501d3 Mon Sep 17 00:00:00 2001 From: Roberto Miranda Date: Fri, 26 Apr 2024 13:52:18 +0100 Subject: [PATCH] Add support to deliver messages in batches --- lib/streamy/dispatcher.rb | 4 ++++ lib/streamy/event.rb | 4 ++++ lib/streamy/message_buses/kafka_message_bus.rb | 4 ++++ lib/streamy/message_buses/message_bus.rb | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/lib/streamy/dispatcher.rb b/lib/streamy/dispatcher.rb index d6dbbd8..980402c 100644 --- a/lib/streamy/dispatcher.rb +++ b/lib/streamy/dispatcher.rb @@ -8,6 +8,10 @@ def dispatch Streamy.message_bus.deliver(**message_params) end + def self.dispatch_many(events) + Streamy.message_bus.deliver_many(events.map(&:to_message)) + end + private attr_reader :event diff --git a/lib/streamy/event.rb b/lib/streamy/event.rb index 1082d24..eaa20e3 100644 --- a/lib/streamy/event.rb +++ b/lib/streamy/event.rb @@ -16,6 +16,10 @@ def self.publish(**args) new(**args).publish end + def self.publish_many(events) + Streamy.dispatcher.dispatch_many(events) + end + priority :standard def publish diff --git a/lib/streamy/message_buses/kafka_message_bus.rb b/lib/streamy/message_buses/kafka_message_bus.rb index 2936b6b..dcc8bd1 100644 --- a/lib/streamy/message_buses/kafka_message_bus.rb +++ b/lib/streamy/message_buses/kafka_message_bus.rb @@ -23,6 +23,10 @@ def deliver(key:, topic:, payload:, priority:) end end + def deliver_many(messages) + sync_producer.produce_many_sync(messages.except(:priority)) + end + def shutdown async_producer.close if async_producer? sync_producers.map(&:close) diff --git a/lib/streamy/message_buses/message_bus.rb b/lib/streamy/message_buses/message_bus.rb index c45faec..165289f 100644 --- a/lib/streamy/message_buses/message_bus.rb +++ b/lib/streamy/message_buses/message_bus.rb @@ -4,6 +4,10 @@ class MessageBus def deliver(key:, topic:, payload:, priority:) # NOOP: Implement delivery logic end + + def deliver_many(messages) + # NOOP: Implement delivery logic + end end end end