diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index f1ea02f9..eda157d1 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -6,8 +6,6 @@ defmodule Xandra.Connection do alias Xandra.{Batch, ConnectionError, Prepared, Frame, Simple, SetKeyspace} alias __MODULE__.Utils - require Logger - @default_timeout 5_000 @forced_transport_options [packet: :raw, mode: :binary, active: false] @@ -42,9 +40,6 @@ defmodule Xandra.Connection do |> Keyword.get(:transport_options, []) |> Keyword.merge(@forced_transport_options) - # Set the logger metadata for the whole process. - :ok = Logger.metadata(xandra_address: inspect(address), xandra_port: port) - case transport.connect(address, port, transport_options, @default_timeout) do {:ok, socket} -> {:ok, peername} = inet_mod(transport).peername(socket) @@ -69,9 +64,6 @@ defmodule Xandra.Connection do with {:ok, supported_options, protocol_module} <- Utils.request_options(transport, socket, enforced_protocol), state = %__MODULE__{state | protocol_module: protocol_module}, - Logger.metadata(xandra_protocol_module: state.protocol_module), - Logger.debug("Connected successfully, using protocol #{inspect(protocol_module)}"), - Logger.debug("Supported options: #{inspect(supported_options)}"), :ok <- startup_connection( transport, @@ -98,9 +90,12 @@ defmodule Xandra.Connection do """ {:error, {:use_this_protocol_instead, failed_protocol_version, protocol_version}} -> - Logger.debug( - "Could not use protocol #{inspect(failed_protocol_version)}, downgrading to #{inspect(protocol_version)}" - ) + :telemetry.execute([:xandra, :debug, :downgrading_protocol], %{}, %{ + failed_version: failed_protocol_version, + new_version: protocol_version, + address: address, + port: port + }) :ok = transport.close(socket) options = Keyword.put(options, :protocol_version, protocol_version) diff --git a/lib/xandra/connection/utils.ex b/lib/xandra/connection/utils.ex index 79093a9f..042a8b61 100644 --- a/lib/xandra/connection/utils.ex +++ b/lib/xandra/connection/utils.ex @@ -3,8 +3,6 @@ defmodule Xandra.Connection.Utils do alias Xandra.{ConnectionError, Error, Frame} - require Logger - @typep transport :: :gen_tcp | :ssl @typep socket :: :gen_tcp.socket() | :ssl.sslsocket() @@ -123,25 +121,22 @@ defmodule Xandra.Connection.Utils do |> protocol_module.encode_request(requested_options) |> Frame.encode_v4(protocol_module) - Logger.debug( - "Sending STARTUP frame with protocol #{inspect(protocol_module)} and " <> - "requested options: #{inspect(requested_options)}" - ) - # However, we need to pass the compressor module around when we # receive the response to this frame because if we said we want to use # compression, this response is already compressed. with :ok <- transport.send(socket, payload), + tmtry_meas = %{protocol_module: protocol_module, requested_options: requested_options}, + :telemetry.execute([:xandra, :debug, :sent_frame], tmtry_meas, %{frame_type: :STARTUP}), {:ok, frame, rest} <- recv_frame(transport, socket, :v4_or_less, compressor) do "" = rest # TODO: handle :error frames for things like :protocol_violation. case frame do %Frame{kind: :ready, body: <<>>} -> - Logger.debug("Received READY frame") + :telemetry.execute([:xandra, :debug, :received_frame], %{}, %{frame_type: :READY}) :ok %Frame{kind: :authenticate} -> - Logger.debug("Received AUTHENTICATE frame, authenticating connection") + :telemetry.execute([:xandra, :debug, :received_frame], %{}, %{frame_type: :AUTHENTICATE}) authenticate_connection( transport, diff --git a/lib/xandra/telemetry.ex b/lib/xandra/telemetry.ex index 5d7539b4..3f9025e8 100644 --- a/lib/xandra/telemetry.ex +++ b/lib/xandra/telemetry.ex @@ -12,7 +12,7 @@ defmodule Xandra.Telemetry do Here is a comprehensive list of the Telemetry events that Xandra emits. - ### Connection events + ### Connection Events * `[:xandra, :connected]` — executed when a connection connects to its Cassandra node. * **Measurements**: *none*. @@ -20,6 +20,9 @@ defmodule Xandra.Telemetry do * `:connection_name` - given name of the connection or `nil` if not set * `:address` - the address of the node the connection is connected to * `:port` - the port of the node the connection is connected to + * `:protocol_module` - the protocol module used by the connection + * `:supported_options` - Cassandra supported options as a map (mostly useful for + internal debugging) * `[:xandra, :disconnected]` — executed when a connection disconnects from its Cassandra node. * **Measurements**: *none*. @@ -29,7 +32,7 @@ defmodule Xandra.Telemetry do * `:port` - the port of the node the connection is connected to * `:reason` - the reason for the disconnection (usually a `DBConnection.ConnectionError`) - ### Query events + ### Query Events The `[:xandra, :prepare_query, ...]` and `[:xandra, :execute_query, ...]` events are Telemetry **spans**. See @@ -139,9 +142,36 @@ defmodule Xandra.Telemetry do * `:query` - the query that caused the warning, of type `t:Xandra.Batch.t/0`, `t:Xandra.Prepared.t/0`, or `t:Xandra.Simple.t/0` - ### Cluster events + ### Cluster Events See the "Telemetry" section in the documentation for `Xandra.Cluster`. + + ### Debugging Events + + These events are mostly meant for *debugging* Xandra itself and its internals. + You can use these events to monitor exchanges of *Cassandra Native Protocol* frames + between Xandra and the Cassandra server, for example. + + * `[:xandra, :debug, :received_frame]` + * Measurements: none + * Metadata: + * `:frame_type` - the type of the frame, for example `:READY` or `:AUTHENTICATE` + + * `[:xandra, :debug, :sent_frame]` + * Measurements: + * `:requested_options` - only for `STARTUP` frames + * `:protocol_module` - only for `STARTUP` frames + * Metadata: + * `:frame_type` - the type of the frame, for example `:STARTUP` + + * `[:xandra, :debug, :downgrading_protocol]` + * Measurements: none + * Metadata: + * `:failed_version` - the protocol version that failed + * `:new_version` - the protocol that we're downgrading to + * `:address` - the address of the node the connection is connecting to + * `:port` - the port of the node the connection is connecting to + """ @moduledoc since: "0.15.0" @@ -206,6 +236,22 @@ defmodule Xandra.Telemetry do :ok end + @spec attach_debug_handler() :: :ok + def attach_debug_handler do + events = [ + [:xandra, :debug, :received_frame], + [:xandra, :debug, :sent_frame], + [:xandra, :connected] + ] + + :telemetry.attach_many( + "xandra-debug-telemetry-handler", + events, + &__MODULE__.handle_debug_event/4, + :no_config + ) + end + @doc false @spec handle_event( :telemetry.event_name(), @@ -258,4 +304,44 @@ defmodule Xandra.Telemetry do Logger.debug("Executed query in #{duration}ms: #{inspect(metadata.query)}", logger_meta) end end + + @doc false + def handle_debug_event(event, measurements, metadata, config) + + def handle_debug_event([:xandra, :debug, :received_frame], _measurements, metadata, :no_config) do + Logger.debug("Received frame #{metadata.frame_type}", []) + end + + def handle_debug_event([:xandra, :debug, :sent_frame], measurements, metadata, :no_config) do + message = + if metadata.frame_type == :STARTUP do + "Sent frame STARTUP with protocol #{inspect(measurements.protocol_module)} " <> + "and requested options: #{inspect(measurements.requested_options)}" + else + "Sent frame #{metadata.frame_type}" + end + + Logger.debug(message) + end + + def handle_debug_event( + [:xandra, :debug, :downgrading_protocol], + _measurements, + metadata, + :no_config + ) do + Logger.debug( + "Could not use protocol #{inspect(metadata.failed_version)}, " <> + "downgrading to #{inspect(metadata.new_version)}", + xandra_address: metadata.address, + xandra_port: metadata.port + ) + end + + def handle_debug_event([:xandra, :connected], _measurements, metadata, :no_config) do + logger_meta = [xandra_address: metadata.address, xandra_port: metadata.port] + + Logger.debug("Connected using protocol #{inspect(metadata.protocol_module)}", logger_meta) + Logger.debug("Supported options: #{inspect(metadata.supported_options)}", logger_meta) + end end