Skip to content

Commit

Permalink
Prefetch config including test (#31)
Browse files Browse the repository at this point in the history
Prefetch config including test
  • Loading branch information
sescobb27 authored Sep 26, 2019
2 parents 9bcd090 + 22d5a46 commit 08592c7
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
/cover
/deps
/doc
/log
/.fetch
erl_crash.dump
*.ez
*.beam
/config/*.secret.exs
*~
2 changes: 2 additions & 0 deletions lib/clients/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ defmodule ExRabbitPool.Clients.Adapter do
:ok | AMQP.Basic.error()
@callback declare_exchange(AMQP.Channel.t(), AMQP.Basic.exchange(), keyword()) ::
:ok | AMQP.Basic.error()
@callback qos(AMQP.Channel.t(), keyword()) ::
:ok | AMQP.Basic.error()
end
5 changes: 5 additions & 0 deletions lib/clients/fake_rabbitmq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,9 @@ defmodule ExRabbitPool.FakeRabbitMQ do
def queue_bind(_channel, _queue, _exchange, _options \\ []) do
:ok
end

@impl true
def qos(_channel, _options \\ []) do
:ok
end
end
5 changes: 5 additions & 0 deletions lib/clients/rabbitmq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ defmodule ExRabbitPool.RabbitMQ do
{:error, error}
end
end

@impl true
def qos(channel, options \\ []) do
Basic.qos(channel, options)
end
end
14 changes: 7 additions & 7 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,11 @@ defmodule ExRabbitPool.Consumer do
{:ok, %{pid: channel_pid} = channel},
%{config: config, queue: queue, adapter: adapter} = state
) do
config = Keyword.get(config, :options, [])

case adapter.consume(channel, queue, self(), config) do
{:ok, consumer_tag} ->
ref = Process.monitor(channel_pid)
{:noreply, %State{state | channel: channel, monitor: ref, consumer_tag: consumer_tag}}

with :ok <- setup_channel(state, channel),
{:ok, consumer_tag} <- adapter.consume(channel, queue, self(), config) do
ref = Process.monitor(channel_pid)
{:noreply, %State{state | channel: channel, monitor: ref, consumer_tag: consumer_tag}}
else
{:error, reason} ->
schedule_connect(config)
{:noreply, %State{state | channel: nil, consumer_tag: nil}}
Expand All @@ -203,10 +201,12 @@ defmodule ExRabbitPool.Consumer do
:ok = adapter.ack(channel, tag)
end

def setup_channel(_state, _channel), do: :ok
def basic_consume_ok(_state, _consumer_tag), do: :ok
def basic_cancel(_state, _consumer_tag, _no_wait), do: :ok
def basic_cancel_ok(_state, _consumer_tag), do: :ok

defoverridable setup_channel: 2
defoverridable basic_deliver: 3
defoverridable basic_consume_ok: 2
defoverridable basic_cancel: 3
Expand Down
73 changes: 65 additions & 8 deletions test/integration/consumer_test.exs
Original file line number Diff line number Diff line change
@@ -1,20 +1,42 @@
defmodule ExRabbitPool.ConsumerTest do
use ExUnit.Case, async: false

import ExUnit.CaptureLog
alias ExRabbitPool.Worker.SetupQueue
alias ExRabbitPool.RabbitMQ
alias AMQP.Queue
require Logger

@moduletag :integration

defmodule TestConsumer do
use ExRabbitPool.Consumer

def setup_channel(%{adapter: adapter, config: config}, channel) do
config = Keyword.get(config, :options, [])
Logger.warn("Setting up channel with options: #{inspect(config)}")
adapter.qos(channel, config)
end

def basic_deliver(%{adapter: adapter, channel: channel}, _payload, %{delivery_tag: tag}) do
:ok = adapter.ack(channel, tag)
end
end

defmodule TestConsumerNoAck do
use ExRabbitPool.Consumer

def setup_channel(%{adapter: adapter, config: config}, channel) do
config = Keyword.get(config, :options, [])
Logger.warn("Setting up channel with options: #{inspect(config)}")
adapter.qos(channel, config)
end

def basic_deliver(_state, _payload, _meta) do
:ok
end
end

defmodule TestDefaultConsumer do
use ExRabbitPool.Consumer
end
Expand Down Expand Up @@ -86,15 +108,50 @@ defmodule ExRabbitPool.ConsumerTest do
end

test "should be able to consume messages out of rabbitmq", %{pool_id: pool_id, queue: queue} do
pid = start_supervised!({TestConsumer, pool_id: pool_id, queue: queue})
:erlang.trace(pid, true, [:receive])
logs =
capture_log(fn ->
pid =
start_supervised!(
{TestConsumer, pool_id: pool_id, queue: queue, options: [prefetch_count: 19]}
)

:erlang.trace(pid, true, [:receive])

ExRabbitPool.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer!")
assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer!", _}}, 1000
{:ok, result} = Queue.status(channel, queue)
assert result == %{consumer_count: 1, message_count: 0, queue: queue}
end)
end)

assert logs =~ "Setting up channel with options: [prefetch_count: 19]"
end

ExRabbitPool.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer!")
assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer!", _}}, 1000
{:ok, result} = Queue.status(channel, queue)
assert result == %{consumer_count: 1, message_count: 0, queue: queue}
end)
test "consumable messages should not exceed prefetch_count", %{pool_id: pool_id, queue: queue} do
logs =
capture_log(fn ->
pid =
start_supervised!(
{TestConsumerNoAck, pool_id: pool_id, queue: queue, options: [prefetch_count: 2]}
)

:erlang.trace(pid, true, [:receive])

ExRabbitPool.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer 1!")
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer 2!")
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer 3!")
assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer 1!", _}}, 1000
assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer 2!", _}}, 1000
refute_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer 3!", _}}, 1000

{:ok, result} = Queue.status(channel, queue)
assert result == %{consumer_count: 1, message_count: 1, queue: queue}
end)
end)

assert logs =~ "Setting up channel with options: [prefetch_count: 2]"
end

test "should be able to consume messages out of rabbitmq with default consumer", %{
Expand Down

0 comments on commit 08592c7

Please sign in to comment.