Spreading KVs for half of the dead node grace period.
See README for more information.

This change is made because we use chitchat as a reliable broadcast to
update published position in Quickwit.
fulmicoton committed Feb 15, 2024
1 parent 15f3641 commit 73fd3e2
Showing 4 changed files with 200 additions and 46 deletions.
100 changes: 98 additions & 2 deletions README.md
@@ -5,8 +5,7 @@ This crate is used at the core of Quickwit for
- failure detection
- sharing configuration, and extra metadata values

The idea of relying on scuttlebutt reconciliation and phi-accrual detection is borrowed from
Cassandra, itself borrowing it from DynamoDB.
The idea of relying on scuttlebutt reconciliation and phi-accrual detection is borrowed from Cassandra, itself borrowing it from Dynamo.

An anti-entropy gossip algorithm called scuttlebutt is in charge of spreading
a common state to all nodes.
@@ -28,6 +27,9 @@ Not receiving any update from a node for a given amount of time can therefore be
regarded as a sign of failure. Rather than using a hard threshold,
we use phi-accrual detection to dynamically compute a threshold.

We also abuse `chitchat` in Quickwit and use it as a reliable broadcast,
with some caveats.

# References

- ScuttleButt paper: https://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
@@ -36,3 +38,97 @@ we use phi-accrual detection to dynamically compute a threshold.
https://www.youtube.com/watch?v=FuP1Fvrv6ZQ
- https://docs.datastax.com/en/articles/cassandra/cassandrathenandnow.html
- https://github.com/apache/cassandra/blob/f5fb1b0bd32b5dc7da13ec66d43acbdad7fe9dbf/src/java/org/apache/cassandra/gms/Gossiper.java#L1749

# Heartbeat

In order to get a constant flow of updates to feed into phi-accrual detection,
chitchat's node state includes a key-value called `heartbeat`. The heartbeat of a given node starts at 0 and is incremented once after each gossip round the node initiates.

Nodes then report all heartbeat updates to a phi-accrual detector to
assess the liveness of that node. Liveness is a local concept. Every single
node computes its own vision of the liveness of all other nodes.
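
To make this concrete, here is a minimal sketch of how heartbeat arrivals could feed a phi-accrual estimator. The `PhiEstimator` type, its window size, and the exponential inter-arrival model are illustrative assumptions for this sketch, not chitchat's actual failure-detector API.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Illustrative phi-accrual estimator: models heartbeat inter-arrival times
/// with an exponential distribution, so phi(t) = (t / mean_interval) * log10(e).
struct PhiEstimator {
    arrival_intervals: VecDeque<Duration>,
    last_arrival: Option<Instant>,
    window: usize,
}

impl PhiEstimator {
    fn new(window: usize) -> Self {
        Self { arrival_intervals: VecDeque::new(), last_arrival: None, window }
    }

    /// Called every time a fresher heartbeat value is observed for the node.
    fn report_heartbeat(&mut self, now: Instant) {
        if let Some(last) = self.last_arrival {
            if self.arrival_intervals.len() == self.window {
                self.arrival_intervals.pop_front();
            }
            self.arrival_intervals.push_back(now - last);
        }
        self.last_arrival = Some(now);
    }

    /// Phi grows with the time elapsed since the last heartbeat.
    /// A node is locally flagged as dead once phi exceeds a configured threshold.
    fn phi(&self, now: Instant) -> Option<f64> {
        let last = self.last_arrival?;
        if self.arrival_intervals.is_empty() {
            return None;
        }
        let mean_secs = self
            .arrival_intervals
            .iter()
            .map(Duration::as_secs_f64)
            .sum::<f64>()
            / self.arrival_intervals.len() as f64;
        let elapsed = (now - last).as_secs_f64();
        Some(elapsed / mean_secs * std::f64::consts::LOG10_E)
    }
}

fn main() {
    let mut estimator = PhiEstimator::new(1_000);
    let start = Instant::now();
    // Simulate regular heartbeats arriving every second.
    for i in 0..10 {
        estimator.report_heartbeat(start + Duration::from_secs(i));
    }
    // 5 seconds of silence after the last heartbeat: phi has grown.
    let phi = estimator.phi(start + Duration::from_secs(14)).unwrap();
    println!("phi after 5s of silence: {phi:.2}");
}
```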

# KV deletion

The deletion of a KV is just another type of mutation: it is
associated with a version and replicated using the same mechanism as a KV update.

The library then interprets this versioned tombstone before exposing KVs to
the user.

To avoid keeping deleted KVs indefinitely, the library includes a GC mechanism. Any KV whose tombstone is older than a given `grace period threshold`
(age is measured in heartbeat ticks) can safely be deleted.
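
As a rough sketch of this GC rule (the `Tombstone` struct and its field names are illustrative, not chitchat's actual data model), garbage collection boils down to comparing each tombstone's age, in heartbeat ticks, with the grace period:

```rust
/// Illustrative tombstone record: `tombstone_heartbeat` is the node's
/// heartbeat value at the time the key was deleted.
struct Tombstone {
    key: String,
    tombstone_heartbeat: u64,
}

/// Keep only tombstones younger than the grace period (ages are heartbeat ticks).
fn garbage_collect(tombstones: &mut Vec<Tombstone>, current_heartbeat: u64, grace_period: u64) {
    tombstones.retain(|t| current_heartbeat - t.tombstone_heartbeat < grace_period);
}

fn main() {
    let mut tombstones = vec![
        Tombstone { key: "old_key".to_string(), tombstone_heartbeat: 10 },
        Tombstone { key: "recent_key".to_string(), tombstone_heartbeat: 950 },
    ];
    garbage_collect(&mut tombstones, 1_000, 100);
    assert_eq!(tombstones.len(), 1);
    assert_eq!(tombstones[0].key, "recent_key");
}
```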

This yields the following problem: if a node was disconnected for more than
`marked_for_deletion_grace_period`, it could have missed the deletion of a KV and never become aware of it.

To address this problem, nodes that are too outdated have to reset their state.

More accurately, let's assume Node A sends a Syn message to Node B with a digest advertising an outdated version V for a node N.
Node B will compare the version in the digest with its own version.

If V is fresher than `own version - marked_for_deletion_grace_period`,
Node B knows that no GC has impacted key-values with a version above V. It can
safely emit a normal delta to A.
If, however, V is older than `own version - marked_for_deletion_grace_period`,
a GC could have been executed. Instead of sending a delta to Node A, Node B will
instruct A to reset its state.

Node A will then wipe out whatever information it has about N and start syncing from a blank state.
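
Node B's decision can be sketched as follows, under the simplifying assumption that versions are plain integers. The `handle_syn_digest` function and its parameters are hypothetical names for this sketch; chitchat's real delta/reset decision sits in `compute_partial_delta_respecting_mtu` (see the `state.rs` diff below).

```rust
/// Illustrative outcome of handling a Syn digest entry for node N.
enum SynResponse {
    /// No GC could have removed entries above the digest version: send a normal delta.
    Delta { from_version: u64 },
    /// The digest is too old: GC may have removed tombstones the peer never saw,
    /// so instruct the peer to reset its state for this node.
    Reset,
}

/// `digest_version` is the version V advertised by Node A for node N.
/// `max_version` is Node B's own highest version for node N.
fn handle_syn_digest(
    digest_version: u64,
    max_version: u64,
    marked_for_deletion_grace_period: u64,
) -> SynResponse {
    // Saturating subtraction avoids underflow while B's own version is still small.
    let gc_horizon = max_version.saturating_sub(marked_for_deletion_grace_period);
    if digest_version >= gc_horizon {
        SynResponse::Delta { from_version: digest_version }
    } else {
        SynResponse::Reset
    }
}

fn main() {
    // B is at version 10_000 with a grace period of 1_000 versions.
    assert!(matches!(handle_syn_digest(9_500, 10_000, 1_000), SynResponse::Delta { .. }));
    assert!(matches!(handle_syn_digest(8_000, 10_000, 1_000), SynResponse::Reset));
}
```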

# Node deletion

In Quickwit, we also use chitchat as a "reliable broadcast with caveats".
The idea of reliable broadcast is that an emitted message is eventually
received by either all of the correct nodes or none of them. Here, a node is called "correct" if it does not fail at any point during its execution.

Of course, if the emitter starts failing before emitting its message, one cannot expect the message to reach anyone.
However, if at least one correct node receives the message, it will
eventually reach all correct nodes.

For this reason, we keep emitting KVs from dead nodes too.

To avoid keeping the state of dead nodes indefinitely, we make
a very important trade-off.

If a node is marked as dead for more than `DEAD_NODE_GRACE_PERIOD`, we assume that its state can be safely removed from the system. The grace period is
computed from the last time we received an update from the dead node.

Simply deleting the state right away is of course not possible. After `DEAD_NODE_GRACE_PERIOD / 2` has elapsed, we mark the dead node as `ScheduledForDeletion`.

We first stop sharing data about nodes in the `ScheduledForDeletion` state
and stop listing them in our digest.

We also ignore any updates received about the dead node. For simplicity, we do not even keep track of the last update received. Eventually, all the nodes of the cluster will have marked the dead node as `ScheduledForDeletion`.

After another `DEAD_NODE_GRACE_PERIOD / 2` has elapsed since the last update received, we delete the dead node's state.
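
To make the timeline concrete, here is a sketch of that lifecycle. The `DeadNodePhase` enum and the `phase` function are illustrative names, but the thresholds mirror the `DEAD_NODE_GRACE_PERIOD / 2` split used by `scheduled_for_deletion_nodes` and `garbage_collect` in the `failure_detector.rs` diff below.

```rust
use std::time::{Duration, Instant};

/// Illustrative view of a dead node's lifecycle, driven by its time of death.
#[derive(Debug, PartialEq)]
enum DeadNodePhase {
    /// Still gossiped: its KVs keep spreading (reliable-broadcast behavior).
    Dead,
    /// No longer gossiped nor listed in digests; updates about it are ignored.
    ScheduledForDeletion,
    /// State can be dropped entirely.
    GarbageCollectible,
}

fn phase(time_of_death: Instant, now: Instant, dead_node_grace_period: Duration) -> DeadNodePhase {
    let elapsed = now.saturating_duration_since(time_of_death);
    if elapsed >= dead_node_grace_period {
        DeadNodePhase::GarbageCollectible
    } else if elapsed >= dead_node_grace_period / 2 {
        DeadNodePhase::ScheduledForDeletion
    } else {
        DeadNodePhase::Dead
    }
}

fn main() {
    let grace = Duration::from_secs(3_600);
    let death = Instant::now();
    assert_eq!(phase(death, death + Duration::from_secs(600), grace), DeadNodePhase::Dead);
    assert_eq!(
        phase(death, death + Duration::from_secs(2_000), grace),
        DeadNodePhase::ScheduledForDeletion
    );
    assert_eq!(
        phase(death, death + Duration::from_secs(4_000), grace),
        DeadNodePhase::GarbageCollectible
    );
}
```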

It is important to set `DEAD_NODE_GRACE_PERIOD` to a value such that `DEAD_NODE_GRACE_PERIOD / 2` is much greater than the time it takes to detect a faulty node.

Note that we are breaking the reliable-broadcast nature of chitchat here.
New nodes joining after `DEAD_NODE_GRACE_PERIOD`, for instance, will never learn about the state of the dead node.

Also, if a node was disconnected from the cluster for more than
`DEAD_NODE_GRACE_PERIOD / 2` and then reconnects, it is likely to spread information
about the dead node again. Worse, it might be unaware of the deletion
of some specific KVs and spread them again.

The chitchat library does not include any mechanism to prevent this from happening. The re-spread state should, however, eventually get deleted (a bit more than `DEAD_NODE_GRACE_PERIOD` later) if the node is really dead.

If the node is alive, it should be able to fix everyone's state via reset or regular delta.

<!--
Alternative, more concise naming / explanation:
Node deletion
Heartbeats are fed into a phi-accrual detector.
Detector tells live nodes from failed nodes apart.
Failed nodes are GCed after GC_GRACE_PERIOD.
Reliable broadcast
In order to ensure reliable broadcast, we must propagate info about failed nodes for some time shorter than GC_GRACE_PERIOD before deleting them.
To do so, failed nodes are split into two categories: zombie and dead.
First, upon failure, failed nodes become zombie nodes, and we keep sharing data about them.
After ZOMBIE_GRACE_PERIOD, zombie nodes transition to dead nodes, and we stop sharing data about them.
ZOMBIE_GRACE_PERIOD is set to GC_GRACE_PERIOD / 2
-->
22 changes: 20 additions & 2 deletions chitchat/src/failure_detector.rs
@@ -81,8 +81,9 @@ impl FailureDetector {
/// Removes and returns the list of garbage collectible nodes.
pub fn garbage_collect(&mut self) -> Vec<ChitchatId> {
let mut garbage_collected_nodes = Vec::new();
for (chitchat_id, instant) in self.dead_nodes.iter() {
if instant.elapsed() >= self.config.dead_node_grace_period {
let now = Instant::now();
for (chitchat_id, &time_of_death) in &self.dead_nodes {
if now >= time_of_death + self.config.dead_node_grace_period {
garbage_collected_nodes.push(chitchat_id.clone())
}
}
@@ -102,6 +103,23 @@ impl FailureDetector {
self.dead_nodes.keys()
}

/// Returns the list of nodes that have been dead for more than half the grace period and are therefore scheduled for deletion.
pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
let now = Instant::now();
let half_dead_node_grace_period = self.config.dead_node_grace_period.div_f32(2.0f32);
// Note: we can't just compute the threshold now - half_dead_node_grace_period, because it
// would underflow on some platforms (e.g. macOS).
self.dead_nodes
.iter()
.filter_map(move |(chitchat_id, time_of_death)| {
if *time_of_death + half_dead_node_grace_period < now {
Some(chitchat_id)
} else {
None
}
})
}

/// Returns the current phi value of a node.
fn phi(&mut self, chitchat_id: &ChitchatId) -> Option<f64> {
self.node_samples
113 changes: 75 additions & 38 deletions chitchat/src/lib.rs
@@ -86,7 +86,8 @@ impl Chitchat {
}

pub(crate) fn create_syn_message(&self) -> ChitchatMessage {
let digest = self.compute_digest();
let scheduled_for_deletion: HashSet<_> = self.scheduled_for_deletion_nodes().collect();
let digest = self.compute_digest(&scheduled_for_deletion);
ChitchatMessage::Syn {
cluster_id: self.config.cluster_id.clone(),
digest,
@@ -112,13 +113,15 @@ );
);
return Some(ChitchatMessage::BadCluster);
}
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let self_digest = self.compute_digest();

let scheduled_for_deletion: HashSet<_> =
self.scheduled_for_deletion_nodes().collect();
let self_digest = self.compute_digest(&scheduled_for_deletion);
let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - digest.serialized_len();
let delta = self.cluster_state.compute_partial_delta_respecting_mtu(
&digest,
delta_mtu,
&dead_nodes,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::SynAck {
@@ -128,11 +131,12 @@ }
}
ChitchatMessage::SynAck { digest, delta } => {
self.process_delta(delta);
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let scheduled_for_deletion =
self.scheduled_for_deletion_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_partial_delta_respecting_mtu(
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&dead_nodes,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::Ack { delta })
@@ -254,6 +258,11 @@ impl Chitchat {
self.failure_detector.dead_nodes()
}

/// Returns the set of dead nodes that are scheduled for deletion by the failure detector.
pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
self.failure_detector.scheduled_for_deletion_nodes()
}

/// Returns the set of seed nodes.
pub fn seed_nodes(&self) -> HashSet<SocketAddr> {
self.cluster_state.seed_addrs()
@@ -282,8 +291,9 @@ }
}

/// Computes the node's digest.
fn compute_digest(&self) -> Digest {
self.cluster_state.compute_digest()
fn compute_digest(&self, scheduled_for_deletion_nodes: &HashSet<&ChitchatId>) -> Digest {
self.cluster_state
.compute_digest(scheduled_for_deletion_nodes)
}

/// Subscribes a callback that will be called every time a key matching the supplied prefix
@@ -582,9 +592,23 @@ mod tests {
}

#[tokio::test]
async fn test_dead_node_should_not_be_gossiped_when_node_joins() -> anyhow::Result<()> {
async fn test_dead_node_kvs_are_when_node_joins() -> anyhow::Result<()> {
let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
let mut nodes = setup_nodes(40001..=40004, &transport).await;
// Start 2 nodes.
let mut nodes = setup_nodes(40001..=40002, &transport).await;

// Let's add a key to node1.
let node1_id = {
let node1 = nodes.first().unwrap();
let node1_chitchat = node1.chitchat();
node1_chitchat
.lock()
.await
.self_node_state()
.set("test_key", "test_val");
node1.chitchat_id().clone()
};

{
let node2 = nodes.get(1).unwrap();
assert_eq!(node2.chitchat_id().advertise_port(), 40002);
@@ -593,54 +617,67 @@
&[
ChitchatId::for_local_test(40001),
ChitchatId::for_local_test(40002),
ChitchatId::for_local_test(40003),
ChitchatId::for_local_test(40004),
],
)
.await;
let node2_chitchat = node2.chitchat();
// We have received node1's key
let value = node2_chitchat
.lock()
.await
.node_state(&node1_id)
.unwrap()
.get("test_key")
.unwrap()
.to_string();
assert_eq!(&value, "test_val");
}

// Take node 3 down.
let node3 = nodes.remove(2);
assert_eq!(node3.chitchat_id().advertise_port(), 40003);
node3.shutdown().await.unwrap();
// Take node 1 down.
let node1 = nodes.remove(0);
assert_eq!(node1.chitchat_id().advertise_port(), 40001);
node1.shutdown().await.unwrap();

// Node 2 has detected that node 1 is missing.
{
let node2 = nodes.get(1).unwrap();
let node2 = nodes.first().unwrap();
assert_eq!(node2.chitchat_id().advertise_port(), 40002);
wait_for_chitchat_state(
node2.chitchat(),
&[
ChitchatId::for_local_test(40_001),
ChitchatId::for_local_test(40002),
ChitchatId::for_local_test(40_004),
],
)
.await;
wait_for_chitchat_state(node2.chitchat(), &[ChitchatId::for_local_test(40_002)]).await;
}

// Restart node at localhost:40003 with new name
let mut new_config = ChitchatConfig::for_test(40_003);
// Restart node at localhost:40001 with new name
let mut new_config = ChitchatConfig::for_test(40_001);
new_config.chitchat_id.node_id = "new_node".to_string();
let new_chitchat_id = new_config.chitchat_id.clone();
let seed_addr = ChitchatId::for_local_test(40_002).gossip_advertise_addr;
let seed_addr = ChitchatId::for_local_test(40_001).gossip_advertise_addr;
new_config.seed_nodes = vec![seed_addr.to_string()];
let new_node_chitchat = spawn_chitchat(new_config, Vec::new(), &transport)
let new_node_chitchat_handle = spawn_chitchat(new_config, Vec::new(), &transport)
.await
.unwrap();

let new_node_chitchat = new_node_chitchat_handle.chitchat();
wait_for_chitchat_state(
new_node_chitchat.chitchat(),
&[
ChitchatId::for_local_test(40_001),
ChitchatId::for_local_test(40_002),
new_chitchat_id,
ChitchatId::for_local_test(40_004),
],
new_node_chitchat.clone(),
&[ChitchatId::for_local_test(40_002), new_chitchat_id],
)
.await;

nodes.push(new_node_chitchat);
{
let new_node_chitchat_guard = new_node_chitchat.lock().await;
let test_val = new_node_chitchat_guard
.node_state(&node1_id)
.unwrap()
.get("test_key")
.unwrap();
assert_eq!(test_val, "test_val");

// Let's check that node1 is seen as dead.
let dead_nodes: HashSet<&ChitchatId> = new_node_chitchat_guard.dead_nodes().collect();
assert_eq!(dead_nodes.len(), 1);
assert!(dead_nodes.contains(&node1_id));
}

nodes.push(new_node_chitchat_handle);
shutdown_nodes(nodes).await?;
Ok(())
}
11 changes: 7 additions & 4 deletions chitchat/src/state.rs
@@ -317,11 +317,12 @@ impl ClusterState {
}
}

pub fn compute_digest(&self) -> Digest {
pub fn compute_digest(&self, scheduled_for_deletion: &HashSet<&ChitchatId>) -> Digest {
Digest {
node_digests: self
.node_states
.iter()
.filter(|(chitchat_id, _)| !scheduled_for_deletion.contains(chitchat_id))
.map(|(chitchat_id, node_state)| (chitchat_id.clone(), node_state.digest()))
.collect(),
}
@@ -341,18 +342,20 @@ }
}

/// Implements the Scuttlebutt reconciliation with the scuttle-depth ordering.
///
/// Nodes that are scheduled for deletion (as passed by argument) are not shared.
pub fn compute_partial_delta_respecting_mtu(
&self,
digest: &Digest,
mtu: usize,
dead_nodes: &HashSet<&ChitchatId>,
scheduled_for_deletion: &HashSet<&ChitchatId>,
marked_for_deletion_grace_period: u64,
) -> Delta {
let mut stale_nodes = SortedStaleNodes::default();
let mut nodes_to_reset = Vec::new();

for (chitchat_id, node_state) in &self.node_states {
if dead_nodes.contains(chitchat_id) {
if scheduled_for_deletion.contains(chitchat_id) {
continue;
}
let Some(node_digest) = digest.node_digests.get(chitchat_id) else {
@@ -838,7 +841,7 @@ mod tests {
node2_state.set("key_a", "");
node2_state.set("key_b", "");

let digest = cluster_state.compute_digest();
let digest = cluster_state.compute_digest(&HashSet::new());

let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1);
