diff --git a/python/lsst/daf/butler/datastores/fileDatastore.py b/python/lsst/daf/butler/datastores/fileDatastore.py
index d8f9f5822f..49d6bb8b4d 100644
--- a/python/lsst/daf/butler/datastores/fileDatastore.py
+++ b/python/lsst/daf/butler/datastores/fileDatastore.py
@@ -1976,6 +1976,71 @@ def _locations_to_URI(
 
         return uris
 
+    @staticmethod
+    def _find_missing_records(
+        datastore: FileDatastore,
+        refs: Iterable[DatasetRef],
+        missing_ids: set[DatasetId],
+        artifact_existence: dict[ResourcePath, bool] | None = None,
+    ) -> dict[DatasetId, list[StoredFileInfo]]:
+        if not missing_ids:
+            return {}
+
+        if artifact_existence is None:
+            artifact_existence = {}
+
+        found_records: dict[DatasetId, list[StoredFileInfo]] = defaultdict(list)
+        id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids}
+
+        # This should be chunked in case we end up having to check
+        # the file store since we need some log output to show
+        # progress.
+        for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=10_000):
+            records = {}
+            for missing in missing_ids_chunk:
+                # Ask the source datastore where the missing artifacts
+                # should be. An execution butler might not know about the
+                # artifacts even if they are there.
+                expected = datastore._get_expected_dataset_locations_info(id_to_ref[missing])
+                records[missing] = [info for _, info in expected]
+
+            # Call the mexist helper method in case we have not already
+            # checked these artifacts such that artifact_existence is
+            # empty. This allows us to benefit from parallelism.
+            # datastore.mexists() itself does not give us access to the
+            # derived datastore record.
+            log.verbose("Checking existence of %d datasets unknown to datastore", len(records))
+            ref_exists = datastore._process_mexists_records(
+                id_to_ref, records, False, artifact_existence=artifact_existence
+            )
+
+            # Now go through the records and propagate the ones that exist.
+            location_factory = datastore.locationFactory
+            for missing, record_list in records.items():
+                # Skip completely if the ref does not exist.
+                ref = id_to_ref[missing]
+                if not ref_exists[ref]:
+                    log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref)
+                    continue
+                # Check for file artifact to decide which parts of a
+                # disassembled composite do exist. If there is only a
+                # single record we don't even need to look because it can't
+                # be a composite and must exist.
+                if len(record_list) == 1:
+                    dataset_records = record_list
+                else:
+                    dataset_records = [
+                        record
+                        for record in record_list
+                        if artifact_existence[record.file_location(location_factory).uri]
+                    ]
+                    assert len(dataset_records) > 0, "Disassembled composite should have had some files."
+
+                # Rely on found_records being a defaultdict.
+                found_records[missing].extend(dataset_records)
+        log.verbose("Completed scan for missing data files")
+        return found_records
+
     def retrieveArtifacts(
         self,
         refs: Iterable[DatasetRef],
@@ -2037,6 +2102,18 @@
         # Retrieve all the records in bulk indexed by ref.id.
         records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
 
+        # Check for missing records.
+        known_ids = set(records)
+        log.debug("Number of datastore records found in database: %d", len(known_ids))
+        requested_ids = {ref.id for ref in refs}
+        missing_ids = requested_ids - known_ids
+
+        if missing_ids and not self.trustGetRequest:
+            raise ValueError(f"Unable to retrieve artifacts: {len(missing_ids)} requested datasets are missing from this datastore.")
+
+        missing_records = self._find_missing_records(self, refs, missing_ids)
+        records.update(missing_records)
+
         # One artifact can be used by multiple DatasetRef.
         # e.g. DECam.
         artifact_to_ref_id: dict[ResourcePath, list[DatasetId]] = defaultdict(list)
@@ -2605,55 +2682,10 @@ def transfer_from(
                 len(missing_ids),
                 len(requested_ids),
             )
-            id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids}
-
-            # This should be chunked in case we end up having to check
-            # the file store since we need some log output to show
-            # progress.
-            for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=10_000):
-                records = {}
-                for missing in missing_ids_chunk:
-                    # Ask the source datastore where the missing artifacts
-                    # should be. An execution butler might not know about the
-                    # artifacts even if they are there.
-                    expected = source_datastore._get_expected_dataset_locations_info(id_to_ref[missing])
-                    records[missing] = [info for _, info in expected]
-
-                # Call the mexist helper method in case we have not already
-                # checked these artifacts such that artifact_existence is
-                # empty. This allows us to benefit from parallelism.
-                # datastore.mexists() itself does not give us access to the
-                # derived datastore record.
-                log.verbose("Checking existence of %d datasets unknown to datastore", len(records))
-                ref_exists = source_datastore._process_mexists_records(
-                    id_to_ref, records, False, artifact_existence=artifact_existence
-                )
-
-                # Now go through the records and propagate the ones that exist.
-                location_factory = source_datastore.locationFactory
-                for missing, record_list in records.items():
-                    # Skip completely if the ref does not exist.
-                    ref = id_to_ref[missing]
-                    if not ref_exists[ref]:
-                        log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref)
-                        continue
-                    # Check for file artifact to decide which parts of a
-                    # disassembled composite do exist. If there is only a
-                    # single record we don't even need to look because it can't
-                    # be a composite and must exist.
-                    if len(record_list) == 1:
-                        dataset_records = record_list
-                    else:
-                        dataset_records = [
-                            record
-                            for record in record_list
-                            if artifact_existence[record.file_location(location_factory).uri]
-                        ]
-                        assert len(dataset_records) > 0, "Disassembled composite should have had some files."
-
-                    # Rely on source_records being a defaultdict.
-                    source_records[missing].extend(dataset_records)
-            log.verbose("Completed scan for missing data files")
+            found_records = self._find_missing_records(
+                source_datastore, refs, missing_ids, artifact_existence
+            )
+            source_records.update(found_records)
 
         # See if we already have these records
         target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
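
For reviewers who want the factored-out logic in isolation: the sketch below is a minimal, self-contained analogue of the existence scan that _find_missing_records performs, with plain ints and strings standing in for DatasetId, DatasetRef, and file locations, and a dict standing in for the artifact-existence check. Everything in it (find_missing, expected_locations, artifact_exists, and the local chunk_iterable reimplementation) is a hypothetical stand-in, not part of the patch or the butler API.

# Illustrative sketch only, not part of the patch; all names here are
# hypothetical stand-ins for the butler types used above.
from collections import defaultdict
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunk_iterable(data: Iterable[T], chunk_size: int = 1000) -> Iterator[list[T]]:
    # Minimal stand-in for lsst.utils.iteration.chunk_iterable.
    iterator = iter(data)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


def find_missing(
    missing_ids: set[int],
    expected_locations: dict[int, list[str]],
    artifact_exists: dict[str, bool],
) -> dict[int, list[str]]:
    # For each dataset whose database record is missing, keep only the
    # expected file locations whose artifacts actually exist.
    found: dict[int, list[str]] = defaultdict(list)
    for chunk in chunk_iterable(missing_ids, chunk_size=2):
        for dataset_id in chunk:
            locations = expected_locations[dataset_id]
            # Skip the dataset entirely if no artifact exists for it;
            # the real code logs a warning at this point.
            if not any(artifact_exists.get(loc, False) for loc in locations):
                continue
            if len(locations) == 1:
                # A lone record cannot be a disassembled composite, so
                # the any() check above already proved it exists.
                found[dataset_id].extend(locations)
            else:
                # Disassembled composite: keep only the components whose
                # file artifacts are present.
                found[dataset_id].extend(
                    loc for loc in locations if artifact_exists.get(loc, False)
                )
    return found


if __name__ == "__main__":
    exists = {"a.fits": True, "b.fits": False, "c1.fits": True, "c2.fits": False}
    expected = {1: ["a.fits"], 2: ["b.fits"], 3: ["c1.fits", "c2.fits"]}
    print(dict(find_missing({1, 2, 3}, expected, exists)))
    # -> {1: ['a.fits'], 3: ['c1.fits']}

As the in-code comments note, the chunking in the real method matters because the scan may have to stat tens of thousands of file artifacts: processing 10_000 ids at a time bounds the size of each mexists batch and gives the verbose log output something meaningful to report as the scan progresses.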