Skip to content

Commit

Permalink
feat: Make a test work
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Oct 19, 2024
1 parent 80c0e25 commit 1564b4d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
11 changes: 9 additions & 2 deletions lib/client/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule RabbitMQStream.Client.Lifecycle do
How each command is handled by the Client
- `:connect`: NOOP
- `:close`: NOOP
- `:create_stream`:
- `:create_stream`: (Issue: Creates the stream on node)
- `:delete_stream`:
- `:query_offset`: Forward to 'control' connection
- `:delete_producer`:
Expand All @@ -36,7 +36,7 @@ defmodule RabbitMQStream.Client.Lifecycle do
- `:query_producer_sequence`: Forward to 'control' connection
- `:create_super_stream`:
- `:declare_producer`: Spawns a new connection
- `:publish`:
- `:publish`: Forwards it to the correct broker
"""

Expand Down Expand Up @@ -142,6 +142,13 @@ defmodule RabbitMQStream.Client.Lifecycle do
{:reply, :ok, conn}
end

@impl true
def handle_cast({:publish, opts}, %Client{} = conn) do
{_client_pid, broker_pid, _args} = Map.get(conn.clients, opts[:producer_id])
GenServer.cast(broker_pid, {:publish, opts})
{:noreply, conn}
end

@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, %Client{} = conn) do
case Map.get(conn.clients, ref) do
Expand Down
2 changes: 1 addition & 1 deletion lib/connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ defmodule RabbitMQStream.Connection do
is_binary(stream_name) do
GenServer.call(
server,
{:declare_producer, stream_name: stream_name, producer_reference: producer_reference}
{:declare_producer, stream_name: stream_name, producer_reference: producer_reference, pid: self()}
)
end

Expand Down
9 changes: 6 additions & 3 deletions test/client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ defmodule RabbitMQStreamTest.ClientTest do
assert :ok = RabbitMQStream.Connection.connect(conn)

{:ok, client} = RabbitMQStream.Client.start_link(host: "localhost")
dbg(client)

RabbitMQStream.Connection.create_stream(conn, "stream1")
RabbitMQStream.Connection.create_stream(conn, "stream2")
RabbitMQStream.Connection.create_stream(conn, "stream3")

{:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream1", self(), :next, 999)

Expand All @@ -34,5 +31,11 @@ defmodule RabbitMQStreamTest.ClientTest do
reference_name: "client-producer",
stream_name: "stream1"
)

message = Jason.encode!(%{message: "Hello, world2!"})

ClientProducer.publish(message)

assert_receive {:deliver, %{osiris_chunk: %{data_entries: [^message]}}}, 500
end
end

0 comments on commit 1564b4d

Please sign in to comment.