From 0e3efb7b6e48399e17669d6394e224cbaa34ec09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Sat, 1 Feb 2025 10:24:17 +0100 Subject: [PATCH] Fix teams websocket connection not reconnecting after a graceful close (#2924) --- lib/livebook/teams/connection.ex | 44 +++++++++++++++++++++----------- lib/livebook/teams/web_socket.ex | 15 ++++++----- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/lib/livebook/teams/connection.ex b/lib/livebook/teams/connection.ex index 61e6151cd09..c08fc2e7c9e 100644 --- a/lib/livebook/teams/connection.ex +++ b/lib/livebook/teams/connection.ex @@ -45,20 +45,17 @@ defmodule Livebook.Teams.Connection do send(data.listener, :connected) send(self(), {:loop_ping, ref}) Logger.info("Teams WebSocket connection - established") - {:keep_state, %{data | http_conn: conn, ref: ref, websocket: websocket}} {:transport_error, reason} -> send(data.listener, {:connection_error, reason}) Logger.warning("Teams WebSocket connection - transport error: #{inspect(reason)}") - {:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}} {:server_error, error} -> reason = LivebookProto.Error.decode(error).details send(data.listener, {:server_error, reason}) - Logger.warning("Teams WebSocket connection - server error : #{inspect(reason)}") - + Logger.warning("Teams WebSocket connection - server error: #{inspect(reason)}") {:keep_state, data} end end @@ -73,9 +70,12 @@ defmodule Livebook.Teams.Connection do Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay) {:keep_state, %{data | http_conn: conn, websocket: websocket}} - {:error, conn, websocket, _reason} -> - Logger.warning("Teams WebSocket connection - ping error") - {:keep_state, %{data | http_conn: conn, websocket: websocket}} + {:error, conn, websocket, reason} -> + data = %{data | http_conn: conn, websocket: websocket} + send(data.listener, {:connection_error, reason}) + Logger.warning("Teams WebSocket connection - ping error: #{inspect(reason)}") + ensure_closed(data) + {:keep_state, data, {:next_event, :internal, :connect}} end end @@ -106,7 +106,8 @@ defmodule Livebook.Teams.Connection do data = %{data | http_conn: conn, websocket: websocket} send(data.listener, {:connection_error, reason}) :gen_statem.reply(from, {:error, reason}) - + Logger.warning("Teams WebSocket connection - send error: #{inspect(reason)}") + ensure_closed(data) {:keep_state, data, {:next_event, :internal, :connect}} end end @@ -117,19 +118,32 @@ defmodule Livebook.Teams.Connection do case WebSocket.receive(data.http_conn, data.ref, data.websocket, message) do {:ok, conn, websocket, binaries} -> data = %{data | http_conn: conn, websocket: websocket} - - for binary <- binaries do - %{type: {topic, message}} = LivebookProto.Event.decode(binary) - send(data.listener, {:event, topic, message}) - end - + handle_messages(data, binaries) {:keep_state, data} + {:closed, conn, websocket, binaries} -> + handle_messages(data, binaries) + data = %{data | http_conn: conn, websocket: websocket} + Logger.warning("Teams WebSocket connection - closed") + {:keep_state, data, {:next_event, :internal, :connect}} + {:error, conn, websocket, reason} -> send(data.listener, {:connection_error, reason}) data = %{data | http_conn: conn, websocket: websocket} - + Logger.warning("Teams WebSocket connection - receive error: #{inspect(reason)}") + ensure_closed(data) {:keep_state, data, {:next_event, :internal, :connect}} end end + + defp handle_messages(data, binaries) do + for binary <- binaries do + %{type: {topic, message}} = LivebookProto.Event.decode(binary) + send(data.listener, {:event, topic, message}) + end + end + + defp ensure_closed(data) do + _ = WebSocket.disconnect(data.http_conn, data.websocket, data.ref) + end end diff --git a/lib/livebook/teams/web_socket.ex b/lib/livebook/teams/web_socket.ex index cfc65197efa..cb7ed7090a1 100644 --- a/lib/livebook/teams/web_socket.ex +++ b/lib/livebook/teams/web_socket.ex @@ -134,6 +134,7 @@ defmodule Livebook.Teams.WebSocket do """ @spec receive(conn(), ref(), websocket(), term()) :: {:ok, conn(), websocket(), list(binary())} + | {:closed, conn(), websocket(), list(binary())} | {:error, conn(), websocket(), String.t()} def receive(conn, ref, websocket, message \\ receive(do: (message -> message))) do with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(conn, message), @@ -142,19 +143,21 @@ defmodule Livebook.Teams.WebSocket do {:ok, conn, websocket, response} else {:close, response} -> - handle_disconnect(conn, websocket, ref, response) + with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do + {:closed, conn, websocket, response} + end {:error, conn, exception} when is_exception(exception) -> {:error, conn, websocket, Exception.message(exception)} {:error, conn, exception, []} when is_exception(exception) -> {:error, conn, websocket, Exception.message(exception)} - end - end - defp handle_disconnect(conn, websocket, ref, response) do - with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do - {:ok, conn, websocket, response} + :unknown -> + # Message does not belong to this socket. For example, this + # can be a leftover :tcp_close or :ssl_close from a previously + # gracefully closed socket. + {:ok, conn, websocket, []} end end