diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 95dfb84..9348f3e 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -24,7 +24,7 @@ pub use listener::ListenerHandle; pub use serialize::Serializable; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; pub use self::configuration::ChitchatConfig; pub use self::state::{ClusterStateSnapshot, NodeState}; @@ -121,6 +121,7 @@ impl Chitchat { } pub(crate) fn process_message(&mut self, msg: ChitchatMessage) -> Option { + debug!(msg=?msg, "received message"); self.update_self_heartbeat(); match msg { diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 81bc446..1a2bbed 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -276,6 +276,8 @@ impl Server { .into_iter() .filter(|addr| *addr != chitchat_guard.self_chitchat_id().gossip_advertise_addr) .collect(); + + debug!(peer_nodes=?peer_nodes, live_nodes=?live_nodes, dead_nodes=?dead_nodes, "gossip"); let (selected_nodes, random_dead_node_opt, random_seed_node_opt) = select_nodes_for_gossip( &mut self.rng, peer_nodes, @@ -283,6 +285,7 @@ impl Server { dead_nodes, seed_nodes, ); + debug!(selected_nodes=?selected_nodes, random_dead_node_opt=?random_dead_node_opt, "selected_nodes"); chitchat_guard.update_self_heartbeat(); chitchat_guard.gc_keys_marked_for_deletion(); @@ -313,6 +316,7 @@ impl Server { /// Gossips with another peer. async fn gossip(&mut self, addr: SocketAddr) -> anyhow::Result<()> { let syn = self.chitchat.lock().await.create_syn_message(); + debug!(syn=?syn, addr=?addr, "sending syn to addr"); self.transport.send(addr, syn).await?; Ok(()) } diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index b2a09c2..5e022fa 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -12,7 +12,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::time::Instant; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::delta::{Delta, DeltaSerializer, NodeDelta}; use crate::digest::{Digest, NodeDigest}; @@ -633,11 +633,13 @@ impl ClusterState { let mut delta_serializer = DeltaSerializer::with_mtu(mtu); for stale_node in stale_nodes.into_iter() { + debug!(chitchat_id=?stale_node.chitchat_id, from_version=stale_node.from_version_excluded, "adding stale node"); if !delta_serializer.try_add_node( stale_node.chitchat_id.clone(), stale_node.node_state.last_gc_version, stale_node.from_version_excluded, ) { + debug!("reached mtu after adding node"); break; }; @@ -654,6 +656,10 @@ impl ClusterState { // This call returns false if the mtu has been reached. // // In that case, this empty node update is useless but does not hurt correctness. + debug!( + max_version = stale_node.node_state.max_version, + "set max version" + ); let _ = delta_serializer.try_set_max_version(stale_node.node_state.max_version); } } @@ -944,19 +950,46 @@ mod tests { // No stale records (due to the floor version). This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); - let node3 = ChitchatId::for_local_test(10_002); + let node3 = ChitchatId::for_local_test(10_003); let mut node3_state = NodeState::for_test(); node3_state.set_with_version("key_a", "value_a", 1); node3_state.set_with_version("key_b", "value_b", 2); node3_state.set_with_version("key_c", "value_c", 3); stale_nodes.offer(&node3, &node3_state, 1u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); - let expected_staleness = Staleness { + + let node4 = ChitchatId::for_local_test(10_004); + let mut node4_state = NodeState::for_test(); + node4_state.set_with_version("key_a", "value_a", 1); + node4_state.set_with_version("key_b", "value_b", 2); + node4_state.set_with_version("key_c", "value_c", 3); + node4_state.set_with_version("key_d", "value_d", 4); + node4_state.set_with_version("key_e", "value_e", 5); + node4_state.set_with_version("key_f", "value_f", 6); + node4_state.set_with_version("key_g", "value_g", 7); + node4_state.set_with_version("key_h", "value_h", 8); + stale_nodes.offer(&node4, &node4_state, 4u64); + assert_eq!(stale_nodes.stale_nodes.len(), 2); + + let expected_staleness1 = Staleness { is_unknown: false, - max_version: 1, + max_version: 3, num_stale_key_values: 2, }; - assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); + let expected_staleness2 = Staleness { + is_unknown: false, + max_version: 8, + num_stale_key_values: 4, + }; + + assert_eq!(stale_nodes.stale_nodes[&expected_staleness1].len(), 1); + assert_eq!(stale_nodes.stale_nodes[&expected_staleness2].len(), 1); + + let stale_nodes_sorted: Vec> = stale_nodes.into_iter().collect(); + assert_eq!(stale_nodes_sorted.len(), 2); + + assert_eq!(stale_nodes_sorted[0].chitchat_id, &node4); + assert_eq!(stale_nodes_sorted[1].chitchat_id, &node3); } #[test]