Skip to content

Commit

Permalink
changes from review
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime committed Feb 7, 2025
1 parent b627ae4 commit 4a4966d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
6 changes: 4 additions & 2 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ async fn make_replica_ctx(
) -> anyhow::Result<ReplicaContext> {
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
let subscriptions = Arc::new(RwLock::new(SubscriptionManager::default()));
let manager = subscriptions.clone();
let downgraded = Arc::downgrade(&subscriptions);
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);

// If an error occurs when evaluating a subscription,
Expand All @@ -543,7 +543,9 @@ async fn make_replica_ctx(
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let subscriptions = manager.clone();
let Some(subscriptions) = downgraded.upgrade() else {
break;
};
tokio::task::spawn_blocking(move || {
subscriptions.write().remove_dropped_clients();
})
Expand Down
9 changes: 5 additions & 4 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,13 @@ impl SubscriptionManager {
/// This will return an error if the client does not have a subscription with the given query id.
pub fn remove_subscription(&mut self, client_id: ClientId, query_id: ClientQueryId) -> Result<Query, DBError> {
let subscription_id = (client_id, query_id);
let Some(ci) = self.clients.get_mut(&client_id) else {
let Some(ci) = self
.clients
.get_mut(&client_id)
.filter(|ci| !ci.dropped.load(Ordering::Acquire))
else {
return Err(anyhow::anyhow!("Client not found: {:?}", client_id).into());
};
if ci.dropped.load(Ordering::Acquire) {
return Err(anyhow::anyhow!("Client not found: {:?}", client_id).into());
}
let Some(query_hash) = ci.subscriptions.remove(&subscription_id) else {
return Err(anyhow::anyhow!("Subscription not found: {:?}", subscription_id).into());
};
Expand Down

0 comments on commit 4a4966d

Please sign in to comment.