Skip to content

Commit

Permalink
Merge pull request #30 from hfiguera/fix-echo-consumer
Browse files Browse the repository at this point in the history
Fix echo consumer example
  • Loading branch information
sescobb27 authored Sep 23, 2019
2 parents 8b29898 + 4562b54 commit 9bcd090
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 12 deletions.
8 changes: 4 additions & 4 deletions examples/echo_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ defmodule Example.EchoConsumer do
################################

# Confirmation sent by the broker after registering this process as a consumer
def basic_consume_ok(_adapter, _channel, _consumer_tag) do
def basic_consume_ok(_state, _consumer_tag) do
Logger.info("[consumer] successfully registered as a consumer (basic_consume_ok)")
:ok
end

# This is sent for each message consumed, where `payload` contains the message
# content and `meta` contains all the metadata set when sending with
# Basic.publish or additional info set by the broker;
def basic_deliver(adapter, channel, payload, %{delivery_tag: delivery_tag}) do
def basic_deliver(%{adapter: adapter, channel: channel}, payload, %{delivery_tag: delivery_tag}) do
Logger.info("[consumer] consuming payload (#{inspect payload})")
:ok = adapter.ack(channel, delivery_tag, requeue: false)
:ok
end

# Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
def basic_cancel(_adapter, _channel, _consumer_tag, _no_wait) do
def basic_cancel(_state, _consumer_tag, _no_wait) do
Logger.error("[consumer] consumer was cancelled by the broker (basic_cancel)")
:ok
end

# Confirmation sent by the broker to the consumer process after a Basic.cancel
def basic_cancel_ok(_adapter, _channel, _consumer_tag) do
def basic_cancel_ok(_state, _consumer_tag) do
Logger.error("[consumer] consumer was cancelled by the broker (basic_cancel_ok)")
:ok
end
Expand Down
3 changes: 1 addition & 2 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ defmodule ExRabbitPool.Consumer do
# process and monitors it handle crashes and reconnections
defp handle_channel_checkout(
{:ok, %{pid: channel_pid} = channel},
%{config: config, queue: queue, adapter: adapter, config: config} = state
%{config: config, queue: queue, adapter: adapter} = state
) do
config = Keyword.get(config, :options, [])

Expand Down Expand Up @@ -201,7 +201,6 @@ defmodule ExRabbitPool.Consumer do

def basic_deliver(%{adapter: adapter, channel: channel}, payload, %{delivery_tag: tag}) do
:ok = adapter.ack(channel, tag)
IO.puts("[*] RabbitMQ message received: #{payload}")
end

def basic_consume_ok(_state, _consumer_tag), do: :ok
Expand Down
4 changes: 0 additions & 4 deletions test/integration/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,12 @@ defmodule ExRabbitPool.ConsumerTest do
queue: queue
} do
pid = start_supervised!({TestDefaultConsumer, pool_id: pool_id, queue: queue})
Process.group_leader(pid, self())
:erlang.trace(pid, true, [:receive])

ExRabbitPool.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = RabbitMQ.publish(channel, "#{queue}_exchange", "", "Hello Consumer!")
assert_receive {:trace, ^pid, :receive, {:basic_deliver, "Hello Consumer!", _}}, 1000

assert_receive {:io_request, ^pid, _,
{:put_chars, :unicode, "[*] RabbitMQ message received: Hello Consumer!\n"}}

{:ok, result} = Queue.status(channel, queue)
assert result == %{consumer_count: 1, message_count: 0, queue: queue}
end)
Expand Down
4 changes: 2 additions & 2 deletions test/worker/rabbit_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule ExRabbitPool.Worker.RabbitConnectionTest do
assert {:error, :out_of_channels} = ConnWorker.checkout_channel(pid)
%{channels: channels, monitors: monitors} = ConnWorker.state(pid)
assert Enum.empty?(channels)
assert Map.size(monitors) == 1
assert Kernel.map_size(monitors) == 1
assert :ok = ConnWorker.checkin_channel(pid, channel)
end

Expand All @@ -55,7 +55,7 @@ defmodule ExRabbitPool.Worker.RabbitConnectionTest do
pid = start_supervised!({ConnWorker, config})
assert {:ok, channel} = ConnWorker.checkout_channel(pid)
%{monitors: monitors} = ConnWorker.state(pid)
assert Map.size(monitors) == 1
assert Kernel.map_size(monitors) == 1
assert :ok = ConnWorker.checkin_channel(pid, channel)
%{monitors: monitors} = ConnWorker.state(pid)
assert Enum.empty?(monitors)
Expand Down

0 comments on commit 9bcd090

Please sign in to comment.