diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 74db442c..fb612d75 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -8,11 +8,11 @@ different ports to test different features (such as authenticationn). To run normal tests, do this from the root of the project: ```bash -docker-compose up --daemon +docker-compose up --detach mix test ``` -The `--daemon` flag runs the instances as daemons in the background. Give it a +The `--detach` flag runs the instances as daemons in the background. Give it a minute between starting the services and running `mix test.all` since Cassandra takes a while to start. You can check whether the Docker containers are ready with `docker-compose ps`. To stop the services, run `docker-compose stop`. diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index 6e2b7441..059bfd1d 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -345,6 +345,16 @@ defmodule Xandra.Cluster do *Available since v0.18.0*. """ ], + use_rpc_address_for_peer_address: [ + type: :boolean, + doc: """ + In the system.peers table use the `rpc_address` column for the + peer/Host address and not the `peer` column + """, + default: false, + required: false + ], + # Internal for testing, not exposed. xandra_module: [type: :atom, default: Xandra, doc: false], diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index 7b16c24c..07d0fdaa 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -29,7 +29,8 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: [type: {:tuple, [{:or, [:string, :any]}, :non_neg_integer]}, required: true], connection_options: [type: :keyword_list, required: true], autodiscovered_nodes_port: [type: :non_neg_integer, required: true], - refresh_topology_interval: [type: :timeout, required: true] + refresh_topology_interval: [type: :timeout, required: true], + use_rpc_address_for_peer_address: [type: :boolean, default: false, required: false] ] defstruct [ @@ -52,6 +53,10 @@ defmodule Xandra.Cluster.ControlConnection do # The interval at which to refresh the cluster topology. :refresh_topology_interval, + # In the system.peers table use the `rpc_address` column for the + # peer/Host address and not the `peer` column + :use_rpc_address_for_peer_address, + # The protocol module of the node we're connected to. :protocol_module, @@ -83,6 +88,7 @@ defmodule Xandra.Cluster.ControlConnection do contact_node: contact_node, autodiscovered_nodes_port: Keyword.fetch!(options, :autodiscovered_nodes_port), refresh_topology_interval: Keyword.fetch!(options, :refresh_topology_interval), + use_rpc_address_for_peer_address: Keyword.fetch!(options, :use_rpc_address_for_peer_address), connection_options: connection_opts, transport: transport } @@ -144,7 +150,8 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port + state.port, + state.use_rpc_address_for_peer_address ), :ok <- Transport.setopts(state.transport, active: :once) do state = refresh_topology(state, peers) @@ -213,7 +220,8 @@ defmodule Xandra.Cluster.ControlConnection do state.autodiscovered_nodes_port, state.protocol_module, state.ip, - state.port + state.port, + state.use_rpc_address_for_peer_address ), :ok <- register_to_events(state), :ok <- Transport.setopts(state.transport, active: :once) do @@ -351,7 +359,8 @@ defmodule Xandra.Cluster.ControlConnection do :inet.port_number(), module(), :inet.ip_address(), - :inet.port_number() + :inet.port_number(), + boolean() ) :: {:ok, [Host.t()]} | {:error, :closed | :inet.posix()} def fetch_cluster_topology( @@ -359,13 +368,14 @@ defmodule Xandra.Cluster.ControlConnection do autodiscovered_nodes_port, protocol_module, ip, - port + port, + use_rpc_address ) when is_integer(autodiscovered_nodes_port) and is_atom(protocol_module) do with {:ok, [local_node_info]} <- query(transport, protocol_module, @select_local_query), {:ok, peers} <- query(transport, protocol_module, @select_peers_query) do local_peer = %Host{ - queried_peer_to_host(local_node_info) + queried_peer_to_host(local_node_info, use_rpc_address) | address: ip, port: port } @@ -376,7 +386,7 @@ defmodule Xandra.Cluster.ControlConnection do # https://user.cassandra.apache.narkive.com/APRtj5hb/system-peers-and-decommissioned-nodes. peers = for peer_attrs <- peers, - peer = %Host{queried_peer_to_host(peer_attrs) | port: autodiscovered_nodes_port}, + peer = %Host{queried_peer_to_host(peer_attrs, use_rpc_address) | port: autodiscovered_nodes_port}, not is_nil(peer.host_id), do: peer @@ -412,13 +422,81 @@ defmodule Xandra.Cluster.ControlConnection do end end - defp queried_peer_to_host(%{"peer" => _} = peer_attrs) do - {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, use_rpc_address) when is_tuple(rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") peer_attrs = Map.put(peer_attrs, "address", address) - queried_peer_to_host(peer_attrs) + queried_peer_to_host(peer_attrs, use_rpc_address) end - defp queried_peer_to_host(%{} = peer_attrs) do + defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, use_rpc_address) do + {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + peer_attrs = Map.delete(peer_attrs, "peer") + peer_attrs = + case :inet.parse_address(address) do + {:ok, valid_address_tuple} -> + Map.put(peer_attrs, "address", valid_address_tuple) + + error -> + Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # failed to parse, however, use what was returned in the table, see if + # node_validation will pass on it + Map.put(peer_attrs, "address", address) + end + queried_peer_to_host(peer_attrs, use_rpc_address) + end + + + # defp queried_peer_to_host(%{"rpc_address" => rpc_address} = peer_attrs, true = use_rpc_address) when is_tuple(rpc_address) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + # peer_attrs = Map.delete(peer_attrs, "peer") + # peer_attrs = Map.put(peer_attrs, "address", address) + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end + + # defp queried_peer_to_host(%{"rpc_address" => _} = peer_attrs, true = use_rpc_address) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "rpc_address") + # peer_attrs = Map.delete(peer_attrs, "peer") + # peer_attrs = + # case :inet.parse_address(address) do + # {:ok, valid_address_tuple} -> + # Map.put(peer_attrs, "address", valid_address_tuple) + + # error -> + # Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # # failed to parse, however, use what was returned in the table, see if + # # node_validation will pass on it + # Map.put(peer_attrs, "address", address) + # end + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end + + # defp queried_peer_to_host(%{"peer" => peer} = peer_attrs, use_rpc_address) when is_tuple(peer) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + # peer_attrs = Map.delete(peer_attrs, "rpc_address") + # peer_attrs = Map.put(peer_attrs, "address", address) + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end + + # defp queried_peer_to_host(%{"peer" => _} = peer_attrs, use_rpc_address) do + # {address, peer_attrs} = Map.pop!(peer_attrs, "peer") + # peer_attrs = Map.delete(peer_attrs, "rpc_address") + # peer_attrs = + # case :inet.parse_address(address) do + # {:ok, valid_address_tuple} -> + # Map.put(peer_attrs, "address", valid_address_tuple) + + # error -> + # Logger.error("queried_peer_to_host: error converting address (#{inspect(address)}) to tuple, error: #{inspect(error)}") + # # failed to parse, however, use what was returned in the table, see if + # # node_validation will pass on it + # Map.put(peer_attrs, "address", address) + # end + + # queried_peer_to_host(peer_attrs, use_rpc_address) + # end + + defp queried_peer_to_host(%{} = peer_attrs, _) do columns = [ "address", "data_center", diff --git a/lib/xandra/cluster/host.ex b/lib/xandra/cluster/host.ex index e94cc015..a2bc1fc5 100644 --- a/lib/xandra/cluster/host.ex +++ b/lib/xandra/cluster/host.ex @@ -67,6 +67,7 @@ defmodule Xandra.Cluster.Host do String.t() def format_peername({address, port}) do if ip_address?(address) do + IO.puts("DEBUG -- format_peername - input #{inspect({address, port})} return #{:inet.ntoa(address)}:#{port}}") "#{:inet.ntoa(address)}:#{port}" else "#{address}:#{port}" diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index aa94ccca..1700aa82 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -107,6 +107,10 @@ defmodule Xandra.Cluster.Pool do # The name of the cluster (if present), only used for Telemetry events. :name, + # In the system.peers table use the `rpc_address` column for the + # peer/Host address and not the `peer` column + use_rpc_address_for_peer_address: false, + # A map of peername ({address, port}) to info about that peer. # Each info map is: # %{pool_pid: pid(), pool_ref: ref(), host: Host.t(), status: :up | :down | :connected} @@ -463,6 +467,7 @@ defmodule Xandra.Cluster.Pool do defp start_pool(%__MODULE__{} = data, %Host{} = host) do conn_options = Keyword.merge(data.pool_options, nodes: [Host.format_address(host)], cluster_pid: self()) + # Keyword.merge(data.pool_options, nodes: [host], cluster_pid: self()) peername = Host.to_peername(host) @@ -564,7 +569,8 @@ defmodule Xandra.Cluster.Pool do contact_node: {host.address, host.port}, connection_options: data.pool_options, autodiscovered_nodes_port: data.autodiscovered_nodes_port, - refresh_topology_interval: data.refresh_topology_interval + refresh_topology_interval: data.refresh_topology_interval, + use_rpc_address_for_peer_address: data.use_rpc_address_for_peer_address ] case data.control_conn_mod.start_link(control_conn_opts) do diff --git a/lib/xandra/options_validators.ex b/lib/xandra/options_validators.ex index 99d33b13..3ea866ab 100644 --- a/lib/xandra/options_validators.ex +++ b/lib/xandra/options_validators.ex @@ -62,6 +62,28 @@ defmodule Xandra.OptionsValidators do end end + def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_tuple(address) do + case :inet.ntoa(address) do + {:error, :einval} -> + {:error, + "expected valid address tuple, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: :einval"} + + _valid_address -> + {:ok, {address, port}} + end + end + + def validate_node(%Xandra.Cluster.Host{address: address, port: port}) when is_list(address) do + case :inet.parse_address(address) do + {:ok, valid_address} -> + {:ok, {valid_address, port}} + + error -> + {:error, + "expected valid address char list, got: address: #{inspect(address)} and port: #{inspect(port)}, with error: #{inspect(error)}"} + end + end + def validate_node(other) do {:error, "expected node to be a string or a {ip, port} tuple, got: #{inspect(other)}"} end diff --git a/lib/xandra/transport.ex b/lib/xandra/transport.ex index b8a858db..e7d8c5f5 100644 --- a/lib/xandra/transport.ex +++ b/lib/xandra/transport.ex @@ -43,6 +43,9 @@ defmodule Xandra.Transport do @spec address_and_port(t()) :: {:ok, {:inet.ip_address(), :inet.port_number()}} | {:error, error_reason} def address_and_port(%__MODULE__{socket: socket} = transport) when not is_nil(socket) do + IO.puts("inet_mod(transport.module).peername(socket) transport.module: #{inspect(transport.module)}, socket #{inspect(socket)}") + IO.puts("\tinet_mod(transport.module).peername(socket): #{inspect(inet_mod(transport.module).peername(socket))}") + inet_mod(transport.module).peername(socket) end