diff --git a/crates/abq_cli/src/report.rs b/crates/abq_cli/src/report.rs index fc938394..6b81724d 100644 --- a/crates/abq_cli/src/report.rs +++ b/crates/abq_cli/src/report.rs @@ -13,7 +13,7 @@ use abq_utils::{ self, entity::{Entity, WorkerRunner}, queue::AssociatedTestResults, - results::{ResultsLine, Summary}, + results::{OpaqueLazyAssociatedTestResults, ResultsLine, Summary}, runners::{TestResult, TestResultSpec}, workers::{RunId, WorkId}, }, @@ -52,10 +52,10 @@ pub(crate) async fn report_results( let reporters = build_reporters(reporter_kinds, stdout_preferences, test_suite_name, ONE); let mut stdout = stdout_preferences.stdout_stream(); - let all_results: Vec> = + let all_results: Vec = wait_for_results(abq, entity, run_id, results_timeout).await?; - process_results(&mut stdout, reporters, all_results.into_iter().flatten()) + process_results(&mut stdout, reporters, all_results.into_iter()) } pub(crate) async fn list_tests( @@ -67,12 +67,12 @@ pub(crate) async fn list_tests( worker: u32, runner: NonZeroUsize, ) -> anyhow::Result { - let all_results: Vec> = + let all_results: Vec = wait_for_results(abq, entity, run_id, results_timeout).await?; print_tests_for_runner( &mut stdout_preferences.stdout_stream(), - all_results.into_iter().flatten(), + all_results.into_iter(), WorkerRunner::from((worker, runner.get() as u32)), ); @@ -229,7 +229,7 @@ async fn wait_for_results( entity: Entity, run_id: RunId, results_timeout: Duration, -) -> anyhow::Result>> { +) -> anyhow::Result> { let queue_addr = abq.server_addr(); let client = abq.client_options_owned().build_async()?; @@ -251,7 +251,7 @@ async fn wait_for_results_help( client: Box, entity: Entity, run_id: RunId, -) -> anyhow::Result>> { +) -> anyhow::Result> { let mut attempt = 1; loop { let client = &client; @@ -267,50 +267,40 @@ async fn wait_for_results_help( }; net_protocol::async_write(&mut conn, &request).await?; - let mut results = Vec::with_capacity(1); - // TODO: as this is a hot loop of just fetching results, reporting would be more // interactive if we wrote results into a channel as they came in, with the // results processing happening on a separate thread. - loop { - use net_protocol::queue::TestResultsResponse::*; - let response = net_protocol::async_read(&mut conn).await?; - match response { - Results { chunk, final_chunk } => { - let chunk = chunk.decode().map_err(|e| { - anyhow!( - "failed to decode corrupted test results message: {}", - e.to_string() - ) - })?; + use net_protocol::queue::TestResultsResponse::*; + let response = net_protocol::async_read(&mut conn).await?; + match response { + StreamingResults => { + let mut stream = net_protocol::async_read_stream(&mut conn).await?; - results.push(chunk); + let results = + OpaqueLazyAssociatedTestResults::read_results_lines(&mut stream).await?; + let results = results.decode()?; - match final_chunk { - true => return Ok(results), - false => continue, - } - } - Pending => { - tracing::debug!( - attempt, - "deferring fetching results do to pending notification" - ); - tokio::time::sleep(PENDING_RESULTS_DELAY).await; - attempt += 1; - continue; - } - OutstandingRunners(tags) => { - let active_runners = tags - .into_iter() - .map(|t| t.to_string()) - .collect::>() - .join(", "); - - bail!("failed to fetch test results because the following runners are still active: {active_runners}") - } - Error(reason) => bail!("failed to fetch test results because {reason}"), + return Ok(results); + } + Pending => { + tracing::debug!( + attempt, + "deferring fetching results do to pending notification" + ); + tokio::time::sleep(PENDING_RESULTS_DELAY).await; + attempt += 1; + continue; } + OutstandingRunners(tags) => { + let active_runners = tags + .into_iter() + .map(|t| t.to_string()) + .collect::>() + .join(", "); + + bail!("failed to fetch test results because the following runners are still active: {active_runners}") + } + Error(reason) => bail!("failed to fetch test results because {reason}"), } } } @@ -397,7 +387,7 @@ mod test { use super::{print_tests_for_runner, process_results, wait_for_results_help}; #[tokio::test] - async fn fetches_chunked_tests() { + async fn fetches_streamed_tests() { let (server, client) = build_fake_server_client().await; let server_addr = server.local_addr().unwrap(); @@ -430,24 +420,25 @@ mod test { } )); - let chunks = [ - queue::TestResultsResponse::Results { - chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![ - serde_json::value::to_raw_value(results1).unwrap(), - ]), - final_chunk: false, - }, - queue::TestResultsResponse::Results { - chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![ - serde_json::value::to_raw_value(results2).unwrap(), - ]), - final_chunk: true, - }, - ]; + let results_buffer = OpaqueLazyAssociatedTestResults::into_jsonl_buffer(&[ + serde_json::value::to_raw_value(results1).unwrap(), + serde_json::value::to_raw_value(results2).unwrap(), + ]) + .unwrap(); - for chunk in chunks { - net_protocol::async_write(&mut conn, &chunk).await.unwrap(); - } + let mut results_buffer_slice = &results_buffer[..]; + + net_protocol::async_write(&mut conn, &queue::TestResultsResponse::StreamingResults) + .await + .unwrap(); + + net_protocol::async_write_stream( + &mut conn, + results_buffer.len(), + &mut results_buffer_slice, + ) + .await + .unwrap(); } }; @@ -457,7 +448,7 @@ mod test { let ((), results) = tokio::join!(server_task, client_task); let results = results.unwrap(); - let expected = [[results1], [results2]]; + let expected = [results1, results2]; assert_eq!(results, expected); } diff --git a/crates/abq_queue/src/persistence/results/in_memory.rs b/crates/abq_queue/src/persistence/results/in_memory.rs index f6f19259..7ad02d26 100644 --- a/crates/abq_queue/src/persistence/results/in_memory.rs +++ b/crates/abq_queue/src/persistence/results/in_memory.rs @@ -3,7 +3,10 @@ use std::{collections::HashMap, sync::Arc}; use abq_utils::{ error::{ErrorLocation, ResultLocation}, here, - net_protocol::{results::ResultsLine, workers::RunId}, + net_protocol::{ + results::{OpaqueLazyAssociatedTestResults, ResultsLine}, + workers::RunId, + }, }; use async_trait::async_trait; use serde_json::value::RawValue; @@ -46,11 +49,8 @@ impl PersistResults for InMemoryPersistor { .get(run_id) .ok_or_else(|| "results not found for run ID".located(here!()))?; - let mut readable_json_lines_buffer: Vec = Vec::new(); - for json_line in json_lines { - serde_json::to_writer(&mut readable_json_lines_buffer, json_line).located(here!())?; - readable_json_lines_buffer.push(b'\n'); - } + let readable_json_lines_buffer = + OpaqueLazyAssociatedTestResults::into_jsonl_buffer(json_lines).unwrap(); let len = readable_json_lines_buffer.len(); let mut slice = readable_json_lines_buffer.as_slice(); diff --git a/crates/abq_utils/src/net_protocol.rs b/crates/abq_utils/src/net_protocol.rs index bde747f3..9f4b6e12 100644 --- a/crates/abq_utils/src/net_protocol.rs +++ b/crates/abq_utils/src/net_protocol.rs @@ -767,6 +767,17 @@ pub mod results { } Ok(Self(opaque_jsonl)) } + + pub fn into_jsonl_buffer( + lines: &[Box], + ) -> OpaqueResult> { + let mut buffer: Vec = Vec::new(); + for json_line in lines { + serde_json::to_writer(&mut buffer, json_line).located(here!())?; + buffer.push(b'\n'); + } + Ok(buffer) + } } impl PartialEq for OpaqueLazyAssociatedTestResults {