Skip to content

Commit

Permalink
cache if a table is scheduler or not
Browse files Browse the repository at this point in the history
  • Loading branch information
Centril committed Jan 27, 2025
1 parent f388240 commit 91fe2fb
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 48 deletions.
89 changes: 80 additions & 9 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx};
use crate::db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
};
use crate::execution_context::Workload;
use crate::{
db::{
Expand Down Expand Up @@ -573,9 +576,9 @@ impl MutTxDatastore for Locking {
tx: &'a mut Self::MutTx,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)> {
let (gens, row_ref) = tx.insert::<true>(table_id, row)?;
Ok((gens, row_ref.collapse()))
) -> Result<(ColList, RowRef<'a>, InsertFlags)> {
let (gens, row_ref, insert_flags) = tx.insert::<true>(table_id, row)?;
Ok((gens, row_ref.collapse(), insert_flags))
}

fn update_mut_tx<'a>(
Expand All @@ -584,7 +587,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)> {
) -> Result<(ColList, RowRef<'a>, UpdateFlags)> {
tx.update(table_id, index_id, row)
}

Expand Down Expand Up @@ -1002,12 +1005,13 @@ mod tests {
use pretty_assertions::{assert_eq, assert_matches};
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::resolved_type_via_v9;
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt};
use spacetimedb_primitives::{col_list, ColId, ScheduleId};
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType};
use spacetimedb_schema::def::{BTreeAlgorithm, ConstraintData, IndexAlgorithm, UniqueConstraintData};
use spacetimedb_schema::schema::{
ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema,
ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, ScheduleSchema, SequenceSchema,
};
use spacetimedb_table::table::UniqueConstraintViolation;

Expand Down Expand Up @@ -1406,7 +1410,7 @@ mod tests {
row: &ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let row = to_vec(&row).unwrap();
let (gen_cols, row_ref) = datastore.insert_mut_tx(tx, table_id, &row)?;
let (gen_cols, row_ref, _) = datastore.insert_mut_tx(tx, table_id, &row)?;
let gen_cols = row_ref.project(&gen_cols)?;
Ok((gen_cols, row_ref))
}
Expand All @@ -1419,7 +1423,7 @@ mod tests {
row: &ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let row = to_vec(&row).unwrap();
let (gen_cols, row_ref) = datastore.update_mut_tx(tx, table_id, index_id, &row)?;
let (gen_cols, row_ref, _) = datastore.update_mut_tx(tx, table_id, index_id, &row)?;
let gen_cols = row_ref.project(&gen_cols)?;
Ok((gen_cols, row_ref))
}
Expand Down Expand Up @@ -2498,6 +2502,73 @@ mod tests {
Ok(())
}

#[test]
fn test_scheduled_table_insert_and_update() -> ResultTest<()> {
let table_id = TableId::SENTINEL;
// Build the minimal schema that is a valid scheduler table.
let schema = TableSchema::new(
table_id,
"Foo".into(),
vec![
ColumnSchema {
table_id,
col_pos: 0.into(),
col_name: "id".into(),
col_type: AlgebraicType::U64,
},
ColumnSchema {
table_id,
col_pos: 1.into(),
col_name: "at".into(),
col_type: ScheduleAt::get_type(),
},
],
vec![IndexSchema {
table_id,
index_id: IndexId::SENTINEL,
index_name: "id_idx".into(),
index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: 0.into() }),
}],
vec![ConstraintSchema {
table_id,
constraint_id: ConstraintId::SENTINEL,
constraint_name: "id_unique".into(),
data: ConstraintData::Unique(UniqueConstraintData { columns: 0.into() }),
}],
vec![],
StTableType::User,
StAccess::Public,
Some(ScheduleSchema {
table_id,
schedule_id: ScheduleId::SENTINEL,
schedule_name: "schedule".into(),
reducer_name: "reducer".into(),
at_column: 1.into(),
}),
Some(0.into()),
);

// Create the table.
let datastore = get_datastore()?;
let mut tx = begin_mut_tx(&datastore);
let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
let index_id = datastore
.index_id_from_name_mut_tx(&tx, "id_idx")?
.expect("there should be an index with this name");

// Make us a row and insert + identity update.
let row = &product![24u64, value_serialize(&ScheduleAt::Interval(42))];
let row = &to_vec(row).unwrap();
let (_, _, insert_flags) = datastore.insert_mut_tx(&mut tx, table_id, row)?;
let (_, _, update_flags) = datastore.update_mut_tx(&mut tx, table_id, index_id, row)?;

// The whole point of the test.
assert!(insert_flags.is_scheduler_table);
assert!(update_flags.is_scheduler_table);

Ok(())
}

#[test]
fn test_row_level_security() -> ResultTest<()> {
let (_, mut tx, table_id) = setup_table()?;
Expand Down
42 changes: 31 additions & 11 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ use super::{
tx_state::{DeleteTable, IndexIdMap, TxState},
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx;
use crate::db::datastore::locking_tx_datastore::state_view::{
IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx,
};
use crate::db::datastore::system_tables::{
with_sys_table_buf, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields,
StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields,
StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
};
use crate::db::datastore::traits::{RowTypeForTable, TxData};
use crate::db::datastore::{
locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx, traits::InsertFlags,
};
use crate::db::datastore::{
locking_tx_datastore::state_view::{
IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx,
},
traits::UpdateFlags,
};
use crate::execution_context::Workload;
use crate::{
error::{IndexError, SequenceError, TableError},
Expand Down Expand Up @@ -1174,7 +1179,7 @@ impl MutTxId {
&'a mut self,
table_id: TableId,
row: &T,
) -> Result<(ColList, RowRefInsertion<'a>)> {
) -> Result<(ColList, RowRefInsertion<'a>, InsertFlags)> {
thread_local! {
static BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
}
Expand All @@ -1199,11 +1204,12 @@ impl MutTxId {
/// Returns:
/// - a list of columns which have been replaced with generated values.
/// - a ref to the inserted row.
/// - any insert flags.
pub(super) fn insert<const GENERATE: bool>(
&mut self,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRefInsertion<'_>)> {
) -> Result<(ColList, RowRefInsertion<'_>, InsertFlags)> {
// Get the insert table, so we can write the row into it.
let (tx_table, tx_blob_store, ..) = self
.tx_state
Expand All @@ -1213,6 +1219,10 @@ impl MutTxId {
)
.ok_or(TableError::IdNotFoundState(table_id))?;

let insert_flags = InsertFlags {
is_scheduler_table: tx_table.is_scheduler(),
};

// 1. Insert the physical row.
let (tx_row_ref, blob_bytes) = tx_table.insert_physically_bsatn(tx_blob_store, row)?;
// 2. Optionally: Detect, generate, write sequence values.
Expand Down Expand Up @@ -1322,7 +1332,7 @@ impl MutTxId {
// SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`.
unsafe { commit_table.get_row_ref_unchecked(blob_store, commit_ptr) },
);
return Ok((gen_cols, rri));
return Ok((gen_cols, rri, insert_flags));
}

// Pacify the borrow checker.
Expand Down Expand Up @@ -1350,15 +1360,15 @@ impl MutTxId {
// as there haven't been any interleaving `&mut` calls that could invalidate the pointer.
tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr)
});
Ok((gen_cols, rri))
Ok((gen_cols, rri, insert_flags))
}
// `row` previously present in insert tables; do nothing but return `ptr`.
Err(InsertError::Duplicate(DuplicateError(ptr))) => {
let rri = RowRefInsertion::Existed(
// SAFETY: `tx_table` told us that `ptr` refers to a valid row in it.
unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) },
);
Ok((gen_cols, rri))
Ok((gen_cols, rri, insert_flags))
}

// Unwrap these error into `TableError::{IndexError, Bflatn}`:
Expand All @@ -1382,7 +1392,13 @@ impl MutTxId {
/// Returns:
/// - a list of columns which have been replaced with generated values.
/// - a ref to the new row.
pub(crate) fn update(&mut self, table_id: TableId, index_id: IndexId, row: &[u8]) -> Result<(ColList, RowRef<'_>)> {
/// - any update flags.
pub(crate) fn update(
&mut self,
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'_>, UpdateFlags)> {
let tx_removed_index = self.tx_state_removed_index(index_id);

// 1. Insert the physical row into the tx insert table.
Expand Down Expand Up @@ -1425,6 +1441,10 @@ impl MutTxId {
// SAFETY: `tx_table.is_row_present(tx_row_ptr)` holds as we haven't deleted it yet.
let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) };

let update_flags = UpdateFlags {
is_scheduler_table: tx_table.is_scheduler(),
};

// 3. Find the old row and remove it.
//----------------------------------------------------------------------
#[inline]
Expand Down Expand Up @@ -1564,7 +1584,7 @@ impl MutTxId {
// per post-condition of `confirm_insertion` and `confirm_update`
// in the if/else branches respectively.
let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) };
return Ok((cols_to_gen, tx_row_ref));
return Ok((cols_to_gen, tx_row_ref, update_flags));
};

// When we reach here, we had an error and we need to revert the insertion of `tx_row_ref`.
Expand Down
18 changes: 16 additions & 2 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,18 @@ impl Program {
}
}

/// Additional information about an insert operation.
pub struct InsertFlags {
/// Is the table a scheduler table?
pub is_scheduler_table: bool,
}

/// Additional information about an update operation.
pub struct UpdateFlags {
/// Is the table a scheduler table?
pub is_scheduler_table: bool,
}

pub trait TxDatastore: DataRow + Tx {
type IterTx<'a>: Iterator<Item = Self::RowRef<'a>>
where
Expand Down Expand Up @@ -486,6 +498,7 @@ pub trait MutTxDatastore: TxDatastore + MutTx {
///
/// Returns the list of columns with sequence-trigger values that were replaced with generated ones
/// and a reference to the row as a [`RowRef`].
/// Also returns any additional insert flags.
///
/// Generated columns are columns with an auto-inc sequence
/// and where the column was `0` in `row`.
Expand All @@ -494,12 +507,13 @@ pub trait MutTxDatastore: TxDatastore + MutTx {
tx: &'a mut Self::MutTx,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)>;
) -> Result<(ColList, RowRef<'a>, InsertFlags)>;
/// Updates a row to `row`, encoded in BSATN, into the table identified by `table_id`
/// using the index identified by `index_id`.
///
/// Returns the list of columns with sequence-trigger values that were replaced with generated ones
/// and a reference to the row as a [`RowRef`].
/// Also returns any additional update flags.
///
/// Generated columns are columns with an auto-inc sequence
/// and where the column was `0` in `row`.
Expand All @@ -509,7 +523,7 @@ pub trait MutTxDatastore: TxDatastore + MutTx {
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)>;
) -> Result<(ColList, RowRef<'a>, UpdateFlags)>;

/// Obtain the [`Metadata`] for this datastore.
///
Expand Down
9 changes: 5 additions & 4 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::datastore::locking_tx_datastore::state_view::{
};
use super::datastore::system_tables::ST_MODULE_ID;
use super::datastore::traits::{
IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
UpdateFlags,
};
use super::datastore::{
locking_tx_datastore::{
Expand Down Expand Up @@ -1130,7 +1131,7 @@ impl RelationalDB {
tx: &'a mut MutTx,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>), DBError> {
) -> Result<(ColList, RowRef<'a>, InsertFlags), DBError> {
self.inner.insert_mut_tx(tx, table_id, row)
}

Expand All @@ -1140,7 +1141,7 @@ impl RelationalDB {
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>), DBError> {
) -> Result<(ColList, RowRef<'a>, UpdateFlags), DBError> {
self.inner.update_mut_tx(tx, table_id, index_id, row)
}

Expand Down Expand Up @@ -1540,7 +1541,7 @@ pub mod tests_utils {
table_id: TableId,
row: &T,
) -> Result<(AlgebraicValue, RowRef<'a>), DBError> {
let (gen_cols, row_ref) = db.insert(tx, table_id, &to_vec(row).unwrap())?;
let (gen_cols, row_ref, _) = db.insert(tx, table_id, &to_vec(row).unwrap())?;
let gen_cols = row_ref.project(&gen_cols).unwrap();
Ok((gen_cols, row_ref))
}
Expand Down
Loading

0 comments on commit 91fe2fb

Please sign in to comment.