diff --git a/lib/slipstream/connection/impl.ex b/lib/slipstream/connection/impl.ex index 1b81cb5..99be94e 100644 --- a/lib/slipstream/connection/impl.ex +++ b/lib/slipstream/connection/impl.ex @@ -77,7 +77,13 @@ defmodule Slipstream.Connection.Impl do end def push_message(message, state) do - push_message({:text, encode(message, state)}, state) + message = + case encode(message, state) do + {:binary, message} -> {:binary, message} + message when is_binary(message) -> {:text, message} + end + + push_message(message, state) end # coveralls-ignore-start diff --git a/lib/slipstream/serializer.ex b/lib/slipstream/serializer.ex index 3acd8ff..dcd6a73 100644 --- a/lib/slipstream/serializer.ex +++ b/lib/slipstream/serializer.ex @@ -5,8 +5,13 @@ defmodule Slipstream.Serializer do @doc """ Encodes `Slipstream.Message` structs to binary. + + Should return either a binary (string) when using a text based protocol + or `{:binary, binary}` for cases where a binary protocol is used over + the wire (such as MessagePack). """ - @callback encode!(Slipstream.Message.t(), options :: Keyword.t()) :: binary() + @callback encode!(Slipstream.Message.t(), options :: Keyword.t()) :: + binary() | {:binary, binary()} @doc """ Decodes binary into `Slipstream.Message` struct. diff --git a/test/fixtures/good_example.ex b/test/fixtures/good_example.ex index ece7cc9..004c8f4 100644 --- a/test/fixtures/good_example.ex +++ b/test/fixtures/good_example.ex @@ -5,16 +5,17 @@ defmodule Slipstream.GoodExample do use Slipstream, restart: :transient - @config Application.compile_env!(:slipstream, __MODULE__) - def start_link(opts) do Slipstream.start_link(__MODULE__, opts) end @impl Slipstream - def init(test_proc) do + def init(opts) do + {test_proc, opts} = Keyword.pop!(opts, :pid) + opts = Keyword.put_new(opts, :uri, "ws://localhost:4001/socket/websocket") + socket = - @config + opts |> connect!() |> assign(:test_proc, test_proc) diff --git a/test/slipstream/client_telemetry_test.exs b/test/slipstream/client_telemetry_test.exs index e796763..b9e3e82 100644 --- a/test/slipstream/client_telemetry_test.exs +++ b/test/slipstream/client_telemetry_test.exs @@ -45,7 +45,7 @@ defmodule Slipstream.ClientTelemetryTest do end test "when we connect and disconnect, we get expected telemetry" do - start_supervised!({@client, self()}) + start_supervised!({@client, pid: self()}) assert_receive {@client, :connected} assert_receive {:telemetry, [:slipstream, :client, :connect, :start], @@ -66,7 +66,7 @@ defmodule Slipstream.ClientTelemetryTest do test "when we join a channel, we get expected telemetry" do topic = "test:good" - pid = start_supervised!({@client, self()}) + pid = start_supervised!({@client, pid: self()}) assert_receive {@client, :connected} join(pid, topic) assert_receive {@client, :joined, ^topic, %{}} @@ -88,7 +88,7 @@ defmodule Slipstream.ClientTelemetryTest do test "when we receive a message, we get expected telemetry" do topic = "test:good" - pid = start_supervised!({@client, self()}) + pid = start_supervised!({@client, pid: self()}) assert_receive {@client, :connected} join(pid, topic) assert_receive {@client, :joined, ^topic, %{}} diff --git a/test/slipstream/connection_telemetry_test.exs b/test/slipstream/connection_telemetry_test.exs index ad9f847..45dabf7 100644 --- a/test/slipstream/connection_telemetry_test.exs +++ b/test/slipstream/connection_telemetry_test.exs @@ -45,7 +45,7 @@ defmodule Slipstream.ConnectionTelemetryTest do end test "when we connect and disconnect, we get expected telemetry" do - start_supervised!({@client, self()}) + start_supervised!({@client, [pid: self()]}) assert_receive {@client, :connected} assert_receive {:telemetry, [:slipstream, :connection, :connect, :start], @@ -68,7 +68,7 @@ defmodule Slipstream.ConnectionTelemetryTest do end test "when we successfully connect, we handle mint messages" do - start_supervised!({@client, self()}) + start_supervised!({@client, pid: self()}) assert_receive {@client, :connected} assert_receive {:telemetry, [:slipstream, :connection, :connect, :start], diff --git a/test/slipstream/integration_test.exs b/test/slipstream/integration_test.exs index abfd53c..494aa6a 100644 --- a/test/slipstream/integration_test.exs +++ b/test/slipstream/integration_test.exs @@ -28,183 +28,240 @@ defmodule Slipstream.IntegrationTest do end end - describe "given a connection has been established through the #{@client}" do - setup do - pid = start_supervised!({@client, self()}) - assert_receive {@client, :connected}, @timeout + defmodule EtfSerializer do + alias Slipstream.Message + + @behaviour Slipstream.Serializer + + @impl true + def encode!(%Message{} = msg, _opts) do + data = [ + msg.join_ref, + msg.ref, + msg.topic, + msg.event, + stringify_keys(msg.payload) + ] + + {:binary, :erlang.term_to_binary(data)} + end - [pid: pid, good_topic: "test:good", bad_topic: "test:bad"] + @impl true + def decode!(binary, _opts) do + [join_ref, ref, topic, event, payload] = :erlang.binary_to_term(binary) + + payload = + case payload do + {:binary, data} -> {:binary, data} + other -> stringify_keys(other) + end + + %Message{ + join_ref: join_ref, + ref: ref, + topic: topic, + event: event, + payload: payload + } end - test "joining a good channel works", c do - topic = c.good_topic + defp stringify_keys(payload), do: Jason.encode!(payload) |> Jason.decode!() + end - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout - end + for message_type <- [:normal, :binary] do + describe "#{message_type}: given a connection has been established through the #{@client}" do + @describetag message_type: message_type + setup c do + opts = + case c.message_type do + :normal -> + [uri: "ws://localhost:4001/socket/websocket", pid: self()] - test "duplicate joins do not result in an actual duplicate join", c do - topic = c.good_topic + :binary -> + [ + uri: "ws://localhost:4001/socket/etf/websocket", + pid: self(), + serializer: EtfSerializer + ] + end - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + pid = start_supervised!({@client, opts}) + assert_receive {@client, :connected}, @timeout - :ok = join(c.pid, topic) - refute_receive {@client, :joined, ^topic, _reply}, @timeout - end + [pid: pid, good_topic: "test:good", bad_topic: "test:bad"] + end - test "once a channel is joined it can be left", c do - topic = c.good_topic + test "joining a good channel works", c do + topic = c.good_topic - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout + end - :ok = GenServer.cast(c.pid, {:leave, topic}) - assert_receive {@client, :left, ^topic}, @timeout - end + test "duplicate joins do not result in an actual duplicate join", c do + topic = c.good_topic - test "once a channel is left, you cannot duplicate the leave", c do - topic = c.good_topic + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + :ok = join(c.pid, topic) + refute_receive {@client, :joined, ^topic, _reply}, @timeout + end - :ok = GenServer.cast(c.pid, {:leave, topic}) - assert_receive {@client, :left, ^topic}, @timeout + test "once a channel is joined it can be left", c do + topic = c.good_topic - :ok = GenServer.cast(c.pid, {:leave, topic}) - refute_receive {@client, :left, ^topic}, @timeout - end + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - test "a message may be pushed to the remote", c do - topic = c.good_topic + :ok = GenServer.cast(c.pid, {:leave, topic}) + assert_receive {@client, :left, ^topic}, @timeout + end - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + test "once a channel is left, you cannot duplicate the leave", c do + topic = c.good_topic - :ok = GenServer.cast(c.pid, {:push, topic, "quicksand", %{}}) - assert_receive {@server, :in, ^topic, "quicksand", %{}}, @timeout - end + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - test "if a topic is not yet joined, you may not push a message", c do - topic = c.good_topic + :ok = GenServer.cast(c.pid, {:leave, topic}) + assert_receive {@client, :left, ^topic}, @timeout - :ok = GenServer.cast(c.pid, {:push, topic, "quicksand", %{}}) - refute_receive {@server, :in, ^topic, "quicksand", %{}}, @timeout - end + :ok = GenServer.cast(c.pid, {:leave, topic}) + refute_receive {@client, :left, ^topic}, @timeout + end - test "a message may be received from the server", c do - topic = c.good_topic + test "a message may be pushed to the remote", c do + topic = c.good_topic - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - :ok = GenServer.cast(c.pid, {:push, topic, "push to me", %{}}) + :ok = GenServer.cast(c.pid, {:push, topic, "quicksand", %{}}) + assert_receive {@server, :in, ^topic, "quicksand", %{}}, @timeout + end - assert_receive {@client, :received_message, ^topic, "foo", - %{"bar" => "baz"}}, - @timeout - end + test "if a topic is not yet joined, you may not push a message", c do + topic = c.good_topic - test "a reply may be received from the server", c do - topic = c.good_topic + :ok = GenServer.cast(c.pid, {:push, topic, "quicksand", %{}}) + refute_receive {@server, :in, ^topic, "quicksand", %{}}, @timeout + end - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + test "a message may be received from the server", c do + topic = c.good_topic - :ok = GenServer.cast(c.pid, {:push, topic, "ping", %{}}) + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - assert_receive {@client, :received_reply, _ref, - {:ok, %{"pong" => "pong"}}}, - @timeout - end + :ok = GenServer.cast(c.pid, {:push, topic, "push to me", %{}}) - test "a regular broadcast may be received from the server", c do - topic = c.good_topic + assert_receive {@client, :received_message, ^topic, "foo", + %{"bar" => "baz"}}, + @timeout + end - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + test "a reply may be received from the server", c do + topic = c.good_topic - :ok = GenServer.cast(c.pid, {:push, topic, "broadcast", %{}}) + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - assert_receive {@client, :received_message, ^topic, "broadcast event", - %{"hello" => "everyone!"}}, - @timeout - end + :ok = GenServer.cast(c.pid, {:push, topic, "ping", %{}}) - test "a binary broadcast may be received from the server", c do - topic = c.good_topic + assert_receive {@client, :received_reply, _ref, + {:ok, %{"pong" => "pong"}}}, + @timeout + end - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + test "a regular broadcast may be received from the server", c do + topic = c.good_topic - :ok = GenServer.cast(c.pid, {:push, topic, "binary broadcast", %{}}) + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - assert_receive {@client, :received_message, ^topic, "broadcast event", - {:binary, "🏴‍☠️"}}, - @timeout - end + :ok = GenServer.cast(c.pid, {:push, topic, "broadcast", %{}}) - test "a connection may be disconnected", c do - topic = c.good_topic + assert_receive {@client, :received_message, ^topic, "broadcast event", + %{"hello" => "everyone!"}}, + @timeout + end - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + test "a binary broadcast may be received from the server", c do + topic = c.good_topic - :ok = GenServer.cast(c.pid, :disconnect) - assert_receive {@client, :disconnected, reason}, @timeout - assert reason == :client_disconnect_requested - end + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - test "if the remote server raises, we handle a topic disconnect event", c do - topic = c.good_topic + :ok = GenServer.cast(c.pid, {:push, topic, "binary broadcast", %{}}) - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + assert_receive {@client, :received_message, ^topic, "broadcast event", + {:binary, "🏴‍☠️"}}, + @timeout + end - :ok = GenServer.cast(c.pid, {:push, topic, "raise", %{}}) - assert_receive {@client, :topic_closed, ^topic, {:error, %{}}}, @timeout - end + test "a connection may be disconnected", c do + topic = c.good_topic - test "if the remote server stops, we handle a left event", c do - topic = c.good_topic + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + :ok = GenServer.cast(c.pid, :disconnect) + assert_receive {@client, :disconnected, reason}, @timeout + assert reason == :client_disconnect_requested + end - :ok = GenServer.cast(c.pid, {:push, topic, "stop", %{}}) - assert_receive {@client, :left, ^topic}, @timeout - end + test "if the remote server raises, we handle a topic disconnect event", + c do + topic = c.good_topic - test "trying to join a non-existent topic fails", c do - topic = "test:no function clause matching" + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - :ok = join(c.pid, topic) + :ok = GenServer.cast(c.pid, {:push, topic, "raise", %{}}) + assert_receive {@client, :topic_closed, ^topic, {:error, %{}}}, @timeout + end - assert_receive {@client, :topic_closed, ^topic, - {:failed_to_join, %{"reason" => "join crashed"}}}, - @timeout - end + test "if the remote server stops, we handle a left event", c do + topic = c.good_topic - test "trying to join a bad topic fails", c do - topic = c.bad_topic + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - :ok = join(c.pid, topic) + :ok = GenServer.cast(c.pid, {:push, topic, "stop", %{}}) + assert_receive {@client, :left, ^topic}, @timeout + end - assert_receive {@client, :topic_closed, ^topic, - {:failed_to_join, %{"bad" => "join"}}}, - @timeout - end + test "trying to join a non-existent topic fails", c do + topic = "test:no function clause matching" + + :ok = join(c.pid, topic) + + assert_receive {@client, :topic_closed, ^topic, + {:failed_to_join, %{"reason" => "join crashed"}}}, + @timeout + end + + test "trying to join a bad topic fails", c do + topic = c.bad_topic + + :ok = join(c.pid, topic) + + assert_receive {@client, :topic_closed, ^topic, + {:failed_to_join, %{"bad" => "join"}}}, + @timeout + end - test "we may receive a reply which is just an atom", c do - topic = c.good_topic + test "we may receive a reply which is just an atom", c do + topic = c.good_topic - :ok = join(c.pid, topic) - assert_receive {@client, :joined, ^topic, _reply}, @timeout + :ok = join(c.pid, topic) + assert_receive {@client, :joined, ^topic, _reply}, @timeout - :ok = GenServer.cast(c.pid, {:push, topic, "ack", %{}}) + :ok = GenServer.cast(c.pid, {:push, topic, "ack", %{}}) - assert_receive {@client, :received_reply, _ref, :ok}, @timeout + assert_receive {@client, :received_reply, _ref, :ok}, @timeout + end end end diff --git a/test/support/lib/slipstream_web/endpoint.ex b/test/support/lib/slipstream_web/endpoint.ex index b67071c..d6fb3df 100644 --- a/test/support/lib/slipstream_web/endpoint.ex +++ b/test/support/lib/slipstream_web/endpoint.ex @@ -20,6 +20,11 @@ defmodule SlipstreamWeb.Endpoint do longpoll: false ) + socket("/socket/etf", SlipstreamWeb.UserSocket, + websocket: [serializer: [{SlipstreamWeb.EtfSerializer, "~> 2.0.0"}]], + longpoll: false + ) + # Serve at "/" the static files from "priv/static" directory. # # You should set gzip to true if you are running phx.digest diff --git a/test/support/lib/slipstream_web/etf_serializer.ex b/test/support/lib/slipstream_web/etf_serializer.ex new file mode 100644 index 0000000..9f4bbe0 --- /dev/null +++ b/test/support/lib/slipstream_web/etf_serializer.ex @@ -0,0 +1,44 @@ +defmodule SlipstreamWeb.EtfSerializer do + @moduledoc false + @behaviour Phoenix.Socket.Serializer + + alias Phoenix.Socket.{Broadcast, Message, Reply} + + @impl true + def fastlane!(%Broadcast{} = msg) do + data = :erlang.term_to_binary([nil, nil, msg.topic, msg.event, msg.payload]) + {:socket_push, :binary, data} + end + + @impl true + def encode!(%Reply{} = reply) do + data = [ + reply.join_ref, + reply.ref, + reply.topic, + "phx_reply", + %{status: reply.status, response: reply.payload} + ] + + {:socket_push, :binary, :erlang.term_to_binary(data)} + end + + def encode!(%Message{} = msg) do + data = [msg.join_ref, msg.ref, msg.topic, msg.event, msg.payload] + {:socket_push, :binary, :erlang.term_to_binary(data)} + end + + @impl true + def decode!(raw_message, _opts) do + [join_ref, ref, topic, event, payload | _] = + :erlang.binary_to_term(raw_message) + + %Message{ + topic: topic, + event: event, + payload: payload, + ref: ref, + join_ref: join_ref + } + end +end