Skip to content

Commit

Permalink
add info command for subs
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Sep 18, 2024
1 parent e35acab commit fd6e7f1
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 1 deletion.
38 changes: 38 additions & 0 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub enum Command {
Locks { top: usize },
Cluster(ClusterCommand),
Actor(ActorCommand),
Subs(SubsCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -104,6 +105,11 @@ pub enum SyncCommand {
ReconcileGaps,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubsCommand {
Info { hash: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClusterCommand {
Rejoin,
Expand Down Expand Up @@ -521,6 +527,38 @@ async fn handle_conn(

send_success(&mut stream).await;
}
Command::Subs(SubsCommand::Info { hash }) => {
match agent.subs_manager().get_by_hash(&hash) {
Some(matcher) => {
let statements = matcher
.cached_stmts()
.iter()
.map(|(table, stmts)| {
json!({
table: stmts.new_query(),
})
})
.collect::<Vec<_>>();
send(
&mut stream,
Response::Json(serde_json::json!({
"id": matcher.id(),
"path": matcher.subs_path(),
"last_change_id": matcher.last_change_id_sent(),
"original query": matcher.sql(),
"statement": statements,
})),
)
.await;
send_success(&mut stream).await;
}
None => {
send_error(&mut stream, format!("unknown subscription hash: {hash}"))
.await;
continue;
}
};
}
},
Ok(None) => {
debug!("done with admin conn");
Expand Down
34 changes: 34 additions & 0 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl SubsManager {
self.0.read().get_by_query(sql)
}

pub fn get_by_hash(&self, hash: &str) -> Option<MatcherHandle> {
self.0.read().get_by_hash(hash)
}

pub fn get_or_insert(
&self,
sql: &str,
Expand Down Expand Up @@ -328,6 +332,13 @@ impl InnerSubsManager {
.and_then(|id| self.handles.get(id).cloned())
}

pub fn get_by_hash(&self, hash: &str) -> Option<MatcherHandle> {
self.handles
.values()
.find(|x| x.inner.hash == hash)
.cloned()
}

fn remove(&mut self, id: &Uuid) -> Option<MatcherHandle> {
let handle = self.handles.remove(id)?;
self.queries.remove(&handle.inner.sql);
Expand Down Expand Up @@ -364,6 +375,9 @@ struct InnerMatcherHandle {
cancel: CancellationToken,
changes_tx: mpsc::Sender<(MatchCandidates, CrsqlDbVersion)>,
last_change_rx: watch::Receiver<ChangeId>,
// some state from the matcher so we can take a look later
subs_path: String,
cached_statements: HashMap<String, MatcherStmt>,
}

type MatchCandidates = IndexMap<TableName, IndexSet<Vec<u8>>>;
Expand All @@ -373,6 +387,10 @@ impl MatcherHandle {
self.inner.id
}

pub fn sql(&self) -> &String {
&self.inner.sql
}

pub fn hash(&self) -> &str {
&self.inner.hash
}
Expand All @@ -385,6 +403,14 @@ impl MatcherHandle {
&self.inner.col_names
}

pub fn subs_path(&self) -> &String {
&self.inner.subs_path
}

pub fn cached_stmts(&self) -> &HashMap<String, MatcherStmt> {
&self.inner.cached_statements
}

pub async fn cleanup(self) {
self.inner.cancel.cancel();
info!(sub_id = %self.inner.id, "Canceled subscription");
Expand Down Expand Up @@ -593,6 +619,12 @@ pub struct MatcherStmt {
temp_query: String,
}

impl MatcherStmt {
pub fn new_query(self: &Self) -> &String {
return &self.new_query;
}
}

const CHANGE_ID_COL: &str = "id";
const CHANGE_TYPE_COL: &str = "type";

Expand Down Expand Up @@ -819,6 +851,8 @@ impl Matcher {
cancel: cancel.clone(),
last_change_rx,
changes_tx,
cached_statements: statements.clone(),
subs_path: sub_path.to_string(),
}),
state: state.clone(),
};
Expand Down
1 change: 0 additions & 1 deletion crates/corro-types/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ impl From<SyncStateV1> for SyncMessage {
}
}


// generates a `SyncMessage` to tell another node what versions we're missing
#[tracing::instrument(skip_all, level = "debug")]
pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncStateV1 {
Expand Down
17 changes: 17 additions & 0 deletions crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
info!("Exited with code: {:?}", exit.code());
std::process::exit(exit.code().unwrap_or(1));
}
Command::Subs(SubsCommand::Info { hash }) => {
let mut conn = AdminConn::connect(cli.admin_path()).await?;
conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::Info {
hash: hash.to_string(),
}))
.await?;
}
}

Ok(())
Expand Down Expand Up @@ -686,6 +693,10 @@ enum Command {
/// DB-related commands
#[command(subcommand)]
Db(DbCommand),

/// Subscription related commands
#[command(subcommand)]
Subs(SubsCommand),
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -769,3 +780,9 @@ enum DbCommand {
/// Acquires the lock on the DB
Lock { cmd: String },
}

#[derive(Subcommand)]
enum SubsCommand {
/// Acquires the lock on the DB
Info { hash: String },
}

0 comments on commit fd6e7f1

Please sign in to comment.