diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs
index 52966758..8f76c19b 100644
--- a/crates/corro-agent/src/agent/handlers.rs
+++ b/crates/corro-agent/src/agent/handlers.rs
@@ -388,6 +388,16 @@ pub fn spawn_handle_db_cleanup(pool: SplitPool) {
     });
 }
 
+// determine the estimated resource cost of processing a change
+fn processing_cost(change: &Changeset) -> usize {
+    if change.is_empty() {
+        let versions = change.versions();
+        cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)
+    } else {
+        change.len()
+    }
+}
+
 /// Bundle incoming changes to optimise transaction sizes with SQLite
 ///
 /// *Performance tradeoff*: introduce latency (with a max timeout) to
@@ -403,7 +413,7 @@ pub async fn handle_changes(
     let max_changes_chunk: usize = agent.config().perf.apply_queue_len;
     let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new();
     let mut buf = vec![];
-    let mut count = 0;
+    let mut buf_cost = 0;
 
     const MAX_CONCURRENT: usize = 5;
     let mut join_set = JoinSet::new();
@@ -419,15 +429,15 @@
     // complicated loop to process changes efficiently w/ a max concurrency
     // and a minimum chunk size for bigger and faster SQLite transactions
     loop {
-        while count >= max_changes_chunk && join_set.len() < MAX_CONCURRENT {
+        while buf_cost >= max_changes_chunk && join_set.len() < MAX_CONCURRENT {
            // we're already bigger than the minimum size of changes batch
            // so we want to accumulate at least that much and process them
            // concurrently based on MAX_CONCURRENT
-            let mut tmp_count = 0;
+            let mut tmp_cost = 0;
             while let Some((change, src, queued_at)) = queue.pop_front() {
-                tmp_count += change.len();
+                tmp_cost += processing_cost(&change);
                 buf.push((change, src, queued_at));
-                if tmp_count >= max_changes_chunk {
+                if tmp_cost >= max_changes_chunk {
                     break;
                 }
             }
@@ -436,17 +446,17 @@
             if buf.is_empty() {
                 break;
             }
 
-            debug!(count = %tmp_count, "spawning processing multiple changes from beginning of loop");
+            debug!(count = %tmp_cost, "spawning processing multiple changes from beginning of loop");
             join_set.spawn(util::process_multiple_changes(
                 agent.clone(),
                 bookie.clone(),
                 std::mem::take(&mut buf),
             ));
 
-            count -= tmp_count;
+            buf_cost -= tmp_cost;
         }
 
-        tokio::select! {
+        let (change, src) = tokio::select! {
             biased;
             // process these first, we don't care about the result,
@@ -459,152 +469,128 @@
                 continue;
             },
 
-            Some((change, src)) = rx_changes.recv() => {
-                let change_len = change.len();
-                counter!("corro.agent.changes.recv").increment(std::cmp::max(change_len, 1) as u64); // count empties...
-
-                if change.actor_id == agent.actor_id() {
-                    continue;
-                }
-
-                if let Some(mut seqs) = change.seqs().cloned() {
-                    let v = *change.versions().start();
-                    if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) {
-                        if seqs.all(|seq| seen_seqs.contains(&seq)) {
-                            continue;
-                        }
-                    }
-                } else {
-                    // empty versions
-                    if change.versions().all(|v| seen.contains_key(&(change.actor_id, v))) {
-                        continue;
-                    }
-                }
-
-                let recv_lag = change
-                    .ts()
-                    .map(|ts| (agent.clock().new_timestamp().get_time() - ts.0).to_duration());
-
-                if matches!(src, ChangeSource::Broadcast) {
-                    counter!("corro.broadcast.recv.count", "kind" => "change").increment(1);
-                }
-
-                let booked = {
-                    bookie
-                        .read(format!(
-                            "handle_change(get):{}",
-                            change.actor_id.as_simple()
-                        ))
-                        .await
-                        .get(&change.actor_id)
-                        .cloned()
-                };
-
-                if let Some(booked) = booked {
-                    if booked
-                        .read(format!(
-                            "handle_change(contains?):{}",
-                            change.actor_id.as_simple()
-                        ))
-                        .await
-                        .contains_all(change.versions(), change.seqs())
-                    {
-                        trace!("already seen, stop disseminating");
-                        continue;
-                    }
-                }
-
-                if let Some(recv_lag) = recv_lag {
-                    let src_str: &'static str = src.into();
-                    histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str).record(recv_lag.as_secs_f64());
-                }
-
-                // this will only run once for a non-empty changeset
-                for v in change.versions() {
-                    let entry = seen.entry((change.actor_id, v)).or_default();
-                    if let Some(seqs) = change.seqs().cloned() {
-                        entry.extend([seqs]);
-                    }
-                }
-
-                if matches!(src, ChangeSource::Broadcast) && !change.is_empty() {
-                    if let Err(_e) =
-                        agent
-                            .tx_bcast()
-                            .try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change(change.clone())))
-                    {
-                        debug!("broadcasts are full or done!");
-                    }
-                }
-
-                queue.push_back((change, src, Instant::now()));
-
-                count += change_len; // track number of individual changes, not changesets
+            maybe_change_src = rx_changes.recv() => match maybe_change_src {
+                Some((change, src)) => (change, src),
+                None => break,
             },
 
             _ = max_wait.tick() => {
                 // got a wait interval tick...
-                gauge!("corro.agent.changes.in_queue").set(count as f64);
+                gauge!("corro.agent.changes.in_queue").set(buf_cost as f64);
                 gauge!("corro.agent.changesets.in_queue").set(queue.len() as f64);
                 gauge!("corro.agent.changes.processing.jobs").set(join_set.len() as f64);
 
-                if count < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
+                if buf_cost < max_changes_chunk && !queue.is_empty() && join_set.len() < MAX_CONCURRENT {
                     // we can process this right away
-                    debug!(%count, "spawning processing multiple changes from max wait interval");
+                    debug!(%buf_cost, "spawning processing multiple changes from max wait interval");
                     join_set.spawn(util::process_multiple_changes(
                         agent.clone(),
                         bookie.clone(),
                         queue.drain(..).collect(),
                     ));
-                    count = 0;
+                    buf_cost = 0;
                 }
 
                 if seen.len() > MAX_SEEN_CACHE_LEN {
                     // we don't want to keep too many entries in here.
                     seen = seen.split_off(seen.len() - KEEP_SEEN_CACHE_SIZE);
                 }
+                continue
             },
 
             _ = &mut tripwire => {
                 break;
             }
+        };
 
-            else => {
-                break;
+        let change_len = change.len();
+        counter!("corro.agent.changes.recv").increment(std::cmp::max(change_len, 1) as u64); // count empties...
+
+        if change.actor_id == agent.actor_id() {
+            continue;
+        }
+
+        if let Some(mut seqs) = change.seqs().cloned() {
+            let v = *change.versions().start();
+            if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) {
+                if seqs.all(|seq| seen_seqs.contains(&seq)) {
+                    continue;
+                }
+            }
+        } else {
+            // empty versions
+            if change
+                .versions()
+                .all(|v| seen.contains_key(&(change.actor_id, v)))
+            {
+                continue;
             }
         }
-    }
 
-    info!("Draining changes receiver...");
+        let recv_lag = change
+            .ts()
+            .map(|ts| (agent.clock().new_timestamp().get_time() - ts.0).to_duration());
 
-    // drain!
-    while let Ok((change, src)) = rx_changes.try_recv() {
-        let changes_count = std::cmp::max(change.len(), 1);
-        counter!("corro.agent.changes.recv").increment(changes_count as u64);
-        count += changes_count;
-        queue.push_back((change, src, Instant::now()));
-        if count >= max_changes_chunk {
-            // drain and process current changes!
-            if let Err(e) = util::process_multiple_changes(
-                agent.clone(),
-                bookie.clone(),
-                queue.drain(..).collect(),
-            )
-            .await
+        if matches!(src, ChangeSource::Broadcast) {
+            counter!("corro.broadcast.recv.count", "kind" => "change").increment(1);
+        }
+
+        let booked = {
+            bookie
+                .read(format!(
+                    "handle_change(get):{}",
+                    change.actor_id.as_simple()
+                ))
+                .await
+                .get(&change.actor_id)
+                .cloned()
+        };
+
+        if let Some(booked) = booked {
+            if booked
+                .read(format!(
+                    "handle_change(contains?):{}",
+                    change.actor_id.as_simple()
+                ))
+                .await
+                .contains_all(change.versions(), change.seqs())
             {
-                error!("could not process last multiple changes: {e}");
+                trace!("already seen, stop disseminating");
+                continue;
             }
+        }
 
-            // reset count
-            count = 0;
+        if let Some(recv_lag) = recv_lag {
+            let src_str: &'static str = src.into();
+            histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str)
+                .record(recv_lag.as_secs_f64());
         }
-    }
 
-    // process the last changes we got!
-    if let Err(e) = util::process_multiple_changes(agent, bookie, queue.into_iter().collect()).await
-    {
-        error!("could not process multiple changes: {e}");
+        // this will only run once for a non-empty changeset
+        for v in change.versions() {
+            let entry = seen.entry((change.actor_id, v)).or_default();
+            if let Some(seqs) = change.seqs().cloned() {
+                entry.extend([seqs]);
+            }
+        }
+
+        if matches!(src, ChangeSource::Broadcast) && !change.is_empty() {
+            if let Err(_e) =
+                agent
+                    .tx_bcast()
+                    .try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change(
+                        change.clone(),
+                    )))
+            {
+                debug!("broadcasts are full or done!");
+            }
+        }
+
+        let cost = processing_cost(&change);
+        queue.push_back((change, src, Instant::now()));
+
+        buf_cost += cost; // tracks the cost, not the number of changes
     }
 }
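Reviewer note: below is a minimal, self-contained sketch of the cost model this diff introduces. The `Changeset` here is a hypothetical stand-in for the real type in corro-types (which wraps versions in newtypes, hence the `.0` accesses above); only the shape of `processing_cost` mirrors the diff. The point it demonstrates: an empty changeset is billed per covered version but capped at 20, so a huge empty version range cannot dominate `buf_cost`.

```rust
use std::cmp;
use std::ops::RangeInclusive;

// Hypothetical stand-in for corro-types' Changeset: an empty changeset
// only spans a version range; a non-empty one carries actual changes.
enum Changeset {
    Empty { versions: RangeInclusive<u64> },
    Full { changes: Vec<()>, versions: RangeInclusive<u64> },
}

impl Changeset {
    fn is_empty(&self) -> bool {
        matches!(self, Changeset::Empty { .. })
    }

    fn len(&self) -> usize {
        match self {
            Changeset::Empty { .. } => 0,
            Changeset::Full { changes, .. } => changes.len(),
        }
    }

    fn versions(&self) -> &RangeInclusive<u64> {
        match self {
            Changeset::Empty { versions } | Changeset::Full { versions, .. } => versions,
        }
    }
}

// Same shape as the processing_cost added in this diff: empty changesets
// cost one unit per covered version, capped at 20; non-empty changesets
// cost their number of individual changes.
fn processing_cost(change: &Changeset) -> usize {
    if change.is_empty() {
        let versions = change.versions();
        cmp::min((versions.end() - versions.start()) as usize + 1, 20)
    } else {
        change.len()
    }
}

fn main() {
    // A small empty changeset is billed per covered version...
    let small = Changeset::Empty { versions: 1..=3 };
    assert_eq!(processing_cost(&small), 3);

    // ...but a huge empty range hits the cap instead of counting
    // a million versions toward buf_cost.
    let huge = Changeset::Empty { versions: 1..=1_000_000 };
    assert_eq!(processing_cost(&huge), 20);

    // Non-empty changesets are still billed by change count, as before.
    let full = Changeset::Full { changes: vec![(); 7], versions: 1..=7 };
    assert_eq!(processing_cost(&full), 7);
}
```

This also closes an accounting gap in the old code: `count += change_len` billed empty changesets at zero even though they still occupied the queue, whereas `buf_cost += cost` always gives them a small non-zero weight.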
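And a similarly hedged miniature of the batching loop itself, with changesets reduced to their costs (plain `usize`s) and the spawned task replaced by a `println!`: once the queued cost reaches `max_changes_chunk`, batches are drained off the front until each batch crosses the threshold, and whatever remains below it waits for the `max_wait` tick, as in the handler above.

```rust
use std::collections::VecDeque;

// Drain one batch off the queue, mirroring the handler's inner
// `while let Some(...) = queue.pop_front()` loop: accumulate cost
// until it reaches the chunk threshold.
fn drain_one_batch(queue: &mut VecDeque<usize>, max_changes_chunk: usize) -> (Vec<usize>, usize) {
    let mut buf = Vec::new();
    let mut tmp_cost = 0;
    while let Some(cost) = queue.pop_front() {
        tmp_cost += cost;
        buf.push(cost);
        if tmp_cost >= max_changes_chunk {
            break;
        }
    }
    (buf, tmp_cost)
}

fn main() {
    // Queued changesets, represented only by their processing cost.
    let mut queue: VecDeque<usize> = VecDeque::from([5, 12, 3, 9, 20]);
    let mut buf_cost: usize = queue.iter().sum(); // 49
    let max_changes_chunk = 16;

    // Mirrors `while buf_cost >= max_changes_chunk && ...`; the JoinSet
    // concurrency limit is omitted from this sketch.
    while buf_cost >= max_changes_chunk {
        let (batch, cost) = drain_one_batch(&mut queue, max_changes_chunk);
        if batch.is_empty() {
            break;
        }
        buf_cost -= cost;
        println!("spawn process_multiple_changes for {batch:?} (cost {cost})");
    }

    // Anything left below the threshold stays queued for the max_wait
    // interval to flush.
    println!("still queued: {queue:?} (cost {buf_cost})");
}
```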