Skip to content

Commit

Permalink
Minor bugfix / refactoring. (#113)
Browse files Browse the repository at this point in the history
- Bugfix: We were recording our own delta to report heartbeats. I did not dig, but this was probably inconsequential: it was probably dismissed because it was not seen as an update.

- Simplification: After this PR, we only treat heartbeat updates as a sign that a node was updated. As a result, we remove max_version from
NodeDelta. We also remove the MaxVersion type alias, which was overkill.

- Unit test: attempt to fix a flaky test.

- Made UdpSocket public to remove a dynamic dispatch in quickwit.
  • Loading branch information
fulmicoton authored Feb 13, 2024
1 parent 783bfd0 commit c3a5baf
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 39 deletions.
13 changes: 1 addition & 12 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, HashSet};
use std::mem;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue};
use crate::{ChitchatId, Heartbeat, VersionedValue};

#[derive(Debug, Default, Eq, PartialEq)]
pub struct Delta {
Expand Down Expand Up @@ -84,8 +84,6 @@ impl Delta {
tombstone: Option<u64>,
) {
let node_delta = self.node_deltas.get_mut(chitchat_id).unwrap();

node_delta.max_version = node_delta.max_version.max(version);
node_delta.key_values.insert(
key.to_string(),
VersionedValue {
Expand All @@ -105,8 +103,6 @@ impl Delta {
pub(crate) struct NodeDelta {
pub heartbeat: Heartbeat,
pub key_values: BTreeMap<String, VersionedValue>,
// This attribute is computed upon deserialization. 0 if `key_values` is empty.
pub max_version: MaxVersion,
}

#[cfg(test)]
Expand Down Expand Up @@ -191,10 +187,6 @@ impl DeltaWriter {
) {
return false;
}
self.current_node_delta.max_version = self
.current_node_delta
.max_version
.max(versioned_value.version);
self.current_node_delta
.key_values
.insert(key.to_string(), versioned_value);
Expand Down Expand Up @@ -238,14 +230,12 @@ impl Serializable for NodeDelta {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let heartbeat = Heartbeat::deserialize(buf)?;
let mut key_values: BTreeMap<String, VersionedValue> = Default::default();
let mut max_version = 0;
let num_key_values = u16::deserialize(buf)?;
for _ in 0..num_key_values {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let tombstone = <Option<u64>>::deserialize(buf)?;
max_version = max_version.max(version);
key_values.insert(
key,
VersionedValue {
Expand All @@ -258,7 +248,6 @@ impl Serializable for NodeDelta {
Ok(Self {
heartbeat,
key_values,
max_version,
})
}

Expand Down
8 changes: 4 additions & 4 deletions chitchat/src/digest.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::collections::BTreeMap;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, MaxVersion};
use crate::{ChitchatId, Heartbeat, Version};

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub(crate) struct NodeDigest {
pub(crate) heartbeat: Heartbeat,
pub(crate) max_version: MaxVersion,
pub(crate) max_version: Version,
}

impl NodeDigest {
pub(crate) fn new(heartbeat: Heartbeat, max_version: MaxVersion) -> Self {
pub(crate) fn new(heartbeat: Heartbeat, max_version: Version) -> Self {
Self {
heartbeat,
max_version,
Expand All @@ -30,7 +30,7 @@ pub struct Digest {

#[cfg(test)]
impl Digest {
pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: MaxVersion) {
pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: Version) {
let node_digest = NodeDigest::new(heartbeat, max_version);
self.node_digests.insert(node, node_digest);
}
Expand Down
10 changes: 3 additions & 7 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::message::syn_ack_serialized_len;
pub use crate::message::ChitchatMessage;
pub use crate::server::{spawn_chitchat, ChitchatHandle};
use crate::state::ClusterState;
pub use crate::types::{ChitchatId, Heartbeat, MaxVersion, Version, VersionedValue};
pub use crate::types::{ChitchatId, Heartbeat, Version, VersionedValue};

/// Maximum UDP datagram payload size (in bytes).
///
Expand All @@ -51,7 +51,7 @@ pub struct Chitchat {
cluster_state: ClusterState,
failure_detector: FailureDetector,
/// Notifies listeners when a change has occurred in the set of live nodes.
previous_live_nodes: HashMap<ChitchatId, MaxVersion>,
previous_live_nodes: HashMap<ChitchatId, Version>,
live_nodes_watcher_tx: watch::Sender<BTreeMap<ChitchatId, NodeState>>,
live_nodes_watcher_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>,
}
Expand Down Expand Up @@ -117,15 +117,13 @@ impl Chitchat {
&dead_nodes,
self.config.marked_for_deletion_grace_period as u64,
);
self.report_heartbeats(&delta);
Some(ChitchatMessage::SynAck {
digest: self_digest,
delta,
})
}
ChitchatMessage::SynAck { digest, delta } => {
self.report_heartbeats(&delta);
// self.config.chitchat_id.node_id, digest, delta);
self.cluster_state.apply_delta(delta);
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_delta(
Expand Down Expand Up @@ -161,9 +159,7 @@ impl Chitchat {
fn report_heartbeats(&mut self, delta: &Delta) {
for (chitchat_id, node_delta) in &delta.node_deltas {
if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) {
if node_state.heartbeat() < node_delta.heartbeat
|| node_state.max_version() < node_delta.max_version
{
if node_state.heartbeat() < node_delta.heartbeat {
self.failure_detector.report_heartbeat(chitchat_id);
}
} else {
Expand Down
7 changes: 3 additions & 4 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use tracing::warn;
use crate::delta::{Delta, DeltaWriter};
use crate::digest::{Digest, NodeDigest};
use crate::listener::Listeners;
use crate::{ChitchatId, Heartbeat, KeyChangeEvent, MaxVersion, Version, VersionedValue};
use crate::{ChitchatId, Heartbeat, KeyChangeEvent, Version, VersionedValue};

#[derive(Clone, Serialize, Deserialize)]
pub struct NodeState {
node_id: ChitchatId,
heartbeat: Heartbeat,
key_values: BTreeMap<String, VersionedValue>,
max_version: MaxVersion,
max_version: Version,
#[serde(skip)]
#[serde(default = "Instant::now")]
last_heartbeat: Instant,
Expand Down Expand Up @@ -74,7 +74,7 @@ impl NodeState {
}

/// Returns the node's max version.
pub fn max_version(&self) -> MaxVersion {
pub fn max_version(&self) -> Version {
self.max_version
}

Expand Down Expand Up @@ -305,7 +305,6 @@ impl ClusterState {
node_state.heartbeat = node_delta.heartbeat;
node_state.last_heartbeat = Instant::now();
}
node_state.max_version = node_state.max_version.max(node_delta.max_version);
for (key, versioned_value) in node_delta.key_values {
node_state.max_version = node_state.max_version.max(versioned_value.version);
node_state.set_versioned_value(key, versioned_value);
Expand Down
2 changes: 1 addition & 1 deletion chitchat/src/transport/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Transport for UdpTransport {
}
}

struct UdpSocket {
pub struct UdpSocket {
buf_send: Vec<u8>,
buf_recv: Box<[u8; MAX_UDP_DATAGRAM_PAYLOAD_SIZE]>,
socket: tokio::net::UdpSocket,
Expand Down
3 changes: 0 additions & 3 deletions chitchat/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ impl VersionedValue {
/// The current version of a key.
pub type Version = u64;

/// The highest version of a key in a node's state.
pub type MaxVersion = u64;

/// The current heartbeat of a node.
#[derive(
Debug, Clone, Copy, Default, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize,
Expand Down
17 changes: 9 additions & 8 deletions chitchat/tests/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ async fn test_simple_simulation_with_network_partition() {

#[tokio::test]
async fn test_marked_for_deletion_gc_with_network_partition() {
const TIMEOUT: Duration = Duration::from_millis(500);
// let _ = tracing_subscriber::fmt::try_init();
let mut simulator = Simulator::new(Duration::from_millis(100), 10);
let chitchat_id_1 = create_chitchat_id("node-1");
Expand Down Expand Up @@ -358,13 +359,13 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
server_chitchat_id: chitchat_id_2.clone(),
chitchat_id: chitchat_id_1.clone(),
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true),
timeout_opt: Some(Duration::from_millis(300)),
timeout_opt: Some(TIMEOUT),
},
Operation::NodeStateAssert {
server_chitchat_id: chitchat_id_3.clone(),
chitchat_id: chitchat_id_1.clone(),
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true),
timeout_opt: Some(Duration::from_millis(300)),
timeout_opt: Some(TIMEOUT),
},
// Isolate node 3.
Operation::RemoveNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()),
Expand All @@ -379,7 +380,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
server_chitchat_id: chitchat_id_2.clone(),
chitchat_id: chitchat_id_1.clone(),
predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), true),
timeout_opt: Some(Duration::from_millis(300)),
timeout_opt: Some(TIMEOUT),
},
// Check marked for deletion is not propagated to node 3.
Operation::NodeStateAssert {
Expand All @@ -394,7 +395,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
server_chitchat_id: chitchat_id_2.clone(),
chitchat_id: chitchat_id_1.clone(),
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false),
timeout_opt: Some(Duration::from_millis(300)),
timeout_opt: Some(TIMEOUT),
},
Operation::NodeStateAssert {
server_chitchat_id: chitchat_id_1.clone(),
Expand All @@ -411,7 +412,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
},
// Wait for propagation
// We need to wait longer... because node 4 is just starting?
Operation::Wait(Duration::from_millis(500)),
Operation::Wait(TIMEOUT),
Operation::NodeStateAssert {
server_chitchat_id: chitchat_id_3.clone(),
chitchat_id: chitchat_id_1.clone(),
Expand All @@ -422,7 +423,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
server_chitchat_id: chitchat_id_4.clone(),
chitchat_id: chitchat_id_1.clone(),
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true),
timeout_opt: None,
timeout_opt: Some(TIMEOUT),
},
// Relink node 3
Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()),
Expand All @@ -431,13 +432,13 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
server_chitchat_id: chitchat_id_3.clone(),
chitchat_id: chitchat_id_1.clone(),
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false),
timeout_opt: Some(Duration::from_millis(500)),
timeout_opt: Some(TIMEOUT),
},
Operation::NodeStateAssert {
server_chitchat_id: chitchat_id_4.clone(),
chitchat_id: chitchat_id_1.clone(),
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false),
timeout_opt: Some(Duration::from_millis(500)),
timeout_opt: Some(TIMEOUT),
},
];
simulator.execute(operations).await;
Expand Down

0 comments on commit c3a5baf

Please sign in to comment.