Skip to content

Commit

Permalink
Allow prefetch_count when consuming a channel.
Browse files Browse the repository at this point in the history
When starting up a consumer process this adds support for setting a
prefetch on the channel.

For example:

    children = [
      {EchoConsumer, [pool_id: :consumers_pool, queue: "echo_queue", options: [prefetch_count: 1]]},
  • Loading branch information
jdl committed May 31, 2019
1 parent 7a1c698 commit ce03aa3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
10 changes: 9 additions & 1 deletion lib/clients/rabbitmq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ defmodule ExRabbitPool.RabbitMQ do
end

@impl true
def consume(%Channel{} = channel, queue, consumer_pid \\ nil, options \\ []) do
def consume(channel, queue, consumer_pid \\ nil, options \\ [])

def consume(%Channel{} = channel, queue, consumer_pid, [prefetch_count: prefetch_count] = options) do
Logger.warn("[ExRabbitPool.RabbitMQ.consume] queue: #{inspect queue} setting prefetch_count to #{prefetch_count}")
:ok = Basic.qos(channel, prefetch_count: prefetch_count)
Basic.consume(channel, queue, consumer_pid, options)
end

def consume(%Channel{} = channel, queue, consumer_pid, options) do
Basic.consume(channel, queue, consumer_pid, options)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ defmodule ExRabbitPool.Consumer do
# process and monitors it handle crashes and reconnections
defp handle_channel_checkout(
{:ok, %{pid: channel_pid} = channel},
%{config: config, queue: queue, adapter: adapter, config: config} = state
%{config: config, queue: queue, adapter: adapter} = state
) do
config = Keyword.get(config, :options, [])

Expand Down
23 changes: 14 additions & 9 deletions test/integration/consumer_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule ExRabbitPool.ConsumerTest do
use ExUnit.Case, async: false

import ExUnit.CaptureLog
alias ExRabbitPool.Worker.SetupQueue
alias ExRabbitPool.RabbitMQ
alias AMQP.Queue
Expand Down Expand Up @@ -86,15 +87,19 @@ defmodule ExRabbitPool.ConsumerTest do
end

test "should be able to consume messages out of rabbitmq", %{pool_id: pool_id, queue: queue} do
pid = start_supervised!({TestConsumer, pool_id: pool_id, queue: queue})
:erlang.trace(pid, true, [:receive])

ExRabbitPool.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer!")
assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer!", _}}, 1000
{:ok, result} = Queue.status(channel, queue)
assert result == %{consumer_count: 1, message_count: 0, queue: queue}
end)
logs =
capture_log(fn ->
pid = start_supervised!({TestConsumer, pool_id: pool_id, queue: queue, options: [prefetch_count: 19]})
:erlang.trace(pid, true, [:receive])

ExRabbitPool.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer!")
assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer!", _}}, 1000
{:ok, result} = Queue.status(channel, queue)
assert result == %{consumer_count: 1, message_count: 0, queue: queue}
end)
end)
assert logs =~ "[ExRabbitPool.RabbitMQ.consume] queue: #{inspect queue} setting prefetch_count to 19"
end

test "should be able to consume messages out of rabbitmq with default consumer", %{
Expand Down

0 comments on commit ce03aa3

Please sign in to comment.