diff --git a/README.md b/README.md index 7bdd74c..7540742 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ This crate is used at the core of Quickwit for - cluster membership - failure detection - sharing configuration, and extra metadata values +. -The idea of relying on scuttlebutt reconciliation and phi-accrual detection is borrowed from -Cassandra, itself borrowing it from DynamoDB. +The idea of relying on scuttlebutt reconciliation and phi-accrual detection is borrowed from Cassandra, itself borrowing it from DynamoDB. A anti-entropy gossip algorithm called scuttlebutt is in charge of spreading a common state to all nodes. @@ -28,6 +28,9 @@ Not receiving any update from node for a given amount of time can therefore be regarded as a sign of failure. Rather than using a hard threshold, we use phi-accrual detection to dynamically compute a threshold. +We also abuse `chitchat` in Quickwit and use it like a reliable broadcast, +with different caveats. + # References - ScuttleButt paper: https://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf @@ -36,3 +39,80 @@ we use phi-accrual detection to dynamically compute a threshold. https://www.youtube.com/watch?v=FuP1Fvrv6ZQ - https://docs.datastax.com/en/articles/cassandra/cassandrathenandnow.html - https://github.com/apache/cassandra/blob/f5fb1b0bd32b5dc7da13ec66d43acbdad7fe9dbf/src/java/org/apache/cassandra/gms/Gossiper.java#L1749 + +# Heartbeat + +In order to get a constant flow of updates to feed into phi-accrual detection, +chitchat's node state includes a key-value called `heartbeat`. The heartbeat of a given node, starts at 0, and is incremented once after each round of gossip initiated. + +Nodes then reports all heartbeat updates to a phi-accrual detector to +assess the liveness of this node. Liveness is a local concept. Every single +node computes its own vision of the liveness of all other nodes. + +# KV deletion + +A deletion of a KV is a just another type of mutation: A deletion is +associated with a version, and replicated using the same mechanism as a KV update. + +The library will then interpret this versioned tombstone before exposing kv to +the user. + +To avoid keeping deleted KV indefinitely, the library includes a GC mechanism. Any nodes containing a tombstone older than a given `grace period threshold` +(age is measure in ticks of heartbeat), it is safe to be deleted. + +This yield the following problem. If a node was deconnected for more than +`marked_for_deletion_grace_period`, they could have missed the deletion of a KV and never be aware of it. + +To address this problem, nodes that are too outdated have to reset their state. + +More accurately, let's assume a Node A sends a Syn message to Node B with a digest with an outdated version V for a node N. +Node B will compare the version of the digest with its own version. + +If V is fresher than `own version - marked_for_deletion_grace_period`, +Node B knows that no GC has impacted Key values with a version above V. It can +safely emit a normal delta to A. +If however V is older than `own version - marked_for_deletion_grace_period`, +a GC could have been executed. Instead of sending a delta to A, Node B will +instruct A to reset its state. + +A will wipe-off whatever information it has about N, and will start syncing from a blank state. + +# Node deletion + +In Quickwit, we also use chitchat as a "reliable broadcast with caveats". +The idea of reliable broadcast is that the emission of a message is supposed +to evenutally reach all or no correct nodes in the nodes. + +Of course, if the emitter starts failing before emitting its message, obviously one cannot expect the message to reach anyone. +However, if at least one correct nodes receives the message, it will +eventually reach all correct nodes (assuming the node stays correct). + +For this reason, we keep emitting KVs from dead nodes too. + +In order to avoid keeping the state of dead nodes indefinitely, we take +a very important trade off. + +If a node is marked as dead for more than `DEAD_NODE_GRACE_PERIOD`, we assume that its state can be safely removed from the system. The grace period is +computed from the last time we received an update from the dead node. + +Just deleting the state is of course impossible. After the given `DEAD_NODE_GRACE_PERIOD / 2`, we will mark the dead node as `ScheduledForDeletion`. + +We first stop sharing data about nodes in the `ScheduledForDeletion` state, +nor listing them node in our digest. + +We also ignore any updates received about the dead node. For simplification, we do not even keep track of the last update received. Eventually, all the nodes of the cluster will have marked the dead node as `ScheduledForDeletion`. + +After another `DEAD_NODE_GRACE_PERIOD` / 2 has elapsed since the last update received, we delete the dead node state. + +It is important to set `DEAD_NODE_GRACE_PERIOD` with a value such `DEAD_NODE_GRACE_PERIOD / 2` is much greater than the period it takes to detect a faulty node. + +Note that we are here breaking the reliable broadcast nature of chitchat. +New nodes joining after `DEAD_NODE_GRACE_PERIOD` for instance, will never know about the state of the dead node. + +Also, if a node was disconnected from the cluster for more than `DEAD_NODE_GRACE_PERIOD / 2` and reconnects, it is likely to spread information +about the dead node again. Worse, it could not know about the deletion +of some specific KV and spread them again. + +The chitchat library does not include any mechanism to prevent this from happening. They should however eventually get deleted (after a bit more than `DEAD_NODE_GRACE_PERIOD`) if the node is really dead. + +If the node is alive, it should be able to fix everyone's state via reset or regular delta. diff --git a/chitchat/src/failure_detector.rs b/chitchat/src/failure_detector.rs index 6f10771..a783955 100644 --- a/chitchat/src/failure_detector.rs +++ b/chitchat/src/failure_detector.rs @@ -81,8 +81,9 @@ impl FailureDetector { /// Removes and returns the list of garbage collectible nodes. pub fn garbage_collect(&mut self) -> Vec { let mut garbage_collected_nodes = Vec::new(); - for (chitchat_id, instant) in self.dead_nodes.iter() { - if instant.elapsed() >= self.config.dead_node_grace_period { + let now = Instant::now(); + for (chitchat_id, &time_of_death) in &self.dead_nodes { + if now >= time_of_death + self.config.dead_node_grace_period { garbage_collected_nodes.push(chitchat_id.clone()) } } @@ -102,6 +103,23 @@ impl FailureDetector { self.dead_nodes.keys() } + /// Returns the list of nodes considered dead by the failure detector. + pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator { + let now = Instant::now(); + let half_dead_node_grace_period = self.config.dead_node_grace_period.div_f32(2.0f32); + // Note: we can't just compute the threshold now - half_dead_node_grace_period, because it + // would underflow on some platform (MacOS). + self.dead_nodes + .iter() + .filter_map(move |(chitchat_id, time_of_death)| { + if *time_of_death + half_dead_node_grace_period < now { + Some(chitchat_id) + } else { + None + } + }) + } + /// Returns the current phi value of a node. fn phi(&mut self, chitchat_id: &ChitchatId) -> Option { self.node_samples diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 7ffc261..bfa3ec5 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -87,7 +87,8 @@ impl Chitchat { } pub(crate) fn create_syn_message(&self) -> ChitchatMessage { - let digest = self.compute_digest(); + let scheduled_for_deletion: HashSet<_> = self.scheduled_for_deletion_nodes().collect(); + let digest = self.compute_digest(&scheduled_for_deletion); ChitchatMessage::Syn { cluster_id: self.config.cluster_id.clone(), digest, @@ -106,15 +107,16 @@ impl Chitchat { return Some(ChitchatMessage::BadCluster); } // Ensure for every reply from this node, at least the heartbeat is changed. - let dead_nodes: HashSet<_> = self.dead_nodes().collect(); - let self_digest = self.compute_digest(); + let scheduled_for_deletion: HashSet<_> = + self.scheduled_for_deletion_nodes().collect(); + let self_digest = self.compute_digest(&scheduled_for_deletion); let empty_delta = Delta::default(); let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - syn_ack_serialized_len(&self_digest, &empty_delta); let delta = self.cluster_state.compute_delta( &digest, delta_mtu, - &dead_nodes, + &scheduled_for_deletion, self.config.marked_for_deletion_grace_period as u64, ); Some(ChitchatMessage::SynAck { @@ -125,11 +127,12 @@ impl Chitchat { ChitchatMessage::SynAck { digest, delta } => { self.report_heartbeats(&delta); self.cluster_state.apply_delta(delta); - let dead_nodes = self.dead_nodes().collect::>(); + let scheduled_for_deletion = + self.scheduled_for_deletion_nodes().collect::>(); let delta = self.cluster_state.compute_delta( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1, - &dead_nodes, + &scheduled_for_deletion, self.config.marked_for_deletion_grace_period as u64, ); Some(ChitchatMessage::Ack { delta }) @@ -249,6 +252,11 @@ impl Chitchat { self.failure_detector.dead_nodes() } + /// Returns the set of nodes considered dead by the failure detector. + pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator { + self.failure_detector.scheduled_for_deletion_nodes() + } + /// Returns the set of seed nodes. pub fn seed_nodes(&self) -> HashSet { self.cluster_state.seed_addrs() @@ -277,8 +285,9 @@ impl Chitchat { } /// Computes the node's digest. - fn compute_digest(&self) -> Digest { - self.cluster_state.compute_digest() + fn compute_digest(&self, scheduled_for_deletion_nodes: &HashSet<&ChitchatId>) -> Digest { + self.cluster_state + .compute_digest(scheduled_for_deletion_nodes) } /// Subscribes a callback that will be called every time a key matching the supplied prefix @@ -577,9 +586,23 @@ mod tests { } #[tokio::test] - async fn test_dead_node_should_not_be_gossiped_when_node_joins() -> anyhow::Result<()> { + async fn test_dead_node_kvs_are_when_node_joins() -> anyhow::Result<()> { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); - let mut nodes = setup_nodes(40001..=40004, &transport).await; + // starting 2 nodes. + let mut nodes = setup_nodes(40001..=40002, &transport).await; + + // Let's add a key to node1. + let node1_id = { + let node1 = nodes.get(0).unwrap(); + let node1_chitchat = node1.chitchat(); + node1_chitchat + .lock() + .await + .self_node_state() + .set("test_key", "test_val"); + node1.chitchat_id().clone() + }; + { let node2 = nodes.get(1).unwrap(); assert_eq!(node2.chitchat_id().advertise_port(), 40002); @@ -588,54 +611,67 @@ mod tests { &[ ChitchatId::for_local_test(40001), ChitchatId::for_local_test(40002), - ChitchatId::for_local_test(40003), - ChitchatId::for_local_test(40004), ], ) .await; + let node2_chitchat = node2.chitchat(); + // We have received node3's key + let value = node2_chitchat + .lock() + .await + .node_state(&node1_id) + .unwrap() + .get("test_key") + .unwrap() + .to_string(); + assert_eq!(&value, "test_val"); } - // Take node 3 down. - let node3 = nodes.remove(2); - assert_eq!(node3.chitchat_id().advertise_port(), 40003); - node3.shutdown().await.unwrap(); + // Take node 1 down. + let node1 = nodes.remove(0); + assert_eq!(node1.chitchat_id().advertise_port(), 40001); + node1.shutdown().await.unwrap(); + // Node 2 has detected that node 1 is missing. { - let node2 = nodes.get(1).unwrap(); + let node2 = nodes.first().unwrap(); assert_eq!(node2.chitchat_id().advertise_port(), 40002); - wait_for_chitchat_state( - node2.chitchat(), - &[ - ChitchatId::for_local_test(40_001), - ChitchatId::for_local_test(40002), - ChitchatId::for_local_test(40_004), - ], - ) - .await; + wait_for_chitchat_state(node2.chitchat(), &[ChitchatId::for_local_test(40_002)]).await; } - // Restart node at localhost:40003 with new name - let mut new_config = ChitchatConfig::for_test(40_003); + // Restart node at localhost:40001 with new name + let mut new_config = ChitchatConfig::for_test(40_001); new_config.chitchat_id.node_id = "new_node".to_string(); let new_chitchat_id = new_config.chitchat_id.clone(); - let seed_addr = ChitchatId::for_local_test(40_002).gossip_advertise_addr; + let seed_addr = ChitchatId::for_local_test(40_001).gossip_advertise_addr; new_config.seed_nodes = vec![seed_addr.to_string()]; - let new_node_chitchat = spawn_chitchat(new_config, Vec::new(), &transport) + let new_node_chitchat_handle = spawn_chitchat(new_config, Vec::new(), &transport) .await .unwrap(); + let new_node_chitchat = new_node_chitchat_handle.chitchat(); wait_for_chitchat_state( - new_node_chitchat.chitchat(), - &[ - ChitchatId::for_local_test(40_001), - ChitchatId::for_local_test(40_002), - new_chitchat_id, - ChitchatId::for_local_test(40_004), - ], + new_node_chitchat.clone(), + &[ChitchatId::for_local_test(40_002), new_chitchat_id], ) .await; - nodes.push(new_node_chitchat); + { + let new_node_chitchat_guard = new_node_chitchat.lock().await; + let test_val = new_node_chitchat_guard + .node_state(&node1_id) + .unwrap() + .get("test_key") + .unwrap(); + assert_eq!(test_val, "test_val"); + + // Let's check that node1 is seen as dead. + let dead_nodes: HashSet<&ChitchatId> = new_node_chitchat_guard.dead_nodes().collect(); + assert_eq!(dead_nodes.len(), 1); + assert!(dead_nodes.contains(&node1_id)); + } + + nodes.push(new_node_chitchat_handle); shutdown_nodes(nodes).await?; Ok(()) } diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 0b2be33..bb96649 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -312,11 +312,12 @@ impl ClusterState { } } - pub fn compute_digest(&self) -> Digest { + pub fn compute_digest(&self, scheduled_for_deletion: &HashSet<&ChitchatId>) -> Digest { Digest { node_digests: self .node_states .iter() + .filter(|(chitchat_id, _)| !scheduled_for_deletion.contains(chitchat_id)) .map(|(chitchat_id, node_state)| (chitchat_id.clone(), node_state.digest())) .collect(), } @@ -336,18 +337,20 @@ impl ClusterState { } /// Implements the Scuttlebutt reconciliation with the scuttle-depth ordering. + /// + /// Nodes that are scheduled for deletion (as passed by argument) are not shared. pub fn compute_delta( &self, digest: &Digest, mtu: usize, - dead_nodes: &HashSet<&ChitchatId>, + scheduled_for_deletion: &HashSet<&ChitchatId>, marked_for_deletion_grace_period: u64, ) -> Delta { let mut stale_nodes = SortedStaleNodes::default(); let mut nodes_to_reset = Vec::new(); for (chitchat_id, node_state) in &self.node_states { - if dead_nodes.contains(chitchat_id) { + if scheduled_for_deletion.contains(chitchat_id) { continue; } let Some(node_digest) = digest.node_digests.get(chitchat_id) else { @@ -833,7 +836,7 @@ mod tests { node2_state.set("key_a", ""); node2_state.set("key_b", ""); - let digest = cluster_state.compute_digest(); + let digest = cluster_state.compute_digest(&HashSet::new()); let mut expected_node_digests = Digest::default(); expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1);