diff --git a/cpp/DBHostObject.cpp b/cpp/DBHostObject.cpp index f059043b..dfc81fe0 100644 --- a/cpp/DBHostObject.cpp +++ b/cpp/DBHostObject.cpp @@ -9,13 +9,49 @@ #include "macros.h" #include "utils.h" #include +#include namespace opsqlite { namespace jsi = facebook::jsi; namespace react = facebook::react; -#ifndef OP_SQLITE_USE_LIBSQL +#ifdef OP_SQLITE_USE_LIBSQL +void DBHostObject::flush_pending_reactive_queries() { + // intentionally left blank +} +#else +void DBHostObject::flush_pending_reactive_queries() { + for (const auto &query_ptr : pending_reactive_queries) { + auto query = query_ptr.get(); + + std::vector results; + std::shared_ptr> metadata = + std::make_shared>(); + + auto status = opsqlite_execute_prepared_statement(db_name, query->stmt, + &results, metadata); + + if (status.type == SQLiteError) { + invoker->invokeAsync( + [this, callback = query->callback, status = std::move(status)] { + auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error"); + auto error = errorCtr.callAsConstructor( + rt, jsi::String::createFromUtf8(rt, status.message)); + callback->asObject(rt).asFunction(rt).call(rt, error); + }); + } else { + invoker->invokeAsync( + [this, + results = std::make_shared>(results), + callback = query->callback, metadata, status = std::move(status)] { + auto jsiResult = createResult(rt, status, results.get(), metadata); + callback->asObject(rt).asFunction(rt).call(rt, jsiResult); + }); + } + } +} + void DBHostObject::auto_register_update_hook() { if (update_hook_callback == nullptr && reactive_queries.empty() && is_update_hook_registered) { @@ -31,19 +67,17 @@ void DBHostObject::auto_register_update_hook() { auto hook = [this](std::string name, std::string table_name, std::string operation, int rowid) { if (update_hook_callback != nullptr) { - jsCallInvoker->invokeAsync( - [this, - callback = update_hook_callback, table_name, - operation = std::move(operation), rowid] { - auto res = jsi::Object(rt); - res.setProperty(rt, "table", - jsi::String::createFromUtf8(rt, table_name)); - res.setProperty(rt, "operation", - jsi::String::createFromUtf8(rt, operation)); - res.setProperty(rt, "rowId", jsi::Value(rowid)); - - callback->asObject(rt).asFunction(rt).call(rt, res); - }); + invoker->invokeAsync([this, callback = update_hook_callback, table_name, + operation = std::move(operation), rowid] { + auto res = jsi::Object(rt); + res.setProperty(rt, "table", + jsi::String::createFromUtf8(rt, table_name)); + res.setProperty(rt, "operation", + jsi::String::createFromUtf8(rt, operation)); + res.setProperty(rt, "rowId", jsi::Value(rowid)); + + callback->asObject(rt).asFunction(rt).call(rt, res); + }); } for (const auto &query_ptr : reactive_queries) { @@ -55,54 +89,28 @@ void DBHostObject::auto_register_update_hook() { bool shouldFire = false; for (const auto &discriminator : query->discriminators) { + // Tables don't match then skip if (discriminator.table != table_name) { continue; } - // Table has matched // If no ids are specified, then we should fire if (discriminator.ids.size() == 0) { shouldFire = true; break; - } else { // If ids are specified, then we should check if the rowId - // matches - for (const auto &discrimator_id : discriminator.ids) { - if (rowid == discrimator_id) { - shouldFire = true; - break; - } - } } - } - if (!shouldFire) { - continue; + // If ids are specified, then we should check if the rowId matches + for (const auto &discrimator_id : discriminator.ids) { + if (rowid == discrimator_id) { + shouldFire = true; + break; + } + } } - std::vector results; - std::shared_ptr> metadata = - std::make_shared>(); - - auto status = opsqlite_execute_prepared_statement(db_name, query->stmt, - &results, metadata); - - if (status.type == SQLiteError) { - jsCallInvoker->invokeAsync( - [this, callback = query->callback, status = std::move(status)] { - auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error"); - auto error = errorCtr.callAsConstructor( - rt, jsi::String::createFromUtf8(rt, status.message)); - callback->asObject(rt).asFunction(rt).call(rt, error); - }); - } else { - jsCallInvoker->invokeAsync( - [this, - results = std::make_shared>(results), - callback = query->callback, metadata, status = std::move(status)] { - auto jsiResult = - createResult(rt, status, results.get(), metadata); - callback->asObject(rt).asFunction(rt).call(rt, jsiResult); - }); + if (shouldFire) { + pending_reactive_queries.insert(query_ptr); } } }; @@ -115,10 +123,10 @@ void DBHostObject::auto_register_update_hook() { #ifdef OP_SQLITE_USE_LIBSQL DBHostObject::DBHostObject(jsi::Runtime &rt, std::string &url, std::string &auth_token, - std::shared_ptr js_call_invoker, + std::shared_ptr invoker, std::shared_ptr thread_pool) - : db_name(url), jsCallInvoker(js_call_invoker), thread_pool(thread_pool), - rt(rt) { + : db_name(url), invoker(std::move(invoker)), + thread_pool(std::move(thread_pool)), rt(rt) { BridgeResult result = opsqlite_libsql_open_remote(url, auth_token); if (result.type == SQLiteError) { @@ -134,8 +142,8 @@ DBHostObject::DBHostObject(jsi::Runtime &rt, std::string &db_name, std::string &path, std::string &url, std::string &auth_token, int sync_interval) - : db_name(db_name), jsCallInvoker(invoker), thread_pool(thread_pool), - rt(rt) { + : db_name(db_name), invoker(std::move(invoker)), + thread_pool(std::move(thread_pool)), rt(rt) { BridgeResult result = opsqlite_libsql_open_sync(db_name, path, url, auth_token, sync_interval); @@ -149,14 +157,14 @@ DBHostObject::DBHostObject(jsi::Runtime &rt, #endif DBHostObject::DBHostObject(jsi::Runtime &rt, std::string &base_path, - std::shared_ptr jsCallInvoker, + std::shared_ptr invoker, std::shared_ptr thread_pool, std::string &db_name, std::string &path, std::string &crsqlite_path, std::string &sqlite_vec_path, std::string &encryption_key) - : base_path(base_path), jsCallInvoker(jsCallInvoker), - thread_pool(thread_pool), db_name(db_name), rt(rt) { + : base_path(base_path), invoker(std::move(invoker)), + thread_pool(std::move(thread_pool)), db_name(db_name), rt(rt) { #ifdef OP_SQLITE_USE_SQLCIPHER BridgeResult result = opsqlite_open(db_name, path, crsqlite_path, @@ -268,7 +276,7 @@ void DBHostObject::create_jsi_functions() { if (!location.empty()) { if (location == ":memory:") { path = ":memory:"; - } else if (location.rfind("/", 0) == 0) { + } else if (location.rfind('/', 0) == 0) { path = location; } else { path = path + "/" + location; @@ -304,7 +312,7 @@ void DBHostObject::create_jsi_functions() { auto reject = std::make_shared(rt, args[1]); auto task = [&rt, this, query, params = std::move(params), resolve, - reject, invoker = this->jsCallInvoker]() { + reject, invoker = this->invoker]() { try { std::vector> results; @@ -367,7 +375,7 @@ void DBHostObject::create_jsi_functions() { auto task = [&rt, this, query = std::move(query), params = std::move(params), resolve, reject, - invoker = this->jsCallInvoker]() { + invoker = this->invoker]() { try { #ifdef OP_SQLITE_USE_LIBSQL @@ -429,7 +437,7 @@ void DBHostObject::create_jsi_functions() { auto reject = std::make_shared(rt, args[1]); auto task = [&rt, this, query, params = std::move(params), resolve, - reject, invoker = this->jsCallInvoker]() { + reject, invoker = this->invoker]() { try { std::vector results; std::shared_ptr> metadata = @@ -525,8 +533,8 @@ void DBHostObject::create_jsi_functions() { return; } - jsCallInvoker->invokeAsync([&rt, batchResult = std::move(batchResult), - resolve, reject] { + invoker->invokeAsync([&rt, batchResult = std::move(batchResult), + resolve, reject] { if (batchResult.type == SQLiteOk) { auto res = jsi::Object(rt); res.setProperty(rt, "rowsAffected", @@ -541,7 +549,7 @@ void DBHostObject::create_jsi_functions() { }); } catch (std::exception &exc) { auto what = exc.what(); - jsCallInvoker->invokeAsync([&rt, what = std::move(what), reject] { + invoker->invokeAsync([&rt, what = std::move(what), reject] { auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error"); auto error = errorCtr.callAsConstructor( rt, jsi::String::createFromAscii(rt, what)); @@ -585,8 +593,8 @@ void DBHostObject::create_jsi_functions() { try { const auto result = importSQLFile(db_name, sqlFileName); - jsCallInvoker->invokeAsync([&rt, result = std::move(result), resolve, - reject] { + invoker->invokeAsync([&rt, result = std::move(result), resolve, + reject] { if (result.type == SQLiteOk) { auto res = jsi::Object(rt); res.setProperty(rt, "rowsAffected", @@ -602,7 +610,7 @@ void DBHostObject::create_jsi_functions() { }); } catch (std::exception &exc) { auto what = exc.what(); - jsCallInvoker->invokeAsync([&rt, what = std::move(what), reject] { + invoker->invokeAsync([&rt, what = std::move(what), reject] { auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error"); auto error = errorCtr.callAsConstructor( rt, jsi::String::createFromAscii(rt, what)); @@ -643,7 +651,7 @@ void DBHostObject::create_jsi_functions() { commit_hook_callback = callback; auto hook = [&rt, this, callback](std::string dbName) { - jsCallInvoker->invokeAsync( + invoker->invokeAsync( [&rt, callback] { callback->asObject(rt).asFunction(rt).call(rt); }); }; @@ -667,7 +675,7 @@ void DBHostObject::create_jsi_functions() { rollback_hook_callback = callback; auto hook = [&rt, this, callback](std::string db_name) { - jsCallInvoker->invokeAsync( + invoker->invokeAsync( [&rt, callback] { callback->asObject(rt).asFunction(rt).call(rt); }); }; @@ -772,8 +780,8 @@ void DBHostObject::create_jsi_functions() { sqlite3_stmt *statement = opsqlite_prepare_statement(db_name, query); #endif auto preparedStatementHostObject = - std::make_shared( - db_name, statement, jsCallInvoker, thread_pool); + std::make_shared(db_name, statement, + invoker, thread_pool); return jsi::Object::createFromHostObject(rt, preparedStatementHostObject); }); @@ -801,6 +809,12 @@ void DBHostObject::create_jsi_functions() { return jsi::String::createFromUtf8(rt, result); }); + auto flush_pending_reactive_queries_js = + HOSTFN("flushPendingReactiveQueries") { + flush_pending_reactive_queries(); + return {}; + }); + function_map["attach"] = std::move(attach); function_map["detach"] = std::move(detach); function_map["close"] = std::move(close); @@ -811,6 +825,8 @@ void DBHostObject::create_jsi_functions() { function_map["executeBatch"] = std::move(execute_batch); function_map["prepareStatement"] = std::move(prepare_statement); function_map["getDbPath"] = std::move(get_db_path); + function_map["flushPendingReactiveQueries"] = + std::move(flush_pending_reactive_queries_js); #ifdef OP_SQLITE_USE_LIBSQL function_map["sync"] = std::move(sync); #else @@ -833,6 +849,12 @@ jsi::Value DBHostObject::get(jsi::Runtime &rt, const jsi::PropNameID &propNameID) { auto name = propNameID.utf8(rt); + if (name == "execute") { + return jsi::Value(rt, function_map["execute"]); + } + if (name == "flushPendingReactiveQueries") { + return jsi::Value(rt, function_map["flushPendingReactiveQueries"]); + } if (name == "attach") { return jsi::Value(rt, function_map["attach"]); } @@ -845,9 +867,6 @@ jsi::Value DBHostObject::get(jsi::Runtime &rt, if (name == "executeRaw") { return jsi::Value(rt, function_map["executeRaw"]); } - if (name == "execute") { - return jsi::Value(rt, function_map["execute"]); - } if (name == "executeWithHostObjects") { return jsi::Value(rt, function_map["executeWithHostObjects"]); } diff --git a/cpp/DBHostObject.h b/cpp/DBHostObject.h index dcb694c5..cb059cbb 100644 --- a/cpp/DBHostObject.h +++ b/cpp/DBHostObject.h @@ -4,16 +4,22 @@ #include "sqlite3.h" #include "types.h" #include -#include #include #include #include +#include namespace opsqlite { namespace jsi = facebook::jsi; namespace react = facebook::react; +struct PendingReactiveInvocation { + std::string db_name; + std::string table; + std::string rowid; +}; + struct TableRowDiscriminator { std::string table; std::vector ids; @@ -29,7 +35,7 @@ class JSI_EXPORT DBHostObject : public jsi::HostObject { public: // Constructor for local databases DBHostObject(jsi::Runtime &rt, std::string &base_path, - std::shared_ptr js_call_invoker, + std::shared_ptr invoker, std::shared_ptr thread_pool, std::string &db_name, std::string &path, std::string &crsqlite_path, std::string &sqlite_vec_path, std::string &encryption_key); @@ -37,7 +43,7 @@ class JSI_EXPORT DBHostObject : public jsi::HostObject { #ifdef OP_SQLITE_USE_LIBSQL // Constructor for remoteOpen, purely for remote databases DBHostObject(jsi::Runtime &rt, std::string &url, std::string &auth_token, - std::shared_ptr js_call_invoker, + std::shared_ptr invoker, std::shared_ptr thread_pool); // Constructor for a local database with remote sync @@ -55,13 +61,15 @@ class JSI_EXPORT DBHostObject : public jsi::HostObject { ~DBHostObject(); private: + std::set> pending_reactive_queries; void auto_register_update_hook(); void create_jsi_functions(); + void flush_pending_reactive_queries(); std::unordered_map function_map; std::string base_path; std::shared_ptr update_hook; - std::shared_ptr jsCallInvoker; + std::shared_ptr invoker; std::shared_ptr thread_pool; std::string db_name; std::shared_ptr update_hook_callback; @@ -69,6 +77,7 @@ class JSI_EXPORT DBHostObject : public jsi::HostObject { std::shared_ptr rollback_hook_callback; jsi::Runtime &rt; std::vector> reactive_queries; + std::vector pending_reactive_invocations; bool is_update_hook_registered = false; bool invalidated = false; }; diff --git a/example/ios/Podfile.lock b/example/ios/Podfile.lock index 2d6c9c48..723294b3 100644 --- a/example/ios/Podfile.lock +++ b/example/ios/Podfile.lock @@ -10,7 +10,7 @@ PODS: - hermes-engine (0.74.0): - hermes-engine/Pre-built (= 0.74.0) - hermes-engine/Pre-built (0.74.0) - - op-sqlite (9.1.3): + - op-sqlite (9.2.1): - React - React-callinvoker - React-Core @@ -1393,7 +1393,7 @@ SPEC CHECKSUMS: GCDWebServer: 2c156a56c8226e2d5c0c3f208a3621ccffbe3ce4 glog: c5d68082e772fa1c511173d6b30a9de2c05a69a2 hermes-engine: 6eae7edb2f563ee41d7c1f91f4f2e57c26d8a5c3 - op-sqlite: efd0f6f9aa3215353b825625d4b132d5951e168e + op-sqlite: de7f4da4de0217c70e41bf0695967070ad6561d9 RCT-Folly: 045d6ecaa59d826c5736dfba0b2f4083ff8d79df RCTDeprecation: 3ca8b6c36bfb302e1895b72cfe7db0de0c92cd47 RCTRequired: 9fc183af555fd0c89a366c34c1ae70b7e03b1dc5 diff --git a/example/package.json b/example/package.json index 800d2073..cdcf34ed 100644 --- a/example/package.json +++ b/example/package.json @@ -68,7 +68,7 @@ "performanceMode": "2", "iosSqlite": false, "fts5": true, - "libsql": false, + "libsql": true, "sqliteVec": true } } diff --git a/example/src/tests/dbsetup.spec.ts b/example/src/tests/dbsetup.spec.ts index 4985526a..e5bd1e64 100644 --- a/example/src/tests/dbsetup.spec.ts +++ b/example/src/tests/dbsetup.spec.ts @@ -16,8 +16,8 @@ let expect = chai.expect; const expectedVersion = isLibsql() ? '3.45.1' : isSQLCipher() - ? '3.44.2' - : '3.45.1'; + ? '3.46.1' + : '3.46.1'; // const expectedSqliteVecVersion = 'v0.1.2-alpha.7'; diff --git a/example/src/tests/hooks.spec.ts b/example/src/tests/hooks.spec.ts index 34d155ce..d51b4b5f 100644 --- a/example/src/tests/hooks.spec.ts +++ b/example/src/tests/hooks.spec.ts @@ -2,7 +2,7 @@ import Chance from 'chance'; import {type DB, open, isLibsql} from '@op-engineering/op-sqlite'; import chai from 'chai'; -import {describe, it, beforeEach, afterAll, itOnly} from './MochaRNAdapter'; +import {describe, it, beforeEach, afterAll} from './MochaRNAdapter'; import {sleep} from './utils'; const expect = chai.expect; @@ -47,7 +47,7 @@ export function registerHooksTests() { return; } - itOnly('update hook', async () => { + it('update hook', async () => { let promiseResolve: any; let promise = new Promise<{ rowId: number; diff --git a/example/src/tests/queries.spec.ts b/example/src/tests/queries.spec.ts index 2e9b0616..17ea3d4e 100644 --- a/example/src/tests/queries.spec.ts +++ b/example/src/tests/queries.spec.ts @@ -396,7 +396,7 @@ export function queriesTests() { expect(res.insertId).to.equal(1); // expect(res.metadata).to.eql([]); expect(res.rows).to.eql([]); - expect(res.rows?.length).to.equal(0); + expect(res.rows.length).to.equal(0); await tx.commit(); diff --git a/example/src/tests/reactive.spec.ts b/example/src/tests/reactive.spec.ts index 283f2d49..f0ec2851 100644 --- a/example/src/tests/reactive.spec.ts +++ b/example/src/tests/reactive.spec.ts @@ -39,6 +39,7 @@ export function reactiveTests() { if (isLibsql()) { return; } + it('Table reactive query', async () => { let fullSelectRan = false; let emittedUser = null; @@ -68,14 +69,18 @@ export function reactiveTests() { }, }); - db.execute( - 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', - [1, 'John', 30, 1000, 'Johnny'], - ); + await db.transaction(async tx => { + await tx.execute( + 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', + [1, 'John', 30, 1000, 'Johnny'], + ); + }); await sleep(20); - db.execute('UPDATE User SET name = ? WHERE id = ?;', ['Foo', 1]); + await db.transaction(async tx => { + await tx.execute('UPDATE User SET name = ? WHERE id = ?;', ['Foo', 1]); + }); await sleep(20); @@ -105,10 +110,12 @@ export function reactiveTests() { expect(unsubscribe).to.be.a('function'); - db.execute( - 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', - [1, 'John', 30, 1000, 'Johnny'], - ); + await db.transaction(async tx => { + await tx.execute( + 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', + [1, 'John', 30, 1000, 'Johnny'], + ); + }); await sleep(20); @@ -166,14 +173,18 @@ export function reactiveTests() { }, }); - db.execute( - 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', - [1, 'John', 30, 1000, 'Johnny'], - ); + await db.transaction(async tx => { + await tx.execute( + 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', + [1, 'John', 30, 1000, 'Johnny'], + ); + }); await sleep(0); - db.execute('UPDATE User SET name = ? WHERE id = ?;', ['Foo', 1]); + await db.transaction(async tx => { + await tx.execute('UPDATE User SET name = ? WHERE id = ?;', ['Foo', 1]); + }); await sleep(0); @@ -217,10 +228,13 @@ export function reactiveTests() { const name = chance.name(); const age = chance.integer(); const networth = chance.floating(); - db.execute( - 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', - [id, name, age, networth, 'Johnny'], - ); + + await db.transaction(async tx => { + await tx.execute( + 'INSERT INTO User (id, name, age, networth, nickname) VALUES (?, ?, ?, ?, ?);', + [id, name, age, networth, 'Johnny'], + ); + }); const operation = await promise; diff --git a/src/index.ts b/src/index.ts index d1ec71d2..7dddcd61 100644 --- a/src/index.ts +++ b/src/index.ts @@ -59,7 +59,7 @@ export type QueryResult = { insertId?: number; rowsAffected: number; res?: any[]; - rows?: any[]; + rows: any[]; // An array of intermediate results, just values without column names rawRows?: any[]; columnNames?: string[]; @@ -178,6 +178,7 @@ export type DB = { callback: (response: any) => void; }) => () => void; sync: () => void; + flushPendingReactiveQueries: () => void; }; type OPSQLiteProxy = { @@ -243,6 +244,7 @@ function enhanceDB(db: DB, options: any): DB { getDbPath: db.getDbPath, reactiveExecute: db.reactiveExecute, sync: db.sync, + flushPendingReactiveQueries: db.flushPendingReactiveQueries, close: () => { db.close(); delete locks[options.url]; @@ -284,7 +286,7 @@ function enhanceDB(db: DB, options: any): DB { let rows: any[] = []; for (let i = 0; i < (intermediateResult.rawRows?.length ?? 0); i++) { let row: any = {}; - for (let j = 0; j < intermediateResult.columnNames!.length ?? 0; j++) { + for (let j = 0; j < intermediateResult.columnNames!.length; j++) { let columnName = intermediateResult.columnNames![j]!; let value = intermediateResult.rawRows![i][j]; @@ -347,6 +349,9 @@ function enhanceDB(db: DB, options: any): DB { ); } const result = await enhancedDb.execute('COMMIT;'); + console.log('BEFORE FLUSH'); + enhancedDb.flushPendingReactiveQueries(); + console.log('AFER FLUSH'); isFinalized = true; return result; }; diff --git a/turbo.json b/turbo.json index e274708e..cde2f601 100644 --- a/turbo.json +++ b/turbo.json @@ -10,6 +10,7 @@ "src/*.tsx", "example/package.json", "example/android", + "cpp", "!example/android/.gradle", "!example/android/build", "!example/android/app/build" @@ -21,6 +22,7 @@ "package.json", "*.podspec", "ios", + "cpp", "src/*.ts", "src/*.tsx", "example/package.json",