diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 0000000..a9398e4 --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,2 @@ +[profile.default] +slow-timeout = { period = "20s", terminate-after = 3 } diff --git a/src/glob_matcher.rs b/src/glob_matcher.rs index a775018..af6ea8d 100644 --- a/src/glob_matcher.rs +++ b/src/glob_matcher.rs @@ -10,11 +10,21 @@ use anyhow::{bail, Context as _, Result}; use globset::GlobMatcher; use itertools::Itertools as _; use regex::Regex; -use tracing::{debug, enabled, trace, Level}; +use tracing::{debug, trace}; mod engine; pub use engine::{Engine, S3Engine}; +/// The maximum number of prefixes that can be generated by the glob matcher +/// +/// Checking that constructed prefixes exist is significantly slower than +/// scanning for objects. +const MAX_PREFIXES: usize = 1000; + +/// Parallelism is determined by the number of prefixes, and 50 is much faster +/// than 1 +const DESIRED_MIN_PREFIXES: usize = 50; + /// A thing that knows how to generate and filter S3 prefixes based on a glob pattern #[derive(Debug, Clone)] pub struct S3GlobMatcher { @@ -22,6 +32,7 @@ pub struct S3GlobMatcher { delimiter: char, parts: Vec, glob: GlobMatcher, + min_prefixes: usize, } /// A scanner takes a glob pattern and can efficiently generate a list of S3 @@ -76,6 +87,7 @@ impl S3GlobMatcher { delimiter: delimiter.chars().next().unwrap(), parts: new_parts, glob: glob.compile_matcher(), + min_prefixes: DESIRED_MIN_PREFIXES, }) } @@ -103,7 +115,7 @@ impl S3GlobMatcher { /// 4. Search for all folders in ["foo/bar", "foo/baz"] /// 4. Append "qux" -> ["foo/bar/qux", "foo/baz/qux"] /// 5. Filter by "*" -> keep prefixes whose last component starts with "qux" - pub async fn find_prefixes(&self, engine: &mut impl Engine) -> Result> { + pub async fn find_prefixes(&self, mut engine: impl Engine + Clone) -> Result> { debug!("finding prefixes for {}", self.raw); let mut prefixes = vec!["".to_string()]; let delimiter = self.delimiter.to_string(); @@ -111,12 +123,17 @@ impl S3GlobMatcher { let mut prev_part = None; let mut part_iter = self.parts.iter().enumerate(); for (i, part) in &mut part_iter { - // only included prefixes in trace logs - if enabled!(Level::TRACE) { - trace!(new_part = %part.display(), %regex_so_far, ?prefixes, "scanning for part"); - } else { - debug!(new_part = %part.display(), %regex_so_far, "scanning for part"); + if prefixes.len() >= MAX_PREFIXES { + debug!( + new_prefix_count = prefixes.len(), + "We have over {MAX_PREFIXES} prefixes, stopping." + ); + break; } + // only included prefixes in trace logs + trace!(?prefixes, "scanning for part"); + debug!(new_part = %part.display(), %regex_so_far, prefix_count = prefixes.len(), "scanning for part"); + eprint!("\rDiscovering prefixes: {} ", prefixes.len()); // We always want to scan for things including the last part, // finding more prefixes in it is guaranteed to be slower than // just searching because we have to do an api call to check each @@ -138,13 +155,67 @@ impl S3GlobMatcher { !matches!(prev_part, Some(&Glob::Any { .. })) && !is_last_part; if scan_might_help { debug!(part = %part.display(), "scanning for keys in an Any"); - let mut new_prefixes = Vec::new(); + let (tx, mut rx) = tokio::sync::mpsc::channel(prefixes.len()); + + let mut tasks = Vec::new(); for prefix in &prefixes { - let np = engine.scan_prefixes(prefix, &delimiter).await?; - trace!(%prefix, found = ?np, pattern = %part.display(), "extending prefixes for Any"); - new_prefixes.extend(np); + let client_prefix = prefix.clone(); + let delimiter = delimiter.clone(); + let tx = tx.clone(); + let mut engine = engine.clone(); + let prefix = prefix.clone(); + + let task = tokio::spawn(async move { + match engine.scan_prefixes(&client_prefix, &delimiter).await { + Ok(results) => { + let _ = tx.send(Ok((prefix, results))).await; + } + Err(e) => { + let _ = tx.send(Err(e)).await; + } + } + }); + tasks.push(task); + } + + drop(tx); + + let mut new_prefixes = Vec::new(); + let mut new_prefix_count = new_prefixes.len(); + while let Some(result) = rx.recv().await { + let (scanned_prefix, results) = result.context("scanning prefixes")?; + let result_len = results.len(); + new_prefixes.extend(results); + trace!( + scanned_prefix, + scanned_results = result_len, + new_prefix_count, + "Scanning for any, got result from task" + ); + new_prefix_count = new_prefixes.len(); + if new_prefix_count >= MAX_PREFIXES { + debug!( + new_prefix_count, + "Scanning for any, found too many prefixes, aborting" + ); + for task in tasks { + task.abort(); + } + break; + } + } + // don't overwrite if we gave up early + if new_prefix_count < MAX_PREFIXES { + prefixes = new_prefixes; + } + // TODO: Do we need to make sure that every prefix in + // the old prefixes is a prefix in the new ones? + if new_prefix_count < prefixes.len() { + // this means that something after the any matched + // nothing, which I think always means that we have hit the last delimiter + debug!("Any scan decreased the number of prefixes, aborting. (Probably hit the last delimiter.)"); + break; } - prefixes = new_prefixes; } if part.is_negated() { // if this part is a negated character class then we should filter @@ -180,11 +251,18 @@ impl S3GlobMatcher { if is_simple_append { debug!(allowed = %allowed.join(","), "simple append"); let mut new_prefixes = Vec::with_capacity(prefixes.len() * allowed.len()); - for prefix in prefixes { + for prefix in &prefixes { for alt in allowed { - new_prefixes.push(prefix_join(&prefix, alt)); + new_prefixes.push(prefix_join(prefix, alt)); } } + if new_prefixes.len() >= MAX_PREFIXES { + debug!( + new_prefix_count = new_prefixes.len(), + "simple append generated over 1,000 prefixes, using prefixes from previous part" + ); + break; + } prefixes = engine.check_prefixes(&new_prefixes).await?; } else { // Build up the filters and appends @@ -225,24 +303,24 @@ impl S3GlobMatcher { debug!("no filters, appending"); let mut new_prefixes = Vec::with_capacity(prefixes.len() * appends.len()); - for prefix in prefixes { + for prefix in &prefixes { for alt in &appends { - new_prefixes.push(prefix_join(&prefix, alt)); + new_prefixes.push(prefix_join(prefix, alt)); } } new_prefixes } else { debug!("filtering and appending"); let mut new_prefixes = Vec::with_capacity(prefixes.len()); - for prefix in prefixes { - if filter.is_match(&prefix) { + for prefix in &prefixes { + if filter.is_match(prefix) { // we only need to append if it's not already matched - if !appends.is_empty() && !append_matcher.is_match(&prefix) { + if !appends.is_empty() && !append_matcher.is_match(prefix) { for alt in &appends { - new_prefixes.push(prefix_join(&prefix, alt)); + new_prefixes.push(prefix_join(prefix, alt)); } } else { - new_prefixes.push(prefix); + new_prefixes.push(prefix.to_string()); } } } @@ -251,7 +329,16 @@ impl S3GlobMatcher { }; if !appends.is_empty() { - trace!(prefixes = ?new_prefixes, "checking appended prefixes"); + if new_prefixes.len() >= MAX_PREFIXES { + // checking prefixes is significantly slower + // than scanning existing prefixes. + debug!( + new_prefix_count = new_prefixes.len(), + "Appends generated over 1,000 prefixes, using prefixes from previous part" + ); + break; + } + trace!(new_prefixes = ?new_prefixes, new_prefix_count = new_prefixes.len(), "checking appended prefixes"); prefixes = engine.check_prefixes(&new_prefixes).await?; } else { debug!("no appends, using new prefixes"); @@ -271,12 +358,47 @@ impl S3GlobMatcher { prev_part = Some(part); } + // parallelism is determined by the number of prefixes, so we need to + // extend the list until we have several. 50 is generally pretty fast, + // but going more than 5 layers deep feels excessive + let mut extensions = 0; + while !prefixes.is_empty() && prefixes.len() < self.min_prefixes && extensions < 5 { + eprint!("\rDiscovering prefixes: {} ", prefixes.len()); + debug!( + prefix_count = prefixes.len(), + extensions, "extending prefixes because there are less than 50" + ); + trace!( + ?prefixes, + "extending prefixes because there are less than 50" + ); + extensions += 1; + let mut new_prefixes = Vec::new(); + for prefix in &prefixes { + let np = engine.scan_prefixes(prefix, &delimiter).await?; + for p in np { + if self.is_match(&p) { + new_prefixes.push(p); + } + } + } + if prefixes == new_prefixes { + break; + } + prefixes = new_prefixes; + } + eprintln!("\rDiscovered prefixes: {} ", prefixes.len()); Ok(prefixes) } pub fn is_match(&self, path: &str) -> bool { self.glob.is_match(path) } + + #[cfg(test)] + fn set_min_prefixes(&mut self, min_prefixes: usize) { + self.min_prefixes = min_prefixes; + } } fn prefix_join(prefix: &str, alt: &str) -> String { @@ -625,10 +747,11 @@ mod tests { #[tokio::test] async fn test_find_prefixes_literal() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/foo/bar".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec!["src/foo/bar".to_string()]); + let mut scanner = S3GlobMatcher::parse("src/foo/bar".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec!["src/foo/bar".to_string()]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/bar"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -638,14 +761,15 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_no_any() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{foo,bar}/baz".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("src/{foo,bar}/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), "src/bar/baz".to_string(), "src/qux/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/baz", "src/bar/baz"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -655,16 +779,17 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_with_any() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{foo,bar}*/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/{foo,bar}*/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); println!("scanner_parts for {}:\n{:?}", scanner.raw, scanner.parts); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/baz".to_string(), "src/bar/baz".to_string(), "src/foo-quux/baz".to_string(), "src/qux/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("src/foo", "/"), ("src/bar", "/")]); assert!(prefixes == vec!["src/foo-quux/baz", "src/foo/baz", "src/bar/baz",]); Ok(()) @@ -673,15 +798,16 @@ mod tests { #[tokio::test] async fn test_find_prefixes_star() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/*/main.rs".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("src/*/main.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "src/foo/main.rs".to_string(), "src/bar/main.rs".to_string(), "src/baz/other.rs".to_string(), ]); info!(?engine.paths); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/bar/main.rs", "src/foo/main.rs"]); engine.assert_calls(&[("src/", "/")]); Ok(()) @@ -690,15 +816,16 @@ mod tests { #[tokio::test] async fn test_find_prefixes_recursive() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/**/test.rs".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("src/**/test.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "src/test.rs".to_string(), "src/foo/test.rs".to_string(), "src/foo/bar/test.rs".to_string(), "src/other.rs".to_string(), ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; // Should stop at src/ since ** matches anything after assert!(prefixes == vec!["src/"]); let e: &[(&str, &str)] = &[]; @@ -709,18 +836,19 @@ mod tests { #[tokio::test] async fn test_find_prefixes_character_class() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/[abc]*.rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/[abc]*.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], Choice(vec!["src/a", "src/b", "src/c"])); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice(".rs")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/abc.rs".to_string(), "src/baz.rs".to_string(), "src/cat.rs".to_string(), "src/dog.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("src/a", "/"), ("src/b", "/"), ("src/c", "/")]); assert!(prefixes == vec!["src/abc.rs", "src/baz.rs", "src/cat.rs"]); Ok(()) @@ -728,7 +856,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_then_any() -> Result<()> { - let scanner = S3GlobMatcher::parse("literal/{foo,bar}*/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/{foo,bar}*/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); println!("scanner_parts for {}:\n{:#?}", scanner.raw, scanner.parts); assert_scanner_part!( @@ -738,14 +867,14 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice("/baz")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/bar-stuff/baz".to_string(), "literal/foo-extra/baz".to_string(), "literal/foo/baz".to_string(), "literal/other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/foo", "/"), ("literal/bar", "/")]); assert!( prefixes @@ -761,7 +890,8 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_any_literal() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("literal/{foo,bar}*quux/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/{foo,bar}*quux/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!( &scanner.parts[0], @@ -770,7 +900,7 @@ mod tests { assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice("quux/baz")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/foo-quux/baz".to_string(), "literal/bar-quux/baz".to_string(), // Should be filtered out @@ -779,7 +909,7 @@ mod tests { "literal/foo-quux-bar/baz".to_string(), ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["literal/foo-quux/baz", "literal/bar-quux/baz"]); engine.assert_calls(&[("literal/foo", "/"), ("literal/bar", "/")]); Ok(()) @@ -787,19 +917,20 @@ mod tests { #[tokio::test] async fn test_find_prefixes_any_then_alternation() -> Result<()> { - let scanner = S3GlobMatcher::parse("literal/*{foo,bar}/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/*{foo,bar}/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/")); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["foo/baz", "bar/baz"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/something-foo/baz".to_string(), "literal/other-bar/baz".to_string(), "literal/not-match/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/", "/")]); assert!(prefixes == vec!["literal/other-bar/baz", "literal/something-foo/baz"]); Ok(()) @@ -808,19 +939,20 @@ mod tests { #[tokio::test] async fn test_find_prefixes_literal_any_alternation() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("literal/quux*{foo,bar}/baz".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/quux*{foo,bar}/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/quux")); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["foo/baz", "bar/baz"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/quux-foo/baz".to_string(), "literal/quux-something-bar/baz".to_string(), "literal/quux-other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/quux", "/")]); assert!(prefixes == vec!["literal/quux-foo/baz", "literal/quux-something-bar/baz"]); Ok(()) @@ -828,20 +960,21 @@ mod tests { #[tokio::test] async fn test_find_prefixes_any_after_last_delimiter() -> Result<()> { - let scanner = S3GlobMatcher::parse("literal/baz*.rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/baz*.rs".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/baz")); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], OneChoice(".rs")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/baz.rs".to_string(), "literal/baz-extra.rs".to_string(), "literal/bazinga.rs".to_string(), "literal/other.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/baz", "/")]); assert!( prefixes @@ -857,19 +990,20 @@ mod tests { #[tokio::test] async fn test_find_prefixes_any_and_character_class() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("literal/baz*[ab].rs".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("literal/baz*[ab].rs".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("literal/baz")); assert_scanner_part!(&scanner.parts[1], Any("*")); assert_scanner_part!(&scanner.parts[2], Choice(vec!["a.rs", "b.rs"])); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "literal/baz-a.rs".to_string(), "literal/baz-extra-b.rs".to_string(), "literal/baz-c.rs".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; engine.assert_calls(&[("literal/baz", "/")]); assert!(prefixes == vec!["literal/baz-a.rs", "literal/baz-extra-b.rs"]); Ok(()) @@ -878,14 +1012,15 @@ mod tests { #[tokio::test] async fn test_find_prefixes_empty_alternative() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{,tmp}/file".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("src/{,tmp}/file".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), "src/other/file".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; // TODO: there is a legitimate case that this should only be // src/tmp/file, but we strip double forward slashes to work around // minio @@ -898,14 +1033,15 @@ mod tests { #[tokio::test] async fn test_find_prefixes_empty_alternative_with_delimiter() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("src/{,tmp/}file".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("src/{,tmp/}file".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "src/file".to_string(), "src/tmp/file".to_string(), "src/other/file".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/file", "src/tmp/file"]); let e: &[(&str, &str)] = &[]; engine.assert_calls(e); @@ -914,21 +1050,22 @@ mod tests { #[tokio::test] async fn test_find_prefixes_alternation_with_delimiter() -> Result<()> { - let scanner = S3GlobMatcher::parse("src/{foo/bar,baz}/test".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("src/{foo/bar,baz}/test".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!( &scanner.parts[0], Choice(vec!["src/foo/bar/test", "src/baz/test"]) ); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "src/foo/bar/test".to_string(), "src/baz/test".to_string(), "src/foo/test".to_string(), // Should be filtered out "src/foo/baz/test".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["src/foo/bar/test", "src/baz/test"]); let e: &[(&str, &str)] = &[]; // No API calls needed since alternation is static engine.assert_calls(e); @@ -938,15 +1075,16 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_start() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("[!a]*/foo".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("[!a]*/foo".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "b/foo".to_string(), "c/foo".to_string(), "xyz/foo".to_string(), "a/foo".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["b/foo", "c/foo", "xyz/foo"]); engine.assert_calls(&[("", "/")]); Ok(()) @@ -955,15 +1093,16 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_after_wildcard() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("*[!f]oo".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("*[!f]oo".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "zoo".to_string(), "boo".to_string(), "foo".to_string(), // Should be filtered out "something/foo".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["boo", "zoo"]); // TODO: this could be improved to only call the engine once Ok(()) @@ -972,8 +1111,9 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_between_alternations() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("{foo,bar}[!z]*/baz".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("{foo,bar}[!z]*/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "foo-abc/baz".to_string(), "bar-def/baz".to_string(), "fooz/baz".to_string(), // Should be filtered out @@ -981,7 +1121,7 @@ mod tests { "other/baz".to_string(), // Should be filtered out ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["foo-abc/baz", "bar-def/baz"]); engine.assert_calls(&[("foo", "/"), ("bar", "/")]); Ok(()) @@ -990,8 +1130,9 @@ mod tests { #[tokio::test] async fn test_find_prefixes_multiple_negative_classes() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("[!a]*[!b]/foo".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("[!a]*[!b]/foo".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "c-x/foo".to_string(), "d-y/foo".to_string(), // filtered out @@ -1000,7 +1141,7 @@ mod tests { "c-b/foo".to_string(), // (second part starts with b) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["c-x/foo", "d-y/foo"]); engine.assert_calls(&[("", "/")]); Ok(()) @@ -1009,12 +1150,13 @@ mod tests { #[tokio::test] async fn test_find_prefixes_negative_class_with_delimiter() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("foo/[!/]/bar".to_string(), "/")?; + let mut scanner = S3GlobMatcher::parse("foo/[!/]/bar".to_string(), "/")?; + scanner.set_min_prefixes(0); assert_scanner_part!(&scanner.parts[0], OneChoice("foo/")); assert_scanner_part!(&scanner.parts[1], Any("[!/]")); assert_scanner_part!(&scanner.parts[2], OneChoice("/bar")); - let mut engine = MockS3Engine::new(vec![ + let engine = MockS3Engine::new(vec![ "foo/x/bar".to_string(), "foo/a/bar".to_string(), "foo//bar".to_string(), // Should be filtered out @@ -1022,7 +1164,7 @@ mod tests { "foo///bar".to_string(), // Should be filtered out (the excluded char) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["foo/a/bar", "foo/x/bar"]); engine.assert_calls(&[("foo/", "/")]); Ok(()) @@ -1031,8 +1173,9 @@ mod tests { #[tokio::test] async fn test_find_prefixes_complex_negative_pattern() -> Result<()> { setup_logging(Some("s3glob=trace")); - let scanner = S3GlobMatcher::parse("*{foo,bar}*[!Z]/baz".to_string(), "/")?; - let mut engine = MockS3Engine::new(vec![ + let mut scanner = S3GlobMatcher::parse("*{foo,bar}*[!Z]/baz".to_string(), "/")?; + scanner.set_min_prefixes(0); + let engine = MockS3Engine::new(vec![ "x-foo-a/baz".to_string(), "y-bar-b/baz".to_string(), // filtered out @@ -1041,7 +1184,7 @@ mod tests { "x-baz-a/baz".to_string(), // (middle not foo/bar) ]); - let prefixes = scanner.find_prefixes(&mut engine).await?; + let prefixes = scanner.find_prefixes(engine.clone()).await?; assert!(prefixes == vec!["x-foo-a/baz", "y-bar-b/baz"]); // TODO: this could be improved to only call the engine once Ok(()) diff --git a/src/glob_matcher/engine.rs b/src/glob_matcher/engine.rs index 25bf367..c2dc0e1 100644 --- a/src/glob_matcher/engine.rs +++ b/src/glob_matcher/engine.rs @@ -5,14 +5,17 @@ use tracing::{debug, trace}; #[cfg(test)] use std::collections::BTreeSet; #[cfg(test)] +use std::sync::{Arc, Mutex}; +#[cfg(test)] use tracing::info; #[async_trait::async_trait] -pub trait Engine { +pub trait Engine: Send + Sync + 'static { async fn scan_prefixes(&mut self, prefix: &str, delimiter: &str) -> Result>; async fn check_prefixes(&mut self, prefixes: &[String]) -> Result>; } +#[derive(Debug, Clone)] pub struct S3Engine { client: Client, bucket: String, @@ -102,24 +105,29 @@ impl Engine for S3Engine { /// A test engine that simulates a real S3 bucket with a set of paths #[cfg(test)] +#[derive(Debug, Clone)] pub(super) struct MockS3Engine { - pub paths: Vec, - pub calls: Vec<(String, String)>, // (prefix, delimiter) pairs + pub paths: Arc>, + pub calls: Arc>>, // (prefix, delimiter) pairs } #[cfg(test)] #[async_trait::async_trait] impl Engine for MockS3Engine { async fn scan_prefixes(&mut self, prefix: &str, delimiter: &str) -> Result> { - self.calls.push((prefix.to_string(), delimiter.to_string())); + self.calls + .lock() + .unwrap() + .push((prefix.to_string(), delimiter.to_string())); let found = self.scan_prefixes_inner(prefix, delimiter); - info!(prefix, ?found, "mocks3 found prefixes"); + info!(prefix, ?found, "MockS3 found prefixes"); found } async fn check_prefixes(&mut self, prefixes: &[String]) -> Result> { + info!(?prefixes, "MockS3 checking prefixes"); let mut valid_prefixes = Vec::new(); for prefix in prefixes { @@ -141,15 +149,18 @@ impl Engine for MockS3Engine { impl MockS3Engine { pub fn new(paths: Vec) -> Self { Self { - paths, - calls: Vec::new(), + paths: Arc::new(paths), + calls: Arc::new(Mutex::new(Vec::new())), } } pub fn assert_calls(&self, expected: &[(impl AsRef, impl AsRef)]) { + info!("MockS3 asserting calls"); + let calls = &self.calls.lock().unwrap(); for (i, ((actual_prefix, actual_delim), (expected_prefix, expected_delim))) in - self.calls.iter().zip(expected).enumerate() + calls.iter().zip(expected).enumerate() { + info!("MockS3 asserting call {i}"); let i = i + 1; assert!( actual_prefix == expected_prefix.as_ref(), @@ -167,11 +178,11 @@ impl MockS3Engine { ); } assert!( - self.calls.len() == expected.len(), + calls.len() == expected.len(), "Got {} calls, expected {}. Actual calls: {:#?}", - self.calls.len(), + calls.len(), expected.len(), - self.calls + calls ); } diff --git a/src/main.rs b/src/main.rs index aba38cd..5b1e047 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use std::io::{IsTerminal as _, Write as _}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Context as _, Result}; use aws_config::meta::region::RegionProviderChain; @@ -142,8 +142,12 @@ fn main() { error!("Failed to run: {}", err); let mut err = err.source(); let mut count = 0; + let mut prev_msg = String::new(); while let Some(e) = err { - eprintln!(" : {}", e); + if e.to_string() != prev_msg { + eprintln!(" : {}", e); + prev_msg = e.to_string(); + } err = e.source(); count += 1; if count > 10 { @@ -158,6 +162,7 @@ fn main() { } async fn run(opts: Opts) -> Result<()> { + let start = Instant::now(); let pat = match &opts.command { Command::List { pattern, .. } | Command::Download { pattern, .. } => pattern, }; @@ -178,9 +183,9 @@ async fn run(opts: Opts) -> Result<()> { .find(['*', '?', '[', '{']) .map_or(raw_pattern.clone(), |i| raw_pattern[..i].to_owned()); - let mut engine = S3Engine::new(client.clone(), bucket.clone(), opts.delimiter.to_string()); + let engine = S3Engine::new(client.clone(), bucket.clone(), opts.delimiter.to_string()); let matcher = S3GlobMatcher::parse(raw_pattern, &opts.delimiter.to_string())?; - let mut prefixes = matcher.find_prefixes(&mut engine).await?; + let mut prefixes = matcher.find_prefixes(engine).await?; trace!(?prefixes, "matcher generated prefixes"); debug!(prefix_count = prefixes.len(), "matcher generated prefixes"); @@ -219,7 +224,7 @@ async fn run(opts: Opts) -> Result<()> { let seen_prefixes = add_atomic(&seen_prefixes, 1); eprint!( "\rmatches/total {:>4}/{:<10} prefixes/total {:>4}/{:<4}", - match_count, + match_count.to_formatted_string(&Locale::en), total_objects.to_formatted_string(&Locale::en), seen_prefixes, total_prefixes @@ -253,10 +258,11 @@ async fn run(opts: Opts) -> Result<()> { } } eprintln!( - "Found {} matching objects out of {} scanned in {} prefixes", + "Matched {}/{} objects across {} prefixes in {:?}", objects.len(), total_objects.load(Ordering::Relaxed), - total_prefixes + total_prefixes, + Duration::from_millis(start.elapsed().as_millis() as u64) ); } Command::Download { dest, .. } => { @@ -330,7 +336,7 @@ async fn create_s3_client(opts: &Opts, bucket: &String) -> Result { .raw_response() .and_then(|res| res.headers().get("x-amz-bucket-region")) .map(str::to_owned) - .ok_or_else(|| anyhow!("failed to extract bucket region"))?, + .ok_or_else(|| anyhow!("failed to extract bucket region: {err}"))?, }; let region = Region::new(bucket_region);