diff --git a/src/persist-client/src/cli/inspect.rs b/src/persist-client/src/cli/inspect.rs index 21f6d0af15b3f..29c0d3139463a 100644 --- a/src/persist-client/src/cli/inspect.rs +++ b/src/persist-client/src/cli/inspect.rs @@ -615,10 +615,7 @@ pub async fn unreferenced_blobs(args: &StateArgs) -> Result !known_writers.contains(&writer), - version @ WriterKey::Version(_) => version < minimum_version, - }; + let is_unreferenced = writer < minimum_version; if is_unreferenced && !known_parts.contains(&part) { unreferenced_blobs.batch_parts.insert(part); } diff --git a/src/persist-client/src/internal/paths.rs b/src/persist-client/src/internal/paths.rs index 9d594ede3d3e3..9c2e5d18401bd 100644 --- a/src/persist-client/src/internal/paths.rs +++ b/src/persist-client/src/internal/paths.rs @@ -341,7 +341,21 @@ mod tests { BlobKey::parse_ids(&format!("{}/{}/{}", shard_id, writer_id, part_id)), Ok(( shard_id, - PartialBlobKey::Batch(WriterKey::Id(writer_id), part_id) + PartialBlobKey::Batch(WriterKey::Id(writer_id), part_id.clone()) + )) + ); + + let version = Version::new(1, 0, 0); + assert_eq!( + BlobKey::parse_ids(&format!( + "{}/{}/{}", + shard_id, + WriterKey::for_version(&version), + part_id + )), + Ok(( + shard_id, + PartialBlobKey::Batch(WriterKey::for_version(&version), part_id) )) ); diff --git a/src/persist-client/src/usage.rs b/src/persist-client/src/usage.rs index 9ff85d18533df..66e3d0e6c247c 100644 --- a/src/persist-client/src/usage.rs +++ b/src/persist-client/src/usage.rs @@ -23,7 +23,6 @@ use crate::cfg::{PersistConfig, USAGE_STATE_FETCH_CONCURRENCY_LIMIT}; use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey, WriterKey}; use crate::internal::state::HollowBlobRef; use crate::internal::state_versions::StateVersions; -use crate::write::WriterId; use crate::{retry_external, Metrics, PersistClient, ShardId}; /// A breakdown of the size of various contributions to a shard's blob @@ -481,7 +480,6 @@ impl StorageUsageClient { }); let current_state_bytes = 
current_state_batches_bytes + current_state_rollups_bytes; - let live_writers = &states_iter.state().collections.writers; let ret = ShardUsageAudit::from(ShardUsageCumulativeMaybeRacy { current_state_batches_bytes, current_state_bytes, @@ -489,8 +487,7 @@ impl StorageUsageClient { referenced_batches_bytes: &referenced_batches_bytes, // In the future, this is likely to include a "grace period" so recent but non-current // versions are also considered live - minimum_version: WriterKey::for_version(&self.cfg.build_version), - live_writers, + minimum_key: WriterKey::for_version(&self.cfg.build_version), blob_usage, }); @@ -538,26 +535,22 @@ impl StorageUsageClient { } #[derive(Debug)] -struct ShardUsageCumulativeMaybeRacy<'a, T> { +struct ShardUsageCumulativeMaybeRacy<'a> { current_state_batches_bytes: u64, current_state_bytes: u64, referenced_other_bytes: u64, referenced_batches_bytes: &'a BTreeMap, - minimum_version: WriterKey, - live_writers: &'a BTreeMap, + minimum_key: WriterKey, blob_usage: &'a ShardBlobUsage, } -impl From> for ShardUsageAudit { - fn from(x: ShardUsageCumulativeMaybeRacy<'_, T>) -> Self { +impl From> for ShardUsageAudit { + fn from(x: ShardUsageCumulativeMaybeRacy<'_>) -> Self { let mut not_leaked_bytes = 0; let mut total_bytes = 0; for (writer_key, bytes) in x.blob_usage.by_writer.iter() { total_bytes += *bytes; - let writer_key_is_live = match writer_key { - WriterKey::Id(writer_id) => x.live_writers.contains_key(writer_id), - version @ WriterKey::Version(_) => *version >= x.minimum_version, - }; + let writer_key_is_live = *writer_key >= x.minimum_key; if writer_key_is_live { not_leaked_bytes += *bytes; } else { @@ -986,19 +979,13 @@ mod tests { assert_eq!(shard_usage_referenced.batches_bytes, batches_size); } - fn writer_id(x: char) -> WriterId { - let x = [x, x, x, x].iter().collect::(); - let s = format!("w{x}{x}-{x}-{x}-{x}-{x}{x}{x}"); - s.parse().unwrap() - } - struct TestCase { current_state_batches_bytes: u64, current_state_bytes: u64, 
referenced_other_bytes: u64, - referenced_batches_bytes: Vec<(char, u64)>, - live_writers: Vec, - blob_usage_by_writer: Vec<(char, u64)>, + referenced_batches_bytes: Vec<(WriterKey, u64)>, + min_writer_key: WriterKey, + blob_usage_by_writer: Vec<(WriterKey, u64)>, blob_usage_rollups: u64, } @@ -1008,18 +995,13 @@ mod tests { let referenced_batches_bytes = self .referenced_batches_bytes .iter() - .map(|(id, b)| (WriterKey::Id(writer_id(*id)), *b)) - .collect(); - let live_writers = self - .live_writers - .iter() - .map(|id| (writer_id(*id), ())) + .map(|(id, b)| (id.clone(), *b)) .collect(); let blob_usage = ShardBlobUsage { by_writer: self .blob_usage_by_writer .iter() - .map(|(id, b)| (WriterKey::Id(writer_id(*id)), *b)) + .map(|(id, b)| (id.clone(), *b)) .collect(), rollup_bytes: self.blob_usage_rollups, }; @@ -1028,8 +1010,7 @@ mod tests { current_state_bytes: self.current_state_bytes, referenced_other_bytes: self.referenced_other_bytes, referenced_batches_bytes: &referenced_batches_bytes, - minimum_version: WriterKey::for_version(&Version::new(0, 0, 1)), - live_writers: &live_writers, + minimum_key: self.min_writer_key.clone(), blob_usage: &blob_usage, }; let usage = ShardUsageAudit::from(input); @@ -1049,6 +1030,10 @@ mod tests { } } + fn version(minor: u64) -> WriterKey { + WriterKey::for_version(&Version::new(0, minor, 0)) + } + #[mz_ore::test] fn usage_kitchen_sink() { TestCase { @@ -1062,11 +1047,11 @@ mod tests { referenced_other_bytes: 3, // - Some data written by a still active writer: (a, 4) // - Some data written by a now-expired writer: (b, 5) - referenced_batches_bytes: vec![('a', 4), ('b', 5)], - live_writers: vec!['a'], - // - Some data leaked by a still active writer: (a, 7) - (a, 4) - // - Some data leaked by a now-expired writer: (b, 8) - (b, 5) - blob_usage_by_writer: vec![('a', 7), ('b', 8)], + referenced_batches_bytes: vec![(version(3), 4), (version(2), 5)], + min_writer_key: version(3), + // - Some data leaked by a still active writer: (v3, 
7) - (v3, 4) + // - Some data leaked by a now-expired writer: (v2, 8) - (v2, 5) + blob_usage_by_writer: vec![(version(3), 7), (version(2), 8)], // - Some data in rollups blob_usage_rollups: 6, } @@ -1080,9 +1065,9 @@ mod tests { current_state_batches_bytes: 1, current_state_bytes: 1, referenced_other_bytes: 0, - referenced_batches_bytes: vec![('a', 1)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 1)], + referenced_batches_bytes: vec![(version(3), 1)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 1)], blob_usage_rollups: 0, } .run("1 0/1 0/1 0/1 0/1"); @@ -1092,9 +1077,9 @@ mod tests { current_state_batches_bytes: 0, current_state_bytes: 1, referenced_other_bytes: 0, - referenced_batches_bytes: vec![('a', 1)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 1)], + referenced_batches_bytes: vec![(version(3), 1)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 1)], blob_usage_rollups: 0, } .run("1 0/1 0/1 0/1 1/0"); @@ -1104,9 +1089,9 @@ mod tests { current_state_batches_bytes: 0, current_state_bytes: 0, referenced_other_bytes: 0, - referenced_batches_bytes: vec![('a', 1)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 1)], + referenced_batches_bytes: vec![(version(3), 1)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 1)], blob_usage_rollups: 0, } .run("1 0/1 0/1 1/0 0/0"); @@ -1117,8 +1102,8 @@ mod tests { current_state_bytes: 0, referenced_other_bytes: 0, referenced_batches_bytes: vec![], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 1)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 1)], blob_usage_rollups: 0, } .run("1 0/1 1/0 0/0 0/0"); @@ -1129,8 +1114,8 @@ mod tests { current_state_bytes: 0, referenced_other_bytes: 0, referenced_batches_bytes: vec![], - live_writers: vec![], - blob_usage_by_writer: vec![('a', 1)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(2), 1)], 
blob_usage_rollups: 0, } .run("1 1/0 0/0 0/0 0/0"); @@ -1141,7 +1126,7 @@ mod tests { current_state_bytes: 0, referenced_other_bytes: 0, referenced_batches_bytes: vec![], - live_writers: vec![], + min_writer_key: version(3), blob_usage_by_writer: vec![], blob_usage_rollups: 0, } @@ -1159,9 +1144,9 @@ mod tests { current_state_batches_bytes: 2, current_state_bytes: 4, referenced_other_bytes: 2, - referenced_batches_bytes: vec![('a', 4)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 8), ('b', 2)], + referenced_batches_bytes: vec![(version(3), 4)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 8), (version(2), 2)], blob_usage_rollups: 0, } .run("10 2/8 2/6 2/4 2/2"); @@ -1171,9 +1156,9 @@ mod tests { current_state_batches_bytes: 2, current_state_bytes: 4, referenced_other_bytes: 2, - referenced_batches_bytes: vec![('a', 4)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 8), ('b', 1)], + referenced_batches_bytes: vec![(version(3), 4)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 8), (version(2), 1)], blob_usage_rollups: 0, } .run("9 1/8 2/6 2/4 2/2"); @@ -1183,9 +1168,9 @@ mod tests { current_state_batches_bytes: 2, current_state_bytes: 4, referenced_other_bytes: 2, - referenced_batches_bytes: vec![('a', 4)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 7)], + referenced_batches_bytes: vec![(version(3), 4)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 7)], blob_usage_rollups: 0, } .run("7 0/7 1/6 2/4 2/2"); @@ -1195,9 +1180,9 @@ mod tests { current_state_batches_bytes: 2, current_state_bytes: 4, referenced_other_bytes: 2, - referenced_batches_bytes: vec![('a', 4)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 5)], + referenced_batches_bytes: vec![(version(3), 4)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 5)], blob_usage_rollups: 0, } .run("5 0/5 0/5 1/4 2/2"); @@ -1207,9 +1192,9 @@ mod tests 
{ current_state_batches_bytes: 2, current_state_bytes: 4, referenced_other_bytes: 2, - referenced_batches_bytes: vec![('a', 4)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 3)], + referenced_batches_bytes: vec![(version(3), 4)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 3)], blob_usage_rollups: 0, } .run("3 0/3 0/3 0/3 1/2"); @@ -1219,9 +1204,9 @@ mod tests { current_state_batches_bytes: 2, current_state_bytes: 4, referenced_other_bytes: 2, - referenced_batches_bytes: vec![('a', 4)], - live_writers: vec!['a'], - blob_usage_by_writer: vec![('a', 1)], + referenced_batches_bytes: vec![(version(3), 4)], + min_writer_key: version(3), + blob_usage_by_writer: vec![(version(3), 1)], blob_usage_rollups: 0, } .run("1 0/1 0/1 0/1 0/1"); @@ -1236,9 +1221,9 @@ mod tests { current_state_batches_bytes: 0, current_state_bytes: 0, referenced_other_bytes: 0, - referenced_batches_bytes: vec![('a', 5)], - live_writers: vec![], - blob_usage_by_writer: vec![('a', 3)], + referenced_batches_bytes: vec![(version(3), 5)], + min_writer_key: version(10), + blob_usage_by_writer: vec![(version(3), 3)], blob_usage_rollups: 0, } .run("3 0/3 0/3 3/0 0/0");