From f126081a4bc12cb2ee8454c7eee09a1c0be1bf7a Mon Sep 17 00:00:00 2001
From: Abrar Quazi
Date: Thu, 16 Jan 2025 09:22:10 -0800
Subject: [PATCH] rsc: chunk ids in query and apply review comments

---
 rust/rsc/src/bin/rsc/read_job.rs | 70 +++++++++++++++++---------------
 1 file changed, 37 insertions(+), 33 deletions(-)

diff --git a/rust/rsc/src/bin/rsc/read_job.rs b/rust/rsc/src/bin/rsc/read_job.rs
index 83dffe938..d2f157a9e 100644
--- a/rust/rsc/src/bin/rsc/read_job.rs
+++ b/rust/rsc/src/bin/rsc/read_job.rs
@@ -36,47 +36,51 @@ async fn record_miss(hash: String, conn: Arc<DatabaseConnection>) {
 #[tracing::instrument(skip(db, stores))]
 async fn resolve_blobs<T: ConnectionTrait>(
-    ids: &[Uuid],
+    ids: &Vec<Uuid>,
     db: &T,
     stores: &HashMap<Uuid, Arc<dyn DebugBlobStore + Sync + Send>>,
 ) -> Result<HashMap<Uuid, ResolvedBlob>, String> {
-    // Fetch all blobs in a single query
-    let blobs = Blob::find()
-        .filter(entity::blob::Column::Id.is_in(ids.to_vec()))
-        .all(db)
-        .await
-        .map_err(|e| format!("Failed to query blobs: {}", e))?;
-
-    // Build a map of blob_id -> blob model for quick lookup
-    let blob_map: HashMap<Uuid, entity::blob::Model> = blobs.into_iter().map(|b| (b.id, b)).collect();
+    // Postgres has a 65,535 parameter limit, so we chunk the ids to stay below it: https://www.postgresql.org/docs/current/limits.html
+    const CHUNK_SIZE: usize = 50_000;
 
-    // Ensure we have all requested blobs
-    for &id in ids {
-        if !blob_map.contains_key(&id) {
-            return Err(format!("Unable to find blob {} by id", id));
+    let mut resolved_map = HashMap::new();
+
+    for chunk in ids.chunks(CHUNK_SIZE) {
+        // Fetch chunked blobs in a single query
+        let blob_map: HashMap<Uuid, entity::blob::Model> = Blob::find()
+            .filter(entity::blob::Column::Id.is_in(chunk.to_vec()))
+            .all(db)
+            .await
+            .map_err(|e| format!("Failed to query blobs, database error: {}", e))?
+            .into_iter()
+            .map(|b| (b.id, b))
+            .collect();
+
+        // Ensure we have all requested blobs
+        for &id in chunk {
+            if !blob_map.contains_key(&id) {
+                return Err(format!("Unable to find blob {} by id", id));
+            }
         }
-    }
-
-    // Resolve all download URLs in parallel
-    let futures = blob_map.iter().map(|(id, blob)| {
-        let store_opt = stores.get(&blob.store_id).cloned();
-        let key = blob.key.clone();
-        async move {
-            let store = store_opt.ok_or_else(|| {
-                format!("Unable to find backing store {} for blob {}", blob.store_id, id)
-            })?;
-            let url = store.download_url(key).await;
-            Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
-        }
-    });
+        // Resolve all download URLs in parallel
+        let futures = blob_map.iter().map(|(id, blob)| {
+            let store_opt = stores.get(&blob.store_id).cloned();
+            let key = blob.key.clone();
+
+            async move {
+                let store = store_opt.ok_or_else(|| {
+                    format!("Unable to find backing store {} for blob {}", blob.store_id, id)
+                })?;
+                let url = store.download_url(key).await;
+                Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
+            }
+        });
 
-    let results = join_all(futures).await;
+        let results = join_all(futures).await;
 
-    let mut resolved_map = HashMap::new();
-    for res in results {
-        let (id, resolved_blob) = res?;
-        resolved_map.insert(id, resolved_blob);
+        let partial_map: HashMap<Uuid, ResolvedBlob> = results.into_iter().collect::<Result<_, String>>()?;
+        resolved_map.extend(partial_map);
     }
 
     Ok(resolved_map)
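
Note on the pattern in the hunk above: it works around Postgres's 65,535 bind parameter ceiling by issuing one IN (...) query per chunk of ids, verifying every requested id came back, and merging the partial results. The standalone sketch below shows the same chunk, query, verify, merge flow; BlobRow, fetch_chunk, and resolve_all are hypothetical names, and u64 ids stand in for Uuid so the example compiles without sea-orm or the rsc crate. It illustrates the idea, not code from the repository.

use std::collections::HashMap;

// Postgres rejects statements with more than 65,535 bind parameters, so a
// `WHERE id IN (...)` built from a caller-supplied list has to be chunked.
const CHUNK_SIZE: usize = 50_000;

// Hypothetical stand-in for a row of the blob table.
#[derive(Clone, Debug)]
struct BlobRow {
    id: u64,
    key: String,
}

// Hypothetical lookup playing the role of `Blob::find().filter(...).all(db)`:
// it returns only the rows whose ids were requested.
fn fetch_chunk(table: &HashMap<u64, BlobRow>, chunk: &[u64]) -> Vec<BlobRow> {
    chunk.iter().filter_map(|id| table.get(id).cloned()).collect()
}

fn resolve_all(table: &HashMap<u64, BlobRow>, ids: &[u64]) -> Result<HashMap<u64, BlobRow>, String> {
    let mut resolved = HashMap::new();

    for chunk in ids.chunks(CHUNK_SIZE) {
        // One "query" per chunk, keyed by id for O(1) membership checks.
        let found: HashMap<u64, BlobRow> =
            fetch_chunk(table, chunk).into_iter().map(|b| (b.id, b)).collect();

        // Fail fast if any requested id is missing, mirroring the patch.
        for id in chunk {
            if !found.contains_key(id) {
                return Err(format!("Unable to find blob {} by id", id));
            }
        }

        resolved.extend(found);
    }

    Ok(resolved)
}

fn main() {
    let table: HashMap<u64, BlobRow> = (0..100u64)
        .map(|id| (id, BlobRow { id, key: format!("key-{}", id) }))
        .collect();

    let ids: Vec<u64> = table.keys().copied().collect();
    let resolved = resolve_all(&table, &ids).expect("all ids present");
    assert_eq!(resolved.len(), 100);
    assert!(resolved.values().all(|b| b.key.starts_with("key-")));
}

Choosing a CHUNK_SIZE of 50,000 rather than the full 65,535 presumably leaves headroom for any other bind parameters the generated query adds.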
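
The other idiom the hunk relies on is resolving the download URLs for a chunk concurrently and then folding the resulting Vec of Results back into a single Result, short-circuiting on the first error. A minimal sketch of that join_all-then-collect pattern follows; it assumes the futures crate (futures = "0.3") and uses a hypothetical resolve_one function in place of the real store lookup.

use futures::executor::block_on;
use futures::future::join_all;
use std::collections::HashMap;

// Hypothetical stand-in for resolving one blob id to a download URL; it fails
// when no backing store exists, mirroring the patch's error path.
async fn resolve_one(id: u32) -> Result<(u32, String), String> {
    if id == 0 {
        return Err(format!("Unable to find backing store for blob {}", id));
    }
    Ok((id, format!("https://blobs.example/{}", id)))
}

fn main() {
    let ids = vec![1u32, 2, 3];

    let resolved: Result<HashMap<u32, String>, String> = block_on(async {
        // Run all resolutions concurrently, then fold Vec<Result<(K, V), E>>
        // into Result<HashMap<K, V>, E>, stopping at the first error.
        let results = join_all(ids.iter().map(|&id| resolve_one(id))).await;
        results.into_iter().collect()
    });

    assert_eq!(resolved.unwrap().len(), 3);
}

Collecting an iterator of Result items into Result<HashMap<_, _>, _> is what lets the patch replace the explicit insert loop with a single collect followed by resolved_map.extend.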