Skip to content

Commit

Permalink
Merge pull request #130 from zendesk/dasch-wrap-message
Browse files Browse the repository at this point in the history
Wrap the rdkafka message
  • Loading branch information
dasch authored Sep 18, 2019
2 parents 10ed857 + 1639637 commit 82b5eea
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
21 changes: 21 additions & 0 deletions lib/racecar/message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require "forwardable"

module Racecar
class Message
extend Forwardable

def initialize(rdkafka_message)
@rdkafka_message = rdkafka_message
end

def_delegators :@rdkafka_message, :topic, :partition, :offset, :key, :value, :headers

def create_time
@rdkafka_message.timestamp
end

def ==(other)
@rdkafka_message == other.instance_variable_get(:@rdkafka_message)
end
end
end
5 changes: 3 additions & 2 deletions lib/racecar/runner.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "rdkafka"
require "racecar/pause"
require "racecar/message"

module Racecar
class Runner
Expand Down Expand Up @@ -154,7 +155,7 @@ def process(message)

@instrumenter.instrument("process_message.racecar", payload) do
with_pause(message.topic, message.partition, message.offset..message.offset) do
processor.process(message)
processor.process(Racecar::Message.new(message))
processor.deliver!
consumer.store_offset(message)
end
Expand All @@ -173,7 +174,7 @@ def process_batch(messages)
@instrumenter.instrument("process_batch.racecar", payload) do
first, last = messages.first, messages.last
with_pause(first.topic, first.partition, first.offset..last.offset) do
processor.process_batch(messages)
processor.process_batch(messages.map {|message| Racecar::Message.new(message) })
processor.deliver!
consumer.store_offset(messages.last)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ def producer(*)
topic: "greetings"
)

expect(processor).to receive(:process_batch).with([kafka.received_messages["greetings"][0]])
expect(processor).to receive(:process_batch).with(kafka.received_messages["greetings"][1, 2])
expect(processor).to receive(:process_batch).with([Racecar::Message.new(kafka.received_messages["greetings"][0])])
expect(processor).to receive(:process_batch).with(kafka.received_messages["greetings"][1, 2].map {|message| Racecar::Message.new(message) })

runner.run
end
Expand Down

0 comments on commit 82b5eea

Please sign in to comment.