Skip to content

Commit

Permalink
Reconnection backoff & heartbeat mechanism (#1346)
Browse files Browse the repository at this point in the history
* connection backoff and wake up

heartbeat mechanism

unit test the heartbeat mechanism

remove max_connection_attempts

add code_change/4 to add the missing fields in the state

* close the socket when disconnecting, cap at 6 hours for now

* add an assertion on transport closed

* stop incrementing the attempts to avoid overflow
  • Loading branch information
bchamagne authored Oct 4, 2024
1 parent 9c6ee55 commit e6f49d0
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 12 deletions.
5 changes: 5 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ config :archethic, Archethic.P2P.GeoPatch.GeoIP, MockGeoIP

config :archethic, Archethic.P2P.BootstrappingSeeds, enabled: false

# P2P client connection settings for the test environment: a static
# (non-exponential) reconnect backoff, with short heartbeat and reconnect
# delays expressed in milliseconds.
config :archethic, Archethic.P2P.Client.Connection,
backoff_strategy: :static,
heartbeat_interval: 200,
reconnect_delay: 50

config :archethic, Archethic.Mining.PendingTransactionValidation, validate_node_ip: true

config :archethic, Archethic.Metrics.Poller, enabled: false
Expand Down
132 changes: 122 additions & 10 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,22 @@ defmodule Archethic.P2P.Client.Connection do
require Logger

use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary
@vsn 1
@vsn 2
@table_name :connection_status
@max_reconnect_delay :timer.hours(6)

# Delay (ms) between two heartbeat ticks; read from the app config at compile time.
@heartbeat_interval :archethic
                    |> Application.compile_env(__MODULE__, [])
                    |> Keyword.get(:heartbeat_interval, 10_000)
# Base delay (ms) before a reconnection attempt; read from the app config at compile time.
@reconnect_delay :archethic
                 |> Application.compile_env(__MODULE__, [])
                 |> Keyword.get(:reconnect_delay, 500)
# We cap the attempts to avoid computing 2 ** BIGNUM in the exponential backoff.
# Kernel.ceil/1 (not :math.ceil/1) is used so the cap is an integer: with a float
# cap, `min(@max_attempts, n + 1)` returns the float once the counter reaches it,
# which would turn the attempt counter (and `2 ** attempts`) into floats.
@max_attempts (@max_reconnect_delay / @reconnect_delay) |> :math.log2() |> ceil()

@doc """
Starts a new connection
Expand Down Expand Up @@ -59,6 +73,18 @@ defmodule Archethic.P2P.Client.Connection do
end
end

@doc """
Asks the connection to reconnect to the socket right away if it is disconnected.

This is a no-op in any other state (for instance when it is already connected).
It is used when some node has been offline for a long time: since it has just
connected to us, we know we can connect to it as well.
"""
@spec wake_up(Crypto.key()) :: :ok
def wake_up(public_key) do
  public_key
  |> via_tuple()
  |> GenStateMachine.cast(:wake_up)
end

@doc """
Get the availability timer and reset it with a new start time if it was already started
"""
Expand Down Expand Up @@ -102,7 +128,10 @@ defmodule Archethic.P2P.Client.Connection do
request_id: 0,
messages: %{},
send_tasks: %{},
availability_timer: {nil, 0}
availability_timer: {nil, 0},
reconnect_attempts: 0,
heartbeats_sent: 0,
heartbeats_received: 0
}

{:ok, :initializing, data, [{:next_event, :internal, {:connect, from}}]}
Expand Down Expand Up @@ -190,7 +219,7 @@ defmodule Archethic.P2P.Client.Connection do
end)

# Reconnect with backoff
actions = [{{:timeout, :reconnect}, 500, nil} | actions]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil} | actions]
{:keep_state, new_data, actions}
end

Expand All @@ -204,20 +233,25 @@ defmodule Archethic.P2P.Client.Connection do

# Start availability timer
new_data =
Map.update!(data, :availability_timer, fn
data
|> Map.put(:reconnect_attempts, 0)
|> Map.put(:heartbeats_sent, 0)
|> Map.put(:heartbeats_received, 0)
|> Map.update!(:availability_timer, fn
{nil, time} ->
{System.monotonic_time(:second), time}

timer ->
timer
end)

Process.send_after(self(), :heartbeat, @heartbeat_interval)

{:keep_state, new_data}
end

# State-enter callbacks that require no action: both the state and the data
# are left untouched.
def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data

# called from the :disconnected or :initializing state
def handle_event(
Expand Down Expand Up @@ -258,9 +292,11 @@ defmodule Archethic.P2P.Client.Connection do
end

# this message is used to delay next connection attempt
def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do
def handle_event({:timeout, :reconnect}, _event_data, _state, data) do
actions = [{:next_event, :internal, {:connect, nil}}]
{:keep_state_and_data, actions}

new_data = Map.update!(data, :reconnect_attempts, &min(@max_attempts, &1 + 1))
{:keep_state, new_data, actions}
end

def handle_event(
Expand All @@ -273,6 +309,25 @@ defmodule Archethic.P2P.Client.Connection do
:keep_state_and_data
end

# Wake-up request received while disconnected: reset the backoff counter and
# try to connect immediately instead of waiting for the reconnect timer.
def handle_event(
      :cast,
      :wake_up,
      :disconnected,
      data
    ) do
  actions = [
    # Cancel the pending reconnect timer (re-setting a generic timeout to
    # :infinity cancels it). Otherwise it would still fire after this
    # immediate attempt, trigger a second {:connect, nil} and bump
    # reconnect_attempts again. It is re-armed by the connect error handlers.
    {{:timeout, :reconnect}, :infinity, nil},
    {:next_event, :internal, {:connect, nil}}
  ]

  {:keep_state, %{data | reconnect_attempts: 0}, actions}
end

# Wake-up request received in any state other than :disconnected: nothing to do.
def handle_event(:cast, :wake_up, _state, _data), do: :keep_state_and_data

def handle_event(
:cast,
{:send_message, ref, from, message, timeout},
Expand Down Expand Up @@ -381,6 +436,36 @@ defmodule Archethic.P2P.Client.Connection do
end
end

# Heartbeat tick while connected.
# Either sends a new "hb" to the peer and schedules the next tick, or closes
# the socket and moves to :disconnected when the peer stopped answering.
def handle_event(
      :info,
      :heartbeat,
      {:connected, socket},
      data = %{
        transport: transport,
        heartbeats_sent: sent_count,
        heartbeats_received: received_count
      }
    ) do
  unanswered = sent_count - received_count

  if unanswered < 2 do
    transport.handle_send(socket, "hb")
    Process.send_after(self(), :heartbeat, @heartbeat_interval)
    {:keep_state, Map.replace!(data, :heartbeats_sent, sent_count + 1)}
  else
    # 2 or more heartbeats were left unanswered: give up on this socket
    transport.handle_close(socket)
    {:next_state, :disconnected, data}
  end
end

# Heartbeat tick that did not match the connected clause: it is dropped and
# no further tick is scheduled from here.
def handle_event(:info, :heartbeat, _state, _data), do: :keep_state_and_data

def handle_event(:info, {ref, :ok}, {:connected, _socket}, data = %{send_tasks: send_tasks}) do
case Map.pop(send_tasks, ref) do
{nil, _} ->
Expand Down Expand Up @@ -440,13 +525,13 @@ defmodule Archethic.P2P.Client.Connection do

# Task.async sending us the result of the handle_connect
def handle_event(:info, {_ref, {:error, _reason, nil}}, _, data) do
actions = [{{:timeout, :reconnect}, 500, nil}]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}]
{:next_state, :disconnected, data, actions}
end

def handle_event(:info, {_ref, {:error, reason, from}}, _, data) do
send(from, {:error, reason})
actions = [{{:timeout, :reconnect}, 500, nil}]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}]
{:next_state, :disconnected, data, actions}
end

Expand All @@ -456,7 +541,8 @@ defmodule Archethic.P2P.Client.Connection do
{:connected, _socket},
data = %{
transport: transport,
node_public_key: node_public_key
node_public_key: node_public_key,
heartbeats_received: heartbeats_received
}
) do
case transport.handle_message(event) do
Expand All @@ -467,6 +553,9 @@ defmodule Archethic.P2P.Client.Connection do

{:next_state, :disconnected, data}

{:ok, "hb"} ->
{:keep_state, %{data | heartbeats_received: heartbeats_received + 1}}

{:ok, msg} ->
set_node_connected(node_public_key)

Expand Down Expand Up @@ -540,5 +629,28 @@ defmodule Archethic.P2P.Client.Connection do
:ets.delete(@table_name, node_public_key)
end

# Hot code upgrade hook.
# Coming from version 1, the fields introduced for the reconnection backoff and
# the heartbeat mechanism are added to the data (all set to 0).
# Any other version keeps the state and data as they are.
def code_change(old_vsn, state, data, _extra) do
  case old_vsn do
    1 ->
      added_fields = %{reconnect_attempts: 0, heartbeats_sent: 0, heartbeats_received: 0}
      {:ok, state, Map.merge(data, added_fields)}

    _ ->
      {:ok, state, data}
  end
end

# Computes the delay to wait before the next reconnection attempt.
# The strategy is read from the application config at runtime:
#   - :static      -> always @reconnect_delay
#   - :exponential -> @reconnect_delay doubled for each attempt (default)
defp backoff(attempts) do
  strategy =
    :archethic
    |> Application.get_env(__MODULE__, [])
    |> Keyword.get(:backoff_strategy, :exponential)

  case strategy do
    :exponential ->
      exponential_delay = 2 ** attempts * @reconnect_delay
      # never wait longer than @max_reconnect_delay
      min(@max_reconnect_delay, exponential_delay)

    :static ->
      @reconnect_delay
  end
end
end
1 change: 1 addition & 0 deletions lib/archethic/p2p/client/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ defmodule Archethic.P2P.Client.Transport do
@callback handle_connect(:inet.ip_address(), :inet.port_number()) :: {:ok, :inet.socket()}
@callback handle_message(tuple()) :: {:ok, binary()} | {:error, :closed} | {:error, any()}
@callback handle_send(:inet.socket(), binary()) :: :ok
@callback handle_close(:inet.socket()) :: :ok
end
5 changes: 5 additions & 0 deletions lib/archethic/p2p/client/transport/tcp_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ defmodule Archethic.P2P.Client.Transport.TCPImpl do
:gen_tcp.connect(ip, port, @options, 4000)
end

# Closes the given TCP socket.
@impl Transport
def handle_close(socket), do: :gen_tcp.close(socket)

@impl Transport
def handle_message({:tcp, socket, data}) do
:inet.setopts(socket, active: :once)
Expand Down
18 changes: 18 additions & 0 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.P2P.ListenerProtocol do

alias Archethic.Crypto
alias Archethic.P2P
alias Archethic.P2P.Client.Connection
alias Archethic.P2P.Message
alias Archethic.P2P.MessageEnvelop
alias Archethic.TaskSupervisor
Expand All @@ -35,6 +36,19 @@ defmodule Archethic.P2P.ListenerProtocol do
})
end

# Heartbeat ("hb") received on the socket: set the socket back to
# `active: :once` and echo "hb" to the sender.
def handle_info({_transport, socket, "hb"}, state = %{transport: transport}) do
  :inet.setopts(socket, active: :once)

  # the reply is sent from a supervised task
  echo_heartbeat = fn -> transport.send(socket, "hb") end
  Task.Supervisor.start_child(TaskSupervisor, echo_heartbeat)

  {:noreply, state}
end

def handle_info(
{_transport, socket, err},
state = %{transport: transport, ip: ip}
Expand All @@ -45,6 +59,7 @@ defmodule Archethic.P2P.ListenerProtocol do
end

transport.close(socket)

{:noreply, state}
end

Expand Down Expand Up @@ -84,6 +99,9 @@ defmodule Archethic.P2P.ListenerProtocol do
)

if valid_signature? do
# we may attempt to wake up a connection that is offline
Connection.wake_up(sender_pkey)

message
|> process_msg(sender_pkey)
|> encode_response(message_id, sender_pkey)
Expand Down
Loading

0 comments on commit e6f49d0

Please sign in to comment.