Guarantee consistent ordering of batches through reduction
jbeilstenedmands committed Apr 4, 2024
1 parent 68af41e commit 4320449
Showing 2 changed files with 20 additions and 21 deletions.
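The changes below replace order-of-completion appends inside as_completed loops with writes into a pre-allocated results list keyed by each future's submission index, so scaled and reindexed batches come back in the order they were submitted, whichever worker process finishes first. A minimal, self-contained sketch of that pattern follows; the names process and run_in_order are illustrative only, not xia2 code.

import concurrent.futures


def process(item: int) -> int:
    # Stand-in for the real per-batch work (scale_on_batches / individual_cosym).
    return item * item


def run_in_order(items: list[int], nproc: int = 4) -> list[int | None]:
    # One slot per input; a failed item simply leaves its placeholder behind.
    results: list[int | None] = [None] * len(items)
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=min(nproc, len(items))
    ) as pool:
        # Key each future by its submission index rather than collecting a bare list.
        futures = {pool.submit(process, item): i for i, item in enumerate(items)}
        for future in concurrent.futures.as_completed(futures):
            idx = futures[future]
            try:
                results[idx] = future.result()  # store by submission index, not completion order
            except Exception:
                pass  # a failure only affects its own slot
    return results


if __name__ == "__main__":  # guard needed for ProcessPoolExecutor on spawn-based platforms
    print(run_in_order([3, 1, 2]))  # -> [9, 1, 4], aligned with the input order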
27 changes: 13 additions & 14 deletions src/xia2/Modules/SSX/data_reduction_programs.py
@@ -586,7 +586,7 @@ def scale_parallel_batches(
working_directory, batches: List[ProcessingBatch], reduction_params
) -> Tuple[List[ProcessingBatch], List[float]]:
# scale multiple batches in parallel
- scaled_results = []
+ scaled_results = [ProcessingBatch()] * len(batches)
d_mins = []
batch_template = functools.partial(
"batch{index:0{maxindexlength:d}d}".format,
@@ -597,29 +597,27 @@ def scale_parallel_batches(
with record_step("dials.scale (parallel)"), concurrent.futures.ProcessPoolExecutor(
max_workers=min(reduction_params.nproc, len(batches))
) as pool:
- scale_futures: Dict[Any, str] = {
+ scale_futures: Dict[Any, int] = {
pool.submit(
scale_on_batches,
working_directory,
[batch],
reduction_params,
name,
- ): name
- for name, batch in jobs.items()
+ ): i
+ for i, (name, batch) in enumerate(jobs.items())
}
for future in concurrent.futures.as_completed(scale_futures):
try:
result = future.result()
- name = scale_futures[future]
+ idx = scale_futures[future]
except Exception as e:
xia2_logger.warning(f"Unsuccessful scaling of group. Error:\n{e}")
else:
- xia2_logger.info(
- f"Completed scaling of data reduction batch {name.lstrip('batch')}"
- )
+ xia2_logger.info(f"Completed scaling of data reduction batch {idx+1}")
outbatch = ProcessingBatch()
outbatch.add_filepair(FilePair(result.exptfile, result.reflfile))
- scaled_results.append(outbatch)
+ scaled_results[idx] = outbatch
FileHandler.record_log_file(
result.logfile.name.rstrip(".log"), result.logfile
)
@@ -890,7 +888,7 @@ def parallel_cosym(
if not Path.is_dir(working_directory):
Path.mkdir(working_directory)

- reindexed_results = []
+ reindexed_results = [ProcessingBatch()] * len(data_to_reindex)

with open(os.devnull, "w") as devnull:
sys.stdout = devnull # block printing from cosym
@@ -899,17 +897,18 @@
"dials.cosym (parallel)"
), concurrent.futures.ProcessPoolExecutor(max_workers=nproc) as pool:

- cosym_futures: List[Any] = [
+ cosym_futures: dict[Any, int] = {
pool.submit(
individual_cosym,
working_directory,
batch,
index,
reduction_params,
- )
+ ): index
for index, batch in enumerate(data_to_reindex)
- ]
+ }
for future in concurrent.futures.as_completed(cosym_futures):
+ idx = cosym_futures[future]
try:
result = future.result()
except Exception as e:
@@ -921,7 +920,7 @@
processed_batch.add_filepair(
FilePair(result.exptfile, result.reflfile)
)
- reindexed_results.append(processed_batch)
+ reindexed_results[idx] = processed_batch
FileHandler.record_log_file(
result.logfile.name.rstrip(".log"), result.logfile
)
14 changes: 7 additions & 7 deletions src/xia2/Modules/SSX/data_reduction_with_reference.py
@@ -40,7 +40,7 @@ def _scale(self) -> None:
if not Path.is_dir(self._scale_wd):
Path.mkdir(self._scale_wd)

- scaled_results = []
+ scaled_results = [FilePair(Path(), Path())] * len(self._batches_to_scale)

batch_template = functools.partial(
"scaled_batch{index:0{maxindexlength:d}d}".format,
@@ -56,25 +56,25 @@ def _scale(self) -> None:
), concurrent.futures.ProcessPoolExecutor(
max_workers=self._reduction_params.nproc
) as pool:
- scale_futures: Dict[Any, str] = {
+ scale_futures: Dict[Any, int] = {
pool.submit(
scale_against_reference,
self._scale_wd,
batch,
self._reduction_params,
name,
- ): name
- for name, batch in jobs.items() # .items()
+ ): idx
+ for idx, (name, batch) in enumerate(jobs.items()) # .items()
}
for future in concurrent.futures.as_completed(scale_futures):
try:
result = future.result()
- name = scale_futures[future]
+ idx = scale_futures[future]
except Exception as e:
xia2_logger.warning(f"Unsuccessful scaling of group. Error:\n{e}")
else:
xia2_logger.info(f"Completed scaling of {name}")
scaled_results.append(FilePair(result.exptfile, result.reflfile))
xia2_logger.info(f"Completed scaling of batch {idx+1}")
scaled_results[idx] = FilePair(result.exptfile, result.reflfile)
FileHandler.record_data_file(result.exptfile)
FileHandler.record_data_file(result.reflfile)
FileHandler.record_log_file(

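A note on the design choice: both loops keep concurrent.futures.as_completed, so progress is still logged as each batch finishes, and ordering is recovered through the future-to-index lookup. A simpler alternative, sketched below purely for comparison (it is not what this commit does), is to gather results by iterating the futures in submission order; that is also deterministic, but completions are then reported strictly in submission order and a single failure raises instead of leaving one placeholder slot.

import concurrent.futures


def process(item: int) -> int:
    return item + 1  # placeholder workload, not xia2 code


def run_in_submission_order(items: list[int]) -> list[int]:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(process, item) for item in items]
        # Ordered by construction, but results are gathered strictly in
        # submission order and any exception propagates out of the loop.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_in_submission_order([1, 2, 3]))  # -> [2, 3, 4]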