Skip to content

Commit

Permalink
Fix teams websocket connection not reconnecting after a graceful close (
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko authored Feb 1, 2025
1 parent 0622e0a commit 0e3efb7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
44 changes: 29 additions & 15 deletions lib/livebook/teams/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,17 @@ defmodule Livebook.Teams.Connection do
send(data.listener, :connected)
send(self(), {:loop_ping, ref})
Logger.info("Teams WebSocket connection - established")

{:keep_state, %{data | http_conn: conn, ref: ref, websocket: websocket}}

{:transport_error, reason} ->
send(data.listener, {:connection_error, reason})
Logger.warning("Teams WebSocket connection - transport error: #{inspect(reason)}")

{:keep_state_and_data, {{:timeout, :backoff}, @backoff, nil}}

{:server_error, error} ->
reason = LivebookProto.Error.decode(error).details
send(data.listener, {:server_error, reason})
Logger.warning("Teams WebSocket connection - server error : #{inspect(reason)}")

Logger.warning("Teams WebSocket connection - server error: #{inspect(reason)}")
{:keep_state, data}
end
end
Expand All @@ -73,9 +70,12 @@ defmodule Livebook.Teams.Connection do
Process.send_after(self(), {:loop_ping, data.ref}, @loop_ping_delay)
{:keep_state, %{data | http_conn: conn, websocket: websocket}}

{:error, conn, websocket, _reason} ->
Logger.warning("Teams WebSocket connection - ping error")
{:keep_state, %{data | http_conn: conn, websocket: websocket}}
{:error, conn, websocket, reason} ->
data = %{data | http_conn: conn, websocket: websocket}
send(data.listener, {:connection_error, reason})
Logger.warning("Teams WebSocket connection - ping error: #{inspect(reason)}")
ensure_closed(data)
{:keep_state, data, {:next_event, :internal, :connect}}
end
end

Expand Down Expand Up @@ -106,7 +106,8 @@ defmodule Livebook.Teams.Connection do
data = %{data | http_conn: conn, websocket: websocket}
send(data.listener, {:connection_error, reason})
:gen_statem.reply(from, {:error, reason})

Logger.warning("Teams WebSocket connection - send error: #{inspect(reason)}")
ensure_closed(data)
{:keep_state, data, {:next_event, :internal, :connect}}
end
end
Expand All @@ -117,19 +118,32 @@ defmodule Livebook.Teams.Connection do
case WebSocket.receive(data.http_conn, data.ref, data.websocket, message) do
{:ok, conn, websocket, binaries} ->
data = %{data | http_conn: conn, websocket: websocket}

for binary <- binaries do
%{type: {topic, message}} = LivebookProto.Event.decode(binary)
send(data.listener, {:event, topic, message})
end

handle_messages(data, binaries)
{:keep_state, data}

{:closed, conn, websocket, binaries} ->
handle_messages(data, binaries)
data = %{data | http_conn: conn, websocket: websocket}
Logger.warning("Teams WebSocket connection - closed")
{:keep_state, data, {:next_event, :internal, :connect}}

{:error, conn, websocket, reason} ->
send(data.listener, {:connection_error, reason})
data = %{data | http_conn: conn, websocket: websocket}

Logger.warning("Teams WebSocket connection - receive error: #{inspect(reason)}")
ensure_closed(data)
{:keep_state, data, {:next_event, :internal, :connect}}
end
end

defp handle_messages(data, binaries) do
for binary <- binaries do
%{type: {topic, message}} = LivebookProto.Event.decode(binary)
send(data.listener, {:event, topic, message})
end
end

defp ensure_closed(data) do
_ = WebSocket.disconnect(data.http_conn, data.websocket, data.ref)
end
end
15 changes: 9 additions & 6 deletions lib/livebook/teams/web_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ defmodule Livebook.Teams.WebSocket do
"""
@spec receive(conn(), ref(), websocket(), term()) ::
{:ok, conn(), websocket(), list(binary())}
| {:closed, conn(), websocket(), list(binary())}
| {:error, conn(), websocket(), String.t()}
def receive(conn, ref, websocket, message \\ receive(do: (message -> message))) do
with {:ok, conn, [{:data, ^ref, data}]} <- Mint.WebSocket.stream(conn, message),
Expand All @@ -142,19 +143,21 @@ defmodule Livebook.Teams.WebSocket do
{:ok, conn, websocket, response}
else
{:close, response} ->
handle_disconnect(conn, websocket, ref, response)
with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do
{:closed, conn, websocket, response}
end

{:error, conn, exception} when is_exception(exception) ->
{:error, conn, websocket, Exception.message(exception)}

{:error, conn, exception, []} when is_exception(exception) ->
{:error, conn, websocket, Exception.message(exception)}
end
end

defp handle_disconnect(conn, websocket, ref, response) do
with {:ok, conn, websocket} <- disconnect(conn, websocket, ref) do
{:ok, conn, websocket, response}
:unknown ->
# Message does not belong to this socket. For example, this
# can be a leftover :tcp_close or :ssl_close from a previously
# gracefully closed socket.
{:ok, conn, websocket, []}
end
end

Expand Down

0 comments on commit 0e3efb7

Please sign in to comment.