Skip to content

Commit

Permalink
order nodes chosen for sync by last sync time also
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Jul 4, 2024
1 parent a82563d commit 40f53c4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
31 changes: 28 additions & 3 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::RangeInclusive;

use std::cmp::Ordering;
use std::{
cmp,
collections::VecDeque,
Expand Down Expand Up @@ -792,8 +793,15 @@ pub async fn handle_sync(
**id != agent.actor_id() && state.cluster_id == agent.cluster_id()
})
// Grab a ring-buffer index to the member RTT range
.map(|(id, state)| (*id, state.ring.unwrap_or(255), state.addr))
.collect::<Vec<(ActorId, u8, SocketAddr)>>()
.map(|(id, state)| {
(
*id,
state.ring.unwrap_or(255),
state.addr,
state.last_sync_ts,
)
})
.collect::<Vec<(ActorId, u8, SocketAddr, Option<Timestamp>)>>()
};

if candidates.is_empty() {
Expand All @@ -816,14 +824,24 @@ pub async fn handle_sync(
sync_state
.need_len_for_actor(&b.0)
.cmp(&sync_state.need_len_for_actor(&a.0))
// if equal, look at last sync time
.then_with(|| {
if a.3.is_none() {
return Ordering::Greater;
}
if b.3.is_none() {
return Ordering::Less;
}
a.3.unwrap().cmp(&b.3.unwrap())
})
// if equal, look at proximity (via `ring`)
.then_with(|| a.1.cmp(&b.1))
});

choices.truncate(desired_count);
choices
.into_iter()
.map(|(actor_id, _, addr)| (actor_id, addr))
.map(|(actor_id, _, addr, _)| (actor_id, addr))
.collect()
};

Expand Down Expand Up @@ -852,13 +870,20 @@ pub async fn handle_sync(
info!(
"synced {n} changes w/ {} in {}s @ {} changes/s",
chosen
.clone()
.into_iter()
.map(|(actor_id, _)| actor_id.to_string())
.collect::<Vec<_>>()
.join(", "),
elapsed.as_secs_f64(),
n as f64 / elapsed.as_secs_f64()
);

let ts = Timestamp::from(agent.clock().new_timestamp());
for (actor_id, _) in chosen {
let mut members = agent.members().write();
members.update_sync_ts(actor_id, ts);
}
}
Ok(())
}
Expand Down
8 changes: 8 additions & 0 deletions crates/corro-types/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct MemberState {
pub cluster_id: ClusterId,

pub ring: Option<u8>,
pub last_sync_ts: Option<Timestamp>,
}

impl MemberState {
Expand All @@ -25,6 +26,7 @@ impl MemberState {
ts,
cluster_id,
ring: None,
last_sync_ts: None,
}
}

Expand Down Expand Up @@ -59,6 +61,12 @@ impl Members {
self.states.get(id)
}

pub fn update_sync_ts(&mut self, actor_id: ActorId, ts: Timestamp) {
if let Some(state) = self.states.get_mut(&actor_id) {
state.last_sync_ts = Some(ts);
}
}

// A result of `true` means that the effective list of
// cluster member addresses has changed
pub fn add_member(&mut self, actor: &Actor) -> MemberAddedResult {
Expand Down

0 comments on commit 40f53c4

Please sign in to comment.