Skip to content

Commit

Permalink
warn log when very slow to process changes in subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Aug 29, 2024
1 parent e828252 commit b107ecf
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,9 @@ impl Matcher {
}

async fn cmd_loop(mut self, mut state_conn: CrConn, mut tripwire: Tripwire) {
const PROCESS_CHANGES_THRESHOLD: usize = 400;
const PROCESSING_WARN_THRESHOLD: Duration = Duration::from_secs(5);

info!(sub_id = %self.id, "Starting loop to run the subscription");
{
let (lock, cvar) = &*self.state;
Expand Down Expand Up @@ -1161,7 +1164,7 @@ impl Matcher {
}
last_db_version = Some(db_version);

if buf_count >= 500 {
if buf_count >= PROCESS_CHANGES_THRESHOLD {
if let Some(db_version) = last_db_version.take() {
Branch::NewCandidates((std::mem::take(&mut buf), db_version))
} else {
Expand Down Expand Up @@ -1206,6 +1209,9 @@ impl Matcher {
let elapsed = start.elapsed();
debug!(sub_id = %self.id, "processed {buf_count} changes for subscription in {elapsed:?}");
histogram!("corro.subs.changes.processing.duration.seconds", "sql_hash" => self.hash.clone()).record(elapsed);
if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!(sub_id = %self.id, "processed {buf_count} changes (very slowly) for subscription in {elapsed:?}");
}
buf_count = 0;
}
Branch::PurgeOldChanges => {
Expand Down

0 comments on commit b107ecf

Please sign in to comment.