Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: run upkeep manually #12664

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

use std::{sync::Arc, thread::available_parallelism};

use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::OnComponentInitializedHook,
BuilderContext, NodeAdapter,
};
use alloy_primitives::{BlockNumber, B256};
use eyre::{Context, OptionExt};
use rayon::ThreadPoolBuilder;
Expand Down Expand Up @@ -34,6 +39,7 @@ use reth_node_core::{
use reth_node_metrics::{
chain::ChainSpecInfo,
hooks::Hooks,
recorder::install_prometheus_recorder,
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
Expand All @@ -58,12 +64,6 @@ use tokio::sync::{
oneshot, watch,
};

use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::OnComponentInitializedHook,
BuilderContext, NodeAdapter,
};

/// Allows to set a tree viewer for a configured blockchain provider.
// TODO: remove this helper trait once the engine revamp is done, the new
// blockchain provider won't require a TreeViewer.
Expand Down Expand Up @@ -509,6 +509,9 @@ where

/// Starts the prometheus endpoint.
pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
// ensure recorder runs upkeep periodically
install_prometheus_recorder().spawn_upkeep();

let listen_addr = self.node_config().metrics;
if let Some(addr) = listen_addr {
info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
Expand Down
69 changes: 61 additions & 8 deletions crates/node/metrics/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,78 @@
use eyre::WrapErr;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
use std::sync::LazyLock;
use std::sync::{atomic::AtomicBool, LazyLock};

/// Installs the Prometheus recorder as the global recorder.
pub fn install_prometheus_recorder() -> &'static PrometheusHandle {
///
/// Note: This must be installed before any metrics are `described`.
///
/// Caution: This only configures the global recorder and does not spawn the exporter.
/// Callers must run [`PrometheusRecorder::spawn_upkeep`] manually.
pub fn install_prometheus_recorder() -> &'static PrometheusRecorder {
&PROMETHEUS_RECORDER_HANDLE
}

/// The default Prometheus recorder handle. We use a global static to ensure that it is only
/// installed once.
static PROMETHEUS_RECORDER_HANDLE: LazyLock<PrometheusHandle> =
static PROMETHEUS_RECORDER_HANDLE: LazyLock<PrometheusRecorder> =
LazyLock::new(|| PrometheusRecorder::install().unwrap());

/// Prometheus recorder installer
/// A handle to the Prometheus recorder.
///
/// This is intended to be used as the global recorder.
/// Callers must ensure that [`PrometheusRecorder::spawn_upkeep`] is called once.
#[derive(Debug)]
pub struct PrometheusRecorder;
pub struct PrometheusRecorder {
handle: PrometheusHandle,
upkeep: AtomicBool,
}

impl PrometheusRecorder {
const fn new(handle: PrometheusHandle) -> Self {
Self { handle, upkeep: AtomicBool::new(false) }
}

/// Returns a reference to the [`PrometheusHandle`].
pub const fn handle(&self) -> &PrometheusHandle {
&self.handle
}

/// Spawns the upkeep task if there hasn't been one spawned already.
///
/// ## Panics
///
/// This method must be called from within an existing Tokio runtime or it will panic.
///
/// See also [`PrometheusHandle::run_upkeep`]
pub fn spawn_upkeep(&self) {
if self
.upkeep
.compare_exchange(
false,
true,
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::Acquire,
)
.is_err()
{
return;
}

let handle = self.handle.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
handle.run_upkeep();
}
});
}

/// Installs Prometheus as the metrics recorder.
pub fn install() -> eyre::Result<PrometheusHandle> {
///
/// Caution: This only configures the global recorder and does not spawn the exporter.
/// Callers must run [`Self::spawn_upkeep`] manually.
pub fn install() -> eyre::Result<Self> {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();

Expand All @@ -31,7 +84,7 @@ impl PrometheusRecorder {
.install()
.wrap_err("Couldn't set metrics recorder.")?;

Ok(handle)
Ok(Self::new(handle))
}
}

Expand All @@ -52,7 +105,7 @@ mod tests {
process.describe();
process.collect();

let metrics = PROMETHEUS_RECORDER_HANDLE.render();
let metrics = PROMETHEUS_RECORDER_HANDLE.handle.render();
assert!(metrics.contains("process_cpu_seconds_total"), "{metrics:?}");
}
}
2 changes: 1 addition & 1 deletion crates/node/metrics/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl MetricServer {
let hook = hook.clone();
let service = tower::service_fn(move |_| {
(hook)();
let metrics = handle.render();
let metrics = handle.handle().render();
let mut response = Response::new(metrics);
response
.headers_mut()
Expand Down
Loading