Skip to content

Commit

Permalink
Track query and datastore cpu usage metrics (#2140)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime authored Jan 29, 2025
1 parent 696fa45 commit 4b4484a
Show file tree
Hide file tree
Showing 23 changed files with 1,117 additions and 170 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ impl<T, L: Deref<Target = [T]>> RowListLen for L {
}
}

/// Implemented by list encodings that can report their payload size in bytes.
pub trait ByteListLen {
/// Returns the uncompressed size of the list in bytes.
fn num_bytes(&self) -> usize;
}

impl ByteListLen for Vec<ByteString> {
    /// Adds up the byte length of every string held in the list.
    fn num_bytes(&self) -> usize {
        let mut total = 0;
        for entry in self {
            total += entry.len();
        }
        total
    }
}

/// A format / codec used by the websocket API.
///
/// This can be e.g., BSATN, JSON.
Expand All @@ -67,7 +78,14 @@ pub trait WebsocketFormat: Sized {
type Single: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone;

/// The type used for the encoding of a list of items.
type List: SpacetimeType + for<'de> Deserialize<'de> + Serialize + RowListLen + Debug + Clone + Default;
type List: SpacetimeType
+ for<'de> Deserialize<'de>
+ Serialize
+ RowListLen
+ ByteListLen
+ Debug
+ Clone
+ Default;

/// Encodes the `elems` to a list in the format and also returns the length of the list.
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64);
Expand Down Expand Up @@ -899,6 +917,13 @@ impl<B: AsRef<[u8]>, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList<B, I> {
}
}

impl<B, I> ByteListLen for BsatnRowList<B, I>
where
    B: AsRef<[u8]>,
{
    /// Reports the uncompressed size, in bytes, of the row data held by this list.
    fn num_bytes(&self) -> usize {
        let row_bytes: &[u8] = self.rows_data.as_ref();
        row_bytes.len()
    }
}

impl BsatnRowList {
/// Returns the element at `index` in the list.
pub fn get(&self, index: usize) -> Option<Bytes> {
Expand Down
21 changes: 15 additions & 6 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
};
use crate::execution_context::Workload;
use crate::{
db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
},
subscription::record_exec_metrics,
};
use crate::{
db::{
datastore::{
Expand All @@ -34,7 +37,7 @@ use core::{cell::RefCell, ops::RangeBounds};
use parking_lot::{Mutex, RwLock};
use spacetimedb_commitlog::payload::{txdata, Txdata};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics};
use spacetimedb_lib::{Address, Identity};
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
Expand Down Expand Up @@ -629,13 +632,14 @@ impl MutTxDatastore for Locking {
}

/// This utility is responsible for recording all transaction metrics.
pub(super) fn record_metrics(
pub(super) fn record_tx_metrics(
ctx: &ExecutionContext,
tx_timer: Instant,
lock_wait_time: Duration,
committed: bool,
tx_data: Option<&TxData>,
committed_state: Option<&CommittedState>,
metrics: Option<ExecutionMetrics>,
) {
let workload = &ctx.workload();
let db = &ctx.database_identity();
Expand All @@ -662,6 +666,10 @@ pub(super) fn record_metrics(
.with_label_values(workload, db, reducer)
.observe(elapsed_time);

if let Some(metrics) = metrics {
record_exec_metrics(workload, db, metrics);
}

/// Updates the table rows and table size gauges,
/// setting them to zero if no table is present.
fn update_table_gauges(db: &Identity, table_id: &TableId, table_name: &str, table: Option<&Table>) {
Expand Down Expand Up @@ -719,6 +727,7 @@ impl MutTx for Locking {
lock_wait_time,
timer,
ctx,
metrics: ExecutionMetrics::default(),
}
}

Expand Down
31 changes: 25 additions & 6 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
committed_state::CommittedState,
datastore::{record_metrics, Result},
datastore::{record_tx_metrics, Result},
delete_table::DeleteTable,
sequence::{Sequence, SequencesState},
state_view::{IndexSeekIterIdMutTx, ScanIterByColRangeMutTx, StateView},
Expand Down Expand Up @@ -33,8 +33,8 @@ use core::cell::RefCell;
use core::ops::RangeBounds;
use core::{iter, ops::Bound};
use smallvec::SmallVec;
use spacetimedb_lib::db::raw_def::v9::RawSql;
use spacetimedb_lib::db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP};
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics};
use spacetimedb_primitives::{ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId};
use spacetimedb_sats::{
bsatn::{self, to_writer, DecodeError, Deserializer},
Expand Down Expand Up @@ -70,6 +70,7 @@ pub struct MutTxId {
pub(super) lock_wait_time: Duration,
pub(crate) timer: Instant,
pub(crate) ctx: ExecutionContext,
pub(crate) metrics: ExecutionMetrics,
}

impl MutTxId {
Expand Down Expand Up @@ -1052,13 +1053,14 @@ impl MutTxId {
let tx_data = committed_state_write_lock.merge(tx_state, &self.ctx);
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
record_metrics(
record_tx_metrics(
&self.ctx,
self.timer,
self.lock_wait_time,
true,
Some(&tx_data),
Some(&committed_state_write_lock),
Some(self.metrics),
);
tx_data
}
Expand All @@ -1072,13 +1074,14 @@ impl MutTxId {
let tx_data = committed_state_write_lock.merge(tx_state, &self.ctx);
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
record_metrics(
record_tx_metrics(
&self.ctx,
self.timer,
self.lock_wait_time,
true,
Some(&tx_data),
Some(&committed_state_write_lock),
Some(self.metrics),
);
// Update the workload type of the execution context
self.ctx.workload = workload.into();
Expand All @@ -1094,13 +1097,29 @@ impl MutTxId {
pub fn rollback(self) {
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
record_metrics(&self.ctx, self.timer, self.lock_wait_time, false, None, None);
record_tx_metrics(
&self.ctx,
self.timer,
self.lock_wait_time,
false,
None,
None,
Some(self.metrics),
);
}

pub fn rollback_downgrade(mut self, workload: Workload) -> TxId {
// Record metrics for the transaction at the very end,
// right before we drop and release the lock.
record_metrics(&self.ctx, self.timer, self.lock_wait_time, false, None, None);
record_tx_metrics(
&self.ctx,
self.timer,
self.lock_wait_time,
false,
None,
None,
Some(self.metrics),
);
// Update the workload type of the execution context
self.ctx.workload = workload.into();
TxId {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/db/datastore/locking_tx_datastore/tx.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::datastore::record_metrics;
use super::datastore::record_tx_metrics;
use super::{
committed_state::CommittedState,
datastore::Result,
Expand Down Expand Up @@ -86,7 +86,7 @@ impl StateView for TxId {

impl TxId {
pub(super) fn release(self) {
record_metrics(&self.ctx, self.timer, self.lock_wait_time, true, None, None);
record_tx_metrics(&self.ctx, self.timer, self.lock_wait_time, true, None, None, None);
}

/// The Number of Distinct Values (NDV) for a column or list of columns,
Expand Down
23 changes: 14 additions & 9 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@ metrics_group!(
#[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
pub rdb_num_rows_deleted: IntCounterVec,

#[name = spacetime_num_rows_fetched_total]
#[help = "The cumulative number of rows fetched from a table"]
#[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
pub rdb_num_rows_fetched: IntCounterVec,
#[name = spacetime_num_rows_scanned_total]
#[help = "The cumulative number of rows scanned from the database"]
#[labels(txn_type: WorkloadType, db: Identity)]
pub rdb_num_rows_scanned: IntCounterVec,

#[name = spacetime_num_index_keys_scanned_total]
#[help = "The cumulative number of keys scanned from an index"]
#[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
pub rdb_num_keys_scanned: IntCounterVec,
#[name = spacetime_num_bytes_scanned_total]
#[help = "The cumulative number of bytes scanned from the database"]
#[labels(txn_type: WorkloadType, db: Identity)]
pub rdb_num_bytes_scanned: IntCounterVec,

#[name = spacetime_num_bytes_written_total]
#[help = "The cumulative number of bytes written to the database"]
#[labels(txn_type: WorkloadType, db: Identity)]
pub rdb_num_bytes_written: IntCounterVec,

#[name = spacetime_num_index_seeks_total]
#[help = "The cumulative number of index seeks"]
#[labels(txn_type: WorkloadType, db: Identity, reducer_or_query: str, table_id: u32, table_name: str)]
#[labels(txn_type: WorkloadType, db: Identity)]
pub rdb_num_index_seeks: IntCounterVec,

#[name = spacetime_num_txns_total]
Expand Down
Loading

2 comments on commit 4b4484a

@github-actions
Copy link

@github-actions github-actions bot commented on 4b4484a Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

@github-actions
Copy link

@github-actions github-actions bot commented on 4b4484a Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

Please sign in to comment.