From d3722abdbd1e7e0ccb377f070dce4b7114f635ae Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Thu, 4 Jul 2024 15:40:55 +0100 Subject: [PATCH] order nodes chosen for sync by last sync time also --- crates/corro-agent/src/agent/handlers.rs | 22 +++++++++++++++++++--- crates/corro-types/src/broadcast.rs | 2 +- crates/corro-types/src/members.rs | 8 ++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index e4d7e4c1..6a7e079d 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -792,8 +792,15 @@ pub async fn handle_sync( **id != agent.actor_id() && state.cluster_id == agent.cluster_id() }) // Grab a ring-buffer index to the member RTT range - .map(|(id, state)| (*id, state.ring.unwrap_or(255), state.addr)) - .collect::>() + .map(|(id, state)| { + ( + *id, + state.ring.unwrap_or(255), + state.addr, + state.last_sync_ts, + ) + }) + .collect::)>>() }; if candidates.is_empty() { @@ -816,6 +823,8 @@ pub async fn handle_sync( sync_state .need_len_for_actor(&b.0) .cmp(&sync_state.need_len_for_actor(&a.0)) + // if equal, look at last sync time + .then_with(|| a.3.cmp(&b.3)) // if equal, look at proximity (via `ring`) .then_with(|| a.1.cmp(&b.1)) }); @@ -823,7 +832,7 @@ pub async fn handle_sync( choices.truncate(desired_count); choices .into_iter() - .map(|(actor_id, _, addr)| (actor_id, addr)) + .map(|(actor_id, _, addr, _)| (actor_id, addr)) .collect() }; @@ -852,6 +861,7 @@ pub async fn handle_sync( info!( "synced {n} changes w/ {} in {}s @ {} changes/s", chosen + .clone() .into_iter() .map(|(actor_id, _)| actor_id.to_string()) .collect::>() @@ -859,6 +869,12 @@ pub async fn handle_sync( elapsed.as_secs_f64(), n as f64 / elapsed.as_secs_f64() ); + + let ts = Timestamp::from(agent.clock().new_timestamp()); + for (actor_id, _) in chosen { + let mut members = agent.members().write(); + members.update_sync_ts(actor_id, ts); + } } Ok(()) } diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index ee859ad1..382bfce5 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -264,7 +264,7 @@ pub enum TimestampParseError { Parse(ParseNTP64Error), } -#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Hash)] +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Ord, Hash)] #[serde(transparent)] pub struct Timestamp(pub NTP64); diff --git a/crates/corro-types/src/members.rs b/crates/corro-types/src/members.rs index c5c77fff..753b2df5 100644 --- a/crates/corro-types/src/members.rs +++ b/crates/corro-types/src/members.rs @@ -16,6 +16,7 @@ pub struct MemberState { pub cluster_id: ClusterId, pub ring: Option, + pub last_sync_ts: Option, } impl MemberState { @@ -25,6 +26,7 @@ impl MemberState { ts, cluster_id, ring: None, + last_sync_ts: None, } } @@ -59,6 +61,12 @@ impl Members { self.states.get(id) } + pub fn update_sync_ts(&mut self, actor_id: ActorId, ts: Timestamp) { + if let Some(state) = self.states.get_mut(&actor_id) { + state.last_sync_ts = Some(ts); + } + } + // A result of `true` means that the effective list of // cluster member addresses has changed pub fn add_member(&mut self, actor: &Actor) -> MemberAddedResult {