Skip to content

Commit

Permalink
clear some versions
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed May 16, 2024
1 parent 591f075 commit d88f10f
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use rand::{
distributions::Uniform, prelude::Distribution, rngs::StdRng, seq::IteratorRandom, SeedableRng,
};
use rangemap::RangeInclusiveSet;
use rusqlite::params;
use serde::Deserialize;
use serde_json::json;
use spawn::wait_for_all_pending_handles;
Expand All @@ -34,17 +33,16 @@ use crate::{
use corro_tests::*;
use corro_types::{
actor::ActorId,
agent::{migrate, Bookie, LockRegistry},
agent::{migrate},
api::{ExecResponse, ExecResult, Statement, row_to_change},
base::{CrsqlDbVersion, CrsqlSeq, Version},
change::store_empty_changeset,
sqlite::CrConn,
sync::generate_sync,
broadcast::{Changeset, ChangeSource, ChangeV1, Timestamp},
broadcast::{Changeset, ChangeSource, ChangeV1},
};
use corro_types::agent::Agent;
use corro_types::api::Change;
use corro_types::config::Config;
use corro_types::change::Change;

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn insert_rows_and_gossip() -> eyre::Result<()> {
Expand Down Expand Up @@ -791,28 +789,33 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {
assert_eq!(status_code, StatusCode::OK);

// make about 50 transactions to ta1
insert_rows(ta1.agent.clone(), 50).await;
// chunk up changes and send to ta2
insert_rows(ta1.agent.clone(), 1,50).await;

// send 1-5
let rows = get_rows(ta1.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
// check ta2 bookie
check_bookie_versions(ta2.clone(), ta1.agent.actor_id(), vec![Version(1)..=Version(5)], vec![], vec![]).await?;
check_bookie_versions(ta2.clone(), ta1.agent.actor_id(), vec![Version(1)..=Version(5)], vec![], vec![], vec![]).await?;

// send 9-10
let rows = get_rows(ta1.agent.clone(), vec![(Version(9)..=Version(10), None)]).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
// check for gap 6-8
check_bookie_versions(ta2.clone(), ta1.agent.actor_id(), vec![], vec![Version(6)..=Version(8)], vec![]).await?;
check_bookie_versions(ta2.clone(), ta1.agent.actor_id(), vec![], vec![Version(6)..=Version(8)], vec![], vec![]).await?;

// create more gaps plus send partials
// send 15-16 and 20
let rows = get_rows(ta1.agent.clone(), vec![(Version(20)..=Version(20), None),(Version(15)..=Version(16), Some(CrsqlSeq(0)..=CrsqlSeq(0)))]).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
check_bookie_versions(ta2.clone(), ta1.agent.actor_id(), vec![],
vec![Version(11)..=Version(14), Version(17)..=Version(19)],
vec![(Version(15)..=Version(16), CrsqlSeq(0)..=CrsqlSeq(0))]).await?;
vec![(Version(15)..=Version(16), CrsqlSeq(0)..=CrsqlSeq(0))], vec![]).await?;

// clear versions 21-25
insert_rows(ta1.agent.clone(), 21,25).await;
let rows = get_rows(ta1.agent.clone(), vec![(Version(21)..=Version(25), None)]).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
check_bookie_versions(ta2.clone(), ta1.agent.actor_id(), vec![], vec![], vec![], vec![Version(21)..=Version(25)]).await?;

tripwire_tx.send(()).await.ok();
tripwire_worker.await;
Expand All @@ -826,7 +829,9 @@ async fn check_bookie_versions(
actor_id: ActorId,
complete: Vec<RangeInclusive<Version>>,
gap: Vec<RangeInclusive<Version>>,
partials: Vec<(RangeInclusive<Version>, RangeInclusive<CrsqlSeq>)>) -> eyre::Result<()> {
partials: Vec<(RangeInclusive<Version>, RangeInclusive<CrsqlSeq>)>,
cleared: Vec<RangeInclusive<Version>>,
) -> eyre::Result<()> {

let conn = ta.agent.pool().read().await?;
let booked = ta.bookie.write("test").await.ensure(actor_id);
Expand Down Expand Up @@ -884,6 +889,12 @@ async fn check_bookie_versions(
.query_row((actor_id, versions.start(), versions.end()), |row| row.get(0))?);
}

for versions in cleared {
assert!(conn.prepare_cached(
"SELECT EXISTS (SELECT 1 FROM __corro_bookkeeping WHERE actor_id = ? and start_version = ? and end_version = ?)")?
.query_row((actor_id, versions.start(), versions.end()), |row| row.get(0))?);
}

Ok(())
}

Expand All @@ -895,7 +906,7 @@ async fn get_rows(agent: Agent, v: Vec<(RangeInclusive<Version>, Option<RangeInc
for version in versions.0 {
let mut query = r#"SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl
FROM crsql_changes where db_version = ?"#.to_string();
let mut changes = vec![];
let changes : Vec<Change>;
let seqs = if let Some(seq) = versions.1.clone() {
let seq_query = " and seq >= ? and seq <= ?";
query = query + seq_query;
Expand Down Expand Up @@ -923,20 +934,20 @@ async fn get_rows(agent: Agent, v: Vec<(RangeInclusive<Version>, Option<RangeInc
Ok(result)
}

async fn insert_rows(agent: Agent, n: i64) {
async fn insert_rows(agent: Agent, start: i64, n: i64) {
// check locally that everything is in order
for i in 1..=n {
let (status_code, body) = api_v1_transactions(
for i in start..=n {
let (status_code, _) = api_v1_transactions(
Extension(agent.clone()),
axum::Json(vec![Statement::WithParams(
"INSERT INTO tests3 (id,text,text2, num, num2) VALUES (?,?,?,?,?)".into(),
"INSERT OR REPLACE INTO tests3 (id,text,text2, num, num2) VALUES (?,?,?,?,?)".into(),
vec![i.into(), "service-name".into(), "second text".into(), (i+20).into(), (i+100).into()],
)]),
).await;
assert_eq!(status_code, StatusCode::OK);

let version = body.0.version.unwrap();
assert_eq!(version, Version(i as u64));
// let version = body.0.version.unwrap();
// assert_eq!(version, Version(i as u64));
}
}

Expand Down

0 comments on commit d88f10f

Please sign in to comment.