From 8b3ef0e235d67ce1029e524fe26234c3944838c2 Mon Sep 17 00:00:00 2001
From: Jerome Gravel-Niquet
Date: Wed, 17 Apr 2024 10:07:34 -0400
Subject: [PATCH] Do not load _everything_ in memory for bookkeeping (#189)

* create a new __corro_bookkeeping_gaps table containing needed version gaps, this should speed up boot time
* possibly fix tests, using a large max seq was a bad idea
* fix typo, get rid of process_version function
* more typo fixes
* fix test, painstakingly, forgot to allow sending partials
* give CI a bit more time
* use INNER JOIN for query instead of subquery
* deadlock situation when trying to persist
* fix warnings
* send empties as empties, add a test for that
* rename a field and add a few comments
* actually persist as we go instead of once in a while, that was going to be a big problem
* log how long it took to load bookkeeping, reduce default apply queue length to 200
* actually add tests and fix the gaps collection
* add tests for initial gap being updated
* maybe try collapsing ranges
* for now, insert or ignore
* I'm not sure if this is better
* hold onto the lock between storing in db and in memory
* may or may not be better, it should at least give consistent results
* better diagnose locks held too long
* revolution: process empties in hot path
* process empties by switching a version's end_version to not-null
* Revert "process empties by switching a version's end_version to not-null"
  This reverts commit 91e26899aa63b19dc8eb573151f8bf1c0b685a0f.
* remove logic to process empties async, we don't do that anymore
* log gaps in bookkeeping when failing to fully sync
* site_id -> actor_id
* sync a lot more often in test builds
* count an empty version received as a change len of 1 instead of 0
* does that work
* split in smaller chunks large empty version ranges
* possibly much faster query to store empties
* bring back huge empties handling
* set len back to 0 when the changes are empty
* timeout syncs after 5 minutes so we don't get stuck not processing any buffered changes
* process changes in chunks based on 'cost' and not the number of changes (though that also informs it)
* don't send partials that have the full range of versions
* add command to lock the DB and run a command while it is locked
* exit with the same code as the command
* don't create the table if it exists, this is a special case...
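(Editor's note: a minimal standalone sketch of the bookkeeping-gaps idea from the first bullet above. The table name and the actor_id, start and end columns appear in test queries later in this patch; the exact DDL, column types and primary key below are assumptions for illustration, not copied from the patch.)

// Hypothetical sketch only: corrosion's real schema/migration code is not shown here.
use rusqlite::{params, Connection};

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;

    // Assumed shape of the gaps table: one row per missing version range per actor.
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS __corro_bookkeeping_gaps (
            actor_id BLOB NOT NULL,
            start    INTEGER NOT NULL,
            end      INTEGER NOT NULL,
            PRIMARY KEY (actor_id, start)
        );",
    )?;

    // Record that versions 4..=9 are still needed from some actor.
    conn.execute(
        "INSERT INTO __corro_bookkeeping_gaps (actor_id, start, end) VALUES (?1, ?2, ?3)",
        params![vec![0u8; 16], 4i64, 9i64],
    )?;

    // At boot, only the gaps need to be read back, not every known version.
    let gap_count: i64 =
        conn.query_row("SELECT count(*) FROM __corro_bookkeeping_gaps", [], |row| row.get(0))?;
    assert_eq!(gap_count, 1);
    Ok(())
}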
* much faster initial query to build small-but-still-here in-memory bookkeeping --- Cargo.lock | 15 +- Cargo.toml | 4 +- crates/corro-admin/src/lib.rs | 52 +- crates/corro-agent/src/agent/handlers.rs | 226 +++-- crates/corro-agent/src/agent/mod.rs | 10 +- crates/corro-agent/src/agent/run_root.rs | 21 +- crates/corro-agent/src/agent/setup.rs | 7 +- crates/corro-agent/src/agent/tests.rs | 307 +++++-- crates/corro-agent/src/agent/util.rs | 908 +++++++++----------- crates/corro-agent/src/api/peer.rs | 485 ++++++----- crates/corro-agent/src/api/public/mod.rs | 42 +- crates/corro-api-types/src/lib.rs | 3 +- crates/corro-devcluster/src/main.rs | 4 +- crates/corro-devcluster/src/topology/mod.rs | 2 +- crates/corro-pg/src/lib.rs | 13 +- crates/corro-types/src/agent.rs | 614 +++++++++---- crates/corro-types/src/broadcast.rs | 12 +- crates/corro-types/src/config.rs | 2 +- crates/corro-types/src/sync.rs | 9 +- crates/corrosion/Cargo.toml | 1 + crates/corrosion/src/command/agent.rs | 4 +- crates/corrosion/src/main.rs | 46 +- crates/sqlite3-restore/src/lib.rs | 4 +- 23 files changed, 1629 insertions(+), 1162 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f00446c1..61013376 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1034,6 +1034,7 @@ dependencies = [ "seahash", "serde", "serde_json", + "shell-words", "shellwords", "spawn", "sqlite3-restore", @@ -3103,9 +3104,9 @@ dependencies = [ [[package]] name = "rangemap" -version = "1.4.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "977b1e897f9d764566891689e642653e5ed90c6895106acd005eb4c1d0203991" +checksum = "f60fcc7d6849342eff22c4350c8b9a989ee8ceabc4b481253e8946b9fe83d684" dependencies = [ "serde", ] @@ -3562,6 +3563,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "shellwords" version = "1.1.0" @@ -4453,9 +4460,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "uhlc" -version = "0.6.3" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1eadef1fa26cbbae1276c46781e8f4d888bdda434779c18ae6c2a0e69991885" +checksum = "99b6df3f3e948b40e20c38a6d1fd6d8f91b3573922fc164e068ad3331560487e" dependencies = [ "defmt", "humantime", diff --git a/Cargo.toml b/Cargo.toml index 6dea2fa2..effb596d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ quinn-proto = "0.10.5" quinn-plaintext = { version = "0.2.0" } quoted-string = "0.6.1" rand = { version = "0.8.5", features = ["small_rng"] } -rangemap = { version = "1.4.0", features = ["serde1"] } +rangemap = { version = "1.5.1", features = ["serde1"] } rcgen = { version = "0.11.1", features = ["x509-parser"] } rhai = { version = "1.15.1", features = ["sync"] } rusqlite = { version = "0.30.0", features = ["serde_json", "time", "bundled", "uuid", "array", "load_extension", "column_decltype", "vtab", "functions", "chrono"] } @@ -77,7 +77,7 @@ tracing-filter = { version = "0.1.0-alpha.2", features = ["smallvec"] } tracing-opentelemetry = { version = "0.21.0", default-features = false, features = ["tracing-log"]} tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] } trust-dns-resolver = "0.22.0" -uhlc = { version = "0.6.3", features = ["defmt"] } +uhlc = { version = "0.7", features = ["defmt"] } uuid = { version = "1.3.1", features = ["v4", 
"serde"] } webpki = { version = "0.22.0", features = ["std"] } http = { version = "0.2.9" } diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index dcf530a0..061f1173 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -4,13 +4,14 @@ use camino::Utf8PathBuf; use corro_agent::agent::clear_overwritten_versions; use corro_types::{ actor::{ActorId, ClusterId}, - agent::{Agent, Bookie, KnownVersion, LockKind, LockMeta, LockState}, - base::Version, - broadcast::{FocaCmd, FocaInput}, + agent::{Agent, Bookie, LockKind, LockMeta, LockState}, + base::{CrsqlDbVersion, CrsqlSeq, Version}, + broadcast::{FocaCmd, FocaInput, Timestamp}, sqlite::SqlitePoolError, sync::generate_sync, }; use futures::{SinkExt, TryStreamExt}; +use rusqlite::{named_params, OptionalExtension}; use serde::{Deserialize, Serialize}; use serde_json::json; use spawn::spawn_counted; @@ -388,7 +389,7 @@ async fn handle_conn( send_success(&mut stream).await; } Command::Actor(ActorCommand::Version { actor_id, version }) => { - let json = { + let json: Result = { let bookie = bookie.read("admin actor version").await; let booked = match bookie.get(&actor_id) { Some(booked) => booked, @@ -399,17 +400,40 @@ async fn handle_conn( } }; let booked_read = booked.read("admin actor version booked").await; - match booked_read.get(&version) { - Some(known) => match known { - KnownVersion::Cleared => { - Ok(serde_json::Value::String("cleared".into())) + if booked_read.contains_version(&version) { + match booked_read.get_partial(&version) { + Some(partial) => { + Ok(serde_json::json!({"partial": partial})) + // serde_json::to_value(partial) + // .map(|v| serde_json::json!({"partial": v})) + }, + None => { + match agent.pool().read().await { + Ok(conn) => match conn.prepare_cached("SELECT db_version, last_seq, ts FROM __corro_bookkeeping WHERE actor_id = :actor_id AND start_version = :version") { + Ok(mut prepped) => match prepped.query_row(named_params! 
{":actor_id": actor_id, ":version": version}, |row| Ok((row.get::<_, Option>(0)?, row.get::<_, Option>(1)?, row.get::<_, Option>(2)?))).optional() { + Ok(Some((Some(db_version), Some(last_seq), Some(ts)))) => { + Ok(serde_json::json!({"current": {"db_version": db_version, "last_seq": last_seq, "ts": ts}})) + }, + Ok(_) => { + Ok(serde_json::Value::String("cleared".into())) + } + Err(e) => { + Err(e) + } + }, + Err(e) => { + Err(e) + } + }, + Err(e) => { + _ = send_error(&mut stream, e).await; + continue; + } + } } - KnownVersion::Current(known) => serde_json::to_value(known) - .map(|v| serde_json::json!({"current": v})), - KnownVersion::Partial(known) => serde_json::to_value(known) - .map(|v| serde_json::json!({"partial": v})), - }, - None => Ok(serde_json::Value::Null), + } + } else { + Ok(serde_json::Value::Null) } }; diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index ec655f97..8f76c19b 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -18,7 +18,7 @@ use corro_types::{ actor::{Actor, ActorId}, agent::{Agent, Bookie, SplitPool}, base::CrsqlSeq, - broadcast::{BroadcastInput, BroadcastV1, ChangeSource, ChangeV1, FocaInput}, + broadcast::{BroadcastInput, BroadcastV1, ChangeSource, ChangeV1, Changeset, FocaInput}, channel::CorroReceiver, members::MemberAddedResult, sync::generate_sync, @@ -388,6 +388,16 @@ pub fn spawn_handle_db_cleanup(pool: SplitPool) { }); } +// determine the estimated resource cost of processing a change +fn processing_cost(change: &Changeset) -> usize { + if change.is_empty() { + let versions = change.versions(); + cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20) + } else { + change.len() + } +} + /// Bundle incoming changes to optimise transaction sizes with SQLite /// /// *Performance tradeoff*: introduce latency (with a max timeout) to @@ -403,7 +413,7 @@ pub async fn handle_changes( let max_changes_chunk: usize = agent.config().perf.apply_queue_len; let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new(); let mut buf = vec![]; - let mut count = 0; + let mut buf_cost = 0; const MAX_CONCURRENT: usize = 5; let mut join_set = JoinSet::new(); @@ -419,15 +429,15 @@ pub async fn handle_changes( // complicated loop to process changes efficiently w/ a max concurrency // and a minimum chunk size for bigger and faster SQLite transactions loop { - while count >= max_changes_chunk && join_set.len() < MAX_CONCURRENT { + while buf_cost >= max_changes_chunk && join_set.len() < MAX_CONCURRENT { // we're already bigger than the minimum size of changes batch // so we want to accumulate at least that much and process them // concurrently bvased on MAX_CONCURRENCY - let mut tmp_count = 0; + let mut tmp_cost = 0; while let Some((change, src, queued_at)) = queue.pop_front() { - tmp_count += change.len(); + tmp_cost += processing_cost(&change); buf.push((change, src, queued_at)); - if tmp_count >= max_changes_chunk { + if tmp_cost >= max_changes_chunk { break; } } @@ -436,17 +446,17 @@ pub async fn handle_changes( break; } - debug!(count = %tmp_count, "spawning processing multiple changes from beginning of loop"); + debug!(count = %tmp_cost, "spawning processing multiple changes from beginning of loop"); join_set.spawn(util::process_multiple_changes( agent.clone(), bookie.clone(), std::mem::take(&mut buf), )); - count -= tmp_count; + buf_cost -= tmp_cost; } - tokio::select! { + let (change, src) = tokio::select! 
{ biased; // process these first, we don't care about the result, @@ -459,152 +469,128 @@ pub async fn handle_changes( continue; }, - Some((change, src)) = rx_changes.recv() => { - let change_len = change.len(); - counter!("corro.agent.changes.recv").increment(std::cmp::max(change_len, 1) as u64); // count empties... - - if change.actor_id == agent.actor_id() { - continue; - } - - if let Some(mut seqs) = change.seqs().cloned() { - let v = *change.versions().start(); - if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) { - if seqs.all(|seq| seen_seqs.contains(&seq)) { - continue; - } - } - } else { - // empty versions - if change.versions().all(|v| seen.contains_key(&(change.actor_id, v))) { - continue; - } - } - - let recv_lag = change - .ts() - .map(|ts| (agent.clock().new_timestamp().get_time() - ts.0).to_duration()); - - if matches!(src, ChangeSource::Broadcast) { - counter!("corro.broadcast.recv.count", "kind" => "change").increment(1); - } - - let booked = { - bookie - .read(format!( - "handle_change(get):{}", - change.actor_id.as_simple() - )) - .await - .get(&change.actor_id) - .cloned() - }; - - if let Some(booked) = booked { - if booked - .read(format!( - "handle_change(contains?):{}", - change.actor_id.as_simple() - )) - .await - .contains_all(change.versions(), change.seqs()) - { - trace!("already seen, stop disseminating"); - continue; - } - } - - if let Some(recv_lag) = recv_lag { - let src_str: &'static str = src.into(); - histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str).record(recv_lag.as_secs_f64()); - } - - // this will only run once for a non-empty changeset - for v in change.versions() { - let entry = seen.entry((change.actor_id, v)).or_default(); - if let Some(seqs) = change.seqs().cloned() { - entry.extend([seqs]); - } - } - - if matches!(src, ChangeSource::Broadcast) && !change.is_empty() { - if let Err(_e) = - agent - .tx_bcast() - .try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change(change.clone()))) - { - debug!("broadcasts are full or done!"); - } - } - - queue.push_back((change, src, Instant::now())); - - count += change_len; // track number of individual changes, not changesets + maybe_change_src = rx_changes.recv() => match maybe_change_src { + Some((change, src)) => (change, src), + None => break, }, _ = max_wait.tick() => { // got a wait interval tick... - gauge!("corro.agent.changes.in_queue").set(count as f64); + gauge!("corro.agent.changes.in_queue").set(buf_cost as f64); gauge!("corro.agent.changesets.in_queue").set(queue.len() as f64); gauge!("corro.agent.changes.processing.jobs").set(join_set.len() as f64); - if count < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT { + if buf_cost < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT { // we can process this right away - debug!(%count, "spawning processing multiple changes from max wait interval"); + debug!(%buf_cost, "spawning processing multiple changes from max wait interval"); join_set.spawn(util::process_multiple_changes( agent.clone(), bookie.clone(), queue.drain(..).collect(), )); - count = 0; + buf_cost = 0; } if seen.len() > MAX_SEEN_CACHE_LEN { // we don't want to keep too many entries in here. seen = seen.split_off(seen.len() - KEEP_SEEN_CACHE_SIZE); } + continue }, _ = &mut tripwire => { break; } + }; - else => { - break; + let change_len = change.len(); + counter!("corro.agent.changes.recv").increment(std::cmp::max(change_len, 1) as u64); // count empties... 
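// Editor's sketch (not part of the patch): a simplified, standalone model of the
// `processing_cost` heuristic introduced above — an empty changeset is cheap but not
// free, with its cost capped at 20, while a non-empty changeset costs one unit per change.
fn cost(change_count: usize, version_range_len: u64) -> usize {
    if change_count == 0 {
        // empty changeset: cost scales with how many versions it clears, capped at 20
        std::cmp::min(version_range_len as usize, 20)
    } else {
        change_count
    }
}

fn main() {
    assert_eq!(cost(0, 1_000), 20); // a huge empty range adds only 20 to buf_cost
    assert_eq!(cost(3, 3), 3);      // a real changeset costs one unit per row change
}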
+ + if change.actor_id == agent.actor_id() { + continue; + } + + if let Some(mut seqs) = change.seqs().cloned() { + let v = *change.versions().start(); + if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) { + if seqs.all(|seq| seen_seqs.contains(&seq)) { + continue; + } + } + } else { + // empty versions + if change + .versions() + .all(|v| seen.contains_key(&(change.actor_id, v))) + { + continue; } } - } - info!("Draining changes receiver..."); + let recv_lag = change + .ts() + .map(|ts| (agent.clock().new_timestamp().get_time() - ts.0).to_duration()); - // drain! - while let Ok((change, src)) = rx_changes.try_recv() { - let changes_count = std::cmp::max(change.len(), 1); - counter!("corro.agent.changes.recv").increment(changes_count as u64); - count += changes_count; - queue.push_back((change, src, Instant::now())); - if count >= max_changes_chunk { - // drain and process current changes! - if let Err(e) = util::process_multiple_changes( - agent.clone(), - bookie.clone(), - queue.drain(..).collect(), - ) - .await + if matches!(src, ChangeSource::Broadcast) { + counter!("corro.broadcast.recv.count", "kind" => "change").increment(1); + } + + let booked = { + bookie + .read(format!( + "handle_change(get):{}", + change.actor_id.as_simple() + )) + .await + .get(&change.actor_id) + .cloned() + }; + + if let Some(booked) = booked { + if booked + .read(format!( + "handle_change(contains?):{}", + change.actor_id.as_simple() + )) + .await + .contains_all(change.versions(), change.seqs()) { - error!("could not process last multiple changes: {e}"); + trace!("already seen, stop disseminating"); + continue; } + } - // reset count - count = 0; + if let Some(recv_lag) = recv_lag { + let src_str: &'static str = src.into(); + histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str) + .record(recv_lag.as_secs_f64()); } - } - // process the last changes we got! 
- if let Err(e) = util::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await - { - error!("could not process multiple changes: {e}"); + // this will only run once for a non-empty changeset + for v in change.versions() { + let entry = seen.entry((change.actor_id, v)).or_default(); + if let Some(seqs) = change.seqs().cloned() { + entry.extend([seqs]); + } + } + + if matches!(src, ChangeSource::Broadcast) && !change.is_empty() { + if let Err(_e) = + agent + .tx_bcast() + .try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change( + change.clone(), + ))) + { + debug!("broadcasts are full or done!"); + } + } + + let cost = processing_cost(&change); + queue.push_back((change, src, Instant::now())); + + buf_cost += cost; // tracks the cost, not number of changes } } diff --git a/crates/corro-agent/src/agent/mod.rs b/crates/corro-agent/src/agent/mod.rs index 537f2d13..8025272d 100644 --- a/crates/corro-agent/src/agent/mod.rs +++ b/crates/corro-agent/src/agent/mod.rs @@ -12,7 +12,7 @@ mod metrics; mod run_root; mod setup; mod uni; -mod util; +pub mod util; #[cfg(test)] mod tests; @@ -27,11 +27,13 @@ use uuid::Uuid; pub use error::{SyncClientError, SyncRecvError}; pub use run_root::start_with_config; pub use setup::{setup, AgentOptions}; -pub use util::{process_multiple_changes, clear_overwritten_versions}; +pub use util::{clear_overwritten_versions, process_multiple_changes}; pub const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300); -pub const MAX_SYNC_BACKOFF: Duration = Duration::from_secs(15); // 1 minute oughta be enough, we're constantly - // getting broadcasts randomly + targetted +#[cfg(test)] +pub const MAX_SYNC_BACKOFF: Duration = Duration::from_secs(2); +#[cfg(not(test))] +pub const MAX_SYNC_BACKOFF: Duration = Duration::from_secs(15); pub const RANDOM_NODES_CHOICES: usize = 10; pub const CHECK_EMPTIES_TO_INSERT_AFTER: Duration = Duration::from_secs(120); diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs index 370100d4..1f73bf66 100644 --- a/crates/corro-agent/src/agent/run_root.rs +++ b/crates/corro-agent/src/agent/run_root.rs @@ -1,5 +1,7 @@ //! Start the root agent tasks +use std::time::Instant; + use crate::{ agent::{ handlers::{self, spawn_handle_db_cleanup}, @@ -41,7 +43,6 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul lock_registry, rx_bcast, rx_apply, - rx_empty, rx_clear_buf, rx_changes, rx_foca, @@ -99,11 +100,11 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul ) .await?; - spawn_counted(util::write_empties_loop( - agent.clone(), - rx_empty, - tripwire.clone(), - )); + // spawn_counted(util::write_empties_loop( + // agent.clone(), + // rx_empty, + // tripwire.clone(), + // )); tokio::spawn(util::clear_buffered_meta_loop(agent.clone(), rx_clear_buf)); @@ -124,10 +125,12 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul let mut w = bookie.write("init").await; w.insert(agent.actor_id(), agent.booked().clone()); } + + let start = Instant::now(); { let conn = agent.pool().read().await?; let actor_ids: Vec = conn - .prepare("SELECT DISTINCT actor_id FROM __corro_bookkeeping")? + .prepare("SELECT site_id FROM crsql_site_id WHERE ordinal > 0")? 
.query_map([], |row| row.get(0)) .and_then(|rows| rows.collect::>>())?; @@ -155,7 +158,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul } }), ) - .buffer_unordered(2); + .buffer_unordered(4); while let Some((actor_id, bv)) = TryStreamExt::try_next(&mut buf).await? { for (version, partial) in bv.partials.iter() { @@ -180,6 +183,8 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul } } + info!("Bookkeeping fully loaded in {:?}", start.elapsed()); + spawn_counted( util::sync_loop( agent.clone(), diff --git a/crates/corro-agent/src/agent/setup.rs b/crates/corro-agent/src/agent/setup.rs index b92b86a5..a41adbff 100644 --- a/crates/corro-agent/src/agent/setup.rs +++ b/crates/corro-agent/src/agent/setup.rs @@ -50,7 +50,6 @@ pub struct AgentOptions { pub api_listener: TcpListener, pub rx_bcast: CorroReceiver, pub rx_apply: CorroReceiver<(ActorId, Version)>, - pub rx_empty: CorroReceiver<(ActorId, RangeInclusive)>, pub rx_clear_buf: CorroReceiver<(ActorId, RangeInclusive)>, pub rx_changes: CorroReceiver<(ChangeV1, ChangeSource)>, pub rx_foca: CorroReceiver, @@ -144,18 +143,18 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age ); let (tx_bcast, rx_bcast) = bounded(conf.perf.bcast_channel_len, "bcast"); - let (tx_empty, rx_empty) = bounded(conf.perf.empties_channel_len, "empty"); let (tx_changes, rx_changes) = bounded(conf.perf.changes_channel_len, "changes"); let (tx_foca, rx_foca) = bounded(conf.perf.foca_channel_len, "foca"); let lock_registry = LockRegistry::default(); // make an empty booked! - let booked = Booked::new(BookedVersions::default(), lock_registry.clone()); + let booked = Booked::new(BookedVersions::new(actor_id), lock_registry.clone()); // asynchronously load it up! 
tokio::task::spawn_blocking({ let pool = pool.clone(); + // acquiring the lock here means everything will have to wait for it to be ready let mut booked = booked.write_owned("init").await; move || { let conn = pool.read_blocking()?; @@ -171,7 +170,6 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age lock_registry, rx_bcast, rx_apply, - rx_empty, rx_clear_buf, rx_changes, rx_foca, @@ -193,7 +191,6 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age booked, tx_bcast, tx_apply, - tx_empty, tx_clear_buf, tx_changes, tx_foca, diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index 4f162181..031a16b9 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -1,6 +1,7 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, net::SocketAddr, + ops::Deref, time::{Duration, Instant}, }; @@ -9,14 +10,18 @@ use hyper::StatusCode; use rand::{ distributions::Uniform, prelude::Distribution, rngs::StdRng, seq::IteratorRandom, SeedableRng, }; +use rangemap::RangeInclusiveSet; use serde::Deserialize; use serde_json::json; use spawn::wait_for_all_pending_handles; -use tokio::time::{sleep, timeout, MissedTickBehavior}; +use tokio::{ + sync::mpsc, + time::{sleep, timeout, MissedTickBehavior}, +}; use tracing::{debug, info_span}; use tripwire::Tripwire; -use crate::agent::util::*; +use crate::{agent::util::*, api::peer::parallel_sync, transport::Transport}; use corro_tests::*; use corro_types::{ actor::ActorId, @@ -331,7 +336,10 @@ pub async fn configurable_stress_test( let client: hyper::Client<_, hyper::Body> = hyper::Client::builder().build_http(); - let addrs: Vec = agents.iter().map(|ta| ta.agent.api_addr()).collect(); + let addrs: Vec<(ActorId, SocketAddr)> = agents + .iter() + .map(|ta| (ta.agent.actor_id(), ta.agent.api_addr())) + .collect(); let iter = (0..input_count).flat_map(|n| { serde_json::from_value::>(json!([ @@ -355,66 +363,97 @@ pub async fn configurable_stress_test( .unwrap() }); - tokio::spawn(async move { - tokio_stream::StreamExt::map(futures::stream::iter(iter).chunks(20), { + let actor_versions = tokio_stream::StreamExt::map(futures::stream::iter(iter).chunks(20), { + let addrs = addrs.clone(); + let client = client.clone(); + move |statements| { let addrs = addrs.clone(); let client = client.clone(); - move |statements| { - let addrs = addrs.clone(); - let client = client.clone(); - Ok(async move { - let mut rng = StdRng::from_entropy(); - let chosen = addrs.iter().choose(&mut rng).unwrap(); - - let res = client - .request( - hyper::Request::builder() - .method(hyper::Method::POST) - .uri(format!("http://{chosen}/v1/transactions")) - .header(hyper::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_vec(&statements)?.into())?, - ) - .await?; - - if res.status() != StatusCode::OK { - eyre::bail!("unexpected status code: {}", res.status()); - } + Ok(async move { + let mut rng = StdRng::from_entropy(); + let (actor_id, chosen) = addrs.iter().choose(&mut rng).unwrap(); + + let res = client + .request( + hyper::Request::builder() + .method(hyper::Method::POST) + .uri(format!("http://{chosen}/v1/transactions")) + .header(hyper::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_vec(&statements)?.into())?, + ) + .await?; + + if res.status() != StatusCode::OK { + eyre::bail!("unexpected status code: {}", res.status()); + } - let body: ExecResponse = - 
serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await?)?; + let body: ExecResponse = + serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await?)?; - for (i, statement) in statements.iter().enumerate() { - if !matches!( - body.results[i], - ExecResult::Execute { - rows_affected: 1, - .. - } - ) { - eyre::bail!("unexpected exec result for statement {i}: {statement:?}"); + for (i, statement) in statements.iter().enumerate() { + if !matches!( + body.results[i], + ExecResult::Execute { + rows_affected: 1, + .. } + ) { + eyre::bail!("unexpected exec result for statement {i}: {statement:?}"); } + } - Ok::<_, eyre::Report>(()) - }) - } - }) - .try_buffer_unordered(10) - .try_collect::>() - .await?; - Ok::<_, eyre::Report>(()) - }); + Ok::<_, eyre::Report>((*actor_id, statements.len())) + }) + } + }) + .try_buffer_unordered(10) + .try_collect::>() + .await?; let changes_count = 4 * input_count; println!("expecting {changes_count} ops"); + // tokio::spawn({ + // let bookies = agents + // .iter() + // .map(|a| (a.agent.actor_id(), a.bookie.clone())) + // .collect::>(); + // async move { + // loop { + // tokio::time::sleep(Duration::from_secs(1)).await; + // for (actor_id, bookie) in bookies.iter() { + // let registry = bookie.registry(); + // let r = registry.map.read(); + + // for v in r.values() { + // debug!(%actor_id, "GOT A LOCK {v:?}"); + // } + // } + // } + // } + // }); + let start = Instant::now(); let mut interval = tokio::time::interval(Duration::from_secs(1)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { - interval.tick().await; + debug!("looping"); + for ta in agents.iter() { + let registry = ta.bookie.registry(); + let r = registry.map.read(); + + for v in r.values() { + println!( + "{}: GOT A LOCK: {} has been locked for {:?}", + ta.agent.actor_id(), + v.label, + v.started_at.elapsed() + ); + } + } + tokio::time::sleep(Duration::from_secs(1)).await; println!("checking status after {}s", start.elapsed().as_secs_f32()); let mut v = vec![]; for ta in agents.iter() { @@ -461,13 +500,43 @@ pub async fn configurable_stress_test( break; } + println!("we're not done yet..."); + if start.elapsed() > Duration::from_secs(30) { - let conn = agents[0].agent.pool().read().await?; - let mut prepped = conn.prepare("SELECT * FROM crsql_changes;")?; - let mut rows = prepped.query(())?; + for ta in agents.iter() { + let conn = ta.agent.pool().read().await?; + let mut per_actor: BTreeMap> = BTreeMap::new(); + let mut prepped = conn.prepare("SELECT actor_id, start_version, coalesce(end_version, start_version) FROM __corro_bookkeeping;")?; + let mut rows = prepped.query(())?; + + while let Ok(Some(row)) = rows.next() { + per_actor + .entry(row.get(0)?) + .or_default() + .insert(row.get(1)?..=row.get(2)?); + } + + for (actor_id, versions) in per_actor { + if let Some(versions_len) = actor_versions.get(&actor_id) { + let full_range = Version(1)..=Version(*versions_len as u64 + 1); + let gaps = versions.gaps(&full_range); + for gap in gaps { + println!("{} gap! {actor_id} => {gap:?}", ta.agent.actor_id()); + } + } + } + + let recorded_gaps = conn + .prepare("SELECT actor_id, start, end FROM __corro_bookkeeping_gaps")? + .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))? 
+ .collect::, _>>()?; - while let Ok(Some(row)) = rows.next() { - println!("row: {row:?}"); + for (actor_id, start, end) in recorded_gaps { + println!( + "{} recorded gap: {actor_id} => {start}..={end}", + ta.agent.actor_id() + ); + } } panic!( @@ -478,6 +547,21 @@ pub async fn configurable_stress_test( } println!("fully disseminated in {}s", start.elapsed().as_secs_f32()); + println!("checking gaps in db..."); + for ta in agents { + let conn = ta.agent.pool().read().await?; + let gaps_count: u64 = + conn.query_row("SELECT count(*) FROM __corro_bookkeeping_gaps", [], |row| { + row.get(0) + })?; + assert_eq!( + gaps_count, + 0, + "expected {} to have 0 gaps in DB", + ta.agent.actor_id() + ); + } + tripwire_tx.send(()).await.ok(); tripwire_worker.await; wait_for_all_pending_handles().await; @@ -652,58 +736,93 @@ async fn large_tx_sync() -> eyre::Result<()> { .query_row("SELECT crsql_db_version();", (), |row| row.get(0))?; assert_eq!(db_version, CrsqlDbVersion(counts.len() as u64)); - sleep(Duration::from_secs(2)).await; + println!("expected count: {expected_count}"); - let ta2 = launch_test_agent( - |conf| { - conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]) - .build() - }, - tripwire.clone(), - ) - .await?; + let ta2 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + let ta3 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + let ta4 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; - sleep(Duration::from_secs(1)).await; + let (rtt_tx, _rtt_rx) = mpsc::channel(1024); + let ta2_transport = Transport::new(&ta2.agent.config().gossip, rtt_tx.clone()).await?; + let ta3_transport = Transport::new(&ta3.agent.config().gossip, rtt_tx.clone()).await?; + let ta4_transport = Transport::new(&ta4.agent.config().gossip, rtt_tx.clone()).await?; - let ta3 = launch_test_agent( - |conf| { - conf.bootstrap(vec![ta2.agent.gossip_addr().to_string()]) - .build() - }, - tripwire.clone(), - ) - .await?; + for _ in 0..6 { + let res = parallel_sync( + &ta2.agent, + &ta2_transport, + vec![(ta1.agent.actor_id(), ta1.agent.gossip_addr())], + generate_sync(&ta2.bookie, ta2.agent.actor_id()).await, + ) + .await?; - sleep(Duration::from_secs(1)).await; + println!("ta2 synced {res}"); - let ta4 = launch_test_agent( - |conf| { - conf.bootstrap(vec![ta3.agent.gossip_addr().to_string()]) - .build() - }, - tripwire.clone(), - ) - .await?; + let res = parallel_sync( + &ta3.agent, + &ta3_transport, + vec![ + (ta1.agent.actor_id(), ta1.agent.gossip_addr()), + (ta2.agent.actor_id(), ta2.agent.gossip_addr()), + ], + generate_sync(&ta3.bookie, ta3.agent.actor_id()).await, + ) + .await?; + + println!("ta3 synced {res}"); + + let res = parallel_sync( + &ta4.agent, + &ta4_transport, + vec![ + (ta3.agent.actor_id(), ta3.agent.gossip_addr()), + (ta2.agent.actor_id(), ta2.agent.gossip_addr()), + ], + generate_sync(&ta4.bookie, ta4.agent.actor_id()).await, + ) + .await?; - sleep(Duration::from_secs(20)).await; + println!("ta4 synced {res}"); + + tokio::time::sleep(Duration::from_secs(1)).await; + } - for ta in [ta2, ta3, ta4] { - let agent = ta.agent; + tokio::time::sleep(Duration::from_secs(10)).await; + + let mut ta_counts = vec![]; + + for (name, ta) in [("ta2", &ta2), ("ta3", &ta3), ("ta4", &ta4)] { + let agent = &ta.agent; let conn = agent.pool().read().await?; let count: u64 = conn .prepare_cached("SELECT COUNT(*) FROM testsbool;")? 
.query_row((), |row| row.get(0))?; - println!("{:#?}", generate_sync(&ta.bookie, agent.actor_id()).await); + println!( + "{name}: {:#?}", + generate_sync(&ta.bookie, agent.actor_id()).await + ); + + println!( + "{name}: bookie: {:?}", + ta.bookie + .read("test") + .await + .get(&ta1.agent.actor_id()) + .unwrap() + .read("test") + .await + .deref() + ); if count as usize != expected_count { - let buf_count: u64 = - conn.query_row("select count(*) from __corro_buffered_changes", [], |row| { - row.get(0) - })?; + let buf_count: Vec<(Version, u64)> = conn + .prepare("select version,count(*) from __corro_buffered_changes group by version")? + .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))? + .collect::>>()?; println!( - "BUFFERED COUNT: {buf_count} (actor_id: {})", + "{name}: BUFFERED COUNT: {buf_count:?} (actor_id: {})", agent.actor_id() ); @@ -711,14 +830,16 @@ async fn large_tx_sync() -> eyre::Result<()> { .prepare("select start_seq, end_seq from __corro_seq_bookkeeping")? .query_map([], |row| Ok(row.get::<_, u64>(0)?..=row.get::<_, u64>(1)?))? .collect::>>()?; - println!("ranges: {ranges:?}"); + println!("{name}: ranges: {ranges:?}"); } + ta_counts.push((name, agent.actor_id(), count as usize)); + } + + for (name, actor_id, count) in ta_counts { assert_eq!( - count as usize, - expected_count, - "actor {} did not reach 10K rows", - agent.actor_id() + count, expected_count, + "{name}: actor {actor_id} did not reach 10K rows", ); } diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 33edb361..66f3c482 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -7,7 +7,7 @@ use std::{ cmp, - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashSet}, convert::Infallible, net::SocketAddr, ops::RangeInclusive, @@ -16,9 +16,7 @@ use std::{ }; use crate::{ - agent::{ - handlers, CountedExecutor, CHECK_EMPTIES_TO_INSERT_AFTER, MAX_SYNC_BACKOFF, TO_CLEAR_COUNT, - }, + agent::{handlers, CountedExecutor, MAX_SYNC_BACKOFF, TO_CLEAR_COUNT}, api::public::{ api_v1_db_schema, api_v1_queries, api_v1_table_stats, api_v1_transactions, pubsub::{api_v1_sub_by_id, api_v1_subs}, @@ -45,21 +43,15 @@ use axum::{ BoxError, Extension, Router, TypedHeader, }; use foca::Member; -use futures::{FutureExt, TryFutureExt}; +use futures::FutureExt; use hyper::{server::conn::AddrIncoming, StatusCode}; -use itertools::Itertools; use metrics::{counter, histogram}; use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; use rusqlite::{ named_params, params, params_from_iter, Connection, OptionalExtension, ToSql, Transaction, }; use spawn::spawn_counted; -use tokio::{ - net::TcpListener, - sync::mpsc::Sender, - task::block_in_place, - time::{sleep, timeout}, -}; +use tokio::{net::TcpListener, sync::mpsc::Sender, task::block_in_place, time::sleep}; use tower::{limit::ConcurrencyLimitLayer, load_shed::LoadShedLayer}; use tower_http::trace::TraceLayer; use tracing::{debug, error, info, trace, warn}; @@ -98,9 +90,7 @@ pub async fn initialise_foca(agent: &Agent) { if let Err(e) = agent .tx_foca() - .send(FocaInput::ApplyMany( - foca_states.into_iter().map(|(_, v)| v).collect(), - )) + .send(FocaInput::ApplyMany(foca_states.into_values().collect())) .await { error!("Failed to queue initial foca state: {e:?}, cluster membership states will be broken!"); @@ -151,198 +141,198 @@ pub async fn clear_overwritten_versions_loop(agent: Agent, bookie: Bookie, sleep /// Prune the database pub async fn 
clear_overwritten_versions( - agent: &Agent, - bookie: &Bookie, - pool: &SplitPool, - feedback: Option>, + _agent: &Agent, + _bookie: &Bookie, + _pool: &SplitPool, + _feedback: Option>, ) -> Result<(), String> { - let start = Instant::now(); - - let bookie_clone = { - bookie - .read("gather_booked_for_compaction") - .await - .iter() - .map(|(actor_id, booked)| (*actor_id, booked.clone())) - .collect::>() - }; - - let mut inserted = 0; - let mut deleted = 0; - - let mut db_elapsed = Duration::new(0, 0); - - if let Some(ref tx) = feedback { - tx.send(format!( - "Compacting changes for {} actors", - bookie_clone.len() - )) - .await - .map_err(|e| format!("{e}"))?; - } - - for (actor_id, booked) in bookie_clone { - if let Some(ref tx) = feedback { - tx.send(format!("Starting change compaction for {actor_id}")) - .await - .map_err(|e| format!("{e}"))?; - } - - // pull the current db version -> version map at the present time - // these are only updated _after_ a transaction has been committed, via a write lock - // so it should be representative of the current state. - let mut versions = { - match timeout( - Duration::from_secs(1), - booked.read(format!( - "clear_overwritten_versions:{}", - actor_id.as_simple() - )), - ) - .await - { - Ok(booked) => booked.current_versions(), - Err(_) => { - info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now"); - - if let Some(ref tx) = feedback { - tx.send("timed out acquiring read lock on bookkeeping".into()) - .await - .map_err(|e| format!("{e}"))?; - } - - return Err("Timed out acquiring read lock on bookkeeping".into()); - } - } - }; - - if versions.is_empty() { - if let Some(ref tx) = feedback { - tx.send("No versions to compact".into()) - .await - .map_err(|e| format!("{e}"))?; - } - continue; - } - - // we're using a read connection here, starting a read-only transaction - // this should be representative of the state of current versions from the actor - let cleared_versions = match pool.read().await { - Ok(mut conn) => { - let start = Instant::now(); - let res = block_in_place(|| { - let tx = conn.transaction()?; - find_cleared_db_versions(&tx, &actor_id) - }); - db_elapsed += start.elapsed(); - match res { - Ok(cleared) => { - debug!( - actor_id = %actor_id, - "Aggregated {} DB versions to clear in {:?}", - cleared.len(), - start.elapsed() - ); - - if let Some(ref tx) = feedback { - tx.send(format!("Aggregated {} DB versions to clear", cleared.len())) - .await - .map_err(|e| format!("{e}"))?; - } - - cleared - } - Err(e) => { - error!("could not get cleared versions: {e}"); - - if let Some(ref tx) = feedback { - tx.send(format!("failed to get cleared versions: {e}")) - .await - .map_err(|e| format!("{e}"))?; - } - - return Err("failed to cleared versions".into()); - } - } - } - Err(e) => { - error!("could not get read connection: {e}"); - if let Some(ref tx) = feedback { - tx.send(format!("failed to get read connection: {e}")) - .await - .map_err(|e| format!("{e}"))?; - } - return Err("could not get read connection".into()); - } - }; - - if !cleared_versions.is_empty() { - let mut to_clear = Vec::new(); - - for db_v in cleared_versions { - if let Some(v) = versions.remove(&db_v) { - to_clear.push((db_v, v)) - } - } - - if !to_clear.is_empty() { - // use a write lock here so we can mutate the bookkept state - let mut bookedw = booked - .write(format!("clearing:{}", actor_id.as_simple())) - .await; - - for (_db_v, v) in to_clear.iter() { - // only remove when confirming that version is still considered "current" - if 
bookedw.contains_current(v) { - // set it as cleared right away - bookedw.insert(*v, KnownDbVersion::Cleared); - deleted += 1; - } - } - - // find any affected cleared ranges - for range in to_clear - .iter() - .filter_map(|(_, v)| bookedw.cleared.get(v)) - .dedup() - { - // schedule for clearing in the background task - if let Err(e) = agent.tx_empty().send((actor_id, range.clone())).await { - error!("could not schedule version to be cleared: {e}"); - if let Some(ref tx) = feedback { - tx.send(format!("failed to get queue compaction set: {e}")) - .await - .map_err(|e| format!("{e}"))?; - } - } else { - inserted += 1; - } - - tokio::task::yield_now().await; - } - - if let Some(ref tx) = feedback { - tx.send(format!("Queued {inserted} empty versions to compact")) - .await - .map_err(|e| format!("{e}"))?; - } - } - } - - if let Some(ref tx) = feedback { - tx.send(format!("Finshed compacting changes for {actor_id}")) - .await - .map_err(|e| format!("{e}"))?; - } - - tokio::time::sleep(Duration::from_secs(1)).await; - } - - info!( - "Compaction done, cleared {} DB bookkeeping table rows (wall time: {:?}, db time: {db_elapsed:?})", - deleted - inserted, - start.elapsed() - ); + // let start = Instant::now(); + + // let bookie_clone = { + // bookie + // .read("gather_booked_for_compaction") + // .await + // .iter() + // .map(|(actor_id, booked)| (*actor_id, booked.clone())) + // .collect::>() + // }; + + // let mut inserted = 0; + // let mut deleted = 0; + + // let mut db_elapsed = Duration::new(0, 0); + + // if let Some(ref tx) = feedback { + // tx.send(format!( + // "Compacting changes for {} actors", + // bookie_clone.len() + // )) + // .await + // .map_err(|e| format!("{e}"))?; + // } + + // for (actor_id, booked) in bookie_clone { + // if let Some(ref tx) = feedback { + // tx.send(format!("Starting change compaction for {actor_id}")) + // .await + // .map_err(|e| format!("{e}"))?; + // } + + // // pull the current db version -> version map at the present time + // // these are only updated _after_ a transaction has been committed, via a write lock + // // so it should be representative of the current state. 
+ // let mut versions = { + // match timeout( + // Duration::from_secs(1), + // booked.read(format!( + // "clear_overwritten_versions:{}", + // actor_id.as_simple() + // )), + // ) + // .await + // { + // Ok(booked) => booked.current_versions(), + // Err(_) => { + // info!(%actor_id, "timed out acquiring read lock on bookkeeping, skipping for now"); + + // if let Some(ref tx) = feedback { + // tx.send("timed out acquiring read lock on bookkeeping".into()) + // .await + // .map_err(|e| format!("{e}"))?; + // } + + // return Err("Timed out acquiring read lock on bookkeeping".into()); + // } + // } + // }; + + // if versions.is_empty() { + // if let Some(ref tx) = feedback { + // tx.send("No versions to compact".into()) + // .await + // .map_err(|e| format!("{e}"))?; + // } + // continue; + // } + + // // we're using a read connection here, starting a read-only transaction + // // this should be representative of the state of current versions from the actor + // let cleared_versions = match pool.read().await { + // Ok(mut conn) => { + // let start = Instant::now(); + // let res = block_in_place(|| { + // let tx = conn.transaction()?; + // find_cleared_db_versions(&tx, &actor_id) + // }); + // db_elapsed += start.elapsed(); + // match res { + // Ok(cleared) => { + // debug!( + // actor_id = %actor_id, + // "Aggregated {} DB versions to clear in {:?}", + // cleared.len(), + // start.elapsed() + // ); + + // if let Some(ref tx) = feedback { + // tx.send(format!("Aggregated {} DB versions to clear", cleared.len())) + // .await + // .map_err(|e| format!("{e}"))?; + // } + + // cleared + // } + // Err(e) => { + // error!("could not get cleared versions: {e}"); + + // if let Some(ref tx) = feedback { + // tx.send(format!("failed to get cleared versions: {e}")) + // .await + // .map_err(|e| format!("{e}"))?; + // } + + // return Err("failed to cleared versions".into()); + // } + // } + // } + // Err(e) => { + // error!("could not get read connection: {e}"); + // if let Some(ref tx) = feedback { + // tx.send(format!("failed to get read connection: {e}")) + // .await + // .map_err(|e| format!("{e}"))?; + // } + // return Err("could not get read connection".into()); + // } + // }; + + // if !cleared_versions.is_empty() { + // let mut to_clear = Vec::new(); + + // for db_v in cleared_versions { + // if let Some(v) = versions.remove(&db_v) { + // to_clear.push((db_v, v)) + // } + // } + + // if !to_clear.is_empty() { + // // use a write lock here so we can mutate the bookkept state + // let mut bookedw = booked + // .write(format!("clearing:{}", actor_id.as_simple())) + // .await; + + // for (_db_v, v) in to_clear.iter() { + // // only remove when confirming that version is still considered "current" + // if bookedw.contains_current(v) { + // // set it as cleared right away + // bookedw.insert(*v, KnownDbVersion::Cleared); + // deleted += 1; + // } + // } + + // // find any affected cleared ranges + // for range in to_clear + // .iter() + // .filter_map(|(_, v)| bookedw.cleared.get(v)) + // .dedup() + // { + // // schedule for clearing in the background task + // if let Err(e) = agent.tx_empty().send((actor_id, range.clone())).await { + // error!("could not schedule version to be cleared: {e}"); + // if let Some(ref tx) = feedback { + // tx.send(format!("failed to get queue compaction set: {e}")) + // .await + // .map_err(|e| format!("{e}"))?; + // } + // } else { + // inserted += 1; + // } + + // tokio::task::yield_now().await; + // } + + // if let Some(ref tx) = feedback { + // tx.send(format!("Queued 
{inserted} empty versions to compact")) + // .await + // .map_err(|e| format!("{e}"))?; + // } + // } + // } + + // if let Some(ref tx) = feedback { + // tx.send(format!("Finshed compacting changes for {actor_id}")) + // .await + // .map_err(|e| format!("{e}"))?; + // } + + // tokio::time::sleep(Duration::from_secs(1)).await; + // } + + // info!( + // "Compaction done, cleared {} DB bookkeeping table rows (wall time: {:?}, db time: {db_elapsed:?})", + // deleted - inserted, + // start.elapsed() + // ); Ok(()) } @@ -640,21 +630,27 @@ pub async fn sync_loop( match branch { Branch::Tick => { // ignoring here, there is trying and logging going on inside - match handlers::handle_sync(&agent, &bookie, &transport) - .preemptible(&mut tripwire) - .await + match tokio::time::timeout( + Duration::from_secs(300), + handlers::handle_sync(&agent, &bookie, &transport), + ) + .preemptible(&mut tripwire) + .await { tripwire::Outcome::Preempted(_) => { warn!("aborted sync by tripwire"); break; } - tripwire::Outcome::Completed(res) => { - if let Err(e) = res { + tripwire::Outcome::Completed(res) => match res { + Ok(Err(e)) => { error!("could not sync: {e}"); // keep syncing until we successfully sync - continue; } - } + Err(_e) => { + warn!("timed out waiting for sync to complete!"); + } + Ok(Ok(_)) => {} + }, } next_sync_at .as_mut() @@ -734,131 +730,6 @@ pub async fn clear_buffered_meta_loop( } } -const MAX_EMPTIES_BATCH_SIZE: u64 = 40; - -/// Clear empty versions from the database in chunks to avoid locking -/// the database for too long. -/// -/// We are given versions to clear either by receiving empty -/// changesets, or when calling -/// [`find_cleared_db_versions`](self::find_cleared_db_versions) -/// periodically. -pub async fn write_empties_loop( - agent: Agent, - mut rx_empty: CorroReceiver<(ActorId, RangeInclusive)>, - mut tripwire: Tripwire, -) { - let mut empties: BTreeMap> = BTreeMap::new(); - - let next_empties_check = tokio::time::sleep(CHECK_EMPTIES_TO_INSERT_AFTER); - tokio::pin!(next_empties_check); - - let mut count = 0; - - loop { - tokio::select! 
{ - maybe_empty = rx_empty.recv() => match maybe_empty { - Some((actor_id, versions)) => { - empties.entry(actor_id).or_default().insert(versions); - count += 1; - if count < MAX_EMPTIES_BATCH_SIZE { - continue; - } - }, - None => { - debug!("empties queue is done"); - break; - } - }, - _ = &mut next_empties_check => { - next_empties_check.as_mut().reset(tokio::time::Instant::now() + CHECK_EMPTIES_TO_INSERT_AFTER); - if empties.is_empty() { - continue; - } - }, - _ = &mut tripwire => break - } - - let empties_to_process = std::mem::take(&mut empties); - - // TODO: replace with a JoinSet and max concurrency - spawn_counted( - process_completed_empties(agent.clone(), empties_to_process) - .inspect_err(|e| error!("could not process empties: {e}")), - ); - - count = 0; - } - info!("Draining empty versions to process..."); - // drain empties channel - while let Ok((actor_id, versions)) = rx_empty.try_recv() { - empties.entry(actor_id).or_default().insert(versions); - } - - if !empties.is_empty() { - info!("Inserting last unprocessed empties before shut down"); - if let Err(e) = process_completed_empties(agent, empties).await { - error!("could not process empties: {e}"); - } - } -} - -#[tracing::instrument(skip_all, err)] -pub async fn process_completed_empties( - agent: Agent, - empties: BTreeMap>, -) -> eyre::Result<()> { - debug!( - "processing empty versions (count: {})", - empties.values().map(RangeInclusiveSet::len).sum::() - ); - - let mut inserted = 0; - - let start = Instant::now(); - for (actor_id, empties) in empties { - let v = empties.into_iter().collect::>(); - - for ranges in v.chunks(25) { - let mut conn = agent.pool().write_low().await?; - block_in_place(|| { - let mut tx = conn.immediate_transaction()?; - - for range in ranges { - let mut sp = tx.savepoint()?; - match store_empty_changeset(&sp, actor_id, range.clone()) { - Ok(count) => { - inserted += count; - sp.commit()?; - } - Err(e) => { - error!(%actor_id, "could not store empty changeset for versions {range:?}: {e}"); - sp.rollback()?; - continue; - } - } - if let Err(e) = agent.tx_clear_buf().try_send((actor_id, range.clone())) { - error!(%actor_id, "could not schedule buffered meta clear: {e}"); - } - } - - tx.commit()?; - - Ok::<_, eyre::Report>(()) - })?; - } - } - - let elapsed = start.elapsed(); - - debug!("upserted {inserted} empty version ranges in {elapsed:?}"); - - counter!("corro.agent.empties.committed").increment(inserted as u64); - histogram!("corro.agent.empties.commit.second").record(elapsed); - - Ok(()) -} - #[tracing::instrument(skip_all, err)] pub fn process_single_version( agent: &Agent, @@ -908,7 +779,7 @@ pub fn store_empty_changeset( conn: &Connection, actor_id: ActorId, versions: RangeInclusive, -) -> eyre::Result { +) -> Result { // first, delete "current" versions, they're now gone! let deleted: Vec> = conn .prepare_cached( @@ -916,6 +787,16 @@ pub fn store_empty_changeset( DELETE FROM __corro_bookkeeping WHERE actor_id = :actor_id AND + start_version >= COALESCE(( + SELECT start_version + FROM __corro_bookkeeping + WHERE + actor_id = :actor_id AND + start_version < :start + ORDER BY start_version DESC + LIMIT 1 + ), 1) + AND ( -- start_version is between start and end of range AND no end_version ( start_version BETWEEN :start AND :end AND end_version IS NULL ) OR @@ -933,7 +814,12 @@ pub fn store_empty_changeset( ( end_version = :start - 1 ) ) RETURNING start_version, end_version", - )? + ) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: None, + })? 
.query_map( named_params![ ":actor_id": actor_id, @@ -945,7 +831,14 @@ pub fn store_empty_changeset( Ok(start..=row.get::<_, Option>(1)?.unwrap_or(start)) }, ) - .and_then(|rows| rows.collect::>>())?; + .and_then(|rows| rows.collect::>>()) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: None, + })?; + + // println!("deleted: {deleted:?}"); if !deleted.is_empty() { debug!( @@ -961,12 +854,13 @@ pub fn store_empty_changeset( // we should never have deleted non-contiguous ranges, abort! if new_ranges.len() > 1 { warn!("deleted non-contiguous ranges! {new_ranges:?}"); - // this serves as a failsafe - eyre::bail!("deleted non-contiguous ranges: {new_ranges:?}"); + return Err(ChangeError::NonContiguousDelete); } let mut inserted = 0; + // println!("inserting: {new_ranges:?}"); + for range in new_ranges { // insert cleared versions inserted += conn @@ -975,8 +869,16 @@ pub fn store_empty_changeset( INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version, db_version, last_seq, ts) VALUES (?, ?, ?, NULL, NULL, NULL); ", - )? - .execute(params![actor_id, range.start(), range.end()])?; + ).map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: None, + })? + .execute(params![actor_id, range.start(), range.end()]).map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: None, + })?; } Ok(inserted) @@ -1028,11 +930,17 @@ pub async fn process_fully_buffered_changes( } }; - let tx = conn.immediate_transaction()?; + let tx = conn + .immediate_transaction() + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; info!(%actor_id, %version, "Processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})"); - let max_db_version: Option> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?")?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional()?; + let max_db_version: Option> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional().map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; let start = Instant::now(); @@ -1048,8 +956,8 @@ pub async fn process_fully_buffered_changes( AND version = ? ORDER BY db_version ASC, seq ASC "#, - )? - .execute(params![max_db_version, actor_id.as_bytes(), version])?; + ).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})? + .execute(params![max_db_version, actor_id.as_bytes(), version]).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; info!(%actor_id, %version, "Inserted {count} rows from buffered into crsql_changes in {:?}", start.elapsed()); } else { info!(%actor_id, %version, "No buffered rows, skipped insertion into crsql_changes"); @@ -1060,19 +968,34 @@ pub async fn process_fully_buffered_changes( } let rows_impacted: i64 = tx - .prepare_cached("SELECT crsql_rows_impacted()")? - .query_row((), |row| row.get(0))?; + .prepare_cached("SELECT crsql_rows_impacted()") + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })? 
+ .query_row((), |row| row.get(0)) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; debug!(%actor_id, %version, "rows impacted by buffered changes insertion: {rows_impacted}"); - let known_version = if rows_impacted > 0 { - let db_version: CrsqlDbVersion = - tx.query_row("SELECT crsql_next_db_version()", [], |row| row.get(0))?; + if rows_impacted > 0 { + let db_version: CrsqlDbVersion = tx + .query_row("SELECT crsql_next_db_version()", [], |row| row.get(0)) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; debug!("db version: {db_version}"); tx.prepare_cached( " - INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) + INSERT OR IGNORE INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES ( :actor_id, :version, @@ -1080,46 +1003,41 @@ pub async fn process_fully_buffered_changes( :last_seq, :ts );", - )? + ).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})? .execute(named_params! { ":actor_id": actor_id, ":version": version, ":db_version": db_version, - // ":start_version": 0, ":last_seq": last_seq, ":ts": ts - })?; + }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; debug!(%actor_id, %version, "inserted bookkeeping row after buffered insert"); - - Some(KnownDbVersion::Current(CurrentVersion { - db_version, - last_seq, - ts, - })) } else { - if let Err(e) = agent.tx_empty().try_send((actor_id, version..=version)) { - error!(%actor_id, "could not schedule empties for clear: {e}"); - } + store_empty_changeset(&tx, actor_id, version..=version)?; debug!(%actor_id, %version, "inserted CLEARED bookkeeping row after buffered insert"); - Some(KnownDbVersion::Cleared) }; - tx.commit()?; - - let inserted = if let Some(known_version) = known_version { - bookedw.insert(version, known_version); + let needed_changes = + bookedw + .insert_db(&tx, [version..=version].into()) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; - drop(bookedw); + tx.commit().map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; - true - } else { - false - }; + bookedw.apply_needed_changes(needed_changes); - Ok::<_, rusqlite::Error>(inserted) - }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; + Ok::<_, ChangeError>(true) + })?; Ok(inserted) } @@ -1135,7 +1053,7 @@ pub async fn process_multiple_changes( debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::()); let mut seen = HashSet::new(); - let mut unknown_changes = Vec::with_capacity(changes.len()); + let mut unknown_changes: BTreeMap<_, Vec<_>> = BTreeMap::new(); for (change, src, queued_at) in changes { histogram!("corro.agent.changes.queued.seconds").record(queued_at.elapsed()); let versions = change.versions(); @@ -1160,11 +1078,12 @@ pub async fn process_multiple_changes( continue; } - unknown_changes.push((change, src)); + unknown_changes + .entry(change.actor_id) + .or_default() + .push((change, src)); } - unknown_changes.sort_by_key(|(change, _src)| change.actor_id); - let mut conn = agent.pool().write_normal().await?; let changesets = block_in_place(|| { @@ -1182,113 +1101,105 @@ pub async fn 
process_multiple_changes( let mut last_db_version = None; - for (actor_id, changes) in unknown_changes - .into_iter() - .group_by(|(change, _src)| change.actor_id) - .into_iter() - { - // get a lock on the actor id's booked writer if we didn't already - { - let booked = { - bookie - .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{}", - actor_id.as_simple() - )) - .ensure(actor_id) - }; - let booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer):{}", - actor_id.as_simple() - )); - - let mut seen = RangeInclusiveMap::new(); - - for (change, src) in changes { - trace!("handling a single changeset: {change:?}"); - let seqs = change.seqs(); - if booked_write.contains_all(change.versions(), change.seqs()) { - trace!( - "previously unknown versions are now deemed known, aborting inserts" - ); - continue; - } + // let mut writers: BTreeMap = Default::default(); - let versions = change.versions(); - - // check if we've seen this version here... - if versions.clone().all(|version| match seqs { - Some(check_seqs) => match seen.get(&version) { - Some(known) => match known { - KnownDbVersion::Partial(PartialVersion { seqs, .. }) => { - check_seqs.clone().all(|seq| seqs.contains(&seq)) - } - KnownDbVersion::Current { .. } | KnownDbVersion::Cleared => true, - }, - None => false, - }, - None => seen.contains_key(&version), - }) { - continue; - } + for (actor_id, changes) in unknown_changes { + let booked = { + bookie + .blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{}", + actor_id.as_simple() + )) + .ensure(actor_id) + }; + let booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer, unknown changes):{}", + actor_id.as_simple() + )); - // optimizing this, insert later! - let known = if change.is_complete() && change.is_empty() { - // we never want to block here - if let Err(e) = agent.tx_empty().try_send((actor_id, change.versions())) { - error!("could not send empty changed versions into channel: {e}"); - } + let mut seen = RangeInclusiveMap::new(); - KnownDbVersion::Cleared - } else { - if let Some(seqs) = change.seqs() { - if seqs.end() < seqs.start() { - warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); - continue; + for (change, src) in changes { + trace!("handling a single changeset: {change:?}"); + let seqs = change.seqs(); + if booked_write.contains_all(change.versions(), change.seqs()) { + trace!("previously unknown versions are now deemed known, aborting inserts"); + continue; + } + + let versions = change.versions(); + + // check if we've seen this version here... + if versions.clone().all(|version| match seqs { + Some(check_seqs) => match seen.get(&version) { + Some(known) => match known { + KnownDbVersion::Partial(PartialVersion { seqs, .. }) => { + check_seqs.clone().all(|seq| seqs.contains(&seq)) } + KnownDbVersion::Current { .. } | KnownDbVersion::Cleared => true, + }, + None => false, + }, + None => seen.contains_key(&version), + }) { + continue; + } + + // optimizing this, insert later! 
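The per-actor grouping above replaces the earlier sort-plus-group_by pass: incoming changes are bucketed into a BTreeMap keyed by actor id so that each actor's batch is handled under a single booked writer. A minimal, self-contained sketch of that bucketing, using a plain u64 in place of the crate's ActorId newtype and a string payload in place of the (ChangeV1, ChangeSource) pair:

    use std::collections::BTreeMap;

    type ActorId = u64; // stand-in for the crate's ActorId newtype

    #[derive(Debug)]
    struct IncomingChange {
        actor_id: ActorId,
        payload: &'static str, // stand-in for (ChangeV1, ChangeSource)
    }

    fn group_by_actor(changes: Vec<IncomingChange>) -> BTreeMap<ActorId, Vec<IncomingChange>> {
        let mut grouped: BTreeMap<ActorId, Vec<IncomingChange>> = BTreeMap::new();
        for change in changes {
            // one Vec per actor, iterated in actor-id order afterwards
            grouped.entry(change.actor_id).or_default().push(change);
        }
        grouped
    }

    fn main() {
        let grouped = group_by_actor(vec![
            IncomingChange { actor_id: 2, payload: "b" },
            IncomingChange { actor_id: 1, payload: "a1" },
            IncomingChange { actor_id: 1, payload: "a2" },
        ]);
        assert_eq!(grouped[&1].len(), 2);
        assert_eq!(grouped.keys().copied().collect::<Vec<_>>(), vec![1, 2]);
    }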
+ let known = if change.is_complete() && change.is_empty() { + KnownDbVersion::Cleared + } else { + if let Some(seqs) = change.seqs() { + if seqs.end() < seqs.start() { + warn!(%actor_id, versions = ?change.versions(), "received an invalid change, seqs start is greater than seqs end: {seqs:?}"); + continue; } + } - let (known, versions) = match process_single_version( - &agent, - &tx, - last_db_version, - change, - ) { - Ok((known, changeset)) => { - let versions = changeset.versions(); - if let KnownDbVersion::Current(CurrentVersion { - db_version, .. - }) = &known - { - last_db_version = Some(*db_version); - changesets.push((actor_id, changeset, *db_version, src)); - } - (known, versions) - } - Err(e) => { - error!(%actor_id, ?versions, "could not process single change: {e}"); - continue; + let (known, versions) = match process_single_version( + &agent, + &tx, + last_db_version, + change, + ) { + Ok((known, changeset)) => { + let versions = changeset.versions(); + if let KnownDbVersion::Current(CurrentVersion { db_version, .. }) = + &known + { + last_db_version = Some(*db_version); + changesets.push((actor_id, changeset, *db_version, src)); } - }; - debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); - known + (known, versions) + } + Err(e) => { + error!(%actor_id, ?versions, "could not process single change: {e}"); + continue; + } }; + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "got known to insert: {known:?}"); + known + }; - seen.insert(versions.clone(), known.clone()); - knowns.entry(actor_id).or_default().push((versions, known)); - } + seen.insert(versions.clone(), known.clone()); + knowns.entry(actor_id).or_default().push((versions, known)); } + // if knowns.contains_key(&actor_id) { + // writers.insert(actor_id, booked_write); + // } } let mut count = 0; + let mut needed_changes = BTreeMap::new(); for (actor_id, knowns) in knowns.iter_mut() { debug!(%actor_id, self_actor_id = %agent.actor_id(), "processing {} knowns", knowns.len()); - for (versions, known) in knowns.iter_mut() { + + let mut all_versions = RangeInclusiveSet::new(); + + for (versions, known) in knowns.iter() { match known { - KnownDbVersion::Partial { .. } => { - continue; - } + KnownDbVersion::Partial { .. } => {} KnownDbVersion::Current(CurrentVersion { db_version, last_seq, @@ -1298,7 +1209,7 @@ pub async fn process_multiple_changes( let version = versions.start(); debug!(%actor_id, self_actor_id = %agent.actor_id(), %version, "inserting bookkeeping row db_version: {db_version}, ts: {ts:?}"); tx.prepare_cached(" - INSERT INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) + INSERT OR IGNORE INTO __corro_bookkeeping ( actor_id, start_version, db_version, last_seq, ts) VALUES (:actor_id, :start_version, :db_version, :last_seq, :ts);").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(*actor_id), version: Some(*version)})? 
.execute(named_params!{ ":actor_id": actor_id, @@ -1310,13 +1221,36 @@ pub async fn process_multiple_changes( } KnownDbVersion::Cleared => { debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); - if let Err(e) = agent.tx_empty().try_send((*actor_id, versions.clone())) { - error!("could not schedule version to be cleared: {e}"); - } + store_empty_changeset(&tx, *actor_id, versions.clone())?; } } + + all_versions.insert(versions.clone()); + debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserted bookkeeping row"); } + + let booked = { + bookie + .blocking_write(format!( + "process_multiple_changes(for_actor_blocking):{actor_id}", + )) + .ensure(*actor_id) + }; + let mut booked_write = booked.blocking_write(format!( + "process_multiple_changes(booked writer, during knowns):{actor_id}", + )); + + needed_changes.insert( + *actor_id, + booked_write + .insert_db(&tx, all_versions) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(*actor_id), + version: None, + })?, + ); } debug!("inserted {count} new changesets"); @@ -1340,22 +1274,24 @@ pub async fn process_multiple_changes( let booked = { bookie .blocking_write(format!( - "process_multiple_changes(for_actor_blocking):{}", - actor_id.as_simple() + "process_multiple_changes(for_actor_blocking):{actor_id}", )) .ensure(actor_id) }; let mut booked_write = booked.blocking_write(format!( - "process_multiple_changes(booked writer, post commit):{}", - actor_id.as_simple() + "process_multiple_changes(booked writer, before apply needed):{actor_id}", )); + if let Some(needed_changes) = needed_changes.remove(&actor_id) { + booked_write.apply_needed_changes(needed_changes); + } + for (versions, known) in knowns { let version = *versions.start(); - // this merges partial version seqs - if let Some(PartialVersion { seqs, last_seq, .. }) = - booked_write.insert_many(versions, known) - { + if let KnownDbVersion::Partial(partial) = known { + let PartialVersion { seqs, last_seq, .. 
} = + booked_write.insert_partial(version, partial); + let full_seqs_range = CrsqlSeq(0)..=last_seq; let gaps_count = seqs.gaps(&full_seqs_range).count(); if gaps_count == 0 { @@ -1378,7 +1314,7 @@ pub async fn process_multiple_changes( })?; let mut change_chunk_size = 0; - + for (_actor_id, changeset, db_version, _src) in changesets { change_chunk_size += changeset.changes().len(); agent diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 2781d543..076f345c 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -8,9 +8,7 @@ use std::time::{Duration, Instant}; use bytes::{BufMut, BytesMut}; use compact_str::format_compact; use corro_types::actor::ClusterId; -use corro_types::agent::{ - Agent, CurrentVersion, KnownDbVersion, KnownVersion, PartialVersion, SplitPool, -}; +use corro_types::agent::{Agent, PartialVersion, SplitPool}; use corro_types::base::{CrsqlSeq, Version}; use corro_types::broadcast::{ BiPayload, BiPayloadV1, ChangeSource, ChangeV1, Changeset, Timestamp, @@ -28,7 +26,7 @@ use metrics::counter; use quinn::{RecvStream, SendStream}; use rand::seq::SliceRandom; use rangemap::RangeInclusiveSet; -use rusqlite::{params, Connection}; +use rusqlite::{named_params, params, Connection, OptionalExtension}; use speedy::Writable; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{self, unbounded_channel, Sender}; @@ -357,61 +355,73 @@ fn handle_known_version( conn: &mut Connection, actor_id: ActorId, version: Version, - init_known: KnownDbVersion, + partial: Option, booked: &Booked, - seqs_needed: Vec>, - last_seq: CrsqlSeq, - ts: Timestamp, + mut seqs_needed: Vec>, sender: &Sender, ) -> eyre::Result<()> { - debug!(%actor_id, %version, "handle known version! known: {init_known:?}, seqs_needed: {seqs_needed:?}"); - let mut seqs_iter = seqs_needed.into_iter(); - while let Some(range_needed) = seqs_iter.by_ref().next() { - match &init_known { - KnownDbVersion::Current(CurrentVersion { db_version, .. }) => { - let bw = booked.blocking_write(format_compact!( - "sync_handle_known[{version}]:{}", - actor_id.as_simple() - )); - match bw.get(&version) { - Some(known) => { - // a current version cannot go back to a partial version - if known.is_cleared() { - debug!(%actor_id, %version, "in-memory bookkeeping has been cleared, aborting."); - break; - } - } - None => { - warn!(%actor_id, %version, "in-memory bookkeeping vanished, aborting."); - break; - } + debug!(%actor_id, %version, "handle known version! partial? {partial:?}"); + + match partial { + None => { + // this is a read transaction! + let tx = conn.transaction()?; + + let last_seq_ts: Option<(Option, Option)> = tx.prepare_cached("SELECT last_seq, ts FROM __corro_bookkeeping WHERE actor_id = :actor_id AND start_version = :version")?.query_row( + named_params! { + ":actor_id": actor_id, + ":version": version + }, + |row| Ok((row.get(0)?, row.get(1)?)) + ).optional()?; + + let (last_seq, ts) = match last_seq_ts { + None | Some((None, _)) | Some((_, None)) => { + // empty version! + // TODO: optimize by sending the full range found... + sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Empty { + versions: version..=version, + }, + })))?; + return Ok(()); } + Some((Some(last_seq), Some(ts))) => (last_seq, ts), + }; - // this is a read transaction! 
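handle_known_version no longer receives a KnownDbVersion: for a non-partial version it now derives last_seq and ts from __corro_bookkeeping itself, and treats a missing row (or NULL columns) as an empty/cleared version, answering with an Empty changeset instead of streaming rows. A simplified, self-contained sketch of that lookup, with i64 standing in for the crate's CrsqlSeq and Timestamp types:

    use rusqlite::{params, Connection, OptionalExtension};

    /// Returns Some((last_seq, ts)) for a current version, or None when the
    /// version is empty/cleared (no row, or NULL last_seq/ts).
    fn last_seq_and_ts(
        conn: &Connection,
        actor_id: &[u8],
        version: i64,
    ) -> rusqlite::Result<Option<(i64, i64)>> {
        let row: Option<(Option<i64>, Option<i64>)> = conn
            .prepare_cached(
                "SELECT last_seq, ts FROM __corro_bookkeeping
                 WHERE actor_id = ? AND start_version = ?",
            )?
            .query_row(params![actor_id, version], |row| Ok((row.get(0)?, row.get(1)?)))
            .optional()?;

        Ok(match row {
            Some((Some(last_seq), Some(ts))) => Some((last_seq, ts)),
            // no row or NULL columns: the caller should send Changeset::Empty
            _ => None,
        })
    }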
- let tx = conn.transaction()?; + if seqs_needed.is_empty() { + // no seqs provided, use 0..=last_seq + seqs_needed = vec![CrsqlSeq(0)..=last_seq]; + } + let mut seqs_iter = seqs_needed.into_iter(); + while let Some(range_needed) = seqs_iter.by_ref().next() { let mut prepped = tx.prepare_cached( r#" - SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl - FROM crsql_changes - WHERE site_id = ? - AND db_version = ? - AND seq >= ? AND seq <= ? - ORDER BY seq ASC - "#, + SELECT c."table", c.pk, c.cid, c.val, c.col_version, c.db_version, c.seq, c.site_id, c.cl + FROM __corro_bookkeeping AS bk + INNER JOIN crsql_changes AS c ON c.site_id = bk.actor_id AND c.db_version = bk.db_version + WHERE bk.actor_id = :actor_id + AND bk.start_version = :version + AND c.seq >= :start_seq AND c.seq <= :end_seq + ORDER BY c.seq ASC + "#, )?; let start_seq = range_needed.start(); let end_seq = range_needed.end(); let rows = prepped.query_map( - params![actor_id, db_version, start_seq, end_seq], + named_params! { + ":actor_id": actor_id, + ":version": version, + ":start_seq": start_seq, + ":end_seq": end_seq + }, row_to_change, )?; - // drop write lock! - drop(bw); - send_change_chunks( sender, ChunkedChanges::new(rows, *start_seq, *end_seq, MAX_CHANGES_BYTES_PER_MESSAGE), @@ -421,7 +431,15 @@ fn handle_known_version( ts, )?; } - KnownDbVersion::Partial(PartialVersion { seqs, .. }) => { + } + Some(PartialVersion { seqs, last_seq, .. }) => { + if seqs_needed.is_empty() { + // if no seqs provided, use 0..=last_seq + seqs_needed = vec![CrsqlSeq(0)..=last_seq]; + } + + let mut seqs_iter = seqs_needed.into_iter(); + while let Some(range_needed) = seqs_iter.by_ref().next() { let mut partial_seqs = seqs.clone(); let mut range_needed = range_needed.clone(); @@ -452,69 +470,63 @@ fn handle_known_version( "sync_handle_known(partial)[{version}]:{}", actor_id.as_simple() )); - let maybe_current_version = match bw.get(&version) { - Some(known) => match known { - KnownVersion::Partial(PartialVersion { seqs, .. }) => { - if seqs != &partial_seqs { - warn!(%actor_id, %version, "different partial sequences, updating! range_needed: {range_needed:?}"); - partial_seqs = seqs.clone(); - if let Some(new_start_seq) = last_sent_seq.take() { - range_needed = - (new_start_seq + 1)..=*range_needed.end(); - } - continue 'outer; - } - None - } - known @ KnownVersion::Current(_) => Some(known.into()), - KnownVersion::Cleared => { - debug!(%actor_id, %version, "in-memory bookkeeping has been cleared, aborting."); - break; + + let still_partial = if let Some(PartialVersion { seqs, .. }) = + bw.get_partial(&version) + { + if seqs != &partial_seqs { + warn!(%actor_id, %version, "different partial sequences, updating! range_needed: {range_needed:?}"); + partial_seqs = seqs.clone(); + if let Some(new_start_seq) = last_sent_seq.take() { + range_needed = (new_start_seq + 1)..=*range_needed.end(); } - }, - None => { - warn!(%actor_id, %version, "in-memory bookkeeping vanished!"); - break; + continue 'outer; } + true + } else { + // not a partial anymore + false }; - if let Some(current) = maybe_current_version { + if !still_partial { warn!(%actor_id, %version, "switched from partial to current version"); - // drop write lock - drop(bw); - - // restart the seqs_needed here! 
- let mut seqs_needed: Vec> = - seqs_iter.collect(); - if let Some(new_start_seq) = last_sent_seq.take() { - range_needed = (new_start_seq + 1)..=*range_needed.end(); - } - seqs_needed.insert(0, range_needed); - - return handle_known_version( - conn, - actor_id, - version, - current, - booked, - seqs_needed, - last_seq, - ts, - sender, - ); + return Ok(()); + // // drop write lock + // drop(bw); + + // // restart the seqs_needed here! + // let mut seqs_needed: Vec> = + // seqs_iter.collect(); + // if let Some(new_start_seq) = last_sent_seq.take() { + // range_needed = (new_start_seq + 1)..=*range_needed.end(); + // } + // seqs_needed.insert(0, range_needed); + + // return handle_known_version( + // conn, + // actor_id, + // version, + // None, + // booked, + // seqs_needed, + // sender, + // ); } // this is a read transaction! let tx = conn.transaction()?; // first check if we do have the sequence... - let still_exists: bool = tx.prepare_cached("SELECT EXISTS(SELECT 1 FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ? AND start_seq <= ? AND end_seq >= ?)")?.query_row(params![actor_id, version, start_seq, end_seq], |row| row.get(0))?; + let last_seq_ts: Option<(CrsqlSeq, Timestamp)> = tx.prepare_cached("SELECT last_seq, ts FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ? AND start_seq <= ? AND end_seq >= ?")?.query_row(params![actor_id, version, start_seq, end_seq], |row| Ok((row.get(0)?, row.get(1)?))).optional()?; - if !still_exists { - warn!(%actor_id, %version, "seqs bookkeeping indicated buffered changes were gone, aborting!"); - break 'outer; - } + let (last_seq, ts) = match last_seq_ts { + None => { + warn!(%actor_id, %version, "seqs bookkeeping indicated buffered changes were gone, aborting!"); + break 'outer; + } + Some(res) => res, + }; // the data is still valid because we just checked __corro_seq_bookkeeping and // we're in a transaction (of "read" type since the first select) @@ -557,57 +569,12 @@ fn handle_known_version( break; } } - KnownDbVersion::Cleared => unreachable!(), } } Ok(()) } -#[allow(clippy::too_many_arguments)] -async fn process_version( - pool: &SplitPool, - actor_id: ActorId, - version: Version, - known_version: KnownDbVersion, - booked: &Booked, - mut seqs_needed: Vec>, - sender: &Sender, -) -> eyre::Result<()> { - let mut conn = pool.read().await?; - - let (last_seq, ts) = { - let (last_seq, ts) = match &known_version { - KnownDbVersion::Current(CurrentVersion { last_seq, ts, .. }) => (*last_seq, *ts), - KnownDbVersion::Partial(PartialVersion { last_seq, ts, .. 
}) => (*last_seq, *ts), - KnownDbVersion::Cleared => return Ok(()), - }; - if seqs_needed.is_empty() { - seqs_needed = vec![(CrsqlSeq(0)..=last_seq)]; - } - - (last_seq, ts) - }; - - block_in_place(|| { - handle_known_version( - &mut conn, - actor_id, - version, - known_version, - booked, - seqs_needed, - last_seq, - ts, - sender, - ) - })?; - - trace!("done processing version: {version} for actor_id: {actor_id}"); - - Ok(()) -} - fn send_change_chunks>>( sender: &Sender, mut chunked: ChunkedChanges, @@ -625,16 +592,25 @@ fn send_change_chunks>>( Some(Ok((changes, seqs))) => { let start = Instant::now(); - sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { - actor_id, - changeset: Changeset::Full { - version, - changes, - seqs, - last_seq, - ts, - }, - })))?; + if changes.is_empty() && *seqs.start() == CrsqlSeq(0) && *seqs.end() == last_seq { + sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Empty { + versions: version..=version, + }, + })))?; + } else { + sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Full { + version, + changes, + seqs, + last_seq, + ts, + }, + })))?; + } let elapsed = start.elapsed(); @@ -685,7 +661,7 @@ async fn process_sync( 6, ); - let mut current_haves = vec![]; + let mut to_process = vec![]; let mut partial_needs = vec![]; loop { @@ -717,8 +693,6 @@ async fn process_sync( None => continue, }; - let mut cleared: RangeInclusiveSet = RangeInclusiveSet::new(); - { let read = booked.read("process_need(full)").await; @@ -726,90 +700,81 @@ async fn process_sync( match need { SyncNeedV1::Full { versions } => { for version in versions { - match read.get(&version) { - Some(KnownVersion::Cleared) => { - cleared.insert(version..=version); - } - Some(known) => { - current_haves.push((version, KnownDbVersion::from(known))); - } - None => continue, + if !read.contains_version(&version) { + continue; } + + to_process.push((version, read.get_partial(&version).cloned())); } } - SyncNeedV1::Partial { version, seqs } => match read.get(&version) { - Some(KnownVersion::Cleared) => { - cleared.insert(version..=version); - } - Some(known) => { - partial_needs.push((version, KnownDbVersion::from(known), seqs)); + SyncNeedV1::Partial { version, seqs } => { + if !read.contains_version(&version) { + continue; } - None => continue, - }, - } - } - } - for versions in cleared { - let sender = sender.clone(); - if job_tx - .send(Box::pin(async move { - sender - .send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { - actor_id, - changeset: Changeset::Empty { versions }, - }))) - .await - .map_err(eyre::Report::from) - })) - .is_err() - { - eyre::bail!("could not send into job channel"); + partial_needs.push(( + version, + read.get_partial(&version).cloned(), + seqs, + )); + } + } } } - for (version, known_version) in current_haves.drain(..) { + for (version, partial) in to_process.drain(..) 
{ let pool = pool.clone(); let booked = booked.clone(); let sender = sender.clone(); - if job_tx - .send(Box::pin(async move { - process_version( - &pool, + let fut = Box::pin(async move { + let mut conn = pool.read().await?; + + block_in_place(|| { + handle_known_version( + &mut conn, actor_id, version, - known_version, + partial, &booked, vec![], &sender, ) - .await - })) - .is_err() - { + })?; + + trace!("done processing version: {version} for actor_id: {actor_id}"); + Ok(()) + }); + + if job_tx.send(fut).is_err() { eyre::bail!("could not send into job channel"); } } - for (version, known_version, seqs_needed) in partial_needs.drain(..) { + for (version, partial, seqs_needed) in partial_needs.drain(..) { let pool = pool.clone(); let booked = booked.clone(); let sender = sender.clone(); - if job_tx - .send(Box::pin(async move { - process_version( - &pool, + + let fut = Box::pin(async move { + let mut conn = pool.read().await?; + + block_in_place(|| { + handle_known_version( + &mut conn, actor_id, version, - known_version, + partial, &booked, seqs_needed, &sender, ) - .await - })) - .is_err() - { + })?; + + trace!("done processing version: {version} for actor_id: {actor_id}"); + Ok(()) + }); + + if job_tx.send(fut).is_err() { eyre::bail!("could not send into job channel"); } } @@ -1262,6 +1227,13 @@ pub async fn parallel_sync( count += changes_len; counter!("corro.sync.changes.recv", "actor_id" => actor_id.to_string()) .increment(changes_len as u64); + + debug!( + "handling versions: {:?}, seqs: {:?}, len: {changes_len}", + change.versions(), + change.seqs() + ); + tx_changes .send((change, ChangeSource::Sync)) .await @@ -1562,7 +1534,6 @@ mod tests { }; use hyper::StatusCode; use tempfile::TempDir; - use tokio::sync::mpsc; use tripwire::Tripwire; use crate::{ @@ -1661,25 +1632,13 @@ mod tests { ) .await?; - let known1 = KnownDbVersion::Current(CurrentVersion { - db_version: CrsqlDbVersion(1), - last_seq: CrsqlSeq(0), - ts, - }); - - let known2 = KnownDbVersion::Current(CurrentVersion { - db_version: CrsqlDbVersion(2), - last_seq: CrsqlSeq(0), // original last seq - ts, - }); - let booked = bookie.read("test").await.get(&actor_id).cloned().unwrap(); { let read = booked.read("test").await; - assert_eq!(KnownDbVersion::from(read.get(&Version(1)).unwrap()), known1); - assert_eq!(KnownDbVersion::from(read.get(&Version(2)).unwrap()), known2); + assert!(read.contains_version(&Version(1))); + assert!(read.contains_version(&Version(2))); } { @@ -1705,11 +1664,9 @@ mod tests { &mut conn, actor_id, Version(1), - known1, + None, &booked, vec![CrsqlSeq(0)..=CrsqlSeq(0)], - CrsqlSeq(0), - ts, &tx, ) })?; @@ -1734,11 +1691,9 @@ mod tests { &mut conn, actor_id, Version(2), - known2, + None, &booked, vec![CrsqlSeq(0)..=CrsqlSeq(0)], - CrsqlSeq(0), - ts, &tx, ) })?; @@ -1759,13 +1714,79 @@ mod tests { ); } - // make_broadcastable_change(&agent, |tx| { - // tx.execute("INSERT INTO test (id, text) VALUES (1, \"one\")", []) - // })?; + let change3 = Change { + table: TableName("tests".into()), + pk: pack_columns(&vec![1i64.into()])?, + cid: ColumnName("text".into()), + val: "one override".into(), + col_version: 2, + db_version: CrsqlDbVersion(3), + seq: CrsqlSeq(0), + site_id: actor_id.to_bytes(), + cl: 1, + }; + + process_multiple_changes( + agent.clone(), + bookie.clone(), + vec![( + ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: Version(3), + changes: vec![change3.clone()], + seqs: CrsqlSeq(0)..=CrsqlSeq(0), + last_seq: CrsqlSeq(0), + ts, + }, + }, + ChangeSource::Sync, + Instant::now(), 
+ )], + ) + .await?; - // make_broadcastable_change(&agent, |tx| { - // tx.execute("INSERT INTO test (id, text) VALUES (2, \"two\")", []) - // })?; + { + let (tx, mut rx) = mpsc::channel(5); + let mut conn = agent.pool().read().await?; + + { + let mut prepped = conn.prepare("SELECT * FROM crsql_changes;")?; + let mut rows = prepped.query([])?; + + loop { + let row = rows.next()?; + if row.is_none() { + break; + } + + println!("ROW: {row:?}"); + } + } + + block_in_place(|| { + handle_known_version( + &mut conn, + actor_id, + Version(1), + None, + &booked, + vec![CrsqlSeq(0)..=CrsqlSeq(0)], + &tx, + ) + })?; + + let msg = rx.recv().await.unwrap(); + assert_eq!( + msg, + SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Empty { + versions: Version(1)..=Version(1) + } + })) + ); + } Ok(()) } diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs index f6a78bb0..cee14a4f 100644 --- a/crates/corro-agent/src/api/public/mod.rs +++ b/crates/corro-agent/src/api/public/mod.rs @@ -7,12 +7,12 @@ use axum::{response::IntoResponse, Extension}; use bytes::{BufMut, BytesMut}; use compact_str::ToCompactString; use corro_types::{ - agent::{Agent, ChangeError, CurrentVersion, KnownDbVersion}, + agent::{Agent, ChangeError}, api::{ row_to_change, ColumnName, ExecResponse, ExecResult, QueryEvent, Statement, TableStatRequest, TableStatResponse, }, - base::{CrsqlDbVersion, CrsqlSeq}, + base::{CrsqlDbVersion, CrsqlSeq, Version}, broadcast::{ChangeV1, Changeset, Timestamp}, change::{ChunkedChanges, SqliteValue, MAX_CHANGES_BYTE_SIZE}, schema::{apply_schema, parse_sql}, @@ -39,7 +39,7 @@ pub mod pubsub; pub async fn make_broadcastable_changes( agent: &Agent, f: F, -) -> Result<(T, Duration), ChangeError> +) -> Result<(T, Option, Duration), ChangeError> where F: Fn(&Transaction) -> Result, { @@ -104,7 +104,7 @@ where actor_id: Some(actor_id), version: None, })?; - return Ok((ret, start.elapsed())); + return Ok((ret, None, start.elapsed())); } let last_version = book_writer.last().unwrap_or_default(); @@ -126,7 +126,9 @@ where version: Some(version), })?; - let elapsed = { + let versions = version..=version; + + let (elapsed, needed_changes) = { tx.prepare_cached( r#" INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) @@ -153,24 +155,26 @@ where debug!(%actor_id, %version, %db_version, "inserted local bookkeeping row!"); + let needed_changes = + book_writer + .insert_db(&tx, [versions].into()) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; + tx.commit().map_err(|source| ChangeError::Rusqlite { source, actor_id: Some(actor_id), version: Some(version), })?; - start.elapsed() + (start.elapsed(), needed_changes) }; trace!("committed tx, db_version: {db_version}, last_seq: {last_seq:?}"); - book_writer.insert( - version, - KnownDbVersion::Current(CurrentVersion { - db_version, - last_seq, - ts, - }), - ); + book_writer.apply_needed_changes(needed_changes); drop(book_writer); let agent = agent.clone(); @@ -237,7 +241,7 @@ where Ok::<_, eyre::Report>(()) }); - Ok::<_, ChangeError>((ret, elapsed)) + Ok::<_, ChangeError>((ret, Some(version), elapsed)) }) } @@ -285,6 +289,7 @@ pub async fn api_v1_transactions( error: "at least 1 statement is required".into(), }], time: 0.0, + version: None, }), ); } @@ -317,7 +322,7 @@ pub async fn api_v1_transactions( }) .await; - let (results, elapsed) = match res { + let (results, version, elapsed) = match res { 
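With this change, ExecResponse carries the version assigned to a successful transaction; it stays None for schema changes, error responses, and requests that produced nothing to broadcast. A hypothetical client-side decode of the new response shape, with serde_json::Value standing in for the crate's ExecResult and i64 for the Version newtype:

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct ExecResponse {
        results: Vec<serde_json::Value>, // simplified; the crate uses ExecResult here
        time: f64,
        version: Option<i64>, // newly added field
    }

    fn main() -> Result<(), serde_json::Error> {
        let body = r#"{"results":[{"rows_affected":1,"time":0.001}],"time":0.002,"version":42}"#;
        let resp: ExecResponse = serde_json::from_str(body)?;
        assert_eq!(resp.version, Some(42));
        Ok(())
    }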
Ok(res) => res, Err(e) => { error!("could not execute statement(s): {e}"); @@ -328,6 +333,7 @@ pub async fn api_v1_transactions( error: e.to_string(), }], time: 0.0, + version: None, }), ); } @@ -338,6 +344,7 @@ pub async fn api_v1_transactions( axum::Json(ExecResponse { results, time: elapsed.as_secs_f64(), + version, }), ) } @@ -634,6 +641,7 @@ pub async fn api_v1_db_schema( error: "at least 1 statement is required".into(), }], time: 0.0, + version: None, }), ); } @@ -649,6 +657,7 @@ pub async fn api_v1_db_schema( error: e.to_string(), }], time: 0.0, + version: None, }), ); } @@ -658,6 +667,7 @@ pub async fn api_v1_db_schema( axum::Json(ExecResponse { results: vec![], time: start.elapsed().as_secs_f64(), + version: None, }), ) } diff --git a/crates/corro-api-types/src/lib.rs b/crates/corro-api-types/src/lib.rs index 95e8872f..b6ec6be3 100644 --- a/crates/corro-api-types/src/lib.rs +++ b/crates/corro-api-types/src/lib.rs @@ -7,7 +7,7 @@ use std::{ }; use compact_str::CompactString; -use corro_base_types::{CrsqlDbVersion, CrsqlSeq}; +use corro_base_types::{CrsqlDbVersion, CrsqlSeq, Version}; use rusqlite::{ types::{FromSql, FromSqlError, ToSqlOutput, Value, ValueRef}, Row, ToSql, @@ -210,6 +210,7 @@ impl From<&str> for Statement { pub struct ExecResponse { pub results: Vec, pub time: f64, + pub version: Option, } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/corro-devcluster/src/main.rs b/crates/corro-devcluster/src/main.rs index a9129947..8ae131ed 100644 --- a/crates/corro-devcluster/src/main.rs +++ b/crates/corro-devcluster/src/main.rs @@ -212,7 +212,7 @@ struct BinHandle { path: String, } -fn nix_output(vec: &Vec) -> Vec> { +fn nix_output(vec: &[u8]) -> Vec> { serde_json::from_slice(vec).unwrap() } @@ -253,6 +253,6 @@ fn build_corrosion() -> Option { .get("outputs")? .get("out")? 
.to_string() - .replace("\"", ""), + .replace('"', ""), }) } diff --git a/crates/corro-devcluster/src/topology/mod.rs b/crates/corro-devcluster/src/topology/mod.rs index 894fad9a..f2abf51d 100644 --- a/crates/corro-devcluster/src/topology/mod.rs +++ b/crates/corro-devcluster/src/topology/mod.rs @@ -19,7 +19,7 @@ impl Simple { /// B -> C /// A -> C /// etc - pub fn parse_edge<'top, 'input>(&mut self, input: &'input str) -> IResult<&'input str, ()> { + pub fn parse_edge<'input>(&mut self, input: &'input str) -> IResult<&'input str, ()> { let (input, first) = alpha1(input)?; let (input, _) = delimited(multispace0, tag("->"), multispace0)(input)?; let (input, second) = alpha1(input)?; diff --git a/crates/corro-pg/src/lib.rs b/crates/corro-pg/src/lib.rs index 63a82bd0..7dbc5332 100644 --- a/crates/corro-pg/src/lib.rs +++ b/crates/corro-pg/src/lib.rs @@ -14,7 +14,7 @@ use bytes::Buf; use chrono::NaiveDateTime; use compact_str::CompactString; use corro_types::{ - agent::{Agent, CurrentVersion, KnownDbVersion}, + agent::Agent, base::{CrsqlDbVersion, CrsqlSeq}, broadcast::{BroadcastInput, BroadcastV1, ChangeV1, Changeset, Timestamp}, change::{row_to_change, ChunkedChanges, MAX_CHANGES_BYTE_SIZE}, @@ -2160,18 +2160,13 @@ impl Session { debug!(%actor_id, %version, %db_version, "inserted local bookkeeping row!"); + let needed_changes = book_writer.insert_db(conn, [version..=version].into())?; + conn.execute_batch("COMMIT")?; trace!("committed tx, db_version: {db_version}, last_seq: {last_seq:?}"); - book_writer.insert( - version, - KnownDbVersion::Current(CurrentVersion { - db_version, - last_seq, - ts, - }), - ); + book_writer.apply_needed_changes(needed_changes); drop(book_writer); diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index 315de6fa..e9f7b210 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -18,7 +18,7 @@ use indexmap::IndexMap; use metrics::{gauge, histogram}; use parking_lot::RwLock; use rangemap::RangeInclusiveSet; -use rusqlite::{Connection, Transaction}; +use rusqlite::{named_params, Connection, Transaction}; use serde::{Deserialize, Serialize}; use tokio::sync::{ AcquireError, OwnedRwLockWriteGuard as OwnedTokioRwLockWriteGuard, OwnedSemaphorePermit, @@ -30,7 +30,7 @@ use tokio::{ sync::{oneshot, Semaphore}, }; use tokio_util::sync::{CancellationToken, DropGuard}; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use tripwire::Tripwire; use crate::{ @@ -63,7 +63,6 @@ pub struct AgentConfig { pub tx_bcast: CorroSender, pub tx_apply: CorroSender<(ActorId, Version)>, - pub tx_empty: CorroSender<(ActorId, RangeInclusive)>, pub tx_clear_buf: CorroSender<(ActorId, RangeInclusive)>, pub tx_changes: CorroSender<(ChangeV1, ChangeSource)>, pub tx_foca: CorroSender, @@ -90,7 +89,6 @@ pub struct AgentInner { booked: Booked, tx_bcast: CorroSender, tx_apply: CorroSender<(ActorId, Version)>, - tx_empty: CorroSender<(ActorId, RangeInclusive)>, tx_clear_buf: CorroSender<(ActorId, RangeInclusive)>, tx_changes: CorroSender<(ChangeV1, ChangeSource)>, tx_foca: CorroSender, @@ -120,7 +118,6 @@ impl Agent { booked: config.booked, tx_bcast: config.tx_bcast, tx_apply: config.tx_apply, - tx_empty: config.tx_empty, tx_clear_buf: config.tx_clear_buf, tx_changes: config.tx_changes, tx_foca: config.tx_foca, @@ -180,10 +177,6 @@ impl Agent { &self.0.tx_changes } - pub fn tx_empty(&self) -> &CorroSender<(ActorId, RangeInclusive)> { - &self.0.tx_empty - } - pub fn tx_clear_buf(&self) -> &CorroSender<(ActorId, RangeInclusive)> { 
&self.0.tx_clear_buf } @@ -252,11 +245,27 @@ pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> { Box::new(create_corro_subs as fn(&Transaction) -> rusqlite::Result<()>), Box::new(refactor_corro_members as fn(&Transaction) -> rusqlite::Result<()>), Box::new(crsqlite_v0_16_migration as fn(&Transaction) -> rusqlite::Result<()>), + Box::new(create_bookkeeping_gaps as fn(&Transaction) -> rusqlite::Result<()>), ]; crate::sqlite::migrate(conn, migrations) } +fn create_bookkeeping_gaps(tx: &Transaction) -> rusqlite::Result<()> { + tx.execute_batch( + r#" + -- store known needed versions + CREATE TABLE IF NOT EXISTS __corro_bookkeeping_gaps ( + actor_id BLOB NOT NULL, + start INTEGER NOT NULL, + end INTEGER NOT NULL, + + PRIMARY KEY (actor_id, start) + ) WITHOUT ROWID; + "#, + ) +} + // since crsqlite 0.16, site_id is NOT NULL in clock tables // also sets the new 'merge-equal-values' config to true. fn crsqlite_v0_16_migration(tx: &Transaction) -> rusqlite::Result<()> { @@ -467,6 +476,8 @@ pub enum ChangeError { actor_id: Option, version: Option, }, + #[error("non-contiguous empties range delete")] + NonContiguousDelete, } #[derive(Debug, thiserror::Error)] @@ -645,19 +656,6 @@ impl DerefMut for WriteConn { } } -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum KnownDbVersion { - Partial(PartialVersion), - Current(CurrentVersion), - Cleared, -} - -impl KnownDbVersion { - pub fn is_cleared(&self) -> bool { - matches!(self, KnownDbVersion::Cleared) - } -} - pub struct CountedTokioRwLock { registry: LockRegistry, lock: Arc>, @@ -995,16 +993,6 @@ impl Drop for LockTracker { } } -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct CurrentVersion { - // cr-sqlite db version - pub db_version: CrsqlDbVersion, - // actual last sequence originally produced - pub last_seq: CrsqlSeq, - // timestamp when the change was produced by the source - pub ts: Timestamp, -} - #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct PartialVersion { // range of sequences recorded @@ -1015,74 +1003,82 @@ pub struct PartialVersion { pub ts: Timestamp, } -impl From for KnownDbVersion { - fn from(partial: PartialVersion) -> Self { - KnownDbVersion::Partial(partial) +impl PartialVersion { + pub fn is_complete(&self) -> bool { + self.seqs.gaps(&self.full_range()).count() == 0 } -} -#[derive(Debug)] -pub enum KnownVersion<'a> { - Cleared, - Current(&'a CurrentVersion), - Partial(&'a PartialVersion), + pub fn full_range(&self) -> RangeInclusive { + CrsqlSeq(1)..=self.last_seq + } } -impl<'a> KnownVersion<'a> { - pub fn is_cleared(&self) -> bool { - matches!(self, KnownVersion::Cleared) - } +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct CurrentVersion { + pub db_version: CrsqlDbVersion, + pub last_seq: CrsqlSeq, + pub ts: Timestamp, } -impl<'a> From> for KnownDbVersion { - fn from(value: KnownVersion<'a>) -> Self { - match value { - KnownVersion::Cleared => KnownDbVersion::Cleared, - KnownVersion::Current(current) => KnownDbVersion::Current(current.clone()), - KnownVersion::Partial(partial) => KnownDbVersion::Partial(partial.clone()), - } - } +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum KnownDbVersion { + Cleared, + Current(CurrentVersion), + Partial(PartialVersion), } -#[derive(Default, Clone)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct BookedVersions { - pub cleared: RangeInclusiveSet, - pub current: BTreeMap, + actor_id: ActorId, pub partials: BTreeMap, - sync_need: RangeInclusiveSet, - last: Option, + needed: RangeInclusiveSet, + max: Option, } impl 
BookedVersions { + pub fn new(actor_id: ActorId) -> Self { + Self { + actor_id, + partials: Default::default(), + needed: Default::default(), + max: Default::default(), + } + } + pub fn from_conn(conn: &Connection, actor_id: ActorId) -> rusqlite::Result { - let mut bv = Self::default(); - let mut prepped = conn.prepare_cached( - "SELECT start_version, end_version, db_version, last_seq, ts FROM __corro_bookkeeping WHERE actor_id = ?", - )?; + let mut bv = BookedVersions::new(actor_id); + + // fetch the sync's needed version gaps + let mut prepped = conn + .prepare_cached("SELECT start, end FROM __corro_bookkeeping_gaps WHERE actor_id = ?")?; let mut rows = prepped.query([actor_id])?; + let mut changes = NeededChanges::default(); + loop { let row = rows.next()?; match row { None => break, Some(row) => { let start_v = row.get(0)?; - let end_v: Option = row.get(1)?; - bv.insert_many( - start_v..=end_v.unwrap_or(start_v), - match row.get(2)? { - Some(db_version) => KnownDbVersion::Current(CurrentVersion { - db_version, - last_seq: row.get(3)?, - ts: row.get(4)?, - }), - None => KnownDbVersion::Cleared, - }, - ); + let end_v = row.get(1)?; + + changes.insert_set.insert(start_v..=end_v); } } } + // fetch the biggest version we know, a partial version might override + // this below + bv.max = conn + .prepare_cached( + "SELECT MAX(start_version) FROM __corro_bookkeeping WHERE actor_id = ?", + )? + .query_row([actor_id], |row| row.get(0))?; + + bv.apply_needed_changes(changes); + + // fetch known partial sequences let mut prepped = conn.prepare_cached( "SELECT version, start_seq, end_seq, last_seq, ts FROM __corro_seq_bookkeeping WHERE site_id = ?", )?; @@ -1092,18 +1088,19 @@ impl BookedVersions { let row = rows.next()?; match row { None => break, - Some(row) => match bv.partials.entry(row.get(0)?) { - std::collections::btree_map::Entry::Vacant(entry) => { - entry.insert(PartialVersion { + Some(row) => { + let version = row.get(0)?; + bv.max = std::cmp::max(Some(version), bv.max); + // NOTE: use normal insert logic to have a consistent behavior + bv.insert_partial( + version, + PartialVersion { seqs: RangeInclusiveSet::from_iter(vec![row.get(1)?..=row.get(2)?]), last_seq: row.get(3)?, ts: row.get(4)?, - }); - } - std::collections::btree_map::Entry::Occupied(mut entry) => { - entry.get_mut().seqs.insert(row.get(1)?..=row.get(2)?); - } - }, + }, + ); + } } } @@ -1111,28 +1108,28 @@ impl BookedVersions { } pub fn contains_version(&self, version: &Version) -> bool { - self.cleared.contains(version) - || self.current.contains_key(version) - || self.partials.contains_key(version) + // corrosion knows about a version if... 
+ + // it's not in the list of needed versions + !self.needed.contains(version) && + // and the last known version is bigger than the requested version + self.max.unwrap_or_default() >= *version + // we don't need to look at partials because if we have a partial + // then it fulfills the previous conditions } - pub fn get(&self, version: &Version) -> Option { - self.cleared - .get(version) - .map(|_| KnownVersion::Cleared) - .or_else(|| self.current.get(version).map(KnownVersion::Current)) - .or_else(|| self.partials.get(version).map(KnownVersion::Partial)) + pub fn get_partial(&self, version: &Version) -> Option<&PartialVersion> { + self.partials.get(version) } pub fn contains(&self, version: Version, seqs: Option<&RangeInclusive>) -> bool { self.contains_version(&version) && seqs - .map(|check_seqs| match self.get(&version) { - Some(KnownVersion::Cleared) | Some(KnownVersion::Current(_)) => true, - Some(KnownVersion::Partial(partial)) => { - check_seqs.clone().all(|seq| partial.seqs.contains(&seq)) - } - None => false, + .map(|check_seqs| match self.partials.get(&version) { + Some(partial) => check_seqs.clone().all(|seq| partial.seqs.contains(&seq)), + // if `contains_version` is true but we don't have a partial version, + // then we must have it as a fully applied or cleared version + None => true, }) .unwrap_or(true) } @@ -1145,81 +1142,223 @@ impl BookedVersions { versions.all(|version| self.contains(version, seqs)) } - pub fn contains_current(&self, version: &Version) -> bool { - self.current.contains_key(version) + pub fn last(&self) -> Option { + self.max } - pub fn current_versions(&self) -> BTreeMap { - self.current - .iter() - .map(|(version, current)| (current.db_version, *version)) - .collect() - } + pub fn apply_needed_changes(&mut self, mut changes: NeededChanges) { + for range in std::mem::take(&mut changes.remove_ranges) { + self.max = std::cmp::max(self.max, Some(*range.end())); + for version in range.clone() { + self.partials.remove(&version); + } + self.needed.remove(range); + } - pub fn last(&self) -> Option { - self.last + for range in std::mem::take(&mut changes.insert_set) { + self.max = std::cmp::max(self.max, Some(*range.end())); + self.needed.insert(range); + } } - pub fn insert(&mut self, version: Version, known_version: KnownDbVersion) { - self.insert_many(version..=version, known_version); + // used when the commit has succeeded + pub fn insert_partial(&mut self, version: Version, partial: PartialVersion) -> PartialVersion { + debug!(actor_id = %self.actor_id, "insert partial {version:?}"); + + match self.partials.entry(version) { + btree_map::Entry::Vacant(entry) => entry.insert(partial).clone(), + btree_map::Entry::Occupied(mut entry) => { + let got = entry.get_mut(); + got.seqs.extend(partial.seqs); + got.clone() + } + } } - pub fn insert_many( - &mut self, - versions: RangeInclusive, - known_version: KnownDbVersion, - ) -> Option { - let ret = match known_version { - KnownDbVersion::Partial(partial) => { - Some(match self.partials.entry(*versions.start()) { - btree_map::Entry::Vacant(entry) => entry.insert(partial).clone(), - btree_map::Entry::Occupied(mut entry) => { - let got = entry.get_mut(); - got.seqs.extend(partial.seqs); - got.clone() - } - }) + pub fn insert_db( + &mut self, // only because we want 1 mt a time here + conn: &Connection, // usually a `Transaction` + versions: RangeInclusiveSet, + ) -> rusqlite::Result { + debug!("wants to insert into db {versions:?}"); + debug!("needed: {:?}", self.needed); + + let mut changes = 
NeededChanges::default(); + for versions in versions { + changes.remove_ranges.push(versions.clone()); + + let overlapping = self.needed.overlapping(&versions); + + for range in overlapping { + debug!(actor_id = %self.actor_id, "overlapping: {range:?}"); + changes.insert_set.insert(range.clone()); + changes.remove_ranges.push(range.clone()); } - KnownDbVersion::Current(current) => { - let version = *versions.start(); - self.partials.remove(&version); - self.current.insert(version, current); - None + + // reproducing the rangemap collapsing logic + if let Some(range) = self.needed.get(&Version(versions.start().0 - 1)) { + debug!(actor_id = %self.actor_id, "got a start - 1: {range:?}"); + changes.insert_set.insert(range.clone()); + changes.remove_ranges.push(range.clone()); } - KnownDbVersion::Cleared => { - for version in versions.clone() { - self.partials.remove(&version); - self.current.remove(&version); + + // reproducing the rangemap collapsing logic + if let Some(range) = self.needed.get(&Version(versions.end().0 + 1)) { + debug!(actor_id = %self.actor_id, "got a end + 1: {range:?}"); + changes.insert_set.insert(range.clone()); + changes.remove_ranges.push(range.clone()); + } + + // either a max or 0 + // TODO: figure out if we want to use 0 instead of None in the struct by default + let current_max = self.max.unwrap_or_default(); + + if current_max + 1 < *versions.start() { + let start = current_max + 1; + + let range = start..=*versions.start(); + debug!("inserting gap between max + 1 and start: {range:?}"); + changes.insert_set.insert(range.clone()); + for range in self.needed.overlapping(&range) { + changes.insert_set.insert(range.clone()); + changes.remove_ranges.push(range.clone()); } - self.cleared.insert(versions.clone()); - None } - }; - // update last known version - let old_last = self - .last - .replace(std::cmp::max( - *versions.end(), - self.last.unwrap_or_default(), - )) - .unwrap_or_default(); - - if old_last < *versions.start() { - // add these as needed! - self.sync_need.insert((old_last + 1)..=*versions.start()); + // we now know the applied versions + changes.insert_set.remove(versions.clone()); + } + + debug!(actor_id = %self.actor_id, "delete: {:?}", changes.remove_ranges); + debug!(actor_id = %self.actor_id, "new: {:?}", changes.insert_set); + + for range in changes.remove_ranges.iter() { + debug!(actor_id = %self.actor_id, "deleting {range:?}"); + conn + .prepare_cached("DELETE FROM __corro_bookkeeping_gaps WHERE actor_id = :actor_id AND start = :start AND end = :end")? + .execute(named_params! { + ":actor_id": self.actor_id, + ":start": range.start(), + ":end": range.end() + })?; } - self.sync_need.remove(versions); + for range in changes.insert_set.iter() { + debug!(actor_id = %self.actor_id, "inserting {range:?}"); + let res = conn + .prepare_cached( + "INSERT INTO __corro_bookkeeping_gaps VALUES (:actor_id, :start, :end)", + )? + .execute(named_params! { + ":actor_id": self.actor_id, + ":start": range.start(), + ":end": range.end() + }); + + if let Err(e) = res { + let (actor_id, start, end) : (ActorId, Version, Version) = conn.query_row("SELECT actor_id, start, end FROM __corro_bookkeeping_gaps WHERE actor_id = :actor_id AND start = :start", named_params! { + ":actor_id": self.actor_id, + ":start": range.start(), + }, |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?; + + warn!("already had gaps entry! 
actor_id: {actor_id}, start: {start}, end: {end}"); + + return Err(e); + } + } - ret + // // gather all ranges that might include a version we've applied + // let mut new_ranges: RangeInclusiveSet = conn + // .prepare_cached( + // "DELETE FROM __corro_bookkeeping_gaps + // WHERE actor_id = :actor_id AND + // ( + // -- :start of checked range is between any start and end in db + // -- S-----(:start)-----E + // ( :start BETWEEN start AND end ) OR + + // -- :end of checked range is between any start and end in db + // -- S------(:end)------E + // ( :end BETWEEN start AND end ) OR + + // -- checked range encompasses full range + // -- (:start) ... S------E ... (:end) + // ( :start <= start AND :end >= end ) OR + + // -- start = end + 1 (to collapse ranges) + // ( start = :end + 1 ) OR + + // -- end = start - 1 (to collapse ranges) + // ( end = :start - 1 ) + // ) + // RETURNING start, end", + // )? + // .query_map( + // named_params![ + // ":actor_id": self.actor_id, + // ":start": versions.start(), + // ":end": versions.end(), + // ], + // |row| Ok(row.get(0)?..=row.get(1)?), + // )? + // .collect::>()?; + + // let current_last = self.last.unwrap_or_default(); + // // this could be 0 < 1 and could create a weird range of `1..=1` even if we just + // // inserted 1, but that should be fixed by the remove afterwards + // if current_last < *versions.start() { + // // last max version was smaller than the start of the range here + // // this means there's now a gap! + // new_ranges.insert((current_last + 1)..=*versions.start()); + // } + + // // remove the inserted versions from these ranges + // // NOTE: load bearing + // new_ranges.remove(versions.clone()); + + // // insert new versions ranges + // for range in new_ranges.clone() { + // conn.prepare_cached( + // " + // INSERT INTO __corro_bookkeeping_gaps (actor_id, start, end) + // VALUES (?, ?, ?) + // ON CONFLICT (actor_id, start) + // DO UPDATE SET + // end = excluded.end + // WHERE excluded.end > end -- this isn't really possible, + // -- but if it does happen, we want to + // -- err on the side of caution + // ", + // )? + // .execute(params![self.actor_id, range.start(), range.end()])?; + // } + + Ok(changes) + } + + pub fn needed(&self) -> &RangeInclusiveSet { + &self.needed } +} - pub fn sync_need(&self) -> &RangeInclusiveSet { - &self.sync_need - } +#[derive(Debug, Default)] +pub struct NeededChanges { + insert_set: RangeInclusiveSet, + remove_ranges: Vec>, } +// // this struct must be drained! 
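NeededChanges is the bridge between the two halves of the new bookkeeping: insert_db computes which gap rows to delete from and insert into __corro_bookkeeping_gaps inside the write transaction, and only after that transaction commits does apply_needed_changes replay the same deltas onto the in-memory needed set (the set BookedVersions::from_conn rebuilds from the gaps table at boot). A minimal sketch of the in-memory side of that arithmetic, using rangemap with plain u64 versions instead of the Version newtype:

    use rangemap::RangeInclusiveSet;
    use std::ops::RangeInclusive;

    /// Mark `versions` as applied: anything between the old max and the start
    /// of the new range becomes a gap (a "needed" range), and the applied
    /// range itself is removed from the gap set.
    fn apply_versions(needed: &mut RangeInclusiveSet<u64>, max: &mut u64, versions: RangeInclusive<u64>) {
        if *max + 1 < *versions.start() {
            needed.insert(*max + 1..=*versions.start() - 1);
        }
        needed.remove(versions.clone());
        *max = (*max).max(*versions.end());
    }

    fn main() {
        let mut needed = RangeInclusiveSet::new();
        let mut max = 0u64;

        apply_versions(&mut needed, &mut max, 5..=20); // versions 1..=4 are now needed
        apply_versions(&mut needed, &mut max, 3..=7); // shrinks the gap to 1..=2
        assert_eq!(needed.iter().cloned().collect::<Vec<_>>(), vec![1..=2]);

        // "contains_version" in this model: not needed, and not past the max
        let contains = |v: u64| !needed.contains(&v) && v <= max;
        assert!(contains(20) && !contains(2) && !contains(21));
    }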
+// impl Drop for NeededChanges { +// fn drop(&mut self) { +// if !self.insert_set.is_empty() { +// panic!("NeededChanges: did not properly drain new ranges"); +// } +// if !self.remove_ranges.is_empty() { +// panic!("NeededChanges: did not properly drain new ranges"); +// } +// } +// } + pub type BookedInner = Arc>; #[derive(Clone)] @@ -1286,7 +1425,7 @@ impl BookieInner { .or_insert_with(|| { Booked(Arc::new(CountedTokioRwLock::new( self.registry.clone(), - Default::default(), + BookedVersions::new(actor_id), ))) }) .clone() @@ -1364,3 +1503,168 @@ impl Bookie { self.0.registry() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_booked_insert_db() -> rusqlite::Result<()> { + _ = tracing_subscriber::fmt::try_init(); + + let mut conn = CrConn::init(Connection::open_in_memory()?)?; + migrate(&mut conn)?; + + let actor_id = ActorId::default(); + let mut bv = BookedVersions::new(actor_id); + + let mut all = RangeInclusiveSet::new(); + + insert_everywhere(&conn, &mut bv, &mut all, Version(1)..=Version(20))?; + expect_gaps(&conn, &bv, &all, vec![])?; + + insert_everywhere(&conn, &mut bv, &mut all, Version(1)..=Version(10))?; + expect_gaps(&conn, &bv, &all, vec![])?; + + // try from an empty state again + let mut bv = BookedVersions::new(actor_id); + let mut all = RangeInclusiveSet::new(); + + // insert a non-1 first version + insert_everywhere(&conn, &mut bv, &mut all, Version(5)..=Version(20))?; + expect_gaps(&conn, &bv, &all, vec![Version(1)..=Version(4)])?; + + // insert a further change that does not overlap a gap + insert_everywhere(&conn, &mut bv, &mut all, Version(6)..=Version(7))?; + expect_gaps(&conn, &bv, &all, vec![Version(1)..=Version(4)])?; + + // insert a further change that does overlap a gap + insert_everywhere(&conn, &mut bv, &mut all, Version(3)..=Version(7))?; + expect_gaps(&conn, &bv, &all, vec![Version(1)..=Version(2)])?; + + insert_everywhere(&conn, &mut bv, &mut all, Version(1)..=Version(2))?; + expect_gaps(&conn, &bv, &all, vec![])?; + + insert_everywhere(&conn, &mut bv, &mut all, Version(25)..=Version(25))?; + expect_gaps(&conn, &bv, &all, vec![Version(21)..=Version(24)])?; + + insert_everywhere(&conn, &mut bv, &mut all, Version(30)..=Version(35))?; + expect_gaps( + &conn, + &bv, + &all, + vec![Version(21)..=Version(24), Version(26)..=Version(29)], + )?; + + // NOTE: overlapping partially from the end + + insert_everywhere(&conn, &mut bv, &mut all, Version(19)..=Version(22))?; + expect_gaps( + &conn, + &bv, + &all, + vec![Version(23)..=Version(24), Version(26)..=Version(29)], + )?; + + // NOTE: overlapping partially from the start + + insert_everywhere(&conn, &mut bv, &mut all, Version(24)..=Version(25))?; + expect_gaps( + &conn, + &bv, + &all, + vec![Version(23)..=Version(23), Version(26)..=Version(29)], + )?; + + // NOTE: overlapping 2 ranges + + insert_everywhere(&conn, &mut bv, &mut all, Version(23)..=Version(27))?; + expect_gaps(&conn, &bv, &all, vec![Version(28)..=Version(29)])?; + + // NOTE: ineffective insert of already known ranges + + insert_everywhere(&conn, &mut bv, &mut all, Version(1)..=Version(20))?; + expect_gaps(&conn, &bv, &all, vec![Version(28)..=Version(29)])?; + + // NOTE: overlapping no ranges, but encompassing a full range + + insert_everywhere(&conn, &mut bv, &mut all, Version(27)..=Version(30))?; + expect_gaps(&conn, &bv, &all, vec![])?; + + // NOTE: touching multiple ranges, partially + + // create gap 36..=39 + insert_everywhere(&conn, &mut bv, &mut all, Version(40)..=Version(45))?; + // create gap 46..=49 + 
insert_everywhere(&conn, &mut bv, &mut all, Version(50)..=Version(55))?; + + insert_everywhere(&conn, &mut bv, &mut all, Version(38)..=Version(47))?; + expect_gaps( + &conn, + &bv, + &all, + vec![Version(36)..=Version(37), Version(48)..=Version(49)], + )?; + + // test loading a bv from the conn, they should be identical! + let mut bv2 = BookedVersions::from_conn(&conn, actor_id)?; + // manually set the last version because there's nothing in `__corro_bookkeeping` + bv2.max = Some(Version(55)); + + assert_eq!(bv, bv2); + + Ok(()) + } + + fn insert_everywhere( + conn: &Connection, + bv: &mut BookedVersions, + all_versions: &mut RangeInclusiveSet, + versions: RangeInclusive, + ) -> rusqlite::Result<()> { + all_versions.insert(versions.clone()); + let changes = bv.insert_db(conn, RangeInclusiveSet::from([versions]))?; + bv.apply_needed_changes(changes); + Ok(()) + } + + fn expect_gaps( + conn: &Connection, + bv: &BookedVersions, + all_versions: &RangeInclusiveSet, + expected: Vec>, + ) -> rusqlite::Result<()> { + let gaps: Vec<(ActorId, Version, Version)> = conn + .prepare_cached("SELECT actor_id, start, end FROM __corro_bookkeeping_gaps")? + .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))? + .collect::, _>>()?; + + assert_eq!( + gaps, + expected + .clone() + .into_iter() + .map(|expected| (bv.actor_id, *expected.start(), *expected.end())) + .collect::>() + ); + + for range in all_versions.iter() { + assert!(bv.contains_all(range.clone(), None)); + } + + for range in expected { + for v in range { + assert!(!bv.contains(v, None), "expected not to contain {v}"); + assert!(bv.needed.contains(&v), "expected needed to contain {v}"); + } + } + + assert_eq!( + bv.max, + all_versions.iter().last().map(|range| *range.end()), + "expected last version not to increment" + ); + + Ok(()) + } +} diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index d1eac889..85d3737b 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -180,7 +180,7 @@ impl Changeset { pub fn len(&self) -> usize { match self { - Changeset::Empty { .. } => 0, + Changeset::Empty { .. } => 0, //(versions.end().0 - versions.start().0 + 1) as usize, Changeset::Full { changes, .. 
} => changes.len(), } } @@ -232,7 +232,7 @@ pub enum TimestampParseError { Parse(ParseNTP64Error), } -#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd)] +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd)] #[serde(transparent)] pub struct Timestamp(pub NTP64); @@ -251,6 +251,13 @@ impl Timestamp { } } +// formatting to humantime and then parsing again incurs oddness, so lets compare secs and subsec_nanos +impl PartialEq for Timestamp { + fn eq(&self, other: &Self) -> bool { + self.0.as_secs() == other.0.as_secs() && self.0.subsec_nanos() == other.0.subsec_nanos() + } +} + impl Deref for Timestamp { type Target = NTP64; @@ -375,6 +382,7 @@ impl Runtime for DispatchRuntime { } _ => {} }; + if let Err(e) = self.notifications.try_send(notification) { counter!("corro.channel.error", "type" => "full", "name" => "dispatch.notifications") .increment(1); diff --git a/crates/corro-types/src/config.rs b/crates/corro-types/src/config.rs index c3cd7f2b..89158b3d 100644 --- a/crates/corro-types/src/config.rs +++ b/crates/corro-types/src/config.rs @@ -7,7 +7,7 @@ pub const DEFAULT_GOSSIP_PORT: u16 = 4001; const DEFAULT_GOSSIP_IDLE_TIMEOUT: u32 = 30; const fn default_apply_queue() -> usize { - 600 + 100 } /// Used for the apply channel diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 685cb00a..0a406120 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -300,14 +300,19 @@ pub async fn generate_sync(bookie: &Bookie, actor_id: ActorId) -> SyncStateV1 { Some(v) => v, }; - let need: Vec<_> = bookedr.sync_need().iter().cloned().collect(); + let need: Vec<_> = bookedr.needed().iter().cloned().collect(); if !need.is_empty() { state.need.insert(actor_id, need); } { - for (v, partial) in bookedr.partials.iter() { + for (v, partial) in bookedr + .partials + .iter() + // don't set partial if it is effectively complete + .filter(|(_, partial)| !partial.is_complete()) + { state.partial_need.entry(actor_id).or_default().insert( *v, partial diff --git a/crates/corrosion/Cargo.toml b/crates/corrosion/Cargo.toml index 346883db..1445410a 100644 --- a/crates/corrosion/Cargo.toml +++ b/crates/corrosion/Cargo.toml @@ -50,6 +50,7 @@ tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } tripwire = { path = "../tripwire" } uuid = { workspace = true } +shell-words = "1.1.0" [build-dependencies] build-info-build = { workspace = true } diff --git a/crates/corrosion/src/command/agent.rs b/crates/corrosion/src/command/agent.rs index 71e72310..436373f1 100644 --- a/crates/corrosion/src/command/agent.rs +++ b/crates/corrosion/src/command/agent.rs @@ -49,8 +49,8 @@ pub async fn run(config: Config, config_path: &Utf8PathBuf) -> eyre::Result<()> .expect("could not start agent"); corro_admin::start_server( - agent, - bookie, + agent.clone(), + bookie.clone(), AdminConfig { listen_path: config.admin.uds_path.clone(), config_path: config_path.clone(), diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index 043fc3ad..b69b98e6 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -1,7 +1,7 @@ use std::{ net::{IpAddr, SocketAddr}, path::PathBuf, - time::Duration, + time::{Duration, Instant}, }; use admin::AdminConn; @@ -487,6 +487,40 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> { conn.send_command(corro_admin::Command::CompactEmpties) .await?; } + + Command::Db(DbCommand::Lock { cmd }) => { + let config = match cli.config() { 
+ Ok(config) => config, + Err(_e) => { + eyre::bail!( + "path to current database is required via the config file passed as --config" + ); + } + }; + + let db_path = &config.db.path; + info!("Opening DB file at {db_path}"); + let mut db_file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(db_path)?; + + info!("Acquiring lock..."); + let start = Instant::now(); + let _lock = sqlite3_restore::lock_all(&mut db_file, db_path, Duration::from_secs(30))?; + info!("Lock acquired after {:?}", start.elapsed()); + + info!("Launching command {cmd}"); + let mut splitted_cmd = shell_words::split(cmd.as_str())?; + let exit = std::process::Command::new(splitted_cmd.remove(0)) + .args(splitted_cmd) + .spawn()? + .wait()?; + + info!("Exited with code: {:?}", exit.code()); + std::process::exit(exit.code().unwrap_or(1)); + } } Ok(()) @@ -650,6 +684,10 @@ enum Command { /// Clear overwritten versions CompactEmpties, + + /// DB-related commands + #[command(subcommand)] + Db(DbCommand), } #[derive(Subcommand)] @@ -725,3 +763,9 @@ enum TlsClientCommand { ca_cert: Utf8PathBuf, }, } + +#[derive(Subcommand)] +enum DbCommand { + /// Acquires the lock on the DB + Lock { cmd: String }, +} diff --git a/crates/sqlite3-restore/src/lib.rs b/crates/sqlite3-restore/src/lib.rs index 76cb134b..692677b3 100644 --- a/crates/sqlite3-restore/src/lib.rs +++ b/crates/sqlite3-restore/src/lib.rs @@ -137,7 +137,7 @@ fn copy_check(src: &mut File, dst: &mut File, len: u64) -> Result<(), Error> { Ok(()) } -enum Locked { +pub enum Locked { Wal(File), Other, } @@ -148,7 +148,7 @@ impl Locked { } } -fn lock_all>( +pub fn lock_all>( db_file: &mut File, db_path: P, timeout: Duration,
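The new "db lock" subcommand above holds the database locks for the duration of an arbitrary command and then exits with that command's status. A condensed sketch of the run-while-locked flow, with the sqlite3_restore lock acquisition elided to a comment and `cmd` being the user-supplied command string:

    fn run_while_locked(cmd: &str) -> eyre::Result<()> {
        // ... open the DB file and hold sqlite3_restore::lock_all() here ...

        // split the command shell-style; the first word is the program
        let mut parts = shell_words::split(cmd)?;
        let program = parts.remove(0);

        let status = std::process::Command::new(program)
            .args(parts)
            .spawn()?
            .wait()?;

        // propagate the child's exit code (1 if it was killed by a signal)
        std::process::exit(status.code().unwrap_or(1));
    }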