Skip to content

Commit

Permalink
perf: avoid read after create when possible (#4041)
Browse files Browse the repository at this point in the history
  • Loading branch information
Weakky authored Jul 12, 2023
1 parent 6b0aef6 commit 96f20d9
Show file tree
Hide file tree
Showing 28 changed files with 281 additions and 93 deletions.
3 changes: 2 additions & 1 deletion psl/builtin-connectors/src/cockroach_datamodel_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ const CAPABILITIES: ConnectorCapabilities = enumflags2::make_bitflags!(Connector
SupportsTxIsolationSerializable |
NativeUpsert |
MultiSchema |
FilteredInlineChildNestedToOneDisconnect
FilteredInlineChildNestedToOneDisconnect |
InsertReturning
});

const SCALAR_TYPE_DEFAULTS: &[(ScalarType, CockroachType)] = &[
Expand Down
3 changes: 2 additions & 1 deletion psl/builtin-connectors/src/postgres_datamodel_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ const CAPABILITIES: ConnectorCapabilities = enumflags2::make_bitflags!(Connector
SupportsTxIsolationReadCommitted |
SupportsTxIsolationRepeatableRead |
SupportsTxIsolationSerializable |
NativeUpsert
NativeUpsert |
InsertReturning
});

pub struct PostgresDatamodelConnector;
Expand Down
3 changes: 3 additions & 0 deletions psl/builtin-connectors/src/sqlite_datamodel_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ const CAPABILITIES: ConnectorCapabilities = enumflags2::make_bitflags!(Connector
SupportsTxIsolationSerializable |
NativeUpsert |
FilteredInlineChildNestedToOneDisconnect
// InsertReturning - While SQLite does support RETURNING, it does not return column information on the way back from the database.
// This column type information is necessary in order to preserve consistency for some data types such as int, where values could overflow.
// Since we care to stay consistent with reads, it is not enabled.
});

pub struct SqliteDatamodelConnector;
Expand Down
3 changes: 2 additions & 1 deletion psl/psl-core/src/datamodel_connector/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ capabilities!(
SupportsTxIsolationRepeatableRead,
SupportsTxIsolationSerializable,
SupportsTxIsolationSnapshot,
NativeUpsert
NativeUpsert,
InsertReturning
);

/// Contains all capabilities that the connector is able to serve.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod metrics {
CockroachDb => (), // not deterministic
MySql(_) => assert_eq!(total_queries, 12),
Vitess(_) => assert_eq!(total_queries, 11),
Postgres(_) => assert_eq!(total_queries, 14),
Postgres(_) => assert_eq!(total_queries, 11),
}

assert_eq!(total_operations, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod prisma_15581 {
runner,
r#"mutation { createOnetest(data: { reference: 3 }) { reference created_at other } }"#
);

Ok(())
}

Expand All @@ -45,6 +46,7 @@ mod prisma_15581 {
runner,
r#"mutation { createOnetest2(data: { reference: 3 }) { reference updated_at other } }"#
);

Ok(())
}

Expand All @@ -63,16 +65,11 @@ mod prisma_15581 {

#[connector_test(only(Postgres), schema(pg_schema))]
async fn create_one_model_with_low_precision_datetime_in_id(runner: Runner) -> TestResult<()> {
let result = runner
.query(r#"mutation { createOnetest(data: { reference: 3 }) { reference created_at other } }"#)
.await?
.to_string_pretty();

// This is a test that confirms the current behaviour. Ideally, the create mutation above
// should work.
assert!(
result.contains("\"error\": \"Query createOnetest is required to return data, but found no record(s).\"")
run_query!(
runner,
r#"mutation { createOnetest(data: { reference: 3 }) { reference created_at other } }"#
);

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ impl WriteOperations for MongoDbConnection {
&mut self,
model: &Model,
args: WriteArgs,
// The field selection on a create is never used on MongoDB as it cannot return more than the ID.
_selected_fields: FieldSelection,
_trace_id: Option<String>,
) -> connector_interface::Result<SelectionResult> {
) -> connector_interface::Result<SingleRecord> {
catch(async move { write::create_record(&self.database, &mut self.session, model, args).await }).await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ impl<'conn> WriteOperations for MongoDbTransaction<'conn> {
&mut self,
model: &Model,
args: connector_interface::WriteArgs,
// The field selection on a create is never used on MongoDB as it cannot return more than the ID.
_selected_fields: FieldSelection,
_trace_id: Option<String>,
) -> connector_interface::Result<SelectionResult> {
) -> connector_interface::Result<SingleRecord> {
catch(async move {
write::create_record(&self.connection.database, &mut self.connection.session, model, args).await
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn create_record<'conn>(
session: &mut ClientSession,
model: &Model,
mut args: WriteArgs,
) -> crate::Result<SelectionResult> {
) -> crate::Result<SingleRecord> {
let coll = database.collection::<Document>(model.db_name());

let span = info_span!(
Expand Down Expand Up @@ -72,7 +72,10 @@ pub async fn create_record<'conn>(
.await?;
let id_value = value_from_bson(insert_result.inserted_id, &id_meta)?;

Ok(SelectionResult::from((id_field, id_value)))
Ok(SingleRecord {
record: Record::new(vec![id_value]),
field_names: vec![id_field.db_name().to_owned()],
})
}

pub async fn create_records<'conn>(
Expand Down
3 changes: 2 additions & 1 deletion query-engine/connectors/query-connector/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,9 @@ pub trait WriteOperations {
&mut self,
model: &Model,
args: WriteArgs,
selected_fields: FieldSelection,
trace_id: Option<String>,
) -> crate::Result<SelectionResult>;
) -> crate::Result<SingleRecord>;

/// Inserts many records at once into the database.
async fn create_records(
Expand Down
2 changes: 1 addition & 1 deletion query-engine/connectors/query-connector/src/write_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ impl WriteArgs {
}
}

pub fn as_record_projection(&self, model_projection: ModelProjection) -> Option<SelectionResult> {
pub fn as_selection_result(&self, model_projection: ModelProjection) -> Option<SelectionResult> {
let pairs: Vec<_> = model_projection
.scalar_fields()
.map(|field| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,20 @@ where
&mut self,
model: &Model,
args: WriteArgs,
selected_fields: FieldSelection,
trace_id: Option<String>,
) -> connector::Result<SelectionResult> {
) -> connector::Result<SingleRecord> {
catch(self.connection_info.clone(), async move {
let ctx = Context::new(&self.connection_info, trace_id.as_deref());
write::create_record(&self.inner, &self.connection_info.sql_family(), model, args, &ctx).await
write::create_record(
&self.inner,
&self.connection_info.sql_family(),
model,
args,
selected_fields,
&ctx,
)
.await
})
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,15 @@ pub(crate) async fn native_upsert(
let where_condition = upsert.filter().aliased_condition_from(None, false, ctx);
let update = build_update_and_set_query(upsert.model(), upsert.update().clone(), ctx).so_that(where_condition);

let insert = create_record(upsert.model(), upsert.create().clone(), ctx);
let insert = create_record(upsert.model(), upsert.create().clone(), &selected_fields, ctx);

let constraints: Vec<_> = upsert.unique_constraints().as_columns(ctx).collect();
let query: Query = insert
.on_conflict(OnConflict::Update(update, constraints))
.returning(selected_fields.as_columns(ctx))
.into();
let query: Query = insert.on_conflict(OnConflict::Update(update, constraints)).into();

let result_set = conn.query(query).await?;

let row = result_set.into_single()?;
let record = Record::from(row.to_sql_row(&meta)?);

Ok(SingleRecord { record, field_names })
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::column_metadata;
use crate::filter_conversion::AliasedCondition;
use crate::query_builder::write::{build_update_and_set_query, chunk_update_with_ids};
use crate::row::ToSqlRow;
use crate::{
error::SqlError, model_extensions::*, query_builder::write, sql_trace::SqlTraceComment, Context, QueryExt,
Queryable,
Expand All @@ -21,15 +23,15 @@ use user_facing_errors::query_engine::DatabaseConstraint;

async fn generate_id(
conn: &dyn Queryable,
primary_key: &FieldSelection,
id_field: &FieldSelection,
args: &WriteArgs,
ctx: &Context<'_>,
) -> crate::Result<Option<SelectionResult>> {
// Go through all the values and generate a select statement with the correct MySQL function
let (pk_select, need_select) = primary_key
let (id_select, need_select) = id_field
.selections()
.filter_map(|field| match field {
SelectedField::Scalar(x) if x.default_value().is_some() && !args.has_arg_for(x.db_name()) => x
SelectedField::Scalar(sf) if sf.default_value().is_some() && !args.has_arg_for(sf.db_name()) => sf
.default_value()
.unwrap()
.to_dbgenerated_func()
Expand All @@ -50,9 +52,9 @@ async fn generate_id(

// db generate values only if needed
if need_select {
let pk_select = pk_select.add_trace_id(ctx.trace_id);
let pk_select = id_select.add_trace_id(ctx.trace_id);
let pk_result = conn.query(pk_select.into()).await?;
let result = try_convert(&(primary_key.into()), pk_result)?;
let result = try_convert(&(id_field.into()), pk_result)?;

Ok(Some(result))
} else {
Expand All @@ -67,18 +69,19 @@ pub(crate) async fn create_record(
sql_family: &SqlFamily,
model: &Model,
mut args: WriteArgs,
selected_fields: FieldSelection,
ctx: &Context<'_>,
) -> crate::Result<SelectionResult> {
let pk = model.primary_identifier();
) -> crate::Result<SingleRecord> {
let id_field: FieldSelection = model.primary_identifier();

let returned_id = if *sql_family == SqlFamily::Mysql {
generate_id(conn, &pk, &args, ctx).await?
generate_id(conn, &id_field, &args, ctx)
.await?
.or_else(|| args.as_selection_result(ModelProjection::from(id_field)))
} else {
args.as_record_projection(pk.clone().into())
args.as_selection_result(ModelProjection::from(id_field))
};

let returned_id = returned_id.or_else(|| args.as_record_projection(pk.clone().into()));

let args = match returned_id {
Some(ref pk) if *sql_family == SqlFamily::Mysql => {
for (field, value) in pk.pairs.iter() {
Expand All @@ -91,7 +94,7 @@ pub(crate) async fn create_record(
_ => args,
};

let insert = write::create_record(model, args, ctx);
let insert = write::create_record(model, args, &ModelProjection::from(&selected_fields), ctx);

let result_set = match conn.insert(insert).await {
Ok(id) => id,
Expand Down Expand Up @@ -137,16 +140,34 @@ pub(crate) async fn create_record(
};

match (returned_id, result_set.len(), result_set.last_insert_id()) {
// All values provided in the write arrghs
(Some(identifier), _, _) if !identifier.misses_autogen_value() => Ok(identifier),

// with a working RETURNING statement
(_, n, _) if n > 0 => Ok(try_convert(&model.primary_identifier().into(), result_set)?),
(_, n, _) if n > 0 => {
let row = result_set.into_single()?;
let field_names: Vec<_> = selected_fields.db_names().collect();
let idents = ModelProjection::from(&selected_fields).type_identifiers_with_arities();
let meta = column_metadata::create(&field_names, &idents);
let sql_row = row.to_sql_row(&meta)?;
let record = Record::from(sql_row);

Ok(SingleRecord { record, field_names })
}

// All values provided in the write args
(Some(identifier), _, _) if !identifier.misses_autogen_value() => {
let field_names = identifier.db_names().map(ToOwned::to_owned).collect();
let record = Record::from(identifier);

Ok(SingleRecord { record, field_names })
}

// We have an auto-incremented id that we got from MySQL or SQLite
(Some(mut identifier), _, Some(num)) if identifier.misses_autogen_value() => {
identifier.add_autogen_value(num as i64);
Ok(identifier)

let field_names = identifier.db_names().map(ToOwned::to_owned).collect();
let record = Record::from(identifier);

Ok(SingleRecord { record, field_names })
}

(_, _, _) => panic!("Could not figure out an ID in create"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,20 @@ impl<'tx> WriteOperations for SqlConnectorTransaction<'tx> {
&mut self,
model: &Model,
args: WriteArgs,
selected_fields: FieldSelection,
trace_id: Option<String>,
) -> connector::Result<SelectionResult> {
) -> connector::Result<SingleRecord> {
catch(self.connection_info.clone(), async move {
let ctx = Context::new(&self.connection_info, trace_id.as_deref());
write::create_record(&self.inner, &self.connection_info.sql_family(), model, args, &ctx).await
write::create_record(
&self.inner,
&self.connection_info.sql_family(),
model,
args,
selected_fields,
&ctx,
)
.await
})
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use tracing::Span;

/// `INSERT` a new record into the database. Produces an `INSERT` AST and an
/// optional `RecordProjection` if available from the arguments or model.
pub(crate) fn create_record(model: &Model, mut args: WriteArgs, ctx: &Context<'_>) -> Insert<'static> {
pub(crate) fn create_record(
model: &Model,
mut args: WriteArgs,
selected_fields: &ModelProjection,
ctx: &Context<'_>,
) -> Insert<'static> {
let fields: Vec<_> = model
.fields()
.scalar()
Expand All @@ -27,7 +32,7 @@ pub(crate) fn create_record(model: &Model, mut args: WriteArgs, ctx: &Context<'_
});

Insert::from(insert)
.returning(ModelProjection::from(model.primary_identifier()).as_columns(ctx))
.returning(selected_fields.as_columns(ctx))
.append_trace(&Span::current())
.add_trace_id(ctx.trace_id)
}
Expand Down
11 changes: 9 additions & 2 deletions query-engine/core/src/interpreter/query_interpreters/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,16 @@ async fn create_one(
q: CreateRecord,
trace_id: Option<String>,
) -> InterpretationResult<QueryResult> {
let res = tx.create_record(&q.model, q.args, trace_id).await?;
let res = tx.create_record(&q.model, q.args, q.selected_fields, trace_id).await?;

Ok(QueryResult::Id(Some(res)))
Ok(QueryResult::RecordSelection(Box::new(RecordSelection {
name: q.name,
fields: q.selection_order,
aggregation_rows: None,
model: q.model,
scalars: res.into(),
nested: vec![],
})))
}

async fn create_many(
Expand Down
3 changes: 3 additions & 0 deletions query-engine/core/src/query_ast/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ impl ToGraphviz for WriteQuery {

#[derive(Debug, Clone)]
pub struct CreateRecord {
    // Name of the query; used to label the returned record selection.
    pub name: String,
    // Model the record is created for.
    pub model: Model,
    // Write arguments (field/value pairs) for the insert.
    pub args: WriteArgs,
    // Fields to return after the create — passed down to the connector and,
    // where the connector supports `InsertReturning`, used as the RETURNING columns.
    pub selected_fields: FieldSelection,
    // Order in which the selected fields are serialized back to the client.
    pub selection_order: Vec<String>,
}

#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit 96f20d9

Please sign in to comment.