chore: Minor Connection.Manager and Consumer cleanup and flake reduction #2305

Merged
merged 9 commits into from
Feb 6, 2025
81 changes: 40 additions & 41 deletions packages/sync-service/lib/electric/connection/manager.ex
@@ -69,14 +69,6 @@ defmodule Electric.Connection.Manager do
]
end

defmodule ConnectionBackoff do
defstruct [
:backoff,
:retries_started_at,
:timer_ref
]
end

use GenServer

require Logger
@@ -193,7 +185,11 @@ defmodule Electric.Connection.Manager do
timeline_opts: timeline_opts,
shape_cache_opts: shape_cache_opts,
pg_lock_acquired: false,
connection_backoff: %ConnectionBackoff{backoff: :backoff.init(1000, 10_000)},
connection_backoff: %{
backoff: :backoff.init(1000, 10_000),
retries_started_at: nil,
timer_ref: nil
},
stack_id: Keyword.fetch!(opts, :stack_id),
stack_events_registry: Keyword.fetch!(opts, :stack_events_registry),
tweaks: Keyword.fetch!(opts, :tweaks),
@@ -207,13 +203,13 @@
end

@impl true
def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do
def handle_call(:get_pg_version, _from, %State{pg_version: pg_version} = state) do
# If we haven't queried the PG version by the time it is requested, that's a fatal error.
false = is_nil(pg_version)
{:reply, pg_version, state}
end

def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do
def handle_call(:get_status, _from, %State{pg_lock_acquired: pg_lock_acquired} = state) do
status =
cond do
not pg_lock_acquired ->
@@ -230,16 +226,16 @@
{:reply, status, state}
end

def handle_call(:await_active, from, %{pool_pid: nil} = state) do
{:noreply, %{state | awaiting_active: [from | state.awaiting_active]}}
def handle_call(:await_active, from, %State{pool_pid: nil} = state) do
{:noreply, %State{state | awaiting_active: [from | state.awaiting_active]}}
end

def handle_call(:await_active, _from, state) do
{:reply, :ok, state}
end

def handle_call(:drop_replication_slot_on_stop, _from, state) do
{:reply, :ok, %{state | drop_slot_requested: true}}
{:reply, :ok, %State{state | drop_slot_requested: true}}
end

def handle_call(:report_retained_wal_size, _from, state) do
@@ -263,7 +259,7 @@
case start_lock_connection(opts) do
{:ok, pid, connection_opts} ->
state = mark_connection_succeeded(state)
state = %{state | lock_connection_pid: pid, connection_opts: connection_opts}
state = %State{state | lock_connection_pid: pid, connection_opts: connection_opts}

Electric.StackSupervisor.dispatch_stack_event(
state.stack_events_registry,
@@ -290,7 +286,7 @@
case start_replication_client(opts) do
{:ok, pid, connection_opts} ->
state = mark_connection_succeeded(state)
state = %{state | replication_client_pid: pid, connection_opts: connection_opts}
state = %State{state | replication_client_pid: pid, connection_opts: connection_opts}

if is_nil(state.pool_pid) do
# This is the case where Connection.Manager starts connections from the initial state.
@@ -346,7 +342,7 @@
log_collector_pid = lookup_log_collector_pid(shapes_sup_pid)
Process.monitor(log_collector_pid)

state = %{
state = %State{
state
| pool_pid: pool_pid,
shape_log_collector_pid: log_collector_pid,
@@ -357,7 +353,7 @@
GenServer.reply(awaiting, :ok)
end

{:noreply, %{state | awaiting_active: []}}
{:noreply, %State{state | awaiting_active: []}}

{:error, reason} ->
handle_connection_error(reason, state, "regular")
@@ -367,11 +363,11 @@
@impl true
def handle_info(
{:timeout, tref, step},
%{connection_backoff: %ConnectionBackoff{timer_ref: tref} = conn_backoff} = state
%State{connection_backoff: %{timer_ref: tref} = conn_backoff} = state
) do
state = %State{
state
| connection_backoff: %ConnectionBackoff{conn_backoff | timer_ref: nil}
| connection_backoff: %{conn_backoff | timer_ref: nil}
}

handle_continue(step, state)
@@ -391,7 +387,7 @@
"Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}"
)

state = %{state | replication_client_pid: nil}
state = %State{state | replication_client_pid: nil}
state = schedule_reconnection(:start_replication_client, state)
{:noreply, state}
end
@@ -408,7 +404,10 @@
{:stop, {:shutdown, reason}, state}
end

def handle_info({:DOWN, _ref, :process, pid, reason}, %{shape_log_collector_pid: pid} = state) do
def handle_info(
{:DOWN, _ref, :process, pid, reason},
%State{shape_log_collector_pid: pid} = state
) do
# The replication client would normally exit together with the shape log collector when it
# is blocked on a call to either `ShapeLogCollector.handle_relation_msg/2` or
# `ShapeLogCollector.store_transaction/2` and the log collector encounters a storage error.
@@ -433,7 +432,7 @@
drop_slot(state)
end

{:noreply, %{state | shape_log_collector_pid: nil, replication_client_pid: nil}}
{:noreply, %State{state | shape_log_collector_pid: nil, replication_client_pid: nil}}
end

# Periodically log the status of the lock connection until it is acquired for
@@ -448,11 +447,11 @@
end

@impl true
def handle_cast(:exclusive_connection_lock_acquired, %{pg_lock_acquired: false} = state) do
def handle_cast(:exclusive_connection_lock_acquired, %State{pg_lock_acquired: false} = state) do
# As soon as we acquire the connection lock, we try to start the replication connection
# first because it requires additional privileges compared to regular "pooled" connections,
# so failure to open a replication connection should be reported ASAP.
{:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
{:noreply, %State{state | pg_lock_acquired: true}, {:continue, :start_replication_client}}
end

def handle_cast({:pg_info_looked_up, {server_version, system_identifier, timeline_id}}, state) do
Expand All @@ -467,7 +466,7 @@ defmodule Electric.Connection.Manager do
)

{:noreply,
%{
%State{
state
| pg_version: server_version,
pg_system_identifier: system_identifier,
@@ -561,7 +560,7 @@
)

# disable IPv6 and retry immediately
state = %{
state = %State{
state
| ipv6_enabled: false,
connection_opts: connection_opts |> Keyword.put(:ipv6, false) |> update_tcp_opts()
@@ -656,7 +655,7 @@
defp schedule_reconnection(
step,
%State{
connection_backoff: %ConnectionBackoff{
connection_backoff: %{
backoff: backoff,
retries_started_at: retries_started_at
}
@@ -668,7 +667,7 @@

%State{
state
| connection_backoff: %ConnectionBackoff{
| connection_backoff: %{
backoff: backoff,
retries_started_at: retries_started_at || System.monotonic_time(:millisecond),
timer_ref: tref
@@ -677,29 +676,29 @@
end

# If total backoff time is 0 then there were no reconnection attempts
defp mark_connection_succeeded(
%State{connection_backoff: %ConnectionBackoff{retries_started_at: nil}} = state
),
do: state
defp mark_connection_succeeded(%State{connection_backoff: %{retries_started_at: nil}} = state),
do: state

# Otherwise, reset the backoff and total backoff time
defp mark_connection_succeeded(
%State{connection_backoff: %ConnectionBackoff{backoff: backoff}} = state
) do
defp mark_connection_succeeded(%State{connection_backoff: %{backoff: backoff}} = state) do
{_, backoff} = :backoff.succeed(backoff)
Logger.info("Reconnection succeeded after #{inspect(total_retry_time(state))}ms")

%State{
state
| connection_backoff: %ConnectionBackoff{backoff: backoff}
| connection_backoff: %{
state.connection_backoff
| backoff: backoff,
retries_started_at: nil
}
}
end

defp total_retry_time(%State{connection_backoff: %ConnectionBackoff{retries_started_at: nil}}),
defp total_retry_time(%State{connection_backoff: %{retries_started_at: nil}}),
do: 0

defp total_retry_time(%State{
connection_backoff: %ConnectionBackoff{retries_started_at: retries_started_at}
connection_backoff: %{retries_started_at: retries_started_at}
}),
do: retries_started_at - System.monotonic_time(:millisecond)

@@ -780,11 +779,11 @@
log_collector_pid
end

defp drop_slot(%{pool_pid: nil} = _state) do
defp drop_slot(%State{pool_pid: nil} = _state) do
Logger.warning("Skipping slot drop, pool connection not available")
end

defp drop_slot(%{pool_pid: pool} = state) do
defp drop_slot(%State{pool_pid: pool} = state) do
publication_name = Keyword.fetch!(state.replication_opts, :publication_name)
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)
slot_temporary? = Keyword.fetch!(state.replication_opts, :slot_temporary?)
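Note on the backoff change above: the dedicated `ConnectionBackoff` struct is replaced by a plain map with the same three keys. Below is a minimal, self-contained sketch of that bookkeeping, assuming the `backoff` Erlang library already used by the manager (`:backoff.init/2`, `:backoff.fail/1`, `:backoff.succeed/1`); the module and function names are illustrative, not taken from the PR.

```elixir
# Standalone sketch; run with `elixir backoff_sketch.exs`.
Mix.install([{:backoff, "~> 1.1"}])

defmodule ConnectionBackoffSketch do
  # Plain map in place of the removed ConnectionBackoff struct, matching the new
  # Connection.Manager state: backoff state, when retries started, and the timer ref.
  def new(min_ms \\ 1_000, max_ms \\ 10_000) do
    %{backoff: :backoff.init(min_ms, max_ms), retries_started_at: nil, timer_ref: nil}
  end

  # On a failed attempt: compute the next delay and remember when retries began.
  def record_failure(%{backoff: backoff} = conn_backoff) do
    {delay_ms, backoff} = :backoff.fail(backoff)
    started_at = conn_backoff.retries_started_at || System.monotonic_time(:millisecond)

    {delay_ms, %{conn_backoff | backoff: backoff, retries_started_at: started_at}}
  end

  # On success: reset the backoff and clear the retry bookkeeping, as
  # mark_connection_succeeded/1 does in the diff.
  def record_success(%{backoff: backoff} = conn_backoff) do
    {_, backoff} = :backoff.succeed(backoff)
    %{conn_backoff | backoff: backoff, retries_started_at: nil}
  end

  # Total time spent retrying; 0 means there were no reconnection attempts.
  def total_retry_time(%{retries_started_at: nil}), do: 0

  def total_retry_time(%{retries_started_at: started_at}),
    do: System.monotonic_time(:millisecond) - started_at
end

conn_backoff = ConnectionBackoffSketch.new()
{delay_ms, conn_backoff} = ConnectionBackoffSketch.record_failure(conn_backoff)
IO.inspect({delay_ms, ConnectionBackoffSketch.total_retry_time(conn_backoff)})

conn_backoff = ConnectionBackoffSketch.record_success(conn_backoff)
0 = ConnectionBackoffSketch.total_retry_time(conn_backoff)
```

One small trade-off of the bare map: an unknown key in a `%{map | ...}` update is only caught at runtime, whereas a `%State{...}` struct update is checked at compile time.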
28 changes: 0 additions & 28 deletions packages/sync-service/lib/electric/shape_cache.ex
@@ -9,9 +9,6 @@ defmodule Electric.ShapeCacheBehaviour do
@type shape_def :: Shape.t()
@type xmin :: non_neg_integer()

@doc "Update a shape's status with a new log offset"
@callback update_shape_latest_offset(shape_handle(), LogOffset.t(), keyword()) :: :ok

@callback get_shape(shape_def(), opts :: Access.t()) ::
{shape_handle(), current_snapshot_offset :: LogOffset.t()} | nil
@callback get_or_create_shape_handle(shape_def(), opts :: Access.t()) ::
@@ -113,24 +110,6 @@ defmodule Electric.ShapeCache do
end
end

@impl Electric.ShapeCacheBehaviour
@spec update_shape_latest_offset(shape_handle(), LogOffset.t(), opts :: Access.t()) ::
:ok | {:error, term()}
def update_shape_latest_offset(shape_handle, latest_offset, opts) do
meta_table = get_shape_meta_table(opts)
shape_status = Access.get(opts, :shape_status, ShapeStatus)

if shape_status.set_latest_offset(meta_table, shape_handle, latest_offset) do
:ok
else
Logger.warning(
"Tried to update latest offset for shape #{shape_handle} which doesn't exist"
)

:error
end
end

@impl Electric.ShapeCacheBehaviour
@spec list_shapes(Access.t()) :: [{shape_handle(), Shape.t()}]
def list_shapes(opts) do
@@ -348,13 +327,6 @@
publication_manager: state.publication_manager,
chunk_bytes_threshold: state.chunk_bytes_threshold,
log_producer: state.log_producer,
shape_cache:
{__MODULE__,
%{
server: state.name,
shape_meta_table: state.shape_meta_table,
stack_id: state.stack_id
}},
registry: state.registry,
db_pool: state.db_pool,
run_with_conn_fn: state.run_with_conn_fn,
4 changes: 2 additions & 2 deletions packages/sync-service/lib/electric/shapes/consumer.ex
@@ -264,7 +264,7 @@ defmodule Electric.Shapes.Consumer do
shape_handle: shape_handle,
log_state: log_state,
chunk_bytes_threshold: chunk_bytes_threshold,
shape_cache: {shape_cache, shape_cache_opts},
shape_status: {shape_status, shape_status_state},
registry: registry,
storage: storage
} = state
@@ -326,7 +326,7 @@ defmodule Electric.Shapes.Consumer do
Map.new(shape_attrs(state.shape_handle, state.shape))
)

shape_cache.update_shape_latest_offset(shape_handle, last_log_offset, shape_cache_opts)
shape_status.set_latest_offset(shape_status_state, shape_handle, last_log_offset)

notify_listeners(registry, :new_changes, shape_handle, last_log_offset)

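With the ShapeCache callback removed above and the Consumer change here, the latest-offset update no longer takes an extra hop through ShapeCache: the consumer holds a `{module, state}` pair and calls `set_latest_offset/3` directly. A small self-contained sketch of that calling convention, using an Agent as a stand-in for the real ShapeStatus (which, per the removed code, writes to the shape meta table); all names and the offset value are illustrative.

```elixir
# Standalone sketch; run with `elixir shape_status_sketch.exs`.
defmodule ShapeStatusSketch do
  # Stand-in for the shape status store.
  def start_link, do: Agent.start_link(fn -> %{} end)

  def set_latest_offset(agent, shape_handle, offset) do
    Agent.update(agent, &Map.put(&1, shape_handle, offset))
    true
  end

  def latest_offset(agent, shape_handle), do: Agent.get(agent, &Map.get(&1, shape_handle))
end

# Mirrors the new Consumer code path: destructure the {module, state} pair and call it directly.
{:ok, agent} = ShapeStatusSketch.start_link()
{shape_status, shape_status_state} = {ShapeStatusSketch, agent}

shape_status.set_latest_offset(shape_status_state, "shape-123", {100, 0})
IO.inspect(ShapeStatusSketch.latest_offset(agent, "shape-123"))
#=> {100, 0}
```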
@@ -12,7 +12,6 @@ defmodule Electric.Shapes.ConsumerSupervisor do
shape: [type: {:struct, Electric.Shapes.Shape}, required: true],
inspector: [type: :mod_arg, required: true],
log_producer: [type: @genserver_name_schema, required: true],
shape_cache: [type: :mod_arg, required: true],
registry: [type: :atom, required: true],
shape_status: [type: :mod_arg, required: true],
storage: [type: :mod_arg, required: true],
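The `shape_cache` entry removed from the ConsumerSupervisor schema above was the last place the consumer received a ShapeCache `{module, arg}` pair; the `shape_status` option already in the schema now covers the offset update. For reference, a sketch of validating a `:mod_arg` option, assuming this is a NimbleOptions schema as the `{:struct, ...}` and `:mod_arg` types suggest; every name except `shape_status` is made up for the example.

```elixir
# Standalone sketch; run with `elixir mod_arg_sketch.exs`.
Mix.install([{:nimble_options, "~> 1.0"}])

schema = [
  shape_status: [type: :mod_arg, required: true],
  registry: [type: :atom, required: true]
]

opts = [
  # {module, argument} pair; MyShapeStatus is a placeholder, not a real Electric module.
  shape_status: {MyShapeStatus, [shape_meta_table: :shape_meta_sketch]},
  registry: MySketchRegistry
]

{:ok, validated} = NimbleOptions.validate(opts, schema)
{shape_status_mod, shape_status_arg} = Keyword.fetch!(validated, :shape_status)
IO.inspect({shape_status_mod, shape_status_arg})
#=> {MyShapeStatus, [shape_meta_table: :shape_meta_sketch]}
```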
@@ -42,7 +42,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
@test_pg_id "12345"

# Higher timeout is needed for some tests that tend to run slower on CI.
@receive_timeout 1000
@receive_timeout 2000

def load_column_info({"public", "users"}, _),
do: {:ok, @test_shape.table_info[{"public", "users"}][:columns]}
@@ -780,7 +780,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert Jason.decode!(conn.resp_body) == %{"message" => "Stack not ready"}
end

@tag stack_ready_timeout: 1000
@tag stack_ready_timeout: 5000
test "waits until stack ready and proceeds", ctx do
conn_task =
Task.async(fn ->
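The two timeout bumps above are the flake-reduction side of the PR: a module attribute used as the receive timeout, and a per-test tag read from the test context. A minimal standalone sketch of that ExUnit pattern; the test body and names are illustrative, not taken from ServeShapePlugTest.

```elixir
# Run with `elixir timeout_sketch.exs`; tests auto-run when the script exits.
ExUnit.start()

defmodule TimeoutSketchTest do
  use ExUnit.Case, async: true

  # Module-wide receive timeout; generous so slower CI machines don't turn a pass into a flake.
  @receive_timeout 2_000

  @tag stack_ready_timeout: 5_000
  test "per-test timeout comes in through the tag", %{stack_ready_timeout: stack_ready_timeout} do
    # Values set with @tag are merged into the test context map.
    assert stack_ready_timeout == 5_000

    parent = self()
    Task.start(fn -> send(parent, :stack_ready) end)

    # Wait for the async message, bounded by the module-wide timeout.
    assert_receive :stack_ready, @receive_timeout
  end
end
```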
47 changes: 0 additions & 47 deletions packages/sync-service/test/electric/shape_cache_test.exs
@@ -328,53 +328,6 @@ defmodule Electric.ShapeCacheTest do
} = map
end

test "updates latest offset correctly", %{
shape_cache_opts: opts,
storage: storage
} do
{shape_handle, initial_offset} = ShapeCache.get_or_create_shape_handle(@shape, opts)
assert :started = ShapeCache.await_snapshot_start(shape_handle, opts)

assert {^shape_handle, offset_after_snapshot} =
ShapeCache.get_or_create_shape_handle(@shape, opts)

expected_offset_after_log_entry =
LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0)

:ok =
ShapeCache.update_shape_latest_offset(shape_handle, expected_offset_after_log_entry, opts)

assert {^shape_handle, offset_after_log_entry} =
ShapeCache.get_or_create_shape_handle(@shape, opts)

assert initial_offset == @zero_offset
assert initial_offset == offset_after_snapshot
assert LogOffset.compare(offset_after_log_entry, offset_after_snapshot) == :gt
assert offset_after_log_entry == expected_offset_after_log_entry

# Stop snapshot process gracefully to prevent errors being logged in the test
storage = Storage.for_shape(shape_handle, storage)

stream =
Storage.get_log_stream(
LogOffset.before_all(),
LogOffset.last_before_real_offsets(),
storage
)

Stream.run(stream)
end

test "errors if appending to untracked shape_handle", %{shape_cache_opts: opts} do
shape_handle = "foo"
log_offset = LogOffset.new(1000, 0)

{:error, log} =
with_log(fn -> ShapeCache.update_shape_latest_offset(shape_handle, log_offset, opts) end)

assert log =~ "Tried to update latest offset for shape #{shape_handle} which doesn't exist"
end

test "correctly propagates the error", %{shape_cache_opts: opts} do
shape = %Shape{
@shape