diff --git a/crates/corro-admin/src/lib.rs b/crates/corro-admin/src/lib.rs index 855f8652..6467622d 100644 --- a/crates/corro-admin/src/lib.rs +++ b/crates/corro-admin/src/lib.rs @@ -96,6 +96,7 @@ pub enum Command { Locks { top: usize }, Cluster(ClusterCommand), Actor(ActorCommand), + Subs(SubsCommand), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -104,6 +105,11 @@ pub enum SyncCommand { ReconcileGaps, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum SubsCommand { + Info { hash: String }, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ClusterCommand { Rejoin, @@ -521,6 +527,38 @@ async fn handle_conn( send_success(&mut stream).await; } + Command::Subs(SubsCommand::Info { hash }) => { + match agent.subs_manager().get_by_hash(&hash) { + Some(matcher) => { + let statements = matcher + .cached_stmts() + .iter() + .map(|(table, stmts)| { + json!({ + table: stmts.new_query(), + }) + }) + .collect::>(); + send( + &mut stream, + Response::Json(serde_json::json!({ + "id": matcher.id(), + "path": matcher.subs_path(), + "last_change_id": matcher.last_change_id_sent(), + "original query": matcher.sql(), + "statement": statements, + })), + ) + .await; + send_success(&mut stream).await; + } + None => { + send_error(&mut stream, format!("unknown subscription hash: {hash}")) + .await; + continue; + } + }; + } }, Ok(None) => { debug!("done with admin conn"); diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs index bd582a37..a6c32029 100644 --- a/crates/corro-types/src/pubsub.rs +++ b/crates/corro-types/src/pubsub.rs @@ -74,6 +74,10 @@ impl SubsManager { self.0.read().get_by_query(sql) } + pub fn get_by_hash(&self, hash: &str) -> Option { + self.0.read().get_by_hash(hash) + } + pub fn get_or_insert( &self, sql: &str, @@ -328,6 +332,13 @@ impl InnerSubsManager { .and_then(|id| self.handles.get(id).cloned()) } + pub fn get_by_hash(&self, hash: &str) -> Option { + self.handles + .values() + .find(|x| x.inner.hash == hash) + .cloned() + } + fn remove(&mut self, id: &Uuid) -> Option { let handle = self.handles.remove(id)?; self.queries.remove(&handle.inner.sql); @@ -364,6 +375,9 @@ struct InnerMatcherHandle { cancel: CancellationToken, changes_tx: mpsc::Sender<(MatchCandidates, CrsqlDbVersion)>, last_change_rx: watch::Receiver, + // some state from the matcher so we can take a look later + subs_path: String, + cached_statements: HashMap, } type MatchCandidates = IndexMap>>; @@ -373,6 +387,10 @@ impl MatcherHandle { self.inner.id } + pub fn sql(&self) -> &String { + &self.inner.sql + } + pub fn hash(&self) -> &str { &self.inner.hash } @@ -385,6 +403,14 @@ impl MatcherHandle { &self.inner.col_names } + pub fn subs_path(&self) -> &String { + &self.inner.subs_path + } + + pub fn cached_stmts(&self) -> &HashMap { + &self.inner.cached_statements + } + pub async fn cleanup(self) { self.inner.cancel.cancel(); info!(sub_id = %self.inner.id, "Canceled subscription"); @@ -593,6 +619,12 @@ pub struct MatcherStmt { temp_query: String, } +impl MatcherStmt { + pub fn new_query(self: &Self) -> &String { + return &self.new_query; + } +} + const CHANGE_ID_COL: &str = "id"; const CHANGE_TYPE_COL: &str = "type"; @@ -819,6 +851,8 @@ impl Matcher { cancel: cancel.clone(), last_change_rx, changes_tx, + cached_statements: statements.clone(), + subs_path: sub_path.to_string(), }), state: state.clone(), }; diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 303dec61..5aa546f3 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -279,7 +279,6 @@ impl From for SyncMessage { } } - // generates a `SyncMessage` to tell another node what versions we're missing #[tracing::instrument(skip_all, level = "debug")] pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncStateV1 { diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index 8079f3ce..036e7ad5 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -522,6 +522,13 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> { info!("Exited with code: {:?}", exit.code()); std::process::exit(exit.code().unwrap_or(1)); } + Command::Subs(SubsCommand::Info { hash }) => { + let mut conn = AdminConn::connect(cli.admin_path()).await?; + conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::Info { + hash: hash.to_string(), + })) + .await?; + } } Ok(()) @@ -686,6 +693,10 @@ enum Command { /// DB-related commands #[command(subcommand)] Db(DbCommand), + + /// Subscription related commands + #[command(subcommand)] + Subs(SubsCommand), } #[derive(Subcommand)] @@ -769,3 +780,9 @@ enum DbCommand { /// Acquires the lock on the DB Lock { cmd: String }, } + +#[derive(Subcommand)] +enum SubsCommand { + /// Acquires the lock on the DB + Info { hash: String }, +}