Skip to content

Commit

Permalink
Chunk FindMissingBlobsRequest appropriately (#20708)
Browse files Browse the repository at this point in the history
Fixes #20674
  • Loading branch information
tgolsson authored Feb 7, 2025
1 parent 94021c6 commit 2e4e6ff
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 20 deletions.
3 changes: 3 additions & 0 deletions docs/notes/2.26.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Thank you to [Klayvio](https://www.klaviyo.com/) and [Normal Computing](https://

### Highlights

### Remote caching/execution

- Remote cache: `FindMissingBlobsRequest` will now make multiple requests if the number of files is large. (https://github.com/pantsbuild/pants/pull/20708)

### Deprecations

Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio-util = { workspace = true, features = ["io"] }
tonic = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
workunit_store = { path = "../../workunit_store" }
prost = { workspace = true }

[dev-dependencies]
mock = { path = "../../testutil/mock" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use workunit_store::{Metric, ObservationMetric};

use remote_provider_traits::{ByteStoreProvider, LoadDestination, RemoteStoreOptions};

const RPC_DIGEST_SIZE: usize = 70;

pub struct Provider {
instance_name: Option<String>,
chunk_size_bytes: usize,
Expand Down Expand Up @@ -393,24 +395,53 @@ impl ByteStoreProvider for Provider {
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String> {
let request = remexec::FindMissingBlobsRequest {
instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(),
blob_digests: digests.into_iter().map(|d| d.into()).collect::<Vec<_>>(),
};
let blob_digests = digests.into_iter().map(|d| d.into()).collect::<Vec<_>>();

let client = self.cas_client.as_ref().clone();
const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
let max_digests_per_request: usize = (DEFAULT_MAX_GRPC_MESSAGE_SIZE
- self
.instance_name
.as_ref()
.cloned()
.unwrap_or_default()
.len()
- 10)
/ RPC_DIGEST_SIZE;

let requests = blob_digests.chunks(max_digests_per_request).map(|digests| {
let msg = remexec::FindMissingBlobsRequest {
instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(),
blob_digests: digests.to_vec(),
};

workunit_store::increment_counter_if_in_workunit(Metric::RemoteStoreExistsAttempts, 1);
let result = retry_call(
client,
move |mut client, _| {
let request = request.clone();
async move { client.find_missing_blobs(request).await }
},
status_is_retryable,
)
.await
.map_err(status_to_str);
msg
});

let client = self.cas_client.as_ref();
let futures = requests
.map(|request| {
workunit_store::increment_counter_if_in_workunit(
Metric::RemoteStoreExistsAttempts,
1,
);

let client = client.clone();
retry_call(
client,
move |mut client, _| {
let request = request.clone();
async move { client.find_missing_blobs(request).await }
},
status_is_retryable,
)
})
.collect::<Vec<_>>();

let result = futures::future::join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(status_to_str);

let metric = match result {
Ok(_) => Metric::RemoteStoreExistsSuccesses,
Expand All @@ -421,10 +452,45 @@ impl ByteStoreProvider for Provider {
let response = result?;

response
.into_inner()
.missing_blob_digests
.iter()
.map(|digest| digest.try_into())
.into_iter()
.flat_map(|response| {
response
.into_inner()
.missing_blob_digests
.into_iter()
.map(|digest| digest.try_into())
})
.collect::<Result<HashSet<_>, _>>()
}
}

#[cfg(test)]
mod tests {
    use super::RPC_DIGEST_SIZE;
    use crate::remexec::FindMissingBlobsRequest;
    use prost::Message;
    use testutil::data::TestData;

    /// Verify that `RPC_DIGEST_SIZE` is an accurate per-digest bound on the encoded
    /// size of a `FindMissingBlobsRequest`, so the chunking arithmetic in
    /// `list_missing_digests` stays in sync with the real protobuf encoding.
    ///
    /// NOTE[TSolberg]: This test is a bit of a hack, but it's the best way I could
    /// think of to ensure that the size of the FindMissingBlobsRequest is roughly
    /// what we expect. The only delta would be the encoding of the instance name.
    #[test]
    fn test_size_of_find_missing_blobs_request() {
        let mut blobs = Vec::new();
        let instance_name = "";
        // Probe a few digest counts at different orders of magnitude; ranges are
        // already iterators, so `chain` composes them directly.
        for it in (0..10).chain(1000..1010).chain(10000..10010) {
            // Grow the digest list up to the current target count, reusing the
            // same digest value — encoded size depends only on the count.
            while blobs.len() < it {
                blobs.push(TestData::roland().digest().into());
            }

            let request = FindMissingBlobsRequest {
                instance_name: instance_name.to_string(),
                blob_digests: blobs.clone(),
            };

            // With an empty instance name, the whole message is just the repeated
            // digest field, so the size must be exactly linear in the count.
            assert_eq!(request.encoded_len(), RPC_DIGEST_SIZE * it);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,49 @@ async fn list_missing_digests_none_missing() {
)
}

#[tokio::test]
async fn list_missing_digests_more_than_4mb() {
    let _ = WorkunitStore::setup_for_tests();
    let roland = TestData::roland();
    let cas = StubCAS::builder().file(&roland).build().await;
    let provider = new_provider(&cas).await;

    // 100k digests cannot fit in a single 4MB gRPC message, so the provider is
    // expected to split the query across multiple FindMissingBlobsRequests.
    let digests: Vec<_> = std::iter::repeat(roland.digest()).take(100_000).collect();

    let missing = provider.list_missing_digests(&mut digests.into_iter()).await;
    // The blob is present in the CAS, so nothing should be reported missing.
    assert_eq!(missing, Ok(HashSet::new()));

    // The 100k digests should have been chunked into exactly two requests.
    assert_eq!(cas.request_count(RequestType::CASFindMissingBlobs), 2)
}

#[tokio::test]
async fn list_missing_digests_more_than_4mb_some_missing() {
    let _ = WorkunitStore::setup_for_tests();
    let present = TestData::roland();
    let cas = StubCAS::builder().file(&present).build().await;
    let provider = new_provider(&cas).await;

    let henries = TestData::all_the_henries();
    let robin = TestData::robin();

    // Sandwich the two absent digests around 100k present ones so that each
    // absent digest lands in a different request chunk.
    let mut digests = vec![henries.digest()];
    digests.extend(std::iter::repeat(present.digest()).take(100_000));
    digests.push(robin.digest());

    let result = provider.list_missing_digests(&mut digests.into_iter()).await;
    // Both absent digests must be reported, regardless of which chunk found them.
    assert_eq!(
        result,
        Ok(HashSet::from([henries.digest(), robin.digest()]))
    );

    // The oversize digest list should have been split into exactly two requests.
    assert_eq!(cas.request_count(RequestType::CASFindMissingBlobs), 2)
}

#[tokio::test]
async fn list_missing_digests_some_missing() {
let cas = StubCAS::empty().await;
Expand Down

0 comments on commit 2e4e6ff

Please sign in to comment.