Skip to content

Commit

Permalink
feat: Simplify 'respond' logic
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Oct 20, 2024
1 parent 186dce1 commit dc749d5
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions lib/client/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule RabbitMQStream.Client.Lifecycle do
use GenServer
require Logger
alias RabbitMQStream.Client
alias RabbitMQStream.Message

@moduledoc """
This module defines the lifecycle of the RabbitMQStream.Client.
Expand Down Expand Up @@ -185,27 +186,33 @@ defmodule RabbitMQStream.Client.Lifecycle do
{:noreply, conn}
end

# An issue with only forwarding the messages to the broker is that it adds an extra message pass.
# To workaround this issue we could buffer messages so that it offsets the possible performance hit.
# Or we could attempt to use ':ets' in some way to prevent work around this, but there doesn't
# seem to be a way to do this while keeping the exact same interface as a Connection.
def handle_cast({:publish, opts}, %Client{} = conn) do
def handle_cast({:respond, opts}, %Client{} = conn) do
# We only accept one type of 'respond' command
%Message.Request{
command: :consumer_update,
data: %Message.Types.ConsumerUpdateRequestData{subscription_id: subscription_id}
} = Keyword.fetch!(opts, :request)

broker_pid =
conn.clients
|> Map.get(opts[:producer_id])
|> Map.get(subscription_id)
|> then(&elem(&1, 1))

GenServer.cast(broker_pid, {:publish, opts})
GenServer.cast(broker_pid, {:respond, opts})
{:noreply, conn}
end

def handle_cast({:respond, opts}, %Client{} = conn) do
# An issue with only forwarding the messages to the broker is that it adds an extra message pass.
# To workaround this issue we could buffer messages so that it offsets the possible performance hit.
# Or we could attempt to use ':ets' in some way to prevent work around this, but there doesn't
# seem to be a way to do this while keeping the exact same interface as a Connection.
def handle_cast({:publish, opts}, %Client{} = conn) do
broker_pid =
conn.clients
|> Map.get(opts[:subscription_id])
|> Map.get(opts[:producer_id])
|> then(&elem(&1, 1))

GenServer.cast(broker_pid, {:respond, opts})
GenServer.cast(broker_pid, {:publish, opts})
{:noreply, conn}
end

Expand Down

0 comments on commit dc749d5

Please sign in to comment.