Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send empties during sync based on timestamp #223

Merged
merged 3 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use corro_types::{
sqlite::SqlitePoolError,
sync::{SyncMessageDecodeError, SyncMessageEncodeError},
};
use tokio::time::error::Elapsed;
use hyper::StatusCode;
use tokio::time::error::Elapsed;

#[derive(Debug, thiserror::Error)]
pub enum SyncClientError {
Expand Down
160 changes: 155 additions & 5 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! This module is _big_ and maybe should be split up further.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::RangeInclusive;

use std::{
cmp,
collections::VecDeque,
Expand All @@ -26,7 +29,11 @@ use corro_types::{
};

use bytes::Bytes;
use corro_types::agent::ChangeError;
use corro_types::base::Version;
use corro_types::broadcast::Timestamp;
use corro_types::change::store_empty_changeset;
use corro_types::sync::get_last_cleared_ts;
use foca::Notification;
use indexmap::IndexMap;
use metrics::{counter, gauge, histogram};
Expand Down Expand Up @@ -392,14 +399,141 @@ pub fn spawn_handle_db_cleanup(pool: SplitPool) {

// determine the estimated resource cost of processing a change
fn processing_cost(change: &Changeset) -> usize {
if change.is_empty() {
if change.is_empty() && !change.is_empty_set() {
let versions = change.versions();
cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)
} else {
change.len()
}
}

/// Handle incoming emptyset received during syncs
///
pub async fn handle_emptyset(
agent: Agent,
bookie: Bookie,
mut rx_emptysets: CorroReceiver<ChangeV1>,
mut tripwire: Tripwire,
) {
// maybe bigger timeout?
let mut max_wait = tokio::time::interval(Duration::from_millis(
agent.config().perf.apply_queue_timeout as u64,
));
let max_changes_chunk: usize = agent.config().perf.apply_queue_len;

let mut buf = HashMap::new();
let mut cost = 0;

loop {
let mut process = false;
tokio::select! {
maybe_change_src = rx_emptysets.recv() => match maybe_change_src {
Some(change) => {
if let Changeset::EmptySet { versions, ts } = change.changeset {
buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts));
cost += versions.len();
} else {
warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id);
}

if cost >= max_changes_chunk && !buf.is_empty() {
process = true;
}
},
None => break,
},
_ = max_wait.tick() => {
process = true
},
_ = &mut tripwire => {
break;
}
}

if process {
for (actor, changes) in &mut buf {
while !changes.is_empty() {
let change = changes.pop_front().unwrap();
match process_emptyset(agent.clone(), bookie.clone(), *actor, &change).await {
Ok(()) => {
cost -= change.0.len();
}
Err(e) => {
warn!("encountered error when processing emptyset - {e}");
changes.push_front(change);
break;
}
}
}
}
}
}

println!("shutting down handle empties loop");
}

pub async fn process_emptyset(
agent: Agent,
bookie: Bookie,
actor_id: ActorId,
changes: &(Vec<RangeInclusive<corro_types::base::Version>>, Timestamp),
) -> Result<(), ChangeError> {
let (versions, ts) = changes;
let mut conn = agent.pool().write_low().await?;

let booked = {
bookie
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}",
))
.await
.ensure(actor_id)
};
let mut booked_write = booked
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}"
))
.await;
let mut snap = booked_write.snapshot();

let tx = conn
.immediate_transaction()
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;

counter!("corro.sync.empties.count", "actor" => format!("{}", actor_id.to_string()))
.increment(versions.len() as u64);
debug!(self_actor_id = %agent.actor_id(), "processing emptyset changes, len: {}", versions.len());
for version in versions {
store_empty_changeset(&tx, actor_id, version.clone(), *ts)?;
}

snap.insert_db(&tx, RangeInclusiveSet::from_iter(versions.clone()))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;
snap.update_cleared_ts(&tx, *ts)
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;

tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;
booked_write.commit_snapshot(snap);

Ok(())
}

/// Bundle incoming changes to optimise transaction sizes with SQLite
///
/// *Performance tradeoff*: introduce latency (with a max timeout) to
Expand Down Expand Up @@ -658,8 +792,15 @@ pub async fn handle_sync(
**id != agent.actor_id() && state.cluster_id == agent.cluster_id()
})
// Grab a ring-buffer index to the member RTT range
.map(|(id, state)| (*id, state.ring.unwrap_or(255), state.addr))
.collect::<Vec<(ActorId, u8, SocketAddr)>>()
.map(|(id, state)| {
(
*id,
state.ring.unwrap_or(255),
state.addr,
state.last_sync_ts,
)
})
.collect::<Vec<(ActorId, u8, SocketAddr, Option<Timestamp>)>>()
};

if candidates.is_empty() {
Expand All @@ -682,14 +823,16 @@ pub async fn handle_sync(
sync_state
.need_len_for_actor(&b.0)
.cmp(&sync_state.need_len_for_actor(&a.0))
// if equal, look at last sync time
.then_with(|| a.3.cmp(&b.3))
// if equal, look at proximity (via `ring`)
.then_with(|| a.1.cmp(&b.1))
});

choices.truncate(desired_count);
choices
.into_iter()
.map(|(actor_id, _, addr)| (actor_id, addr))
.map(|(actor_id, _, addr, _)| (actor_id, addr))
.collect()
};

Expand All @@ -698,8 +841,14 @@ pub async fn handle_sync(
return Ok(());
}

let mut last_cleared: HashMap<ActorId, Option<Timestamp>> = HashMap::new();

for (actor_id, _) in chosen.clone() {
last_cleared.insert(actor_id, get_last_cleared_ts(&bookie, &actor_id).await);
}

let start = Instant::now();
let n = match parallel_sync(agent, transport, chosen.clone(), sync_state).await {
let n = match parallel_sync(agent, transport, chosen.clone(), sync_state, last_cleared).await {
Ok(n) => n,
Err(e) => {
error!("failed to execute parallel sync: {e:?}");
Expand All @@ -712,6 +861,7 @@ pub async fn handle_sync(
info!(
"synced {n} changes w/ {} in {}s @ {} changes/s",
chosen
.clone()
.into_iter()
.map(|(actor_id, _)| actor_id.to_string())
.collect::<Vec<_>>()
Expand Down
8 changes: 8 additions & 0 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
rx_apply,
rx_clear_buf,
rx_changes,
rx_emptyset,
rx_foca,
subs_manager,
subs_bcast_cache,
Expand Down Expand Up @@ -217,5 +218,12 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
tripwire.clone(),
));

spawn_counted(handlers::handle_emptyset(
agent.clone(),
bookie.clone(),
rx_emptyset,
tripwire.clone(),
));

Ok(bookie)
}
23 changes: 14 additions & 9 deletions crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct AgentOptions {
pub rx_apply: CorroReceiver<(ActorId, Version)>,
pub rx_clear_buf: CorroReceiver<(ActorId, RangeInclusive<Version>)>,
pub rx_changes: CorroReceiver<(ChangeV1, ChangeSource)>,
pub rx_emptyset: CorroReceiver<ChangeV1>,
pub rx_foca: CorroReceiver<FocaInput>,
pub rtt_rx: TokioReceiver<(SocketAddr, Duration)>,
pub subs_manager: SubsManager,
Expand Down Expand Up @@ -84,9 +85,16 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let pool = SplitPool::create(&conf.db.path, write_sema.clone()).await?;

let clock = Arc::new(
uhlc::HLCBuilder::default()
.with_id(actor_id.try_into().unwrap())
.with_max_delta(Duration::from_millis(300))
.build(),
);

let schema = {
let mut conn = pool.write_priority().await?;
migrate(&mut conn)?;
migrate(clock.clone(), &mut conn)?;
let mut schema = init_schema(&conn)?;
schema.constrain()?;

Expand Down Expand Up @@ -138,15 +146,9 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
}
let api_addr = api_listeners.first().unwrap().local_addr()?;

let clock = Arc::new(
uhlc::HLCBuilder::default()
.with_id(actor_id.try_into().unwrap())
.with_max_delta(Duration::from_millis(300))
.build(),
);

let (tx_bcast, rx_bcast) = bounded(conf.perf.bcast_channel_len, "bcast");
let (tx_changes, rx_changes) = bounded(conf.perf.changes_channel_len, "changes");
let (tx_emptyset, rx_emptyset) = bounded(conf.perf.changes_channel_len, "emptyset");
let (tx_foca, rx_foca) = bounded(conf.perf.foca_channel_len, "foca");

let lock_registry = LockRegistry::default();
Expand All @@ -162,7 +164,8 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
async move {
let conn = pool.read().await?;
*booked.deref_mut().deref_mut() =
tokio::task::block_in_place(|| BookedVersions::from_conn(&conn, actor_id))?;
tokio::task::block_in_place(|| BookedVersions::from_conn(&conn, actor_id))
.expect("loading BookedVersions from db failed");
Ok::<_, eyre::Report>(())
}
});
Expand All @@ -176,6 +179,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
rx_apply,
rx_clear_buf,
rx_changes,
rx_emptyset,
rx_foca,
rtt_rx,
subs_manager: subs_manager.clone(),
Expand All @@ -197,6 +201,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
tx_apply,
tx_clear_buf,
tx_changes,
tx_emptyset,
tx_foca,
write_sema,
schema: RwLock::new(schema),
Expand Down
Loading
Loading