diff --git a/src/glob_matcher.rs b/src/glob_matcher.rs index cc9b29e..3d9004c 100644 --- a/src/glob_matcher.rs +++ b/src/glob_matcher.rs @@ -113,14 +113,13 @@ impl S3GlobMatcher { /// 4. Search for all folders in ["foo/bar", "foo/baz"] /// 4. Append "qux" -> ["foo/bar/qux", "foo/baz/qux"] /// 5. Filter by "*" -> keep prefixes whose last component starts with "qux" - pub async fn find_prefixes(&self, engine: &mut impl Engine) -> Result> { + pub async fn find_prefixes(&self, mut engine: impl Engine + Clone) -> Result> { debug!("finding prefixes for {}", self.raw); let mut prefixes = vec!["".to_string()]; let delimiter = self.delimiter.to_string(); let mut regex_so_far = "^".to_string(); let mut prev_part = None; let mut part_iter = self.parts.iter().enumerate(); - let mut next_scan_is_slow = false; for (i, part) in &mut part_iter { if prefixes.len() >= MAX_PREFIXES { debug!( @@ -154,19 +153,43 @@ impl S3GlobMatcher { !matches!(prev_part, Some(&Glob::Any { .. })) && !is_last_part; if scan_might_help { debug!(part = %part.display(), "scanning for keys in an Any"); - let mut new_prefixes = Vec::new(); + let (tx, mut rx) = tokio::sync::mpsc::channel(prefixes.len()); + + let mut tasks = Vec::new(); for prefix in &prefixes { + let client_prefix = prefix.clone(); + let delimiter = delimiter.clone(); + let tx = tx.clone(); + let mut engine = engine.clone(); + + let task = tokio::spawn(async move { + match engine.scan_prefixes(&client_prefix, &delimiter).await { + Ok(results) => { + let _ = tx.send(Ok(results)).await; + } + Err(e) => { + let _ = tx.send(Err(e)).await; + } + } + }); + tasks.push(task); + } + + drop(tx); + + let mut new_prefixes = Vec::new(); + while let Some(result) = rx.recv().await { + new_prefixes.extend(result.context("scanning prefixes")?); if new_prefixes.len() >= MAX_PREFIXES { debug!( new_prefix_count = new_prefixes.len(), "scanning for Any generated over 1,000 prefixes, stopping." ); - next_scan_is_slow = true; + for task in tasks { + task.abort(); + } break; } - let np = engine.scan_prefixes(prefix, &delimiter).await?; - trace!(%prefix, found = ?np, pattern = %part.display(), "extending prefixes for Any"); - new_prefixes.extend(np); } prefixes = new_prefixes; } @@ -315,7 +338,7 @@ impl S3GlobMatcher { // extend the list until we have several. 50 is generally pretty fast, // but going more than 5 layers deep feels excessive let mut extensions = 0; - while prefixes.len() < DESIRED_MIN_PREFIXES && extensions < 5 && !next_scan_is_slow { + while prefixes.len() < DESIRED_MIN_PREFIXES && extensions < 5 { eprint!("\rDiscovering prefixes: {} ", prefixes.len()); debug!( prefix_count = prefixes.len(), @@ -693,9 +716,9 @@ mod tests { async fn test_find_prefixes_literal() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("src/foo/bar".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec!["src/foo/bar".to_string()]); + let engine = MockS3Engine::new(vec!["src/foo/bar".to_string()]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/bar"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -706,13 +729,13 @@ mod tests { async fn test_find_prefixes_alternation_no_any() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("src/{foo,bar}/baz".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), "src/bar/baz".to_string(), "src/qux/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/baz", "src/bar/baz"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -724,14 +747,14 @@ mod tests { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("src/{foo,bar}*/baz".to_string(), "/")?; println!("scanner_parts for {}:\n{:?}", scanner.raw, scanner.parts); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), "src/bar/baz".to_string(), "src/foo-quux/baz".to_string(), "src/qux/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("src/foo", "/"), ("src/bar", "/")]); assert!(prefixes == vec!["src/foo-quux/baz", "src/foo/baz", "src/bar/baz",]); Ok(()) @@ -741,14 +764,14 @@ mod tests { async fn test_find_prefixes_star() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("src/*/main.rs".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/main.rs".to_string(), "src/bar/main.rs".to_string(), "src/baz/other.rs".to_string(), ]); info!(?engine.paths); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/bar/main.rs", "src/foo/main.rs"]); engine.assert_calls(&[("src/", "/")]); Ok(()) @@ -758,14 +781,14 @@ mod tests { async fn test_find_prefixes_recursive() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("src/**/test.rs".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/test.rs".to_string(), "src/foo/test.rs".to_string(), "src/foo/bar/test.rs".to_string(), "src/other.rs".to_string(), ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; // Should stop at src/ since ** matches anything after assert!(prefixes == vec!["src/"]); let e: &[(&str, &str)] = &[]; @@ -780,14 +803,14 @@ mod tests { assert_scanner_part!(&scanner.parts[0], Choice(vec!["src/a", "src/b", "src/c"])); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice(".rs")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/abc.rs".to_string(), "src/baz.rs".to_string(), "src/cat.rs".to_string(), "src/dog.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("src/a", "/"), ("src/b", "/"), ("src/c", "/")]); assert!(prefixes == vec!["src/abc.rs", "src/baz.rs", "src/cat.rs"]); Ok(()) @@ -805,14 +828,14 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice("/baz")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/bar-stuff/baz".to_string(), "literal/foo-extra/baz".to_string(), "literal/foo/baz".to_string(), "literal/other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/foo", "/"), ("literal/bar", "/")]); assert!( prefixes @@ -837,7 +860,7 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice("quux/baz")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/foo-quux/baz".to_string(), "literal/bar-quux/baz".to_string(), // Should be filtered out @@ -846,7 +869,7 @@ mod tests { "literal/foo-quux-bar/baz".to_string(), ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["literal/foo-quux/baz", "literal/bar-quux/baz"]); engine.assert_calls(&[("literal/foo", "/"), ("literal/bar", "/")]); Ok(()) @@ -860,13 +883,13 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["foo/baz", "bar/baz"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/something-foo/baz".to_string(), "literal/other-bar/baz".to_string(), "literal/not-match/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/", "/")]); assert!(prefixes == vec!["literal/other-bar/baz", "literal/something-foo/baz"]); Ok(()) @@ -881,13 +904,13 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["foo/baz", "bar/baz"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/quux-foo/baz".to_string(), "literal/quux-something-bar/baz".to_string(), "literal/quux-other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/quux", "/")]); assert!(prefixes == vec!["literal/quux-foo/baz", "literal/quux-something-bar/baz"]); Ok(()) @@ -901,14 +924,14 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice(".rs")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/baz.rs".to_string(), "literal/baz-extra.rs".to_string(), "literal/bazinga.rs".to_string(), "literal/other.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/baz", "/")]); assert!( prefixes @@ -930,13 +953,13 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["a.rs", "b.rs"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/baz-a.rs".to_string(), "literal/baz-extra-b.rs".to_string(), "literal/baz-c.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/baz", "/")]); assert!(prefixes == vec!["literal/baz-a.rs", "literal/baz-extra-b.rs"]); Ok(()) @@ -946,13 +969,13 @@ mod tests { async fn test_find_prefixes_empty_alternative() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("src/{,tmp}/file".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), "src/other/file".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; // TODO: there is a legitimate case that this should only be // src/tmp/file, but we strip double forward slashes to work around // minio @@ -966,13 +989,13 @@ mod tests { async fn test_find_prefixes_empty_alternative_with_delimiter() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("src/{,tmp/}file".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), "src/other/file".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/file", "src/tmp/file"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -988,14 +1011,14 @@ mod tests { Choice(vec!["src/foo/bar/test", "src/baz/test"]) ); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/bar/test".to_string(), "src/baz/test".to_string(), "src/foo/test".to_string(), // Should be filtered out "src/foo/baz/test".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/bar/test", "src/baz/test"]); let e: &[(&str, &str)] = &[]; // No API calls needed since alternation is static engine.assert_calls(e); @@ -1006,14 +1029,14 @@ mod tests { async fn test_find_prefixes_negative_class_start() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("[!a]*/foo".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "b/foo".to_string(), "c/foo".to_string(), "xyz/foo".to_string(), "a/foo".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["b/foo", "c/foo", "xyz/foo"]); engine.assert_calls(&[("", "/")]); Ok(()) @@ -1023,14 +1046,14 @@ mod tests { async fn test_find_prefixes_negative_class_after_wildcard() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("*[!f]oo".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "zoo".to_string(), "boo".to_string(), "foo".to_string(), // Should be filtered out "something/foo".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["boo", "zoo"]); // TODO: this could be improved to only call the engine once Ok(()) @@ -1040,7 +1063,7 @@ mod tests { async fn test_find_prefixes_negative_class_between_alternations() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("{foo,bar}[!z]*/baz".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "foo-abc/baz".to_string(), "bar-def/baz".to_string(), "fooz/baz".to_string(), // Should be filtered out @@ -1048,7 +1071,7 @@ mod tests { "other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["foo-abc/baz", "bar-def/baz"]); engine.assert_calls(&[("foo", "/"), ("bar", "/")]); Ok(()) @@ -1058,7 +1081,7 @@ mod tests { async fn test_find_prefixes_multiple_negative_classes() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("[!a]*[!b]/foo".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "c-x/foo".to_string(), "d-y/foo".to_string(), // filtered out @@ -1067,7 +1090,7 @@ mod tests { "c-b/foo".to_string(), // (second part starts with b) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["c-x/foo", "d-y/foo"]); engine.assert_calls(&[("", "/")]); Ok(()) @@ -1081,7 +1104,7 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("[!/]")); assert_scanner_part!(&scanner.parts[2], OneChoice("/bar")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "foo/x/bar".to_string(), "foo/a/bar".to_string(), "foo//bar".to_string(), // Should be filtered out @@ -1089,7 +1112,7 @@ mod tests { "foo///bar".to_string(), // Should be filtered out (the excluded char) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["foo/a/bar", "foo/x/bar"]); engine.assert_calls(&[("foo/", "/")]); Ok(()) @@ -1099,7 +1122,7 @@ mod tests { async fn test_find_prefixes_complex_negative_pattern() -> Result<()> { setup_logging(Some("s3glob=trace")); let scanner = S3GlobMatcher::parse("*{foo,bar}*[!Z]/baz".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "x-foo-a/baz".to_string(), "y-bar-b/baz".to_string(), // filtered out @@ -1108,7 +1131,7 @@ mod tests { "x-baz-a/baz".to_string(), // (middle not foo/bar) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["x-foo-a/baz", "y-bar-b/baz"]); // TODO: this could be improved to only call the engine once Ok(()) diff --git a/src/glob_matcher/engine.rs b/src/glob_matcher/engine.rs index 25bf367..8e6c7ca 100644 --- a/src/glob_matcher/engine.rs +++ b/src/glob_matcher/engine.rs @@ -5,14 +5,17 @@ use tracing::{debug, trace}; #[cfg(test)] use std::collections::BTreeSet; #[cfg(test)] +use std::sync::{Arc, Mutex}; +#[cfg(test)] use tracing::info; #[async_trait::async_trait] -pub trait Engine { +pub trait Engine: Send + Sync + 'static { async fn scan_prefixes(&mut self, prefix: &str, delimiter: &str) -> Result>; async fn check_prefixes(&mut self, prefixes: &[String]) -> Result>; } +#[derive(Debug, Clone)] pub struct S3Engine { client: Client, bucket: String, @@ -102,16 +105,20 @@ impl Engine for S3Engine { /// A test engine that simulates a real S3 bucket with a set of paths #[cfg(test)] +#[derive(Debug, Clone)] pub(super) struct MockS3Engine { - pub paths: Vec, - pub calls: Vec<(String, String)>, // (prefix, delimiter) pairs + pub paths: Arc>, + pub calls: Arc>>, // (prefix, delimiter) pairs } #[cfg(test)] #[async_trait::async_trait] impl Engine for MockS3Engine { async fn scan_prefixes(&mut self, prefix: &str, delimiter: &str) -> Result> { - self.calls.push((prefix.to_string(), delimiter.to_string())); + self.calls + .lock() + .unwrap() + .push((prefix.to_string(), delimiter.to_string())); let found = self.scan_prefixes_inner(prefix, delimiter); info!(prefix, ?found, "mocks3 found prefixes"); @@ -141,14 +148,14 @@ impl Engine for MockS3Engine { impl MockS3Engine { pub fn new(paths: Vec) -> Self { Self { - paths, - calls: Vec::new(), + paths: Arc::new(paths), + calls: Arc::new(Mutex::new(Vec::new())), } } pub fn assert_calls(&self, expected: &[(impl AsRef, impl AsRef)]) { for (i, ((actual_prefix, actual_delim), (expected_prefix, expected_delim))) in - self.calls.iter().zip(expected).enumerate() + self.calls.lock().unwrap().iter().zip(expected).enumerate() { let i = i + 1; assert!( @@ -167,11 +174,11 @@ impl MockS3Engine { ); } assert!( - self.calls.len() == expected.len(), + self.calls.lock().unwrap().len() == expected.len(), "Got {} calls, expected {}. Actual calls: {:#?}", - self.calls.len(), + self.calls.lock().unwrap().len(), expected.len(), - self.calls + self.calls.lock().unwrap() ); } diff --git a/src/main.rs b/src/main.rs index 86d7cd5..0ffbb97 100644 --- a/src/main.rs +++ b/src/main.rs @@ -182,9 +182,9 @@ async fn run(opts: Opts) -> Result<()> { .find(['*', '?', '[', '{']) .map_or(raw_pattern.clone(), |i| raw_pattern[..i].to_owned()); - let mut engine = S3Engine::new(client.clone(), bucket.clone(), opts.delimiter.to_string()); + let engine = S3Engine::new(client.clone(), bucket.clone(), opts.delimiter.to_string()); let matcher = S3GlobMatcher::parse(raw_pattern, &opts.delimiter.to_string())?; - let mut prefixes = matcher.find_prefixes(&mut engine).await?; + let mut prefixes = matcher.find_prefixes(engine).await?; trace!(?prefixes, "matcher generated prefixes"); debug!(prefix_count = prefixes.len(), "matcher generated prefixes");