diff --git a/channel-sender/lib/channel_sender_ex/core/channel.ex b/channel-sender/lib/channel_sender_ex/core/channel.ex index 94d26e2..031999b 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel.ex @@ -161,6 +161,7 @@ defmodule ChannelSenderEx.Core.Channel do end def waiting({:call, from}, {:socket_connected, socket_pid}, data) do + Logger.debug("Channel #{data.channel} received socket connected notification. Socket pid: #{inspect(socket_pid)}") socket_ref = Process.monitor(socket_pid) new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil} @@ -200,6 +201,7 @@ defmodule ChannelSenderEx.Core.Channel do {:EXIT, _, {:name_conflict, {c_ref, _}, _, new_pid}}, data = %{channel: c_ref} ) do + Logger.warning("Channel #{data.channel}, stopping process #{inspect(self())} in status :waiting due to :name_conflict, and starting new process #{inspect(new_pid)}") send(new_pid, {:twins_last_letter, data}) {:stop, :normal, %{data | stop_cause: :name_conflict}} end @@ -242,13 +244,16 @@ defmodule ChannelSenderEx.Core.Channel do {:keep_state_and_data, actions} end - def connected({:call, from}, {:socket_connected, socket_pid}, data) do + def connected({:call, from}, {:socket_connected, socket_pid}, data = %{socket: {old_socket_pid, old_socket_ref}}) do + Process.demonitor(old_socket_ref) + send(old_socket_pid, :terminate_socket) socket_ref = Process.monitor(socket_pid) new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil} actions = [ _reply = {:reply, from, :ok} ] + Logger.debug("Channel #{data.channel} overwritting socket pid.") {:keep_state, new_data, actions} end @@ -364,7 +369,7 @@ defmodule ChannelSenderEx.Core.Channel do {:EXIT, _, {:name_conflict, {c_ref, _}, _, new_pid}}, data = %{channel: c_ref} ) do - Logger.error("Channel #{data.channel} stopping, reason: #{inspect(:name_conflict)}") + Logger.warning("Channel #{data.channel}, stopping process #{inspect(self())} in status :waiting due to :name_conflict, and starting new process #{inspect(new_pid)}") send(new_pid, {:twins_last_letter, data}) {:stop, :normal, %{data | stop_cause: :name_conflict}} end diff --git a/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex b/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex index 805a50f..3af2cdc 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex @@ -7,9 +7,15 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do alias ChannelSenderEx.Core.Channel alias ChannelSenderEx.Core.RulesProvider + import ChannelSenderEx.Core.Retry.ExponentialBackoff, only: [execute: 5] + @max_retries 3 + @min_backoff 50 + @max_backoff 300 def start_link(_) do - result = Horde.DynamicSupervisor.start_link(__MODULE__, [strategy: :one_for_one, shutdown: 1000], name: __MODULE__) + opts = [strategy: :one_for_one, shutdown: 1000] + result = Horde.DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__) + Logger.debug("ChannelSupervisor: #{inspect(result)}") result end @@ -32,7 +38,11 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do @spec start_channel(channel_init_args()) :: any() def start_channel(args) do - Horde.DynamicSupervisor.start_child(__MODULE__, channel_child_spec(args)) + action_fn = fn _ -> start_channel_retried(args) end + + execute(@min_backoff, @max_backoff, @max_retries, action_fn, fn -> + raise("Error creating channel") + end) end @spec channel_child_spec(channel_init_args()) :: any() @@ -51,6 +61,20 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do } end + defp start_channel_retried(args = {channel_ref, _application, _user_ref, _meta}) do + case Horde.DynamicSupervisor.start_child(__MODULE__, channel_child_spec(args)) do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + {:ok, pid} + + {:error, reason} -> + Logger.warning("Error starting channel #{channel_ref}: #{inspect(reason)}") + :retry + end + end + defp via_tuple(ref, app, usr) do {:via, Horde.Registry, {ChannelSenderEx.Core.ChannelRegistry, ref, {app, usr}}} end diff --git a/channel-sender/lib/channel_sender_ex/core/retry/exponential_backoff.ex b/channel-sender/lib/channel_sender_ex/core/retry/exponential_backoff.ex index 325f915..07c5a82 100644 --- a/channel-sender/lib/channel_sender_ex/core/retry/exponential_backoff.ex +++ b/channel-sender/lib/channel_sender_ex/core/retry/exponential_backoff.ex @@ -9,7 +9,7 @@ defmodule ChannelSenderEx.Core.Retry.ExponentialBackoff do defp normalize(value) when is_function(value), do: value defp normalize(value) when is_atom(value), do: fn -> exit(value) end - def loop(_, _, max_retries, _, on_give_up, max_retries), do: on_give_up.() + def loop(_, _, max_retries, _, on_give_up, current_tries) when max_retries == current_tries, do: on_give_up.() def loop(initial, max_delay, max_retries, action_fn, on_give_up, iter) do actual_delay = exp_back_off(initial, max_delay, iter) diff --git a/channel-sender/lib/channel_sender_ex/transport/socket.ex b/channel-sender/lib/channel_sender_ex/transport/socket.ex index 9f9a851..946a217 100644 --- a/channel-sender/lib/channel_sender_ex/transport/socket.ex +++ b/channel-sender/lib/channel_sender_ex/transport/socket.ex @@ -19,7 +19,7 @@ defmodule ChannelSenderEx.Transport.Socket do # Error code to indicate that the channel is already connected # and a new socket process is trying to connect to it. - @invalid_already_stablished "3009" + @socket_replaced "3009" ## ----------------- ## Retryable errors @@ -72,22 +72,11 @@ defmodule ChannelSenderEx.Transport.Socket do end @impl :cowboy_websocket - def websocket_init(state) do + def websocket_init(state = {ref, _, _}) do Logger.debug("Socket init with pid: #{inspect(self())} starting... #{inspect(state)}") - {ref, _, _} = state - proc = lookup_channel_addr({"channel", ref}) - case proc do - {:ok, pid} -> - case validate_channel_is_waiting(pid) do - {:error, desc, data} -> - Logger.warning(""" - Socket init with pid: #{inspect(self())} will not continue for #{ref}. - Error: #{desc}. There is a socket already connected = #{inspect(data.socket)} - """) - {_commands = [{:close, 1001, desc}], state} - _ -> - {_commands = [], state} - end + case lookup_channel_addr({"channel", ref}) do + {:ok, _pid} -> + {_commands = [], state} {:error, desc} -> {_commands = [{:close, 1001, desc}], state} end @@ -162,21 +151,12 @@ defmodule ChannelSenderEx.Transport.Socket do end end - # @impl :cowboy_websocket - # def websocket_info({:DOWN, ref, :process, _pid, cause}, state = {channel_ref, :connected, _, {_, _, ref}, _}) do - # case cause do - # :normal -> - # Logger.info("Socket for channel #{channel_ref}. Related process #{inspect(ref)} down normally") - # {_commands = [{:close, @normal_close_code, "close"}], state} - # _ -> - # Logger.warning(""" - # Socket for channel #{channel_ref}. Related Process #{inspect(ref)} - # down with cause #{inspect(cause)}. Spawning process for re-conection - # """) - # spawn_monitor(ReConnectProcess, :start, [self(), channel_ref]) - # {_commands = [], state} - # end - # end + @impl :cowboy_websocket + def websocket_info(:terminate_socket, state = {channel_ref, _, _, _, _}) do + # ! check if we need to do something with the new_socket_pid + Logger.info("Socket for channel #{channel_ref} : received terminate_socket message") + {_commands = [{:close, 1001, <<@socket_replaced>>}], state} + end @impl :cowboy_websocket def websocket_info({:DOWN, ref, proc, pid, cause}, state = {channel_ref, _, _, _, _}) do @@ -254,43 +234,19 @@ defmodule ChannelSenderEx.Transport.Socket do action_fn = fn _ -> check_channel_registered(channel_ref) end # retries 3 times the lookup of the channel reference (useful when running as a cluster with several nodes) # with a backoff strategy of 100ms initial delay and max of 500ms delay. - result = execute(50, 300, 3, action_fn, fn -> + execute(100, 500, 3, action_fn, fn -> Logger.error("Socket unable to start. channel_ref process does not exist yet, ref: #{inspect(channel_ref)}") {:error, <<@invalid_channel_code>>} end) - - case result do - {:error, _desc} = e -> - e - {pid, _res} -> - {:ok, pid} - end end defp check_channel_registered(res = {@channel_key, channel_ref}) do case ChannelRegistry.lookup_channel_addr(channel_ref) do :noproc -> - Logger.warning("Socket: #{channel_ref} not found, retrying query...") + Logger.warning("Channel #{channel_ref} not found, retrying query...") :retry pid -> - {pid, res} - end - end - - # defp check_channel_registered({:error, _desc}) do - # :retry - # end - - defp validate_channel_is_waiting(pid) when is_pid(pid) do - {status, data} = Channel.info(pid) - case status do - :waiting -> - # process can continue, and socket process will be linked to the channel process - {:ok, data} - _ -> - # channel is already in a connected state, and a previous socket process - # was already linked to it. - {:error, <<@invalid_already_stablished>>, data} + {:ok, pid} end end diff --git a/channel-sender/mix.exs b/channel-sender/mix.exs index f2db13a..b5adf5c 100644 --- a/channel-sender/mix.exs +++ b/channel-sender/mix.exs @@ -21,8 +21,13 @@ defmodule ChannelSenderEx.MixProject do # Run "mix help compile.app" to learn about applications. def application do + extra_apps = if Mix.env() == :dev do + [:logger, :telemetry, :observer, :wx, :runtime_tools] + else + [:logger, :telemetry] + end [ - extra_applications: [:logger], + extra_applications: extra_apps, mod: {ChannelSenderEx.Application, []} ] end @@ -39,12 +44,12 @@ defmodule ChannelSenderEx.MixProject do {:gen_state_machine, "~> 3.0"}, {:jason, "~> 1.2"}, {:cors_plug, "~> 3.0"}, - {:horde, "~> 0.8.7"}, + {:horde, "~> 0.9.0"}, {:hackney, "~> 1.20.1", only: :test}, {:plug_crypto, "~> 2.1"}, {:stream_data, "~> 0.4", only: [:test]}, {:gun, "~> 1.3", only: [:test, :benchee]}, - {:libcluster, "~> 3.3"}, + {:libcluster, "~> 3.4.1"}, {:vapor, "~> 0.10.0"}, {:mock, "~> 0.3.0", only: :test}, {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, diff --git a/channel-sender/mix.lock b/channel-sender/mix.lock index af2924b..259938e 100644 --- a/channel-sender/mix.lock +++ b/channel-sender/mix.lock @@ -16,7 +16,7 @@ "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "gun": {:hex, :gun, "1.3.3", "cf8b51beb36c22b9c8df1921e3f2bc4d2b1f68b49ad4fbc64e91875aa14e16b4", [:rebar3], [{:cowlib, "~> 2.7.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "3106ce167f9c9723f849e4fb54ea4a4d814e3996ae243a1c828b256e749041e0"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, - "horde": {:hex, :horde, "0.8.7", "e51ab8e0e5bc7dcd0caa85d84b144cccfde97994bd865d822c7e489746b87e7f", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "835aede887d777542f85e0a88293c18113abcc1356006050ec216da16aa5e0e3"}, + "horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"}, diff --git a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs index 70132f2..c4976a2 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs @@ -223,7 +223,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do assert Map.get(pending_msg, "82") == msg2 end - test "Should not allow multiple socket to one channel process", %{ + test "Should allow new socket to one channel process", %{ port: port, channel: channel, secret: secret @@ -232,7 +232,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do # try to open a new socket connection and link it to the same channel conn2 = connect(port, channel) - assert_receive {:gun_ws, _, _, {:close, 1001, "3009"}}, 500 + assert_receive {:gun_upgrade, _, _, ["websocket"], _}, 500 :gun.close(conn2) end diff --git a/channel-sender/test/channel_sender_ex/core/node_observer_test.exs b/channel-sender/test/channel_sender_ex/core/node_observer_test.exs index 82bc4a3..7a35b23 100644 --- a/channel-sender/test/channel_sender_ex/core/node_observer_test.exs +++ b/channel-sender/test/channel_sender_ex/core/node_observer_test.exs @@ -6,7 +6,11 @@ defmodule ChannelSenderEx.Core.NodeObserverTest do alias ChannelSenderEx.Core.{ChannelRegistry, ChannelSupervisor} setup do - {:ok, pid} = NodeObserver.start_link([]) + {:ok, pid} = + case NodeObserver.start_link([]) do + {:ok, pid} -> {:ok, pid} + {:error, {:already_started, pid}} -> {:ok, pid} + end {:ok, _} = Application.ensure_all_started(:telemetry) @@ -19,16 +23,17 @@ defmodule ChannelSenderEx.Core.NodeObserverTest do test "handles nodeup message", %{pid: pid} do assert capture_log(fn -> - send(pid, {:nodeup, :some_node, :visible}) - :timer.sleep(100) # Allow some time for the message to be processed - end) == "" + send(pid, {:nodeup, :some_node, :visible}) + # Allow some time for the message to be processed + :timer.sleep(100) + end) == "" end test "handles nodedown message", %{pid: pid} do assert capture_log(fn -> - send(pid, {:nodedown, :some_node, :visible}) - :timer.sleep(100) # Allow some time for the message to be processed - end) == "" + send(pid, {:nodedown, :some_node, :visible}) + # Allow some time for the message to be processed + :timer.sleep(100) + end) == "" end - end diff --git a/clients/client-js/src/async-client.ts b/clients/client-js/src/async-client.ts index d2e9ccb..9c0a29b 100644 --- a/clients/client-js/src/async-client.ts +++ b/clients/client-js/src/async-client.ts @@ -1,40 +1,40 @@ -import {JsonDecoder} from "./json-decoder"; -import {MessageDecoder} from "./serializer" -import {ChannelMessage} from "./channel-message"; -import {RetryTimer} from "./retry-timer"; -import {BinaryDecoder} from "./binary-decoder"; -import {Protocol} from "./protocol"; -import {Cache} from "./cache"; -import {Utils} from "./utils"; +import { JsonDecoder } from "./json-decoder"; +import { MessageDecoder } from "./serializer" +import { ChannelMessage } from "./channel-message"; +import { RetryTimer } from "./retry-timer"; +import { BinaryDecoder } from "./binary-decoder"; +import { Protocol } from "./protocol"; +import { Cache } from "./cache"; +import { Utils } from "./utils"; export class AsyncClient { private actualToken; - private socket : WebSocket; + private socket: WebSocket; public isOpen: boolean = false; public isActive: boolean = false; - private stateCallbacks = {open: [], close: [], error: [], message: []} + private stateCallbacks = { open: [], close: [], error: [], message: [] } private bindings = []; private ref = 0; - private pendingHeartbeatRef : string = null; - private closeWasClean = false; + private pendingHeartbeatRef: string = null; + private closeWasClean: boolean = false; private heartbeatTimer = null; - private readonly heartbeatIntervalMs : number; - private tearingDown : boolean = false; - private reconnectTimer : RetryTimer; + private readonly heartbeatIntervalMs: number; + private tearingDown: boolean = false; + private reconnectTimer: RetryTimer; private serializer: MessageDecoder; private subProtocols: string[] = [Protocol.JSON] private cache: Cache; - constructor(private config: AsyncConfig, private readonly transport : any = null) { + constructor(private config: AsyncConfig, private readonly transport: any = null) { const intWindow = typeof window !== "undefined" ? window : null; this.transport = transport || intWindow['WebSocket']; this.heartbeatIntervalMs = config.heartbeat_interval || 750; - this.reconnectTimer = new RetryTimer(() => this.teardown(() => this.connect()), - 50, - (num) => Utils.jitter(num, 0.25)); + this.reconnectTimer = new RetryTimer(() => this.teardown(() => this.connect()), + 50, + (num) => Utils.jitter(num, 0.25), config.maxReconnectAttempts); this.actualToken = config.channel_secret; - if (config.enable_binary_transport && typeof TextDecoder !== "undefined"){ + if (config.enable_binary_transport && typeof TextDecoder !== "undefined") { this.subProtocols.push(Protocol.BINARY) } if (!config.dedupCacheDisable) { @@ -42,29 +42,35 @@ export class AsyncClient { } else { this.cache = undefined; } - + if (intWindow && (config.checkConnectionOnFocus || config.checkConnectionOnFocus === undefined)) { + intWindow.addEventListener('focus', () => { + if (!this.closeWasClean) { + this.connect(); + } + }); + } } - public connect(){ - if (this.socket) { + public connect() { + if (this.socket && this.socket.readyState == SocketState.OPEN) { // TODO: Verify conditions console.debug('async-client. socket already created'); return; } console.debug('async-client. connect() called') this.socket = new this.transport(this.socketUrl(), this.subProtocols); this.socket.binaryType = "arraybuffer"; - this.socket.onopen = (event) => this.onSocketOpen(event) - this.socket.onerror = error => this.onSocketError(error) - this.socket.onmessage = event => this.onSocketMessage(event) - this.socket.onclose = event => this.onSocketClose(event) + this.socket.onopen = (event) => this.onSocketOpen(event) + this.socket.onerror = error => this.onSocketError(error) + this.socket.onmessage = event => this.onSocketMessage(event) + this.socket.onclose = event => this.onSocketClose(event) console.log(`async-client. Subprotocols: ${this.subProtocols}`); } - public listenEvent(eventName : string, callBack : (msg: any) => void) { - this.bindings.push({eventName, callBack}); + public listenEvent(eventName: string, callBack: (msg: any) => void) { + this.bindings.push({ eventName, callBack }); } - public doOnSocketOpen(callback){ + public doOnSocketOpen(callback) { this.stateCallbacks.open.push(callback) } @@ -74,44 +80,44 @@ export class AsyncClient { this.reconnectTimer.reset(); this.resetHeartbeat(); this.socket.send(`Auth::${this.actualToken}`) - this.stateCallbacks.open.forEach((callback) => callback(event) ); + this.stateCallbacks.open.forEach((callback) => callback(event)); } - private selectSerializerForProtocol() : void { - if (this.socket.protocol == Protocol.BINARY){ + private selectSerializerForProtocol(): void { + if (this.socket.protocol == Protocol.BINARY) { this.serializer = new BinaryDecoder(); - }else { + } else { this.serializer = new JsonDecoder(); } } private onSocketError(event) { - // + // console.log('error', event) } private onSocketMessage(event) { const message = this.serializer.decode(event) - if (!this.isActive && message.event == "AuthOk"){ + if (!this.isActive && message.event == "AuthOk") { this.isActive = true; console.log('async-client. Auth OK'); - }else if(message.event == ":hb" && message.correlation_id == this.pendingHeartbeatRef){ + } else if (message.event == ":hb" && message.correlation_id == this.pendingHeartbeatRef) { this.pendingHeartbeatRef = null; - }else if(message.event == ":n_token"){ + } else if (message.event == ":n_token") { this.actualToken = message.payload; this.ackMessage(message); this.handleMessage(message); - }else if (this.isActive){ + } else if (this.isActive) { this.ackMessage(message); this.handleMessage(message); - }else { + } else { const txt = "Unexpected message before AuthOK"; console.error(txt, message); } } - private sendHeartbeat(){ - if(!this.isActive){ return } - if(this.pendingHeartbeatRef){ + private sendHeartbeat() { + if (!this.isActive) { return } + if (this.pendingHeartbeatRef) { this.pendingHeartbeatRef = null const reason = "heartbeat timeout. Attempting to re-establish connection"; console.info(`async-client. ${reason}`) @@ -122,38 +128,39 @@ export class AsyncClient { this.socket.send(`hb::${this.pendingHeartbeatRef}`); } - private abnormalClose(reason){ + private abnormalClose(reason) { this.closeWasClean = false; console.warn(`async-client. Abnormal close: ${reason}`) this.socket.close(1000, reason); } - public disconnect() : void { + public disconnect(): void { + console.info('async-client. disconnect() called') this.closeWasClean = true; this.isActive = false; clearInterval(this.heartbeatTimer); this.reconnectTimer.reset(); this.socket.close(1000, "Client disconnect"); - console.info('async-client. disconnect called') + console.info('async-client. disconnect() called end') } - private makeRef() : string { + private makeRef(): string { const newRef = this.ref + 1 - if(newRef === this.ref){ this.ref = 0 } else { this.ref = newRef } + if (newRef === this.ref) { this.ref = 0 } else { this.ref = newRef } return this.ref.toString() } private handleMessage(message: ChannelMessage) { this.bindings .filter(handler => this.matchHandlerExpr(handler.eventName, message.event)) - .filter(handler => this.deDupFilter(message.message_id)) + .filter(_handler => this.deDupFilter(message.message_id)) .forEach(handler => handler.callBack(message)) } private matchHandlerExpr(eventExpr: string, actualEventName: string): boolean { if (eventExpr === actualEventName) return true; const regexString = '^' + eventExpr.replace(/\*/g, '([^.]+)').replace(/#/g, '([^.]+\\.?)+') + '$'; - return actualEventName.search(regexString) !== -1; + return actualEventName.search(regexString) !== -1; } private deDupFilter(message_id: string): boolean { @@ -168,15 +175,19 @@ export class AsyncClient { } } - private ackMessage(message: ChannelMessage){ - if (this.socket.readyState != SocketState.CLOSING && this.socket.readyState != SocketState.CLOSED) + private ackMessage(message: ChannelMessage) { + if (this.socket.readyState != SocketState.CLOSING && this.socket.readyState != SocketState.CLOSED) { this.socket.send(`Ack::${message.message_id}`); + } } private onSocketClose(event) { - console.warn(`async-client. channel close: ${event.code}`); + console.warn(`async-client. channel close: ${event.code} ${event.reason}`); clearInterval(this.heartbeatTimer) - if(!this.closeWasClean && event.code != 4403){ + const reason = this.extractReason(event.reason); + const shouldRetry = event.code > 1001 || (event.code == 1001 && reason >= 3050); + + if (!this.closeWasClean && shouldRetry && event.reason != 'Invalid token for channel') { console.log(`async-client. Scheduling reconnect, clean: ${this.closeWasClean}`) this.reconnectTimer.schedule() } else { @@ -185,43 +196,48 @@ export class AsyncClient { } - private resetHeartbeat() : void { + private extractReason(reason): number { + const reasonNumber = parseInt(reason); + if (isNaN(reasonNumber)) { + return 0; + } + return reasonNumber; + } + + private resetHeartbeat(): void { this.pendingHeartbeatRef = null clearInterval(this.heartbeatTimer) this.heartbeatTimer = setInterval(() => this.sendHeartbeat(), this.heartbeatIntervalMs) } //Testing Only - public rawSocket() : WebSocket { + public rawSocket(): WebSocket { return this.socket; } - public getDecoder() : MessageDecoder { + public getDecoder(): MessageDecoder { return this.serializer; } - private socketUrl() : string { + private socketUrl(): string { return `${this.config.socket_url}?channel=${this.config.channel_ref}`; } private teardown(callback?: () => void): void { - if(this.tearingDown) return; + if (this.tearingDown) return; this.tearingDown = true; - if(!this.socket) { + if (!this.socket) { this.tearingDown = false; return callback && callback(); } - if (this.socket && this.socket.readyState != SocketState.CLOSED && this.socket.readyState != SocketState.CLOSING){ + if (this.socket && this.socket.readyState != SocketState.CLOSED && this.socket.readyState != SocketState.CLOSING) { this.socket.close(); } this.waitForSocketClosed(() => { if (this.socket) { - // if(this.socket.readyState != SocketState.CLOSED) { - // console.log("#DBG8", `Socket maybe open after wait close ${this.socket.readyState}`) - // } - this.socket.onclose = function(){} // noop + this.socket.onclose = function () { } // noop this.socket = null } this.tearingDown = false; @@ -230,13 +246,13 @@ export class AsyncClient { } - private waitForSocketClosed(callback? : () => void, tries = 1) : void { + private waitForSocketClosed(callback?: () => void, tries = 1): void { if (tries === 5 || !this.socket || this.socket.readyState === SocketState.CLOSED) { callback(); return } - setTimeout(() => this.waitForSocketClosed(callback, tries + 1),150 * tries) + setTimeout(() => this.waitForSocketClosed(callback, tries + 1), 150 * tries) } @@ -249,15 +265,17 @@ enum SocketState { CLOSED } -export interface AsyncConfig{ +export interface AsyncConfig { socket_url: string; - channel_ref:string; + channel_ref: string; channel_secret: string; enable_binary_transport?: boolean; - heartbeat_interval? : number; - dedupCacheDisable? : boolean; - dedupCacheMaxSize? : number; - dedupCacheTtl? : number; + heartbeat_interval?: number; + dedupCacheDisable?: boolean; + dedupCacheMaxSize?: number; + dedupCacheTtl?: number; + maxReconnectAttempts?: number; + checkConnectionOnFocus?: boolean; } diff --git a/clients/client-js/src/retry-timer.ts b/clients/client-js/src/retry-timer.ts index 21f77bc..1feb8e5 100644 --- a/clients/client-js/src/retry-timer.ts +++ b/clients/client-js/src/retry-timer.ts @@ -1,37 +1,44 @@ -import {Utils} from "./utils"; +import { Utils } from "./utils"; +const MAX_DELAY = 6000; export class RetryTimer { - private timer : number; + private timer: number; private tries: number = 0; constructor( private callback: () => void, - private initial: number = 10, - private jitterFn: (x: number) => number) { + private initial: number = 10, + private jitterFn: (x: number) => number, + private maxRetries: number = 10) { } - public reset() : void { + public reset(): void { clearTimeout(this.timer) this.tries = 0; } - public schedule() : void { + public schedule(): void { if (typeof window !== 'undefined') { + const delayMS = this.delay(); + console.log(`async-client. scheduling retry in ${delayMS} ms`); this.timer = window.setTimeout(() => { this.tries = this.tries + 1; - this.callback(); - }, this.delay()) + if (this.tries <= this.maxRetries) { + console.log(`async-client. retrying ${this.tries} of ${this.maxRetries}`); + this.callback(); + } + }, delayMS) } else { console.warn(`async-client. could not setup scheduler for rety: window is undefined.`); } } - private delay() : number { + private delay(): number { if (this.jitterFn === undefined) { - return Utils.expBackoff(this.initial, 6000, this.tries, (num) => Utils.jitter(num, 0.25)); + return Utils.expBackoff(this.initial, MAX_DELAY, this.tries, (num) => Utils.jitter(num, 0.25)); } else { - return Utils.expBackoff(this.initial, 6000, this.tries, this.jitterFn); + return Utils.expBackoff(this.initial, MAX_DELAY, this.tries, this.jitterFn); } } diff --git a/clients/client-js/test/async-client.test.ts b/clients/client-js/test/async-client.test.ts index d8144b2..8f828b7 100644 --- a/clients/client-js/test/async-client.test.ts +++ b/clients/client-js/test/async-client.test.ts @@ -1,17 +1,18 @@ import * as chai from 'chai'; -import {AsyncClient, AsyncConfig} from "../src/async-client"; -import {Server, WebSocket} from 'mock-socket'; +import { AsyncClient, AsyncConfig } from "../src/async-client"; +import { Server, WebSocket } from 'mock-socket'; -import {ChannelMessage} from "../src/channel-message"; -import {JsonDecoder} from "../src/json-decoder"; -import {BinaryDecoder} from "../src/binary-decoder"; +import { ChannelMessage } from "../src/channel-message"; +import { JsonDecoder } from "../src/json-decoder"; +import { BinaryDecoder } from "../src/binary-decoder"; import "fast-text-encoding" -import {Protocol} from "../src/protocol"; +import { Protocol } from "../src/protocol"; const assert = chai.assert; +const TIMEOUT = 10000; -function timeout(millis : number) : Promise { +function timeout(millis: number): Promise { return new Promise(resolve => { // @ts-ignore setTimeout(resolve, millis, "timeout"); @@ -19,9 +20,9 @@ function timeout(millis : number) : Promise { } -describe('Async client Tests', function() { +describe('Async client Tests', function () { let mockServer; - let client : AsyncClient; + let client: AsyncClient; let config: AsyncConfig = { socket_url: "wss://host.local/socket", channel_ref: "ab771f3434aaghjgr", @@ -41,26 +42,26 @@ describe('Async client Tests', function() { client.disconnect(); }); - it('Should try to connect with correct url' , () => { + it('Should try to connect with correct url', () => { client.connect(); assert.equal(client.rawSocket().url, "wss://host.local/socket?channel=ab771f3434aaghjgr"); client.disconnect(); }); - it('Should notify socket connect' , async() => { + it('Should notify socket connect', async () => { client.connect(); const isOpen = await new Promise(resolve => client.doOnSocketOpen((event) => resolve(client.isOpen))); assert.isTrue(isOpen); client.disconnect(); }); - it('Should authenticate with server and route message' , async() => { + it('Should authenticate with server and route message', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "person.registered", "CC111222"]'); - }else { + } else { socket.send('["", "", "NoAuth", ""]'); } }); @@ -78,13 +79,13 @@ describe('Async client Tests', function() { }); - it('Should send ack on message' , async() => { + it('Should send ack on message', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "person.registered", "CC111222"]'); - }else if (data == 'Ack::12'){ + } else if (data == 'Ack::12') { socket.send('["14", "", "ack.reply.ok", "ok"]'); } }); @@ -101,9 +102,9 @@ describe('Async client Tests', function() { }); -describe('Async client No Dedup Tests', function() { +describe('Async client No Dedup Tests', function () { let mockServer; - let client : AsyncClient; + let client: AsyncClient; let config: AsyncConfig = { socket_url: "wss://host.local/socket", channel_ref: "ab771f3434aaghjgr", @@ -124,21 +125,23 @@ describe('Async client No Dedup Tests', function() { client.disconnect(); }); - it('Should authenticate with server and route duplicated message' , async() => { + it('Should authenticate with server and route duplicated message', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["333444", "001", "person.registered", "CC111222"]'); socket.send('["333444", "002", "person.registered", "CC111222"]'); socket.send('["444444", "003", "person.registered", "CC983979"]'); - }else { + } else { socket.send('["", "", "NoAuth", ""]'); } }); }); - assert.isFalse(client.isActive); + const clientActive: boolean = client.isActive; + + assert.isFalse(clientActive); const message = new Promise(resolve => { var counter = 0; client.listenEvent("person.registered", message => { @@ -158,7 +161,7 @@ describe('Async client No Dedup Tests', function() { }); -describe('Async Reconnection Tests', () => { +describe('Async Reconnection Tests', () => { let config = { socket_url: "wss://reconnect.local:8984/socket", @@ -166,14 +169,14 @@ describe('Async Reconnection Tests', () => { channel_secret: "secret234342432dsfghjikujyg1221", heartbeat_interval: 200 }; - - it('Should ReConnect when server closes the socket' , async() => { + + it('Should ReConnect when server closes the socket', async () => { let mockServer = new Server("wss://reconnect.local:8984/socket"); - let client : AsyncClient = new AsyncClient(config, WebSocket); + let client: AsyncClient = new AsyncClient(config, WebSocket); mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "person.registered", "CC111222"]'); } @@ -186,38 +189,38 @@ describe('Async Reconnection Tests', () => { const result = await message; assert.equal(result, "CC111222"); - mockServer.close(); - mockServer.stop(); - - // @ts-ignore - await timeout(200); + mockServer.close({ code: 1006, reason: "Server closed", wasClean: false }); + mockServer.stop(() => { + console.log("Server stopped"); + }); const newData = new Promise(resolve => client.listenEvent("person.registered2", message => resolve(message.payload))); mockServer = new Server("wss://reconnect.local:8984/socket"); mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); // @ts-ignore setTimeout(() => socket.send('["120", "", "person.registered2", "CC1112223"]'), 200) } }); }); - + const message2 = await newData; assert.equal(message2, "CC1112223"); client.disconnect(); mockServer.close(); mockServer.stop(); - }); + console.log("Done"); + }).timeout(TIMEOUT); - it('Should ReConnect when no heartbeat' , async() => { + it('Should ReConnect when no heartbeat', async () => { config.socket_url = "wss://reconnect.local:8987/socket"; let mockServer = new Server(config.socket_url); - let client : AsyncClient = new AsyncClient(config, WebSocket); + let client: AsyncClient = new AsyncClient(config, WebSocket); let respondBeat = false; let socketSender; let connectCount = 0; @@ -227,12 +230,12 @@ describe('Async Reconnection Tests', () => { socket.on('message', raw_data => { if (typeof raw_data == "string") { let data = String(raw_data); - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { connectCount = connectCount + 1; socket.send('["", "", "AuthOk", ""]'); // @ts-ignore setTimeout(() => socket.send('["12", "", "person.registered", "CC111222"]'), 200) - }else if (data.startsWith("hb::") && respondBeat){ + } else if (data.startsWith("hb::") && respondBeat) { let correlation = data.split("::")[1]; socket.send(`["", ${correlation}, ":hb", ""]`); } @@ -244,7 +247,7 @@ describe('Async Reconnection Tests', () => { await timeout(600); respondBeat = true; - const lastCount = connectCount; + const lastCount = connectCount; // @ts-ignore console.log("Count", connectCount); @@ -263,7 +266,7 @@ describe('Async Reconnection Tests', () => { }); -describe('Refresh token Tests', () => { +describe('Refresh token Tests', () => { let config = { socket_url: "wss://reconnect.local:8985/socket", @@ -273,14 +276,14 @@ describe('Refresh token Tests', () => { }; - it('Should ReConnect with new token' , async() => { + it('Should ReConnect with new token', async () => { let mockServer = new Server(config.socket_url); - let client : AsyncClient = new AsyncClient(config, WebSocket); + let client: AsyncClient = new AsyncClient(config, WebSocket); // let socketSender; mockServer.on('connection', socket => { // socketSender = socket; socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["01", "", ":n_token", "new_token_secret12243"]'); socket.send('["02", "", "person.registered", "CC10202029"]'); @@ -294,11 +297,9 @@ describe('Refresh token Tests', () => { const result = await message; console.log(`>>>>> result is: ${result}`); - mockServer.close(); + mockServer.close({ code: 1006, reason: "Server closed", wasClean: false }); mockServer.stop(); - // @ts-ignore - await timeout(200); config.channel_secret = "new_token_secret12243"; const newData = new Promise(resolve => client.listenEvent("person.registered2", message => resolve(message.payload))); @@ -307,13 +308,13 @@ describe('Refresh token Tests', () => { socket.on('message', raw_data => { if (typeof raw_data == "string") { let data = String(raw_data); - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "person.registered2", "CC1112223"]'); - }else if (data.startsWith("Auth::")){ + } else if (data.startsWith("Auth::")) { // @ts-ignore console.log("Credenciales no validas"); - mockServer.close({code: 4403, reason: "Invalid auth", wasClean: true}) + mockServer.close({ code: 4403, reason: "Invalid auth", wasClean: true }) } } }); @@ -330,7 +331,7 @@ describe('Refresh token Tests', () => { }); -describe('Protocol negotiation Tests', function() { +describe('Protocol negotiation Tests', function () { let client: AsyncClient; let mockServer: Server; let baseConf = { @@ -344,8 +345,9 @@ describe('Protocol negotiation Tests', function() { mockServer.stop(() => done()); }); - it('Should use Json decoder when specified' , async() => { - let config = {...baseConf, + it('Should use Json decoder when specified', async () => { + let config = { + ...baseConf, enable_binary_transport: false }; initServer((protocols) => { @@ -357,8 +359,9 @@ describe('Protocol negotiation Tests', function() { assert.instanceOf(decoder, JsonDecoder); }); - it('Should use binary protocol when available' , async() => { - let config = {...baseConf, + it('Should use binary protocol when available', async () => { + let config = { + ...baseConf, enable_binary_transport: true }; initServer((protocols) => { @@ -370,8 +373,9 @@ describe('Protocol negotiation Tests', function() { assert.instanceOf(decoder, BinaryDecoder); }); - it('Should fallback to json protocol when server select it' , async() => { - let config = {...baseConf, + it('Should fallback to json protocol when server select it', async () => { + let config = { + ...baseConf, enable_binary_transport: true }; initServer((protocols) => { @@ -383,8 +387,9 @@ describe('Protocol negotiation Tests', function() { assert.instanceOf(decoder, JsonDecoder); }); - it('Should fallback to Json decoder when Binary decoder is not available' , async() => { - let config = {...baseConf, + it('Should fallback to Json decoder when Binary decoder is not available', async () => { + let config = { + ...baseConf, enable_binary_transport: true }; @@ -411,7 +416,7 @@ describe('Protocol negotiation Tests', function() { }) }; - let connectAndGetDecoderSelected = async(config) => { + let connectAndGetDecoderSelected = async (config) => { client = new AsyncClient(config, WebSocket); const connected = new Promise(resolve => client.doOnSocketOpen(() => resolve(true))); client.connect(); @@ -421,9 +426,9 @@ describe('Protocol negotiation Tests', function() { } }); -describe('Event handler matching Tests', function() { +describe('Event handler matching Tests', function () { let mockServer; - let client : AsyncClient; + let client: AsyncClient; let config = { socket_url: "wss://host.local/socket", channel_ref: "ab771f3434aaghjgr", @@ -443,13 +448,13 @@ describe('Event handler matching Tests', function() { client.disconnect(); }); - it('Should match direct equality' , async() => { + it('Should match direct equality', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "quick.orange.rabbit", "Hi, There"]'); - }else { + } else { socket.send('["", "", "NoAuth", ""]'); } }); @@ -467,13 +472,13 @@ describe('Event handler matching Tests', function() { client.disconnect(); }); - it('Should match single word wildcard I' , async() => { + it('Should match single word wildcard I', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "quick.orange.rabbit", "Hi, There Rabbit"]'); - }else { + } else { socket.send('["", "", "NoAuth", ""]'); } }); @@ -492,13 +497,13 @@ describe('Event handler matching Tests', function() { client.disconnect(); }); - it('Should match single word wildcard II' , async() => { + it('Should match single word wildcard II', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "lazy.brown.fox", "Hi, There Fox"]'); - }else { + } else { socket.send('["", "", "NoAuth", ""]'); } }); @@ -517,13 +522,13 @@ describe('Event handler matching Tests', function() { client.disconnect(); }); - it('Should match single word wildcard III' , async() => { + it('Should match single word wildcard III', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "lazy.orange.elephant", "Hi, There Elephant"]'); - }else { + } else { socket.send('["", "", "NoAuth", ""]'); } }); @@ -542,14 +547,14 @@ describe('Event handler matching Tests', function() { client.disconnect(); }); - it('Should match multi word wildcard' , async() => { + it('Should match multi word wildcard', async () => { mockServer.on('connection', socket => { socket.on('message', data => { - if (data == `Auth::${config.channel_secret}`){ + if (data == `Auth::${config.channel_secret}`) { socket.send('["", "", "AuthOk", ""]'); socket.send('["12", "", "quick.white.male.bird", "Hi, There Male Bird"]'); socket.send('["12", "", "quick.orange.rabbit", "Hi, There Rabbit"]'); - }else { + } else { socket.send('["", "", "NoAuth", ""]'); } });