Skip to content

Commit

Permalink
fix(ws): Change reconnection strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
juancgalvis committed Feb 4, 2025
1 parent 1ab15f0 commit 2fb0967
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 239 deletions.
9 changes: 7 additions & 2 deletions channel-sender/lib/channel_sender_ex/core/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ defmodule ChannelSenderEx.Core.Channel do
end

def waiting({:call, from}, {:socket_connected, socket_pid}, data) do
Logger.debug("Channel #{data.channel} received socket connected notification. Socket pid: #{inspect(socket_pid)}")
socket_ref = Process.monitor(socket_pid)
new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil}

Expand Down Expand Up @@ -200,6 +201,7 @@ defmodule ChannelSenderEx.Core.Channel do
{:EXIT, _, {:name_conflict, {c_ref, _}, _, new_pid}},
data = %{channel: c_ref}
) do
Logger.warning("Channel #{data.channel}, stopping process #{inspect(self())} in status :waiting due to :name_conflict, and starting new process #{inspect(new_pid)}")
send(new_pid, {:twins_last_letter, data})
{:stop, :normal, %{data | stop_cause: :name_conflict}}
end
Expand Down Expand Up @@ -242,13 +244,16 @@ defmodule ChannelSenderEx.Core.Channel do
{:keep_state_and_data, actions}
end

def connected({:call, from}, {:socket_connected, socket_pid}, data) do
def connected({:call, from}, {:socket_connected, socket_pid}, data = %{socket: {old_socket_pid, old_socket_ref}}) do
Process.demonitor(old_socket_ref)
send(old_socket_pid, :terminate_socket)
socket_ref = Process.monitor(socket_pid)
new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil}

actions = [
_reply = {:reply, from, :ok}
]

Logger.debug("Channel #{data.channel} overwritting socket pid.")
{:keep_state, new_data, actions}
end
Expand Down Expand Up @@ -364,7 +369,7 @@ defmodule ChannelSenderEx.Core.Channel do
{:EXIT, _, {:name_conflict, {c_ref, _}, _, new_pid}},
data = %{channel: c_ref}
) do
Logger.error("Channel #{data.channel} stopping, reason: #{inspect(:name_conflict)}")
Logger.warning("Channel #{data.channel}, stopping process #{inspect(self())} in status :waiting due to :name_conflict, and starting new process #{inspect(new_pid)}")
send(new_pid, {:twins_last_letter, data})
{:stop, :normal, %{data | stop_cause: :name_conflict}}
end
Expand Down
28 changes: 26 additions & 2 deletions channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do

alias ChannelSenderEx.Core.Channel
alias ChannelSenderEx.Core.RulesProvider
import ChannelSenderEx.Core.Retry.ExponentialBackoff, only: [execute: 5]
@max_retries 3
@min_backoff 50
@max_backoff 300

def start_link(_) do
result = Horde.DynamicSupervisor.start_link(__MODULE__, [strategy: :one_for_one, shutdown: 1000], name: __MODULE__)
opts = [strategy: :one_for_one, shutdown: 1000]
result = Horde.DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)

Logger.debug("ChannelSupervisor: #{inspect(result)}")
result
end
Expand All @@ -32,7 +38,11 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do

@spec start_channel(channel_init_args()) :: any()
def start_channel(args) do
Horde.DynamicSupervisor.start_child(__MODULE__, channel_child_spec(args))
action_fn = fn _ -> start_channel_retried(args) end

execute(@min_backoff, @max_backoff, @max_retries, action_fn, fn ->
raise("Error creating channel")
end)
end

@spec channel_child_spec(channel_init_args()) :: any()
Expand All @@ -51,6 +61,20 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do
}
end

defp start_channel_retried(args = {channel_ref, _application, _user_ref, _meta}) do
case Horde.DynamicSupervisor.start_child(__MODULE__, channel_child_spec(args)) do
{:ok, pid} ->
{:ok, pid}

{:error, {:already_started, pid}} ->
{:ok, pid}

{:error, reason} ->
Logger.warning("Error starting channel #{channel_ref}: #{inspect(reason)}")
:retry
end
end

defp via_tuple(ref, app, usr) do
{:via, Horde.Registry, {ChannelSenderEx.Core.ChannelRegistry, ref, {app, usr}}}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule ChannelSenderEx.Core.Retry.ExponentialBackoff do
defp normalize(value) when is_function(value), do: value
defp normalize(value) when is_atom(value), do: fn -> exit(value) end

def loop(_, _, max_retries, _, on_give_up, max_retries), do: on_give_up.()
def loop(_, _, max_retries, _, on_give_up, current_tries) when max_retries == current_tries, do: on_give_up.()

def loop(initial, max_delay, max_retries, action_fn, on_give_up, iter) do
actual_delay = exp_back_off(initial, max_delay, iter)
Expand Down
72 changes: 14 additions & 58 deletions channel-sender/lib/channel_sender_ex/transport/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule ChannelSenderEx.Transport.Socket do

# Error code to indicate that the channel is already connected
# and a new socket process is trying to connect to it.
@invalid_already_stablished "3009"
@socket_replaced "3009"

## -----------------
## Retryable errors
Expand Down Expand Up @@ -72,22 +72,11 @@ defmodule ChannelSenderEx.Transport.Socket do
end

@impl :cowboy_websocket
def websocket_init(state) do
def websocket_init(state = {ref, _, _}) do
Logger.debug("Socket init with pid: #{inspect(self())} starting... #{inspect(state)}")
{ref, _, _} = state
proc = lookup_channel_addr({"channel", ref})
case proc do
{:ok, pid} ->
case validate_channel_is_waiting(pid) do
{:error, desc, data} ->
Logger.warning("""
Socket init with pid: #{inspect(self())} will not continue for #{ref}.
Error: #{desc}. There is a socket already connected = #{inspect(data.socket)}
""")
{_commands = [{:close, 1001, desc}], state}
_ ->
{_commands = [], state}
end
case lookup_channel_addr({"channel", ref}) do
{:ok, _pid} ->
{_commands = [], state}
{:error, desc} ->
{_commands = [{:close, 1001, desc}], state}
end
Expand Down Expand Up @@ -162,21 +151,12 @@ defmodule ChannelSenderEx.Transport.Socket do
end
end

# @impl :cowboy_websocket
# def websocket_info({:DOWN, ref, :process, _pid, cause}, state = {channel_ref, :connected, _, {_, _, ref}, _}) do
# case cause do
# :normal ->
# Logger.info("Socket for channel #{channel_ref}. Related process #{inspect(ref)} down normally")
# {_commands = [{:close, @normal_close_code, "close"}], state}
# _ ->
# Logger.warning("""
# Socket for channel #{channel_ref}. Related Process #{inspect(ref)}
# down with cause #{inspect(cause)}. Spawning process for re-conection
# """)
# spawn_monitor(ReConnectProcess, :start, [self(), channel_ref])
# {_commands = [], state}
# end
# end
@impl :cowboy_websocket
def websocket_info(:terminate_socket, state = {channel_ref, _, _, _, _}) do
# ! check if we need to do something with the new_socket_pid
Logger.info("Socket for channel #{channel_ref} : received terminate_socket message")
{_commands = [{:close, 1001, <<@socket_replaced>>}], state}
end

@impl :cowboy_websocket
def websocket_info({:DOWN, ref, proc, pid, cause}, state = {channel_ref, _, _, _, _}) do
Expand Down Expand Up @@ -254,43 +234,19 @@ defmodule ChannelSenderEx.Transport.Socket do
action_fn = fn _ -> check_channel_registered(channel_ref) end
# retries 3 times the lookup of the channel reference (useful when running as a cluster with several nodes)
# with a backoff strategy of 100ms initial delay and max of 500ms delay.
result = execute(50, 300, 3, action_fn, fn ->
execute(100, 500, 3, action_fn, fn ->
Logger.error("Socket unable to start. channel_ref process does not exist yet, ref: #{inspect(channel_ref)}")
{:error, <<@invalid_channel_code>>}
end)

case result do
{:error, _desc} = e ->
e
{pid, _res} ->
{:ok, pid}
end
end

defp check_channel_registered(res = {@channel_key, channel_ref}) do
case ChannelRegistry.lookup_channel_addr(channel_ref) do
:noproc ->
Logger.warning("Socket: #{channel_ref} not found, retrying query...")
Logger.warning("Channel #{channel_ref} not found, retrying query...")
:retry
pid ->
{pid, res}
end
end

# defp check_channel_registered({:error, _desc}) do
# :retry
# end

defp validate_channel_is_waiting(pid) when is_pid(pid) do
{status, data} = Channel.info(pid)
case status do
:waiting ->
# process can continue, and socket process will be linked to the channel process
{:ok, data}
_ ->
# channel is already in a connected state, and a previous socket process
# was already linked to it.
{:error, <<@invalid_already_stablished>>, data}
{:ok, pid}
end
end

Expand Down
11 changes: 8 additions & 3 deletions channel-sender/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ defmodule ChannelSenderEx.MixProject do

# Run "mix help compile.app" to learn about applications.
def application do
extra_apps = if Mix.env() == :dev do
[:logger, :telemetry, :observer, :wx, :runtime_tools]
else
[:logger, :telemetry]
end
[
extra_applications: [:logger],
extra_applications: extra_apps,
mod: {ChannelSenderEx.Application, []}
]
end
Expand All @@ -39,12 +44,12 @@ defmodule ChannelSenderEx.MixProject do
{:gen_state_machine, "~> 3.0"},
{:jason, "~> 1.2"},
{:cors_plug, "~> 3.0"},
{:horde, "~> 0.8.7"},
{:horde, "~> 0.9.0"},
{:hackney, "~> 1.20.1", only: :test},
{:plug_crypto, "~> 2.1"},
{:stream_data, "~> 0.4", only: [:test]},
{:gun, "~> 1.3", only: [:test, :benchee]},
{:libcluster, "~> 3.3"},
{:libcluster, "~> 3.4.1"},
{:vapor, "~> 0.10.0"},
{:mock, "~> 0.3.0", only: :test},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
Expand Down
2 changes: 1 addition & 1 deletion channel-sender/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"},
"gun": {:hex, :gun, "1.3.3", "cf8b51beb36c22b9c8df1921e3f2bc4d2b1f68b49ad4fbc64e91875aa14e16b4", [:rebar3], [{:cowlib, "~> 2.7.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "3106ce167f9c9723f849e4fb54ea4a4d814e3996ae243a1c828b256e749041e0"},
"hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"},
"horde": {:hex, :horde, "0.8.7", "e51ab8e0e5bc7dcd0caa85d84b144cccfde97994bd865d822c7e489746b87e7f", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "835aede887d777542f85e0a88293c18113abcc1356006050ec216da16aa5e0e3"},
"horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
assert Map.get(pending_msg, "82") == msg2
end

test "Should not allow multiple socket to one channel process", %{
test "Should allow new socket to one channel process", %{
port: port,
channel: channel,
secret: secret
Expand All @@ -232,7 +232,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do

# try to open a new socket connection and link it to the same channel
conn2 = connect(port, channel)
assert_receive {:gun_ws, _, _, {:close, 1001, "3009"}}, 500
assert_receive {:gun_upgrade, _, _, ["websocket"], _}, 500

:gun.close(conn2)
end
Expand Down
21 changes: 13 additions & 8 deletions channel-sender/test/channel_sender_ex/core/node_observer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ defmodule ChannelSenderEx.Core.NodeObserverTest do
alias ChannelSenderEx.Core.{ChannelRegistry, ChannelSupervisor}

setup do
{:ok, pid} = NodeObserver.start_link([])
{:ok, pid} =
case NodeObserver.start_link([]) do
{:ok, pid} -> {:ok, pid}
{:error, {:already_started, pid}} -> {:ok, pid}
end

{:ok, _} = Application.ensure_all_started(:telemetry)

Expand All @@ -19,16 +23,17 @@ defmodule ChannelSenderEx.Core.NodeObserverTest do

test "handles nodeup message", %{pid: pid} do
assert capture_log(fn ->
send(pid, {:nodeup, :some_node, :visible})
:timer.sleep(100) # Allow some time for the message to be processed
end) == ""
send(pid, {:nodeup, :some_node, :visible})
# Allow some time for the message to be processed
:timer.sleep(100)
end) == ""
end

test "handles nodedown message", %{pid: pid} do
assert capture_log(fn ->
send(pid, {:nodedown, :some_node, :visible})
:timer.sleep(100) # Allow some time for the message to be processed
end) == ""
send(pid, {:nodedown, :some_node, :visible})
# Allow some time for the message to be processed
:timer.sleep(100)
end) == ""
end

end
Loading

0 comments on commit 2fb0967

Please sign in to comment.