Skip to content

Commit

Permalink
Do not load _everything_ in memory for bookkeeping (#189)
Browse files Browse the repository at this point in the history
* create a new __corro_bookkeeping_gaps table containing the needed version gaps; this should speed up boot time

* possibly fix tests; using a large max seq was a bad idea

* fix typo, get rid of process_version function

* more typo fixes

* fix test, painstakingly, forgot to allow sending partials

* give CI a bit more time

* use INNER JOIN for query instead of subquery

* deadlock situation when trying to persist

* fix warnings

* send empties as empties, add a test for that

* rename a field and add a few comments

* actually persist as we go instead of once in a while; that was going to be a big problem

* log how long it took to load bookkeeping, reduce default apply queue length to 200

* actually add tests and fix the gaps collection

* add tests for initial gap being updated

* maybe try collapsing ranges

* for now, insert or ignore

* I'm not sure if this is better

* hold onto the lock between storing in db and in memory

* may or may not be better, it should at least give consistent results

* better diagnose locks held too long

* revolution: process empties in hot path

* process empties by switching a version's end_version to not-null

* Revert "process empties by switching a version's end_version to not-null"

This reverts commit 91e2689.

* remove logic to process empties async, we don't do that anymore

* log gaps in bookkeeping when failing to fully sync

* site_id -> actor_id

* sync a lot more often in test builds

* count an empty version received as a change len of 1 instead of 0

* does that work?

* split large empty version ranges into smaller chunks

* possibly much faster query to store empties

* bring back huge empties handling

* set len back to 0 when the changes are empty

* timeout syncs after 5 minutes so we don't get stuck not processing any buffered changes

* process changes in chunks based on 'cost' and not the number of changes (though that also informs it)

* don't send partials that have the full range of versions

* add command to lock the DB and run a command while it is locked

* exit with the same code as the command

* don't create the table if it exists, this is a special case...

* much faster initial query to build small-but-still-here in-memory bookkeeping
  • Loading branch information
jeromegn authored Apr 17, 2024
1 parent 4b0a94e commit 8b3ef0e
Show file tree
Hide file tree
Showing 23 changed files with 1,629 additions and 1,162 deletions.
15 changes: 11 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ quinn-proto = "0.10.5"
quinn-plaintext = { version = "0.2.0" }
quoted-string = "0.6.1"
rand = { version = "0.8.5", features = ["small_rng"] }
rangemap = { version = "1.4.0", features = ["serde1"] }
rangemap = { version = "1.5.1", features = ["serde1"] }
rcgen = { version = "0.11.1", features = ["x509-parser"] }
rhai = { version = "1.15.1", features = ["sync"] }
rusqlite = { version = "0.30.0", features = ["serde_json", "time", "bundled", "uuid", "array", "load_extension", "column_decltype", "vtab", "functions", "chrono"] }
Expand Down Expand Up @@ -77,7 +77,7 @@ tracing-filter = { version = "0.1.0-alpha.2", features = ["smallvec"] }
tracing-opentelemetry = { version = "0.21.0", default-features = false, features = ["tracing-log"]}
tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] }
trust-dns-resolver = "0.22.0"
uhlc = { version = "0.6.3", features = ["defmt"] }
uhlc = { version = "0.7", features = ["defmt"] }
uuid = { version = "1.3.1", features = ["v4", "serde"] }
webpki = { version = "0.22.0", features = ["std"] }
http = { version = "0.2.9" }
Expand Down
52 changes: 38 additions & 14 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use camino::Utf8PathBuf;
use corro_agent::agent::clear_overwritten_versions;
use corro_types::{
actor::{ActorId, ClusterId},
agent::{Agent, Bookie, KnownVersion, LockKind, LockMeta, LockState},
base::Version,
broadcast::{FocaCmd, FocaInput},
agent::{Agent, Bookie, LockKind, LockMeta, LockState},
base::{CrsqlDbVersion, CrsqlSeq, Version},
broadcast::{FocaCmd, FocaInput, Timestamp},
sqlite::SqlitePoolError,
sync::generate_sync,
};
use futures::{SinkExt, TryStreamExt};
use rusqlite::{named_params, OptionalExtension};
use serde::{Deserialize, Serialize};
use serde_json::json;
use spawn::spawn_counted;
Expand Down Expand Up @@ -388,7 +389,7 @@ async fn handle_conn(
send_success(&mut stream).await;
}
Command::Actor(ActorCommand::Version { actor_id, version }) => {
let json = {
let json: Result<serde_json::Value, rusqlite::Error> = {
let bookie = bookie.read("admin actor version").await;
let booked = match bookie.get(&actor_id) {
Some(booked) => booked,
Expand All @@ -399,17 +400,40 @@ async fn handle_conn(
}
};
let booked_read = booked.read("admin actor version booked").await;
match booked_read.get(&version) {
Some(known) => match known {
KnownVersion::Cleared => {
Ok(serde_json::Value::String("cleared".into()))
if booked_read.contains_version(&version) {
match booked_read.get_partial(&version) {
Some(partial) => {
Ok(serde_json::json!({"partial": partial}))
// serde_json::to_value(partial)
// .map(|v| serde_json::json!({"partial": v}))
},
None => {
match agent.pool().read().await {
Ok(conn) => match conn.prepare_cached("SELECT db_version, last_seq, ts FROM __corro_bookkeeping WHERE actor_id = :actor_id AND start_version = :version") {
Ok(mut prepped) => match prepped.query_row(named_params! {":actor_id": actor_id, ":version": version}, |row| Ok((row.get::<_, Option<CrsqlDbVersion>>(0)?, row.get::<_, Option<CrsqlSeq>>(1)?, row.get::<_, Option<Timestamp>>(2)?))).optional() {
Ok(Some((Some(db_version), Some(last_seq), Some(ts)))) => {
Ok(serde_json::json!({"current": {"db_version": db_version, "last_seq": last_seq, "ts": ts}}))
},
Ok(_) => {
Ok(serde_json::Value::String("cleared".into()))
}
Err(e) => {
Err(e)
}
},
Err(e) => {
Err(e)
}
},
Err(e) => {
_ = send_error(&mut stream, e).await;
continue;
}
}
}
KnownVersion::Current(known) => serde_json::to_value(known)
.map(|v| serde_json::json!({"current": v})),
KnownVersion::Partial(known) => serde_json::to_value(known)
.map(|v| serde_json::json!({"partial": v})),
},
None => Ok(serde_json::Value::Null),
}
} else {
Ok(serde_json::Value::Null)
}
};

Expand Down
Loading

0 comments on commit 8b3ef0e

Please sign in to comment.