Skip to content

Commit

Permalink
release bookie quickly
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Jul 3, 2024
1 parent 8ed9af4 commit b489955
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 19 deletions.
14 changes: 8 additions & 6 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ pub async fn handle_emptyset(
}
}
}

println!("shutting down handle empties loop");
}

pub async fn process_emptyset(
Expand Down Expand Up @@ -506,12 +508,12 @@ pub async fn process_emptyset(
store_empty_changeset(&tx, actor_id, version.clone(), *ts)?;
}

// snap.insert_db(&tx, RangeInclusiveSet::from_iter(versions.clone()))
// .map_err(|source| ChangeError::Rusqlite {
// source,
// actor_id: None,
// version: None,
// })?;
snap.insert_db(&tx, RangeInclusiveSet::from_iter(versions.clone()))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;
snap.update_cleared_ts(&tx, *ts)
.map_err(|source| ChangeError::Rusqlite {
source,
Expand Down
28 changes: 17 additions & 11 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,6 @@ pub async fn process_fully_buffered_changes(
.await;
debug!(%actor_id, %version, "acquired Booked write lock to process fully buffered changes");

let mut agent_booked = agent
.booked()
.write("get snapshot for update cleared ts")
.await;
let inserted = block_in_place(|| {
let (last_seq, ts) = {
match bookedw.partials.get(&version) {
Expand Down Expand Up @@ -672,6 +668,12 @@ pub async fn process_fully_buffered_changes(
}
}

let mut agent_booked = {
agent
.booked()
.blocking_write("process_fully_buffered_changes(get snapshot)")
};

let mut agent_snap = agent_booked.snapshot();
if let Some(ts) = last_cleared {
agent_snap
Expand Down Expand Up @@ -718,13 +720,17 @@ pub async fn process_multiple_changes(
if !seen.insert((change.actor_id, versions, seqs.cloned())) {
continue;
}
if bookie
.write(format!(
"process_multiple_changes(ensure):{}",
change.actor_id.as_simple()
))
.await
.ensure(change.actor_id)

let booked_writer = {
bookie
.write(format!(
"process_multiple_changes(ensure):{}",
change.actor_id.as_simple()
))
.await
.ensure(change.actor_id)
};
if booked_writer
.read(format!(
"process_multiple_changes(contains?):{}",
change.actor_id.as_simple()
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ async fn process_sync(
let last_ts = agent
.clone()
.booked()
.read("process_sync")
.read("process_sync(read_cleared_ts))")
.await
.last_cleared_ts();
loop {
Expand Down
7 changes: 6 additions & 1 deletion crates/corro-types/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,12 @@ impl From<SyncStateV1> for SyncMessage {
}

pub async fn get_last_cleared_ts(bookie: &Bookie, actor_id: &ActorId) -> Option<Timestamp> {
if let Some(booked) = bookie.read("get_last_cleared_ts").await.get(actor_id) {
let booked = bookie
.read("get_last_cleared_ts")
.await
.get(actor_id)
.cloned();
if let Some(booked) = booked {
let booked_reader = booked.read("get_last_cleared_ts").await;
return booked_reader.last_cleared_ts();
}
Expand Down

0 comments on commit b489955

Please sign in to comment.