Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logic for reconnecting to new nodes #309

Closed
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 83 additions & 18 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ defmodule Xandra.Cluster.ControlConnection do

data = update_in(data.lbp, &LoadBalancingPolicy.update_host(&1, host, :added))
send(data.cluster, {:host_added, new_host})
put_in(data.peers[{new_host.address, new_host.port}], %{status: :up, host: host})
put_in(data.peers[{new_host.address, new_host.port}], %{status: :up, host: host, last_seen_at: System.system_time(:millisecond)})
peixian marked this conversation as resolved.
Show resolved Hide resolved
else
_ -> data
end
Expand Down Expand Up @@ -354,6 +354,7 @@ defmodule Xandra.Cluster.ControlConnection do

data = update_in(data.lbp, &LoadBalancingPolicy.update_host(&1, host_info.host, :down))
data = put_in(data.peers[{address, port}].status, :down)
data = put_in(data.peers[{address, port}].last_seen_at, System.system_time(:millisecond))
send(data.cluster, {:host_down, host_info.host})
{:keep_state, data}
else
Expand Down Expand Up @@ -396,6 +397,7 @@ defmodule Xandra.Cluster.ControlConnection do

case transport.connect(address, port, data.transport_options, @default_timeout) do
{:ok, socket} ->
Logger.debug("Connected to #{inspect(node)}")
peixian marked this conversation as resolved.
Show resolved Hide resolved
with {:ok, supported_opts, proto_mod} <- request_options(transport, socket, proto_vsn),
Logger.debug("Supported options: #{inspect(supported_opts)}"),
{:ok, {ip, port}} <- inet_mod(transport).peername(socket),
Expand All @@ -410,7 +412,10 @@ defmodule Xandra.Cluster.ControlConnection do
:ok <- register_to_events(data, connected_node),
:ok <- inet_mod(transport).setopts(socket, active: :once) do
[local_host | _] = peers
{:ok, %ConnectedNode{connected_node | host: local_host}, peers}

res = {:ok, %ConnectedNode{connected_node | host: local_host}, peers}
Logger.debug("#{inspect(res)}")
res
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is something we should log. If we want to, we should log it with a friendly message, but I think we can skip the logging for now and go back to the previous code. Thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this section because, between the last DEBUG log line and the next one, there were quite a few places where state was being changed that couldn't be debugged without it. I'll add a descriptive message to it.

else
{:error, {:use_this_protocol_instead, _failed_protocol_version, proto_vsn}} ->
Logger.debug("Cassandra said to use protocol #{inspect(proto_vsn)}, reconnecting")
Expand All @@ -429,6 +434,38 @@ defmodule Xandra.Cluster.ControlConnection do
Logger.metadata(xandra_address: nil, xandra_port: nil)
end

# Probes a peer to find out whether it is actually reachable: opens a throwaway connection to
# `{address, port}`, requests the supported options, looks up the peername, and runs
# `startup_connection/3` on it.
#
# Returns `:ok` if every step succeeds. Any failure, whatever its real reason, is reported as
# `{:error, :timeout}`, so callers can only tell "up" from "not up".
#
# The probe socket is always closed before returning, since it is only used for this check.
# The Logger metadata of the calling process is restored afterwards so that later log lines
# are not attributed to the probed node.
defp attempt_to_connect_to_node({address, port} = node, data) do
  import Utils, only: [request_options: 3]
  %__MODULE__{options: options, transport: transport} = data

  # A nil :protocol_version means "negotiate". A non-nil one means "enforce".
  proto_vsn = Keyword.get(options, :protocol_version)

  # Remember what the caller had set so it can be put back once the probe is done.
  previous_metadata = Logger.metadata()
  Logger.metadata(xandra_address: format_address(address), xandra_port: port)
  Logger.debug("Attempting to connect to #{inspect(node)}")

  try do
    case transport.connect(address, port, data.transport_options, @default_timeout) do
      {:ok, socket} ->
        try do
          with {:ok, supported_opts, proto_mod} <- request_options(transport, socket, proto_vsn),
               # Bound to a new name so the `port` argument is not shadowed.
               {:ok, {ip, peer_port}} <- inet_mod(transport).peername(socket),
               connected_node = %ConnectedNode{
                 socket: socket,
                 protocol_module: proto_mod,
                 ip: ip,
                 port: peer_port
               },
               :ok <- startup_connection(data, connected_node, supported_opts) do
            Logger.debug("Node #{inspect(node)} is up")
            :ok
          else
            _other ->
              Logger.debug("Node #{inspect(node)} is still down")
              {:error, :timeout}
          end
        after
          # Assumes the transport module exposes close/1, as :gen_tcp and :ssl do.
          # Without this, every probe would leak a socket.
          transport.close(socket)
        end

      _other ->
        Logger.debug("Node #{inspect(node)} is still down")
        {:error, :timeout}
    end
  after
    # Setting a key to nil removes it, so keys that were previously unset end up unset again.
    Logger.metadata(
      xandra_address: previous_metadata[:xandra_address],
      xandra_port: previous_metadata[:xandra_port]
    )
  end
end

defp refresh_topology(%__MODULE__{peers: old_peers} = data, new_peers) do
old_peers_set = old_peers |> Map.keys() |> MapSet.new()
new_peers_set = MapSet.new(new_peers, &Host.to_peername/1)
Expand All @@ -446,25 +483,30 @@ defmodule Xandra.Cluster.ControlConnection do
})
end)

{existing_hosts, discovered_hosts} =
Enum.reduce(new_peers, {[], []}, fn %Host{} = host, {existing_acc, discovered_acc} ->

{existing_hosts, discovered_hosts, downed_hosts} =
Enum.reduce(new_peers, {[], [], MapSet.new()}, fn %Host{} = host, {existing_acc, discovered_acc, downed_acc} ->
peername = Host.to_peername(host)

case Map.fetch(old_peers, peername) do
{:ok, %{status: :up}} ->
{existing_acc ++ [host], discovered_acc}
{existing_acc ++ [host], discovered_acc, downed_acc}

{:ok, %{status: :down}} ->
execute_telemetry(data, [:change_event], %{}, %{
event_type: :host_up,
host: host,
changed: true,
source: :xandra
})

send(data.cluster, {:host_up, host})
{existing_acc ++ [host], discovered_acc}

case attempt_to_connect_to_node({host.address, host.port}, data) do
:ok ->
execute_telemetry(data, [:change_event], %{}, %{
event_type: :host_up,
host: host,
changed: true,
source: :xandra
})
send(data.cluster, {:host_up, host})
{existing_acc ++ [host], discovered_acc, downed_acc}
_ ->
send(data.cluster, {:host_down, host})
{existing_acc ++ [host], discovered_acc, MapSet.put(downed_acc, peername)}
end
:error ->
execute_telemetry(data, [:change_event], %{}, %{
event_type: :host_added,
Expand All @@ -473,7 +515,7 @@ defmodule Xandra.Cluster.ControlConnection do
source: :xandra
})

{existing_acc, discovered_acc ++ [host]}
{existing_acc, discovered_acc ++ [host], downed_acc}
end
end)

Expand All @@ -483,7 +525,12 @@ defmodule Xandra.Cluster.ControlConnection do

final_peers =
Enum.reduce(existing_hosts ++ discovered_hosts, %{}, fn host, acc ->
Map.put(acc, Host.to_peername(host), %{host: host, status: :up})
peername = Host.to_peername(host)
status = case MapSet.member?(downed_hosts, peername) do
true -> :down
false -> :up
end
Map.put(acc, Host.to_peername(host), %{host: host, status: status, last_seen_at: System.system_time(:millisecond)})
end)

data =
Expand Down Expand Up @@ -539,6 +586,10 @@ defmodule Xandra.Cluster.ControlConnection do
local_peer = queried_peer_to_host(local_node_info)
local_peer = %Host{local_peer | address: node.ip, port: node.port}

Logger.debug("Local peer: #{inspect(local_peer)}")
Logger.debug("Peers: #{inspect(peers)}")

Logger.debug("Cluster: #{inspect(data.cluster)}")
# We filter out the peers with null host_id because they seem to be nodes that are down or
# decommissioned but not removed from the cluster. See
# https://github.com/lexhide/xandra/pull/196 and
Expand All @@ -554,13 +605,25 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

# Returns `true` when `peer` is tracked in `data.peers`, is currently marked `:down`, and its
# `:last_seen_at` timestamp is less than `@default_backoff` in the past. Returns `false` for
# unknown peers and for peers that are not down.
#
# `:last_seen_at` is written with `System.system_time(:millisecond)`, so it must be compared
# against the same clock. Monotonic time has an unrelated (often negative) origin and would
# make this check meaningless.
#
# NOTE(review): assumes @default_backoff is expressed in milliseconds; confirm at its definition.
defp same_peer_was_recently_downed?(%__MODULE__{} = data, peer) do
  case Map.fetch(data.peers, peer) do
    {:ok, %{status: :down} = peer_state} ->
      # Peer entries without a timestamp are treated as downed at time 0, i.e. long ago.
      last_down_time = Map.get(peer_state, :last_seen_at, 0)
      last_down_time + @default_backoff > System.system_time(:millisecond)

    _other ->
      false
  end
end

defp handle_change_event(data, _connected_node, %StatusChange{
effect: "UP",
address: address,
port: port
}) do
peer = {address, port}
%{host: host, status: status} = Map.fetch!(data.peers, peer)

telemetry_meta = %{event_type: :host_up, source: :cassandra, host: host}

case status do
Expand Down Expand Up @@ -600,7 +663,9 @@ defmodule Xandra.Cluster.ControlConnection do
execute_telemetry(data, [:change_event], %{}, Map.put(telemetry_meta, :changed, true))
data = update_in(data.lbp, &LoadBalancingPolicy.update_host(&1, host, :down))
send(data.cluster, {:host_down, host})
{put_in(data.peers[peer].status, :down), _actions = []}
data = put_in(data.peers[peer].status, :down)
data = put_in(data.peers[peer].last_seen_at, System.system_time(:millisecond))
{data, _actions = []}
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/xandra/connection/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ defmodule Xandra.Connection.Utils do
# TODO: handle :error frames for things like :protocol_violation.
case frame do
%Frame{kind: :ready, body: <<>>} ->
Logger.debug("FRAME READY #{inspect(frame)}, #{inspect(rest)}, #{inspect(socket)}}")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already logged one line below, so I don't think we should log this line as well.

Logger.debug("Received READY frame")
:ok

Expand Down
31 changes: 31 additions & 0 deletions test_ccm/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Test image: an Elixir/Erlang base with ccm installed and a 4-node "test" cluster definition
# created at build time, plus Xandra's compiled test dependencies and source code.
# The Elixir and OTP versions can be overridden with --build-arg.
ARG ELIXIR_VERSION=1.14.1
ARG OTP_VERSION=25.1
FROM hexpm/elixir:${ELIXIR_VERSION}-erlang-${OTP_VERSION}-ubuntu-xenial-20210804

WORKDIR /app

# All mix commands below run in the test environment.
ENV MIX_ENV=test

# Install Git (for installing Git dependencies if necessary), OpenSSL, curl, Python 3 with
# pip and headers, a Java 8 JDK, and socat.
# NOTE(review): Python and Java are presumably needed by ccm and the cluster it runs; confirm.
RUN apt-get update && \
apt-get install -y git openssl curl python3-pip python3-dev openjdk-8-jdk socat

# Install ccm, then create a cluster named "test" at version 3.11.0 and populate it with 4 nodes.
RUN pip3 install ccm
RUN ccm create test -v 3.11.0 && ccm populate -n 4

# Copy only the files needed to fetch and compile deps.
COPY mix.exs .
COPY mix.lock .

# Install rebar3 and Hex and then compile dependencies. This will be cached by
# Docker unless we change dependencies.
RUN mix do local.rebar --force, \
local.hex --force, \
deps.get --only test, \
deps.compile

# Now copy Xandra's code.
COPY lib lib
COPY test test
COPY test_ccm test_ccm
Loading