Skip to content

Commit

Permalink
process changes in chunks based on 'cost' and not the number of chang…
Browse files Browse the repository at this point in the history
…es (though that also informs it)
  • Loading branch information
jeromegn committed Apr 16, 2024
1 parent 9926b47 commit a33673e
Showing 1 changed file with 105 additions and 119 deletions.
224 changes: 105 additions & 119 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,16 @@ pub fn spawn_handle_db_cleanup(pool: SplitPool) {
});
}

// determine the estimated resource cost of processing a change
fn processing_cost(change: &Changeset) -> usize {
if change.is_empty() {
let versions = change.versions();
cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)
} else {
change.len()
}
}

/// Bundle incoming changes to optimise transaction sizes with SQLite
///
/// *Performance tradeoff*: introduce latency (with a max timeout) to
Expand All @@ -403,7 +413,7 @@ pub async fn handle_changes(
let max_changes_chunk: usize = agent.config().perf.apply_queue_len;
let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new();
let mut buf = vec![];
let mut count = 0;
let mut buf_cost = 0;

const MAX_CONCURRENT: usize = 5;
let mut join_set = JoinSet::new();
Expand All @@ -419,15 +429,15 @@ pub async fn handle_changes(
// complicated loop to process changes efficiently w/ a max concurrency
// and a minimum chunk size for bigger and faster SQLite transactions
loop {
while count >= max_changes_chunk && join_set.len() < MAX_CONCURRENT {
while buf_cost >= max_changes_chunk && join_set.len() < MAX_CONCURRENT {
// we're already bigger than the minimum size of changes batch
// so we want to accumulate at least that much and process them
// concurrently based on MAX_CONCURRENT
let mut tmp_count = 0;
let mut tmp_cost = 0;
while let Some((change, src, queued_at)) = queue.pop_front() {
tmp_count += change.len();
tmp_cost += processing_cost(&change);
buf.push((change, src, queued_at));
if tmp_count >= max_changes_chunk {
if tmp_cost >= max_changes_chunk {
break;
}
}
Expand All @@ -436,17 +446,17 @@ pub async fn handle_changes(
break;
}

debug!(count = %tmp_count, "spawning processing multiple changes from beginning of loop");
debug!(count = %tmp_cost, "spawning processing multiple changes from beginning of loop");
join_set.spawn(util::process_multiple_changes(
agent.clone(),
bookie.clone(),
std::mem::take(&mut buf),
));

count -= tmp_count;
buf_cost -= tmp_cost;
}

tokio::select! {
let (change, src) = tokio::select! {
biased;

// process these first, we don't care about the result,
Expand All @@ -459,152 +469,128 @@ pub async fn handle_changes(
continue;
},

Some((change, src)) = rx_changes.recv() => {
let change_len = change.len();
counter!("corro.agent.changes.recv").increment(std::cmp::max(change_len, 1) as u64); // count empties...

if change.actor_id == agent.actor_id() {
continue;
}

if let Some(mut seqs) = change.seqs().cloned() {
let v = *change.versions().start();
if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) {
if seqs.all(|seq| seen_seqs.contains(&seq)) {
continue;
}
}
} else {
// empty versions
if change.versions().all(|v| seen.contains_key(&(change.actor_id, v))) {
continue;
}
}

let recv_lag = change
.ts()
.map(|ts| (agent.clock().new_timestamp().get_time() - ts.0).to_duration());

if matches!(src, ChangeSource::Broadcast) {
counter!("corro.broadcast.recv.count", "kind" => "change").increment(1);
}

let booked = {
bookie
.read(format!(
"handle_change(get):{}",
change.actor_id.as_simple()
))
.await
.get(&change.actor_id)
.cloned()
};

if let Some(booked) = booked {
if booked
.read(format!(
"handle_change(contains?):{}",
change.actor_id.as_simple()
))
.await
.contains_all(change.versions(), change.seqs())
{
trace!("already seen, stop disseminating");
continue;
}
}

if let Some(recv_lag) = recv_lag {
let src_str: &'static str = src.into();
histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str).record(recv_lag.as_secs_f64());
}

// this will only run once for a non-empty changeset
for v in change.versions() {
let entry = seen.entry((change.actor_id, v)).or_default();
if let Some(seqs) = change.seqs().cloned() {
entry.extend([seqs]);
}
}

if matches!(src, ChangeSource::Broadcast) && !change.is_empty() {
if let Err(_e) =
agent
.tx_bcast()
.try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change(change.clone())))
{
debug!("broadcasts are full or done!");
}
}

queue.push_back((change, src, Instant::now()));

count += change_len; // track number of individual changes, not changesets
maybe_change_src = rx_changes.recv() => match maybe_change_src {
Some((change, src)) => (change, src),
None => break,
},

_ = max_wait.tick() => {
// got a wait interval tick...

gauge!("corro.agent.changes.in_queue").set(count as f64);
gauge!("corro.agent.changes.in_queue").set(buf_cost as f64);
gauge!("corro.agent.changesets.in_queue").set(queue.len() as f64);
gauge!("corro.agent.changes.processing.jobs").set(join_set.len() as f64);

if count < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
if buf_cost < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
// we can process this right away
debug!(%count, "spawning processing multiple changes from max wait interval");
debug!(%buf_cost, "spawning processing multiple changes from max wait interval");
join_set.spawn(util::process_multiple_changes(
agent.clone(),
bookie.clone(),
queue.drain(..).collect(),
));
count = 0;
buf_cost = 0;
}

if seen.len() > MAX_SEEN_CACHE_LEN {
// we don't want to keep too many entries in here.
seen = seen.split_off(seen.len() - KEEP_SEEN_CACHE_SIZE);
}
continue
},

_ = &mut tripwire => {
break;
}
};

else => {
break;
let change_len = change.len();
counter!("corro.agent.changes.recv").increment(std::cmp::max(change_len, 1) as u64); // count empties...

if change.actor_id == agent.actor_id() {
continue;
}

if let Some(mut seqs) = change.seqs().cloned() {
let v = *change.versions().start();
if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) {
if seqs.all(|seq| seen_seqs.contains(&seq)) {
continue;
}
}
} else {
// empty versions
if change
.versions()
.all(|v| seen.contains_key(&(change.actor_id, v)))
{
continue;
}
}
}

info!("Draining changes receiver...");
let recv_lag = change
.ts()
.map(|ts| (agent.clock().new_timestamp().get_time() - ts.0).to_duration());

// drain!
while let Ok((change, src)) = rx_changes.try_recv() {
let changes_count = std::cmp::max(change.len(), 1);
counter!("corro.agent.changes.recv").increment(changes_count as u64);
count += changes_count;
queue.push_back((change, src, Instant::now()));
if count >= max_changes_chunk {
// drain and process current changes!
if let Err(e) = util::process_multiple_changes(
agent.clone(),
bookie.clone(),
queue.drain(..).collect(),
)
.await
if matches!(src, ChangeSource::Broadcast) {
counter!("corro.broadcast.recv.count", "kind" => "change").increment(1);
}

let booked = {
bookie
.read(format!(
"handle_change(get):{}",
change.actor_id.as_simple()
))
.await
.get(&change.actor_id)
.cloned()
};

if let Some(booked) = booked {
if booked
.read(format!(
"handle_change(contains?):{}",
change.actor_id.as_simple()
))
.await
.contains_all(change.versions(), change.seqs())
{
error!("could not process last multiple changes: {e}");
trace!("already seen, stop disseminating");
continue;
}
}

// reset count
count = 0;
if let Some(recv_lag) = recv_lag {
let src_str: &'static str = src.into();
histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str)
.record(recv_lag.as_secs_f64());
}
}

// process the last changes we got!
if let Err(e) = util::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await
{
error!("could not process multiple changes: {e}");
// this will only run once for a non-empty changeset
for v in change.versions() {
let entry = seen.entry((change.actor_id, v)).or_default();
if let Some(seqs) = change.seqs().cloned() {
entry.extend([seqs]);
}
}

if matches!(src, ChangeSource::Broadcast) && !change.is_empty() {
if let Err(_e) =
agent
.tx_bcast()
.try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change(
change.clone(),
)))
{
debug!("broadcasts are full or done!");
}
}

let cost = processing_cost(&change);
queue.push_back((change, src, Instant::now()));

buf_cost += cost; // tracks the cost, not number of changes
}
}

Expand Down

0 comments on commit a33673e

Please sign in to comment.