diff --git a/channel-sender/lib/channel_sender_ex/core/channel.ex b/channel-sender/lib/channel_sender_ex/core/channel.ex index a1d88fa..2dc5900 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel.ex @@ -70,6 +70,13 @@ defmodule ChannelSenderEx.Core.Channel do GenStateMachine.call(server, {:socket_disconnected_reason, reason}, timeout) end + @doc """ + get information about this channel + """ + def info(server, timeout \\ @on_connected_channel_reply_timeout) do + GenStateMachine.call(server, :info, timeout) + end + @doc """ operation to mark a message as acknowledged """ @@ -121,6 +128,13 @@ defmodule ChannelSenderEx.Core.Channel do end end + def waiting({:call, from}, :info, data) do + actions = [ + _reply = {:reply, from, {:waiting, data}} + ] + {:keep_state_and_data, actions} + end + ## stop the process with a timeout cause if the socket is not ## authenticated in the given time def waiting(:state_timeout, :waiting_timeout, data) do @@ -129,6 +143,7 @@ defmodule ChannelSenderEx.Core.Channel do end def waiting({:call, from}, {:socket_connected, socket_pid}, data) do + Logger.debug("Channel #{data.channel} received socket connected notification. Socket pid: #{inspect(socket_pid)}") socket_ref = Process.monitor(socket_pid) new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil} @@ -168,6 +183,7 @@ defmodule ChannelSenderEx.Core.Channel do {:EXIT, _, {:name_conflict, {c_ref, _}, _, new_pid}}, data = %{channel: c_ref} ) do + Logger.debug("Channel #{data.channel} stopping. Cause: :name_conflict") send(new_pid, {:twins_last_letter, data}) {:stop, :normal, %{data | stop_cause: :name_conflict}} end @@ -203,6 +219,13 @@ defmodule ChannelSenderEx.Core.Channel do {:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]} end + def connected({:call, from}, :info, data) do + actions = [ + _reply = {:reply, from, {:connected, data}} + ] + {:keep_state_and_data, actions} + end + # this method will be called when the socket is disconnected # to inform this process about the disconnection reason # this will be later used to define if this process will go back to the waiting state @@ -298,6 +321,17 @@ defmodule ChannelSenderEx.Core.Channel do end end + def connected({:call, from}, {:socket_connected, socket_pid}, data) do + socket_ref = Process.monitor(socket_pid) + new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil} + + actions = [ + _reply = {:reply, from, :ok} + ] + Logger.debug("Channel #{data.channel} overwritting socket pid.") + {:keep_state, new_data, actions} + end + ## Handle info notification when socket process terminates. This method is called because the socket is monitored. ## via Process.monitor(socket_pid) in the waited/connected state. def connected(:info, {:DOWN, _ref, :process, _object, _reason}, data) do @@ -317,6 +351,13 @@ defmodule ChannelSenderEx.Core.Channel do {:stop, :normal, %{data | stop_cause: :name_conflict}} end + # capture shutdown signal + def connected(:info, {:EXIT, from_pid, :shutdown}, data) do + source_process = Process.info(from_pid) + Logger.warning("Channel #{inspect(data)} received shutdown signal: #{inspect(source_process)}") + :keep_state_and_data + end + @impl true def terminate(reason, state, data) do level = if reason == :normal, do: :info, else: :warning diff --git a/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex b/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex index 1fa4200..b89f91a 100644 --- a/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex +++ b/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex @@ -37,6 +37,7 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcess do timeout = Application.get_env(:channel_sender_ex, :on_connected_channel_reply_timeout) Channel.socket_connected(pid, socket_pid, timeout) + pid end catch _type, _err -> :noproc diff --git a/channel-sender/lib/channel_sender_ex/transport/socket.ex b/channel-sender/lib/channel_sender_ex/transport/socket.ex index da20d8d..cae8edc 100644 --- a/channel-sender/lib/channel_sender_ex/transport/socket.ex +++ b/channel-sender/lib/channel_sender_ex/transport/socket.ex @@ -12,10 +12,16 @@ defmodule ChannelSenderEx.Transport.Socket do # not providing a valid channel reference @invalid_channel_code "1007" + # Error to indicate the shared secret for the channel is invalid @invalid_secret_code 1008 + # Error code to indicate that the channel is already connected + # and a new socket process is trying to connect to it. + @invalid_already_stablished "1009" + require Logger + alias ChannelSenderEx.Core.Channel alias ChannelSenderEx.Core.ChannelRegistry alias ChannelSenderEx.Core.ProtocolMessage alias ChannelSenderEx.Core.PubSub.ReConnectProcess @@ -70,10 +76,30 @@ defmodule ChannelSenderEx.Transport.Socket do action_fn = fn _ -> check_channel_registered(channel_ref) end # retries 3 times the lookup of the channel reference (useful when running as a cluster with several nodes) # with a backoff strategy of 100ms initial delay and max of 500ms delay. - execute(100, 500, 3, action_fn, fn -> + result = execute(100, 500, 3, action_fn, fn -> Logger.error("Socket unable to start. channel_ref process does not exist yet, ref: #{inspect(channel_ref)}") {:error, @invalid_channel_code} end) + + case result do + {:error, _desc} = e-> + e + {pid, res} -> + validate_channel_is_waiting(pid, res) + end + end + + defp validate_channel_is_waiting(pid, res) when is_pid(pid) do + {status, _data} = Channel.info(pid) + case status do + :waiting -> + # process can continue, and socket process will be linked to the channel process + res + _ -> + # channel is already in a connected state, and a previous socket process + # was already linked to it. + {:error, @invalid_already_stablished} + end end defp check_channel_registered(res = {@channel_key, channel_ref}) do @@ -81,7 +107,8 @@ defmodule ChannelSenderEx.Transport.Socket do :noproc -> Logger.warning("Channel #{channel_ref} not found, retrying query...") :retry - _ -> res + pid -> + {pid, res} end end @@ -207,22 +234,44 @@ defmodule ChannelSenderEx.Transport.Socket do end end + # @impl :cowboy_websocket + # def websocket_info({:DOWN, ref, :process, _pid, _cause}, state = {channel_ref, :connected, _enc, {_app, _usr, ref}, _data}) do + # Logger.warning("Socket for channel #{channel_ref} : spawning process for re-conection") + # #spawn_monitor(ReConnectProcess, :start, [self(), channel_ref]) + + # new_pid = ReConnectProcess.start(self(), channel_ref) + # Logger.debug("Socket for channel #{channel_ref} : channel process found for re-conection: #{inspect(new_pid)}") + # Process.monitor(new_pid) + + # {_commands = [], state} + # end + @impl :cowboy_websocket - def websocket_info({:DOWN, ref, :process, _pid, _cause}, state = {channel_ref, :connected, _, {_, _, ref}, _}) do - Logger.warning("Socket for channel #{channel_ref} : spawning process for re-conection") - spawn_monitor(ReConnectProcess, :start, [self(), channel_ref]) + def websocket_info({:DOWN, ref, proc, pid, cause}, state = {channel_ref, _, _, _, _}) do + Logger.warning("Socket for channel #{channel_ref} : received DOWN message: #{inspect({ref, proc, pid, cause})}") + + new_pid = ReConnectProcess.start(self(), channel_ref) + Logger.debug("Socket for channel #{channel_ref} : channel process found for re-conection: #{inspect(new_pid)}") + Process.monitor(new_pid) + {_commands = [], state} end @impl :cowboy_websocket def websocket_info({:DOWN, _ref, :process, _pid, :no_channel}, state = {channel_ref, :connected, _, {_, _, _}, _}) do Logger.warning("Socket for channel #{channel_ref} : spawning process for re-conection") - spawn_monitor(ReConnectProcess, :start, [self(), channel_ref]) + #spawn_monitor(ReConnectProcess, :start, [self(), channel_ref]) + + new_pid = ReConnectProcess.start(self(), channel_ref) + Logger.debug("Socket for channel #{channel_ref} : channel process found for re-conection: #{inspect(new_pid)}") + Process.monitor(new_pid) + {_commands = [], state} end @impl :cowboy_websocket - def websocket_info(_message, state) do + def websocket_info(message, state) do + Logger.debug("Socket received untracked info message: #{inspect(message)} and Data: #{inspect(state)}") {_commands = [], state} end diff --git a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs index f58ee14..c818906 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs @@ -109,6 +109,12 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do Process.sleep(500) assert Process.alive?(channel_pid) == false + + on_exit(fn -> + Application.delete_env(:channel_sender_ex, :channel_shutdown_on_clean_close) + Application.delete_env(:channel_sender_ex, :channel_shutdown_on_disconnection) + Helper.compile(:channel_sender_ex) + end) end test "Should not restart channel when terminated normal (Waiting timeout)" do @@ -153,6 +159,65 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do assert Map.get(pending_msg, "82") == msg2 end + test "Should not allow multiple socket to one channel process", %{ + port: port, + channel: channel, + secret: secret + } do + {conn, stream} = assert_connect_and_authenticate(port, channel, secret) + + # try to open a new socket connection and link it to the same channel + conn2 = connect(port, channel) + assert_receive {:gun_response, _, _, :fin, 400, resp}, 500 + + assert Enum.any?(resp, fn {k, v} -> + String.contains?(k, "x-error-code") and String.contains?(v, "1009") + end) + + :gun.close(conn2) + end + + test "Should supervisor re-create channel process when exits abnormally", %{ + port: port, + channel: channel, + secret: secret + } do + {conn, stream} = assert_connect_and_authenticate(port, channel, secret) + + channel_pid = ChannelRegistry.lookup_channel_addr(channel) + assert is_pid(channel_pid) + assert Process.alive?(channel_pid) + + # terminate channel process + Process.exit(channel_pid, :kill) + Process.sleep(100) + + # verify new process + channel_pid2 = ChannelRegistry.lookup_channel_addr(channel) + assert is_pid(channel_pid2) + assert Process.alive?(channel_pid2) + assert channel_pid != channel_pid2 + end + + test "Should channel ignore :shutdown signal", %{ + port: port, + channel: channel, + secret: secret + } do + {conn, stream} = assert_connect_and_authenticate(port, channel, secret) + + channel_pid = ChannelRegistry.lookup_channel_addr(channel) + assert is_pid(channel_pid) + assert Process.alive?(channel_pid) + + # try to terminate channel process + Process.exit(channel_pid, :shutdown) + Process.sleep(100) + + # verify process still alive + assert Process.alive?(channel_pid) + end + defp deliver_message(channel, message_id \\ "42") do {data, message} = build_message(message_id) channel_response = PubSubCore.deliver_to_channel(channel, message) diff --git a/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs b/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs index c6e7b20..9c3547f 100644 --- a/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs +++ b/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs @@ -39,7 +39,7 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcessTest do {ChannelRegistry, [], [lookup_channel_addr: fn(_) -> :c.pid(0, 200, 0) end]}, {Channel, [], [socket_connected: fn(_, _, _) -> :ok end]}, ]) do - assert ReConnectProcess.connect_socket_to_channel("channel_ref", :c.pid(0, 250, 0)) == :ok + assert is_pid(ReConnectProcess.connect_socket_to_channel("channel_ref", :c.pid(0, 250, 0))) end end diff --git a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs index a77ea23..a0b7865 100644 --- a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs @@ -88,6 +88,16 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do :gun.close(conn) end + test "Should not connect twice to socket", %{port: port, channel: channel, secret: secret} do + {conn, _stream} = assert_connect_and_authenticate(port, channel, secret) + + conn2 = connect(port, channel) + assert_receive {:gun_response, ^conn2, _stream, :fin, 400, headers}, 300 + assert Enum.any?(headers, fn {k, v} -> k == "x-error-code" and v == "1009" end) + + :gun.close(conn) + end + test "Should authenticate", %{port: port, channel: channel, secret: secret} do {conn, _stream} = assert_connect_and_authenticate(port, channel, secret) :gun.close(conn)