Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reactive queries by triggering them only on transactions after commit #163

Merged
merged 6 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 94 additions & 75 deletions cpp/DBHostObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,49 @@
#include "macros.h"
#include "utils.h"
#include <iostream>
#include <utility>

namespace opsqlite {

namespace jsi = facebook::jsi;
namespace react = facebook::react;

#ifndef OP_SQLITE_USE_LIBSQL
#ifdef OP_SQLITE_USE_LIBSQL
void DBHostObject::flush_pending_reactive_queries() {
// intentionally left blank
}
#else
void DBHostObject::flush_pending_reactive_queries() {
for (const auto &query_ptr : pending_reactive_queries) {
auto query = query_ptr.get();

std::vector<DumbHostObject> results;
std::shared_ptr<std::vector<SmartHostObject>> metadata =
std::make_shared<std::vector<SmartHostObject>>();

auto status = opsqlite_execute_prepared_statement(db_name, query->stmt,
&results, metadata);

if (status.type == SQLiteError) {
invoker->invokeAsync(
[this, callback = query->callback, status = std::move(status)] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromUtf8(rt, status.message));
callback->asObject(rt).asFunction(rt).call(rt, error);
});
} else {
invoker->invokeAsync(
[this,
results = std::make_shared<std::vector<DumbHostObject>>(results),
callback = query->callback, metadata, status = std::move(status)] {
auto jsiResult = createResult(rt, status, results.get(), metadata);
callback->asObject(rt).asFunction(rt).call(rt, jsiResult);
});
}
}
}

void DBHostObject::auto_register_update_hook() {
if (update_hook_callback == nullptr && reactive_queries.empty() &&
is_update_hook_registered) {
Expand All @@ -31,19 +67,17 @@ void DBHostObject::auto_register_update_hook() {
auto hook = [this](std::string name, std::string table_name,
std::string operation, int rowid) {
if (update_hook_callback != nullptr) {
jsCallInvoker->invokeAsync(
[this,
callback = update_hook_callback, table_name,
operation = std::move(operation), rowid] {
auto res = jsi::Object(rt);
res.setProperty(rt, "table",
jsi::String::createFromUtf8(rt, table_name));
res.setProperty(rt, "operation",
jsi::String::createFromUtf8(rt, operation));
res.setProperty(rt, "rowId", jsi::Value(rowid));

callback->asObject(rt).asFunction(rt).call(rt, res);
});
invoker->invokeAsync([this, callback = update_hook_callback, table_name,
operation = std::move(operation), rowid] {
auto res = jsi::Object(rt);
res.setProperty(rt, "table",
jsi::String::createFromUtf8(rt, table_name));
res.setProperty(rt, "operation",
jsi::String::createFromUtf8(rt, operation));
res.setProperty(rt, "rowId", jsi::Value(rowid));

callback->asObject(rt).asFunction(rt).call(rt, res);
});
}

for (const auto &query_ptr : reactive_queries) {
Expand All @@ -55,54 +89,28 @@ void DBHostObject::auto_register_update_hook() {
bool shouldFire = false;

for (const auto &discriminator : query->discriminators) {
// Tables don't match then skip
if (discriminator.table != table_name) {
continue;
}
// Table has matched

// If no ids are specified, then we should fire
if (discriminator.ids.size() == 0) {
shouldFire = true;
break;
} else { // If ids are specified, then we should check if the rowId
// matches
for (const auto &discrimator_id : discriminator.ids) {
if (rowid == discrimator_id) {
shouldFire = true;
break;
}
}
}
}

if (!shouldFire) {
continue;
// If ids are specified, then we should check if the rowId matches
for (const auto &discrimator_id : discriminator.ids) {
if (rowid == discrimator_id) {
shouldFire = true;
break;
}
}
}

std::vector<DumbHostObject> results;
std::shared_ptr<std::vector<SmartHostObject>> metadata =
std::make_shared<std::vector<SmartHostObject>>();

auto status = opsqlite_execute_prepared_statement(db_name, query->stmt,
&results, metadata);

if (status.type == SQLiteError) {
jsCallInvoker->invokeAsync(
[this, callback = query->callback, status = std::move(status)] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromUtf8(rt, status.message));
callback->asObject(rt).asFunction(rt).call(rt, error);
});
} else {
jsCallInvoker->invokeAsync(
[this,
results = std::make_shared<std::vector<DumbHostObject>>(results),
callback = query->callback, metadata, status = std::move(status)] {
auto jsiResult =
createResult(rt, status, results.get(), metadata);
callback->asObject(rt).asFunction(rt).call(rt, jsiResult);
});
if (shouldFire) {
pending_reactive_queries.insert(query_ptr);
}
}
};
Expand All @@ -115,10 +123,10 @@ void DBHostObject::auto_register_update_hook() {
#ifdef OP_SQLITE_USE_LIBSQL
DBHostObject::DBHostObject(jsi::Runtime &rt, std::string &url,
std::string &auth_token,
std::shared_ptr<react::CallInvoker> js_call_invoker,
std::shared_ptr<react::CallInvoker> invoker,
std::shared_ptr<ThreadPool> thread_pool)
: db_name(url), jsCallInvoker(js_call_invoker), thread_pool(thread_pool),
rt(rt) {
: db_name(url), invoker(std::move(invoker)),
thread_pool(std::move(thread_pool)), rt(rt) {
BridgeResult result = opsqlite_libsql_open_remote(url, auth_token);

if (result.type == SQLiteError) {
Expand All @@ -134,8 +142,8 @@ DBHostObject::DBHostObject(jsi::Runtime &rt,
std::string &db_name, std::string &path,
std::string &url, std::string &auth_token,
int sync_interval)
: db_name(db_name), jsCallInvoker(invoker), thread_pool(thread_pool),
rt(rt) {
: db_name(db_name), invoker(std::move(invoker)),
thread_pool(std::move(thread_pool)), rt(rt) {
BridgeResult result =
opsqlite_libsql_open_sync(db_name, path, url, auth_token, sync_interval);

Expand All @@ -149,14 +157,14 @@ DBHostObject::DBHostObject(jsi::Runtime &rt,
#endif

DBHostObject::DBHostObject(jsi::Runtime &rt, std::string &base_path,
std::shared_ptr<react::CallInvoker> jsCallInvoker,
std::shared_ptr<react::CallInvoker> invoker,
std::shared_ptr<ThreadPool> thread_pool,
std::string &db_name, std::string &path,
std::string &crsqlite_path,
std::string &sqlite_vec_path,
std::string &encryption_key)
: base_path(base_path), jsCallInvoker(jsCallInvoker),
thread_pool(thread_pool), db_name(db_name), rt(rt) {
: base_path(base_path), invoker(std::move(invoker)),
thread_pool(std::move(thread_pool)), db_name(db_name), rt(rt) {

#ifdef OP_SQLITE_USE_SQLCIPHER
BridgeResult result = opsqlite_open(db_name, path, crsqlite_path,
Expand Down Expand Up @@ -268,7 +276,7 @@ void DBHostObject::create_jsi_functions() {
if (!location.empty()) {
if (location == ":memory:") {
path = ":memory:";
} else if (location.rfind("/", 0) == 0) {
} else if (location.rfind('/', 0) == 0) {
path = location;
} else {
path = path + "/" + location;
Expand Down Expand Up @@ -304,7 +312,7 @@ void DBHostObject::create_jsi_functions() {
auto reject = std::make_shared<jsi::Value>(rt, args[1]);

auto task = [&rt, this, query, params = std::move(params), resolve,
reject, invoker = this->jsCallInvoker]() {
reject, invoker = this->invoker]() {
try {
std::vector<std::vector<JSVariant>> results;

Expand Down Expand Up @@ -367,7 +375,7 @@ void DBHostObject::create_jsi_functions() {

auto task = [&rt, this, query = std::move(query),
params = std::move(params), resolve, reject,
invoker = this->jsCallInvoker]() {
invoker = this->invoker]() {
try {

#ifdef OP_SQLITE_USE_LIBSQL
Expand Down Expand Up @@ -429,7 +437,7 @@ void DBHostObject::create_jsi_functions() {
auto reject = std::make_shared<jsi::Value>(rt, args[1]);

auto task = [&rt, this, query, params = std::move(params), resolve,
reject, invoker = this->jsCallInvoker]() {
reject, invoker = this->invoker]() {
try {
std::vector<DumbHostObject> results;
std::shared_ptr<std::vector<SmartHostObject>> metadata =
Expand Down Expand Up @@ -525,8 +533,8 @@ void DBHostObject::create_jsi_functions() {
return;
}

jsCallInvoker->invokeAsync([&rt, batchResult = std::move(batchResult),
resolve, reject] {
invoker->invokeAsync([&rt, batchResult = std::move(batchResult),
resolve, reject] {
if (batchResult.type == SQLiteOk) {
auto res = jsi::Object(rt);
res.setProperty(rt, "rowsAffected",
Expand All @@ -541,7 +549,7 @@ void DBHostObject::create_jsi_functions() {
});
} catch (std::exception &exc) {
auto what = exc.what();
jsCallInvoker->invokeAsync([&rt, what = std::move(what), reject] {
invoker->invokeAsync([&rt, what = std::move(what), reject] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromAscii(rt, what));
Expand Down Expand Up @@ -585,8 +593,8 @@ void DBHostObject::create_jsi_functions() {
try {
const auto result = importSQLFile(db_name, sqlFileName);

jsCallInvoker->invokeAsync([&rt, result = std::move(result), resolve,
reject] {
invoker->invokeAsync([&rt, result = std::move(result), resolve,
reject] {
if (result.type == SQLiteOk) {
auto res = jsi::Object(rt);
res.setProperty(rt, "rowsAffected",
Expand All @@ -602,7 +610,7 @@ void DBHostObject::create_jsi_functions() {
});
} catch (std::exception &exc) {
auto what = exc.what();
jsCallInvoker->invokeAsync([&rt, what = std::move(what), reject] {
invoker->invokeAsync([&rt, what = std::move(what), reject] {
auto errorCtr = rt.global().getPropertyAsFunction(rt, "Error");
auto error = errorCtr.callAsConstructor(
rt, jsi::String::createFromAscii(rt, what));
Expand Down Expand Up @@ -643,7 +651,7 @@ void DBHostObject::create_jsi_functions() {
commit_hook_callback = callback;

auto hook = [&rt, this, callback](std::string dbName) {
jsCallInvoker->invokeAsync(
invoker->invokeAsync(
[&rt, callback] { callback->asObject(rt).asFunction(rt).call(rt); });
};

Expand All @@ -667,7 +675,7 @@ void DBHostObject::create_jsi_functions() {
rollback_hook_callback = callback;

auto hook = [&rt, this, callback](std::string db_name) {
jsCallInvoker->invokeAsync(
invoker->invokeAsync(
[&rt, callback] { callback->asObject(rt).asFunction(rt).call(rt); });
};

Expand Down Expand Up @@ -772,8 +780,8 @@ void DBHostObject::create_jsi_functions() {
sqlite3_stmt *statement = opsqlite_prepare_statement(db_name, query);
#endif
auto preparedStatementHostObject =
std::make_shared<PreparedStatementHostObject>(
db_name, statement, jsCallInvoker, thread_pool);
std::make_shared<PreparedStatementHostObject>(db_name, statement,
invoker, thread_pool);

return jsi::Object::createFromHostObject(rt, preparedStatementHostObject);
});
Expand Down Expand Up @@ -801,6 +809,12 @@ void DBHostObject::create_jsi_functions() {
return jsi::String::createFromUtf8(rt, result);
});

auto flush_pending_reactive_queries_js =
HOSTFN("flushPendingReactiveQueries") {
flush_pending_reactive_queries();
return {};
});

function_map["attach"] = std::move(attach);
function_map["detach"] = std::move(detach);
function_map["close"] = std::move(close);
Expand All @@ -811,6 +825,8 @@ void DBHostObject::create_jsi_functions() {
function_map["executeBatch"] = std::move(execute_batch);
function_map["prepareStatement"] = std::move(prepare_statement);
function_map["getDbPath"] = std::move(get_db_path);
function_map["flushPendingReactiveQueries"] =
std::move(flush_pending_reactive_queries_js);
#ifdef OP_SQLITE_USE_LIBSQL
function_map["sync"] = std::move(sync);
#else
Expand All @@ -833,6 +849,12 @@ jsi::Value DBHostObject::get(jsi::Runtime &rt,
const jsi::PropNameID &propNameID) {

auto name = propNameID.utf8(rt);
if (name == "execute") {
return jsi::Value(rt, function_map["execute"]);
}
if (name == "flushPendingReactiveQueries") {
return jsi::Value(rt, function_map["flushPendingReactiveQueries"]);
}
if (name == "attach") {
return jsi::Value(rt, function_map["attach"]);
}
Expand All @@ -845,9 +867,6 @@ jsi::Value DBHostObject::get(jsi::Runtime &rt,
if (name == "executeRaw") {
return jsi::Value(rt, function_map["executeRaw"]);
}
if (name == "execute") {
return jsi::Value(rt, function_map["execute"]);
}
if (name == "executeWithHostObjects") {
return jsi::Value(rt, function_map["executeWithHostObjects"]);
}
Expand Down
Loading
Loading