diff --git a/src/sinks/util/adaptive_concurrency/tests.rs b/src/sinks/util/adaptive_concurrency/tests.rs index 37ab6e364ab08..9840ac4fca5ae 100644 --- a/src/sinks/util/adaptive_concurrency/tests.rs +++ b/src/sinks/util/adaptive_concurrency/tests.rs @@ -5,7 +5,7 @@ use core::task::Context; use std::{ collections::{HashMap, VecDeque}, fmt, - fs::{read_dir, File}, + fs::File, future::pending, io::Read, path::PathBuf, @@ -21,6 +21,7 @@ use futures::{ }; use rand::{thread_rng, Rng}; use rand_distr::Exp1; +use rstest::*; use serde::Deserialize; use snafu::Snafu; use tokio::time::{self, sleep, Duration, Instant}; @@ -29,6 +30,7 @@ use vector_lib::configurable::configurable_component; use vector_lib::json_size::JsonSize; use super::controller::ControllerStatistics; +use super::AdaptiveConcurrencySettings; use crate::{ config::{self, AcknowledgementsConfig, Input, SinkConfig, SinkContext}, event::{metric::MetricValue, Event}, @@ -136,6 +138,10 @@ struct TestParams { #[configurable(derived)] #[serde(default = "default_concurrency")] concurrency: Concurrency, + + #[configurable(derived)] + #[serde(default)] + adaptive_concurrency: AdaptiveConcurrencySettings, } const fn default_interval() -> f64 { @@ -418,6 +424,7 @@ async fn run_test(params: TestParams) -> TestResults { rate_limit_num: 9999, timeout_secs: 1, retry_jitter_mode: JitterMode::None, + adaptive_concurrency: params.adaptive_concurrency, ..Default::default() }, params, @@ -617,9 +624,7 @@ struct TestInput { controller: ControllerResults, } -async fn run_compare(file_path: PathBuf, input: TestInput) { - eprintln!("Running test in {:?}", file_path); - +async fn run_compare(input: TestInput) { let results = run_test(input.params).await; let mut failures = Vec::new(); @@ -667,42 +672,23 @@ async fn run_compare(file_path: PathBuf, input: TestInput) { assert!(failures.is_empty(), "{:#?}", results); } +#[rstest] #[tokio::test] -async fn all_tests() { - const PATH: &str = "tests/data/adaptive-concurrency"; - - // Read and parse everything first - let mut entries = read_dir(PATH) - .expect("Could not open data directory") - .map(|entry| entry.expect("Could not read data directory").path()) - .filter_map(|file_path| { - if (file_path.extension().map(|ext| ext == "toml")).unwrap_or(false) { - let mut data = String::new(); - File::open(&file_path) - .unwrap() - .read_to_string(&mut data) - .unwrap(); - let input: TestInput = toml::from_str(&data) - .unwrap_or_else(|error| panic!("Invalid TOML in {:?}: {:?}", file_path, error)); - Some((file_path, input)) - } else { - None - } - }) - .collect::>(); - - entries.sort_unstable_by_key(|entry| entry.0.to_string_lossy().to_string()); +async fn all_tests(#[files("tests/data/adaptive-concurrency/*.toml")] file_path: PathBuf) { + let mut data = String::new(); + File::open(&file_path) + .unwrap() + .read_to_string(&mut data) + .unwrap(); + let input: TestInput = toml::from_str(&data) + .unwrap_or_else(|error| panic!("Invalid TOML in {:?}: {:?}", file_path, error)); time::pause(); - // The first delay takes just slightly longer than all the rest, - // which causes the first test to run differently than all the - // others. Throw in a dummy delay to take up this delay "slack". + // The first delay takes just slightly longer than all the rest, which causes the first + // statistic to be inaccurate. Throw in a dummy delay to take up this delay "slack". sleep(Duration::from_millis(1)).await; time::advance(Duration::from_millis(1)).await; - // Then run all the tests - for (file_path, input) in entries { - run_compare(file_path, input).await; - } + run_compare(input).await; } diff --git a/tests/data/adaptive-concurrency-template.toml b/tests/data/adaptive-concurrency-template.toml index fd2a6f29965e5..7a7830011e146 100644 --- a/tests/data/adaptive-concurrency-template.toml +++ b/tests/data/adaptive-concurrency-template.toml @@ -19,6 +19,13 @@ rate.action = "defer" | "drop" concurrency = "auto" +# Delete any of these that are not needed +adaptive_concurrency.initial_concurrency = 1 +adaptive_concurrency.decrease_ratio = 0.9 +adaptive_concurrency.ewma_alpha = 0.4 +adaptive_concurrency.rtt_deviation_scale = 2.5 +adaptive_concurrency.max_concurrency_limit = 200 + [stats.in_flight] max = [,] mean = [,]