diff --git a/Cargo.toml b/Cargo.toml index deb00996..e646354f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ rand_chacha = "0.3.1" blake3 = "=1.4.0" -tracing = { version = "0.1.37", features = ["release_max_level_info"] } +tracing = { version = "0.1.37", features = ["release_max_level_debug"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] } tracing-appender = "0.2.2" diff --git a/_typos.toml b/_typos.toml new file mode 100644 index 00000000..c1103a5f --- /dev/null +++ b/_typos.toml @@ -0,0 +1,2 @@ +[default.extend-words] +acception = "acception" diff --git a/crates/abq_cli/src/workers.rs b/crates/abq_cli/src/workers.rs index 46daa2a8..f2b716bd 100644 --- a/crates/abq_cli/src/workers.rs +++ b/crates/abq_cli/src/workers.rs @@ -178,26 +178,28 @@ async fn do_shutdown( let (suite_result, errors) = finalized_reporters.finish(&completed_summary); - for error in errors { - eprintln!("{error}"); - } + if let WorkersExitStatus::Completed { .. } = status { + for error in errors { + eprintln!("{error}"); + } - print!("\n\n"); - suite_result - .write_short_summary_lines(&mut stdout, ShortSummaryGrouping::Runner) - .unwrap(); - println!("\n"); - if execution_mode == ExecutionMode::WriteNormal { - println!("Run the following command to replay these tests locally:"); - println!("\n"); - println!( - "\tabq test --run-id {} --worker {} --num {} -- ", - run_id, - worker_tag.index(), - num_runners, - ); + print!("\n\n"); + suite_result + .write_short_summary_lines(&mut stdout, ShortSummaryGrouping::Runner) + .unwrap(); println!("\n"); - println!("Specify your Access Token with the RWX_ACCESS_TOKEN env variable, passing --access-token, or running `abq login`."); + if execution_mode == ExecutionMode::WriteNormal { + println!("Run the following command to replay these tests locally:"); + println!("\n"); + println!( + "\tabq test --run-id {} --worker {} --num {} -- ", + run_id, + worker_tag.index(), + num_runners, + ); + println!("\n"); + println!("Specify your Access Token with the RWX_ACCESS_TOKEN env variable, passing --access-token, or running `abq login`."); + } } // If the workers didn't fault, exit with whatever status the test suite run is at; otherwise, diff --git a/crates/abq_queue/src/queue.rs b/crates/abq_queue/src/queue.rs index 1cf01f84..8a086cd3 100644 --- a/crates/abq_queue/src/queue.rs +++ b/crates/abq_queue/src/queue.rs @@ -136,14 +136,12 @@ enum RunState { #[derive(Debug)] enum ManifestPersistence { Persisted(ManifestPersistedCell), - ManifestNeverReceived, EmptyManifest, } #[derive(Debug)] enum ResultsPersistence { Persisted(ResultsPersistedCell), - ManifestNeverReceived, } const MAX_BATCH_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(100) }; @@ -283,8 +281,6 @@ enum WriteResultsError { RunNotFound, #[error("attempting to write results before manifest received")] WaitingForManifest, - #[error("attempting to write results when manifest failed to be generated")] - ManifestNeverReceived, #[error("attempting to write results for cancelled run")] RunCancelled, } @@ -295,8 +291,6 @@ enum ReadResultsError { RunNotFound, #[error("results cannot be read before manifest is received")] WaitingForManifest, - #[error("a manifest failed to be generated")] - ManifestNeverReceived, #[error("the run was cancelled before all test results were received")] RunCancelled, } @@ -504,9 +498,7 @@ impl AllRuns { AssignedRunStatus::AlreadyDone { exit_code } } } - RunState::Cancelled { .. } => AssignedRunStatus::AlreadyDone { - exit_code: ExitCode::CANCELLED, - }, + RunState::Cancelled { reason } => AssignedRunStatus::Cancelled { reason: *reason }, } } @@ -938,7 +930,6 @@ impl AllRuns { ResultsPersistence::Persisted(cell) => { Ok((cell.clone(), EligibleForRemoteDump::Yes)) } - ResultsPersistence::ManifestNeverReceived => Err(ManifestNeverReceived), }, RunState::Cancelled { .. } => Err(RunCancelled), } @@ -990,7 +981,6 @@ impl AllRuns { Ok(ReadResultsState::RunInProgress { active_runners }) } } - ResultsPersistence::ManifestNeverReceived => Err(ManifestNeverReceived), } } RunState::Cancelled { .. } => Err(RunCancelled), @@ -1050,9 +1040,6 @@ impl AllRuns { RetryManifestState::NotYetPersisted } } - ManifestPersistence::ManifestNeverReceived => { - RetryManifestState::Error(RetryManifestError::ManifestNeverReceived) - } ManifestPersistence::EmptyManifest => { // Ship the empty manifest over. RetryManifestState::Manifest(NextWorkBundle { @@ -1214,37 +1201,22 @@ impl AllRuns { let mut run = runs.get(&run_id).expect("no run recorded").write(); - let test_command_hash = match run.state { - RunState::WaitingForManifest { - test_command_hash, .. - } => { - // okay - test_command_hash + match run.state { + RunState::WaitingForManifest { .. } => { + run.state = RunState::Cancelled { reason: CancelReason::ManifestNeverReceived }; + // NB: Always sub last for conversative estimation. + self.num_active.fetch_sub(1, atomic::ORDERING); } RunState::Cancelled { .. } => { // No-op, since the run was already cancelled. - return; } RunState::HasWork { .. } | RunState::InitialManifestDone { .. } => { illegal_state!( "attempting to mark failed to receive manifest after manifest was received", ?run_id ); - return; } }; - - run.state = RunState::InitialManifestDone { - new_worker_exit_code: ExitCode::FAILURE, - init_metadata: Default::default(), - seen_workers: Default::default(), - manifest_persistence: ManifestPersistence::ManifestNeverReceived, - results_persistence: ResultsPersistence::ManifestNeverReceived, - test_command_hash: Some(test_command_hash), - }; - - // NB: Always sub last for conversative estimation. - self.num_active.fetch_sub(1, atomic::ORDERING); } /// Marks a run as complete because it had the trivial manifest. @@ -2221,7 +2193,7 @@ impl QueueServer { mut stream: Box, ) -> OpaqueResult<()> { // If a worker failed to generate a manifest, or the manifest is empty, - // we're going to immediately end the test run. + // we're going to immediately cancel the test run. // // In the former case this indicates a failure in the underlying test runners, // and in the latter case we have nothing to do. @@ -4978,21 +4950,6 @@ mod persist_results { Ok(ReadResultsState::RunInProgress { active_runners }) if active_runners == &[Tag::runner(2, 1)] } - get_read_results_cell! { - get_read_results_cell_when_done_with_manifest_never_received, - { - RunState::InitialManifestDone { - new_worker_exit_code: ExitCode::SUCCESS, - init_metadata: Default::default(), - seen_workers: Default::default(), - results_persistence: ResultsPersistence::ManifestNeverReceived, - manifest_persistence: ManifestPersistence::EmptyManifest, - test_command_hash: Some(TestCommandHash::random()), - } - }, - Err(ReadResultsError::ManifestNeverReceived) - } - get_read_results_cell! { get_read_results_cell_when_cancelled, { diff --git a/crates/abq_utils/src/net_protocol.rs b/crates/abq_utils/src/net_protocol.rs index 4ba92b20..6386ead3 100644 --- a/crates/abq_utils/src/net_protocol.rs +++ b/crates/abq_utils/src/net_protocol.rs @@ -618,6 +618,8 @@ pub mod queue { User, /// Timed out because no progress was made popping items off the manifest. ManifestHadNoProgress, + /// Timed out because the manifest was never received + ManifestNeverReceived, } /// A request sent to the queue. diff --git a/crates/abq_workers/src/assigned_run.rs b/crates/abq_workers/src/assigned_run.rs index 5c12724c..e7b7d2c6 100644 --- a/crates/abq_workers/src/assigned_run.rs +++ b/crates/abq_workers/src/assigned_run.rs @@ -1,4 +1,4 @@ -use abq_utils::net_protocol::{entity::Entity, queue::InvokeWork}; +use abq_utils::net_protocol::{entity::Entity, queue::{CancelReason, InvokeWork}}; use async_trait::async_trait; use serde_derive::{Deserialize, Serialize}; @@ -27,6 +27,9 @@ pub enum AssignedRunStatus { AlreadyDone { exit_code: abq_utils::exit::ExitCode, }, + Cancelled { + reason: CancelReason, + }, FatalError(String), } diff --git a/crates/abq_workers/src/negotiate.rs b/crates/abq_workers/src/negotiate.rs index eabb32d6..fcae49cd 100644 --- a/crates/abq_workers/src/negotiate.rs +++ b/crates/abq_workers/src/negotiate.rs @@ -32,7 +32,7 @@ use abq_utils::{ entity::{Entity, WorkerTag}, meta::DeprecationRecord, publicize_addr, - queue::{InvokeWork, NegotiatorInfo}, + queue::{CancelReason, InvokeWork, NegotiatorInfo}, workers::{RunId, RunnerKind}, }, results_handler::SharedResultsHandler, @@ -81,6 +81,9 @@ enum MessageFromQueueNegotiator { RunAlreadyCompleted { exit_code: ExitCode, }, + RunCancelled { + reason: CancelReason, + }, /// The context a worker set should execute a run with. ExecutionContext(ExecutionContext), RunUnknown, @@ -141,6 +144,8 @@ pub struct WorkersNegotiator(Box, WorkerContext); pub enum NegotiatedWorkers { /// No more workers were created, because there is no more work to be done. Redundant { exit_code: ExitCode }, + /// No more workers were created because the run is cancelled + Cancelled { error: String }, /// A pool of workers were created. Pool(WorkerPool), } @@ -155,6 +160,13 @@ impl NegotiatedWorkers { process_outputs: Default::default(), native_runner_info: None, }, + NegotiatedWorkers::Cancelled { error } => WorkersExit { + status: WorkersExitStatus::Error { errors: vec![error.to_string()] }, + manifest_generation_output: None, + final_stdio_outputs: Default::default(), + process_outputs: Default::default(), + native_runner_info: None, + }, NegotiatedWorkers::Pool(pool) => pool.shutdown().await, } } @@ -162,6 +174,7 @@ impl NegotiatedWorkers { pub async fn cancel(&mut self) { match self { NegotiatedWorkers::Redundant { .. } => {} + NegotiatedWorkers::Cancelled { .. } => {} NegotiatedWorkers::Pool(pool) => pool.cancel().await, } } @@ -170,6 +183,7 @@ impl NegotiatedWorkers { pub async fn wait(&mut self) { match self { NegotiatedWorkers::Redundant { .. } => {} + NegotiatedWorkers::Cancelled { .. } => {} NegotiatedWorkers::Pool(pool) => pool.wait().await, } } @@ -177,6 +191,7 @@ impl NegotiatedWorkers { pub fn workers_alive(&self) -> bool { match self { NegotiatedWorkers::Redundant { .. } => false, + NegotiatedWorkers::Cancelled { .. } => false, NegotiatedWorkers::Pool(pool) => pool.workers_alive(), } } @@ -325,6 +340,14 @@ async fn wait_for_execution_context( let worker_set_decision = match net_protocol::async_read(&mut conn).await? { MessageFromQueueNegotiator::ExecutionContext(ctx) => Ok(ctx), + MessageFromQueueNegotiator::RunCancelled { reason } => { + let error_suffix = match reason { + CancelReason::User => "a worker received a cancellation signal while still working on tests.", + CancelReason::ManifestHadNoProgress => "the run timed out before any tests were completed.", + CancelReason::ManifestNeverReceived => "the run timed out before the test manifest was received." + }; + Err(NegotiatedWorkers::Cancelled { error: format!("{}{}", "Error: This ABQ run was cancelled. When an ABQ run is cancelled, it can no longer be retried. You must start a run with a new run ID instead.\nThis run was cancelled because ", error_suffix) }) + } MessageFromQueueNegotiator::RunAlreadyCompleted { exit_code } => { Err(NegotiatedWorkers::Redundant { exit_code }) } @@ -588,6 +611,10 @@ impl QueueNegotiator { tracing::debug!(?run_id, "run already completed"); MessageFromQueueNegotiator::RunAlreadyCompleted { exit_code } } + Cancelled { reason } => { + tracing::debug!(?run_id, "run cancelled"); + MessageFromQueueNegotiator::RunCancelled { reason } + } RunUnknown => { tracing::debug!(?run_id, "run not yet known"); MessageFromQueueNegotiator::RunUnknown