From ec42518ce077eac246739c00f3c4c653ac753236 Mon Sep 17 00:00:00 2001 From: Brandon W Maister Date: Wed, 12 Feb 2025 19:58:15 -0500 Subject: [PATCH 1/5] fix: Improve error messages Re-include the source of the bucket region error (it can be because you're not logged in). And discard duplicate error messages on print, not sure why they happen. --- src/main.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index aba38cd..d9e87d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -142,8 +142,12 @@ fn main() { error!("Failed to run: {}", err); let mut err = err.source(); let mut count = 0; + let mut prev_msg = String::new(); while let Some(e) = err { - eprintln!(" : {}", e); + if e.to_string() != prev_msg { + eprintln!(" : {}", e); + prev_msg = e.to_string(); + } err = e.source(); count += 1; if count > 10 { @@ -330,7 +334,7 @@ async fn create_s3_client(opts: &Opts, bucket: &String) -> Result { .raw_response() .and_then(|res| res.headers().get("x-amz-bucket-region")) .map(str::to_owned) - .ok_or_else(|| anyhow!("failed to extract bucket region"))?, + .ok_or_else(|| anyhow!("failed to extract bucket region: {err}"))?, }; let region = Region::new(bucket_region); From 3fce5b2be0d944b3196c1cf8fc169aeb611efea7 Mon Sep 17 00:00:00 2001 From: Brandon W Maister Date: Wed, 12 Feb 2025 20:48:51 -0500 Subject: [PATCH 2/5] perf: Ensure reasonable concurrency as base for scan Add a min and max prefix count, since prefixes define the level of concurrency for the scan. Checking prefixes for validity is super slow, and running without any concurrency is also super slow. This will hopefully keep things reasonably fast --- src/glob_matcher.rs | 170 +++++++++++++++++++++++++++++-------- src/glob_matcher/engine.rs | 14 +-- src/main.rs | 10 ++- 3 files changed, 148 insertions(+), 46 deletions(-) diff --git a/src/glob_matcher.rs b/src/glob_matcher.rs index a775018..e3c840e 100644 --- a/src/glob_matcher.rs +++ b/src/glob_matcher.rs @@ -10,11 +10,21 @@ use anyhow::{bail, Context as _, Result}; use globset::GlobMatcher; use itertools::Itertools as _; use regex::Regex; -use tracing::{debug, enabled, trace, Level}; +use tracing::{debug, trace}; mod engine; pub use engine::{Engine, S3Engine}; +/// The maximum number of prefixes that can be generated by the glob matcher +/// +/// Checking that constructed prefixes exist is significantly slower than +/// scanning for objects. +const MAX_PREFIXES: usize = 1000; + +/// Parallelism is determined by the number of prefixes, and 50 is much faster +/// than 1 +const DESIRED_MIN_PREFIXES: usize = 50; + /// A thing that knows how to generate and filter S3 prefixes based on a glob pattern #[derive(Debug, Clone)] pub struct S3GlobMatcher { @@ -22,6 +32,7 @@ pub struct S3GlobMatcher { delimiter: char, parts: Vec, glob: GlobMatcher, + min_prefixes: usize, } /// A scanner takes a glob pattern and can efficiently generate a list of S3 @@ -76,6 +87,7 @@ impl S3GlobMatcher { delimiter: delimiter.chars().next().unwrap(), parts: new_parts, glob: glob.compile_matcher(), + min_prefixes: DESIRED_MIN_PREFIXES, }) } @@ -111,12 +123,17 @@ impl S3GlobMatcher { let mut prev_part = None; let mut part_iter = self.parts.iter().enumerate(); for (i, part) in &mut part_iter { - // only included prefixes in trace logs - if enabled!(Level::TRACE) { - trace!(new_part = %part.display(), %regex_so_far, ?prefixes, "scanning for part"); - } else { - debug!(new_part = %part.display(), %regex_so_far, "scanning for part"); + if prefixes.len() >= MAX_PREFIXES { + debug!( + new_prefix_count = prefixes.len(), + "We have over {MAX_PREFIXES} prefixes, stopping." + ); + break; } + // only included prefixes in trace logs + trace!(?prefixes, "scanning for part"); + debug!(new_part = %part.display(), %regex_so_far, prefix_count = prefixes.len(), "scanning for part"); + eprint!("\rDiscovering prefixes: {} ", prefixes.len()); // We always want to scan for things including the last part, // finding more prefixes in it is guaranteed to be slower than // just searching because we have to do an api call to check each @@ -140,6 +157,13 @@ impl S3GlobMatcher { debug!(part = %part.display(), "scanning for keys in an Any"); let mut new_prefixes = Vec::new(); for prefix in &prefixes { + if new_prefixes.len() >= MAX_PREFIXES { + debug!( + new_prefix_count = new_prefixes.len(), + "scanning for Any generated over 1,000 prefixes, stopping." + ); + break; + } let np = engine.scan_prefixes(prefix, &delimiter).await?; trace!(%prefix, found = ?np, pattern = %part.display(), "extending prefixes for Any"); new_prefixes.extend(np); @@ -180,11 +204,18 @@ impl S3GlobMatcher { if is_simple_append { debug!(allowed = %allowed.join(","), "simple append"); let mut new_prefixes = Vec::with_capacity(prefixes.len() * allowed.len()); - for prefix in prefixes { + for prefix in &prefixes { for alt in allowed { - new_prefixes.push(prefix_join(&prefix, alt)); + new_prefixes.push(prefix_join(prefix, alt)); } } + if new_prefixes.len() >= MAX_PREFIXES { + debug!( + new_prefix_count = new_prefixes.len(), + "simple append generated over 1,000 prefixes, using prefixes from previous part" + ); + break; + } prefixes = engine.check_prefixes(&new_prefixes).await?; } else { // Build up the filters and appends @@ -225,24 +256,24 @@ impl S3GlobMatcher { debug!("no filters, appending"); let mut new_prefixes = Vec::with_capacity(prefixes.len() * appends.len()); - for prefix in prefixes { + for prefix in &prefixes { for alt in &appends { - new_prefixes.push(prefix_join(&prefix, alt)); + new_prefixes.push(prefix_join(prefix, alt)); } } new_prefixes } else { debug!("filtering and appending"); let mut new_prefixes = Vec::with_capacity(prefixes.len()); - for prefix in prefixes { - if filter.is_match(&prefix) { + for prefix in &prefixes { + if filter.is_match(prefix) { // we only need to append if it's not already matched - if !appends.is_empty() && !append_matcher.is_match(&prefix) { + if !appends.is_empty() && !append_matcher.is_match(prefix) { for alt in &appends { - new_prefixes.push(prefix_join(&prefix, alt)); + new_prefixes.push(prefix_join(prefix, alt)); } } else { - new_prefixes.push(prefix); + new_prefixes.push(prefix.to_string()); } } } @@ -251,7 +282,16 @@ impl S3GlobMatcher { }; if !appends.is_empty() { - trace!(prefixes = ?new_prefixes, "checking appended prefixes"); + if new_prefixes.len() >= MAX_PREFIXES { + // checking prefixes is significantly slower + // than scanning existing prefixes. + debug!( + new_prefix_count = new_prefixes.len(), + "Appends generated over 1,000 prefixes, using prefixes from previous part" + ); + break; + } + trace!(new_prefixes = ?new_prefixes, new_prefix_count = new_prefixes.len(), "checking appended prefixes"); prefixes = engine.check_prefixes(&new_prefixes).await?; } else { debug!("no appends, using new prefixes"); @@ -271,12 +311,47 @@ impl S3GlobMatcher { prev_part = Some(part); } + // parallelism is determined by the number of prefixes, so we need to + // extend the list until we have several. 50 is generally pretty fast, + // but going more than 5 layers deep feels excessive + let mut extensions = 0; + while !prefixes.is_empty() && prefixes.len() < self.min_prefixes && extensions < 5 { + eprint!("\rDiscovering prefixes: {} ", prefixes.len()); + debug!( + prefix_count = prefixes.len(), + extensions, "extending prefixes because there are less than 50" + ); + trace!( + ?prefixes, + "extending prefixes because there are less than 50" + ); + extensions += 1; + let mut new_prefixes = Vec::new(); + for prefix in &prefixes { + let np = engine.scan_prefixes(prefix, &delimiter).await?; + for p in np { + if self.is_match(&p) { + new_prefixes.push(p); + } + } + } + if prefixes == new_prefixes { + break; + } + prefixes = new_prefixes; + } + eprintln!("\rDiscovered prefixes: {} ", prefixes.len()); Ok(prefixes) } pub fn is_match(&self, path: &str) -> bool { self.glob.is_match(path) } + + #[cfg(test)] + fn set_min_prefixes(&mut self, min_prefixes: usize) { + self.min_prefixes = min_prefixes; + } } fn prefix_join(prefix: &str, alt: &str) -> String { @@ -625,7 +700,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_literal() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/foo/bar".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/foo/bar".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec!["src/foo/bar".to_string()]); let prefixes = scanner.find_prefixes(&mut engine).await?; @@ -638,7 +714,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_no_any() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{foo,bar}/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/{foo,bar}/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), "src/bar/baz".to_string(), @@ -655,7 +732,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_with_any() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{foo,bar}*/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/{foo,bar}*/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); println!("scanner_parts for {}:\n{:?}", scanner.raw, scanner.parts); let mut engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), @@ -673,7 +751,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_star() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/*/main.rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/*/main.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "src/foo/main.rs".to_string(), "src/bar/main.rs".to_string(), @@ -690,7 +769,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_recursive() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/**/test.rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/**/test.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "src/test.rs".to_string(), "src/foo/test.rs".to_string(), @@ -709,7 +789,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_character_class() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/[abc]*.rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/[abc]*.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], Choice(vec!["src/a", "src/b", "src/c"])); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice(".rs")); @@ -728,7 +809,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_then_any() -> Result<()> { - let scanner = S3GlobMatcher::parse("literal/{foo,bar}*/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/{foo,bar}*/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); println!("scanner_parts for {}:\n{:#?}", scanner.raw, scanner.parts); assert_scanner_part!( @@ -761,7 +843,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_any_literal() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("literal/{foo,bar}*quux/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/{foo,bar}*quux/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!( &scanner.parts[0], @@ -787,7 +870,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_any_then_alternation() -> Result<()> { - let scanner = S3GlobMatcher::parse("literal/*{foo,bar}/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/*{foo,bar}/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/")); assert_scanner_part!(&scanner.parts[1], Any("*")); @@ -808,7 +892,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_literal_any_alternation() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("literal/quux*{foo,bar}/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/quux*{foo,bar}/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/quux")); assert_scanner_part!(&scanner.parts[1], Any("*")); @@ -828,7 +913,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_any_after_last_delimiter() -> Result<()> { - let scanner = S3GlobMatcher::parse("literal/baz*.rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/baz*.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/baz")); assert_scanner_part!(&scanner.parts[1], Any("*")); @@ -857,7 +943,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_any_and_character_class() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("literal/baz*[ab].rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/baz*[ab].rs".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/baz")); assert_scanner_part!(&scanner.parts[1], Any("*")); @@ -878,7 +965,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_empty_alternative() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{,tmp}/file".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/{,tmp}/file".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), @@ -898,7 +986,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_empty_alternative_with_delimiter() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{,tmp/}file".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/{,tmp/}file".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), @@ -914,7 +1003,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_with_delimiter() -> Result<()> { - let scanner = S3GlobMatcher::parse("src/{foo/bar,baz}/test".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/{foo/bar,baz}/test".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!( &scanner.parts[0], @@ -938,7 +1028,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_start() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("[!a]*/foo".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("[!a]*/foo".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "b/foo".to_string(), "c/foo".to_string(), @@ -955,7 +1046,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_after_wildcard() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("*[!f]oo".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("*[!f]oo".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "zoo".to_string(), "boo".to_string(), @@ -972,7 +1064,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_between_alternations() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("{foo,bar}[!z]*/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("{foo,bar}[!z]*/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "foo-abc/baz".to_string(), "bar-def/baz".to_string(), @@ -990,7 +1083,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_multiple_negative_classes() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("[!a]*[!b]/foo".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("[!a]*[!b]/foo".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "c-x/foo".to_string(), "d-y/foo".to_string(), @@ -1009,7 +1103,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_with_delimiter() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("foo/[!/]/bar".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("foo/[!/]/bar".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("foo/")); assert_scanner_part!(&scanner.parts[1], Any("[!/]")); assert_scanner_part!(&scanner.parts[2], OneChoice("/bar")); @@ -1031,7 +1126,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_complex_negative_pattern() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("*{foo,bar}*[!Z]/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("*{foo,bar}*[!Z]/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); let mut engine = MockS3Engine::new(vec![ "x-foo-a/baz".to_string(), "y-bar-b/baz".to_string(), diff --git a/src/glob_matcher/engine.rs b/src/glob_matcher/engine.rs index 25bf367..82ea841 100644 --- a/src/glob_matcher/engine.rs +++ b/src/glob_matcher/engine.rs @@ -114,12 +114,13 @@ impl Engine for MockS3Engine { self.calls.push((prefix.to_string(), delimiter.to_string())); let found = self.scan_prefixes_inner(prefix, delimiter); - info!(prefix, ?found, "mocks3 found prefixes"); + info!(prefix, ?found, "MockS3 found prefixes"); found } async fn check_prefixes(&mut self, prefixes: &[String]) -> Result> { + info!(?prefixes, "MockS3 checking prefixes"); let mut valid_prefixes = Vec::new(); for prefix in prefixes { @@ -147,9 +148,12 @@ impl MockS3Engine { } pub fn assert_calls(&self, expected: &[(impl AsRef, impl AsRef)]) { + info!("MockS3 asserting calls"); + let calls = &self.calls; for (i, ((actual_prefix, actual_delim), (expected_prefix, expected_delim))) in - self.calls.iter().zip(expected).enumerate() + calls.iter().zip(expected).enumerate() { + info!("MockS3 asserting call {i}"); let i = i + 1; assert!( actual_prefix == expected_prefix.as_ref(), @@ -167,11 +171,11 @@ impl MockS3Engine { ); } assert!( - self.calls.len() == expected.len(), + calls.len() == expected.len(), "Got {} calls, expected {}. Actual calls: {:#?}", - self.calls.len(), + calls.len(), expected.len(), - self.calls + calls ); } diff --git a/src/main.rs b/src/main.rs index d9e87d8..60c5067 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use std::io::{IsTerminal as _, Write as _}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Context as _, Result}; use aws_config::meta::region::RegionProviderChain; @@ -162,6 +162,7 @@ fn main() { } async fn run(opts: Opts) -> Result<()> { + let start = Instant::now(); let pat = match &opts.command { Command::List { pattern, .. } | Command::Download { pattern, .. } => pattern, }; @@ -223,7 +224,7 @@ async fn run(opts: Opts) -> Result<()> { let seen_prefixes = add_atomic(&seen_prefixes, 1); eprint!( "\rmatches/total {:>4}/{:<10} prefixes/total {:>4}/{:<4}", - match_count, + match_count.to_formatted_string(&Locale::en), total_objects.to_formatted_string(&Locale::en), seen_prefixes, total_prefixes @@ -257,10 +258,11 @@ async fn run(opts: Opts) -> Result<()> { } } eprintln!( - "Found {} matching objects out of {} scanned in {} prefixes", + "Matched {}/{} objects across {} prefixes in {:?}", objects.len(), total_objects.load(Ordering::Relaxed), - total_prefixes + total_prefixes, + Duration::from_millis(start.elapsed().as_millis() as u64) ); } Command::Download { dest, .. } => { From ab4aba0931f13b18ce2477761dc2f967232e1ae1 Mon Sep 17 00:00:00 2001 From: Brandon W Maister Date: Wed, 12 Feb 2025 21:27:29 -0500 Subject: [PATCH 3/5] perf: Parallelize "directory" scanning for prefixes --- src/glob_matcher.rs | 140 +++++++++++++++++++++++-------------- src/glob_matcher/engine.rs | 21 ++++-- src/main.rs | 4 +- 3 files changed, 105 insertions(+), 60 deletions(-) diff --git a/src/glob_matcher.rs b/src/glob_matcher.rs index e3c840e..e1b3385 100644 --- a/src/glob_matcher.rs +++ b/src/glob_matcher.rs @@ -115,7 +115,7 @@ impl S3GlobMatcher { /// 4. Search for all folders in ["foo/bar", "foo/baz"] /// 4. Append "qux" -> ["foo/bar/qux", "foo/baz/qux"] /// 5. Filter by "*" -> keep prefixes whose last component starts with "qux" - pub async fn find_prefixes(&self, engine: &mut impl Engine) -> Result> { + pub async fn find_prefixes(&self, mut engine: impl Engine + Clone) -> Result> { debug!("finding prefixes for {}", self.raw); let mut prefixes = vec!["".to_string()]; let delimiter = self.delimiter.to_string(); @@ -155,20 +155,58 @@ impl S3GlobMatcher { !matches!(prev_part, Some(&Glob::Any { .. })) && !is_last_part; if scan_might_help { debug!(part = %part.display(), "scanning for keys in an Any"); - let mut new_prefixes = Vec::new(); + let (tx, mut rx) = tokio::sync::mpsc::channel(prefixes.len()); + + let mut tasks = Vec::new(); for prefix in &prefixes { - if new_prefixes.len() >= MAX_PREFIXES { + let client_prefix = prefix.clone(); + let delimiter = delimiter.clone(); + let tx = tx.clone(); + let mut engine = engine.clone(); + let prefix = prefix.clone(); + + let task = tokio::spawn(async move { + match engine.scan_prefixes(&client_prefix, &delimiter).await { + Ok(results) => { + let _ = tx.send(Ok((prefix, results))).await; + } + Err(e) => { + let _ = tx.send(Err(e)).await; + } + } + }); + tasks.push(task); + } + + drop(tx); + + let mut new_prefixes = Vec::new(); + let mut new_prefix_count = new_prefixes.len(); + while let Some(result) = rx.recv().await { + let (scanned_prefix, results) = result.context("scanning prefixes")?; + let result_len = results.len(); + new_prefixes.extend(results); + trace!( + scanned_prefix, + scanned_results = result_len, + new_prefix_count, + "Scanning for any, got result from task" + ); + new_prefix_count = new_prefixes.len(); + if new_prefix_count >= MAX_PREFIXES { debug!( - new_prefix_count = new_prefixes.len(), - "scanning for Any generated over 1,000 prefixes, stopping." + new_prefix_count, + "Scanning for any, found too many prefixes, aborting" ); + for task in tasks { + task.abort(); + } break; } - let np = engine.scan_prefixes(prefix, &delimiter).await?; - trace!(%prefix, found = ?np, pattern = %part.display(), "extending prefixes for Any"); - new_prefixes.extend(np); } - prefixes = new_prefixes; + if new_prefix_count < MAX_PREFIXES { + prefixes = new_prefixes; + } } if part.is_negated() { // if this part is a negated character class then we should filter @@ -702,9 +740,9 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("src/foo/bar".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec!["src/foo/bar".to_string()]); + let engine = MockS3Engine::new(vec!["src/foo/bar".to_string()]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/bar"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -716,13 +754,13 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("src/{foo,bar}/baz".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), "src/bar/baz".to_string(), "src/qux/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/baz", "src/bar/baz"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -735,14 +773,14 @@ mod tests { let mut scanner = S3GlobMatcher::parse("src/{foo,bar}*/baz".to_string(), "/")?; scanner.set_min_prefixes(0); println!("scanner_parts for {}:\n{:?}", scanner.raw, scanner.parts); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), "src/bar/baz".to_string(), "src/foo-quux/baz".to_string(), "src/qux/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("src/foo", "/"), ("src/bar", "/")]); assert!(prefixes == vec!["src/foo-quux/baz", "src/foo/baz", "src/bar/baz",]); Ok(()) @@ -753,14 +791,14 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("src/*/main.rs".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/main.rs".to_string(), "src/bar/main.rs".to_string(), "src/baz/other.rs".to_string(), ]); info!(?engine.paths); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/bar/main.rs", "src/foo/main.rs"]); engine.assert_calls(&[("src/", "/")]); Ok(()) @@ -771,14 +809,14 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("src/**/test.rs".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/test.rs".to_string(), "src/foo/test.rs".to_string(), "src/foo/bar/test.rs".to_string(), "src/other.rs".to_string(), ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; // Should stop at src/ since ** matches anything after assert!(prefixes == vec!["src/"]); let e: &[(&str, &str)] = &[]; @@ -794,14 +832,14 @@ mod tests { assert_scanner_part!(&scanner.parts[0], Choice(vec!["src/a", "src/b", "src/c"])); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice(".rs")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/abc.rs".to_string(), "src/baz.rs".to_string(), "src/cat.rs".to_string(), "src/dog.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("src/a", "/"), ("src/b", "/"), ("src/c", "/")]); assert!(prefixes == vec!["src/abc.rs", "src/baz.rs", "src/cat.rs"]); Ok(()) @@ -820,14 +858,14 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice("/baz")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/bar-stuff/baz".to_string(), "literal/foo-extra/baz".to_string(), "literal/foo/baz".to_string(), "literal/other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/foo", "/"), ("literal/bar", "/")]); assert!( prefixes @@ -853,7 +891,7 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice("quux/baz")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/foo-quux/baz".to_string(), "literal/bar-quux/baz".to_string(), // Should be filtered out @@ -862,7 +900,7 @@ mod tests { "literal/foo-quux-bar/baz".to_string(), ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["literal/foo-quux/baz", "literal/bar-quux/baz"]); engine.assert_calls(&[("literal/foo", "/"), ("literal/bar", "/")]); Ok(()) @@ -877,13 +915,13 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["foo/baz", "bar/baz"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/something-foo/baz".to_string(), "literal/other-bar/baz".to_string(), "literal/not-match/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/", "/")]); assert!(prefixes == vec!["literal/other-bar/baz", "literal/something-foo/baz"]); Ok(()) @@ -899,13 +937,13 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["foo/baz", "bar/baz"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/quux-foo/baz".to_string(), "literal/quux-something-bar/baz".to_string(), "literal/quux-other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/quux", "/")]); assert!(prefixes == vec!["literal/quux-foo/baz", "literal/quux-something-bar/baz"]); Ok(()) @@ -920,14 +958,14 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice(".rs")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/baz.rs".to_string(), "literal/baz-extra.rs".to_string(), "literal/bazinga.rs".to_string(), "literal/other.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/baz", "/")]); assert!( prefixes @@ -950,13 +988,13 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["a.rs", "b.rs"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/baz-a.rs".to_string(), "literal/baz-extra-b.rs".to_string(), "literal/baz-c.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/baz", "/")]); assert!(prefixes == vec!["literal/baz-a.rs", "literal/baz-extra-b.rs"]); Ok(()) @@ -967,13 +1005,13 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("src/{,tmp}/file".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), "src/other/file".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; // TODO: there is a legitimate case that this should only be // src/tmp/file, but we strip double forward slashes to work around // minio @@ -988,13 +1026,13 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("src/{,tmp/}file".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), "src/other/file".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/file", "src/tmp/file"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -1011,14 +1049,14 @@ mod tests { Choice(vec!["src/foo/bar/test", "src/baz/test"]) ); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/bar/test".to_string(), "src/baz/test".to_string(), "src/foo/test".to_string(), // Should be filtered out "src/foo/baz/test".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/bar/test", "src/baz/test"]); let e: &[(&str, &str)] = &[]; // No API calls needed since alternation is static engine.assert_calls(e); @@ -1030,14 +1068,14 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("[!a]*/foo".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "b/foo".to_string(), "c/foo".to_string(), "xyz/foo".to_string(), "a/foo".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["b/foo", "c/foo", "xyz/foo"]); engine.assert_calls(&[("", "/")]); Ok(()) @@ -1048,14 +1086,14 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("*[!f]oo".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "zoo".to_string(), "boo".to_string(), "foo".to_string(), // Should be filtered out "something/foo".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["boo", "zoo"]); // TODO: this could be improved to only call the engine once Ok(()) @@ -1066,7 +1104,7 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("{foo,bar}[!z]*/baz".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "foo-abc/baz".to_string(), "bar-def/baz".to_string(), "fooz/baz".to_string(), // Should be filtered out @@ -1074,7 +1112,7 @@ mod tests { "other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["foo-abc/baz", "bar-def/baz"]); engine.assert_calls(&[("foo", "/"), ("bar", "/")]); Ok(()) @@ -1085,7 +1123,7 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("[!a]*[!b]/foo".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "c-x/foo".to_string(), "d-y/foo".to_string(), // filtered out @@ -1094,7 +1132,7 @@ mod tests { "c-b/foo".to_string(), // (second part starts with b) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["c-x/foo", "d-y/foo"]); engine.assert_calls(&[("", "/")]); Ok(()) @@ -1109,7 +1147,7 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("[!/]")); assert_scanner_part!(&scanner.parts[2], OneChoice("/bar")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "foo/x/bar".to_string(), "foo/a/bar".to_string(), "foo//bar".to_string(), // Should be filtered out @@ -1117,7 +1155,7 @@ mod tests { "foo///bar".to_string(), // Should be filtered out (the excluded char) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["foo/a/bar", "foo/x/bar"]); engine.assert_calls(&[("foo/", "/")]); Ok(()) @@ -1128,7 +1166,7 @@ mod tests { setup_logging(Some("s3glob=trace")); let mut scanner = S3GlobMatcher::parse("*{foo,bar}*[!Z]/baz".to_string(), "/")?; scanner.set_min_prefixes(0); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "x-foo-a/baz".to_string(), "y-bar-b/baz".to_string(), // filtered out @@ -1137,7 +1175,7 @@ mod tests { "x-baz-a/baz".to_string(), // (middle not foo/bar) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["x-foo-a/baz", "y-bar-b/baz"]); // TODO: this could be improved to only call the engine once Ok(()) diff --git a/src/glob_matcher/engine.rs b/src/glob_matcher/engine.rs index 82ea841..c2dc0e1 100644 --- a/src/glob_matcher/engine.rs +++ b/src/glob_matcher/engine.rs @@ -5,14 +5,17 @@ use tracing::{debug, trace}; #[cfg(test)] use std::collections::BTreeSet; #[cfg(test)] +use std::sync::{Arc, Mutex}; +#[cfg(test)] use tracing::info; #[async_trait::async_trait] -pub trait Engine { +pub trait Engine: Send + Sync + 'static { async fn scan_prefixes(&mut self, prefix: &str, delimiter: &str) -> Result>; async fn check_prefixes(&mut self, prefixes: &[String]) -> Result>; } +#[derive(Debug, Clone)] pub struct S3Engine { client: Client, bucket: String, @@ -102,16 +105,20 @@ impl Engine for S3Engine { /// A test engine that simulates a real S3 bucket with a set of paths #[cfg(test)] +#[derive(Debug, Clone)] pub(super) struct MockS3Engine { - pub paths: Vec, - pub calls: Vec<(String, String)>, // (prefix, delimiter) pairs + pub paths: Arc>, + pub calls: Arc>>, // (prefix, delimiter) pairs } #[cfg(test)] #[async_trait::async_trait] impl Engine for MockS3Engine { async fn scan_prefixes(&mut self, prefix: &str, delimiter: &str) -> Result> { - self.calls.push((prefix.to_string(), delimiter.to_string())); + self.calls + .lock() + .unwrap() + .push((prefix.to_string(), delimiter.to_string())); let found = self.scan_prefixes_inner(prefix, delimiter); info!(prefix, ?found, "MockS3 found prefixes"); @@ -142,14 +149,14 @@ impl Engine for MockS3Engine { impl MockS3Engine { pub fn new(paths: Vec) -> Self { Self { - paths, - calls: Vec::new(), + paths: Arc::new(paths), + calls: Arc::new(Mutex::new(Vec::new())), } } pub fn assert_calls(&self, expected: &[(impl AsRef, impl AsRef)]) { info!("MockS3 asserting calls"); - let calls = &self.calls; + let calls = &self.calls.lock().unwrap(); for (i, ((actual_prefix, actual_delim), (expected_prefix, expected_delim))) in calls.iter().zip(expected).enumerate() { diff --git a/src/main.rs b/src/main.rs index 60c5067..5b1e047 100644 --- a/src/main.rs +++ b/src/main.rs @@ -183,9 +183,9 @@ async fn run(opts: Opts) -> Result<()> { .find(['*', '?', '[', '{']) .map_or(raw_pattern.clone(), |i| raw_pattern[..i].to_owned()); - let mut engine = S3Engine::new(client.clone(), bucket.clone(), opts.delimiter.to_string()); + let engine = S3Engine::new(client.clone(), bucket.clone(), opts.delimiter.to_string()); let matcher = S3GlobMatcher::parse(raw_pattern, &opts.delimiter.to_string())?; - let mut prefixes = matcher.find_prefixes(&mut engine).await?; + let mut prefixes = matcher.find_prefixes(engine).await?; trace!(?prefixes, "matcher generated prefixes"); debug!(prefix_count = prefixes.len(), "matcher generated prefixes"); From 626c6c339eefbc2e28a8cf9f04954639e56034ee Mon Sep 17 00:00:00 2001 From: Brandon W Maister Date: Wed, 12 Feb 2025 21:44:09 -0500 Subject: [PATCH 4/5] chore: Add cargo nextest test timeouts --- .config/nextest.toml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .config/nextest.toml diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 0000000..a9398e4 --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,2 @@ +[profile.default] +slow-timeout = { period = "20s", terminate-after = 3 } From 50fcd209b23af3e800e3d6b14d46c4a57f020f6b Mon Sep 17 00:00:00 2001 From: Brandon W Maister Date: Wed, 12 Feb 2025 22:39:58 -0500 Subject: [PATCH 5/5] fix: Handle globs at the end of keys better Searching for prefixes when there is no following delimiter seems to return nothing, which makes sense. --- src/glob_matcher.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/glob_matcher.rs b/src/glob_matcher.rs index e1b3385..af6ea8d 100644 --- a/src/glob_matcher.rs +++ b/src/glob_matcher.rs @@ -204,9 +204,18 @@ impl S3GlobMatcher { break; } } + // don't overwrite if we gave up early if new_prefix_count < MAX_PREFIXES { prefixes = new_prefixes; } + // TODO: Do we need to make sure that every prefix in + // the old prefixes is a prefix in the new ones? + if new_prefix_count < prefixes.len() { + // this means that something after the any matched + // nothing, which I think always means that we have hit the last delimiter + debug!("Any scan decreased the number of prefixes, aborting. (Probably hit the last delimiter.)"); + break; + } } if part.is_negated() { // if this part is a negated character class then we should filter