Skip to content

Commit

Permalink
Track compute metrics for sql dml with new engine (#2190)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime authored Jan 31, 2025
1 parent 382db47 commit 6aa75bd
Show file tree
Hide file tree
Showing 25 changed files with 1,098 additions and 549 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 28 additions & 33 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ use http::StatusCode;

use spacetimedb::client::ClientActorIndex;
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
use spacetimedb::execution_context::Workload;
use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
use spacetimedb::identity::{AuthCtx, Identity};
use spacetimedb::json::client_api::StmtResultJson;
use spacetimedb::messages::control_db::{Database, HostType, Node, Replica};
use spacetimedb::sql;
use spacetimedb::sql::execute::translate_col;
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
use spacetimedb_lib::ProductTypeElement;
use spacetimedb_paths::server::ModuleLogsDir;
Expand Down Expand Up @@ -82,37 +80,34 @@ impl Host {
self.replica_id,
move |db| -> axum::response::Result<_, (StatusCode, String)> {
tracing::info!(sql = body);
let results =
sql::execute::run(db, &body, auth, Some(&module_host.info().subscriptions)).map_err(|e| {
log::warn!("{}", e);
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let json = db.with_read_only(Workload::Sql, |tx| {
results
.into_iter()
.map(|result| {
let rows = result.data;
let schema = result
.head
.fields
.iter()
.map(|x| {
let ty = x.algebraic_type.clone();
let name = translate_col(tx, x.field);
ProductTypeElement::new(ty, name)
})
.collect();
StmtResultJson { schema, rows }
})
.collect::<Vec<_>>()
});

Ok(json)

// We need a header for query results
let mut header = vec![];

let rows = sql::execute::run(
// Returns an empty result set for mutations
db,
&body,
auth,
Some(&module_host.info().subscriptions),
&mut header,
)
.map_err(|e| {
log::warn!("{}", e);
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

// Turn the header into a `ProductType`
let schema = header
.into_iter()
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
.collect();

Ok(vec![StmtResultJson { schema, rows }])
},
)
.await
Expand Down
17 changes: 9 additions & 8 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,13 @@ impl Tx for Locking {
let committed_state_shared_lock = self.committed_state.read_arc();
let lock_wait_time = timer.elapsed();
let ctx = ExecutionContext::with_workload(self.database_identity, workload);
let metrics = ExecutionMetrics::default();
Self::Tx {
committed_state_shared_lock,
lock_wait_time,
timer,
ctx,
metrics,
}
}

Expand Down Expand Up @@ -639,7 +641,7 @@ pub(super) fn record_tx_metrics(
committed: bool,
tx_data: Option<&TxData>,
committed_state: Option<&CommittedState>,
metrics: Option<ExecutionMetrics>,
metrics: ExecutionMetrics,
) {
let workload = &ctx.workload();
let db = &ctx.database_identity();
Expand All @@ -666,9 +668,7 @@ pub(super) fn record_tx_metrics(
.with_label_values(workload, db, reducer)
.observe(elapsed_time);

if let Some(metrics) = metrics {
record_exec_metrics(workload, db, metrics);
}
record_exec_metrics(workload, db, metrics);

/// Update table rows and table size gauges,
/// and sets them to zero if no table is present.
Expand Down Expand Up @@ -1004,10 +1004,10 @@ mod tests {
use crate::db::datastore::system_tables::{
system_tables, StColumnRow, StConstraintData, StConstraintFields, StConstraintRow, StIndexAlgorithm,
StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields, StSequenceFields, StSequenceRow,
StTableRow, StVarFields, StVarValue, ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONSTRAINT_ID,
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
StTableRow, StVarFields, ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME,
ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID,
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
};
use crate::db::datastore::traits::{IsolationLevel, MutTx};
use crate::db::datastore::Result;
Expand All @@ -1018,6 +1018,7 @@ mod tests {
use pretty_assertions::{assert_eq, assert_matches};
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt};
use spacetimedb_primitives::{col_list, ColId, ScheduleId};
use spacetimedb_sats::algebraic_value::ser::value_serialize;
Expand Down
58 changes: 53 additions & 5 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ use core::cell::RefCell;
use core::ops::RangeBounds;
use core::{iter, ops::Bound};
use smallvec::SmallVec;
use spacetimedb_lib::db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP};
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore};
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics};
use spacetimedb_lib::{
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
query::Delta,
};
use spacetimedb_primitives::{ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId};
use spacetimedb_sats::{
bsatn::{self, to_writer, DecodeError, Deserializer},
Expand Down Expand Up @@ -73,6 +77,48 @@ pub struct MutTxId {
pub(crate) metrics: ExecutionMetrics,
}

impl Datastore for MutTxId {
fn blob_store(&self) -> &dyn BlobStore {
&self.committed_state_write_lock.blob_store
}

fn table(&self, table_id: TableId) -> Option<&Table> {
self.committed_state_write_lock.get_table(table_id)
}
}

/// Note, deltas are evaluated using read-only transactions, not mutable ones.
/// Nevertheless this contract is still required for query evaluation.
impl DeltaStore for MutTxId {
fn has_inserts(&self, _: TableId) -> Option<Delta> {
None
}

fn has_deletes(&self, _: TableId) -> Option<Delta> {
None
}

fn inserts_for_table(&self, _: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
None
}

fn deletes_for_table(&self, _: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
None
}
}

impl MutDatastore for MutTxId {
fn insert_product_value(&mut self, table_id: TableId, row: &ProductValue) -> anyhow::Result<()> {
self.insert_via_serialize_bsatn(table_id, row)?;
Ok(())
}

fn delete_product_value(&mut self, table_id: TableId, row: &ProductValue) -> anyhow::Result<()> {
self.delete_by_row_value(table_id, row)?;
Ok(())
}
}

impl MutTxId {
fn drop_col_eq(&mut self, table_id: TableId, col_pos: ColId, value: &AlgebraicValue) -> Result<()> {
let rows = self.iter_by_col_eq(table_id, col_pos, value)?;
Expand Down Expand Up @@ -1065,7 +1111,7 @@ impl MutTxId {
true,
Some(&tx_data),
Some(&committed_state_write_lock),
Some(self.metrics),
self.metrics,
);
tx_data
}
Expand All @@ -1086,7 +1132,7 @@ impl MutTxId {
true,
Some(&tx_data),
Some(&committed_state_write_lock),
Some(self.metrics),
self.metrics,
);
// Update the workload type of the execution context
self.ctx.workload = workload.into();
Expand All @@ -1095,6 +1141,7 @@ impl MutTxId {
lock_wait_time: Duration::ZERO,
timer: Instant::now(),
ctx: self.ctx,
metrics: ExecutionMetrics::default(),
};
(tx_data, tx)
}
Expand All @@ -1109,7 +1156,7 @@ impl MutTxId {
false,
None,
None,
Some(self.metrics),
self.metrics,
);
}

Expand All @@ -1123,7 +1170,7 @@ impl MutTxId {
false,
None,
None,
Some(self.metrics),
self.metrics,
);
// Update the workload type of the execution context
self.ctx.workload = workload.into();
Expand All @@ -1132,6 +1179,7 @@ impl MutTxId {
lock_wait_time: Duration::ZERO,
timer: Instant::now(),
ctx: self.ctx,
metrics: ExecutionMetrics::default(),
}
}
}
Expand Down
12 changes: 11 additions & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::{
use crate::db::datastore::locking_tx_datastore::state_view::IterTx;
use crate::execution_context::ExecutionContext;
use spacetimedb_execution::Datastore;
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_primitives::{ColList, TableId};
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_schema::schema::TableSchema;
Expand All @@ -25,6 +26,7 @@ pub struct TxId {
pub(super) lock_wait_time: Duration,
pub(super) timer: Instant,
pub(crate) ctx: ExecutionContext,
pub(crate) metrics: ExecutionMetrics,
}

impl Datastore for TxId {
Expand Down Expand Up @@ -86,7 +88,15 @@ impl StateView for TxId {

impl TxId {
pub(super) fn release(self) {
record_tx_metrics(&self.ctx, self.timer, self.lock_wait_time, true, None, None, None);
record_tx_metrics(
&self.ctx,
self.timer,
self.lock_wait_time,
true,
None,
None,
self.metrics,
);
}

/// The Number of Distinct Values (NDV) for a column or list of columns,
Expand Down
Loading

2 comments on commit 6aa75bd

@github-actions
Copy link

@github-actions github-actions bot commented on 6aa75bd Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

@github-actions
Copy link

@github-actions github-actions bot commented on 6aa75bd Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

Please sign in to comment.