Skip to content

Commit

Permalink
Add client-side implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ayazhafiz committed Aug 1, 2023
1 parent ee12154 commit 3654472
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 70 deletions.
119 changes: 55 additions & 64 deletions crates/abq_cli/src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use abq_utils::{
self,
entity::{Entity, WorkerRunner},
queue::AssociatedTestResults,
results::{ResultsLine, Summary},
results::{OpaqueLazyAssociatedTestResults, ResultsLine, Summary},
runners::{TestResult, TestResultSpec},
workers::{RunId, WorkId},
},
Expand Down Expand Up @@ -52,10 +52,10 @@ pub(crate) async fn report_results(
let reporters = build_reporters(reporter_kinds, stdout_preferences, test_suite_name, ONE);
let mut stdout = stdout_preferences.stdout_stream();

let all_results: Vec<Vec<ResultsLine>> =
let all_results: Vec<ResultsLine> =
wait_for_results(abq, entity, run_id, results_timeout).await?;

process_results(&mut stdout, reporters, all_results.into_iter().flatten())
process_results(&mut stdout, reporters, all_results.into_iter())
}

pub(crate) async fn list_tests(
Expand All @@ -67,12 +67,12 @@ pub(crate) async fn list_tests(
worker: u32,
runner: NonZeroUsize,
) -> anyhow::Result<ExitCode> {
let all_results: Vec<Vec<ResultsLine>> =
let all_results: Vec<ResultsLine> =
wait_for_results(abq, entity, run_id, results_timeout).await?;

print_tests_for_runner(
&mut stdout_preferences.stdout_stream(),
all_results.into_iter().flatten(),
all_results.into_iter(),
WorkerRunner::from((worker, runner.get() as u32)),
);

Expand Down Expand Up @@ -229,7 +229,7 @@ async fn wait_for_results(
entity: Entity,
run_id: RunId,
results_timeout: Duration,
) -> anyhow::Result<Vec<Vec<ResultsLine>>> {
) -> anyhow::Result<Vec<ResultsLine>> {
let queue_addr = abq.server_addr();
let client = abq.client_options_owned().build_async()?;

Expand All @@ -251,7 +251,7 @@ async fn wait_for_results_help(
client: Box<dyn net_async::ConfiguredClient>,
entity: Entity,
run_id: RunId,
) -> anyhow::Result<Vec<Vec<ResultsLine>>> {
) -> anyhow::Result<Vec<ResultsLine>> {
let mut attempt = 1;
loop {
let client = &client;
Expand All @@ -267,50 +267,40 @@ async fn wait_for_results_help(
};
net_protocol::async_write(&mut conn, &request).await?;

let mut results = Vec::with_capacity(1);

// TODO: as this is a hot loop of just fetching results, reporting would be more
// interactive if we wrote results into a channel as they came in, with the
// results processing happening on a separate thread.
loop {
use net_protocol::queue::TestResultsResponse::*;
let response = net_protocol::async_read(&mut conn).await?;
match response {
Results { chunk, final_chunk } => {
let chunk = chunk.decode().map_err(|e| {
anyhow!(
"failed to decode corrupted test results message: {}",
e.to_string()
)
})?;
use net_protocol::queue::TestResultsResponse::*;
let response = net_protocol::async_read(&mut conn).await?;
match response {
StreamingResults => {
let mut stream = net_protocol::async_read_stream(&mut conn).await?;

results.push(chunk);
let results =
OpaqueLazyAssociatedTestResults::read_results_lines(&mut stream).await?;
let results = results.decode()?;

match final_chunk {
true => return Ok(results),
false => continue,
}
}
Pending => {
tracing::debug!(
attempt,
"deferring fetching results do to pending notification"
);
tokio::time::sleep(PENDING_RESULTS_DELAY).await;
attempt += 1;
continue;
}
OutstandingRunners(tags) => {
let active_runners = tags
.into_iter()
.map(|t| t.to_string())
.collect::<Vec<_>>()
.join(", ");

bail!("failed to fetch test results because the following runners are still active: {active_runners}")
}
Error(reason) => bail!("failed to fetch test results because {reason}"),
return Ok(results);
}
Pending => {
tracing::debug!(
attempt,
"deferring fetching results do to pending notification"
);
tokio::time::sleep(PENDING_RESULTS_DELAY).await;
attempt += 1;
continue;
}
OutstandingRunners(tags) => {
let active_runners = tags
.into_iter()
.map(|t| t.to_string())
.collect::<Vec<_>>()
.join(", ");

bail!("failed to fetch test results because the following runners are still active: {active_runners}")
}
Error(reason) => bail!("failed to fetch test results because {reason}"),
}
}
}
Expand Down Expand Up @@ -397,7 +387,7 @@ mod test {
use super::{print_tests_for_runner, process_results, wait_for_results_help};

#[tokio::test]
async fn fetches_chunked_tests() {
async fn fetches_streamed_tests() {
let (server, client) = build_fake_server_client().await;
let server_addr = server.local_addr().unwrap();

Expand Down Expand Up @@ -430,24 +420,25 @@ mod test {
}
));

let chunks = [
queue::TestResultsResponse::Results {
chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![
serde_json::value::to_raw_value(results1).unwrap(),
]),
final_chunk: false,
},
queue::TestResultsResponse::Results {
chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![
serde_json::value::to_raw_value(results2).unwrap(),
]),
final_chunk: true,
},
];
let results_buffer = OpaqueLazyAssociatedTestResults::into_jsonl_buffer(&[
serde_json::value::to_raw_value(results1).unwrap(),
serde_json::value::to_raw_value(results2).unwrap(),
])
.unwrap();

for chunk in chunks {
net_protocol::async_write(&mut conn, &chunk).await.unwrap();
}
let mut results_buffer_slice = &results_buffer[..];

net_protocol::async_write(&mut conn, &queue::TestResultsResponse::StreamingResults)
.await
.unwrap();

net_protocol::async_write_stream(
&mut conn,
results_buffer.len(),
&mut results_buffer_slice,
)
.await
.unwrap();
}
};

Expand All @@ -457,7 +448,7 @@ mod test {
let ((), results) = tokio::join!(server_task, client_task);

let results = results.unwrap();
let expected = [[results1], [results2]];
let expected = [results1, results2];
assert_eq!(results, expected);
}

Expand Down
12 changes: 6 additions & 6 deletions crates/abq_queue/src/persistence/results/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::{collections::HashMap, sync::Arc};
use abq_utils::{
error::{ErrorLocation, ResultLocation},
here,
net_protocol::{results::ResultsLine, workers::RunId},
net_protocol::{
results::{OpaqueLazyAssociatedTestResults, ResultsLine},
workers::RunId,
},
};
use async_trait::async_trait;
use serde_json::value::RawValue;
Expand Down Expand Up @@ -46,11 +49,8 @@ impl PersistResults for InMemoryPersistor {
.get(run_id)
.ok_or_else(|| "results not found for run ID".located(here!()))?;

let mut readable_json_lines_buffer: Vec<u8> = Vec::new();
for json_line in json_lines {
serde_json::to_writer(&mut readable_json_lines_buffer, json_line).located(here!())?;
readable_json_lines_buffer.push(b'\n');
}
let readable_json_lines_buffer =
OpaqueLazyAssociatedTestResults::into_jsonl_buffer(json_lines).unwrap();

let len = readable_json_lines_buffer.len();
let mut slice = readable_json_lines_buffer.as_slice();
Expand Down
11 changes: 11 additions & 0 deletions crates/abq_utils/src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,17 @@ pub mod results {
}
Ok(Self(opaque_jsonl))
}

pub fn into_jsonl_buffer(
lines: &[Box<serde_json::value::RawValue>],
) -> OpaqueResult<Vec<u8>> {
let mut buffer: Vec<u8> = Vec::new();
for json_line in lines {
serde_json::to_writer(&mut buffer, json_line).located(here!())?;
buffer.push(b'\n');
}
Ok(buffer)
}
}

impl PartialEq for OpaqueLazyAssociatedTestResults {
Expand Down

0 comments on commit 3654472

Please sign in to comment.