Skip to content

Commit

Permalink
agent: add diagnostics for db connections
Browse files Browse the repository at this point in the history
- Adds periodic logging of connection pool size
- Sets the application name to the k8s pod name, to better correlate
  with pg_stat_activity
- Sets a connection acquisition timeout
- Configures logging of slow statements
- Pings the connection when returning it to the pool, so that we can
  detect and remove bad connections
  • Loading branch information
psFried committed Oct 18, 2024
1 parent fcc99e8 commit bce14ba
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ librocksdb-sys = { version = "0.16.0", default-features = false, features = [
"snappy",
"rtti",
] }
log = "0.4" # only used to configure logging of dependencies
lz4 = "1.24.0"
lz4_flex = "0.11.0"
mime = "0.3"
Expand Down
1 change: 1 addition & 0 deletions crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ humantime-serde = { workspace = true }
itertools = { workspace = true }
jsonwebtoken = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
Expand Down
40 changes: 38 additions & 2 deletions crates/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use clap::Parser;
use derivative::Derivative;
use futures::FutureExt;
use rand::Rng;
use sqlx::{ConnectOptions, Connection};

/// Agent is a daemon which runs server-side tasks of the Flow control-plane.
#[derive(Derivative, Parser)]
Expand Down Expand Up @@ -96,12 +97,15 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
.into_string()
.expect("os path must be utf8");

// The HOSTNAME variable will be set to the name of the pod in k8s
let application_name = std::env::var("HOSTNAME").unwrap_or_else(|_| "agent".to_string());
let mut pg_options = args
.database_url
.as_str()
.parse::<sqlx::postgres::PgConnectOptions>()
.context("parsing database URL")?
.application_name("agent");
.application_name(&application_name);
pg_options.log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_secs(10));

// If a database CA was provided, require that we use TLS with full cert verification.
if let Some(ca) = &args.database_ca {
Expand All @@ -113,10 +117,42 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
pg_options = pg_options.ssl_mode(sqlx::postgres::PgSslMode::Prefer);
}

let pg_pool = sqlx::postgres::PgPool::connect_with(pg_options)
let pg_pool = sqlx::postgres::PgPoolOptions::new()
.acquire_timeout(std::time::Duration::from_secs(5))
.after_release(|conn, meta| {
let fut = async move {
let r =tokio::time::timeout(std::time::Duration::from_secs(5), async {
conn.ping()
});
if let Err(err) = r.await {
tracing::warn!(error = ?err, conn_meta = ?meta, "connection was put back in a bad state, removing from the pool");
Ok(false)
} else {
Ok(true) // connection is good
}
};
fut.boxed()
})
.connect_with(pg_options)
.await
.context("connecting to database")?;

// Periodically log information about the connection pool to aid in debugging.
let pool_copy = pg_pool.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(120));
loop {
interval.tick().await;
let total_connections = pool_copy.size();
let idle_connections = pool_copy.num_idle();
tracing::info!(
total_connections,
idle_connections,
"db connection pool stats"
);
}
});

let system_user_id = agent_sql::get_user_id_for_email(&args.accounts_email, &pg_pool)
.await
.context("querying for agent user id")?;
Expand Down

0 comments on commit bce14ba

Please sign in to comment.