diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 450058da..4e04a42c 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -465,6 +465,8 @@ pub async fn handle_emptyset( } } } + + println!("shutting down handle empties loop"); } pub async fn process_emptyset( @@ -506,12 +508,12 @@ pub async fn process_emptyset( store_empty_changeset(&tx, actor_id, version.clone(), *ts)?; } - // snap.insert_db(&tx, RangeInclusiveSet::from_iter(versions.clone())) - // .map_err(|source| ChangeError::Rusqlite { - // source, - // actor_id: None, - // version: None, - // })?; + snap.insert_db(&tx, RangeInclusiveSet::from_iter(versions.clone())) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: None, + version: None, + })?; snap.update_cleared_ts(&tx, *ts) .map_err(|source| ChangeError::Rusqlite { source, diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index d4b2cd3c..886f8cad 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -528,10 +528,6 @@ pub async fn process_fully_buffered_changes( .await; debug!(%actor_id, %version, "acquired Booked write lock to process fully buffered changes"); - let mut agent_booked = agent - .booked() - .write("get snapshot for update cleared ts") - .await; let inserted = block_in_place(|| { let (last_seq, ts) = { match bookedw.partials.get(&version) { @@ -672,6 +668,12 @@ pub async fn process_fully_buffered_changes( } } + let mut agent_booked = { + agent + .booked() + .blocking_write("process_fully_buffered_changes(get snapshot)") + }; + let mut agent_snap = agent_booked.snapshot(); if let Some(ts) = last_cleared { agent_snap @@ -718,13 +720,17 @@ pub async fn process_multiple_changes( if !seen.insert((change.actor_id, versions, seqs.cloned())) { continue; } - if bookie - .write(format!( - "process_multiple_changes(ensure):{}", - change.actor_id.as_simple() - )) - .await - .ensure(change.actor_id) + + let booked_writer = { + bookie + .write(format!( + "process_multiple_changes(ensure):{}", + change.actor_id.as_simple() + )) + .await + .ensure(change.actor_id) + }; + if booked_writer .read(format!( "process_multiple_changes(contains?):{}", change.actor_id.as_simple() diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 51242b8d..6c5df4fd 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -846,7 +846,7 @@ async fn process_sync( let last_ts = agent .clone() .booked() - .read("process_sync") + .read("process_sync(read_cleared_ts))") .await .last_cleared_ts(); loop { diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 0da2321e..43d21aa6 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -280,7 +280,12 @@ impl From for SyncMessage { } pub async fn get_last_cleared_ts(bookie: &Bookie, actor_id: &ActorId) -> Option { - if let Some(booked) = bookie.read("get_last_cleared_ts").await.get(actor_id) { + let booked = bookie + .read("get_last_cleared_ts") + .await + .get(actor_id) + .cloned(); + if let Some(booked) = booked { let booked_reader = booked.read("get_last_cleared_ts").await; return booked_reader.last_cleared_ts(); }