diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index ced1fe6..34e72a6 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, HashSet}; use std::mem; use crate::serialize::*; -use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue}; +use crate::{ChitchatId, Heartbeat, VersionedValue}; #[derive(Debug, Default, Eq, PartialEq)] pub struct Delta { @@ -84,8 +84,6 @@ impl Delta { tombstone: Option, ) { let node_delta = self.node_deltas.get_mut(chitchat_id).unwrap(); - - node_delta.max_version = node_delta.max_version.max(version); node_delta.key_values.insert( key.to_string(), VersionedValue { @@ -105,8 +103,6 @@ impl Delta { pub(crate) struct NodeDelta { pub heartbeat: Heartbeat, pub key_values: BTreeMap, - // This attribute is computed upon deserialization. 0 if `key_values` is empty. - pub max_version: MaxVersion, } #[cfg(test)] @@ -191,10 +187,6 @@ impl DeltaWriter { ) { return false; } - self.current_node_delta.max_version = self - .current_node_delta - .max_version - .max(versioned_value.version); self.current_node_delta .key_values .insert(key.to_string(), versioned_value); @@ -238,14 +230,12 @@ impl Serializable for NodeDelta { fn deserialize(buf: &mut &[u8]) -> anyhow::Result { let heartbeat = Heartbeat::deserialize(buf)?; let mut key_values: BTreeMap = Default::default(); - let mut max_version = 0; let num_key_values = u16::deserialize(buf)?; for _ in 0..num_key_values { let key = String::deserialize(buf)?; let value = String::deserialize(buf)?; let version = u64::deserialize(buf)?; let tombstone = >::deserialize(buf)?; - max_version = max_version.max(version); key_values.insert( key, VersionedValue { @@ -258,7 +248,6 @@ impl Serializable for NodeDelta { Ok(Self { heartbeat, key_values, - max_version, }) } diff --git a/chitchat/src/digest.rs b/chitchat/src/digest.rs index 2371dd7..9e0252b 100644 --- a/chitchat/src/digest.rs +++ b/chitchat/src/digest.rs @@ -1,16 +1,16 @@ use std::collections::BTreeMap; use crate::serialize::*; -use crate::{ChitchatId, Heartbeat, MaxVersion}; +use crate::{ChitchatId, Heartbeat, Version}; #[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] pub(crate) struct NodeDigest { pub(crate) heartbeat: Heartbeat, - pub(crate) max_version: MaxVersion, + pub(crate) max_version: Version, } impl NodeDigest { - pub(crate) fn new(heartbeat: Heartbeat, max_version: MaxVersion) -> Self { + pub(crate) fn new(heartbeat: Heartbeat, max_version: Version) -> Self { Self { heartbeat, max_version, @@ -30,7 +30,7 @@ pub struct Digest { #[cfg(test)] impl Digest { - pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: MaxVersion) { + pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: Version) { let node_digest = NodeDigest::new(heartbeat, max_version); self.node_digests.insert(node, node_digest); } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 9368721..7ffc261 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -33,7 +33,7 @@ use crate::message::syn_ack_serialized_len; pub use crate::message::ChitchatMessage; pub use crate::server::{spawn_chitchat, ChitchatHandle}; use crate::state::ClusterState; -pub use crate::types::{ChitchatId, Heartbeat, MaxVersion, Version, VersionedValue}; +pub use crate::types::{ChitchatId, Heartbeat, Version, VersionedValue}; /// Maximum UDP datagram payload size (in bytes). /// @@ -51,7 +51,7 @@ pub struct Chitchat { cluster_state: ClusterState, failure_detector: FailureDetector, /// Notifies listeners when a change has occurred in the set of live nodes. - previous_live_nodes: HashMap, + previous_live_nodes: HashMap, live_nodes_watcher_tx: watch::Sender>, live_nodes_watcher_rx: watch::Receiver>, } @@ -117,7 +117,6 @@ impl Chitchat { &dead_nodes, self.config.marked_for_deletion_grace_period as u64, ); - self.report_heartbeats(&delta); Some(ChitchatMessage::SynAck { digest: self_digest, delta, @@ -125,7 +124,6 @@ impl Chitchat { } ChitchatMessage::SynAck { digest, delta } => { self.report_heartbeats(&delta); - // self.config.chitchat_id.node_id, digest, delta); self.cluster_state.apply_delta(delta); let dead_nodes = self.dead_nodes().collect::>(); let delta = self.cluster_state.compute_delta( @@ -161,9 +159,7 @@ impl Chitchat { fn report_heartbeats(&mut self, delta: &Delta) { for (chitchat_id, node_delta) in &delta.node_deltas { if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) { - if node_state.heartbeat() < node_delta.heartbeat - || node_state.max_version() < node_delta.max_version - { + if node_state.heartbeat() < node_delta.heartbeat { self.failure_detector.report_heartbeat(chitchat_id); } } else { diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index de8a607..0b2be33 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -15,14 +15,14 @@ use tracing::warn; use crate::delta::{Delta, DeltaWriter}; use crate::digest::{Digest, NodeDigest}; use crate::listener::Listeners; -use crate::{ChitchatId, Heartbeat, KeyChangeEvent, MaxVersion, Version, VersionedValue}; +use crate::{ChitchatId, Heartbeat, KeyChangeEvent, Version, VersionedValue}; #[derive(Clone, Serialize, Deserialize)] pub struct NodeState { node_id: ChitchatId, heartbeat: Heartbeat, key_values: BTreeMap, - max_version: MaxVersion, + max_version: Version, #[serde(skip)] #[serde(default = "Instant::now")] last_heartbeat: Instant, @@ -74,7 +74,7 @@ impl NodeState { } /// Returns the node's max version. - pub fn max_version(&self) -> MaxVersion { + pub fn max_version(&self) -> Version { self.max_version } @@ -305,7 +305,6 @@ impl ClusterState { node_state.heartbeat = node_delta.heartbeat; node_state.last_heartbeat = Instant::now(); } - node_state.max_version = node_state.max_version.max(node_delta.max_version); for (key, versioned_value) in node_delta.key_values { node_state.max_version = node_state.max_version.max(versioned_value.version); node_state.set_versioned_value(key, versioned_value); diff --git a/chitchat/src/transport/udp.rs b/chitchat/src/transport/udp.rs index 327602d..59d5e89 100644 --- a/chitchat/src/transport/udp.rs +++ b/chitchat/src/transport/udp.rs @@ -24,7 +24,7 @@ impl Transport for UdpTransport { } } -struct UdpSocket { +pub struct UdpSocket { buf_send: Vec, buf_recv: Box<[u8; MAX_UDP_DATAGRAM_PAYLOAD_SIZE]>, socket: tokio::net::UdpSocket, diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index 8a919e8..55af70d 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -68,9 +68,6 @@ impl VersionedValue { /// The current version of a key. pub type Version = u64; -/// The highest version of a key in a node's state. -pub type MaxVersion = u64; - /// The current heartbeat of a node. #[derive( Debug, Clone, Copy, Default, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize, diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index 725973d..ccaf816 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -326,6 +326,7 @@ async fn test_simple_simulation_with_network_partition() { #[tokio::test] async fn test_marked_for_deletion_gc_with_network_partition() { + const TIMEOUT: Duration = Duration::from_millis(500); // let _ = tracing_subscriber::fmt::try_init(); let mut simulator = Simulator::new(Duration::from_millis(100), 10); let chitchat_id_1 = create_chitchat_id("node-1"); @@ -358,13 +359,13 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, // Isolate node 3. Operation::RemoveNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()), @@ -379,7 +380,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), true), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, // Check marked for deletion is not propagated to node 3. Operation::NodeStateAssert { @@ -394,7 +395,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, Operation::NodeStateAssert { server_chitchat_id: chitchat_id_1.clone(), @@ -411,7 +412,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { }, // Wait for propagation // We need to wait longer... because node 4 is just starting? - Operation::Wait(Duration::from_millis(500)), + Operation::Wait(TIMEOUT), Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), @@ -422,7 +423,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_4.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), - timeout_opt: None, + timeout_opt: Some(TIMEOUT), }, // Relink node 3 Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()), @@ -431,13 +432,13 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(Duration::from_millis(500)), + timeout_opt: Some(TIMEOUT), }, Operation::NodeStateAssert { server_chitchat_id: chitchat_id_4.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(Duration::from_millis(500)), + timeout_opt: Some(TIMEOUT), }, ]; simulator.execute(operations).await;