From e6d4f627e4a316c392d44df5984491444915d516 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Fri, 5 Jul 2024 15:39:09 +0100 Subject: [PATCH] don't process emptyset --- crates/corro-agent/src/agent/run_root.rs | 12 ++++++------ crates/corro-agent/src/api/peer.rs | 16 ++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs index 470ef1d1..01d0813b 100644 --- a/crates/corro-agent/src/agent/run_root.rs +++ b/crates/corro-agent/src/agent/run_root.rs @@ -218,12 +218,12 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul tripwire.clone(), )); - spawn_counted(handlers::handle_emptyset( - agent.clone(), - bookie.clone(), - rx_emptyset, - tripwire.clone(), - )); + // spawn_counted(handlers::handle_emptyset( + // agent.clone(), + // bookie.clone(), + // rx_emptyset, + // tripwire.clone(), + // )); Ok(bookie) } diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 0d537e27..9c4983a4 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1374,7 +1374,7 @@ pub async fn parallel_sync( let counts = FuturesUnordered::from_iter(readers.into_iter().map(|(actor_id, mut read)| { let tx_changes = agent.tx_changes().clone(); - let tx_emptyset = agent.tx_emptyset().clone(); + // let tx_emptyset = agent.tx_emptyset().clone(); async move { let mut count = 0; @@ -1402,13 +1402,13 @@ pub async fn parallel_sync( change.seqs() ); // only accept emptyset that's from the same node that's syncing - if change.is_empty_set() { - tx_emptyset - .send(change) - .await - .map_err(|_| SyncRecvError::ChangesChannelClosed)?; - continue; - } + // if change.is_empty_set() { + // tx_emptyset + // .send(change) + // .await + // .map_err(|_| SyncRecvError::ChangesChannelClosed)?; + // continue; + // } tx_changes .send((change, ChangeSource::Sync))