diff --git a/Cargo.toml b/Cargo.toml
index 024b6c3..790a462 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ default-members = ["crates/shell", "crates/sqlite"]
 
 [profile.dev]
 panic = "abort"
+strip = true
 
 [profile.release]
 panic = "abort"
diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs
index 02ef496..166a28c 100644
--- a/crates/core/src/bucket_priority.rs
+++ b/crates/core/src/bucket_priority.rs
@@ -1,5 +1,3 @@
-use core::ops::RangeInclusive;
-
 use serde::{de::Visitor, Deserialize};
 use sqlite_nostd::ResultCode;
 
@@ -15,16 +13,13 @@ impl BucketPriority {
     }
 
     pub const HIGHEST: BucketPriority = BucketPriority(0);
-    pub const LOWEST: BucketPriority = BucketPriority(3);
 }
 
 impl TryFrom<i32> for BucketPriority {
     type Error = SQLiteError;
 
     fn try_from(value: i32) -> Result<Self, Self::Error> {
-        const VALID: RangeInclusive<i32> = (BucketPriority::HIGHEST.0)..=(BucketPriority::LOWEST.0);
-
-        if !VALID.contains(&value) {
+        if value < BucketPriority::HIGHEST.0 {
             return Err(SQLiteError(
                 ResultCode::MISUSE,
                 Some("Invalid bucket priority".into()),
diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs
index 022520d..30cf11c 100644
--- a/crates/core/src/checkpoint.rs
+++ b/crates/core/src/checkpoint.rs
@@ -40,8 +40,6 @@ bucket_list(bucket, checksum) AS (
          json_extract(json_each.value, '$.bucket') as bucket,
          json_extract(json_each.value, '$.checksum') as checksum
   FROM json_each(json_extract(?1, '$.buckets'))
-  WHERE IFNULL(json_extract(json_each.value, '$.priority'), 1) <=
-        IFNULL(json_extract(?1, '$.priority'), 3)
 )
 SELECT
   bucket_list.bucket as bucket,
diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/operations_vtab.rs
index 8746355..4ac441b 100644
--- a/crates/core/src/operations_vtab.rs
+++ b/crates/core/src/operations_vtab.rs
@@ -84,7 +84,7 @@ extern "C" fn update(
         let result = insert_operation(db, args[3].text());
         vtab_result(vtab, result)
     } else if op == "sync_local" {
-        let result = sync_local(db, args[3]);
+        let result = sync_local(db, &args[3]);
         if let Ok(result_row) = result {
             unsafe {
                 *p_row_id = result_row;
diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs
index 249f538..527e2bb 100644
--- a/crates/core/src/sync_local.rs
+++ b/crates/core/src/sync_local.rs
@@ -6,91 +6,223 @@ use serde::Deserialize;
 
 use crate::bucket_priority::BucketPriority;
 use crate::error::{PSResult, SQLiteError};
-use sqlite_nostd::{self as sqlite, Value};
+use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value};
 use sqlite_nostd::{ColumnType, Connection, ResultCode};
 
 use crate::ext::SafeManagedStmt;
 use crate::util::{internal_table_name, quote_internal_name};
 
-fn can_apply_sync_changes(
-    db: *mut sqlite::sqlite3,
+pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> {
+    let mut operation = SyncOperation::new(db, data)?;
+    operation.apply()
+}
+
+struct PartialSyncOperation<'a> {
+    /// The lowest priority part of the partial sync operation.
     priority: BucketPriority,
-) -> Result<bool, SQLiteError> {
-    // Don't publish downloaded data until the upload queue is empty (except for downloaded data in
-    // priority 0, which is published earlier).
-    if !priority.may_publish_with_outstanding_uploads() {
-        // language=SQLite
-        let statement = db.prepare_v2(
-            "\
-SELECT group_concat(name)
-FROM ps_buckets
-WHERE target_op > last_op AND name = '$local'",
-        )?;
-
-        if statement.step()? != ResultCode::ROW {
-            return Err(SQLiteError::from(ResultCode::ABORT));
-        }
+    /// The JSON-encoded arguments passed by the client SDK. This includes the priority and a list
+    /// of bucket names in that (and higher) priorities.
+    args: &'a str,
+}
 
-        if statement.column_type(0)? == ColumnType::Text {
-            return Ok(false);
-        }
+struct SyncOperation<'a> {
+    db: *mut sqlite::sqlite3,
+    data_tables: BTreeSet<String>,
+    partial: Option<PartialSyncOperation<'a>>,
+}
 
-        let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
-        if statement.step()? != ResultCode::DONE {
-            return Ok(false);
-        }
+impl<'a> SyncOperation<'a> {
+    fn new<V: Value>(db: *mut sqlite::sqlite3, data: &'a V) -> Result<Self, SQLiteError> {
+        return Ok(Self {
+            db: db,
+            data_tables: BTreeSet::new(),
+            partial: match data.value_type() {
+                ColumnType::Text => {
+                    let text = data.text();
+                    if text.len() > 0 {
+                        #[derive(Deserialize)]
+                        struct PartialSyncLocalArguments {
+                            #[serde(rename = "buckets")]
+                            _buckets: Vec<String>,
+                            priority: BucketPriority,
+                        }
+
+                        let args: PartialSyncLocalArguments = serde_json::from_str(text)?;
+                        Some(PartialSyncOperation {
+                            priority: args.priority,
+                            args: text,
+                        })
+                    } else {
+                        None
+                    }
+                }
+                _ => None,
+            },
+        });
     }
 
-    Ok(true)
-}
+    fn can_apply_sync_changes(&self) -> Result<bool, SQLiteError> {
+        // Don't publish downloaded data until the upload queue is empty (except for downloaded data
+        // in priority 0, which is published earlier).
+
+        let needs_check = match &self.partial {
+            Some(p) => !p.priority.may_publish_with_outstanding_uploads(),
+            None => true,
+        };
+
+        if needs_check {
+            // language=SQLite
+            let statement = self.db.prepare_v2(
+                "\
+            SELECT group_concat(name)
+            FROM ps_buckets
+            WHERE target_op > last_op AND name = '$local'",
+            )?;
+
+            if statement.step()? != ResultCode::ROW {
+                return Err(SQLiteError::from(ResultCode::ABORT));
+            }
+
+            if statement.column_type(0)? == ColumnType::Text {
+                return Ok(false);
+            }
+
+            let statement = self.db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
+            if statement.step()? != ResultCode::DONE {
+                return Ok(false);
+            }
+        }
 
-pub fn sync_local(db: *mut sqlite::sqlite3, data: *mut sqlite::value) -> Result<i64, SQLiteError> {
-    #[derive(Deserialize)]
-    struct SyncLocalArguments {
-        #[serde(rename = "buckets")]
-        _buckets: Vec<String>,
-        priority: Option<BucketPriority>,
+        Ok(true)
     }
 
-    const FALLBACK_PRIORITY: BucketPriority = BucketPriority::LOWEST;
-    let (has_args, priority) = match data.value_type() {
-        ColumnType::Text => {
-            let text = data.text();
-            if text.len() > 0 {
-                let args: SyncLocalArguments = serde_json::from_str(text)?;
-                (true, args.priority.unwrap_or(FALLBACK_PRIORITY))
+    fn apply(&mut self) -> Result<i64, SQLiteError> {
+        if !self.can_apply_sync_changes()? {
+            return Ok(0);
+        }
+
+        self.collect_tables()?;
+        let statement = self.collect_full_operations()?;
+        // TODO: cache statements
+        while statement.step().into_db_result(self.db)? == ResultCode::ROW {
+            let type_name = statement.column_text(0)?;
+            let id = statement.column_text(1)?;
+            let buckets = statement.column_int(3)?;
+            let data = statement.column_text(2);
+
+            let table_name = internal_table_name(type_name);
+
+            if self.data_tables.contains(&table_name) {
+                let quoted = quote_internal_name(type_name, false);
+
+                if buckets == 0 {
+                    // DELETE
+                    let delete_statement = self
+                        .db
+                        .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
+                        .into_db_result(self.db)?;
+                    delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
+                    delete_statement.exec()?;
+                } else {
+                    // INSERT/UPDATE
+                    let insert_statement = self
+                        .db
+                        .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted))
+                        .into_db_result(self.db)?;
+                    insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
+                    insert_statement.exec()?;
+                }
             } else {
-                (false, FALLBACK_PRIORITY)
+                if buckets == 0 {
+                    // DELETE
+                    // language=SQLite
+                    let delete_statement = self
+                        .db
+                        .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?")
+                        .into_db_result(self.db)?;
+                    delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
+                    delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
+                    delete_statement.exec()?;
+                } else {
+                    // INSERT/UPDATE
+                    // language=SQLite
+                    let insert_statement = self
+                        .db
+                        .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)")
+                        .into_db_result(self.db)?;
+                    insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?;
+                    insert_statement.exec()?;
+                }
+            }
         }
-        _ => (false, FALLBACK_PRIORITY),
-    };
 
-    if !can_apply_sync_changes(db, priority)? {
-        return Ok(0);
+        self.set_last_applied_op()?;
+        self.mark_completed()?;
+
+        Ok(1)
     }
 
-    // language=SQLite
-    let statement = db
-        .prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'")
-        .into_db_result(db)?;
-    let mut tables: BTreeSet<String> = BTreeSet::new();
+    fn collect_tables(&mut self) -> Result<(), SQLiteError> {
+        // language=SQLite
+        let statement = self
+            .db
+            .prepare_v2(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'",
+            )
+            .into_db_result(self.db)?;
 
-    while statement.step()? == ResultCode::ROW {
-        let name = statement.column_text(0)?;
-        tables.insert(String::from(name));
+        while statement.step()? == ResultCode::ROW {
+            let name = statement.column_text(0)?;
+            self.data_tables.insert(String::from(name));
+        }
+        Ok(())
     }
 
-    // Query for updated objects
+    fn collect_full_operations(&self) -> Result<ManagedStmt, SQLiteError> {
+        Ok(match &self.partial {
+            None => {
+                // Complete sync
+                self.db
+                    .prepare_v2(
+                        "\
+-- 1. Filter oplog by the ops added but not applied yet (oplog b).
+-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
+WITH updated_rows AS (
+  SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
+    CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
+    AND (b.op_id > buckets.last_applied_op)
+  UNION SELECT row_type, row_id FROM ps_updated_rows
+)
 
-    // language=SQLite
-    let statement = db
-        .prepare_v2(
-            "\
+-- 3. Group the objects from different buckets together into a single one (ops).
+SELECT b.row_type as type,
+    b.row_id as id,
+    r.data as data,
+    count(r.bucket) as buckets,
+    /* max() affects which row is used for 'data' */
+    max(r.op_id) as op_id
+-- 2. Find *all* current ops over different buckets for those objects (oplog r).
+FROM updated_rows b
+    LEFT OUTER JOIN ps_oplog AS r
+                ON r.row_type = b.row_type
+                    AND r.row_id = b.row_id
+-- Group for (3)
+GROUP BY b.row_type, b.row_id",
+                    )
+                    .into_db_result(self.db)?
+            }
+            Some(partial) => {
+                let stmt = self
+                    .db
+                    .prepare_v2(
+                        "\
 -- 1. Filter oplog by the ops added but not applied yet (oplog b).
 -- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
 WITH
-  involved_buckets (id) AS (
+  involved_buckets (id) AS MATERIALIZED (
     SELECT id FROM ps_buckets WHERE ?1 IS NULL
        OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
   ),
@@ -98,13 +230,11 @@ WITH
     SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
       CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
       AND (b.op_id > buckets.last_applied_op)
       WHERE buckets.id IN (SELECT id FROM involved_buckets)
-    UNION SELECT TRUE, row_type, row_id FROM ps_updated_rows
   )
 
 -- 3. Group the objects from different buckets together into a single one (ops).
 SELECT b.row_type as type,
     b.row_id as id,
-    b.local as local,
     r.data as data,
     count(r.bucket) as buckets,
     /* max() affects which row is used for 'data' */
@@ -117,101 +247,61 @@ FROM updated_rows b
       AND r.bucket IN (SELECT id FROM involved_buckets)
 -- Group for (3)
 GROUP BY b.row_type, b.row_id",
-    )
-    .into_db_result(db)?;
+                    )
+                    .into_db_result(self.db)?;
+                stmt.bind_text(1, partial.args, Destructor::STATIC)?;
 
-    if has_args {
-        statement.bind_value(1, data)?;
-    } else {
-        statement.bind_null(1)?;
+                stmt
+            }
+        })
     }
 
-    // TODO: cache statements
-    while statement.step().into_db_result(db)? == ResultCode::ROW {
-        let type_name = statement.column_text(0)?;
-        let id = statement.column_text(1)?;
-        let local = statement.column_int(2)? == 1;
-        let buckets = statement.column_int(4)?;
-        let data = statement.column_text(3);
-
-        let table_name = internal_table_name(type_name);
-
-        if local && buckets == 0 && priority == BucketPriority::HIGHEST {
-            // These rows are still local and they haven't been uploaded yet (which we allow for
-            // buckets with priority=0 completing). We should just keep them around.
-            continue;
-        }
-
-        if tables.contains(&table_name) {
-            let quoted = quote_internal_name(type_name, false);
-
-            if buckets == 0 {
-                // DELETE
-                let delete_statement = db
-                    .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
-                    .into_db_result(db)?;
-                delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
-                delete_statement.exec()?;
-            } else {
-                // INSERT/UPDATE
-                let insert_statement = db
-                    .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted))
-                    .into_db_result(db)?;
-                insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
-                insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
-                insert_statement.exec()?;
-            }
-        } else {
-            if buckets == 0 {
-                // DELETE
+    fn set_last_applied_op(&self) -> Result<(), SQLiteError> {
+        match &self.partial {
+            Some(partial) => {
                 // language=SQLite
-                let delete_statement = db
-                    .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?")
AND id = ?") - .into_db_result(db)?; - delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; - delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; - delete_statement.exec()?; - } else { - // INSERT/UPDATE + let updated = self + .db + .prepare_v2( "\ + UPDATE ps_buckets + SET last_applied_op = last_op + WHERE last_applied_op != last_op AND + name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))", + ) + .into_db_result(self.db)?; + updated.bind_text(1, partial.args, Destructor::STATIC)?; + updated.exec()?; + } + None => { // language=SQLite - let insert_statement = db - .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)") - .into_db_result(db)?; - insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?; - insert_statement.exec()?; + self.db + .exec_safe( + "UPDATE ps_buckets + SET last_applied_op = last_op + WHERE last_applied_op != last_op", + ) + .into_db_result(self.db)?; } } - } - // language=SQLite - let updated = db - .prepare_v2( - "UPDATE ps_buckets - SET last_applied_op = last_op - WHERE last_applied_op != last_op AND - (?1 IS NULL OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets'))))", - ) - .into_db_result(db)?; - if has_args { - updated.bind_value(1, data)?; - } else { - updated.bind_null(1)?; + Ok(()) } - updated.exec()?; - if priority == BucketPriority::LOWEST { - // language=SQLite - db.exec_safe("DELETE FROM ps_updated_rows") - .into_db_result(db)?; + fn mark_completed(&self) -> Result<(), SQLiteError> { + if self.partial.is_none() { + // language=SQLite + self.db + .exec_safe("DELETE FROM ps_updated_rows") + .into_db_result(self.db)?; - // language=SQLite - db.exec_safe( - "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())", - ) - .into_db_result(db)?; - } + // language=SQLite + self.db + .exec_safe( + "insert or replace into ps_kv(key, value) values('last_synced_at', datetime())", + ) + .into_db_result(self.db)?; + } - Ok(1) + Ok(()) + } } diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index 16f7a7f..b58ebaf 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -120,7 +120,7 @@ fn powersync_init_impl( setup_internal_views(local_db)?; - powersync_migrate(ctx, 7)?; + powersync_migrate(ctx, 6)?; Ok(String::from("")) } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index fe95733..b088485 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -57,14 +57,17 @@ void main() { json.encode({ 'last_op_id': lastOpId, 'write_checkpoint': writeCheckpoint, - 'buckets': checksums, + 'buckets': [ + for (final cs in checksums.cast>()) + if (priority == null || cs['priority'] <= priority) cs + ], 'priority': priority, }) ]); final decoded = json.decode(row['r']); if (decoded['valid'] != true) { - fail(decoded); + fail(row['r']); } db.execute( @@ -77,13 +80,15 @@ void main() { db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [ 'sync_local', - jsonEncode({ - 'priority': priority, - 'buckets': [ - for (final cs in checksums.cast>()) - if (priority == null || cs['priority'] <= priority) cs['bucket'] - ], - }) + priority != null + ? jsonEncode({ + 'priority': priority, + 'buckets': [ + for (final cs in checksums.cast>()) + if (cs['priority'] <= priority) cs['bucket'] + ], + }) + : null, ]); return db.lastInsertRowId == 1; }