Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jacqueline/ipv6 handling changes on v17 #327

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ different ports to test different features (such as authenticationn). To run
normal tests, do this from the root of the project:

```bash
docker-compose up --daemon
docker-compose up --detach
mix test
```

The `--daemon` flag runs the instances as daemons in the background. Give it a
The `--detach` flag runs the instances as daemons in the background. Give it a
minute between starting the services and running `mix test.all` since Cassandra
takes a while to start. You can check whether the Docker containers are ready
with `docker-compose ps`. To stop the services, run `docker-compose stop`.
Expand Down
10 changes: 10 additions & 0 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,16 @@ defmodule Xandra.Cluster do
*Available since v0.18.0*.
"""
],
use_rpc_address_for_peer_address: [
type: :boolean,
doc: """
In the system.peers table use the `rpc_address` column for the
peer/Host address and not the `peer` column
""",
default: false,
required: false
],


# Internal for testing, not exposed.
xandra_module: [type: :atom, default: Xandra, doc: false],
Expand Down
100 changes: 89 additions & 11 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ defmodule Xandra.Cluster.ControlConnection do
contact_node: [type: {:tuple, [{:or, [:string, :any]}, :non_neg_integer]}, required: true],
connection_options: [type: :keyword_list, required: true],
autodiscovered_nodes_port: [type: :non_neg_integer, required: true],
refresh_topology_interval: [type: :timeout, required: true]
refresh_topology_interval: [type: :timeout, required: true],
use_rpc_address_for_peer_address: [type: :boolean, default: false, required: false]
]

defstruct [
Expand All @@ -52,6 +53,10 @@ defmodule Xandra.Cluster.ControlConnection do
# The interval at which to refresh the cluster topology.
:refresh_topology_interval,

# In the system.peers table use the `rpc_address` column for the
# peer/Host address and not the `peer` column
:use_rpc_address_for_peer_address,

# The protocol module of the node we're connected to.
:protocol_module,

Expand Down Expand Up @@ -83,6 +88,7 @@ defmodule Xandra.Cluster.ControlConnection do
contact_node: contact_node,
autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port),
refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval),
use_rpc_address_for_peer_address: Keyword.fetch!(options, :use_rpc_address_for_peer_address),
connection_options: connection_opts,
transport: transport
}
Expand Down Expand Up @@ -144,7 +150,8 @@ defmodule Xandra.Cluster.ControlConnection do
state.autodiscovered_nodes_port,
state.protocol_module,
state.ip,
state.port
state.port,
state.use_rpc_address_for_peer_address
),
:ok <- Transport.setopts(state.transport, active: :once) do
state = refresh_topology(state, peers)
Expand Down Expand Up @@ -213,7 +220,8 @@ defmodule Xandra.Cluster.ControlConnection do
state.autodiscovered_nodes_port,
state.protocol_module,
state.ip,
state.port
state.port,
state.use_rpc_address_for_peer_address
),
:ok <- register_to_events(state),
:ok <- Transport.setopts(state.transport, active: :once) do
Expand Down Expand Up @@ -351,21 +359,23 @@ defmodule Xandra.Cluster.ControlConnection do
:inet.port_number(),
module(),
:inet.ip_address(),
:inet.port_number()
:inet.port_number(),
boolean()
) ::
{:ok, [Host.t()]} | {:error, :closed | :inet.posix()}
def fetch_cluster_topology(
%Transport{} = transport,
autodiscovered_nodes_port,
protocol_module,
ip,
port
port,
use_rpc_address
)
when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do
with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query),
{:ok, peers} <- query(transport, protocol_module, @select_peers_query) do
local_peer = %Host{
queried_peer_to_host(local_node_info)
queried_peer_to_host(local_node_info, use_rpc_address)
| address: ip,
port: port
}
Expand All @@ -376,7 +386,7 @@ defmodule Xandra.Cluster.ControlConnection do
# https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes.
peers =
for peer_attrs <- peers,
peer = %Host{queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port},
peer = %Host{queried_peer_to_host(peer_attrs, use_rpc_address) | port: autodiscovered_nodes_port},
not is_nil(peer.host_id),
do: peer

Expand Down Expand Up @@ -412,13 +422,81 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do
{address, peer_attrs} = Map.pop!(peer_attrs, "peer")
defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, use_rpc_address) when is_tuple(rpc_address) do
{address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
peer_attrs = Map.delete(peer_attrs, "peer")
peer_attrs = Map.put(peer_attrs, "address", address)
queried_peer_to_host(peer_attrs)
queried_peer_to_host(peer_attrs, use_rpc_address)
end

defp queried_peer_to_host(%{} = peer_attrs) do
defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, use_rpc_address) do
{address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
peer_attrs = Map.delete(peer_attrs, "peer")
peer_attrs =
case :inet.parse_address(address) do
{:ok, valid_address_tuple} ->
Map.put(peer_attrs, "address", valid_address_tuple)

error ->
Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}")
# failed to parse, however, use what was returned in the table, see if
# node_validation will pass on it
Map.put(peer_attrs, "address", address)
end
queried_peer_to_host(peer_attrs, use_rpc_address)
end


# defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do
# {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
# peer_attrs = Map.delete(peer_attrs, "peer")
# peer_attrs = Map.put(peer_attrs, "address", address)
# queried_peer_to_host(peer_attrs, use_rpc_address)
# end

# defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do
# {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address")
# peer_attrs = Map.delete(peer_attrs, "peer")
# peer_attrs =
# case :inet.parse_address(address) do
# {:ok, valid_address_tuple} ->
# Map.put(peer_attrs, "address", valid_address_tuple)

# error ->
# Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}")
# # failed to parse, however, use what was returned in the table, see if
# # node_validation will pass on it
# Map.put(peer_attrs, "address", address)
# end
# queried_peer_to_host(peer_attrs, use_rpc_address)
# end

# defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do
# {address, peer_attrs} = Map.pop!(peer_attrs, "peer")
# peer_attrs = Map.delete(peer_attrs, "rpc_address")
# peer_attrs = Map.put(peer_attrs, "address", address)
# queried_peer_to_host(peer_attrs, use_rpc_address)
# end

# defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do
# {address, peer_attrs} = Map.pop!(peer_attrs, "peer")
# peer_attrs = Map.delete(peer_attrs, "rpc_address")
# peer_attrs =
# case :inet.parse_address(address) do
# {:ok, valid_address_tuple} ->
# Map.put(peer_attrs, "address", valid_address_tuple)

# error ->
# Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}")
# # failed to parse, however, use what was returned in the table, see if
# # node_validation will pass on it
# Map.put(peer_attrs, "address", address)
# end

# queried_peer_to_host(peer_attrs, use_rpc_address)
# end

defp queried_peer_to_host(%{} = peer_attrs, _) do
columns = [
"address",
"data_center",
Expand Down
1 change: 1 addition & 0 deletions lib/xandra/cluster/host.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ defmodule Xandra.Cluster.Host do
String.t()
def format_peername({address, port}) do
if ip_address?(address) do
IO.puts("DEBUG -- format_peername - input #{inspect({address, port})} return #{:inet.ntoa(address)}:#{port}}")
"#{:inet.ntoa(address)}:#{port}"
else
"#{address}:#{port}"
Expand Down
8 changes: 7 additions & 1 deletion lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ defmodule Xandra.Cluster.Pool do
# The name of the cluster (if present), only used for Telemetry events.
:name,

# In the system.peers table use the `rpc_address` column for the
# peer/Host address and not the `peer` column
use_rpc_address_for_peer_address: false,

# A map of peername ({address, port}) to info about that peer.
# Each info map is:
# %{pool_pid: pid(), pool_ref: ref(), host: Host.t(), status: :up | :down | :connected}
Expand Down Expand Up @@ -463,6 +467,7 @@ defmodule Xandra.Cluster.Pool do
defp start_pool(%__MODULE__{} = data, %Host{} = host) do
conn_options =
Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self())
# Keyword.merge(data.pool_options, nodes: [host], cluster_pid: self())

peername = Host.to_peername(host)

Expand Down Expand Up @@ -564,7 +569,8 @@ defmodule Xandra.Cluster.Pool do
contact_node: {host.address, host.port},
connection_options: data.pool_options,
autodiscovered_nodes_port: data.autodiscovered_nodes_port,
refresh_topology_interval: data.refresh_topology_interval
refresh_topology_interval: data.refresh_topology_interval,
use_rpc_address_for_peer_address: data.use_rpc_address_for_peer_address
]

case data.control_conn_mod.start_link(control_conn_opts) do
Expand Down
22 changes: 22 additions & 0 deletions lib/xandra/options_validators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ defmodule Xandra.OptionsValidators do
end
end

def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_tuple(address) do
case :inet.ntoa(address) do
{:error, :einval} ->
{:error,
"expected valid address tuple, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: :einval"}

_valid_address ->
{:ok, {address, port}}
end
end

def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_list(address) do
case :inet.parse_address(address) do
{:ok, valid_address} ->
{:ok, {valid_address, port}}

error ->
{:error,
"expected valid address char list, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: #{inspect(error)}"}
end
end

def validate_node(other) do
{:error, "expected node to be a string or a {ip, port} tuple, got: #{inspect(other)}"}
end
Expand Down
3 changes: 3 additions & 0 deletions lib/xandra/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ defmodule Xandra.Transport do
@spec address_and_port(t()) ::
{:ok, {:inet.ip_address(), :inet.port_number()}} | {:error, error_reason}
def address_and_port(%__MODULE__{socket: socket} = transport) when not is_nil(socket) do
IO.puts("inet_mod(transport.module).peername(socket) transport.module: #{inspect(transport.module)}, socket #{inspect(socket)}")
IO.puts("\tinet_mod(transport.module).peername(socket): #{inspect(inet_mod(transport.module).peername(socket))}")

inet_mod(transport.module).peername(socket)
end

Expand Down
Loading