From e359bdca32897e61bbc8cb21ba288915c4ea2677 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 17:57:20 +0200 Subject: [PATCH 1/9] Use `Consumer.whereis` everywhere --- .../test/electric/shapes/consumer_test.exs | 90 +++++++++---------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 047b97ef24..1d371909b3 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -172,7 +172,7 @@ defmodule Electric.Shapes.ConsumerTest do Mock.ShapeCache |> expect(:update_shape_latest_offset, 2, fn @shape_handle1, ^last_log_offset, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) ref = make_ref() @@ -215,8 +215,8 @@ defmodule Electric.Shapes.ConsumerTest do @shape_handle1, ^last_log_offset, _ -> :ok @shape_handle2, ^last_log_offset, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) ref1 = make_ref() ref2 = make_ref() @@ -281,7 +281,7 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:update_shape_latest_offset, fn @shape_handle2, _offset, _ -> :ok end) |> allow( self(), - Shapes.Consumer.name(ctx.stack_id, @shape_handle2) + Shapes.Consumer.whereis(ctx.stack_id, @shape_handle2) ) txn = @@ -313,13 +313,13 @@ defmodule Electric.Shapes.ConsumerTest do Mock.ShapeStatus |> expect(:remove_shape, 1, fn _, @shape_handle1 -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) shape1 = ctx.shapes[@shape_handle1] Mock.PublicationManager |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) txn = %Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset} @@ -371,7 +371,7 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, fn _, @shape_handle1 -> :ok end) |> allow( self(), - Shapes.Consumer.name(ctx.stack_id, @shape_handle1) + Shapes.Consumer.whereis(ctx.stack_id, @shape_handle1) ) shape = ctx.shapes[@shape_handle1] @@ -380,7 +380,7 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:remove_shape, 1, fn ^shape, _ -> :ok end) |> allow( self(), - Shapes.Consumer.name(ctx.stack_id, @shape_handle1) + Shapes.Consumer.whereis(ctx.stack_id, @shape_handle1) ) txn = @@ -410,7 +410,7 @@ defmodule Electric.Shapes.ConsumerTest do Mock.ShapeCache |> expect(:update_shape_latest_offset, fn @shape_handle1, ^last_log_offset, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) ref = make_ref() Registry.register(ctx.registry, @shape_handle1, ref) @@ -438,16 +438,16 @@ defmodule Electric.Shapes.ConsumerTest do } ref1 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle1)) ref2 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle2))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle2)) Mock.ShapeStatus |> expect(:remove_shape, 0, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) assert :ok = ShapeLogCollector.handle_relation_msg(rel, ctx.producer) @@ -466,23 +466,23 @@ defmodule Electric.Shapes.ConsumerTest do } ref1 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle1)) ref2 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle2))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle2)) # also cleans up inspector cache and shape status cache Mock.Inspector |> expect(:clean, 1, fn _, _ -> true end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:clean, 0, fn _, _ -> true end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) Mock.ShapeStatus |> expect(:remove_shape, 1, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) assert :ok = ShapeLogCollector.handle_relation_msg(rel, ctx.producer) @@ -504,32 +504,32 @@ defmodule Electric.Shapes.ConsumerTest do } ref1 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle1)) ref2 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle2))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle2)) # also cleans up inspector cache and shape status cache Mock.Inspector |> expect(:clean, 1, fn _, _ -> true end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:clean, 0, fn _, _ -> true end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) Mock.ShapeStatus |> expect(:remove_shape, 1, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) shape1 = ctx.shapes[@shape_handle1] shape2 = ctx.shapes[@shape_handle2] Mock.PublicationManager |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) assert :ok = ShapeLogCollector.handle_relation_msg(rel, ctx.producer) @@ -543,7 +543,7 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:update_shape_latest_offset, fn @shape_handle1, _, _ -> raise "The unexpected error" end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) lsn = Lsn.from_string("0/10") @@ -561,25 +561,25 @@ defmodule Electric.Shapes.ConsumerTest do }) ref1 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle1)) ref2 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle2))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle2)) Mock.ShapeStatus |> expect(:remove_shape, 1, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) shape1 = ctx.shapes[@shape_handle1] shape2 = ctx.shapes[@shape_handle2] Mock.PublicationManager |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) :ok = ShapeLogCollector.store_transaction(txn, ctx.producer) @@ -592,27 +592,27 @@ defmodule Electric.Shapes.ConsumerTest do test "consumer crashing stops affected consumer and cleans affected shape", ctx do ref1 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle1)) ref2 = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle2))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle2)) Mock.ShapeStatus |> expect(:remove_shape, 1, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) shape1 = ctx.shapes[@shape_handle1] shape2 = ctx.shapes[@shape_handle2] Mock.PublicationManager |> expect(:remove_shape, 1, fn ^shape1, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> expect(:remove_shape, 0, fn ^shape2, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle2)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) - GenServer.cast(Consumer.name(ctx.stack_id, @shape_handle1), :unexpected_cast) + GenServer.cast(Consumer.whereis(ctx.stack_id, @shape_handle1), :unexpected_cast) assert_receive {Support.TestStorage, :cleanup!, @shape_handle1} refute_receive {Support.TestStorage, :cleanup!, @shape_handle2} @@ -623,19 +623,19 @@ defmodule Electric.Shapes.ConsumerTest do test "consumer exiting normally does not clean up the shape", ctx do ref = - Process.monitor(GenServer.whereis(Consumer.name(ctx.stack_id, @shape_handle1))) + Process.monitor(Consumer.whereis(ctx.stack_id, @shape_handle1)) Mock.ShapeStatus |> expect(:remove_shape, 0, fn _, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) shape1 = ctx.shapes[@shape_handle1] Mock.PublicationManager |> expect(:remove_shape, 0, fn ^shape1, _ -> :ok end) - |> allow(self(), Consumer.name(ctx.stack_id, @shape_handle1)) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) - GenServer.stop(Consumer.name(ctx.stack_id, @shape_handle1)) + GenServer.stop(Consumer.whereis(ctx.stack_id, @shape_handle1)) refute_receive {Support.TestStorage, :cleanup!, @shape_handle1} From b76498949f912a2ccaabb15581ea4210c5b38b26 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 17:59:06 +0200 Subject: [PATCH 2/9] Remove `ConnectionBackoff` struct --- .../lib/electric/connection/manager.ex | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index c2b4a6af38..8157ae7046 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -69,14 +69,6 @@ defmodule Electric.Connection.Manager do ] end - defmodule ConnectionBackoff do - defstruct [ - :backoff, - :retries_started_at, - :timer_ref - ] - end - use GenServer require Logger @@ -193,7 +185,11 @@ defmodule Electric.Connection.Manager do timeline_opts: timeline_opts, shape_cache_opts: shape_cache_opts, pg_lock_acquired: false, - connection_backoff: %ConnectionBackoff{backoff: :backoff.init(1000, 10_000)}, + connection_backoff: %{ + backoff: :backoff.init(1000, 10_000), + retries_started_at: nil, + timer_ref: nil + }, stack_id: Keyword.fetch!(opts, :stack_id), stack_events_registry: Keyword.fetch!(opts, :stack_events_registry), tweaks: Keyword.fetch!(opts, :tweaks), @@ -367,11 +363,11 @@ defmodule Electric.Connection.Manager do @impl true def handle_info( {:timeout, tref, step}, - %{connection_backoff: %ConnectionBackoff{timer_ref: tref} = conn_backoff} = state + %{connection_backoff: %{timer_ref: tref} = conn_backoff} = state ) do state = %State{ state - | connection_backoff: %ConnectionBackoff{conn_backoff | timer_ref: nil} + | connection_backoff: %{conn_backoff | timer_ref: nil} } handle_continue(step, state) @@ -656,7 +652,7 @@ defmodule Electric.Connection.Manager do defp schedule_reconnection( step, %State{ - connection_backoff: %ConnectionBackoff{ + connection_backoff: %{ backoff: backoff, retries_started_at: retries_started_at } @@ -668,7 +664,7 @@ defmodule Electric.Connection.Manager do %State{ state - | connection_backoff: %ConnectionBackoff{ + | connection_backoff: %{ backoff: backoff, retries_started_at: retries_started_at || System.monotonic_time(:millisecond), timer_ref: tref @@ -677,29 +673,29 @@ defmodule Electric.Connection.Manager do end # If total backoff time is 0 then there were no reconnection attempts - defp mark_connection_succeeded( - %State{connection_backoff: %ConnectionBackoff{retries_started_at: nil}} = state - ), - do: state + defp mark_connection_succeeded(%State{connection_backoff: %{retries_started_at: nil}} = state), + do: state # Otherwise, reset the backoff and total backoff time - defp mark_connection_succeeded( - %State{connection_backoff: %ConnectionBackoff{backoff: backoff}} = state - ) do + defp mark_connection_succeeded(%State{connection_backoff: %{backoff: backoff}} = state) do {_, backoff} = :backoff.succeed(backoff) Logger.info("Reconnection succeeded after #{inspect(total_retry_time(state))}ms") %State{ state - | connection_backoff: %ConnectionBackoff{backoff: backoff} + | connection_backoff: %{ + state.connection_backoff + | backoff: backoff, + retries_started_at: nil + } } end - defp total_retry_time(%State{connection_backoff: %ConnectionBackoff{retries_started_at: nil}}), + defp total_retry_time(%State{connection_backoff: %{retries_started_at: nil}}), do: 0 defp total_retry_time(%State{ - connection_backoff: %ConnectionBackoff{retries_started_at: retries_started_at} + connection_backoff: %{retries_started_at: retries_started_at} }), do: retries_started_at - System.monotonic_time(:millisecond) From fc85b2dcc07c613e53c667c8eb60675e0a199052 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 18:02:07 +0200 Subject: [PATCH 3/9] Use `State` struct everywhere --- .../lib/electric/connection/manager.ex | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 8157ae7046..8661c8a6c8 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -203,13 +203,13 @@ defmodule Electric.Connection.Manager do end @impl true - def handle_call(:get_pg_version, _from, %{pg_version: pg_version} = state) do + def handle_call(:get_pg_version, _from, %State{pg_version: pg_version} = state) do # If we haven't queried the PG version by the time it is requested, that's a fatal error. false = is_nil(pg_version) {:reply, pg_version, state} end - def handle_call(:get_status, _from, %{pg_lock_acquired: pg_lock_acquired} = state) do + def handle_call(:get_status, _from, %State{pg_lock_acquired: pg_lock_acquired} = state) do status = cond do not pg_lock_acquired -> @@ -226,8 +226,8 @@ defmodule Electric.Connection.Manager do {:reply, status, state} end - def handle_call(:await_active, from, %{pool_pid: nil} = state) do - {:noreply, %{state | awaiting_active: [from | state.awaiting_active]}} + def handle_call(:await_active, from, %State{pool_pid: nil} = state) do + {:noreply, %State{state | awaiting_active: [from | state.awaiting_active]}} end def handle_call(:await_active, _from, state) do @@ -235,7 +235,7 @@ defmodule Electric.Connection.Manager do end def handle_call(:drop_replication_slot_on_stop, _from, state) do - {:reply, :ok, %{state | drop_slot_requested: true}} + {:reply, :ok, %State{state | drop_slot_requested: true}} end def handle_call(:report_retained_wal_size, _from, state) do @@ -259,7 +259,7 @@ defmodule Electric.Connection.Manager do case start_lock_connection(opts) do {:ok, pid, connection_opts} -> state = mark_connection_succeeded(state) - state = %{state | lock_connection_pid: pid, connection_opts: connection_opts} + state = %State{state | lock_connection_pid: pid, connection_opts: connection_opts} Electric.StackSupervisor.dispatch_stack_event( state.stack_events_registry, @@ -286,7 +286,7 @@ defmodule Electric.Connection.Manager do case start_replication_client(opts) do {:ok, pid, connection_opts} -> state = mark_connection_succeeded(state) - state = %{state | replication_client_pid: pid, connection_opts: connection_opts} + state = %State{state | replication_client_pid: pid, connection_opts: connection_opts} if is_nil(state.pool_pid) do # This is the case where Connection.Manager starts connections from the initial state. @@ -342,7 +342,7 @@ defmodule Electric.Connection.Manager do log_collector_pid = lookup_log_collector_pid(shapes_sup_pid) Process.monitor(log_collector_pid) - state = %{ + state = %State{ state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid, @@ -353,7 +353,7 @@ defmodule Electric.Connection.Manager do GenServer.reply(awaiting, :ok) end - {:noreply, %{state | awaiting_active: []}} + {:noreply, %State{state | awaiting_active: []}} {:error, reason} -> handle_connection_error(reason, state, "regular") @@ -363,7 +363,7 @@ defmodule Electric.Connection.Manager do @impl true def handle_info( {:timeout, tref, step}, - %{connection_backoff: %{timer_ref: tref} = conn_backoff} = state + %State{connection_backoff: %{timer_ref: tref} = conn_backoff} = state ) do state = %State{ state @@ -387,7 +387,7 @@ defmodule Electric.Connection.Manager do "Handling the exit of the replication client #{inspect(pid)} with reason #{inspect(reason)}" ) - state = %{state | replication_client_pid: nil} + state = %State{state | replication_client_pid: nil} state = schedule_reconnection(:start_replication_client, state) {:noreply, state} end @@ -404,7 +404,10 @@ defmodule Electric.Connection.Manager do {:stop, {:shutdown, reason}, state} end - def handle_info({:DOWN, _ref, :process, pid, reason}, %{shape_log_collector_pid: pid} = state) do + def handle_info( + {:DOWN, _ref, :process, pid, reason}, + %State{shape_log_collector_pid: pid} = state + ) do # The replication client would normally exit together with the shape log collector when it # is blocked on a call to either `ShapeLogCollector.handle_relation_msg/2` or # `ShapeLogCollector.store_transaction/2` and the log collector encounters a storage error. @@ -429,7 +432,7 @@ defmodule Electric.Connection.Manager do drop_slot(state) end - {:noreply, %{state | shape_log_collector_pid: nil, replication_client_pid: nil}} + {:noreply, %State{state | shape_log_collector_pid: nil, replication_client_pid: nil}} end # Periodically log the status of the lock connection until it is acquired for @@ -444,11 +447,11 @@ defmodule Electric.Connection.Manager do end @impl true - def handle_cast(:exclusive_connection_lock_acquired, %{pg_lock_acquired: false} = state) do + def handle_cast(:exclusive_connection_lock_acquired, %State{pg_lock_acquired: false} = state) do # As soon as we acquire the connection lock, we try to start the replication connection # first because it requires additional privileges compared to regular "pooled" connections, # so failure to open a replication connection should be reported ASAP. - {:noreply, %{state | pg_lock_acquired: true}, {:continue, :start_replication_client}} + {:noreply, %State{state | pg_lock_acquired: true}, {:continue, :start_replication_client}} end def handle_cast({:pg_info_looked_up, {server_version, system_identifier, timeline_id}}, state) do @@ -463,7 +466,7 @@ defmodule Electric.Connection.Manager do ) {:noreply, - %{ + %State{ state | pg_version: server_version, pg_system_identifier: system_identifier, @@ -557,7 +560,7 @@ defmodule Electric.Connection.Manager do ) # disable IPv6 and retry immediately - state = %{ + state = %State{ state | ipv6_enabled: false, connection_opts: connection_opts |> Keyword.put(:ipv6, false) |> update_tcp_opts() @@ -776,11 +779,11 @@ defmodule Electric.Connection.Manager do log_collector_pid end - defp drop_slot(%{pool_pid: nil} = _state) do + defp drop_slot(%State{pool_pid: nil} = _state) do Logger.warning("Skipping slot drop, pool connection not available") end - defp drop_slot(%{pool_pid: pool} = state) do + defp drop_slot(%State{pool_pid: pool} = state) do publication_name = Keyword.fetch!(state.replication_opts, :publication_name) slot_name = Keyword.fetch!(state.replication_opts, :slot_name) slot_temporary? = Keyword.fetch!(state.replication_opts, :slot_temporary?) From db37a579fc1b0132d5f326a869023e769c9e9ea0 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 18:32:43 +0200 Subject: [PATCH 4/9] Remove `ShapeCache` dependency for `Consumer` --- .../sync-service/lib/electric/shape_cache.ex | 28 ----------- .../lib/electric/shapes/consumer.ex | 4 +- .../electric/shapes/consumer_supervisor.ex | 1 - .../test/electric/shape_cache_test.exs | 47 ------------------- .../test/electric/shapes/consumer_test.exs | 34 +++++--------- 5 files changed, 15 insertions(+), 99 deletions(-) diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index 93886f7fa4..61b7a70461 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -9,9 +9,6 @@ defmodule Electric.ShapeCacheBehaviour do @type shape_def :: Shape.t() @type xmin :: non_neg_integer() - @doc "Update a shape's status with a new log offset" - @callback update_shape_latest_offset(shape_handle(), LogOffset.t(), keyword()) :: :ok - @callback get_shape(shape_def(), opts :: Access.t()) :: {shape_handle(), current_snapshot_offset :: LogOffset.t()} | nil @callback get_or_create_shape_handle(shape_def(), opts :: Access.t()) :: @@ -113,24 +110,6 @@ defmodule Electric.ShapeCache do end end - @impl Electric.ShapeCacheBehaviour - @spec update_shape_latest_offset(shape_handle(), LogOffset.t(), opts :: Access.t()) :: - :ok | {:error, term()} - def update_shape_latest_offset(shape_handle, latest_offset, opts) do - meta_table = get_shape_meta_table(opts) - shape_status = Access.get(opts, :shape_status, ShapeStatus) - - if shape_status.set_latest_offset(meta_table, shape_handle, latest_offset) do - :ok - else - Logger.warning( - "Tried to update latest offset for shape #{shape_handle} which doesn't exist" - ) - - :error - end - end - @impl Electric.ShapeCacheBehaviour @spec list_shapes(Access.t()) :: [{shape_handle(), Shape.t()}] def list_shapes(opts) do @@ -348,13 +327,6 @@ defmodule Electric.ShapeCache do publication_manager: state.publication_manager, chunk_bytes_threshold: state.chunk_bytes_threshold, log_producer: state.log_producer, - shape_cache: - {__MODULE__, - %{ - server: state.name, - shape_meta_table: state.shape_meta_table, - stack_id: state.stack_id - }}, registry: state.registry, db_pool: state.db_pool, run_with_conn_fn: state.run_with_conn_fn, diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index e3e00735d0..56b7f85ca4 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -264,7 +264,7 @@ defmodule Electric.Shapes.Consumer do shape_handle: shape_handle, log_state: log_state, chunk_bytes_threshold: chunk_bytes_threshold, - shape_cache: {shape_cache, shape_cache_opts}, + shape_status: {shape_status, shape_status_state}, registry: registry, storage: storage } = state @@ -326,7 +326,7 @@ defmodule Electric.Shapes.Consumer do Map.new(shape_attrs(state.shape_handle, state.shape)) ) - shape_cache.update_shape_latest_offset(shape_handle, last_log_offset, shape_cache_opts) + shape_status.set_latest_offset(shape_status_state, shape_handle, last_log_offset) notify_listeners(registry, :new_changes, shape_handle, last_log_offset) diff --git a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex index 93b244059e..8fff61e6ce 100644 --- a/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex +++ b/packages/sync-service/lib/electric/shapes/consumer_supervisor.ex @@ -12,7 +12,6 @@ defmodule Electric.Shapes.ConsumerSupervisor do shape: [type: {:struct, Electric.Shapes.Shape}, required: true], inspector: [type: :mod_arg, required: true], log_producer: [type: @genserver_name_schema, required: true], - shape_cache: [type: :mod_arg, required: true], registry: [type: :atom, required: true], shape_status: [type: :mod_arg, required: true], storage: [type: :mod_arg, required: true], diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index c1652ae065..ae324a65cd 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -328,53 +328,6 @@ defmodule Electric.ShapeCacheTest do } = map end - test "updates latest offset correctly", %{ - shape_cache_opts: opts, - storage: storage - } do - {shape_handle, initial_offset} = ShapeCache.get_or_create_shape_handle(@shape, opts) - assert :started = ShapeCache.await_snapshot_start(shape_handle, opts) - - assert {^shape_handle, offset_after_snapshot} = - ShapeCache.get_or_create_shape_handle(@shape, opts) - - expected_offset_after_log_entry = - LogOffset.new(Electric.Postgres.Lsn.from_integer(1000), 0) - - :ok = - ShapeCache.update_shape_latest_offset(shape_handle, expected_offset_after_log_entry, opts) - - assert {^shape_handle, offset_after_log_entry} = - ShapeCache.get_or_create_shape_handle(@shape, opts) - - assert initial_offset == @zero_offset - assert initial_offset == offset_after_snapshot - assert LogOffset.compare(offset_after_log_entry, offset_after_snapshot) == :gt - assert offset_after_log_entry == expected_offset_after_log_entry - - # Stop snapshot process gracefully to prevent errors being logged in the test - storage = Storage.for_shape(shape_handle, storage) - - stream = - Storage.get_log_stream( - LogOffset.before_all(), - LogOffset.last_before_real_offsets(), - storage - ) - - Stream.run(stream) - end - - test "errors if appending to untracked shape_handle", %{shape_cache_opts: opts} do - shape_handle = "foo" - log_offset = LogOffset.new(1000, 0) - - {:error, log} = - with_log(fn -> ShapeCache.update_shape_latest_offset(shape_handle, log_offset, opts) end) - - assert log =~ "Tried to update latest offset for shape #{shape_handle} which doesn't exist" - end - test "correctly propagates the error", %{shape_cache_opts: opts} do shape = %Shape{ @shape diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 1d371909b3..81a0c9b99a 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -123,10 +123,6 @@ defmodule Electric.Shapes.ConsumerTest do Shapes.Consumer.whereis(ctx.stack_id, shape_handle) end) - Mock.ShapeCache - |> allow(self(), fn -> - Shapes.Consumer.whereis(ctx.stack_id, shape_handle) - end) {:ok, consumer} = start_supervised( @@ -142,7 +138,6 @@ defmodule Electric.Shapes.ConsumerTest do Electric.DbPool ), registry: registry_name, - shape_cache: {Mock.ShapeCache, []}, shape_status: {Mock.ShapeStatus, []}, publication_manager: {Mock.PublicationManager, []}, storage: storage, @@ -170,8 +165,8 @@ defmodule Electric.Shapes.ConsumerTest do last_log_offset = log_offset(@shape_handle1, ctx) lsn = lsn(@shape_handle1, ctx) - Mock.ShapeCache - |> expect(:update_shape_latest_offset, 2, fn @shape_handle1, ^last_log_offset, _ -> :ok end) + Mock.ShapeStatus + |> expect(:set_latest_offset, 2, fn _, @shape_handle1, ^last_log_offset -> :ok end) |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) ref = make_ref() @@ -210,10 +205,10 @@ defmodule Electric.Shapes.ConsumerTest do xid = 150 - Mock.ShapeCache - |> expect(:update_shape_latest_offset, 2, fn - @shape_handle1, ^last_log_offset, _ -> :ok - @shape_handle2, ^last_log_offset, _ -> :ok + Mock.ShapeStatus + |> expect(:set_latest_offset, 2, fn + _, @shape_handle1, ^last_log_offset -> :ok + _, @shape_handle2, ^last_log_offset -> :ok end) |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) @@ -277,12 +272,9 @@ defmodule Electric.Shapes.ConsumerTest do ref1 = Shapes.Consumer.monitor(ctx.stack_id, @shape_handle1) ref2 = Shapes.Consumer.monitor(ctx.stack_id, @shape_handle2) - Mock.ShapeCache - |> expect(:update_shape_latest_offset, fn @shape_handle2, _offset, _ -> :ok end) - |> allow( - self(), - Shapes.Consumer.whereis(ctx.stack_id, @shape_handle2) - ) + Mock.ShapeStatus + |> expect(:set_latest_offset, fn _, @shape_handle2, _offset -> :ok end) + |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle2)) txn = %Transaction{ @@ -408,8 +400,8 @@ defmodule Electric.Shapes.ConsumerTest do lsn = Lsn.from_string("0/10") last_log_offset = LogOffset.new(lsn, 0) - Mock.ShapeCache - |> expect(:update_shape_latest_offset, fn @shape_handle1, ^last_log_offset, _ -> :ok end) + Mock.ShapeStatus + |> expect(:set_latest_offset, fn _, @shape_handle1, ^last_log_offset -> :ok end) |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) ref = make_ref() @@ -539,8 +531,8 @@ defmodule Electric.Shapes.ConsumerTest do test "unexpected error while handling events stops affected consumer and cleans affected shape", ctx do - Mock.ShapeCache - |> expect(:update_shape_latest_offset, fn @shape_handle1, _, _ -> + Mock.ShapeStatus + |> expect(:set_latest_offset, fn _, @shape_handle1, _ -> raise "The unexpected error" end) |> allow(self(), Consumer.whereis(ctx.stack_id, @shape_handle1)) From 48f75b4fe91d8f38fcd33a1d6720ced834f9db7e Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 18:45:13 +0200 Subject: [PATCH 5/9] Fix formatting --- packages/sync-service/test/electric/shapes/consumer_test.exs | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 81a0c9b99a..972e405fb8 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -123,7 +123,6 @@ defmodule Electric.Shapes.ConsumerTest do Shapes.Consumer.whereis(ctx.stack_id, shape_handle) end) - {:ok, consumer} = start_supervised( {Shapes.ConsumerSupervisor, From d92feb3bc68daef9bf2f5164722eae2ad1121e62 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 19:40:11 +0200 Subject: [PATCH 6/9] Fix consumer test flakes --- .../test/electric/shapes/consumer_test.exs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 972e405fb8..b6c94e7213 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -19,6 +19,8 @@ defmodule Electric.Shapes.ConsumerTest do import Mox + @receive_timeout 1_000 + @shape_handle1 "#{__MODULE__}-shape1" @shape1 Shape.new!("public.test_table", inspector: StubInspector.new([%{name: "id", type: "int8", pk_position: 0}]) @@ -120,7 +122,7 @@ defmodule Electric.Shapes.ConsumerTest do |> expect(:set_snapshot_xmin, 1, fn _, ^shape_handle, _ -> :ok end) |> expect(:mark_snapshot_started, 1, fn _, ^shape_handle -> :ok end) |> allow(self(), fn -> - Shapes.Consumer.whereis(ctx.stack_id, shape_handle) + Consumer.whereis(ctx.stack_id, shape_handle) end) {:ok, consumer} = @@ -147,7 +149,9 @@ defmodule Electric.Shapes.ConsumerTest do ) assert_receive {Support.TestStorage, :set_shape_definition, ^shape_handle, ^shape} - + # Wait for the virtual snapshot to have started to avoid overriding any of the + # defined Mox expectations + :started = GenServer.call(Consumer.name(ctx.stack_id, shape_handle), :await_snapshot_start) consumer end @@ -186,14 +190,14 @@ defmodule Electric.Shapes.ConsumerTest do }) assert :ok = ShapeLogCollector.store_transaction(txn, ctx.producer) - assert_receive {^ref, :new_changes, ^last_log_offset}, 1000 + assert_receive {^ref, :new_changes, ^last_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} txn2 = %{txn | xid: xid} assert :ok = ShapeLogCollector.store_transaction(txn2, ctx.producer) - assert_receive {^ref, :new_changes, ^last_log_offset}, 1000 + assert_receive {^ref, :new_changes, ^last_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} refute_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _} end @@ -243,8 +247,8 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.store_transaction(txn, ctx.producer) - assert_receive {^ref1, :new_changes, ^last_log_offset}, 1000 - assert_receive {^ref2, :new_changes, ^last_log_offset}, 1000 + assert_receive {^ref1, :new_changes, ^last_log_offset}, @receive_timeout + assert_receive {^ref2, :new_changes, ^last_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, [{_offset, _key, _type, serialized_record}]} @@ -415,8 +419,8 @@ defmodule Electric.Shapes.ConsumerTest do }) assert :ok = ShapeLogCollector.store_transaction(txn, ctx.producer) + assert_receive {^ref, :new_changes, ^last_log_offset}, @receive_timeout assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _} - assert_receive {^ref, :new_changes, ^last_log_offset}, 1000 end test "does not clean shapes if relation didn't change", ctx do From ea975c95f70b010e73170e9ba03e0cab7c4c09f0 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 19:44:13 +0200 Subject: [PATCH 7/9] Fix formatting --- packages/sync-service/test/electric/shapes/consumer_test.exs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index b6c94e7213..39d2e611ba 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -151,7 +151,9 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {Support.TestStorage, :set_shape_definition, ^shape_handle, ^shape} # Wait for the virtual snapshot to have started to avoid overriding any of the # defined Mox expectations - :started = GenServer.call(Consumer.name(ctx.stack_id, shape_handle), :await_snapshot_start) + :started = + GenServer.call(Consumer.name(ctx.stack_id, shape_handle), :await_snapshot_start) + consumer end From 56e5157156934e86c0fc5045e4e735e9e75d15b0 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 19:44:49 +0200 Subject: [PATCH 8/9] Increase stack ready timeout for test --- .../sync-service/test/electric/plug/serve_shape_plug_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index 56640572e7..46bff6538b 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -780,7 +780,7 @@ defmodule Electric.Plug.ServeShapePlugTest do assert Jason.decode!(conn.resp_body) == %{"message" => "Stack not ready"} end - @tag stack_ready_timeout: 1000 + @tag stack_ready_timeout: 5000 test "waits until stack ready and proceeds", ctx do conn_task = Task.async(fn -> From fc4aaf7b562cdf46d8fa9b5d79ba3e6dfaa038c6 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 5 Feb 2025 19:45:36 +0200 Subject: [PATCH 9/9] Higher timeout --- .../sync-service/test/electric/plug/serve_shape_plug_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index 46bff6538b..53414d79ea 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -42,7 +42,7 @@ defmodule Electric.Plug.ServeShapePlugTest do @test_pg_id "12345" # Higher timeout is needed for some tests that tend to run slower on CI. - @receive_timeout 1000 + @receive_timeout 2000 def load_column_info({"public", "users"}, _), do: {:ok, @test_shape.table_info[{"public", "users"}][:columns]}