Skip to content

Commit

Permalink
Remove changes that failed to process from seen map (#230)
Browse files Browse the repository at this point in the history
* remove changes that failed to process from seen map

* use parking_lot mutex

* return a future instead of using a mutex

---------

Co-authored-by: Jerome Gravel-Niquet <[email protected]>
  • Loading branch information
somtochiama and jeromegn authored Jul 2, 2024
1 parent 816dc5e commit cccfad8
Showing 1 changed file with 44 additions and 12 deletions.
56 changes: 44 additions & 12 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! This module is _big_ and maybe should be split up further.

use std::collections::BTreeMap;
use std::{
cmp,
collections::VecDeque,
Expand All @@ -25,6 +26,7 @@ use corro_types::{
};

use bytes::Bytes;
use corro_types::base::Version;
use foca::Notification;
use indexmap::IndexMap;
use metrics::{counter, gauge, histogram};
Expand Down Expand Up @@ -447,11 +449,26 @@ pub async fn handle_changes(
}

debug!(count = %tmp_cost, "spawning processing multiple changes from beginning of loop");
join_set.spawn(util::process_multiple_changes(
agent.clone(),
bookie.clone(),
std::mem::take(&mut buf),
));
let changes = std::mem::take(&mut buf);
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(async move {
if let Err(e) = util::process_multiple_changes(agent, bookie, changes.clone()).await
{
error!("could not process multiple changes: {e}");
changes.iter().fold(
BTreeMap::new(),
|mut acc: BTreeMap<ActorId, RangeInclusiveSet<Version>>, change| {
acc.entry(change.0.actor_id)
.or_default()
.insert(change.0.versions());
acc
},
)
} else {
BTreeMap::new()
}
});

buf_cost -= tmp_cost;
}
Expand All @@ -463,8 +480,13 @@ pub async fn handle_changes(
// but we need to drain it to free up concurrency
res = join_set.join_next(), if !join_set.is_empty() => {
debug!("processed multiple changes concurrently");
if let Some(Ok(Err(e))) = res {
error!("could not process multiple changes: {e}");
if let Some(Ok(res)) = res {
for (actor_id, versions) in res {
let versions: Vec<_> = versions.into_iter().flatten().collect();
for version in versions {
seen.remove(&(actor_id, version));
}
}
}
continue;
},
Expand All @@ -484,11 +506,21 @@ pub async fn handle_changes(
if buf_cost < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
// we can process this right away
debug!(%buf_cost, "spawning processing multiple changes from max wait interval");
join_set.spawn(util::process_multiple_changes(
agent.clone(),
bookie.clone(),
queue.drain(..).collect(),
));
let changes: Vec<_> = queue.drain(..).collect();
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(async move {
if let Err(e) = util::process_multiple_changes(agent, bookie, changes.clone()).await
{
error!("could not process multiple changes: {e}");
changes.iter().fold(BTreeMap::new(), |mut acc: BTreeMap<ActorId, RangeInclusiveSet<Version>> , change| {
acc.entry(change.0.actor_id).or_default().insert(change.0.versions());
acc
})
} else {
BTreeMap::new()
}
});
buf_cost = 0;
}

Expand Down

0 comments on commit cccfad8

Please sign in to comment.