Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the update ABI #2137

Merged
merged 8 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ paste = "1.0"
petgraph = { version = "0.6.5", default-features = false }
pin-project-lite = "0.2.9"
postgres-types = "0.2.5"
pretty_assertions = "1.4"
pretty_assertions = { version = "1.4", features = ["unstable"] }
proc-macro2 = "1.0"
prometheus = "0.13.0"
proptest = "1.4"
Expand Down
3 changes: 2 additions & 1 deletion crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,11 +813,12 @@ pub fn datastore_insert_bsatn(table_id: TableId, row: &mut [u8]) -> Result<&[u8]
///
/// Returns an error if
/// - a table with the provided `table_id` doesn't exist
/// - an index with the provided `index_id` doesn't exist
/// - an index with the provided `index_id` doesn't exist or if the index was not unique.
/// - there were unique constraint violations
/// - `row` doesn't decode from BSATN to a `ProductValue`
/// according to the `ProductType` that the table's schema specifies
/// or if `row` cannot project to the index's type.
/// - the row was not found
#[inline]
pub fn datastore_update_bsatn(table_id: TableId, index_id: IndexId, row: &mut [u8]) -> Result<&[u8], Errno> {
let row_ptr = row.as_mut_ptr();
Expand Down
36 changes: 23 additions & 13 deletions crates/bindings/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,21 +347,11 @@ impl<Tbl: Table, Col: Index + Column<Table = Tbl>> UniqueColumn<Tbl, Col::ColTyp
/// or if either the delete or the insertion would violate a constraint.
#[track_caller]
pub fn update(&self, new_row: Tbl::Row) -> Tbl::Row {
let (deleted, buf) = self._delete(Col::get_field(&new_row));
if !deleted {
update_row_didnt_exist(Tbl::TABLE_NAME, Col::COLUMN_NAME)
}
insert::<Tbl>(new_row, buf).unwrap_or_else(|e| panic!("{e}"))
let buf = IterBuf::take();
update::<Tbl>(Col::index_id(), new_row, buf)
}
}

#[cold]
#[inline(never)]
#[track_caller]
fn update_row_didnt_exist(table_name: &str, unique_column: &str) -> ! {
panic!("UniqueColumn::update: row in table `{table_name}` being updated by unique column `{unique_column}` did not already exist")
}

pub trait Index {
fn index_id() -> IndexId;
}
Expand Down Expand Up @@ -813,7 +803,7 @@ fn insert<T: Table>(mut row: T::Row, mut buf: IterBuf) -> Result<T::Row, TryInse
// Insert row into table.
// When table has an auto-incrementing column, we must re-decode the changed `buf`.
let res = sys::datastore_insert_bsatn(table_id, &mut buf).map(|gen_cols| {
// Let the caller handle any generated columns written back by `sys::insert` to `buf`.
// Let the caller handle any generated columns written back by `sys::datastore_insert_bsatn` to `buf`.
T::integrate_generated_columns(&mut row, gen_cols);
row
});
Expand All @@ -829,6 +819,26 @@ fn insert<T: Table>(mut row: T::Row, mut buf: IterBuf) -> Result<T::Row, TryInse
})
}

/// Update a row of type `T` to `row` using the index identified by `index_id`.
#[track_caller]
fn update<T: Table>(index_id: IndexId, mut row: T::Row, mut buf: IterBuf) -> T::Row {
let table_id = T::table_id();
// Encode the row as bsatn into the buffer `buf`.
buf.clear();
buf.serialize_into(&row).unwrap();

// Insert row into table.
// When table has an auto-incrementing column, we must re-decode the changed `buf`.
let res = sys::datastore_update_bsatn(table_id, index_id, &mut buf).map(|gen_cols| {
// Let the caller handle any generated columns written back by `sys::datastore_update_bsatn` to `buf`.
T::integrate_generated_columns(&mut row, gen_cols);
row
});

// TODO(centril): introduce a `TryUpdateError`.
res.unwrap_or_else(|e| panic!("unexpected update error: {e}"))
}

/// A table iterator which yields values of the `TableType` corresponding to the table.
struct TableIter<T: DeserializeOwned> {
/// The underlying source of our `Buffer`s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use super::{
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
IterByColEqTx,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx};
use crate::{
db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx},
error::IndexError,
};
use crate::{
db::{
datastore::{
Expand Down Expand Up @@ -132,8 +135,8 @@ fn ignore_duplicate_insert_error<T>(res: std::result::Result<T, InsertError>) ->
match res {
Ok(_) => Ok(()),
Err(InsertError::Duplicate(_)) => Ok(()),
// TODO(error-handling): impl From<InsertError> for DBError.
Err(err) => Err(TableError::Insert(err).into()),
Err(InsertError::Bflatn(e)) => Err(e.into()),
Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()),
}
}

Expand Down Expand Up @@ -321,7 +324,7 @@ impl CommittedState {
let blob_store = &mut self.blob_store;
table
.delete_equal_row(blob_store, rel)
.map_err(TableError::Insert)?
.map_err(TableError::Bflatn)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
Ok(())
}
Expand All @@ -333,8 +336,11 @@ impl CommittedState {
row: &ProductValue,
) -> Result<()> {
let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema);
table.insert(blob_store, row).map_err(TableError::Insert)?;
Ok(())
table.insert(blob_store, row).map(drop).map_err(|e| match e {
InsertError::Bflatn(e) => TableError::Bflatn(e).into(),
InsertError::Duplicate(e) => TableError::Duplicate(e).into(),
InsertError::IndexError(e) => IndexError::UniqueConstraintViolation(e).into(),
})
}

pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
Expand Down Expand Up @@ -511,16 +517,16 @@ impl CommittedState {
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();

// First, merge index id fast-lookup map changes and delete indices.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);

// Then, apply inserts. This will re-fill the holes freed by deletions
// before allocating new pages.
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);

// Merge index id fast-lookup map changes.
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
if self.tx_consumes_offset(&tx_data, ctx) {
Expand Down Expand Up @@ -613,9 +619,21 @@ impl CommittedState {
}

fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
for index_id in index_id_map_removals.into_iter().flatten() {
self.index_id_map.remove(index_id);
// Remove indices that tx-state removed.
// It's not necessarily the case that the index already existed in the committed state.
for (index_id, table_id) in index_id_map_removals
.into_iter()
.flatten()
.filter_map(|index_id| self.index_id_map.remove(index_id).map(|x| (*index_id, x)))
{
assert!(self
.tables
.get_mut(&table_id)
.expect("table to delete index from should exist")
.delete_index(&self.blob_store, index_id));
}

// Add the ones tx-state added.
self.index_id_map.extend(index_id_map);
}

Expand Down
Loading
Loading