Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream test results without batched loads into server memory #73

Merged
merged 6 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.6.2

ABQ 1.6.2 is a patch release fixing an issue that could result in
denial-of-service of an ABQ queue due to large test results.

## 1.6.1

ABQ 1.6.1 is a patch release fixing an issue that would not continue offloading
Expand Down
119 changes: 55 additions & 64 deletions crates/abq_cli/src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use abq_utils::{
self,
entity::{Entity, WorkerRunner},
queue::AssociatedTestResults,
results::{ResultsLine, Summary},
results::{OpaqueLazyAssociatedTestResults, ResultsLine, Summary},
runners::{TestResult, TestResultSpec},
workers::{RunId, WorkId},
},
Expand Down Expand Up @@ -52,10 +52,10 @@ pub(crate) async fn report_results(
let reporters = build_reporters(reporter_kinds, stdout_preferences, test_suite_name, ONE);
let mut stdout = stdout_preferences.stdout_stream();

let all_results: Vec<Vec<ResultsLine>> =
let all_results: Vec<ResultsLine> =
wait_for_results(abq, entity, run_id, results_timeout).await?;

process_results(&mut stdout, reporters, all_results.into_iter().flatten())
process_results(&mut stdout, reporters, all_results.into_iter())
}

pub(crate) async fn list_tests(
Expand All @@ -67,12 +67,12 @@ pub(crate) async fn list_tests(
worker: u32,
runner: NonZeroUsize,
) -> anyhow::Result<ExitCode> {
let all_results: Vec<Vec<ResultsLine>> =
let all_results: Vec<ResultsLine> =
wait_for_results(abq, entity, run_id, results_timeout).await?;

print_tests_for_runner(
&mut stdout_preferences.stdout_stream(),
all_results.into_iter().flatten(),
all_results.into_iter(),
WorkerRunner::from((worker, runner.get() as u32)),
);

Expand Down Expand Up @@ -229,7 +229,7 @@ async fn wait_for_results(
entity: Entity,
run_id: RunId,
results_timeout: Duration,
) -> anyhow::Result<Vec<Vec<ResultsLine>>> {
) -> anyhow::Result<Vec<ResultsLine>> {
let queue_addr = abq.server_addr();
let client = abq.client_options_owned().build_async()?;

Expand All @@ -251,7 +251,7 @@ async fn wait_for_results_help(
client: Box<dyn net_async::ConfiguredClient>,
entity: Entity,
run_id: RunId,
) -> anyhow::Result<Vec<Vec<ResultsLine>>> {
) -> anyhow::Result<Vec<ResultsLine>> {
let mut attempt = 1;
loop {
let client = &client;
Expand All @@ -267,50 +267,40 @@ async fn wait_for_results_help(
};
net_protocol::async_write(&mut conn, &request).await?;

let mut results = Vec::with_capacity(1);

// TODO: as this is a hot loop of just fetching results, reporting would be more
// interactive if we wrote results into a channel as they came in, with the
// results processing happening on a separate thread.
loop {
use net_protocol::queue::TestResultsResponse::*;
let response = net_protocol::async_read(&mut conn).await?;
match response {
Results { chunk, final_chunk } => {
let chunk = chunk.decode().map_err(|e| {
anyhow!(
"failed to decode corrupted test results message: {}",
e.to_string()
)
})?;
use net_protocol::queue::TestResultsResponse::*;
let response = net_protocol::async_read(&mut conn).await?;
match response {
StreamingResults => {
let mut stream = net_protocol::async_read_stream(&mut conn).await?;

results.push(chunk);
let results =
OpaqueLazyAssociatedTestResults::read_results_lines(&mut stream).await?;
let results = results.decode()?;

match final_chunk {
true => return Ok(results),
false => continue,
}
}
Pending => {
tracing::debug!(
attempt,
"deferring fetching results do to pending notification"
);
tokio::time::sleep(PENDING_RESULTS_DELAY).await;
attempt += 1;
continue;
}
OutstandingRunners(tags) => {
let active_runners = tags
.into_iter()
.map(|t| t.to_string())
.collect::<Vec<_>>()
.join(", ");

bail!("failed to fetch test results because the following runners are still active: {active_runners}")
}
Error(reason) => bail!("failed to fetch test results because {reason}"),
return Ok(results);
}
Pending => {
tracing::debug!(
attempt,
"deferring fetching results do to pending notification"
);
tokio::time::sleep(PENDING_RESULTS_DELAY).await;
attempt += 1;
continue;
}
OutstandingRunners(tags) => {
let active_runners = tags
.into_iter()
.map(|t| t.to_string())
.collect::<Vec<_>>()
.join(", ");

bail!("failed to fetch test results because the following runners are still active: {active_runners}")
}
Error(reason) => bail!("failed to fetch test results because {reason}"),
}
}
}
Expand Down Expand Up @@ -397,7 +387,7 @@ mod test {
use super::{print_tests_for_runner, process_results, wait_for_results_help};

#[tokio::test]
async fn fetches_chunked_tests() {
async fn fetches_streamed_tests() {
let (server, client) = build_fake_server_client().await;
let server_addr = server.local_addr().unwrap();

Expand Down Expand Up @@ -430,24 +420,25 @@ mod test {
}
));

let chunks = [
queue::TestResultsResponse::Results {
chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![
serde_json::value::to_raw_value(results1).unwrap(),
]),
final_chunk: false,
},
queue::TestResultsResponse::Results {
chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![
serde_json::value::to_raw_value(results2).unwrap(),
]),
final_chunk: true,
},
];
let results_buffer = OpaqueLazyAssociatedTestResults::into_jsonl_buffer(&[
serde_json::value::to_raw_value(results1).unwrap(),
serde_json::value::to_raw_value(results2).unwrap(),
])
.unwrap();

for chunk in chunks {
net_protocol::async_write(&mut conn, &chunk).await.unwrap();
}
let mut results_buffer_slice = &results_buffer[..];

net_protocol::async_write(&mut conn, &queue::TestResultsResponse::StreamingResults)
.await
.unwrap();

net_protocol::async_write_stream(
&mut conn,
results_buffer.len(),
&mut results_buffer_slice,
)
.await
.unwrap();
}
};

Expand All @@ -457,7 +448,7 @@ mod test {
let ((), results) = tokio::join!(server_task, client_task);

let results = results.unwrap();
let expected = [[results1], [results2]];
let expected = [results1, results2];
assert_eq!(results, expected);
}

Expand Down
69 changes: 45 additions & 24 deletions crates/abq_queue/src/persistence/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

mod fs;
mod in_memory;
#[cfg(test)]
pub(crate) mod test_utils;

pub use fs::FilesystemPersistor;
pub use in_memory::InMemoryPersistor;
Expand All @@ -13,14 +15,29 @@ use abq_utils::{
error::LocatedError,
net_protocol::{
queue::AssociatedTestResults,
results::{OpaqueLazyAssociatedTestResults, ResultsLine, Summary},
results::{ResultsLine, Summary},
workers::RunId,
},
};
use async_trait::async_trait;

type Result<T> = std::result::Result<T, LocatedError>;

/// A type-erased asynchronous reader: any [tokio::io::AsyncRead] implementor that is also
/// `Send` and `Unpin`, valid for the lifetime `'a`.
pub type OpaqueAsyncReader<'a> = dyn tokio::io::AsyncRead + Send + Unpin + 'a;

/// A borrowed, type-erased asynchronous reader paired with a length.
///
/// This is the value handed to [WithResultsStream::with_results_stream].
pub struct ResultsStream<'a> {
/// Mutable borrow of the underlying reader from which the bytes are read.
pub stream: Box<&'a mut OpaqueAsyncReader<'a>>,
/// NOTE(review): presumably the total number of bytes that `stream` will yield —
/// confirm against the persistor implementations that construct this value.
pub len: usize,
}

/// A one-shot, object-safe callback that is handed a [ResultsStream].
///
/// The receiver is `Box<Self>`, so an implementor is consumed by the call and can be
/// passed around as a `Box<dyn WithResultsStream + Send>`.
#[async_trait]
pub trait WithResultsStream {
/// Consumes the callback, giving it access to `results_stream` for the lifetime `'a`.
///
/// Returns `Err` with a [LocatedError] if the callback fails.
async fn with_results_stream<'a>(
self: Box<Self>,
results_stream: ResultsStream<'a>,
) -> Result<()>;
}

#[async_trait]
pub trait PersistResults: Send + Sync {
/// Dumps a summary line.
Expand All @@ -29,8 +46,12 @@ pub trait PersistResults: Send + Sync {
/// Dumps the persisted results to a remote, if any is configured.
async fn dump_to_remote(&self, run_id: &RunId) -> Result<()>;

/// Load a set of test results as [OpaqueLazyAssociatedTestResults].
async fn get_results(&self, run_id: &RunId) -> Result<OpaqueLazyAssociatedTestResults>;
/// Execute a closure with access to a stream of raw bytes interpretable as [OpaqueLazyAssociatedTestResults].
async fn with_results_stream(
&self,
run_id: &RunId,
f: Box<dyn WithResultsStream + Send>,
) -> Result<()>;

fn boxed_clone(&self) -> Box<dyn PersistResults>;
}
Expand Down Expand Up @@ -136,16 +157,21 @@ impl ResultsPersistedCell {
}
}

/// Reports whether results may be retrieved right now.
///
/// Returns `true` only when the cell's `processing` counter reads zero.
pub fn eligible_to_retrieve(&self) -> bool {
    let in_flight = self.0.processing.load(atomic::ORDERING);
    in_flight == 0
}

/// Attempts to retrieve a set of test results, handing them to `callback` as a stream.
/// This does not itself check for pending persistence jobs; see [Self::eligible_to_retrieve].
pub async fn retrieve(
pub async fn retrieve_with_callback(
&self,
persistence: &SharedPersistResults,
) -> Option<Result<OpaqueLazyAssociatedTestResults>> {
if self.0.processing.load(atomic::ORDERING) != 0 {
return None;
}
Some(persistence.0.get_results(&self.0.run_id).await)
callback: Box<dyn WithResultsStream + Send>,
) -> Result<()> {
persistence
.0
.with_results_stream(&self.0.run_id, callback)
.await
}
}

Expand Down Expand Up @@ -197,20 +223,17 @@ mod test {

use crate::persistence::{
remote::{self, fake_error, PersistenceKind},
results::EligibleForRemoteDump,
results::{test_utils::retrieve_results, EligibleForRemoteDump},
};

use super::{fs::FilesystemPersistor, ResultsPersistedCell};

#[tokio::test]
async fn retrieve_is_none_while_pending() {
let tempdir = tempfile::tempdir().unwrap();
let persistence = FilesystemPersistor::new_shared(tempdir.path(), 1, remote::NoopPersister);

async fn not_eligible_to_retrieve_while_there_are_pending_results() {
let cell = ResultsPersistedCell::new(RunId::unique());
cell.0.processing.fetch_add(1, atomic::ORDERING);

assert!(cell.retrieve(&persistence).await.is_none());
assert!(!cell.eligible_to_retrieve());
}

#[tokio::test]
Expand All @@ -232,8 +255,8 @@ mod test {

let cell = ResultsPersistedCell::new(RunId::unique());

let retrieved = cell.retrieve(&persistence).await.unwrap().unwrap();
let results = retrieved.decode().unwrap();
let results = retrieve_results(&cell, &persistence).await.unwrap();
let results = results.decode().unwrap();
assert!(results.is_empty());
}

Expand Down Expand Up @@ -265,12 +288,11 @@ mod test {
// That's okay. But the retrieved results must definitely include at least results1.
let retrieve_task = {
async {
loop {
match cell.retrieve(&persistence).await {
None => tokio::time::sleep(Duration::from_micros(1)).await,
Some(results) => break results,
}
while !cell.eligible_to_retrieve() {
tokio::time::sleep(Duration::from_micros(1)).await;
}

retrieve_results(&cell, &persistence).await
}
};
let persist_task = async {
Expand All @@ -283,8 +305,7 @@ mod test {
};
let ((), retrieve_result) = tokio::join!(persist_task, retrieve_task);

let retrieved = retrieve_result.unwrap();
let results = retrieved.decode().unwrap();
let results = retrieve_result.unwrap().decode().unwrap();

use ResultsLine::Results;
match results.len() {
Expand Down
Loading
Loading