Skip to content

Commit

Permalink
chore(tests): Parallelize the adaptive concurrency tests (#21343)
Browse files Browse the repository at this point in the history
* Add adaptive concurrency settings to test params

* Use `rstest` to break out the adaptive concurrency tests
  • Loading branch information
bruceg authored Sep 24, 2024
1 parent ca0fa05 commit e172732
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 35 deletions.
56 changes: 21 additions & 35 deletions src/sinks/util/adaptive_concurrency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use core::task::Context;
use std::{
collections::{HashMap, VecDeque},
fmt,
fs::{read_dir, File},
fs::File,
future::pending,
io::Read,
path::PathBuf,
Expand All @@ -21,6 +21,7 @@ use futures::{
};
use rand::{thread_rng, Rng};
use rand_distr::Exp1;
use rstest::*;
use serde::Deserialize;
use snafu::Snafu;
use tokio::time::{self, sleep, Duration, Instant};
Expand All @@ -29,6 +30,7 @@ use vector_lib::configurable::configurable_component;
use vector_lib::json_size::JsonSize;

use super::controller::ControllerStatistics;
use super::AdaptiveConcurrencySettings;
use crate::{
config::{self, AcknowledgementsConfig, Input, SinkConfig, SinkContext},
event::{metric::MetricValue, Event},
Expand Down Expand Up @@ -136,6 +138,10 @@ struct TestParams {
#[configurable(derived)]
#[serde(default = "default_concurrency")]
concurrency: Concurrency,

#[configurable(derived)]
#[serde(default)]
adaptive_concurrency: AdaptiveConcurrencySettings,
}

const fn default_interval() -> f64 {
Expand Down Expand Up @@ -418,6 +424,7 @@ async fn run_test(params: TestParams) -> TestResults {
rate_limit_num: 9999,
timeout_secs: 1,
retry_jitter_mode: JitterMode::None,
adaptive_concurrency: params.adaptive_concurrency,
..Default::default()
},
params,
Expand Down Expand Up @@ -617,9 +624,7 @@ struct TestInput {
controller: ControllerResults,
}

async fn run_compare(file_path: PathBuf, input: TestInput) {
eprintln!("Running test in {:?}", file_path);

async fn run_compare(input: TestInput) {
let results = run_test(input.params).await;

let mut failures = Vec::new();
Expand Down Expand Up @@ -667,42 +672,23 @@ async fn run_compare(file_path: PathBuf, input: TestInput) {
assert!(failures.is_empty(), "{:#?}", results);
}

#[rstest]
#[tokio::test]
async fn all_tests() {
const PATH: &str = "tests/data/adaptive-concurrency";

// Read and parse everything first
let mut entries = read_dir(PATH)
.expect("Could not open data directory")
.map(|entry| entry.expect("Could not read data directory").path())
.filter_map(|file_path| {
if (file_path.extension().map(|ext| ext == "toml")).unwrap_or(false) {
let mut data = String::new();
File::open(&file_path)
.unwrap()
.read_to_string(&mut data)
.unwrap();
let input: TestInput = toml::from_str(&data)
.unwrap_or_else(|error| panic!("Invalid TOML in {:?}: {:?}", file_path, error));
Some((file_path, input))
} else {
None
}
})
.collect::<Vec<_>>();

entries.sort_unstable_by_key(|entry| entry.0.to_string_lossy().to_string());
async fn all_tests(#[files("tests/data/adaptive-concurrency/*.toml")] file_path: PathBuf) {
let mut data = String::new();
File::open(&file_path)
.unwrap()
.read_to_string(&mut data)
.unwrap();
let input: TestInput = toml::from_str(&data)
.unwrap_or_else(|error| panic!("Invalid TOML in {:?}: {:?}", file_path, error));

time::pause();

// The first delay takes just slightly longer than all the rest,
// which causes the first test to run differently than all the
// others. Throw in a dummy delay to take up this delay "slack".
// The first delay takes just slightly longer than all the rest, which causes the first
// statistic to be inaccurate. Throw in a dummy delay to take up this delay "slack".
sleep(Duration::from_millis(1)).await;
time::advance(Duration::from_millis(1)).await;

// Then run all the tests
for (file_path, input) in entries {
run_compare(file_path, input).await;
}
run_compare(input).await;
}
7 changes: 7 additions & 0 deletions tests/data/adaptive-concurrency-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ rate.action = "defer" | "drop"

concurrency = "auto"

# Delete any of these that are not needed
adaptive_concurrency.initial_concurrency = 1
adaptive_concurrency.decrease_ratio = 0.9
adaptive_concurrency.ewma_alpha = 0.4
adaptive_concurrency.rtt_deviation_scale = 2.5
adaptive_concurrency.max_concurrency_limit = 200

[stats.in_flight]
max = [,]
mean = [,]
Expand Down

0 comments on commit e172732

Please sign in to comment.