Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix IPv6 support in Xandra.Cluster #328

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ different ports to test different features (such as authenticationn). To run
normal tests, do this from the root of the project:

```bash
docker-compose up --daemon
docker-compose up --detach
mix test
```

The `--daemon` flag runs the instances as daemons in the background. Give it a
The `--detach` flag runs the instances as daemons in the background. Give it a
minute between starting the services and running `mix test.all` since Cassandra
takes a while to start. You can check whether the Docker containers are ready
with `docker-compose ps`. To stop the services, run `docker-compose stop`.
Expand Down
9 changes: 9 additions & 0 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,15 @@ defmodule Xandra.Cluster do
*Available since v0.18.0*.
"""
],
use_rpc_address_for_peer_address: [
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the cases when we'd want to use peer instead of rpc_address?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peer was the original implementation. I don't know why you would use peer instead of rpc_address and not much information when I google this, so going to phone a friend. @relistan do you know or think this should just solely use rpc_address and no longer use peer? (I think that is what @whatyouhide is asking).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's what I’m saying. @jacquelineIO I also recommend doing just what any other C* driver maintained by Datastax is doing. I always reference the Python driver. I don't have much time to look into this until maybe Sunday, so if you want you can just take a look there and use what they're using 😉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it is using rpc_address with a fallback to peer if rpc_address isn't available

So I can remove this flag and have a preference for using rpc_address, which the code is basically already set up to do

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's do that so that we don't have to put this decision on users 🙃 Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, catching up here. That sounds right to me, then.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jacquelineIO any news on this and timeline on when I can expect these changes to go in? I'm planning the 0.18.0 release, that's why I ask 🙃

type: :boolean,
doc: """
In the system.peers table use the `rpc_address` column for the
peer/Host address and not the `peer` column
""",
default: false,
required: false
],

# Internal for testing, not exposed.
xandra_module: [type: :atom, default: Xandra, doc: false],
Expand Down
88 changes: 78 additions & 10 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ defmodule Xandra.Cluster.ControlConnection do
contact_node: [type: {:tuple, [{:or, [:string, :any]}, :non_neg_integer]}, required: true],
connection_options: [type: :keyword_list, required: true],
autodiscovered_nodes_port: [type: :non_neg_integer, required: true],
refresh_topology_interval: [type: :timeout, required: true]
refresh_topology_interval: [type: :timeout, required: true],
use_rpc_address_for_peer_address: [type: :boolean, required: true]
]

defstruct [
Expand All @@ -52,6 +53,10 @@ defmodule Xandra.Cluster.ControlConnection do
# The interval at which to refresh the cluster topology.
:refresh_topology_interval,

# In the system.peers table use the `rpc_address` column for the
# peer/Host address and not the `peer` column
:use_rpc_address_for_peer_address,

# The protocol module of the node we're connected to.
:protocol_module,

Expand Down Expand Up @@ -83,6 +88,8 @@ defmodule Xandra.Cluster.ControlConnection do
contact_node: contact_node,
autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port),
refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval),
use_rpc_address_for_peer_address:
Keyword.fetch!(options, :use_rpc_address_for_peer_address),
connection_options: connection_opts,
transport: transport
}
Expand Down Expand Up @@ -144,7 +151,8 @@ defmodule Xandra.Cluster.ControlConnection do
state.autodiscovered_nodes_port,
state.protocol_module,
state.ip,
state.port
state.port,
state.use_rpc_address_for_peer_address
),
:ok <- Transport.setopts(state.transport, active: :once) do
state = refresh_topology(state, peers)
Expand Down Expand Up @@ -213,7 +221,8 @@ defmodule Xandra.Cluster.ControlConnection do
state.autodiscovered_nodes_port,
state.protocol_module,
state.ip,
state.port
state.port,
state.use_rpc_address_for_peer_address
),
:ok <- register_to_events(state),
:ok <- Transport.setopts(state.transport, active: :once) do
Expand Down Expand Up @@ -351,21 +360,23 @@ defmodule Xandra.Cluster.ControlConnection do
:inet.port_number(),
module(),
:inet.ip_address(),
:inet.port_number()
:inet.port_number(),
boolean()
) ::
{:ok, [Host.t()]} | {:error, :closed | :inet.posix()}
def fetch_cluster_topology(
%Transport{} = transport,
autodiscovered_nodes_port,
protocol_module,
ip,
port
port,
use_rpc_address
)
when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do
with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query),
{:ok, peers} <- query(transport, protocol_module, @select_peers_query) do
local_peer = %Host{
queried_peer_to_host(local_node_info)
queried_peer_to_host(local_node_info, use_rpc_address)
| address: ip,
port: port
}
Expand All @@ -376,7 +387,10 @@ defmodule Xandra.Cluster.ControlConnection do
# https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes.
peers =
for peer_attrs <- peers,
peer = %Host{queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port},
peer = %Host{
queried_peer_to_host(peer_attrs, use_rpc_address)
| port: autodiscovered_nodes_port
},
not is_nil(peer.host_id),
do: peer

Expand Down Expand Up @@ -412,13 +426,67 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do
defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address)
when is_tuple(rpc_address) do
{address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
peer_attrs = Map.delete(peer_attrs, "peer")
peer_attrs = Map.put(peer_attrs, "address", address)
queried_peer_to_host(peer_attrs, use_rpc_address)
end

defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is it that we get rpc_address as a string? Xandra should return :inet.ip_address/0 for IPs, with a 4-element tuple for IPv4 and 8-element tuple for IPv6.

Copy link
Contributor Author

@jacquelineIO jacquelineIO Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the peers table is queried, it was using the string.

For example this was one of the logs (when address is ipv6 this was another issue because the validation for ipv6 strings fails in xandra)

[] 21:35:58.910 [info] Xandra: [:connected]: %{address: '10.4.0.12', connection_name: nil, port: 11172, protocol_module: Xandra.Protocol.V4, supported_options: %{"COMPRESSION" => ["snappy", "lz4"], "CQL_VERSION" => ["3.4.5"], "PROTOCOL_VERSIONS" => ["3/v3", "4/v4", "5/v5", "6/v6-beta"]}}

Now the log looks like

[] 16:52:41.083 [info] Xandra Cluster: [:change_event]: %{cluster_name: {:local, FastSummariesConsumer.Store}, cluster_pid: #PID<0.5488.0>, event_type: :host_added, host: %Xandra.Cluster.Host{address: {10, 4, 0, 12},

There is still another place where strings are being used for the original host for the control connection if I'm not mistaken but it broke a lot of tests trying to change that out. That might be good for a different PR to fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the other location where string is in use https://github.com/Shimmur/xandra/blob/c563d737b99bcdec195a38408db490fec046a5e8/lib/xandra/cluster/pool.ex#L469 and I think might be good to convert this to a tuple as well.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the peers table is queried, it was using the string.

I’m confused. If you look at the schema for the system.peers table in C* itself, rpc_address is of type inet, so how can the result of the query to that table include rpc_address as a string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also adding the validate_node options. There was only one previously and it took a string https://github.com/lexhide/xandra/blob/main/lib/xandra/options_validators.ex#L51
if it were not a string, it would have failed...

Copy link
Contributor

@relistan relistan Sep 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know about in this particular case, but we were seeing IPv6 addresses getting returned as strings. This was then causing DNS lookups which failed. @jacquelineIO can confirm if this is the reason or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@whatyouhide you are right about the query returning the inet format.
It looks like there are some back n forth translations happening, and maybe it shouldn't be.
https://github.com/lexhide/xandra/blob/96303f07bae820ff693d85f2e1585cb343118e00/lib/xandra/cluster/pool.ex#L464-L466 part of the code is formatting the :node as a string.
e.g.

host = %Xandra.Cluster.Host{address: {10, 4, 0, 12}, port: 1234}

Xandra.Cluster.Host.format_address(host)
"10.4.0.12:1234"

so conn_options has a string and the id is a tuple. https://github.com/lexhide/xandra/blob/96303f07bae820ff693d85f2e1585cb343118e00/lib/xandra/cluster/pool.ex#L469-L471

When validate_node is run on the node address it is fine because it was a string.

So now I'm confused too. I do not completely follow the flow here...

I assumed this was pulling a string and wanted to make sure it was only using a tuple. BUT then also validate node started failing (if my memory is correct) so I had to update that too. So there might be multiple (at least one more) places in the code that a connection is happening, the first one beingstart_pool...

I got the desired result but the original connection is still a string. I this it makes sense to not use format_address for code (maybe only for logging things).

(I also feel like I'm talking in circles)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jacquelineIO okay yeah this code I have in Xandra is a little bit messy. I'll do another pass and fix this up later on then, so that we can standardize on having a single way of representing addresses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@whatyouhide that went in already 8340b4d

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah cool awesome! Will review this soon then

{address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
peer_attrs = Map.delete(peer_attrs, "peer")

peer_attrs =
case :inet.parse_address(address) do
{:ok, valid_address_tuple} ->
Map.put(peer_attrs, "address", valid_address_tuple)

error ->
Logger.error(
"queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}"
)

# failed to parse, however, use what was returned in the table, see if
# node_validation will validate it
Map.put(peer_attrs, "address", address)
end

queried_peer_to_host(peer_attrs, use_rpc_address)
end

defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address)
when is_tuple(peer) do
{address, peer_attrs} = Map.pop!(peer_attrs, "peer")
peer_attrs = Map.delete(peer_attrs, "rpc_address")
peer_attrs = Map.put(peer_attrs, "address", address)
queried_peer_to_host(peer_attrs)
queried_peer_to_host(peer_attrs, use_rpc_address)
end

defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do
{address, peer_attrs} = Map.pop!(peer_attrs, "peer")
peer_attrs = Map.delete(peer_attrs, "rpc_address")

peer_attrs =
case :inet.parse_address(address) do
{:ok, valid_address_tuple} ->
Map.put(peer_attrs, "address", valid_address_tuple)

error ->
Logger.error(
"queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}"
)

# failed to parse, however, use what was returned in the table, see if
# node_validation will validate it
Map.put(peer_attrs, "address", address)
end

queried_peer_to_host(peer_attrs, use_rpc_address)
end

defp queried_peer_to_host(%{} = peer_attrs) do
defp queried_peer_to_host(%{} = peer_attrs, _) do
columns = [
"address",
"data_center",
Expand Down
49 changes: 27 additions & 22 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ defmodule Xandra.Cluster.Pool do
# The name of the cluster (if present), only used for Telemetry events.
:name,

# In the system.peers table use the `rpc_address` column for the
# peer/Host address and not the `peer` column
use_rpc_address_for_peer_address: false,

# A map of peername ({address, port}) to info about that peer.
# Each info map is:
# %{pool_pid: pid(), pool_ref: ref(), host: Host.t(), status: :up | :down | :connected}
Expand Down Expand Up @@ -152,25 +156,26 @@ defmodule Xandra.Cluster.Pool do
queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting)
queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout)

data =
%__MODULE__{
pool_options: pool_opts,
contact_nodes: Keyword.fetch!(cluster_opts, :nodes),
load_balancing_module: lb_mod,
load_balancing_state: lb_mod.init(lb_opts),
autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port),
xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module),
control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
target_pools: Keyword.fetch!(cluster_opts, :target_pools),
name: Keyword.get(cluster_opts, :name),
pool_supervisor: pool_sup,
refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
reqs_before_connecting: %{
queue: :queue.new(),
max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size)
},
sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil}
}
data = %__MODULE__{
pool_options: pool_opts,
contact_nodes: Keyword.fetch!(cluster_opts, :nodes),
load_balancing_module: lb_mod,
load_balancing_state: lb_mod.init(lb_opts),
autodiscovered_nodes_port: Keyword.fetch!(cluster_opts, :autodiscovered_nodes_port),
xandra_mod: Keyword.fetch!(cluster_opts, :xandra_module),
control_conn_mod: Keyword.fetch!(cluster_opts, :control_connection_module),
target_pools: Keyword.fetch!(cluster_opts, :target_pools),
name: Keyword.get(cluster_opts, :name),
pool_supervisor: pool_sup,
refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
reqs_before_connecting: %{
queue: :queue.new(),
max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size)
},
sync_connect_ref: sync_connect_ref_or_nil && {parent, sync_connect_ref_or_nil},
use_rpc_address_for_peer_address:
Keyword.fetch!(cluster_opts, :use_rpc_address_for_peer_address)
}

actions = [
{:next_event, :internal, :start_control_connection},
Expand Down Expand Up @@ -425,8 +430,7 @@ defmodule Xandra.Cluster.Pool do
end

defp handle_host_added(%__MODULE__{} = data, %Host{} = host) do
data =
update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))
data = update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))

data =
update_in(
Expand Down Expand Up @@ -564,7 +568,8 @@ defmodule Xandra.Cluster.Pool do
contact_node: {host.address, host.port},
connection_options: data.pool_options,
autodiscovered_nodes_port: data.autodiscovered_nodes_port,
refresh_topology_interval: data.refresh_topology_interval
refresh_topology_interval: data.refresh_topology_interval,
use_rpc_address_for_peer_address: data.use_rpc_address_for_peer_address
]

case data.control_conn_mod.start_link(control_conn_opts) do
Expand Down
22 changes: 22 additions & 0 deletions lib/xandra/options_validators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ defmodule Xandra.OptionsValidators do
end
end

def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_tuple(address) do
case :inet.ntoa(address) do
{:error, :einval} ->
{:error,
"expected valid address tuple, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: :einval"}

_valid_address ->
{:ok, {address, port}}
end
end

def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_list(address) do
case :inet.parse_address(address) do
{:ok, valid_address} ->
{:ok, {valid_address, port}}

error ->
{:error,
"expected valid address char list, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: #{inspect(error)}"}
end
end

def validate_node(other) do
{:error, "expected node to be a string or a {ip, port} tuple, got: #{inspect(other)}"}
end
Expand Down
31 changes: 30 additions & 1 deletion test/xandra/cluster/control_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ defmodule Xandra.Cluster.ControlConnectionTest do
refresh_topology_interval: 60_000,
autodiscovered_nodes_port: @port,
connection_options: [protocol_version: @protocol_version],
contact_node: {~c"127.0.0.1", @port}
contact_node: {~c"127.0.0.1", @port},
use_rpc_address_for_peer_address: false
]

%{mirror_ref: mirror_ref, mirror: mirror, start_options: start_options}
Expand All @@ -42,6 +43,15 @@ defmodule Xandra.Cluster.ControlConnectionTest do
assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer
end

test "reports data upon successful connection with use_rpc_address_for_peer_address = true",
%{mirror_ref: mirror_ref, start_options: start_options} do
start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true)

start_control_connection!(start_options)
assert_receive {^mirror_ref, {:discovered_hosts, [local_peer]}}
assert %Host{address: {127, 0, 0, 1}, data_center: "datacenter1", rack: "rack1"} = local_peer
end

test "fails to start if it can't connect to the contact point node",
%{start_options: start_options} do
telemetry_event = [:xandra, :cluster, :control_connection, :failed_to_connect]
Expand Down Expand Up @@ -197,6 +207,25 @@ defmodule Xandra.Cluster.ControlConnectionTest do
[%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}}
end

test "sends :discovered_hosts message when refreshing the cluster topology with use_rpc_address_for_peer_address = true",
%{mirror_ref: mirror_ref, start_options: start_options} do
start_options = Keyword.merge(start_options, use_rpc_address_for_peer_address: true)

ctrl_conn = start_control_connection!(start_options)
assert_receive {^mirror_ref, {:discovered_hosts, [%Host{address: {127, 0, 0, 1}}]}}

new_peers = [
%Host{address: {192, 168, 1, 1}, port: @port, data_center: "datacenter1"},
%Host{address: {192, 168, 1, 2}, port: @port, data_center: "datacenter2"}
]

GenServer.cast(ctrl_conn, {:refresh_topology, new_peers})

assert_receive {^mirror_ref,
{:discovered_hosts,
[%Host{address: {192, 168, 1, 1}}, %Host{address: {192, 168, 1, 2}}]}}
end

test "triggers a topology refresh with the :refresh_topology message",
%{mirror_ref: mirror_ref, start_options: start_options} do
ctrl_conn = start_control_connection!(start_options)
Expand Down
Loading
Loading