Skip to content

Commit

Permalink
fix infinite loop in cluster mode when losing connection to a node
Browse files Browse the repository at this point in the history
  • Loading branch information
mcatanzariti committed Dec 29, 2023
1 parent e5a1e0c commit 29b2b5f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/commands/cluster_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,11 +761,12 @@ pub struct ClusterNodeResult {
}

/// Cluster health status for the [`cluster_shards`](ClusterCommands::cluster_shards) command.
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ClusterHealthStatus {
Online,
Failed,
Fail,
Loading,
}

Expand Down
9 changes: 8 additions & 1 deletion src/network/cluster_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ impl ClusterConnection {
let read_futures = self.nodes.iter_mut().map(|n| n.connection.read().boxed());
let (result, node_idx, _) = future::select_all(read_futures).await;

if result.is_none() {
return None;
}

if let Some(Ok(bytes)) = &result {
if bytes.is_push_message() {
return result;
Expand Down Expand Up @@ -940,7 +944,10 @@ impl ClusterConnection {
let mut slot_ranges = Vec::<SlotRange>::new();

for shard_info in shard_info_list.into_iter() {
let Some(master_info) = shard_info.nodes.into_iter().find(|n| n.role == "master")
let Some(master_info) = shard_info
.nodes
.into_iter()
.find(|n| n.role == "master" && n.health == ClusterHealthStatus::Online)
else {
return Err(Error::Client("Cluster misconfiguration".to_owned()));
};
Expand Down
29 changes: 22 additions & 7 deletions src/tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ use crate::{
ClusterShardResult, ConnectionCommands, FlushingMode, GenericCommands, HelloOptions,
MigrateOptions, ScriptingCommands, ServerCommands, StringCommands,
},
network::{Version, ClusterConnection},
network::{ClusterConnection, Version},
sleep, spawn,
tests::{get_cluster_test_client, get_cluster_test_client_with_command_timeout},
Error, RedisError, RedisErrorKind, Result,
};
use serial_test::serial;
use std::{
collections::HashSet,
future::IntoFuture,
};
use futures_util::try_join;
use serial_test::serial;
use std::{collections::HashSet, future::IntoFuture, time::Duration};

#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
Expand Down Expand Up @@ -464,4 +461,22 @@ async fn commands_to_different_nodes() -> Result<()> {
assert_eq!("1", val1);
assert_eq!("2", val2);
Ok(())
}
}

/// Manual test: reconnection to a replica when the master is stopped.
///
/// Stores a single key, then reads it back once per second, 1000 times in a row.
/// Stopping the master is not automated and must be done by hand while the loop
/// is running (presumably the reason the test carries `#[ignore]`).
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[serial]
#[ignore]
async fn get_loop() -> Result<()> {
    let client = get_cluster_test_client().await?;
    client.set("key", "value").await?;

    let pause = Duration::from_secs(1);
    let mut remaining = 1000;
    while remaining > 0 {
        // The outcome of the read is bound and dropped without `?`, so the loop
        // keeps going whether the read succeeds or fails.
        let _read: Result<String> = client.get("key").await;
        sleep(pause).await;
        remaining -= 1;
    }

    Ok(())
}

0 comments on commit 29b2b5f

Please sign in to comment.