diff --git a/lib/racecar/message.rb b/lib/racecar/message.rb new file mode 100644 index 00000000..500aa209 --- /dev/null +++ b/lib/racecar/message.rb @@ -0,0 +1,21 @@ +require "forwardable" + +module Racecar + class Message + extend Forwardable + + def initialize(rdkafka_message) + @rdkafka_message = rdkafka_message + end + + def_delegators :@rdkafka_message, :topic, :partition, :offset, :key, :value, :headers + + def create_time + @rdkafka_message.timestamp + end + + def ==(other) + @rdkafka_message == other.instance_variable_get(:@rdkafka_message) + end + end +end diff --git a/lib/racecar/runner.rb b/lib/racecar/runner.rb index 6fbb6d71..235160bb 100644 --- a/lib/racecar/runner.rb +++ b/lib/racecar/runner.rb @@ -1,5 +1,6 @@ require "rdkafka" require "racecar/pause" +require "racecar/message" module Racecar class Runner @@ -154,7 +155,7 @@ def process(message) @instrumenter.instrument("process_message.racecar", payload) do with_pause(message.topic, message.partition, message.offset..message.offset) do - processor.process(message) + processor.process(Racecar::Message.new(message)) processor.deliver! consumer.store_offset(message) end @@ -173,7 +174,7 @@ def process_batch(messages) @instrumenter.instrument("process_batch.racecar", payload) do first, last = messages.first, messages.last with_pause(first.topic, first.partition, first.offset..last.offset) do - processor.process_batch(messages) + processor.process_batch(messages.map {|message| Racecar::Message.new(message) }) processor.deliver! consumer.store_offset(messages.last) end diff --git a/spec/runner_spec.rb b/spec/runner_spec.rb index 4869506e..e5ed6b24 100644 --- a/spec/runner_spec.rb +++ b/spec/runner_spec.rb @@ -439,8 +439,8 @@ def producer(*) topic: "greetings" ) - expect(processor).to receive(:process_batch).with([kafka.received_messages["greetings"][0]]) - expect(processor).to receive(:process_batch).with(kafka.received_messages["greetings"][1, 2]) + expect(processor).to receive(:process_batch).with([Racecar::Message.new(kafka.received_messages["greetings"][0])]) + expect(processor).to receive(:process_batch).with(kafka.received_messages["greetings"][1, 2].map {|message| Racecar::Message.new(message) }) runner.run end