Skip to content

Commit

Permalink
Add the update ABI (#2137)
Browse files Browse the repository at this point in the history
Co-authored-by: Zeke Foppa <[email protected]>
  • Loading branch information
Centril and bfops authored Jan 27, 2025
1 parent 729dbb1 commit afdc0d6
Show file tree
Hide file tree
Showing 12 changed files with 916 additions and 225 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ paste = "1.0"
petgraph = { version = "0.6.5", default-features = false }
pin-project-lite = "0.2.9"
postgres-types = "0.2.5"
pretty_assertions = "1.4"
pretty_assertions = { version = "1.4", features = ["unstable"] }
proc-macro2 = "1.0"
prometheus = "0.13.0"
proptest = "1.4"
Expand Down
3 changes: 2 additions & 1 deletion crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,11 +813,12 @@ pub fn datastore_insert_bsatn(table_id: TableId, row: &mut [u8]) -> Result<&[u8]
///
/// Returns an error if
/// - a table with the provided `table_id` doesn't exist
/// - an index with the provided `index_id` doesn't exist
/// - an index with the provided `index_id` doesn't exist or is not unique.
/// - there were unique constraint violations
/// - `row` doesn't decode from BSATN to a `ProductValue`
/// according to the `ProductType` that the table's schema specifies
/// or if `row` cannot project to the index's type.
/// - the row was not found
#[inline]
pub fn datastore_update_bsatn(table_id: TableId, index_id: IndexId, row: &mut [u8]) -> Result<&[u8], Errno> {
let row_ptr = row.as_mut_ptr();
Expand Down
36 changes: 23 additions & 13 deletions crates/bindings/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,21 +347,11 @@ impl<Tbl: Table, Col: Index + Column<Table = Tbl>> UniqueColumn<Tbl, Col::ColTyp
/// or if either the delete or the insertion would violate a constraint.
#[track_caller]
pub fn update(&self, new_row: Tbl::Row) -> Tbl::Row {
let (deleted, buf) = self._delete(Col::get_field(&new_row));
if !deleted {
update_row_didnt_exist(Tbl::TABLE_NAME, Col::COLUMN_NAME)
}
insert::<Tbl>(new_row, buf).unwrap_or_else(|e| panic!("{e}"))
let buf = IterBuf::take();
update::<Tbl>(Col::index_id(), new_row, buf)
}
}

/// Panics with a descriptive message when [`UniqueColumn::update`] is called
/// for a row that does not already exist in `table_name` under `unique_column`.
///
/// Diverges (`-> !`): this function never returns.
// `#[cold]` + `#[inline(never)]` keep this rarely-taken panic path out of the
// hot caller's code; `#[track_caller]` makes the panic report the caller's location.
#[cold]
#[inline(never)]
#[track_caller]
fn update_row_didnt_exist(table_name: &str, unique_column: &str) -> ! {
    panic!("UniqueColumn::update: row in table `{table_name}` being updated by unique column `{unique_column}` did not already exist")
}

pub trait Index {
fn index_id() -> IndexId;
}
Expand Down Expand Up @@ -813,7 +803,7 @@ fn insert<T: Table>(mut row: T::Row, mut buf: IterBuf) -> Result<T::Row, TryInse
// Insert row into table.
// When table has an auto-incrementing column, we must re-decode the changed `buf`.
let res = sys::datastore_insert_bsatn(table_id, &mut buf).map(|gen_cols| {
// Let the caller handle any generated columns written back by `sys::insert` to `buf`.
// Let the caller handle any generated columns written back by `sys::datastore_insert_bsatn` to `buf`.
T::integrate_generated_columns(&mut row, gen_cols);
row
});
Expand All @@ -829,6 +819,26 @@ fn insert<T: Table>(mut row: T::Row, mut buf: IterBuf) -> Result<T::Row, TryInse
})
}

/// Update a row of type `T` to `row` using the index identified by `index_id`.
///
/// The row is BSATN-encoded into `buf`, handed to the host via
/// `sys::datastore_update_bsatn`, and any generated (e.g. auto-increment)
/// columns the host wrote back into `buf` are folded into `row` before it
/// is returned.
///
/// # Panics
///
/// Panics if the host rejects the update — per the host-call docs this
/// includes the row not being found, the index not being unique, or a
/// unique-constraint violation.
#[track_caller]
fn update<T: Table>(index_id: IndexId, mut row: T::Row, mut buf: IterBuf) -> T::Row {
    let table_id = T::table_id();
    // Encode the row as bsatn into the buffer `buf`.
    buf.clear();
    // Serialization of an in-memory row is infallible by construction.
    buf.serialize_into(&row).unwrap();

    // Update the row in the table via the unique index.
    // When the table has an auto-incrementing column, we must re-decode the changed `buf`.
    let res = sys::datastore_update_bsatn(table_id, index_id, &mut buf).map(|gen_cols| {
        // Let the caller handle any generated columns written back by `sys::datastore_update_bsatn` to `buf`.
        T::integrate_generated_columns(&mut row, gen_cols);
        row
    });

    // TODO(centril): introduce a `TryUpdateError`.
    res.unwrap_or_else(|e| panic!("unexpected update error: {e}"))
}

/// A table iterator which yields values of the `TableType` corresponding to the table.
struct TableIter<T: DeserializeOwned> {
/// The underlying source of our `Buffer`s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use super::{
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
IterByColEqTx,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx};
use crate::{
db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx},
error::IndexError,
};
use crate::{
db::{
datastore::{
Expand Down Expand Up @@ -132,8 +135,8 @@ fn ignore_duplicate_insert_error<T>(res: std::result::Result<T, InsertError>) ->
match res {
Ok(_) => Ok(()),
Err(InsertError::Duplicate(_)) => Ok(()),
// TODO(error-handling): impl From<InsertError> for DBError.
Err(err) => Err(TableError::Insert(err).into()),
Err(InsertError::Bflatn(e)) => Err(e.into()),
Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()),
}
}

Expand Down Expand Up @@ -321,7 +324,7 @@ impl CommittedState {
let blob_store = &mut self.blob_store;
table
.delete_equal_row(blob_store, rel)
.map_err(TableError::Insert)?
.map_err(TableError::Bflatn)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
Ok(())
}
Expand All @@ -333,8 +336,11 @@ impl CommittedState {
row: &ProductValue,
) -> Result<()> {
let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema);
table.insert(blob_store, row).map_err(TableError::Insert)?;
Ok(())
table.insert(blob_store, row).map(drop).map_err(|e| match e {
InsertError::Bflatn(e) => TableError::Bflatn(e).into(),
InsertError::Duplicate(e) => TableError::Duplicate(e).into(),
InsertError::IndexError(e) => IndexError::UniqueConstraintViolation(e).into(),
})
}

pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
Expand Down Expand Up @@ -511,16 +517,16 @@ impl CommittedState {
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();

// First, merge index id fast-lookup map changes and delete indices.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);

// Then, apply inserts. This will re-fill the holes freed by deletions
// before allocating new pages.
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);

// Merge index id fast-lookup map changes.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
if self.tx_consumes_offset(&tx_data, ctx) {
Expand Down Expand Up @@ -613,9 +619,21 @@ impl CommittedState {
}

fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
for index_id in index_id_map_removals.into_iter().flatten() {
self.index_id_map.remove(index_id);
// Remove indices that tx-state removed.
// It's not necessarily the case that the index already existed in the committed state.
for (index_id, table_id) in index_id_map_removals
.into_iter()
.flatten()
.filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x)))
{
assert!(self
.tables
.get_mut(&table_id)
.expect("table to delete index from should exist")
.delete_index(&self.blob_store, index_id));
}

// Add the ones tx-state added.
self.index_id_map.extend(index_id_map);
}

Expand Down
Loading

1 comment on commit afdc0d6

@github-actions
Copy link

@github-actions github-actions bot commented on afdc0d6 Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

Please sign in to comment.