From a82563d0d1b352f03cf6bfc16d99e452cd125533 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Tue, 11 Jun 2024 00:46:34 +0100 Subject: [PATCH 1/3] send empties during sync --- crates/corro-agent/src/agent/error.rs | 2 +- crates/corro-agent/src/agent/handlers.rs | 144 ++++++++++- crates/corro-agent/src/agent/run_root.rs | 8 + crates/corro-agent/src/agent/setup.rs | 23 +- crates/corro-agent/src/agent/tests.rs | 177 ++++++++++++-- crates/corro-agent/src/agent/util.rs | 108 +++++++-- crates/corro-agent/src/api/peer.rs | 250 ++++++++++++++----- crates/corro-agent/src/broadcast/mod.rs | 3 +- crates/corro-types/src/agent.rs | 203 ++++++++++++++-- crates/corro-types/src/broadcast.rs | 31 ++- crates/corro-types/src/change.rs | 24 +- crates/corro-types/src/pubsub.rs | 11 +- crates/corro-types/src/sqlite.rs | 5 +- crates/corro-types/src/sync.rs | 31 ++- crates/corrosion/src/command/consul/sync.rs | 253 +++++++++++++------- crates/tripwire/src/preempt.rs | 5 +- integration-tests/src/lib.rs | 1 + 17 files changed, 1038 insertions(+), 241 deletions(-) diff --git a/crates/corro-agent/src/agent/error.rs b/crates/corro-agent/src/agent/error.rs index 21d38603..a4ca9c2f 100644 --- a/crates/corro-agent/src/agent/error.rs +++ b/crates/corro-agent/src/agent/error.rs @@ -4,8 +4,8 @@ use corro_types::{ sqlite::SqlitePoolError, sync::{SyncMessageDecodeError, SyncMessageEncodeError}, }; -use tokio::time::error::Elapsed; use hyper::StatusCode; +use tokio::time::error::Elapsed; #[derive(Debug, thiserror::Error)] pub enum SyncClientError { diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 02787245..e4d7e4c1 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -3,6 +3,9 @@ //! This module is _big_ and maybe should be split up further. use std::collections::BTreeMap; +use std::collections::HashMap; +use std::ops::RangeInclusive; + use std::{ cmp, collections::VecDeque, @@ -26,7 +29,11 @@ use corro_types::{ }; use bytes::Bytes; +use corro_types::agent::ChangeError; use corro_types::base::Version; +use corro_types::broadcast::Timestamp; +use corro_types::change::store_empty_changeset; +use corro_types::sync::get_last_cleared_ts; use foca::Notification; use indexmap::IndexMap; use metrics::{counter, gauge, histogram}; @@ -392,7 +399,7 @@ pub fn spawn_handle_db_cleanup(pool: SplitPool) { // determine the estimated resource cost of processing a change fn processing_cost(change: &Changeset) -> usize { - if change.is_empty() { + if change.is_empty() && !change.is_empty_set() { let versions = change.versions(); cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20) } else { @@ -400,6 +407,133 @@ fn processing_cost(change: &Changeset) -> usize { } } +/// Handle incoming emptyset received during syncs +/// +pub async fn handle_emptyset( + agent: Agent, + bookie: Bookie, + mut rx_emptysets: CorroReceiver, + mut tripwire: Tripwire, +) { + // maybe bigger timeout? + let mut max_wait = tokio::time::interval(Duration::from_millis( + agent.config().perf.apply_queue_timeout as u64, + )); + let max_changes_chunk: usize = agent.config().perf.apply_queue_len; + + let mut buf = HashMap::new(); + let mut cost = 0; + + loop { + let mut process = false; + tokio::select! 
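+        // buffer incoming `Changeset::EmptySet`s per actor; flush once the buffered
+        // cost reaches `apply_queue_len` or the `apply_queue_timeout` interval ticks,
+        // and break out of the loop when the tripwire fires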
{
+            maybe_change_src = rx_emptysets.recv() => match maybe_change_src {
+                Some(change) => {
+                    if let Changeset::EmptySet { versions, ts } = change.changeset {
+                        buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts));
+                        cost += versions.len();
+                    } else {
+                        warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id);
+                    }
+
+                    if cost >= max_changes_chunk && !buf.is_empty() {
+                        process = true;
+                    }
+                },
+                None => break,
+            },
+            _ = max_wait.tick() => {
+                process = true;
+            },
+            _ = &mut tripwire => {
+                break;
+            }
+        }
+
+        if process {
+            for (actor, changes) in &mut buf {
+                while let Some(change) = changes.pop_front() {
+                    match process_emptyset(agent.clone(), bookie.clone(), *actor, &change).await {
+                        Ok(()) => {
+                            cost -= change.0.len();
+                        }
+                        Err(e) => {
+                            warn!("encountered error when processing emptyset - {e}");
+                            changes.push_front(change);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    info!("shutting down handle_emptyset loop");
+}
+
+pub async fn process_emptyset(
+    agent: Agent,
+    bookie: Bookie,
+    actor_id: ActorId,
+    changes: &(Vec<RangeInclusive<Version>>, Timestamp),
+) -> Result<(), ChangeError> {
+    let (versions, ts) = changes;
+    let mut conn = agent.pool().write_low().await?;
+
+    let booked = {
+        bookie
+            .write(format!(
+                "process_emptyset(booked writer, updates timestamp):{actor_id}"
+            ))
+            .await
+            .ensure(actor_id)
+    };
+    let mut booked_write = booked
+        .write(format!(
+            "process_emptyset(booked writer, updates timestamp):{actor_id}"
+        ))
+        .await;
+    let mut snap = booked_write.snapshot();
+
+    let tx = conn
+        .immediate_transaction()
+        .map_err(|source| ChangeError::Rusqlite {
+            source,
+            actor_id: None,
+            version: None,
+        })?;
+
+    counter!("corro.sync.empties.count", "actor" => actor_id.to_string())
+        .increment(versions.len() as u64);
+    debug!(self_actor_id = %agent.actor_id(), "processing emptyset changes, len: {}", versions.len());
+    for version in versions {
+        store_empty_changeset(&tx, actor_id, version.clone(), *ts)?;
+    }
+
+    snap.insert_db(&tx, RangeInclusiveSet::from_iter(versions.clone()))
+        .map_err(|source| ChangeError::Rusqlite {
+            source,
+            actor_id: None,
+            version: None,
+        })?;
+    snap.update_cleared_ts(&tx, *ts)
+        .map_err(|source| ChangeError::Rusqlite {
+            source,
+            actor_id: None,
+            version: None,
+        })?;
+
+    tx.commit().map_err(|source| ChangeError::Rusqlite {
+        source,
+        actor_id: None,
+        version: None,
+    })?;
+    booked_write.commit_snapshot(snap);
+
+    Ok(())
+}
+
 /// Bundle incoming changes to optimise transaction sizes with SQLite
 ///
 /// *Performance tradeoff*: introduce latency (with a max timeout) to
@@ -698,8 +832,14 @@ pub async fn handle_sync(
         return Ok(());
     }
 
+    let mut last_cleared: HashMap<ActorId, Option<Timestamp>> = HashMap::new();
+
+    for (actor_id, _) in chosen.clone() {
+        last_cleared.insert(actor_id, get_last_cleared_ts(&bookie, &actor_id).await);
+    }
+
     let start = Instant::now();
-    let n = match parallel_sync(agent, transport, chosen.clone(), sync_state).await {
+    let n = match parallel_sync(agent, transport, chosen.clone(), sync_state, last_cleared).await {
         Ok(n) => n,
         Err(e) => {
             error!("failed to execute parallel sync: {e:?}");
diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs
index b2bc237b..470ef1d1 100644
--- a/crates/corro-agent/src/agent/run_root.rs
+++ b/crates/corro-agent/src/agent/run_root.rs
@@ -45,6 +45,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
         rx_apply,
         rx_clear_buf,
         rx_changes,
+        rx_emptyset,
         rx_foca,
subs_manager, subs_bcast_cache, @@ -217,5 +218,12 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul tripwire.clone(), )); + spawn_counted(handlers::handle_emptyset( + agent.clone(), + bookie.clone(), + rx_emptyset, + tripwire.clone(), + )); + Ok(bookie) } diff --git a/crates/corro-agent/src/agent/setup.rs b/crates/corro-agent/src/agent/setup.rs index e3643934..601fb6ef 100644 --- a/crates/corro-agent/src/agent/setup.rs +++ b/crates/corro-agent/src/agent/setup.rs @@ -52,6 +52,7 @@ pub struct AgentOptions { pub rx_apply: CorroReceiver<(ActorId, Version)>, pub rx_clear_buf: CorroReceiver<(ActorId, RangeInclusive)>, pub rx_changes: CorroReceiver<(ChangeV1, ChangeSource)>, + pub rx_emptyset: CorroReceiver, pub rx_foca: CorroReceiver, pub rtt_rx: TokioReceiver<(SocketAddr, Duration)>, pub subs_manager: SubsManager, @@ -84,9 +85,16 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age let pool = SplitPool::create(&conf.db.path, write_sema.clone()).await?; + let clock = Arc::new( + uhlc::HLCBuilder::default() + .with_id(actor_id.try_into().unwrap()) + .with_max_delta(Duration::from_millis(300)) + .build(), + ); + let schema = { let mut conn = pool.write_priority().await?; - migrate(&mut conn)?; + migrate(clock.clone(), &mut conn)?; let mut schema = init_schema(&conn)?; schema.constrain()?; @@ -138,15 +146,9 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age } let api_addr = api_listeners.first().unwrap().local_addr()?; - let clock = Arc::new( - uhlc::HLCBuilder::default() - .with_id(actor_id.try_into().unwrap()) - .with_max_delta(Duration::from_millis(300)) - .build(), - ); - let (tx_bcast, rx_bcast) = bounded(conf.perf.bcast_channel_len, "bcast"); let (tx_changes, rx_changes) = bounded(conf.perf.changes_channel_len, "changes"); + let (tx_emptyset, rx_emptyset) = bounded(conf.perf.changes_channel_len, "emptyset"); let (tx_foca, rx_foca) = bounded(conf.perf.foca_channel_len, "foca"); let lock_registry = LockRegistry::default(); @@ -162,7 +164,8 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age async move { let conn = pool.read().await?; *booked.deref_mut().deref_mut() = - tokio::task::block_in_place(|| BookedVersions::from_conn(&conn, actor_id))?; + tokio::task::block_in_place(|| BookedVersions::from_conn(&conn, actor_id)) + .expect("loading BookedVersions from db failed"); Ok::<_, eyre::Report>(()) } }); @@ -176,6 +179,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age rx_apply, rx_clear_buf, rx_changes, + rx_emptyset, rx_foca, rtt_rx, subs_manager: subs_manager.clone(), @@ -197,6 +201,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age tx_apply, tx_clear_buf, tx_changes, + tx_emptyset, tx_foca, write_sema, schema: RwLock::new(schema), diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index c4dcaf94..86e1c640 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -32,7 +32,9 @@ use crate::{ }; use corro_tests::*; use corro_types::agent::Agent; +use corro_types::broadcast::Timestamp; use corro_types::change::Change; +use corro_types::sync::get_last_cleared_ts; use corro_types::{ actor::ActorId, agent::migrate, @@ -661,12 +663,14 @@ async fn large_tx_sync() -> eyre::Result<()> { let ta3_transport = Transport::new(&ta3.agent.config().gossip, rtt_tx.clone()).await?; let ta4_transport = 
Transport::new(&ta4.agent.config().gossip, rtt_tx.clone()).await?; + println!("starting sync!?"); for _ in 0..6 { let res = parallel_sync( &ta2.agent, &ta2_transport, vec![(ta1.agent.actor_id(), ta1.agent.gossip_addr())], generate_sync(&ta2.bookie, ta2.agent.actor_id()).await, + HashMap::new(), ) .await?; @@ -680,6 +684,7 @@ async fn large_tx_sync() -> eyre::Result<()> { (ta2.agent.actor_id(), ta2.agent.gossip_addr()), ], generate_sync(&ta3.bookie, ta3.agent.actor_id()).await, + HashMap::new(), ) .await?; @@ -693,6 +698,7 @@ async fn large_tx_sync() -> eyre::Result<()> { (ta2.agent.actor_id(), ta2.agent.gossip_addr()), ], generate_sync(&ta4.bookie, ta4.agent.actor_id()).await, + HashMap::new(), ) .await?; @@ -772,6 +778,127 @@ struct TestRecord { text: String, } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_clear_empty_versions() -> eyre::Result<()> { + _ = tracing_subscriber::fmt::try_init(); + + let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple(); + let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + let ta2 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + + let (rtt_tx, _rtt_rx) = mpsc::channel(1024); + let ta2_transport = Transport::new(&ta2.agent.config().gossip, rtt_tx.clone()).await?; + // setup the schema, for both nodes + let (status_code, _body) = api_v1_db_schema( + Extension(ta1.agent.clone()), + axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), + ) + .await; + + assert_eq!(status_code, StatusCode::OK); + + let (status_code, _body) = api_v1_db_schema( + Extension(ta2.agent.clone()), + axum::Json(vec![corro_tests::TEST_SCHEMA.into()]), + ) + .await; + assert_eq!(status_code, StatusCode::OK); + + // make about 50 transactions to ta1 + insert_rows(ta1.agent.clone(), 1, 50).await; + // send them all + let rows = get_rows(ta1.agent.clone(), vec![(Version(1)..=Version(50), None)]).await?; + process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?; + + // overwrite different version ranges + insert_rows(ta1.agent.clone(), 1, 5).await; + insert_rows(ta1.agent.clone(), 10, 10).await; + insert_rows(ta1.agent.clone(), 23, 25).await; + insert_rows(ta1.agent.clone(), 30, 31).await; + + let rows = get_rows( + ta1.agent.clone(), + vec![ + (Version(51)..=Version(55), None), + (Version(56)..=Version(56), None), + (Version(57)..=Version(59), None), + (Version(60)..=Version(60), None), + ], + ) + .await?; + process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?; + check_bookie_versions( + ta2.clone(), + ta1.agent.actor_id(), + vec![Version(1)..=Version(50)], + vec![], + vec![], + vec![], + ) + .await?; + + let mut last_cleared: HashMap> = HashMap::new(); + last_cleared.insert( + ta1.agent.actor_id(), + get_last_cleared_ts(&ta2.bookie, &ta1.agent.actor_id()).await, + ); + + println!("got last cleared - {last_cleared:?}"); + + // initiate sync with ta1 to get cleared + let res = parallel_sync( + &ta2.agent, + &ta2_transport, + vec![(ta1.agent.actor_id(), ta1.agent.gossip_addr())], + generate_sync(&ta2.bookie, ta2.agent.actor_id()).await, + last_cleared, + ) + .await?; + + println!("ta2 synced {res}"); + + sleep(Duration::from_secs(2)).await; + + check_bookie_versions( + ta2.clone(), + ta1.agent.actor_id(), + vec![], + vec![], + vec![], + vec![ + Version(1)..=Version(5), + Version(10)..=Version(10), + Version(23)..=Version(25), + Version(30)..=Version(31), + ], + ) + .await?; + + // ta2 should have ta1's last cleared + let ta1_cleared = ta1 + .agent + .booked() + 
.read("test_clear_empty") + .await + .last_cleared_ts(); + let ta2_ta1_cleared = ta2 + .bookie + .write("test") + .await + .ensure(ta1.agent.actor_id()) + .read("test_clear_empty") + .await + .last_cleared_ts(); + + assert_eq!(ta1_cleared, ta2_ta1_cleared); + + tripwire_tx.send(()).await.ok(); + tripwire_worker.await; + wait_for_all_pending_handles().await; + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_process_multiple_changes() -> eyre::Result<()> { _ = tracing_subscriber::fmt::try_init(); @@ -1005,7 +1132,7 @@ async fn check_bookie_versions( for versions in cleared { assert!(conn.prepare_cached( "SELECT EXISTS (SELECT 1 FROM __corro_bookkeeping WHERE actor_id = ? and start_version = ? and end_version = ?)")? - .query_row((actor_id, versions.start(), versions.end()), |row| row.get(0))?); + .query_row((actor_id, versions.start(), versions.end()), |row| row.get(0))?, "Versions {versions:?} not cleared in corro bookkeeping table"); } Ok(()) @@ -1020,6 +1147,16 @@ async fn get_rows( let conn = agent.pool().read().await?; for versions in v { for version in versions.0 { + let count: u64 = conn.query_row( + "SELECT COUNT(*) FROM crsql_changes where db_version = ?", + [version], + |row| row.get(0), + )?; + let mut last = 4; + // count will be zero for cleared versions + if count > 0 { + last = count - 1; + } let mut query = r#"SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl FROM crsql_changes where db_version = ?"# @@ -1038,7 +1175,7 @@ async fn get_rows( changes = prepped .query_map([version], row_to_change)? .collect::, _>>()?; - CrsqlSeq(0)..=CrsqlSeq(3) + CrsqlSeq(0)..=CrsqlSeq(last) }; result.push(( @@ -1048,7 +1185,7 @@ async fn get_rows( version, changes, seqs, - last_seq: CrsqlSeq(3), + last_seq: CrsqlSeq(last), ts: agent.clock().new_timestamp().into(), }, }, @@ -1219,7 +1356,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { let mut conn = CrConn::init(rusqlite::Connection::open_in_memory()?)?; corro_types::sqlite::setup_conn(&conn)?; - migrate(&mut conn)?; + migrate(Default::default(), &mut conn)?; let actor_id = ActorId(uuid::Uuid::new_v4()); @@ -1239,7 +1376,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(1)..=Version(2))?, + store_empty_changeset(&tx, actor_id, Version(1)..=Version(2), Default::default())?, 1 ); tx.commit()?; @@ -1276,7 +1413,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(5)..=Version(7))?, + store_empty_changeset(&tx, actor_id, Version(5)..=Version(7), Default::default())?, 1 ); tx.commit()?; @@ -1324,7 +1461,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(3)..=Version(6))?, + store_empty_changeset(&tx, actor_id, Version(3)..=Version(6), Default::default())?, 1 ); tx.commit()?; @@ -1363,7 +1500,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(1)..=Version(10))?, + store_empty_changeset(&tx, actor_id, Version(1)..=Version(10), Default::default())?, 1 ); tx.commit()?; @@ -1407,7 +1544,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(1)..=Version(11))?, + store_empty_changeset(&tx, actor_id, 
Version(1)..=Version(11), Default::default())?, 1 ); tx.commit()?; @@ -1456,7 +1593,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(14)..=Version(14))?, + store_empty_changeset(&tx, actor_id, Version(14)..=Version(14), Default::default())?, 1 ); tx.commit()?; @@ -1516,7 +1653,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(12)..=Version(14))?, + store_empty_changeset(&tx, actor_id, Version(12)..=Version(14), Default::default())?, 1 ); tx.commit()?; @@ -1559,7 +1696,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(15)..=Version(15))?, + store_empty_changeset(&tx, actor_id, Version(15)..=Version(15), Default::default())?, 1 ); tx.commit()?; @@ -1593,7 +1730,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(15)..=Version(23))?, + store_empty_changeset(&tx, actor_id, Version(15)..=Version(23), Default::default())?, 1 ); tx.commit()?; @@ -1633,7 +1770,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(15)..=Version(23))?, + store_empty_changeset(&tx, actor_id, Version(15)..=Version(23), Default::default())?, 1 ); tx.commit()?; @@ -1687,7 +1824,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(15)..=Version(23))?, + store_empty_changeset(&tx, actor_id, Version(15)..=Version(23), Default::default())?, 1 ); tx.commit()?; @@ -1745,7 +1882,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(1)..=Version(24))?, + store_empty_changeset(&tx, actor_id, Version(1)..=Version(24), Default::default())?, 1 ); tx.commit()?; @@ -1767,7 +1904,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(26)..=Version(27))?, + store_empty_changeset(&tx, actor_id, Version(26)..=Version(27), Default::default())?, 1 ); tx.commit()?; @@ -1820,7 +1957,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(1)..=Version(29))?, + store_empty_changeset(&tx, actor_id, Version(1)..=Version(29), Default::default())?, 1 ); tx.commit()?; @@ -1829,7 +1966,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(40)..=Version(45))?, + store_empty_changeset(&tx, actor_id, Version(40)..=Version(45), Default::default())?, 1 ); tx.commit()?; @@ -1838,7 +1975,7 @@ fn test_store_empty_changeset() -> eyre::Result<()> { { let tx = conn.transaction()?; assert_eq!( - store_empty_changeset(&tx, actor_id, Version(35)..=Version(37))?, + store_empty_changeset(&tx, actor_id, Version(35)..=Version(37), Default::default())?, 1 ); tx.commit()?; diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index d398d35e..ca91c622 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -43,6 +43,7 @@ use axum::{ 
routing::{get, post}, BoxError, Extension, Router, TypedHeader, }; +use corro_types::broadcast::Timestamp; use foca::Member; use futures::FutureExt; use hyper::{server::conn::AddrIncoming, StatusCode}; @@ -469,6 +470,7 @@ pub fn process_single_version( let (known, changeset) = if changeset.is_complete() { let (known, changeset) = process_complete_version( + agent.clone(), tx, actor_id, last_db_version, @@ -628,8 +630,12 @@ pub async fn process_fully_buffered_changes( debug!(%actor_id, %version, "inserted bookkeeping row after buffered insert"); } else { - store_empty_changeset(&tx, actor_id, version..=version)?; - + store_empty_changeset( + &tx, + actor_id, + version..=version, + Timestamp::from(agent.clock().new_timestamp()), + )?; debug!(%actor_id, %version, "inserted CLEARED bookkeeping row after buffered insert"); }; @@ -648,12 +654,37 @@ pub async fn process_fully_buffered_changes( version: Some(version), })?; + let mut last_cleared: Option = None; for (actor_id, versions_set) in overwritten { + if actor_id != agent.actor_id() { + warn!("clearing empties for another actor: {actor_id}") + } + let ts = Timestamp::from(agent.clock().new_timestamp()); for versions in versions_set { - store_empty_changeset(&tx, actor_id, versions)?; + let inserted = store_empty_changeset(&tx, actor_id, versions, ts)?; + if inserted > 0 { + last_cleared = Some(ts); + } } } + let mut agent_booked = { + agent + .booked() + .blocking_write("process_fully_buffered_changes(get snapshot)") + }; + + let mut agent_snap = agent_booked.snapshot(); + if let Some(ts) = last_cleared { + agent_snap + .update_cleared_ts(&tx, ts) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; + } + tx.commit().map_err(|source| ChangeError::Rusqlite { source, actor_id: Some(actor_id), @@ -661,6 +692,7 @@ pub async fn process_fully_buffered_changes( })?; bookedw.commit_snapshot(snap); + agent_booked.commit_snapshot(agent_snap); Ok::<_, ChangeError>(true) })?; @@ -684,16 +716,21 @@ pub async fn process_multiple_changes( histogram!("corro.agent.changes.queued.seconds").record(queued_at.elapsed()); let versions = change.versions(); let seqs = change.seqs(); + if !seen.insert((change.actor_id, versions, seqs.cloned())) { continue; } - if bookie - .write(format!( - "process_multiple_changes(ensure):{}", - change.actor_id.as_simple() - )) - .await - .ensure(change.actor_id) + + let booked_writer = { + bookie + .write(format!( + "process_multiple_changes(ensure):{}", + change.actor_id.as_simple() + )) + .await + .ensure(change.actor_id) + }; + if booked_writer .read(format!( "process_multiple_changes(contains?):{}", change.actor_id.as_simple() @@ -748,6 +785,7 @@ pub async fn process_multiple_changes( for (change, src) in changes { trace!("handling a single changeset: {change:?}"); let seqs = change.seqs(); + let ts = change.ts(); if booked_write.contains_all(change.versions(), change.seqs()) { trace!("previously unknown versions are now deemed known, aborting inserts"); continue; @@ -802,7 +840,10 @@ pub async fn process_multiple_changes( }; seen.insert(versions.clone(), known.clone()); - knowns.entry(actor_id).or_default().push((versions, known)); + knowns + .entry(actor_id) + .or_default() + .push((versions, ts, known)); } // if knowns.contains_key(&actor_id) { // writers.insert(actor_id, booked_write); @@ -817,7 +858,7 @@ pub async fn process_multiple_changes( let mut all_versions = RangeInclusiveSet::new(); - for (versions, known) in knowns.iter() { + for (versions, ts, known) 
in knowns.iter() { match known { KnownDbVersion::Partial { .. } => {} KnownDbVersion::Current(CurrentVersion { @@ -841,7 +882,8 @@ pub async fn process_multiple_changes( } KnownDbVersion::Cleared => { debug!(%actor_id, self_actor_id = %agent.actor_id(), ?versions, "inserting CLEARED bookkeeping"); - store_empty_changeset(&tx, *actor_id, versions.clone())?; + let ts = ts.unwrap_or(Timestamp::from(agent.clock().new_timestamp())); + store_empty_changeset(&tx, *actor_id, versions.clone(), ts)?; } } @@ -888,18 +930,43 @@ pub async fn process_multiple_changes( version: None, })?; + let mut last_cleared: Option = None; for (actor_id, versions_set) in overwritten { + if actor_id != agent.actor_id() { + warn!("clearing and setting timestamp for empties from a different node"); + } for versions in versions_set { - store_empty_changeset(&tx, actor_id, versions)?; + let ts = Timestamp::from(agent.clock().new_timestamp()); + let inserted = store_empty_changeset(&tx, actor_id, versions, ts)?; + if inserted > 0 { + last_cleared = Some(ts); + } } } + let mut booked_writer = { + agent + .booked() + .blocking_write("process_multiple_changes(update_cleared_ts)") + }; + let mut snap = booked_writer.snapshot(); + if let Some(ts) = last_cleared { + snap.update_cleared_ts(&tx, ts) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: None, + version: None, + })?; + } + tx.commit().map_err(|source| ChangeError::Rusqlite { source, actor_id: None, version: None, })?; + booked_writer.commit_snapshot(snap); + for (_, changeset, _, _) in changesets.iter() { if let Some(ts) = changeset.ts() { let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration(); @@ -925,7 +992,7 @@ pub async fn process_multiple_changes( booked_write.commit_snapshot(snap); } - for (versions, known) in knowns { + for (versions, _, known) in knowns { let version = *versions.start(); if let KnownDbVersion::Partial(partial) = known { let PartialVersion { seqs, last_seq, .. } = @@ -1095,8 +1162,9 @@ pub fn process_incomplete_version( })) } -#[tracing::instrument(skip(tx, last_db_version, parts), err)] +#[tracing::instrument(skip(agent, tx, last_db_version, parts), err)] pub fn process_complete_version( + agent: Agent, tx: &Transaction, actor_id: ActorId, last_db_version: Option, @@ -1176,7 +1244,13 @@ pub fn process_complete_version( } let (known_version, new_changeset) = if impactful_changeset.is_empty() { - (KnownDbVersion::Cleared, Changeset::Empty { versions }) + ( + KnownDbVersion::Cleared, + Changeset::Empty { + versions, + ts: Some(Timestamp::from(agent.clock().new_timestamp())), + }, + ) } else { // TODO: find a way to avoid this... 
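         // the changeset carried real rows, so query the crsql db_version they
         // were applied under before building the bookkeeping entry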
let db_version: CrsqlDbVersion = tx diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 88b040d1..6c5df4fd 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -24,9 +24,10 @@ use itertools::Itertools; use metrics::counter; use quinn::{RecvStream, SendStream}; use rand::seq::SliceRandom; -use rangemap::RangeInclusiveSet; +use rangemap::{RangeInclusiveMap, RangeInclusiveSet}; use rusqlite::{named_params, Connection}; use speedy::Writable; +use std::string::String; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::{self, unbounded_channel, Sender}; use tokio::task::block_in_place; @@ -349,13 +350,15 @@ const ADAPT_CHUNK_SIZE_THRESHOLD: Duration = Duration::from_millis(500); #[allow(clippy::too_many_arguments)] fn handle_need( conn: &mut Connection, + agent: &Agent, actor_id: ActorId, need: SyncNeedV1, sender: &Sender, + last_cleared_ts: Option, ) -> eyre::Result<()> { debug!(%actor_id, "handle known versions! need: {need:?}"); - let mut empties = RangeInclusiveSet::new(); + let mut empties: RangeInclusiveMap = RangeInclusiveMap::new(); // this is a read transaction! let tx = conn.transaction()?; @@ -426,6 +429,7 @@ fn handle_need( let version: Version = row.get(0)?; let end_version: Option = row.get(1)?; + let ts: Timestamp = row.get(3)?; if let Some(end_version) = end_version { // cleared versions! @@ -436,7 +440,7 @@ fn handle_need( // pick the smallest end between ours and the versions let end_version = cmp::min(end_version, *versions.end()); - empties.insert(start_version..=end_version); + empties.insert(start_version..=end_version, ts); // we have now processed this range! unprocessed.remove(start_version..=end_version); @@ -448,7 +452,6 @@ fn handle_need( // since this is not a cleared version, those aren't supposed to fail! let last_seq: CrsqlSeq = row.get(2)?; - let ts: Timestamp = row.get(3)?; let mut prepped = tx.prepare_cached( r#" @@ -477,7 +480,7 @@ fn handle_need( last_seq, ts, )? { - empties.insert(empty..=empty); + empties.insert(empty..=empty, ts); } } @@ -535,10 +538,13 @@ fn handle_need( } if !empties.is_empty() { - for versions in empties { + for (versions, ts) in empties { sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { actor_id, - changeset: Changeset::Empty { versions }, + changeset: Changeset::Empty { + versions, + ts: Some(ts), + }, })))?; } } @@ -554,6 +560,10 @@ fn handle_need( Some(row) => { let version: Version = row.get(0)?; let end_version: Option = row.get(1)?; + // ts could be null, since we previously didn't store timestamp for empties + let ts: Option = row.get(3)?; + let ts: Timestamp = + ts.unwrap_or(Timestamp::from(agent.clock().new_timestamp())); if end_version.is_some() { // send this one right away @@ -563,6 +573,7 @@ fn handle_need( actor_id, changeset: Changeset::Empty { versions: version..=version, + ts: Some(ts), }, }, )))?; @@ -571,7 +582,6 @@ fn handle_need( // since this is not a cleared version, those aren't supposed to fail! 
let last_seq: CrsqlSeq = row.get(2)?; - let ts: Timestamp = row.get(3)?; for range_needed in seqs { let mut prepped = tx.prepare_cached( @@ -614,6 +624,7 @@ fn handle_need( actor_id, changeset: Changeset::Empty { versions: empty..=empty, + ts: Some(ts), }, }, )))?; @@ -702,6 +713,49 @@ fn handle_need( } } } + SyncNeedV1::Empty { ts } => { + if last_cleared_ts.is_none() { + return Ok(()); + } + let ts = ts.unwrap_or(Default::default()); + debug!("processing empty versions to {actor_id}"); + let mut stmt = tx.prepare_cached( + " + SELECT start_version, end_version, ts FROM __corro_bookkeeping + WHERE actor_id = crsql_site_id() AND end_version IS NOT NULL AND ts > ? + ORDER BY ts", + )?; + let rows = stmt + .query_map([ts], |row| { + Ok((Version(row.get(0)?)..=Version(row.get(1)?), row.get(2)?)) + })? + .collect::, Timestamp)>>>()? + .iter() + .fold(HashMap::new(), |mut acc, item| { + acc.entry(item.1) + .and_modify(|arr: &mut Vec>| { + arr.push(item.clone().0) + }) + .or_insert(vec![item.clone().0]); + acc + }); + + let mut rows = Vec::from_iter(rows.iter()); + rows.sort_by(|a, b| a.0.cmp(b.0)); + let mut count = 0; + for (ts, versions) in rows { + sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id: agent.actor_id(), + changeset: Changeset::EmptySet { + versions: versions.clone(), + ts: *ts, + }, + })))?; + count += versions.len(); + } + + debug!("sent {count} empty versions during sync!"); + } } Ok(()) @@ -770,6 +824,7 @@ fn send_change_chunks>>( } async fn process_sync( + agent: Agent, pool: SplitPool, bookie: Bookie, sender: Sender, @@ -788,6 +843,12 @@ async fn process_sync( 6, ); + let last_ts = agent + .clone() + .booked() + .read("process_sync(read_cleared_ts))") + .await + .last_cleared_ts(); loop { enum Branch { Reqs(Vec)>>), @@ -843,14 +904,21 @@ async fn process_sync( continue; } } + SyncNeedV1::Empty { ts } => { + debug!("received empty need with ts: {ts:?}"); + } } let pool = pool.clone(); let sender = sender.clone(); + + let agent = agent.clone(); let fut = Box::pin(async move { let mut conn = pool.read().await?; - block_in_place(|| handle_need(&mut conn, actor_id, need, &sender))?; + block_in_place(|| { + handle_need(&mut conn, &agent, actor_id, need, &sender, last_ts) + })?; Ok(()) }); @@ -971,6 +1039,7 @@ pub async fn parallel_sync( transport: &Transport, members: Vec<(ActorId, SocketAddr)>, our_sync_state: SyncStateV1, + our_empty_ts: HashMap>, ) -> Result { trace!( self_actor_id = %agent.actor_id(), @@ -1058,10 +1127,20 @@ pub async fn parallel_sync( counter!("corro.sync.client.member", "id" => actor_id.to_string(), "addr" => addr.to_string()).increment(1); - let needs = our_sync_state.compute_available_needs(&their_sync_state); + let mut needs = our_sync_state.compute_available_needs(&their_sync_state); trace!(%actor_id, self_actor_id = %agent.actor_id(), "computed needs"); + let cleared_ts = their_sync_state.last_cleared_ts; + + info!(%actor_id, "got last cleared ts {cleared_ts:?}"); + if let Some(ts) = cleared_ts { + if let Some(last_seen) = our_empty_ts.get(&actor_id) { + if last_seen.is_none() || last_seen.unwrap() < ts { + needs.entry(actor_id).or_default().push( SyncNeedV1::Empty { ts: *last_seen }); + } + } + } Ok::<_, SyncError>((needs, tx, read)) }.await ) @@ -1114,35 +1193,38 @@ pub async fn parallel_sync( debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need { SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1, SyncNeedV1::Partial {..} => 0, + 
SyncNeedV1::Empty {..} => 0, }).sum::()).sum::()); - servers.push(( - actor_id, - addr, - needs - .into_iter() - .flat_map(|(actor_id, needs)| { - let mut needs: Vec<_> = needs - .into_iter() - .flat_map(|need| match need { - // chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk! - SyncNeedV1::Full { versions } => chunk_range(versions, 10) - .map(|versions| SyncNeedV1::Full { versions }) - .collect(), + let actor_needs = needs + .into_iter() + .flat_map(|(actor_id, needs)| { + let mut needs: Vec<_> = needs + .into_iter() + .flat_map(|need| match need { + // chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk! + SyncNeedV1::Full { versions } => chunk_range(versions, 10) + .map(|versions| SyncNeedV1::Full { versions }) + .collect(), - need => vec![need], - }) - .collect(); + need => vec![need], + }) + .collect(); - // NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on - needs.shuffle(&mut rng); + // NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on + needs.shuffle(&mut rng); - needs - .into_iter() - .map(|need| (actor_id, need)) - .collect::>() - }) - .collect::>(), + needs + .into_iter() + .map(|need| (actor_id, need)) + .collect::>() + }) + .collect::>(); + + servers.push(( + actor_id, + addr, + actor_needs, tx, )); @@ -1241,6 +1323,7 @@ pub async fn parallel_sync( .collect(), }] } + need => {vec![need]}, }; if actual_needs.is_empty() { @@ -1291,6 +1374,8 @@ pub async fn parallel_sync( let counts = FuturesUnordered::from_iter(readers.into_iter().map(|(actor_id, mut read)| { let tx_changes = agent.tx_changes().clone(); + let tx_emptyset = agent.tx_emptyset().clone(); + async move { let mut count = 0; @@ -1316,6 +1401,14 @@ pub async fn parallel_sync( change.versions(), change.seqs() ); + // only accept emptyset that's from the same node that's syncing + if change.is_empty_set() { + tx_emptyset + .send(change) + .await + .map_err(|_| SyncRecvError::ChangesChannelClosed)?; + continue; + } tx_changes .send((change, ChangeSource::Sync)) @@ -1473,9 +1566,15 @@ pub async fn serve_sync( let (tx, mut rx) = mpsc::channel::(256); tokio::spawn( - process_sync(agent.pool().clone(), bookie.clone(), tx, rx_need) - .instrument(info_span!("process_sync")) - .inspect_err(|e| error!("could not process sync request: {e}")), + process_sync( + agent.clone(), + agent.pool().clone(), + bookie.clone(), + tx, + rx_need, + ) + .instrument(info_span!("process_sync")) + .inspect_err(|e| error!("could not process sync request: {e}")), ); let (send_res, recv_res) = tokio::join!( @@ -1746,11 +1845,13 @@ mod tests { block_in_place(|| { handle_need( &mut conn, + &agent, actor_id, SyncNeedV1::Full { versions: Version(1)..=Version(1), }, &tx, + None, ) })?; @@ -1772,12 +1873,14 @@ mod tests { block_in_place(|| { handle_need( &mut conn, + &agent, actor_id, SyncNeedV1::Partial { version: Version(2), seqs: vec![CrsqlSeq(0)..=CrsqlSeq(0)], }, &tx, + None, ) })?; @@ -1850,25 +1953,30 @@ mod tests { block_in_place(|| { handle_need( &mut conn, + &agent, actor_id, SyncNeedV1::Partial { version: Version(1), seqs: vec![CrsqlSeq(0)..=CrsqlSeq(0)], }, &tx, + None, ) })?; let msg = rx.recv().await.unwrap(); - assert_eq!( - msg, - SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { - actor_id, - changeset: Changeset::Empty { - versions: Version(1)..=Version(1) - } - })) - ); + if let SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id: actor, + changeset, + .. 
+ })) = msg + { + assert_eq!(actor_id, actor); + assert!(changeset.is_empty()); + assert_eq!(changeset.versions(), Version(1)..=Version(1)); + } else { + panic!("{msg:?} doesn't contain an empty changeset"); + } } { @@ -1878,11 +1986,13 @@ mod tests { block_in_place(|| { handle_need( &mut conn, + &agent, actor_id, SyncNeedV1::Full { versions: Version(1)..=Version(6), }, &tx, + None, ) })?; @@ -1917,15 +2027,18 @@ mod tests { ); let msg = rx.recv().await.unwrap(); - assert_eq!( - msg, - SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { - actor_id, - changeset: Changeset::Empty { - versions: Version(1)..=Version(1) - } - })) - ); + if let SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id: actor, + changeset, + .. + })) = msg + { + assert_eq!(actor_id, actor); + assert!(changeset.is_empty()); + assert_eq!(changeset.versions(), Version(1)..=Version(1)); + } else { + panic!("{msg:?} doesn't contain an empty changeset"); + } } // overwrite v2 @@ -2041,11 +2154,13 @@ mod tests { block_in_place(|| { handle_need( &mut conn, + &agent.clone(), actor_id, SyncNeedV1::Full { versions: Version(1)..=Version(1000), }, &tx, + None, ) })?; @@ -2095,15 +2210,18 @@ mod tests { ); let msg = rx.recv().await.unwrap(); - assert_eq!( - msg, - SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { - actor_id, - changeset: Changeset::Empty { - versions: Version(1)..=Version(2) - } - })) - ); + if let SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id: actor, + changeset, + .. + })) = msg + { + assert_eq!(actor_id, actor); + assert!(changeset.is_empty()); + assert_eq!(changeset.versions(), Version(1)..=Version(2)); + } else { + panic!("{msg:?} doesn't contain an empty changeset"); + } } { @@ -2113,12 +2231,14 @@ mod tests { block_in_place(|| { handle_need( &mut conn, + &agent.clone(), actor_id, SyncNeedV1::Partial { version: Version(5), seqs: vec![CrsqlSeq(4)..=CrsqlSeq(7)], }, &tx, + None, ) })?; @@ -2149,12 +2269,14 @@ mod tests { block_in_place(|| { handle_need( &mut conn, + &agent, actor_id, SyncNeedV1::Partial { version: Version(5), seqs: vec![CrsqlSeq(2)..=CrsqlSeq(2), CrsqlSeq(15)..=CrsqlSeq(24)], }, &tx, + None, ) })?; diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index 09c63e81..77c17def 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -141,7 +141,8 @@ pub fn runtime_loop( NoCustomBroadcast, ); - let (to_schedule_tx, mut to_schedule_rx) = bounded(agent.config().perf.schedule_channel_len, "to_schedule"); + let (to_schedule_tx, mut to_schedule_rx) = + bounded(agent.config().perf.schedule_channel_len, "to_schedule"); let mut runtime: DispatchRuntime = DispatchRuntime::new(to_send_tx, to_schedule_tx, notifications_tx); diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index 29467a55..d07ccfe7 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -19,7 +19,7 @@ use indexmap::IndexMap; use metrics::{gauge, histogram}; use parking_lot::RwLock; use rangemap::RangeInclusiveSet; -use rusqlite::{named_params, Connection, Transaction}; +use rusqlite::{named_params, Connection, OptionalExtension, Transaction}; use serde::{Deserialize, Serialize}; use tokio::sync::{ AcquireError, OwnedRwLockWriteGuard as OwnedTokioRwLockWriteGuard, OwnedSemaphorePermit, @@ -66,6 +66,7 @@ pub struct AgentConfig { pub tx_apply: CorroSender<(ActorId, Version)>, pub tx_clear_buf: CorroSender<(ActorId, RangeInclusive)>, pub tx_changes: CorroSender<(ChangeV1, 
ChangeSource)>,
+    pub tx_emptyset: CorroSender<ChangeV1>,
     pub tx_foca: CorroSender<FocaInput>,
     pub write_sema: Arc<Semaphore>,
 
@@ -92,6 +93,7 @@ pub struct AgentInner {
     tx_apply: CorroSender<(ActorId, Version)>,
     tx_clear_buf: CorroSender<(ActorId, RangeInclusive<Version>)>,
     tx_changes: CorroSender<(ChangeV1, ChangeSource)>,
+    tx_emptyset: CorroSender<ChangeV1>,
     tx_foca: CorroSender<FocaInput>,
     write_sema: Arc<Semaphore>,
     schema: RwLock<Schema>,
@@ -121,6 +123,7 @@ impl Agent {
            tx_apply: config.tx_apply,
            tx_clear_buf: config.tx_clear_buf,
            tx_changes: config.tx_changes,
+           tx_emptyset: config.tx_emptyset,
            tx_foca: config.tx_foca,
            write_sema: config.write_sema,
            schema: config.schema,
@@ -178,6 +181,10 @@ impl Agent {
         &self.0.tx_changes
     }
 
+    pub fn tx_emptyset(&self) -> &CorroSender<ChangeV1> {
+        &self.0.tx_emptyset
+    }
+
     pub fn tx_clear_buf(&self) -> &CorroSender<(ActorId, RangeInclusive<Version>)> {
         &self.0.tx_clear_buf
     }
@@ -239,7 +246,7 @@ impl Agent {
     }
 }
 
-pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
+pub fn migrate(clock: Arc<uhlc::HLC>, conn: &mut Connection) -> rusqlite::Result<()> {
     let migrations: Vec<Box<dyn Migration>> = vec![
         Box::new(init_migration as fn(&Transaction) -> rusqlite::Result<()>),
         Box::new(bookkeeping_db_version_index as fn(&Transaction) -> rusqlite::Result<()>),
@@ -248,6 +255,8 @@ pub fn migrate(conn: &mut Connection) -> rusqlite::Result<()> {
         Box::new(crsqlite_v0_16_migration as fn(&Transaction) -> rusqlite::Result<()>),
         Box::new(create_bookkeeping_gaps as fn(&Transaction) -> rusqlite::Result<()>),
         Box::new(create_impacted_versions as fn(&Transaction) -> rusqlite::Result<()>),
+        Box::new(create_ts_index_bookkeeping_table),
+        Box::new(create_sync_state(clock)),
     ];
 
     crate::sqlite::migrate(conn, migrations)
@@ -351,6 +360,38 @@ fn refactor_corro_members(tx: &Transaction) -> rusqlite::Result<()> {
     )
 }
 
+fn create_ts_index_bookkeeping_table(tx: &Transaction) -> rusqlite::Result<()> {
+    tx.execute_batch(
+        r#"
+            CREATE INDEX index__corro_bookkeeping_ts ON __corro_bookkeeping (actor_id, ts ASC);
+            CREATE TABLE __corro_sync_state (
+                actor_id BLOB PRIMARY KEY NOT NULL,
+                last_cleared_ts TEXT
+            );
+        "#,
+    )
+}
+
+fn create_sync_state(clock: Arc<uhlc::HLC>) -> impl Fn(&Transaction) -> rusqlite::Result<()> {
+    let ts = Timestamp::from(clock.new_timestamp());
+
+    move |tx: &Transaction| -> rusqlite::Result<()> {
+        tx.execute(
+            r#"
+            UPDATE __corro_bookkeeping SET ts = ?
+            WHERE ts IS NULL AND end_version IS NOT NULL AND actor_id = crsql_site_id();
+        "#,
+            [ts],
+        )?;
+
+        tx.execute(
+            "INSERT INTO __corro_sync_state VALUES (crsql_site_id(), ?);",
+            [ts],
+        )?;
+
+        Ok(())
+    }
+}
+
 fn create_corro_subs(tx: &Transaction) -> rusqlite::Result<()> {
     tx.execute_batch(
         r#"
@@ -1095,6 +1136,7 @@ pub struct VersionsSnapshot {
     needed: RangeInclusiveSet<Version>,
     partials: BTreeMap<Version, PartialVersion>,
     max: Option<Version>,
+    last_cleared_ts: Option<Timestamp>,
 }
 
 impl VersionsSnapshot {
@@ -1163,6 +1205,16 @@ impl VersionsSnapshot {
         Ok(())
     }
 
+    pub fn update_cleared_ts(&mut self, conn: &Connection, ts: Timestamp) -> rusqlite::Result<()> {
+        if self.last_cleared_ts.is_none() || self.last_cleared_ts.unwrap() < ts {
+            self.last_cleared_ts = Some(ts);
+            conn.prepare_cached("INSERT OR REPLACE INTO __corro_sync_state VALUES (?, ?)")?
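+            // persist the advanced watermark; BookedVersions::from_conn reads it
+            // back from __corro_sync_state on startup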
+ .execute((self.actor_id, ts))?; + } + + Ok(()) + } + fn compute_gaps_change(&self, versions: RangeInclusiveSet) -> GapsChanges { trace!("needed: {:?}", self.needed); @@ -1259,6 +1311,7 @@ pub struct BookedVersions { pub partials: BTreeMap, needed: RangeInclusiveSet, max: Option, + last_cleared_ts: Option, } impl BookedVersions { @@ -1268,6 +1321,7 @@ impl BookedVersions { partials: Default::default(), needed: Default::default(), max: Default::default(), + last_cleared_ts: Default::default(), } } @@ -1314,6 +1368,11 @@ impl BookedVersions { } } + bv.last_cleared_ts = conn + .prepare_cached("SELECT last_cleared_ts FROM __corro_sync_state WHERE actor_id = ?")? + .query_row([actor_id], |row| row.get(0)) + .optional()?; + let mut snap = bv.snapshot(); { @@ -1382,12 +1441,20 @@ impl BookedVersions { pub fn last(&self) -> Option { self.max } + pub fn last_cleared_ts(&self) -> Option { + self.last_cleared_ts + } pub fn commit_snapshot(&mut self, mut snap: VersionsSnapshot) { debug!("comitting snapshot"); self.needed = std::mem::take(&mut snap.needed); self.partials = std::mem::take(&mut snap.partials); self.max = snap.max.take(); + if let Some(ts) = snap.last_cleared_ts { + if self.last_cleared_ts.is_none() || self.last_cleared_ts().unwrap() < ts { + self.last_cleared_ts = Some(ts) + } + } } pub fn snapshot(&self) -> VersionsSnapshot { @@ -1397,6 +1464,7 @@ impl BookedVersions { needed: self.needed.clone(), partials: self.partials.clone(), max: self.max, + last_cleared_ts: self.last_cleared_ts, } } @@ -1638,8 +1706,8 @@ pub fn find_overwritten_versions( #[cfg(test)] mod tests { - use rangemap::range_inclusive_set; use super::*; + use rangemap::range_inclusive_set; #[test] fn test_booked_insert_db() -> rusqlite::Result<()> { @@ -1647,17 +1715,28 @@ mod tests { let mut conn = CrConn::init(Connection::open_in_memory()?)?; setup_conn(&conn)?; - migrate(&mut conn)?; + let clock = Arc::new(uhlc::HLC::default()); + migrate(clock, &mut conn)?; let actor_id = ActorId::default(); let mut bv = BookedVersions::new(actor_id); let mut all = RangeInclusiveSet::new(); - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(1)..=Version(20)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(1)..=Version(20)], + )?; expect_gaps(&conn, &bv, &all, vec![])?; - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(1)..=Version(10)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(1)..=Version(10)], + )?; expect_gaps(&conn, &bv, &all, vec![])?; // try from an empty state again @@ -1665,11 +1744,21 @@ mod tests { let mut all = RangeInclusiveSet::new(); // create 2:=3 gap - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(1)..=Version(1), Version(4)..=Version(4)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(1)..=Version(1), Version(4)..=Version(4)], + )?; expect_gaps(&conn, &bv, &all, vec![Version(2)..=Version(3)])?; // fill gap - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(3)..=Version(3), Version(2)..=Version(2)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(3)..=Version(3), Version(2)..=Version(2)], + )?; expect_gaps(&conn, &bv, &all, vec![])?; // try from an empty state again @@ -1677,24 +1766,54 @@ mod tests { let mut all = RangeInclusiveSet::new(); // insert a non-1 first version - insert_everywhere(&conn, &mut bv, &mut all, 
range_inclusive_set![Version(5)..=Version(20)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(5)..=Version(20)], + )?; expect_gaps(&conn, &bv, &all, vec![Version(1)..=Version(4)])?; // insert a further change that does not overlap a gap - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(6)..=Version(7)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(6)..=Version(7)], + )?; expect_gaps(&conn, &bv, &all, vec![Version(1)..=Version(4)])?; // insert a further change that does overlap a gap - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(3)..=Version(7)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(3)..=Version(7)], + )?; expect_gaps(&conn, &bv, &all, vec![Version(1)..=Version(2)])?; - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(1)..=Version(2)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(1)..=Version(2)], + )?; expect_gaps(&conn, &bv, &all, vec![])?; - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(25)..=Version(25)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(25)..=Version(25)], + )?; expect_gaps(&conn, &bv, &all, vec![Version(21)..=Version(24)])?; - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(30)..=Version(35)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(30)..=Version(35)], + )?; expect_gaps( &conn, &bv, @@ -1704,7 +1823,12 @@ mod tests { // NOTE: overlapping partially from the end - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(19)..=Version(22)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(19)..=Version(22)], + )?; expect_gaps( &conn, &bv, @@ -1714,7 +1838,12 @@ mod tests { // NOTE: overlapping partially from the start - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(24)..=Version(25)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(24)..=Version(25)], + )?; expect_gaps( &conn, &bv, @@ -1724,27 +1853,57 @@ mod tests { // NOTE: overlapping 2 ranges - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(23)..=Version(27)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(23)..=Version(27)], + )?; expect_gaps(&conn, &bv, &all, vec![Version(28)..=Version(29)])?; // NOTE: ineffective insert of already known ranges - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(1)..=Version(20)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(1)..=Version(20)], + )?; expect_gaps(&conn, &bv, &all, vec![Version(28)..=Version(29)])?; // NOTE: overlapping no ranges, but encompassing a full range - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(27)..=Version(30)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(27)..=Version(30)], + )?; expect_gaps(&conn, &bv, &all, vec![])?; // NOTE: touching multiple ranges, partially // create gap 36..=39 - insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(40)..=Version(45)])?; + insert_everywhere( + &conn, + &mut bv, + &mut all, + range_inclusive_set![Version(40)..=Version(45)], + )?; // create gap 46..=49 - insert_everywhere(&conn, &mut bv, &mut all, 
range_inclusive_set![Version(50)..=Version(55)])?;
+        insert_everywhere(
+            &conn,
+            &mut bv,
+            &mut all,
+            range_inclusive_set![Version(50)..=Version(55)],
+        )?;
 
-        insert_everywhere(&conn, &mut bv, &mut all, range_inclusive_set![Version(38)..=Version(47)])?;
+        insert_everywhere(
+            &conn,
+            &mut bv,
+            &mut all,
+            range_inclusive_set![Version(38)..=Version(47)],
+        )?;
         expect_gaps(
             &conn,
             &bv,
diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs
index 25ed674a..ee859ad1 100644
--- a/crates/corro-types/src/broadcast.rs
+++ b/crates/corro-types/src/broadcast.rs
@@ -118,6 +118,8 @@ impl Deref for ChangeV1 {
 pub enum Changeset {
     Empty {
         versions: RangeInclusive<Version>,
+        #[speedy(default_on_eof)]
+        ts: Option<Timestamp>,
     },
     Full {
         version: Version,
@@ -128,6 +130,10 @@ pub enum Changeset {
         last_seq: CrsqlSeq,
         ts: Timestamp,
     },
+    EmptySet {
+        versions: Vec<RangeInclusive<Version>>,
+        ts: Timestamp,
+    },
 }
 
 impl From<ChangesetParts> for Changeset {
@@ -153,7 +159,10 @@ pub struct ChangesetParts {
 impl Changeset {
     pub fn versions(&self) -> RangeInclusive<Version> {
         match self {
-            Changeset::Empty { versions } => versions.clone(),
+            Changeset::Empty { versions, .. } => versions.clone(),
+            // todo: this returns dummy version because empty set has an array of versions.
+            // probably shouldn't be doing this
+            Changeset::EmptySet { .. } => Version(0)..=Version(0),
             Changeset::Full { version, .. } => *version..=*version,
         }
     }
@@ -165,6 +174,7 @@ impl Changeset {
     pub fn seqs(&self) -> Option<&RangeInclusive<CrsqlSeq>> {
         match self {
             Changeset::Empty { .. } => None,
+            Changeset::EmptySet { .. } => None,
             Changeset::Full { seqs, .. } => Some(seqs),
         }
     }
@@ -172,6 +182,7 @@ impl Changeset {
     pub fn last_seq(&self) -> Option<CrsqlSeq> {
         match self {
             Changeset::Empty { .. } => None,
+            Changeset::EmptySet { .. } => None,
             Changeset::Full { last_seq, .. } => Some(*last_seq),
         }
     }
@@ -179,6 +190,7 @@ impl Changeset {
     pub fn is_complete(&self) -> bool {
         match self {
             Changeset::Empty { .. } => true,
+            Changeset::EmptySet { .. } => true,
             Changeset::Full { seqs, last_seq, .. } => {
                 *seqs.start() == CrsqlSeq(0) && seqs.end() == last_seq
             }
         }
     }
@@ -188,6 +200,7 @@ impl Changeset {
     pub fn len(&self) -> usize {
         match self {
             Changeset::Empty { .. } => 0, //(versions.end().0 - versions.start().0 + 1) as usize,
+            Changeset::EmptySet { versions, .. } => versions.len(),
             Changeset::Full { changes, .. } => changes.len(),
         }
     }
@@ -195,13 +208,23 @@ impl Changeset {
     pub fn is_empty(&self) -> bool {
         match self {
             Changeset::Empty { .. } => true,
+            Changeset::EmptySet { .. } => true,
             Changeset::Full { changes, .. } => changes.is_empty(),
         }
     }
 
+    pub fn is_empty_set(&self) -> bool {
+        match self {
+            Changeset::Empty { .. } => false,
+            Changeset::EmptySet { .. } => true,
+            Changeset::Full { .. } => false,
+        }
+    }
+
     pub fn ts(&self) -> Option<Timestamp> {
         match self {
-            Changeset::Empty { .. } => None,
+            Changeset::Empty { ts, .. } => *ts,
+            Changeset::EmptySet { ts, .. } => Some(*ts),
             Changeset::Full { ts, .. } => Some(*ts),
         }
     }
@@ -209,6 +232,7 @@ impl Changeset {
     pub fn changes(&self) -> &[Change] {
         match self {
             Changeset::Empty { .. } => &[],
+            Changeset::EmptySet { .. } => &[],
             Changeset::Full { changes, .. } => changes,
         }
     }
@@ -216,6 +240,7 @@ impl Changeset {
     pub fn into_parts(self) -> Option<ChangesetParts> {
         match self {
             Changeset::Empty { .. } => None,
+            Changeset::EmptySet { ..
} => None,
             Changeset::Full {
                 version,
                 changes,
@@ -239,7 +264,7 @@ pub enum TimestampParseError {
     Parse(ParseNTP64Error),
 }
 
-#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd)]
+#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Hash)]
 #[serde(transparent)]
 pub struct Timestamp(pub NTP64);
 
diff --git a/crates/corro-types/src/change.rs b/crates/corro-types/src/change.rs
index 8198f01c..478bfdb2 100644
--- a/crates/corro-types/src/change.rs
+++ b/crates/corro-types/src/change.rs
@@ -232,12 +232,29 @@ pub fn insert_local_changes(
             version: Some(version),
         })?;
 
+    let mut cleared_ts: Option<Timestamp> = None;
     for (actor_id, versions_set) in overwritten {
+        if actor_id != agent.actor_id() {
+            warn!("clearing and setting timestamp for empties from a different node");
+        }
+        let ts = Timestamp::from(agent.clock().new_timestamp());
         for versions in versions_set {
-            store_empty_changeset(tx, actor_id, versions)?;
+            let inserted = store_empty_changeset(tx, actor_id, versions, ts)?;
+            if inserted > 0 {
+                cleared_ts = Some(ts)
+            }
         }
     }
 
+    if let Some(ts) = cleared_ts {
+        snap.update_cleared_ts(tx, ts)
+            .map_err(|source| ChangeError::Rusqlite {
+                source,
+                actor_id: Some(actor_id),
+                version: Some(version),
+            })?;
+    }
+
     Ok(Some(InsertChangesInfo {
         version,
         db_version,
@@ -251,6 +268,7 @@ pub fn store_empty_changeset(
     conn: &Connection,
     actor_id: ActorId,
     versions: RangeInclusive<Version>,
+    ts: Timestamp,
 ) -> Result<usize, ChangeError> {
     trace!(%actor_id, "attempting to delete versions range {versions:?}");
     let start = Instant::now();
@@ -351,14 +369,14 @@ pub fn store_empty_changeset(
             .prepare_cached(
                 "
                 INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version, db_version, last_seq, ts)
-                    VALUES (?, ?, ?, NULL, NULL, NULL);
+                    VALUES (?, ?, ?, NULL, NULL, ?);
                 ",
             ).map_err(|source| ChangeError::Rusqlite {
                 source,
                 actor_id: Some(actor_id),
                 version: None,
             })?
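+            // the cleared range now records when it was emptied, so peers can request
+            // only empties newer than their last seen timestamp (SyncNeedV1::Empty { ts })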
-            .execute(params![actor_id, range.start(), range.end()]).map_err(|source| ChangeError::Rusqlite {
+            .execute(params![actor_id, range.start(), range.end(), ts]).map_err(|source| ChangeError::Rusqlite {
                 source,
                 actor_id: Some(actor_id),
                 version: None,
diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs
index 3cd21382..99afaff2 100644
--- a/crates/corro-types/src/pubsub.rs
+++ b/crates/corro-types/src/pubsub.rs
@@ -2384,10 +2384,12 @@ mod tests {
             tmpdir.path().join("subs").display().to_string().into();
         let pool = SplitPool::create(db_path, Arc::new(Semaphore::new(1))).await?;
 
+        let clock = Arc::new(uhlc::HLC::default());
+
         {
             let mut conn = pool.write_priority().await?;
             setup_conn(&conn)?;
-            migrate(&mut conn)?;
+            migrate(clock, &mut conn)?;
             let tx = conn.transaction()?;
             apply_schema(&tx, &Schema::default(), &mut schema)?;
             tx.commit()?;
@@ -2505,10 +2507,10 @@ mod tests {
             .await
             .unwrap();
         let mut conn = pool.write_priority().await.unwrap();
-
+        let clock = Arc::new(uhlc::HLC::default());
         {
             setup_conn(&conn).unwrap();
-            migrate(&mut conn).unwrap();
+            migrate(clock, &mut conn).unwrap();
             let tx = conn.transaction().unwrap();
             apply_schema(&tx, &Schema::default(), &mut schema).unwrap();
             tx.commit().unwrap();
@@ -2545,7 +2547,8 @@ mod tests {
         .expect("could not init crsql");
 
         setup_conn(&conn2).unwrap();
-        migrate(&mut conn2).unwrap();
+        let clock = Arc::new(uhlc::HLC::default());
+        migrate(clock, &mut conn2).unwrap();
 
         {
             let tx = conn2.transaction().unwrap();
diff --git a/crates/corro-types/src/sqlite.rs b/crates/corro-types/src/sqlite.rs
index 9a91324b..6ea10624 100644
--- a/crates/corro-types/src/sqlite.rs
+++ b/crates/corro-types/src/sqlite.rs
@@ -127,7 +127,10 @@ pub trait Migration {
     fn migrate(&self, tx: &Transaction) -> rusqlite::Result<()>;
 }
 
-impl Migration for fn(&Transaction) -> rusqlite::Result<()> {
+impl<F> Migration for F
+where
+    F: Fn(&Transaction) -> rusqlite::Result<()>,
+{
     fn migrate(&self, tx: &Transaction) -> rusqlite::Result<()> {
         self(tx)
     }
diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs
index 0a406120..43d21aa6 100644
--- a/crates/corro-types/src/sync.rs
+++ b/crates/corro-types/src/sync.rs
@@ -82,6 +82,8 @@ pub struct SyncStateV1 {
     pub heads: HashMap<ActorId, Version>,
     pub need: HashMap<ActorId, Vec<RangeInclusive<Version>>>,
     pub partial_need: HashMap<ActorId, HashMap<Version, Vec<RangeInclusive<CrsqlSeq>>>>,
+    #[speedy(default_on_eof)]
+    pub last_cleared_ts: Option<Timestamp>,
 }
 
 impl SyncStateV1 {
@@ -256,6 +258,9 @@ pub enum SyncNeedV1 {
         version: Version,
         seqs: Vec<RangeInclusive<CrsqlSeq>>,
     },
+    Empty {
+        ts: Option<Timestamp>,
+    },
 }
 
 impl SyncNeedV1 {
@@ -263,6 +268,7 @@ impl SyncNeedV1 {
         match self {
             SyncNeedV1::Full { versions } => (versions.end().0 - versions.start().0) as usize + 1,
             SyncNeedV1::Partial { .. } => 1,
+            SyncNeedV1::Empty { ..
         }
     }
 }
@@ -273,11 +279,24 @@ impl From<SyncStateV1> for SyncMessage {
     }
 }
 
+pub async fn get_last_cleared_ts(bookie: &Bookie, actor_id: &ActorId) -> Option<Timestamp> {
+    let booked = bookie
+        .read("get_last_cleared_ts")
+        .await
+        .get(actor_id)
+        .cloned();
+    if let Some(booked) = booked {
+        let booked_reader = booked.read("get_last_cleared_ts").await;
+        return booked_reader.last_cleared_ts();
+    }
+    return None;
+}
+
 // generates a `SyncMessage` to tell another node what versions we're missing
 #[tracing::instrument(skip_all, level = "debug")]
-pub async fn generate_sync(bookie: &Bookie, actor_id: ActorId) -> SyncStateV1 {
+pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncStateV1 {
     let mut state = SyncStateV1 {
-        actor_id,
+        actor_id: self_actor_id,
         ..Default::default()
     };
@@ -290,6 +309,8 @@ pub async fn generate_sync(bookie: &Bookie, actor_id: ActorId) -> SyncStateV1 {
         .collect()
     };
 
+    let mut last_ts = None;
+
     for (actor_id, booked) in actors {
         let bookedr = booked
             .read(format!("generate_sync:{}", actor_id.as_simple()))
@@ -323,9 +344,15 @@ pub async fn generate_sync(bookie: &Bookie, actor_id: ActorId) -> SyncStateV1 {
             }
         }
 
+        if actor_id == self_actor_id {
+            last_ts = bookedr.last_cleared_ts();
+        }
+
         state.heads.insert(actor_id, last_version);
     }
 
+    state.last_cleared_ts = last_ts;
+
     state
 }
 
diff --git a/crates/corrosion/src/command/consul/sync.rs b/crates/corrosion/src/command/consul/sync.rs
index 2580fffc..8c47597a 100644
--- a/crates/corrosion/src/command/consul/sync.rs
+++ b/crates/corrosion/src/command/consul/sync.rs
@@ -3,7 +3,7 @@ use corro_api_types::ColumnType;
 use corro_client::CorrosionClient;
 use corro_types::{api::Statement, config::ConsulConfig};
 use futures::future::select;
-use metrics::{histogram, counter};
+use metrics::{counter, histogram};
 use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
@@ -12,7 +12,10 @@ use serde::{Deserialize, Serialize};
     path::Path,
     time::{Duration, Instant, SystemTime},
 };
-use tokio::{signal::unix::{signal, SignalKind}, time::{interval, timeout}};
+use tokio::{
+    signal::unix::{signal, SignalKind},
+    time::{interval, timeout},
+};
 use tracing::{debug, error, info, trace};
 
 const CONSUL_PULL_INTERVAL: Duration = Duration::from_secs(1);
@@ -43,10 +46,7 @@ pub async fn run<P: AsRef<Path>>(
     let consul = consul_client::Client::new(config.client.clone())?;
 
     info!("Setting up corrosion for consul sync");
-    setup(
-        &corrosion
-    )
-    .await?;
+    setup(&corrosion).await?;
 
     let mut consul_services: HashMap<String, u64> = HashMap::new();
     let mut consul_checks: HashMap<String, u64> = HashMap::new();
@@ -105,29 +105,27 @@ pub async fn run<P: AsRef<Path>>(
         match res {
             Ok((svc_stats, check_stats)) => {
                 if !svc_stats.is_zero() {
-                     info!("updated consul services: {svc_stats:?}");
+                    info!("updated consul services: {svc_stats:?}");
                 }
                 if !check_stats.is_zero() {
-                     info!("updated consul checks: {check_stats:?}");
+                    info!("updated consul checks: {check_stats:?}");
                 }
             }
             Err(e) => match e {
                 UpdateError::Execute(ExecuteError::Sqlite(_)) => {
                     return Err(e.into());
-                },
+                }
                 e => {
                     error!("non-fatal update error, continuing. error: {e}");
error: {e}"); } - } + }, } } Ok(()) } -async fn setup( - corrosion: &CorrosionClient, -) -> eyre::Result<()> { +async fn setup(corrosion: &CorrosionClient) -> eyre::Result<()> { let mut conn = corrosion.pool().get().await?; { let tx = conn.transaction()?; @@ -152,13 +150,22 @@ async fn setup( struct ColumnInfo { name: String, - kind: corro_api_types::ColumnType + kind: corro_api_types::ColumnType, } - let col_infos: Vec = conn.prepare("PRAGMA table_info(consul_services)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_services' table_info: {e}"))?.collect::, _>>()?; - + let col_infos: Vec = conn + .prepare("PRAGMA table_info(consul_services)")? + .query_map([], |row| { + Ok(ColumnInfo { + name: row.get(1)?, + kind: row.get(2)?, + }) + }) + .map_err(|e| eyre::eyre!("could not query consul_services' table_info: {e}"))? + .collect::, _>>()?; + let expected_cols = [ - ("node", vec![ColumnType::Text]), + ("node", vec![ColumnType::Text]), ("id", vec![ColumnType::Text]), ("name", vec![ColumnType::Text]), ("tags", vec![ColumnType::Text, ColumnType::Blob]), @@ -169,15 +176,27 @@ async fn setup( ]; for (name, kind) in expected_cols { - if !col_infos.iter().any(|info| info.name == name && kind.contains(&info.kind)) { + if !col_infos + .iter() + .any(|info| info.name == name && kind.contains(&info.kind)) + { eyre::bail!("expected a column consul_services.{name} w/ type {kind:?}"); } } - let col_infos: Vec = conn.prepare("PRAGMA table_info(consul_checks)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))?.collect::, _>>()?; - + let col_infos: Vec = conn + .prepare("PRAGMA table_info(consul_checks)")? + .query_map([], |row| { + Ok(ColumnInfo { + name: row.get(1)?, + kind: row.get(2)?, + }) + }) + .map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))? + .collect::, _>>()?; + let expected_cols = [ - ("node", vec![ColumnType::Text]), + ("node", vec![ColumnType::Text]), ("id", vec![ColumnType::Text]), ("service_id", vec![ColumnType::Text]), ("service_name", vec![ColumnType::Text]), @@ -188,7 +207,10 @@ async fn setup( ]; for (name, kind) in expected_cols { - if !col_infos.iter().any(|info| info.name == name && kind.contains(&info.kind)) { + if !col_infos + .iter() + .any(|info| info.name == name && kind.contains(&info.kind)) + { eyre::bail!("expected a column consul_checks.{name} w/ type {kind:?}"); } } @@ -371,18 +393,18 @@ fn append_upsert_service_statements( updated_at: i64, ) { // run this by corrosion so it's part of the same transaction - statements.push(Statement::WithParams("INSERT INTO __corro_consul_services ( id, hash ) + statements.push(Statement::WithParams( + "INSERT INTO __corro_consul_services ( id, hash ) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET hash = excluded.hash;" - .into(),vec![ - - svc.id.clone().into(), - hash.to_be_bytes().to_vec().into(), - ])); + .into(), + vec![svc.id.clone().into(), hash.to_be_bytes().to_vec().into()], + )); // upsert! - statements.push(Statement::WithParams("INSERT INTO consul_services ( node, id, name, tags, meta, port, address, updated_at ) + statements.push(Statement::WithParams( + "INSERT INTO consul_services ( node, id, name, tags, meta, port, address, updated_at ) VALUES (?,?,?,?,?,?,?,?) 
ON CONFLICT(node, id) DO UPDATE SET name = excluded.name, @@ -391,17 +413,22 @@ fn append_upsert_service_statements( port = excluded.port, address = excluded.address, updated_at = excluded.updated_at;" - .into(),vec![ - - node.into(), - svc.id.into(), - svc.name.into(), - serde_json::to_string(&svc.tags).unwrap_or_else(|_| "[]".to_string()).into(), - serde_json::to_string(&svc.meta).unwrap_or_else(|_| "{}".to_string()).into(), - svc.port.into(), - svc.address.into(), - updated_at.into(), - ])); + .into(), + vec![ + node.into(), + svc.id.into(), + svc.name.into(), + serde_json::to_string(&svc.tags) + .unwrap_or_else(|_| "[]".to_string()) + .into(), + serde_json::to_string(&svc.meta) + .unwrap_or_else(|_| "{}".to_string()) + .into(), + svc.port.into(), + svc.address.into(), + updated_at.into(), + ], + )); } fn append_upsert_check_statements( @@ -412,15 +439,14 @@ fn append_upsert_check_statements( updated_at: i64, ) { // run this by corrosion so it's part of the same transaction - statements.push(Statement::WithParams("INSERT INTO __corro_consul_checks ( id, hash ) + statements.push(Statement::WithParams( + "INSERT INTO __corro_consul_checks ( id, hash ) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET hash = excluded.hash;" - .into(),vec![ - - check.id.clone().into(), - hash.to_be_bytes().to_vec().into(), - ])); + .into(), + vec![check.id.clone().into(), hash.to_be_bytes().to_vec().into()], + )); // upsert! statements.push(Statement::WithParams("INSERT INTO consul_checks ( node, id, service_id, service_name, name, status, output, updated_at ) @@ -433,7 +459,6 @@ fn append_upsert_check_statements( output = excluded.output, updated_at = excluded.updated_at;" .into(),vec![ - node.into(), check.id.into(), check.service_id.into(), @@ -452,7 +477,7 @@ enum ConsulServiceOp { enum ConsulCheckOp { Upsert { check: AgentCheck, hash: u64 }, - Delete { id: String } + Delete { id: String }, } /// @@ -519,7 +544,7 @@ fn update_checks( let hash = hash_check(&check); ops.push(ConsulCheckOp::Upsert { check, hash }); } - + ops } @@ -530,7 +555,7 @@ enum UpdateError { #[error("timed out")] TimedOut, #[error(transparent)] - Execute(#[from] ExecuteError) + Execute(#[from] ExecuteError), } async fn update_consul( @@ -545,7 +570,8 @@ async fn update_consul( let start = Instant::now(); match timeout(Duration::from_secs(5), consul.agent_services()).await { Ok(Ok(services)) => { - histogram!("corro_consul.consul.response.time.seconds").record(start.elapsed().as_secs_f64()); + histogram!("corro_consul.consul.response.time.seconds") + .record(start.elapsed().as_secs_f64()); Ok(update_services(services, service_hashes, skip_hash_check)) } Ok(Err(e)) => { @@ -557,14 +583,14 @@ async fn update_consul( Err(UpdateError::TimedOut) } } - }; let fut_checks = async { let start = Instant::now(); match timeout(Duration::from_secs(5), consul.agent_checks()).await { Ok(Ok(checks)) => { - histogram!("corro_consul.consul.response.time.seconds").record(start.elapsed().as_secs_f64()); + histogram!("corro_consul.consul.response.time.seconds") + .record(start.elapsed().as_secs_f64()); Ok(update_checks(checks, check_hashes, skip_hash_check)) } Ok(Err(e)) => { @@ -580,15 +606,17 @@ async fn update_consul( let (svcs, checks) = tokio::try_join!(fut_services, fut_checks)?; - execute(node, corrosion, svcs, service_hashes, checks, check_hashes).await.map_err(UpdateError::from) + execute(node, corrosion, svcs, service_hashes, checks, check_hashes) + .await + .map_err(UpdateError::from) } #[derive(Debug, thiserror::Error)] enum ExecuteError { 
#[error("client: {0}")] - Client(#[from]corro_client::Error), + Client(#[from] corro_client::Error), #[error("sqlite: {0}")] - Sqlite(String) + Sqlite(String), } async fn execute( @@ -614,19 +642,25 @@ async fn execute( ConsulServiceOp::Upsert { svc, hash } => { svc_to_upsert.push((svc.id.clone(), hash)); append_upsert_service_statements(&mut statements, node, svc, hash, updated_at); - }, + } ConsulServiceOp::Delete { id } => { svc_to_delete.push(id.clone()); - statements.push(Statement::WithParams("DELETE FROM __corro_consul_services WHERE id = ?;".into(),vec![id.clone().into()])); - statements.push(Statement::WithParams("DELETE FROM consul_services WHERE node = ? AND id = ?;".into(),vec![node.into(),id.into(),])); - }, + statements.push(Statement::WithParams( + "DELETE FROM __corro_consul_services WHERE id = ?;".into(), + vec![id.clone().into()], + )); + statements.push(Statement::WithParams( + "DELETE FROM consul_services WHERE node = ? AND id = ?;".into(), + vec![node.into(), id.into()], + )); + } } } // delete everything that's wrong in the DB! this is useful on restore from a backup... statements.push(Statement::WithParams("DELETE FROM consul_services WHERE node = ? AND id NOT IN (SELECT id FROM __corro_consul_services)".into(), vec![node.into()])); - + let mut check_to_upsert = vec![]; let mut check_to_delete = vec![]; @@ -635,28 +669,35 @@ async fn execute( ConsulCheckOp::Upsert { check, hash } => { check_to_upsert.push((check.id.clone(), hash)); append_upsert_check_statements(&mut statements, node, check, hash, updated_at); - }, + } ConsulCheckOp::Delete { id } => { check_to_delete.push(id.clone()); - statements.push(Statement::WithParams("DELETE FROM __corro_consul_checks WHERE id = ?;".into(),vec![id.clone().into(),])); - statements.push(Statement::WithParams("DELETE FROM consul_checks WHERE node = ? AND id = ?;".into(),vec![node.into(),id.into()])); - }, + statements.push(Statement::WithParams( + "DELETE FROM __corro_consul_checks WHERE id = ?;".into(), + vec![id.clone().into()], + )); + statements.push(Statement::WithParams( + "DELETE FROM consul_checks WHERE node = ? AND id = ?;".into(), + vec![node.into(), id.into()], + )); + } } } // delete everything that's wrong in the DB! this is useful on restore from a backup... statements.push(Statement::WithParams("DELETE FROM consul_checks WHERE node = ? AND id NOT IN (SELECT id FROM __corro_consul_checks)".into(), vec![node.into()])); - if !statements.is_empty() { - if let Some(e) = corrosion.execute(&statements).await?.results.into_iter().find_map(|res| { - match res { + if let Some(e) = corrosion + .execute(&statements) + .await? + .results + .into_iter() + .find_map(|res| match res { corro_api_types::ExecResult::Execute { .. 
} => None, - corro_api_types::ExecResult::Error { error } => { - Some(error) - }, - } - }) { + corro_api_types::ExecResult::Error { error } => Some(error), + }) + { return Err(ExecuteError::Sqlite(e)); } } @@ -665,7 +706,7 @@ async fn execute( for (id, hash) in svc_to_upsert { service_hashes.insert(id, hash); - svc_stats.upserted +=1 ; + svc_stats.upserted += 1; } for id in svc_to_delete { service_hashes.remove(&id); @@ -676,7 +717,7 @@ async fn execute( for (id, hash) in check_to_upsert { check_hashes.insert(id, hash); - check_stats.upserted +=1 ; + check_stats.upserted += 1; } for id in check_to_delete { check_hashes.remove(&id); @@ -702,7 +743,9 @@ mod tests { let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple(); let tmpdir = tempfile::TempDir::new()?; - tokio::fs::write(tmpdir.path().join("consul.sql"), b" + tokio::fs::write( + tmpdir.path().join("consul.sql"), + b" CREATE TABLE consul_services ( node TEXT NOT NULL, id TEXT NOT NULL, @@ -728,24 +771,31 @@ mod tests { updated_at INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (node, id) ); - ").await?; + ", + ) + .await?; - let ta1 = launch_test_agent(|conf| conf.add_schema_path(tmpdir.path().display().to_string()).build(), tripwire.clone()).await?; + let ta1 = launch_test_agent( + |conf| { + conf.add_schema_path(tmpdir.path().display().to_string()) + .build() + }, + tripwire.clone(), + ) + .await?; let ta2 = launch_test_agent( |conf| { - conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]).add_schema_path(tmpdir.path().display().to_string()) + conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]) + .add_schema_path(tmpdir.path().display().to_string()) .build() }, tripwire.clone(), ) - .await?; + .await?; let ta1_client = CorrosionClient::new(ta1.agent.api_addr(), ta1.agent.db_path()); - setup( - &ta1_client, - ) - .await?; + setup(&ta1_client).await?; let mut svc_hashes = HashMap::new(); let mut check_hashes = HashMap::new(); @@ -810,7 +860,15 @@ mod tests { services.insert("service-id".into(), svc.clone()); services.insert("service-id0".into(), svc0.clone()); - let (applied, check_applied) = execute("node-1", &ta1_client, update_services(services.clone(), &svc_hashes, false), &mut svc_hashes, Default::default(), &mut check_hashes).await?; + let (applied, check_applied) = execute( + "node-1", + &ta1_client, + update_services(services.clone(), &svc_hashes, false), + &mut svc_hashes, + Default::default(), + &mut check_hashes, + ) + .await?; assert!(check_applied.is_zero()); @@ -833,7 +891,15 @@ mod tests { assert_eq!(svc_hash, hash); } - let (applied, _check_applied) = execute("node-1", &ta1_client, update_services(services, &svc_hashes, false), &mut svc_hashes, Default::default(), &mut check_hashes).await?; + let (applied, _check_applied) = execute( + "node-1", + &ta1_client, + update_services(services, &svc_hashes, false), + &mut svc_hashes, + Default::default(), + &mut check_hashes, + ) + .await?; assert!(check_applied.is_zero()); @@ -844,10 +910,7 @@ mod tests { let ta2_client = CorrosionClient::new(ta2.agent.api_addr(), ta2.agent.db_path()); - setup( - &ta2_client, - ) - .await?; + setup(&ta2_client).await?; sleep(Duration::from_secs(2)).await; @@ -861,7 +924,15 @@ mod tests { assert_eq!(app_id, 123); } - let (applied, _check_applied) = execute("node-1", &ta1_client, update_services(HashMap::new(), &svc_hashes, false), &mut svc_hashes, Default::default(), &mut check_hashes).await?; + let (applied, _check_applied) = execute( + "node-1", + &ta1_client, + update_services(HashMap::new(), &svc_hashes, false), + &mut 
svc_hashes,
+        Default::default(),
+        &mut check_hashes,
+    )
+    .await?;
 
     assert!(check_applied.is_zero());
 
diff --git a/crates/tripwire/src/preempt.rs b/crates/tripwire/src/preempt.rs
index 859b283a..67d7c723 100644
--- a/crates/tripwire/src/preempt.rs
+++ b/crates/tripwire/src/preempt.rs
@@ -98,7 +98,10 @@ where
     C: Future,
 {
     fn preemptible(self, preempt: P) -> PreemptibleFuture<P, C> {
-        PreemptibleFuture { preempt, complete: self }
+        PreemptibleFuture {
+            preempt,
+            complete: self,
+        }
     }
 }
 
diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs
index e69de29b..8b137891 100644
--- a/integration-tests/src/lib.rs
+++ b/integration-tests/src/lib.rs
@@ -0,0 +1 @@
+

From d3722abdbd1e7e0ccb377f070dce4b7114f635ae Mon Sep 17 00:00:00 2001
From: Somtochi Onyekwere
Date: Thu, 4 Jul 2024 15:40:55 +0100
Subject: [PATCH 2/3] order nodes chosen for sync by last sync time also

---
 crates/corro-agent/src/agent/handlers.rs | 22 +++++++++++++++++++---
 crates/corro-types/src/broadcast.rs      |  2 +-
 crates/corro-types/src/members.rs        |  8 ++++++++
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs
index e4d7e4c1..6a7e079d 100644
--- a/crates/corro-agent/src/agent/handlers.rs
+++ b/crates/corro-agent/src/agent/handlers.rs
@@ -792,8 +792,15 @@ pub async fn handle_sync(
                 **id != agent.actor_id() && state.cluster_id == agent.cluster_id()
             })
             // Grab a ring-buffer index to the member RTT range
-            .map(|(id, state)| (*id, state.ring.unwrap_or(255), state.addr))
-            .collect::<Vec<(ActorId, u8, SocketAddr)>>()
+            .map(|(id, state)| {
+                (
+                    *id,
+                    state.ring.unwrap_or(255),
+                    state.addr,
+                    state.last_sync_ts,
+                )
+            })
+            .collect::<Vec<(ActorId, u8, SocketAddr, Option<Timestamp>)>>()
     };
 
     if candidates.is_empty() {
@@ -816,6 +823,8 @@ pub async fn handle_sync(
             sync_state
                 .need_len_for_actor(&b.0)
                 .cmp(&sync_state.need_len_for_actor(&a.0))
+                // if equal, look at last sync time
+                .then_with(|| a.3.cmp(&b.3))
                 // if equal, look at proximity (via `ring`)
                 .then_with(|| a.1.cmp(&b.1))
         });
@@ -823,7 +832,7 @@ pub async fn handle_sync(
         choices.truncate(desired_count);
         choices
             .into_iter()
-            .map(|(actor_id, _, addr)| (actor_id, addr))
+            .map(|(actor_id, _, addr, _)| (actor_id, addr))
             .collect()
     };
 
@@ -852,6 +861,7 @@ pub async fn handle_sync(
     info!(
         "synced {n} changes w/ {} in {}s @ {} changes/s",
         chosen
+            .clone()
             .into_iter()
             .map(|(actor_id, _)| actor_id.to_string())
             .collect::<Vec<String>>()
@@ -859,6 +869,12 @@ pub async fn handle_sync(
         elapsed.as_secs_f64(),
         n as f64 / elapsed.as_secs_f64()
     );
+
+    let ts = Timestamp::from(agent.clock().new_timestamp());
+    for (actor_id, _) in chosen {
+        let mut members = agent.members().write();
+        members.update_sync_ts(actor_id, ts);
+    }
 }
 Ok(())
 }
diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs
index ee859ad1..382bfce5 100644
--- a/crates/corro-types/src/broadcast.rs
+++ b/crates/corro-types/src/broadcast.rs
@@ -264,7 +264,7 @@ pub enum TimestampParseError {
     Parse(ParseNTP64Error),
 }
 
-#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Hash)]
+#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Ord, Hash)]
 #[serde(transparent)]
 pub struct Timestamp(pub NTP64);
 
diff --git a/crates/corro-types/src/members.rs b/crates/corro-types/src/members.rs
index c5c77fff..753b2df5 100644
--- a/crates/corro-types/src/members.rs
+++ b/crates/corro-types/src/members.rs
@@ -16,6 +16,7 @@ pub struct MemberState {
     pub cluster_id: ClusterId,
 
     pub ring: Option<u8>,
+    pub last_sync_ts: Option<Timestamp>,
 }
 
 impl MemberState {
@@ -25,6 +26,7 @@ impl MemberState {
             ts,
             cluster_id,
             ring: None,
+            last_sync_ts: None,
         }
     }
 
@@ -59,6 +61,12 @@ impl Members {
         self.states.get(id)
     }
 
+    pub fn update_sync_ts(&mut self, actor_id: ActorId, ts: Timestamp) {
+        if let Some(state) = self.states.get_mut(&actor_id) {
+            state.last_sync_ts = Some(ts);
+        }
+    }
+
     // A result of `true` means that the effective list of
     // cluster member addresses has changed
     pub fn add_member(&mut self, actor: &Actor) -> MemberAddedResult {

From 41632190929cab4dd84efafb1c990e3e7a9dac0f Mon Sep 17 00:00:00 2001
From: Somtochi Onyekwere
Date: Thu, 4 Jul 2024 16:57:15 +0100
Subject: [PATCH 3/3] set ts in parallel_sync

---
 crates/corro-agent/src/agent/handlers.rs |  6 ------
 crates/corro-agent/src/api/peer.rs       | 15 +++++++++------
 crates/corro-types/src/members.rs        |  4 ++--
 3 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs
index 6a7e079d..4757af19 100644
--- a/crates/corro-agent/src/agent/handlers.rs
+++ b/crates/corro-agent/src/agent/handlers.rs
@@ -869,12 +869,6 @@ pub async fn handle_sync(
         elapsed.as_secs_f64(),
         n as f64 / elapsed.as_secs_f64()
     );
-
-    let ts = Timestamp::from(agent.clock().new_timestamp());
-    for (actor_id, _) in chosen {
-        let mut members = agent.members().write();
-        members.update_sync_ts(actor_id, ts);
-    }
 }
 Ok(())
 }
diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs
index 6c5df4fd..0d537e27 100644
--- a/crates/corro-agent/src/api/peer.rs
+++ b/crates/corro-agent/src/api/peer.rs
@@ -1436,20 +1436,23 @@ pub async fn parallel_sync(
 
             debug!(%actor_id, %count, "done reading sync messages");
 
-            Ok(count)
+            Ok((actor_id, count))
         }
         .instrument(info_span!("read_sync_requests_responses", %actor_id))
     }))
-    .collect::<Vec<Result<usize, SyncError>>>()
+    .collect::<Vec<Result<(ActorId, usize), SyncError>>>()
     .await;
 
+    let ts = Timestamp::from(agent.clock().new_timestamp());
+    let mut members = agent.members().write();
     for res in counts.iter() {
-        if let Err(e) = res {
-            error!("could not properly recv from peer: {e}");
-        }
+        match res {
+            Err(e) => error!("could not properly recv from peer: {e}"),
+            Ok((actor_id, _)) => members.update_sync_ts(&actor_id, ts),
+        };
     }
 
-    Ok(counts.into_iter().flatten().sum::<usize>())
+    Ok(counts.into_iter().map(|res| res.map(|i| i.1)).flatten().sum::<usize>())
 }
 
 #[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)]
diff --git a/crates/corro-types/src/members.rs b/crates/corro-types/src/members.rs
index 753b2df5..2cf2f74b 100644
--- a/crates/corro-types/src/members.rs
+++ b/crates/corro-types/src/members.rs
@@ -61,8 +61,8 @@ impl Members {
         self.states.get(id)
     }
 
-    pub fn update_sync_ts(&mut self, actor_id: ActorId, ts: Timestamp) {
-        if let Some(state) = self.states.get_mut(&actor_id) {
+    pub fn update_sync_ts(&mut self, actor_id: &ActorId, ts: Timestamp) {
+        if let Some(state) = self.states.get_mut(actor_id) {
             state.last_sync_ts = Some(ts);
         }
     }
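
Note on the ordering rule introduced in [PATCH 2/3] and kept by [PATCH 3/3]:
sync candidates are ranked by how many versions we still need from them (most
first), then by least-recent last_sync_ts (a None, i.e. a peer we have never
synced with, sorts first), then by RTT ring proximity. Below is a minimal,
self-contained sketch of that comparator. Candidate, need_len, and
order_candidates are illustrative stand-ins, not names from the patches; the
real code sorts (ActorId, u8, SocketAddr, Option<Timestamp>) tuples against
SyncStateV1::need_len_for_actor.

/// Hypothetical, simplified stand-in for a sync candidate.
#[derive(Debug)]
struct Candidate {
    name: &'static str,
    need_len: usize,           // versions we still need from this peer
    last_sync_ts: Option<u64>, // None = never synced; smaller = longer ago
    ring: u8,                  // RTT ring; lower = closer
}

fn order_candidates(candidates: &mut [Candidate]) {
    candidates.sort_unstable_by(|a, b| {
        // most needed versions first
        b.need_len
            .cmp(&a.need_len)
            // if equal, look at last sync time (None sorts before Some)
            .then_with(|| a.last_sync_ts.cmp(&b.last_sync_ts))
            // if equal, look at proximity (via `ring`)
            .then_with(|| a.ring.cmp(&b.ring))
    });
}

fn main() {
    let mut cands = [
        Candidate { name: "a", need_len: 3, last_sync_ts: Some(10), ring: 1 },
        Candidate { name: "b", need_len: 3, last_sync_ts: None, ring: 2 },
        Candidate { name: "c", need_len: 7, last_sync_ts: Some(5), ring: 0 },
    ];
    order_candidates(&mut cands);
    // prints c (largest need), then b (never synced), then a
    for c in &cands {
        println!("{} need={} ts={:?} ring={}", c.name, c.need_len, c.last_sync_ts, c.ring);
    }
}

Keeping all three criteria in a single sort_unstable_by chain mirrors the
patch and makes the tie-breaking order explicit and easy to extend.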