Skip to content

Commit

Permalink
Handle control connection shutdowns in cluster pool
Browse files Browse the repository at this point in the history
Closes #326.
  • Loading branch information
whatyouhide committed Sep 13, 2023
1 parent 00b84db commit c43f80c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
14 changes: 12 additions & 2 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,18 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data}
end

# Propagate all exits by exiting with the same reason. After all, if the control
# connection process or the pool supervisor crash, we want to crash this so that
# The control connection exited with a `{:shutdown, reason}` tuple, meaning it shut
# itself down rather than crashed. The exiting pid must be the one stored under
# `:control_connection` in the data for this clause to match. In that case we keep
# the current state and data untouched and enqueue the internal
# `:start_control_connection` event as the next event to process.
def handle_event(:info, {:EXIT, pid, {:shutdown, _why}}, _state, %__MODULE__{
      control_connection: pid
    }) do
  restart_control_connection = {:next_event, :internal, :start_control_connection}
  {:keep_state_and_data, restart_control_connection}
end

# Propagate all unhandled exits by exiting with the same reason. After all, if the control
# connection process or the pool supervisor *crash*, we want to crash this so that
# the whole thing is restarted.
def handle_event(:info, {:EXIT, _pid, reason}, _state, %__MODULE__{} = _data) do
exit(reason)
Expand Down
4 changes: 3 additions & 1 deletion test/xandra/cluster/control_connection_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
defmodule Xandra.Cluster.ControlConnectionTest do
use ExUnit.Case, async: true
# We keep this as not async because we could have Telemetry events from other
# tests that interfere with this one.
use ExUnit.Case, async: false

import ExUnit.CaptureLog

Expand Down
30 changes: 28 additions & 2 deletions test/xandra/cluster/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Xandra.Cluster.PoolTest do
alias Xandra.Cluster.ControlConnection
alias Xandra.Cluster.Host
alias Xandra.Cluster.Pool
alias Xandra.TestHelper

@protocol_version XandraTest.IntegrationCase.protocol_version()
@port String.to_integer(System.get_env("CASSANDRA_PORT", "9052"))
Expand Down Expand Up @@ -345,12 +346,37 @@ defmodule Xandra.Cluster.PoolTest do
cluster_ref = Process.monitor(cluster)

control_conn = get_state(cluster).control_connection
ref = Process.monitor(control_conn)
control_conn_ref = Process.monitor(control_conn)
Process.exit(control_conn, :kill)
assert_receive {:DOWN, ^ref, _, _, _}
assert_receive {:DOWN, ^control_conn_ref, _, _, _}

assert_receive {:DOWN, ^cluster_ref, _, _, :killed}
end

# Verifies that the cluster pool survives its control connection going away after
# the underlying TCP socket is shut down, and that a control connection is
# established again afterwards (observed through Telemetry events).
@tag :capture_log
test "if the control connection shuts down with :closed, the pool is fine and starts a new one",
%{cluster_options: cluster_options, pool_options: pool_options} do
# Subscribe the test process to the control connection's connected/disconnected
# Telemetry events so the assertions below can observe them in order.
telemetry_ref =
:telemetry_test.attach_event_handlers(self(), [
[:xandra, :cluster, :control_connection, :connected],
[:xandra, :cluster, :control_connection, :disconnected]
])

# NOTE(review): `sync_connect: 1000` presumably makes startup wait (up to 1000 ms)
# for a connection before returning; confirm against the cluster options docs.
cluster_options = Keyword.merge(cluster_options, sync_connect: 1000)
cluster = TestHelper.start_link_supervised!(spec(cluster_options, pool_options))

# The initial control connection must be up before we tamper with its socket.
assert_telemetry(telemetry_ref, [:control_connection, :connected])

# Monitor the current control connection process so we can assert that it goes down.
control_conn = get_state(cluster).control_connection
control_conn_ref = Process.monitor(control_conn)

# Shut down both directions of the control connection's socket, taken straight
# from the process state, to simulate the connection being closed underneath it.
:ok = :gen_tcp.shutdown(:sys.get_state(control_conn).transport.socket, :read_write)
assert_telemetry(telemetry_ref, [:control_connection, :disconnected])
assert_receive {:DOWN, ^control_conn_ref, _, _, _}

# Make sure we reconnect to the control connection.
assert_telemetry(telemetry_ref, [:control_connection, :connected])
end
end

defp spec(cluster_opts, pool_opts) do
Expand Down

0 comments on commit c43f80c

Please sign in to comment.