Skip to content

Commit

Permalink
fix: proposal to fix errors
Browse files Browse the repository at this point in the history
  • Loading branch information
gabheadz committed Jan 23, 2025
1 parent 7152ca3 commit 7d81fb6
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 8 deletions.
41 changes: 41 additions & 0 deletions channel-sender/lib/channel_sender_ex/core/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ defmodule ChannelSenderEx.Core.Channel do
GenStateMachine.call(server, {:socket_disconnected_reason, reason}, timeout)
end

@doc """
get information about this channel
"""
def info(server, timeout \\ @on_connected_channel_reply_timeout) do
GenStateMachine.call(server, :info, timeout)
end

@doc """
operation to mark a message as acknowledged
"""
Expand Down Expand Up @@ -121,6 +128,13 @@ defmodule ChannelSenderEx.Core.Channel do
end
end

def waiting({:call, from}, :info, data) do
actions = [
_reply = {:reply, from, {:waiting, data}}
]
{:keep_state_and_data, actions}
end

## stop the process with a timeout cause if the socket is not
## authenticated in the given time
def waiting(:state_timeout, :waiting_timeout, data) do
Expand All @@ -129,6 +143,7 @@ defmodule ChannelSenderEx.Core.Channel do
end

def waiting({:call, from}, {:socket_connected, socket_pid}, data) do
Logger.debug("Channel #{data.channel} received socket connected notification. Socket pid: #{inspect(socket_pid)}")
socket_ref = Process.monitor(socket_pid)
new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil}

Expand Down Expand Up @@ -168,6 +183,7 @@ defmodule ChannelSenderEx.Core.Channel do
{:EXIT, _, {:name_conflict, {c_ref, _}, _, new_pid}},
data = %{channel: c_ref}
) do
Logger.debug("Channel #{data.channel} stopping. Cause: :name_conflict")
send(new_pid, {:twins_last_letter, data})
{:stop, :normal, %{data | stop_cause: :name_conflict}}
end
Expand Down Expand Up @@ -203,6 +219,13 @@ defmodule ChannelSenderEx.Core.Channel do
{:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]}
end

def connected({:call, from}, :info, data) do
actions = [
_reply = {:reply, from, {:connected, data}}
]
{:keep_state_and_data, actions}
end

# this method will be called when the socket is disconnected
# to inform this process about the disconnection reason
# this will be later used to define if this process will go back to the waiting state
Expand Down Expand Up @@ -298,6 +321,17 @@ defmodule ChannelSenderEx.Core.Channel do
end
end

def connected({:call, from}, {:socket_connected, socket_pid}, data) do
socket_ref = Process.monitor(socket_pid)
new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil}

actions = [
_reply = {:reply, from, :ok}
]
Logger.debug("Channel #{data.channel} overwritting socket pid.")
{:keep_state, new_data, actions}
end

## Handle info notification when socket process terminates. This method is called because the socket is monitored.
## via Process.monitor(socket_pid) in the waited/connected state.
def connected(:info, {:DOWN, _ref, :process, _object, _reason}, data) do
Expand All @@ -317,6 +351,13 @@ defmodule ChannelSenderEx.Core.Channel do
{:stop, :normal, %{data | stop_cause: :name_conflict}}
end

# capture shutdown signal
def connected(:info, {:EXIT, from_pid, :shutdown}, data) do
source_process = Process.info(from_pid)
Logger.warning("Channel #{inspect(data)} received shutdown signal: #{inspect(source_process)}")
:keep_state_and_data
end

@impl true
def terminate(reason, state, data) do
level = if reason == :normal, do: :info, else: :warning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcess do
timeout = Application.get_env(:channel_sender_ex,
:on_connected_channel_reply_timeout)
Channel.socket_connected(pid, socket_pid, timeout)
pid
end
catch
_type, _err -> :noproc
Expand Down
63 changes: 56 additions & 7 deletions channel-sender/lib/channel_sender_ex/transport/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ defmodule ChannelSenderEx.Transport.Socket do
# not providing a valid channel reference
@invalid_channel_code "1007"

# Error to indicate the shared secret for the channel is invalid
@invalid_secret_code 1008

# Error code to indicate that the channel is already connected
# and a new socket process is trying to connect to it.
@invalid_already_stablished "1009"

require Logger

alias ChannelSenderEx.Core.Channel
alias ChannelSenderEx.Core.ChannelRegistry
alias ChannelSenderEx.Core.ProtocolMessage
alias ChannelSenderEx.Core.PubSub.ReConnectProcess
Expand Down Expand Up @@ -70,18 +76,39 @@ defmodule ChannelSenderEx.Transport.Socket do
action_fn = fn _ -> check_channel_registered(channel_ref) end
# retries 3 times the lookup of the channel reference (useful when running as a cluster with several nodes)
# with a backoff strategy of 100ms initial delay and max of 500ms delay.
execute(100, 500, 3, action_fn, fn ->
result = execute(100, 500, 3, action_fn, fn ->
Logger.error("Socket unable to start. channel_ref process does not exist yet, ref: #{inspect(channel_ref)}")
{:error, @invalid_channel_code}
end)

case result do
{:error, _desc} = e->
e
{pid, res} ->
validate_channel_is_waiting(pid, res)
end
end

defp validate_channel_is_waiting(pid, res) when is_pid(pid) do
{status, _data} = Channel.info(pid)
case status do
:waiting ->
# process can continue, and socket process will be linked to the channel process
res
_ ->
# channel is already in a connected state, and a previous socket process
# was already linked to it.
{:error, @invalid_already_stablished}
end
end

defp check_channel_registered(res = {@channel_key, channel_ref}) do
case ChannelRegistry.lookup_channel_addr(channel_ref) do
:noproc ->
Logger.warning("Channel #{channel_ref} not found, retrying query...")
:retry
_ -> res
pid ->
{pid, res}
end
end

Expand Down Expand Up @@ -207,22 +234,44 @@ defmodule ChannelSenderEx.Transport.Socket do
end
end

# @impl :cowboy_websocket
# def websocket_info({:DOWN, ref, :process, _pid, _cause}, state = {channel_ref, :connected, _enc, {_app, _usr, ref}, _data}) do
# Logger.warning("Socket for channel #{channel_ref} : spawning process for re-conection")
# #spawn_monitor(ReConnectProcess, :start, [self(), channel_ref])

# new_pid = ReConnectProcess.start(self(), channel_ref)
# Logger.debug("Socket for channel #{channel_ref} : channel process found for re-conection: #{inspect(new_pid)}")
# Process.monitor(new_pid)

# {_commands = [], state}
# end

@impl :cowboy_websocket
def websocket_info({:DOWN, ref, :process, _pid, _cause}, state = {channel_ref, :connected, _, {_, _, ref}, _}) do
Logger.warning("Socket for channel #{channel_ref} : spawning process for re-conection")
spawn_monitor(ReConnectProcess, :start, [self(), channel_ref])
def websocket_info({:DOWN, ref, proc, pid, cause}, state = {channel_ref, _, _, _, _}) do
Logger.warning("Socket for channel #{channel_ref} : received DOWN message: #{inspect({ref, proc, pid, cause})}")

new_pid = ReConnectProcess.start(self(), channel_ref)
Logger.debug("Socket for channel #{channel_ref} : channel process found for re-conection: #{inspect(new_pid)}")
Process.monitor(new_pid)

{_commands = [], state}
end

@impl :cowboy_websocket
def websocket_info({:DOWN, _ref, :process, _pid, :no_channel}, state = {channel_ref, :connected, _, {_, _, _}, _}) do
Logger.warning("Socket for channel #{channel_ref} : spawning process for re-conection")
spawn_monitor(ReConnectProcess, :start, [self(), channel_ref])
#spawn_monitor(ReConnectProcess, :start, [self(), channel_ref])

new_pid = ReConnectProcess.start(self(), channel_ref)
Logger.debug("Socket for channel #{channel_ref} : channel process found for re-conection: #{inspect(new_pid)}")
Process.monitor(new_pid)

{_commands = [], state}
end

@impl :cowboy_websocket
def websocket_info(_message, state) do
def websocket_info(message, state) do
Logger.debug("Socket received untracked info message: #{inspect(message)} and Data: #{inspect(state)}")
{_commands = [], state}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do

Process.sleep(500)
assert Process.alive?(channel_pid) == false

on_exit(fn ->
Application.delete_env(:channel_sender_ex, :channel_shutdown_on_clean_close)
Application.delete_env(:channel_sender_ex, :channel_shutdown_on_disconnection)
Helper.compile(:channel_sender_ex)
end)
end

test "Should not restart channel when terminated normal (Waiting timeout)" do
Expand Down Expand Up @@ -153,6 +159,65 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
assert Map.get(pending_msg, "82") == msg2
end

test "Should not allow multiple socket to one channel process", %{
port: port,
channel: channel,
secret: secret
} do
{conn, stream} = assert_connect_and_authenticate(port, channel, secret)

# try to open a new socket connection and link it to the same channel
conn2 = connect(port, channel)
assert_receive {:gun_response, _, _, :fin, 400, resp}, 500

assert Enum.any?(resp, fn {k, v} ->
String.contains?(k, "x-error-code") and String.contains?(v, "1009")
end)

:gun.close(conn2)
end

test "Should supervisor re-create channel process when exits abnormally", %{
port: port,
channel: channel,
secret: secret
} do
{conn, stream} = assert_connect_and_authenticate(port, channel, secret)

channel_pid = ChannelRegistry.lookup_channel_addr(channel)
assert is_pid(channel_pid)
assert Process.alive?(channel_pid)

# terminate channel process
Process.exit(channel_pid, :kill)
Process.sleep(100)

# verify new process
channel_pid2 = ChannelRegistry.lookup_channel_addr(channel)
assert is_pid(channel_pid2)
assert Process.alive?(channel_pid2)
assert channel_pid != channel_pid2
end

test "Should channel ignore :shutdown signal", %{
port: port,
channel: channel,
secret: secret
} do
{conn, stream} = assert_connect_and_authenticate(port, channel, secret)

channel_pid = ChannelRegistry.lookup_channel_addr(channel)
assert is_pid(channel_pid)
assert Process.alive?(channel_pid)

# try to terminate channel process
Process.exit(channel_pid, :shutdown)
Process.sleep(100)

# verify process still alive
assert Process.alive?(channel_pid)
end

defp deliver_message(channel, message_id \\ "42") do
{data, message} = build_message(message_id)
channel_response = PubSubCore.deliver_to_channel(channel, message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcessTest do
{ChannelRegistry, [], [lookup_channel_addr: fn(_) -> :c.pid(0, 200, 0) end]},
{Channel, [], [socket_connected: fn(_, _, _) -> :ok end]},
]) do
assert ReConnectProcess.connect_socket_to_channel("channel_ref", :c.pid(0, 250, 0)) == :ok
assert is_pid(ReConnectProcess.connect_socket_to_channel("channel_ref", :c.pid(0, 250, 0)))
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do
:gun.close(conn)
end

test "Should not connect twice to socket", %{port: port, channel: channel, secret: secret} do
{conn, _stream} = assert_connect_and_authenticate(port, channel, secret)

conn2 = connect(port, channel)
assert_receive {:gun_response, ^conn2, _stream, :fin, 400, headers}, 300
assert Enum.any?(headers, fn {k, v} -> k == "x-error-code" and v == "1009" end)

:gun.close(conn)
end

test "Should authenticate", %{port: port, channel: channel, secret: secret} do
{conn, _stream} = assert_connect_and_authenticate(port, channel, secret)
:gun.close(conn)
Expand Down

0 comments on commit 7d81fb6

Please sign in to comment.