diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..019b577 --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +# Miscellaneous +*.class +*.log +*.pyc +*.swp +.DS_Store +.atom/ +.buildlog/ +.history +.svn/ +migrate_working_dir/ + +# IntelliJ related +*.iml +*.ipr +*.iws +.idea/ + +# The .vscode folder contains launch configuration and tasks you configure in +# VS Code which you may wish to be included in version control, so this line +# is commented out by default. +#.vscode/ + +# Flutter/Dart/Pub related +# Libraries should not include pubspec.lock, per https://dart.dev/guides/libraries/private-files#pubspeclock. +/pubspec.lock +**/doc/api/ +.dart_tool/ +.packages +build/ + +# Test store +store/ diff --git a/.metadata b/.metadata new file mode 100644 index 0000000..adf9702 --- /dev/null +++ b/.metadata @@ -0,0 +1,10 @@ +# This file tracks properties of this Flutter project. +# Used by Flutter tool to assess capabilities and perform upgrades etc. +# +# This file should be version controlled and should not be manually edited. + +version: + revision: 75927305ff855f76a9ef704f9b4a86fa2fce7292 + channel: beta + +project_type: package diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..41cc7d8 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,3 @@ +## 0.0.1 + +* TODO: Describe initial release. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..27e142e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. 
+ + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [2022] [Daniel Cachapa] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..19f0ccb --- /dev/null +++ b/README.md @@ -0,0 +1,100 @@ +> ⚠ This package is still under development and may not be stable + +Dart implementation of Conflict-free Replicated Data Types (CRDTs) using a Sqlite database for data storage. +This project is a continuation of the [crdt](https://github.com/cachapa/crdt) package and may depend on it in the future. + +## Notes + +`sqlite_crdt` has no intention of being an ORM - on the contrary, ideally it should look like using a normal SQL database. 
Unfortunately that's not possible at this time because CRDTs have a few properties that need to be guaranteed: + +* Every change needs to be associated with `hlc` and `modified` timestamps +* There are different rules for generating the timestamps depending on whether data is being inserted or merged +* Timestamp generation requires non-trivial checks, e.g. clock drift, duplicate nodes, etc. +* Records may not be deleted, but rather marked by setting the `is_deleted` flag + +For those reasons, while the raw sqlite database is exposed, changes to the database should only be made using the provided methods `insert`, `update`, `merge`, `setDeleted` and their siblings. + +## Usage + +```dart +// Create or load the database +final crdt = await SqliteCrdt.open( + 'store', + 'sqlite_crdt_test', + ['users'], + version: 1, + onCreate: (db, version) { + // Use [createCrdtTable] to automatically add the CRDT columns + db.createCrdtTable(''' + CREATE TABLE users ( + id INTEGER NOT NULL, + name TEXT, + PRIMARY KEY (id) + ) + '''); + + // You can also create non-crdt tables, they will be ignored + db.execute(''' + CREATE TABLE not_a_crdt ( + id TEXT NOT NULL, + count INTEGER, + PRIMARY KEY (id) + ) + '''); + }, +); + +// Insert data into the database +await crdt.insert('users', { + 'id': 1, + 'name': 'John Doe', +}); + +// Delete it +await crdt.setDeleted('users', [1]); + +// Or merge a remote dataset +await crdt.merge({ + 'users': [ + { + 'id': 2, + 'name': 'Jane Doe', + 'hlc': Hlc.now(uuid()).toString(), + }, + ], +}); + +// Queries are simple SQL statements, but notice: +// 1. the CRDT columns: hlc, modified, is_deleted +// 2. Mr. 
John Doe appears in the results with is_deleted = 1 +final result = await crdt.query('SELECT * FROM users'); + +// Perhaps a better query would be +final betterResult = + await crdt.query('SELECT id, name FROM users WHERE is_deleted = 0'); + +// We can also watch for results to a specific query, but be aware that this +// can be inefficient since it reruns watched queries on every database change +crdt.watch('SELECT id, name FROM users WHERE is_deleted = 0').listen(print); + +// Update the database +await crdt.update('users', [2], {'name': 'Jane Doe 👍'}); + +// Undelete Mr. Doe +await crdt.setDeleted('users', [1], false); + +// Create a changeset to synchronize with another node +final changeset = await crdt.getChangeset(); +``` + +## Usage: Reading Data + +Queries are standard SQL fare with three columns required for the CRDT functionality: `hlc` `modified` `is_deleted` + + +* In most cases you'll want to filter out deleted data by adding `WHERE is_deleted = 0` to your queries +* There's a `watch` method that emits results whenever the database changes, but it is inefficient since it just reruns the query whenever the database changes + +## Features and bugs + +Please file feature requests and bugs at the [issue tracker](https://github.com/cachapa/sqlite_crdt/issues). 
diff --git a/analysis_options.yaml b/analysis_options.yaml new file mode 100644 index 0000000..54667c7 --- /dev/null +++ b/analysis_options.yaml @@ -0,0 +1,6 @@ +include: package:lints/recommended.yaml + +linter: + rules: + prefer_single_quotes: true + unawaited_futures: true diff --git a/example/example.dart b/example/example.dart new file mode 100644 index 0000000..3563b14 --- /dev/null +++ b/example/example.dart @@ -0,0 +1,91 @@ +import 'dart:convert'; +import 'dart:io'; + +import 'package:sqlite_crdt/sqlite_crdt.dart'; +import 'package:sqlite_crdt/src/util/uuid.dart'; + +Future main() async { + // Cleanup past runs + final file = File('store/sqlite_crdt_test.db'); + if (file.existsSync()) file.deleteSync(); + + // Create or load the database + final crdt = await SqliteCrdt.open( + 'store', + 'sqlite_crdt_test', + ['users'], + version: 1, + onCreate: (db, version) { + // Use [createCrdtTable] to automatically add the CRDT columns + db.createCrdtTable(''' + CREATE TABLE users ( + id INTEGER NOT NULL, + name TEXT, + PRIMARY KEY (id) + ) + '''); + + // You can also create non-crdt tables, they will be ignored + db.execute(''' + CREATE TABLE not_a_crdt ( + id TEXT NOT NULL, + count INTEGER, + PRIMARY KEY (id) + ) + '''); + }, + ); + + // Insert data into the database + await crdt.insert('users', { + 'id': 1, + 'name': 'John Doe', + }); + + // Delete it + await crdt.setDeleted('users', [1]); + + // Or merge a remote dataset + await crdt.merge({ + 'users': [ + { + 'id': 2, + 'name': 'Jane Doe', + 'hlc': Hlc.now(uuid()).toString(), + }, + ], + }); + + // Queries are simple SQL statements, but notice: + // 1. the CRDT columns: hlc, modified, is_deleted + // 2. Mr. 
John Doe appears in the results with is_deleted = 1 + final result = await crdt.query('SELECT * FROM users'); + printRecords('SELECT * FROM users', result); + + // Perhaps a better query would be + final betterResult = + await crdt.query('SELECT id, name FROM users WHERE is_deleted = 0'); + printRecords('SELECT id, name FROM users WHERE is_deleted = 0', betterResult); + + // We can also watch for results to a specific query, but be aware that this + // can be inefficient since it reruns watched queries on every database change + crdt.watch('SELECT id, name FROM users WHERE is_deleted = 0').listen((e) => + printRecords('Watch: SELECT id, name FROM users WHERE is_deleted = 0', e)); + + // Update the database + await crdt.update('users', [2], {'name': 'Jane Doe 👍'}); + + // Undelete Mr. Doe + await crdt.setDeleted('users', [1], false); + + // Create a changeset to synchronize with another node + final changeset = await crdt.getChangeset(); + print('> Changeset'); + print(changeset); +} + +void printRecords(String title, List> records) { + print('> $title'); + records.forEach(print); + print(''); +} diff --git a/lib/sqlite_crdt.dart b/lib/sqlite_crdt.dart new file mode 100644 index 0000000..df91936 --- /dev/null +++ b/lib/sqlite_crdt.dart @@ -0,0 +1,7 @@ +library sqlite_crdt; + +export 'package:sqflite_common/sqlite_api.dart'; + +export 'src/extensions.dart'; +export 'src/hlc.dart'; +export 'src/sqlite_crdt.dart'; diff --git a/lib/src/extensions.dart b/lib/src/extensions.dart new file mode 100644 index 0000000..1b2d0bb --- /dev/null +++ b/lib/src/extensions.dart @@ -0,0 +1,27 @@ +import 'package:sqflite_common/sqlite_api.dart'; + +extension DatabaseX on Database { + /// Runs a create table statement and adds CRDT columns to it. + /// See [crdtfyTable]. 
+ Future createCrdtTable(String createStatement) async { + // Extract the table name: the identifier right before the first opening parenthesis + final match = RegExp(r'([^\s(]+)\s*\(').firstMatch(createStatement); + if (match == null) throw ArgumentError('Not a CREATE TABLE statement'); + final table = match.group(1)!; + + await execute(createStatement); + await crdtfyTable(table); + } + + /// Adds the CRDT columns `is_deleted`, `hlc` and `modified` to an existing [table]. + /// NOTE(review): SQLite documents that ADD COLUMN with NOT NULL needs a non-NULL default - confirm these statements succeed. + Future crdtfyTable(String table) => execute(''' + ALTER TABLE $table ADD COLUMN is_deleted INTEGER DEFAULT 0; + ALTER TABLE $table ADD COLUMN hlc TEXT NOT NULL; + ALTER TABLE $table ADD COLUMN modified TEXT NOT NULL; + '''); +} + +extension MapX on Map>> { + int get recordCount => values.fold(0, (prev, e) => prev + e.length); +} diff --git a/lib/src/hlc.dart b/lib/src/hlc.dart new file mode 100644 index 0000000..717c10d --- /dev/null +++ b/lib/src/hlc.dart @@ -0,0 +1,157 @@ +import 'dart:math'; + +const _shift = 16; +const _maxCounter = 0xFFFF; +const _maxDrift = 60000; // 1 minute in ms + +/// A Hybrid Logical Clock implementation. +/// This class trades time precision for a guaranteed monotonically increasing +/// clock in distributed systems. +/// Inspiration: https://cse.buffalo.edu/tech-reports/2014-04.pdf +class Hlc implements Comparable { + final int millis; + final int counter; + final String nodeId; + + int get logicalTime => (millis << _shift) + counter; + + const Hlc(int millis, this.counter, this.nodeId) + : assert(counter <= _maxCounter), + // Detect microseconds and convert to millis + millis = millis < 0x0001000000000000 ? 
millis : millis ~/ 1000; + + const Hlc.zero(String nodeId) : this(0, 0, nodeId); + + Hlc.fromDate(DateTime dateTime, String nodeId) + : this(dateTime.millisecondsSinceEpoch, 0, nodeId); + + Hlc.now(String nodeId) : this.fromDate(DateTime.now(), nodeId); + + Hlc.fromLogicalTime(logicalTime, String nodeId) + : this(logicalTime >> _shift, logicalTime & _maxCounter, nodeId); + + factory Hlc.parse(String timestamp) { + final counterDash = timestamp.indexOf('-', timestamp.lastIndexOf(':')); + final nodeIdDash = timestamp.indexOf('-', counterDash + 1); + final millis = DateTime.parse(timestamp.substring(0, counterDash)) + .millisecondsSinceEpoch; + final counter = + int.parse(timestamp.substring(counterDash + 1, nodeIdDash), radix: 16); + final nodeId = timestamp.substring(nodeIdDash + 1); + return Hlc(millis, counter, nodeId); + } + + Hlc apply({int? millis, int? counter, String? nodeId}) => Hlc( + millis ?? this.millis, counter ?? this.counter, nodeId ?? this.nodeId); + + /// Generates a unique, monotonic timestamp suitable for transmission to + /// another system in string format. Local wall time will be used if + /// [millis] isn't supplied. + factory Hlc.send(Hlc canonical, {int? millis}) { + // Retrieve the local wall time if millis is null + millis = millis ?? DateTime.now().millisecondsSinceEpoch; + + // Unpack the canonical time and counter + final millisOld = canonical.millis; + final counterOld = canonical.counter; + + // Calculate the next time and counter + // * ensure that the logical time never goes backward + // * increment the counter if time does not advance + final millisNew = max(millisOld, millis); + final counterNew = millisOld == millisNew ? 
counterOld + 1 : 0; + + // Check the result for drift and counter overflow + if (millisNew - millis > _maxDrift) { + throw ClockDriftException(millisNew, millis); + } + if (counterNew > _maxCounter) { + throw OverflowException(counterNew); + } + + return Hlc(millisNew, counterNew, canonical.nodeId); + } + + /// Compares and validates a timestamp from a remote system with the local + /// canonical timestamp to preserve monotonicity. + /// Returns an updated canonical timestamp instance. + /// Local wall time will be used if [millis] isn't supplied. + factory Hlc.recv(Hlc canonical, Hlc remote, {int? millis}) { + // Retrieve the local wall time if millis is null + millis = millis ?? DateTime.now().millisecondsSinceEpoch; + + // No need to do any more work if the remote logical time is lower + if (canonical.logicalTime >= remote.logicalTime) return canonical; + + // Assert the node id + if (canonical.nodeId == remote.nodeId) { + throw DuplicateNodeException(canonical.nodeId.toString()); + } + // Assert the remote clock drift + if (remote.millis - millis > _maxDrift) { + throw ClockDriftException(remote.millis, millis); + } + + return Hlc.fromLogicalTime(remote.logicalTime, canonical.nodeId); + } + + String toJson() => toString(); + + @override + String toString() => + '${DateTime.fromMillisecondsSinceEpoch(millis, isUtc: true).toIso8601String()}' + '-${counter.toRadixString(16).toUpperCase().padLeft(4, '0')}' + '-$nodeId'; + + @override + int get hashCode => toString().hashCode; + + @override + bool operator ==(other) => other is Hlc && compareTo(other) == 0; + + bool operator <(other) => other is Hlc && compareTo(other) < 0; + + bool operator <=(other) => this < other || this == other; + + bool operator >(other) => other is Hlc && compareTo(other) > 0; + + bool operator >=(other) => this > other || this == other; + + @override + int compareTo(Hlc other) { + final time = logicalTime.compareTo(other.logicalTime); + return time != 0 ? 
time : nodeId.compareTo(other.nodeId); + } +} + +class ClockDriftException implements Exception { + final int drift; + + ClockDriftException(int millisTs, int millisWall) + : drift = millisTs - millisWall; + + @override + String toString() => 'Clock drift of $drift ms exceeds maximum ($_maxDrift)'; +} + +class OverflowException implements Exception { + final int counter; + + OverflowException(this.counter); + + @override + String toString() => 'Timestamp counter overflow: $counter'; +} + +class DuplicateNodeException implements Exception { + final String nodeId; + + DuplicateNodeException(this.nodeId); + + @override + String toString() => 'Duplicate node: $nodeId'; +} + +extension StringHlcX on String { + Hlc get toHlc => Hlc.parse(this); +} diff --git a/lib/src/sqlite_crdt.dart b/lib/src/sqlite_crdt.dart new file mode 100644 index 0000000..6c4de95 --- /dev/null +++ b/lib/src/sqlite_crdt.dart @@ -0,0 +1,373 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +import 'package:sqflite_common/sqlite_api.dart'; + +// ignore: implementation_imports +import 'package:sqflite_common/src/open_options.dart'; +import 'package:sqflite_common_ffi/sqflite_ffi.dart'; +import 'package:sqlite_crdt/src/util/uuid.dart'; + +import 'hlc.dart'; + +class SqliteCrdt { + final Database _db; + Hlc canonicalTime; + late final Map> _schemas; + + final _allChanges = StreamController.broadcast(); + final _watches = >>, _Query>{}; + + String get nodeId => canonicalTime.nodeId; + + /// Returns the last modified timestamp from other peers + Future get peerLastModified => lastModified(excludeNodeId: nodeId); + + Stream get allChanges => _allChanges.stream; + + Iterable get tables => _schemas.keys; + + /// Returns the last modified timestamp, optionally filtering for or against a + /// specific node id. + /// Useful to get "modified since" timestamps for synchronization. + Future lastModified({String? onlyNodeId, String? 
excludeNodeId}) => + _latestModified(_db, tables, + onlyNodeId: onlyNodeId, excludeNodeId: excludeNodeId); + + static Future _latestModified(Database db, Iterable tables, + {String? onlyNodeId, String? excludeNodeId}) async { + assert(onlyNodeId == null || excludeNodeId == null); + + final whereStatement = onlyNodeId != null + ? "WHERE hlc LIKE '%' || ?1" + : excludeNodeId != null + ? "WHERE hlc NOT LIKE '%' || ?1" + : ''; + final tableStatements = tables.map((table) => + 'SELECT max(modified) AS modified FROM $table $whereStatement'); + final result = await db.rawQuery( + ''' + SELECT max(modified) AS modified FROM ( + ${tableStatements.join('\nUNION ALL\n')} + ) + ''', + [ + if (onlyNodeId != null) onlyNodeId, + if (excludeNodeId != null) excludeNodeId, + ], + ); + return (result.first['modified'] as String?)?.toHlc; + } + + SqliteCrdt._(this._db, this.canonicalTime); + + static Future open( + String basePath, + String name, + Iterable tables, { + bool inMemory = false, + int? version, + OnDatabaseCreateFn? onCreate, + OnDatabaseVersionChangeFn? onUpgrade, + }) async { + // Initialize FFI + sqfliteFfiInit(); + if (Platform.isLinux) { + await databaseFactoryFfi.setDatabasesPath('.'); + } + + final db = await databaseFactoryFfi.openDatabase( + inMemory ? inMemoryDatabasePath : '$basePath/$name.db', + options: SqfliteOpenDatabaseOptions( + version: version, + onCreate: onCreate, + onUpgrade: onUpgrade, + ), + ); + + // Get existing node id, or generate one + final canonicalTime = await _latestModified(db, tables); + final crdt = SqliteCrdt._(db, canonicalTime ?? Hlc.zero(uuid())); + + // Read schemas directly from database + crdt._schemas = { + for (final table in tables) table: await _getTableColumns(db, table) + }; + + return crdt; + } + + // TODO Maybe remove this method? 
+ Iterable getPrimaryKeys(String table) => + _schemas[table]!.where((e) => e.isPrimaryKey).map((e) => e.name); + + // TODO Check statements for INSERT, UPDATE, DELETE and trigger watches + // Future execute(String sql, [List? args]) => + // _db.execute(sql, args); + + Future>> query(String sql, [List? args]) => + _db.rawQuery(sql, args?.map(_encode).toList()); + + Stream>> watch(String sql, [List? args]) { + late final StreamController>> controller; + controller = StreamController>>( + onListen: () { + final query = _Query(sql, args); + _watches[controller] = query; + _emitQuery(controller, query); + }, + onCancel: () { + _watches.remove(controller); + controller.close(); + }, + ); + + return controller.stream; + } + + /// Insert a new record in the database. + /// See [insertAll], [insertAllTable], [update]. + Future insert(String table, Map record) => + insertAllTable(table, [record]); + + /// Insert new records in the database, all stamped with one new canonical time. + /// Tables without records are skipped. See [insert], [insertAllTable], [update]. + Future insertAll( + Map>> records) async { + final count = records.values.fold(0, (prev, e) => prev + e.length); + if (count == 0) return; + // Advance the clock first: Hlc.send may throw and must not leave a transaction open + canonicalTime = Hlc.send(canonicalTime); + await beginTransaction(); + for (final entry in records.entries.where((e) => e.value.isNotEmpty)) { + final table = entry.key; + final records = entry.value; + + final columns = + [...records.first.keys, 'is_deleted', 'hlc', 'modified'].join(', '); + final placeholders = + List.generate(records.first.length + 3, (i) => '?${i + 1}') + .join(', '); + + for (final record in records) { + final values = [...record.values, false, canonicalTime, canonicalTime] + .map(_encode) + .toList(); + final sql = ''' + INSERT INTO $table ($columns) + VALUES ($placeholders) + '''; + await _db.execute(sql, values); + } + } + await commitTransaction(); + await _onDbChanged(); + } + + /// Insert new records into a table in the database. + /// See [insert], [insertAll], [update]. 
+ Future insertAllTable( + String table, Iterable> records) async { + if (records.isEmpty) return; + return insertAll({table: records}); + } + + /// Update [fields] in an existing value with [ids]. + /// Set [isDeleted] to true if the value is to be marked as deleted. + /// Note: data is never actually deleted from the database since CRDT deletions need to be propagated. + /// Fields need to be overwritten if purging data is required. + Future update( + String table, Iterable ids, Map fields, + [bool isDeleted = false]) async { + // Find primary key fields + final keyCols = + _schemas[table]!.where((e) => e.isPrimaryKey).map((e) => e.name); + assert(keyCols.length == ids.length); + + canonicalTime = Hlc.send(canonicalTime); + final crdtFields = { + ...fields, + 'is_deleted': isDeleted, + 'hlc': canonicalTime, + 'modified': canonicalTime, + }; + + var i = 1; + final updateStatement = + crdtFields.keys.map((e) => '"$e" = ?${i++}').join(', \n'); + final whereStatement = keyCols.map((e) => '"$e" = ?${i++}').join(' AND \n'); + + final sql = ''' + UPDATE "$table" SET + $updateStatement + WHERE + $whereStatement + '''; + + final values = [...crdtFields.values, ...ids].map(_encode).toList(); + await _db.execute(sql, values); + await _onDbChanged(); + } + + /// Marks record as deleted in the CRDT. Set [isDeleted] to false to restore. + /// Convenience method for [update]. + Future setDeleted(String table, List ids, + [bool isDeleted = true]) => + update(table, ids, {}, isDeleted); + + /// Returns all CRDT records in database. + /// Use [modifiedSince] to fetch only recently changed records. + /// Set [onlyModifiedHere] to get only records changed in this node. + Future>>> getChangeset( + {Iterable? fromTables, + Hlc? modifiedSince, + bool onlyModifiedHere = false}) async => + { + for (final table in fromTables ?? 
tables) + table: await getTableChangeset( + table, + modifiedSince: modifiedSince, + onlyModifiedHere: onlyModifiedHere, + ) + }..removeWhere((_, records) => records.isEmpty); + + /// Returns all records in [table]. + /// See [getChangeset]. + Future>> getTableChangeset(String table, + {Hlc? modifiedSince, bool onlyModifiedHere = false}) async { + final conditions = [ + if (modifiedSince != null) "modified > '$modifiedSince'", + if (onlyModifiedHere) "hlc LIKE '%$nodeId'", + ]; + final conditionClause = + conditions.isEmpty ? '' : 'WHERE ${conditions.join(' AND ')}'; + + return await _db.rawQuery('SELECT * FROM $table $conditionClause'); + } + + /// Merge [changeset] into all tables; the caller's record maps are copied, not modified. + Future merge( + Map>> changeset) async { + // Validate all remote timestamps before opening the transaction, so that + // 1. invalid entries throw without leaving a transaction open + // 2. the local canonical time is updated if needed + var hlc = canonicalTime; + for (final records in changeset.values) { + hlc = records.fold(hlc, + (hlc, record) => Hlc.recv(hlc, Hlc.parse(record['hlc'] as String))); + } + canonicalTime = hlc; + + await beginTransaction(); + + for (final entry in changeset.entries) { + final table = entry.key; + final records = entry.value; + + final columnNames = _schemas[table]!.map((e) => e.name).toSet(); + for (final remote in records) { + final record = {...remote, 'modified': canonicalTime} + ..removeWhere((key, _) => !columnNames.contains(key)); + + final columns = record.keys.join(', '); + final placeholders = + List.generate(record.length, (i) => '?${i + 1}').join(', '); + final values = record.values.map(_encode).toList(); + + var i = 1; + final updateStatement = + record.keys.map((e) => '$e = ?${i++}').join(', \n'); + + final sql = ''' + INSERT INTO $table ($columns) + VALUES ($placeholders) + ON CONFLICT DO + UPDATE SET $updateStatement + WHERE excluded.hlc > $table.hlc + '''; + await _db.execute(sql, values); + } + } + + await commitTransaction(); + await _onDbChanged(); + } + + /// Merge [changeset] into 
+  /// [table].
+  /// See [merge].
+  Future<void> mergeTable(
+          String table, Iterable<Map<String, Object?>> changeset) =>
+      merge({table: changeset});
+
+  // Number of beginTransaction calls not yet matched by a commit. Only the
+  // outermost level issues BEGIN/COMMIT to the database.
+  var _transactionCount = 0;
+
+  /// Opens a transaction, or joins the one that is already open.
+  /// Every call must be balanced by a call to [commitTransaction].
+  Future<void> beginTransaction() async {
+    if (_transactionCount == 0) await _db.execute('BEGIN TRANSACTION');
+    _transactionCount++;
+  }
+
+  /// Closes one transaction level. `COMMIT` is issued once the outermost
+  /// level is closed.
+  ///
+  /// Throws a [StateError] when no transaction is open, instead of letting
+  /// the counter go negative (which would make later begin/commit pairs skip
+  /// their `BEGIN` and `COMMIT`).
+  Future<void> commitTransaction() async {
+    if (_transactionCount <= 0) {
+      throw StateError(
+          'commitTransaction called without a matching beginTransaction');
+    }
+    _transactionCount--;
+    if (_transactionCount == 0) await _db.execute('COMMIT');
+  }
+
+  // Signals a change on _allChanges, then re-runs every watched query and
+  // pushes the fresh result to its controller.
+  Future<void> _onDbChanged() async {
+    _allChanges.add(null);
+    // Iterate over a snapshot of the entries rather than the live map.
+    for (final entry in _watches.entries.toList()) {
+      await _emitQuery(entry.key, entry.value);
+    }
+  }
+
+  // Runs [query] and adds the result to [controller] unless it has been
+  // closed in the meantime.
+  Future<void> _emitQuery(
+      StreamController<List<Map<String, Object?>>> controller,
+      _Query query) async {
+    final result =
+        await _db.rawQuery(query.sql, query.args?.map(_encode).toList());
+    if (!controller.isClosed) {
+      controller.add(result);
+    }
+  }
+}
+
+/// Converts [value] into the form in which it is passed to the database:
+/// maps become JSON text, enums their name, booleans 1/0, dates a UTC
+/// ISO-8601 string and [Hlc] its string form. Strings and numbers pass
+/// through unchanged.
+///
+/// Throws a [String] describing the type when [value] is not supported.
+Object? _encode(Object? value) {
+  if (value == null) return null;
+  if (value is Map) return jsonEncode(value);
+  if (value is Enum) return value.name;
+
+  // Type tests instead of runtimeType equality, so subtypes of the supported
+  // types are encoded as well.
+  if (value is String || value is int || value is double) return value;
+  if (value is bool) return value ? 1 : 0;
+  if (value is DateTime) return value.toUtc().toIso8601String();
+  if (value is Hlc) return value.toString();
+
+  // The thrown object stays a String so existing handlers keep working.
+  throw 'Unsupported type: ${value.runtimeType}';
+}
+
+/// Reads the column names of [table] from `pragma_table_info`, flagging the
+/// columns whose `pk` value is non-zero as primary keys.
+Future<Iterable<_Column>> _getTableColumns(Database db, String table) async =>
+    (await db.rawQuery('SELECT name, pk FROM pragma_table_info(?1)', [table]))
+        .map((e) => _Column(e['name'] as String, e['pk'] != 0));
+
+/// A table column: its name and whether it belongs to the primary key.
+class _Column {
+  final String name;
+  final bool isPrimaryKey;
+
+  _Column(this.name, this.isPrimaryKey);
+
+  @override
+  String toString() => '$name${isPrimaryKey ? ' [PK]' : ''}';
+}
+
+/// A SQL statement together with its optional arguments.
+class _Query {
+  final String sql;
+  final List<Object?>? args;
+
+  _Query(this.sql, this.args);
+}
diff --git a/lib/src/util/uuid.dart b/lib/src/util/uuid.dart
new file mode 100644
index 0000000..3fc5797
--- /dev/null
+++ b/lib/src/util/uuid.dart
@@ -0,0 +1,5 @@
+import 'package:uuid/uuid.dart';
+
+Uuid _uuid = const Uuid();
+
+String uuid() => _uuid.v4();
diff --git a/pubspec.yaml b/pubspec.yaml
new file mode 100644
index 0000000..f7913ab
--- /dev/null
+++ b/pubspec.yaml
@@ -0,0 +1,18 @@
+name: sqlite_crdt
+description: A CRDT backed by a Sqlite database.
+version: 0.0.1
+homepage: https://github.com/cachapa/sqlite_crdt
+repository: https://github.com/cachapa/sqlite_crdt
+issue_tracker: https://github.com/cachapa/sqlite_crdt/issues
+
+environment:
+  sdk: '>=2.19.0-374.1.beta <3.0.0'
+
+dependencies:
+  sqflite_common: ^2.4.0+2
+  sqflite_common_ffi: ^2.2.0+1
+  uuid: ^3.0.7
+
+dev_dependencies:
+  lints: any
+  test: any