Skip to content

Commit

Permalink
Re-haul Telemetry documentation
Browse files Browse the repository at this point in the history
Closes #325.
  • Loading branch information
whatyouhide committed Aug 18, 2023
1 parent 23c3012 commit 67e9cf1
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 291 deletions.
3 changes: 2 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[
inputs: [
"{mix,.formatter}.exs",
"{config,lib,test,test_clustering}/**/*.{ex,exs}"
"{config,lib,test,test_clustering}/**/*.{ex,exs}",
"pages/*.exs"
],
import_deps: [:stream_data]
]
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
/priv/plts
erl_crash.dump
*.ez
/pages/Telemetry events.md
94 changes: 3 additions & 91 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -92,97 +92,9 @@ defmodule Xandra.Cluster do
## Telemetry
This section describes all the Telemetry events that `Xandra.Cluster` emits. These events
are available since *v0.15.0*. See also `Xandra.Telemetry`.
* `[:xandra, :cluster, :change_event]` — emitted when there is a change in the
cluster, either as reported by Cassandra itself or as detected by Xandra.
**Measurements**: *none*.
**Metadata**:
* `:event_type` - one of `:host_up` (a host went up), `:host_down` (a host went down),
`:host_added` (a host was added to the cluster topology), or `:host_removed` (a host
was removed from the cluster topology).
* `:source` - one of `:cassandra` or `:xandra`. If the event was reported by
Cassandra itself, the source is `:cassandra`. If the event was detected by
Xandra, the source is `:xandra`.
* `:changed` (`t:boolean/0`) - this is `true` if the node wasn't in the state
reported by the event, and `false` if the node was already in the reported state.
* `:host` (`t:Xandra.Cluster.Host.t/0`) - the host that went up or down.
* `:cluster_pid` (`t:pid/0`) - the PID of the cluster process.
* `:cluster_name` - the name of the cluster executing the event, if provided
through the `:name` option in `start_link/1`. `nil` if no `:name` was provided.
* `[:xandra, :cluster, :control_connection, :connected]` — emitted when the control
connection for the cluster is established.
**Measurements**: *none*.
**Metadata**:
* `:host` (`t:Xandra.Cluster.Host.t/0`) - the host that the control connection is
connected to.
* `:cluster_pid` (`t:pid/0`) - the PID of the cluster process.
* `:cluster_name` - the name of the cluster executing the event, if provided
through the `:name` option in `start_link/1`. `nil` if no `:name` was provided.
* `[:xandra, :cluster, :control_connection, :disconnected]` — emitted when the control
connection for the cluster is established.
**Measurements**: *none*.
**Metadata**:
* `:host` (`t:Xandra.Cluster.Host.t/0`) - the host that the control connection is
connected to.
* `:reason` - the reason for the disconnection. For example, `:closed` if the connected
node closes the connection peacefully.
* `:cluster_pid` (`t:pid/0`) - the PID of the cluster process.
* `:cluster_name` - the name of the cluster executing the event, if provided
through the `:name` option in `start_link/1`. `nil` if no `:name` was provided.
* `[:xandra, :cluster, :control_connection, :failed_to_connect]` — (available since v0.17.0)
emitted when the control connection for the cluster fails to connect to the given node.
**Measurements**: *none*.
**Metadata**:
* `:host` (`t:Xandra.Cluster.Host.t/0`) - the host that the control connection failed
to connect to.
* `:reason` - the reason for the failure.
* `:cluster_pid` (`t:pid/0`) - the PID of the cluster process.
* `:cluster_name` - the name of the cluster executing the event, if provided
through the `:name` option in `start_link/1`. `nil` if no `:name` was provided.
* `[:xandra, :cluster, :pool, :started | :restarted]` — (available since v0.17.0) emitted
when a pool of connection to a node is started or restarted.
**Measurements**: *none*.
**Metadata**:
* `:host` (`t:Xandra.Cluster.Host.t/0`) - the host that the pool connected or reconnected
to.
* `:cluster_pid` (`t:pid/0`) - the PID of the cluster process.
* `:cluster_name` - the name of the cluster executing the event, if provided
through the `:name` option in `start_link/1`. `nil` if no `:name` was provided.
* `[:xandra, :cluster, :discovered_peers]` - (available since v0.17.0) executed when
the Xandra cluster's control connection discovers peers. The peers might have been
already discovered in the past, so you'll need to keep track of new peers if you need to.
**Measurements**:
* `:peers` (list of `t:Xandra.Cluster.Host.t/0`) the discovered peers.
**Metadata**:
* `:cluster_pid` (`t:pid/0`) - the PID of the cluster process.
* `:cluster_name` - the name of the cluster executing the event, if provided
through the `:name` option in `start_link/1`. `nil` if no `:name` was provided.
`Xandra.Cluster` emits several Telemetry events to help you log, instrument,
and debug your application. See the [*Telemetry Events*](telemetry-events.html)
page in the guides for a comprehensive list of the events that Xandra emits.
"""

alias Xandra.{Batch, ConnectionError, OptionsValidators, Prepared, RetryStrategy}
Expand Down
1 change: 0 additions & 1 deletion lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ defmodule Xandra.Cluster.ControlConnection do
end

defp refresh_topology(%__MODULE__{} = state, new_peers) do
:telemetry.execute([:xandra, :cluster, :discovered_peers], %{peers: new_peers}, %{})
send(state.cluster_pid, {:discovered_hosts, new_peers})
state
end
Expand Down
110 changes: 52 additions & 58 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,25 @@ defmodule Xandra.Connection do
|> Keyword.merge(@forced_transport_options)
}

state = %__MODULE__{
transport: transport,
prepared_cache: Keyword.fetch!(options, :prepared_cache),
compressor: compressor,
default_consistency: Keyword.fetch!(options, :default_consistency),
atom_keys?: Keyword.fetch!(options, :atom_keys),
current_keyspace: nil,
address: address,
port: port,
connection_name: connection_name,
cluster_pid: cluster_pid,
pool_index: Keyword.fetch!(options, :pool_index)
}

case Transport.connect(transport, address, port, @default_timeout) do
{:ok, transport} ->
{:ok, peername} = Transport.address_and_port(transport)

state = %__MODULE__{
transport: transport,
prepared_cache: Keyword.fetch!(options, :prepared_cache),
compressor: compressor,
default_consistency: Keyword.fetch!(options, :default_consistency),
atom_keys?: Keyword.fetch!(options, :atom_keys),
current_keyspace: nil,
address: address,
port: port,
connection_name: connection_name,
cluster_pid: cluster_pid,
pool_index: Keyword.fetch!(options, :pool_index),
peername: peername
}
state = %__MODULE__{state | transport: transport, peername: peername}

with {:ok, supported_options, protocol_module} <-
Utils.request_options(transport, enforced_protocol),
Expand All @@ -71,13 +72,14 @@ defmodule Xandra.Connection do
compressor,
options
) do
:telemetry.execute([:xandra, :connected], %{}, %{
connection_name: connection_name,
address: address,
port: port,
protocol_module: protocol_module,
supported_options: supported_options
})
:telemetry.execute(
[:xandra, :connected],
%{},
telemetry_meta(state, %{
protocol_module: protocol_module,
supported_options: supported_options
})
)

if cluster_pid do
send(cluster_pid, {:xandra, :connected, peername, self()})
Expand All @@ -93,12 +95,14 @@ defmodule Xandra.Connection do
"""

{:error, {:use_this_protocol_instead, failed_protocol_version, protocol_version}} ->
:telemetry.execute([:xandra, :debug, :downgrading_protocol], %{}, %{
failed_version: failed_protocol_version,
new_version: protocol_version,
address: address,
port: port
})
:telemetry.execute(
[:xandra, :debug, :downgrading_protocol],
%{},
telemetry_meta(state, %{
failed_version: failed_protocol_version,
new_version: protocol_version
})
)

_transport = Transport.close(transport)
options = Keyword.put(options, :protocol_version, protocol_version)
Expand All @@ -119,13 +123,11 @@ defmodule Xandra.Connection do
{:error, _reason} -> address
end

:telemetry.execute([:xandra, :failed_to_connect], %{}, %{
connection: self(),
connection_name: connection_name,
address: address,
port: port,
reason: reason
})
:telemetry.execute(
[:xandra, :failed_to_connect],
%{},
telemetry_meta(state, %{reason: reason})
)

if cluster_pid do
send(cluster_pid, {:xandra, :failed_to_connect, {ipfied_address, port}, self()})
Expand Down Expand Up @@ -193,14 +195,7 @@ defmodule Xandra.Connection do
force? = Keyword.fetch!(options, :force)

telemetry_metadata = Keyword.fetch!(options, :telemetry_metadata)

metadata = %{
query: prepared,
connection_name: state.connection_name,
address: state.address,
port: state.port,
extra_metadata: telemetry_metadata
}
metadata = telemetry_meta(state, %{query: prepared, extra_metadata: telemetry_metadata})

case prepared_cache_lookup(state, prepared, force?) do
{:ok, prepared} ->
Expand Down Expand Up @@ -293,14 +288,7 @@ defmodule Xandra.Connection do
assert_valid_compressor(state.compressor, options[:compressor])

telemetry_metadata = Keyword.fetch!(options, :telemetry_metadata)

metadata = %{
query: query,
connection_name: state.connection_name,
address: state.address,
port: state.port,
extra_metadata: telemetry_metadata
}
metadata = telemetry_meta(state, %{query: query, extra_metadata: telemetry_metadata})

:telemetry.span(
[:xandra, :execute_query],
Expand Down Expand Up @@ -354,7 +342,7 @@ defmodule Xandra.Connection do
if warnings != [] do
metadata =
state
|> Map.take([:address, :port, :current_keyspace])
|> telemetry_meta(Map.take(state, [:current_keyspace]))
|> Map.put(:query, query)

:telemetry.execute([:xandra, :server_warnings], %{warnings: warnings}, metadata)
Expand All @@ -368,13 +356,7 @@ defmodule Xandra.Connection do

@impl true
def disconnect(exception, %__MODULE__{} = state) do
:telemetry.execute([:xandra, :disconnected], %{}, %{
connection: self(),
connection_name: state.connection_name,
address: state.address,
port: state.port,
reason: exception
})
:telemetry.execute([:xandra, :disconnected], %{}, telemetry_meta(state, %{reason: exception}))

if state.cluster_pid do
send(state.cluster_pid, {:xandra, :disconnected, state.peername, self()})
Expand Down Expand Up @@ -502,4 +484,16 @@ defmodule Xandra.Connection do
"module (which uses the #{inspect(initial_algorithm)} algorithm)"
end
end

defp telemetry_meta(%__MODULE__{} = state, extra_meta) do
Map.merge(
%{
connection: self(),
connection_name: state.connection_name,
address: state.address,
port: state.port
},
extra_meta
)
end
end
Loading

0 comments on commit 67e9cf1

Please sign in to comment.