Skip to content

Commit

Permalink
Lazy loading current actor's BookedVersions (#186)
Browse files Browse the repository at this point in the history
* we're not in such a hurry to process bookkeeping, do 2 at a time, switch to spawn_blocking

* asynchronously fill even the current actor id

* drain subscriptions when tripwire is tripped
  • Loading branch information
jeromegn authored Apr 11, 2024
1 parent 357863a commit 4b0a94e
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 115 deletions.
82 changes: 8 additions & 74 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
//! Start the root agent tasks

use std::sync::Arc;

use crate::{
agent::{
handlers::{self, spawn_handle_db_cleanup},
metrics, setup, util, AgentOptions,
},
api::public::pubsub::{
process_sub_channel, MatcherBroadcastCache, SharedMatcherBroadcastCache,
},
broadcast::runtime_loop,
};
use corro_types::{
Expand All @@ -18,12 +13,10 @@ use corro_types::{
base::CrsqlSeq,
channel::bounded,
config::{Config, PerfConfig},
pubsub::{Matcher, SubsManager},
};

use futures::{FutureExt, StreamExt, TryStreamExt};
use spawn::spawn_counted;
use tokio::{sync::RwLock as TokioRwLock, task::block_in_place};
use tracing::{error, info};
use tripwire::Tripwire;

Expand Down Expand Up @@ -53,15 +46,13 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
rx_changes,
rx_foca,
subs_manager,
subs_bcast_cache,
rtt_rx,
} = opts;

// Get our gossip address and make sure it's valid
let gossip_addr = gossip_server_endpoint.local_addr()?;

// Setup subscription handlers
let subs_bcast_cache = setup_spawn_subscriptions(&agent, &subs_manager, &tripwire).await?;

//// Start PG server to accept query requests from PG clients
// TODO: pull this out into a separate function?
if let Some(pg_conf) = agent.config().api.pg.clone() {
Expand Down Expand Up @@ -140,6 +131,9 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
.query_map([], |row| row.get(0))
.and_then(|rows| rows.collect::<rusqlite::Result<Vec<_>>>())?;

// not strictly required, but we don't need to keep it open
drop(conn);

let pool = agent.pool();

let mut buf = futures::stream::iter(
Expand All @@ -150,18 +144,18 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
.map(|actor_id| {
let pool = pool.clone();
async move {
tokio::spawn(async move {
let conn = pool.read().await?;
tokio::task::spawn_blocking(move || {
let conn = pool.read_blocking()?;

block_in_place(|| BookedVersions::from_conn(&conn, actor_id))
BookedVersions::from_conn(&conn, actor_id)
.map(|bv| (actor_id, bv))
.map_err(eyre::Report::from)
})
.await?
}
}),
)
.buffer_unordered(8);
.buffer_unordered(2);

while let Some((actor_id, bv)) = TryStreamExt::try_next(&mut buf).await? {
for (version, partial) in bv.partials.iter() {
Expand Down Expand Up @@ -220,63 +214,3 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul

Ok(bookie)
}

/// Initialise subscription state and tasks
///
/// 1. Get subscriptions state directory from config
/// 2. Load existing subscriptions and restore them in SubsManager
/// 3. Spawn subscription processor task
async fn setup_spawn_subscriptions(
agent: &Agent,
subs_manager: &SubsManager,
tripwire: &Tripwire,
) -> eyre::Result<SharedMatcherBroadcastCache> {
let mut subs_bcast_cache = MatcherBroadcastCache::default();
let mut to_cleanup = vec![];

let subs_path = agent.config().db.subscriptions_path();

if let Ok(mut dir) = tokio::fs::read_dir(&subs_path).await {
while let Ok(Some(entry)) = dir.next_entry().await {
let path_str = entry.path().display().to_string();
if let Some(sub_id_str) = path_str.strip_prefix(subs_path.as_str()) {
if let Ok(sub_id) = sub_id_str.trim_matches('/').parse() {
let (_, created) = match subs_manager.restore(
sub_id,
&subs_path,
&agent.schema().read(),
agent.pool(),
tripwire.clone(),
) {
Ok(res) => res,
Err(e) => {
error!(%sub_id, "could not restore subscription: {e}");
to_cleanup.push(sub_id);
continue;
}
};

info!(%sub_id, "Restored subscription");

let (sub_tx, _) = tokio::sync::broadcast::channel(10240);

tokio::spawn(process_sub_channel(
subs_manager.clone(),
sub_id,
sub_tx.clone(),
created.evt_rx,
));

subs_bcast_cache.insert(sub_id, sub_tx);
}
}
}
}

for id in to_cleanup {
info!(sub_id = %id, "Cleaning up unclean subscription");
Matcher::cleanup(id, Matcher::sub_path(subs_path.as_path(), id))?;
}

Ok(Arc::new(TokioRwLock::new(subs_bcast_cache)))
}
123 changes: 107 additions & 16 deletions crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,33 @@

// External crates
use arc_swap::ArcSwap;
use camino::Utf8PathBuf;
use parking_lot::RwLock;
use rusqlite::{Connection, OptionalExtension};
use std::{net::SocketAddr, ops::RangeInclusive, sync::Arc, time::Duration};
use std::{
net::SocketAddr,
ops::{DerefMut, RangeInclusive},
sync::Arc,
time::Duration,
};
use tokio::{
net::TcpListener,
sync::{
mpsc::{channel as tokio_channel, Receiver as TokioReceiver},
Semaphore,
RwLock as TokioRwLock, Semaphore,
},
};
use tracing::{debug, info};
use tracing::{debug, error, info};
use tripwire::Tripwire;

// Internals
use crate::{api::peer::gossip_server_endpoint, transport::Transport};
use crate::{
api::{
peer::gossip_server_endpoint,
public::pubsub::{process_sub_channel, MatcherBroadcastCache, SharedMatcherBroadcastCache},
},
transport::Transport,
};
use corro_types::{
actor::ActorId,
agent::{migrate, Agent, AgentConfig, Booked, BookedVersions, LockRegistry, SplitPool},
Expand All @@ -25,8 +37,8 @@ use corro_types::{
channel::{bounded, CorroReceiver},
config::Config,
members::Members,
pubsub::SubsManager,
schema::init_schema,
pubsub::{Matcher, SubsManager},
schema::{init_schema, Schema},
sqlite::CrConn,
};

Expand All @@ -44,6 +56,7 @@ pub struct AgentOptions {
pub rx_foca: CorroReceiver<FocaInput>,
pub rtt_rx: TokioReceiver<(SocketAddr, Duration)>,
pub subs_manager: SubsManager,
pub subs_bcast_cache: SharedMatcherBroadcastCache,
pub tripwire: Tripwire,
}

Expand Down Expand Up @@ -81,6 +94,18 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
schema
};

let subs_manager = SubsManager::default();

// Setup subscription handlers
let subs_bcast_cache = setup_spawn_subscriptions(
&subs_manager,
conf.db.subscriptions_path(),
&pool,
&schema,
&tripwire,
)
.await?;

let cluster_id = {
let conn = pool.read().await?;
conn.query_row(
Expand All @@ -97,15 +122,6 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
let (tx_apply, rx_apply) = bounded(conf.perf.apply_channel_len, "apply");
let (tx_clear_buf, rx_clear_buf) = bounded(conf.perf.clearbuf_channel_len, "clear_buf");

let lock_registry = LockRegistry::default();
let booked = {
let conn = pool.read().await?;
Booked::new(
BookedVersions::from_conn(&conn, actor_id)?,
lock_registry.clone(),
)
};

let gossip_server_endpoint = gossip_server_endpoint(&conf.gossip).await?;
let gossip_addr = gossip_server_endpoint.local_addr()?;

Expand All @@ -132,7 +148,21 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
let (tx_changes, rx_changes) = bounded(conf.perf.changes_channel_len, "changes");
let (tx_foca, rx_foca) = bounded(conf.perf.foca_channel_len, "foca");

let subs_manager = SubsManager::default();
let lock_registry = LockRegistry::default();

// make an empty booked!
let booked = Booked::new(BookedVersions::default(), lock_registry.clone());

// asynchronously load it up!
tokio::task::spawn_blocking({
let pool = pool.clone();
let mut booked = booked.write_owned("init").await;
move || {
let conn = pool.read_blocking()?;
*booked.deref_mut().deref_mut() = BookedVersions::from_conn(&conn, actor_id)?;
Ok::<_, eyre::Report>(())
}
});

let opts = AgentOptions {
gossip_server_endpoint,
Expand All @@ -147,6 +177,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
rx_foca,
rtt_rx,
subs_manager: subs_manager.clone(),
subs_bcast_cache,
tripwire: tripwire.clone(),
};

Expand Down Expand Up @@ -175,3 +206,63 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

Ok((agent, opts))
}

/// Initialise subscription state and tasks
///
/// 1. Get subscriptions state directory from config
/// 2. Load existing subscriptions and restore them in SubsManager
/// 3. Spawn subscription processor task
async fn setup_spawn_subscriptions(
subs_manager: &SubsManager,
subs_path: Utf8PathBuf,
pool: &SplitPool,
schema: &Schema,
tripwire: &Tripwire,
) -> eyre::Result<SharedMatcherBroadcastCache> {
let mut subs_bcast_cache = MatcherBroadcastCache::default();
let mut to_cleanup = vec![];

if let Ok(mut dir) = tokio::fs::read_dir(&subs_path).await {
while let Ok(Some(entry)) = dir.next_entry().await {
let path_str = entry.path().display().to_string();
if let Some(sub_id_str) = path_str.strip_prefix(subs_path.as_str()) {
if let Ok(sub_id) = sub_id_str.trim_matches('/').parse() {
let (_, created) = match subs_manager.restore(
sub_id,
&subs_path,
schema,
pool,
tripwire.clone(),
) {
Ok(res) => res,
Err(e) => {
error!(%sub_id, "could not restore subscription: {e}");
to_cleanup.push(sub_id);
continue;
}
};

info!(%sub_id, "Restored subscription");

let (sub_tx, _) = tokio::sync::broadcast::channel(10240);

tokio::spawn(process_sub_channel(
subs_manager.clone(),
sub_id,
sub_tx.clone(),
created.evt_rx,
));

subs_bcast_cache.insert(sub_id, sub_tx);
}
}
}
}

for id in to_cleanup {
info!(sub_id = %id, "Cleaning up unclean subscription");
Matcher::cleanup(id, Matcher::sub_path(subs_path.as_path(), id))?;
}

Ok(Arc::new(TokioRwLock::new(subs_bcast_cache)))
}
Loading

0 comments on commit 4b0a94e

Please sign in to comment.