From 229727a3cab2a50346b5c794efc57940601c5c5a Mon Sep 17 00:00:00 2001 From: lmseidler Date: Tue, 14 Jan 2025 16:05:56 +0100 Subject: [PATCH 1/7] initial work --- src/mindlessgen/generator/main.py | 309 +++++++++++++++++------ src/mindlessgen/molecules/postprocess.py | 7 +- src/mindlessgen/molecules/refinement.py | 3 + src/mindlessgen/qm/orca.py | 1 + src/mindlessgen/qm/xtb.py | 1 + 5 files changed, 238 insertions(+), 83 deletions(-) diff --git a/src/mindlessgen/generator/main.py b/src/mindlessgen/generator/main.py index 95d7c93..061156f 100644 --- a/src/mindlessgen/generator/main.py +++ b/src/mindlessgen/generator/main.py @@ -5,9 +5,13 @@ from __future__ import annotations from collections.abc import Callable +from concurrent.futures import ProcessPoolExecutor, as_completed, wait +from multiprocessing.managers import ValueProxy from pathlib import Path import multiprocessing as mp +from threading import Condition, Event import warnings +from dataclasses import dataclass from ..molecules import generate_random_molecule, Molecule from ..qm import XTB, get_xtb_path, QMMethod, ORCA, get_orca_path, GXTB, get_gxtb_path @@ -19,6 +23,12 @@ MINDLESS_MOLECULES_FILE = "mindless.molecules" +@dataclass +class Block: + num_molecules: int + ncores: int + + def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]: """ Generate a molecule. @@ -72,45 +82,146 @@ def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]: if Path(MINDLESS_MOLECULES_FILE).is_file(): if config.general.verbosity > 0: print(f"\n--- Appending to existing file '{MINDLESS_MOLECULES_FILE}'. ---") + + backup_verbosity: int | None = None + if num_cores > 1 and config.general.verbosity > 0: + backup_verbosity = config.general.verbosity # Save verbosity level for later + config.general.verbosity = 0 # Disable verbosity if parallel + exitcode = 0 optimized_molecules: list[Molecule] = [] - for molcount in range(config.general.num_molecules): - # print a decent header for each molecule iteration - if config.general.verbosity > 0: - print(f"\n{'='*80}") - print( - f"{'='*22} Generating molecule {molcount + 1:<4} of " - + f"{config.general.num_molecules:<4} {'='*24}" - ) - print(f"{'='*80}") - manager = mp.Manager() + + # Initialize parallel blocks here + blocks = setup_blocks(num_cores, config.general.num_molecules) + blocks.sort(key=lambda x: x.ncores) + + # Set up parallel blocks environment + with mp.Manager() as manager: stop_event = manager.Event() - cycles = range(config.general.max_cycles) - backup_verbosity: int | None = None - if num_cores > 1 and config.general.verbosity > 0: - backup_verbosity = ( - config.general.verbosity - ) # Save verbosity level for later - config.general.verbosity = 0 # Disable verbosity if parallel - - if config.general.verbosity == 0: - print("Cycle... ", end="", flush=True) - with mp.Pool(processes=num_cores) as pool: - results = pool.starmap( - single_molecule_generator, - [ - (config, refine_engine, postprocess_engine, cycle, stop_event) - for cycle in cycles - ], - ) - if config.general.verbosity == 0: - print("") + free_cores = manager.Value(int, num_cores) + enough_cores = manager.Condition() + + with ProcessPoolExecutor(max_workers=num_cores // 4) as executor: + # NOTE: The following creates a queue of futures which acquire the enough_cores lock successively. + # By using notify instead of notify_all in increase_cores we make sure that only the next future in + # order of submission is notified and the rest remain waiting. + tasks = [] + for block in blocks: + for _ in range(block.num_molecules): + tasks.append( + executor.submit( + single_molecule_generator, + len(tasks) + 1, + config, + refine_engine, + postprocess_engine, + block.ncores, + stop_event, + ) + ) + tasks[-1].add_done_callback( + lambda _, ncores=block.ncores: increase_cores( + ncores, free_cores, enough_cores + ) + ) + + # Collect results of all tries to create a molecule + results: list[Molecule | None] = [ + task.result() for task in as_completed(tasks) + ] # Restore verbosity level if it was changed if backup_verbosity is not None: config.general.verbosity = backup_verbosity - # Filter out None values and return the first successful molecule + for molcount, optimized_molecule in enumerate(results): + if optimized_molecule is None: + # TODO: molcount might not align with the number of the molecule that actually failed, look into this + warnings.warn( + "Molecule generation including optimization (and postprocessing) " + + f"failed for all cycles for molecule {molcount + 1}." + ) + exitcode = 1 + continue + + # if config.general.verbosity > 0: + # print(f"Optimized mindless molecule found in {cycles_needed} cycles.") + # print(optimized_molecule) + + if config.general.write_xyz: + optimized_molecule.write_xyz_to_file() + if config.general.verbosity > 0: + print( + f"Written molecule file 'mlm_{optimized_molecule.name}.xyz'.\n" + ) + with open("mindless.molecules", "a", encoding="utf8") as f: + f.write(f"mlm_{optimized_molecule.name}\n") + + optimized_molecules.append(optimized_molecule) + + return optimized_molecules, exitcode + + +def single_molecule_generator( + molcount: int, + config: ConfigManager, + refine_engine: QMMethod, + postprocess_engine: QMMethod | None, + ncores: int, + free_cores: ValueProxy[int], + enough_cores: Condition, +) -> Molecule | None: + """ + Generate a single molecule (from start to finish). + """ + + # Wait for enough cores + decrease_cores(ncores, free_cores, enough_cores) + + # print a decent header for each molecule iteration + # if config.general.verbosity > 0: + # print(f"\n{'='*80}") + # print( + # f"{'='*22} Generating molecule {molcount + 1:<4} of " + # + f"{config.general.num_molecules:<4} {'='*24}" + # ) + # print(f"{'='*80}") + + with mp.Manager() as manager: + stop_event = manager.Event() + free_cores = manager.Value(int, ncores) + enough_cores = manager.Condition() + + # Launch worker processes to find molecule + # if config.general.verbosity == 0: + # print("Cycle... ", end="", flush=True) + with ProcessPoolExecutor(ncores // 4) as executor: + cycles = range(config.general.max_cycles) + tasks = [ + executor.submit( + single_molecule_step, + config, + refine_engine, + postprocess_engine, + cycle, + stop_event, + free_cores, + enough_cores, + ) + for cycle in cycles + ] + + # Finally, add a future to set the stop_event if all jobs are completed + executor.submit(lambda: stop_event.set() if wait(tasks) else None) + + stop_event.wait() + # TODO: kill all workers on receiving stop signal instead of waiting + + results = [task.result() for task in as_completed(tasks)] + + # if config.general.verbosity == 0: + # print("") + optimized_molecule: Molecule | None = None for i, result in enumerate(results): if result is not None: @@ -118,45 +229,33 @@ def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]: optimized_molecule = result break - if optimized_molecule is None: - warnings.warn( - "Molecule generation including optimization (and postprocessing) " - + f"failed for all cycles for molecule {molcount + 1}." - ) - exitcode = 1 - continue - if config.general.verbosity > 0: - print(f"Optimized mindless molecule found in {cycles_needed} cycles.") - print(optimized_molecule) - if config.general.write_xyz: - optimized_molecule.write_xyz_to_file() - if config.general.verbosity > 0: - print(f"Written molecule file 'mlm_{optimized_molecule.name}.xyz'.\n") - with open("mindless.molecules", "a", encoding="utf8") as f: - f.write(f"mlm_{optimized_molecule.name}\n") - optimized_molecules.append(optimized_molecule) + # if config.general.verbosity > 0: + # print(f"Optimized mindless molecule found in {cycles_needed} cycles.") + # print(optimized_molecule) - return optimized_molecules, exitcode + return optimized_molecule -def single_molecule_generator( +def single_molecule_step( config: ConfigManager, refine_engine: QMMethod, postprocess_engine: QMMethod | None, cycle: int, - stop_event, + stop_event: Event, + free_cores: ValueProxy[int], + enough_cores: Condition, ) -> Molecule | None: - """ - Generate a single molecule. - """ + """Execute one step in a single molecule generation""" + if stop_event.is_set(): return None # Exit early if a molecule has already been found - if config.general.verbosity == 0: - # print the cycle in one line, not starting a new line - print("✔", end="", flush=True) - elif config.general.verbosity > 0: - print(f"Cycle {cycle + 1}:") + # if config.general.verbosity == 0: + # # print the cycle in one line, not starting a new line + # print("✔", end="", flush=True) + # elif config.general.verbosity > 0: + # print(f"Cycle {cycle + 1}:") + # _____ _ # / ____| | | # | | __ ___ _ __ ___ _ __ __ _| |_ ___ _ __ @@ -171,17 +270,17 @@ def single_molecule_generator( except ( SystemExit ) as e: # debug functionality: raise SystemExit to stop the whole execution - if config.general.verbosity > 0: - print(f"Generation aborted for cycle {cycle + 1}.") - if config.general.verbosity > 1: - print(e) + # if config.general.verbosity > 0: + # print(f"Generation aborted for cycle {cycle + 1}.") + # if config.general.verbosity > 1: + # print(e) stop_event.set() return None except RuntimeError as e: - if config.general.verbosity > 0: - print(f"Generation failed for cycle {cycle + 1}.") - if config.general.verbosity > 1: - print(e) + # if config.general.verbosity > 0: + # print(f"Generation failed for cycle {cycle + 1}.") + # if config.general.verbosity > 1: + # print(e) return None try: @@ -194,17 +293,19 @@ def single_molecule_generator( # | | # |_| optimized_molecule = iterative_optimization( - mol=mol, - engine=refine_engine, - config_generate=config.generate, - config_refine=config.refine, + mol, + refine_engine, + config.generate, + config.refine, + free_cores, + enough_cores, verbosity=config.general.verbosity, ) except RuntimeError as e: - if config.general.verbosity > 0: - print(f"Refinement failed for cycle {cycle + 1}.") - if config.general.verbosity > 1 or config.refine.debug: - print(e) + # if config.general.verbosity > 0: + # print(f"Refinement failed for cycle {cycle + 1}.") + # if config.general.verbosity > 1 or config.refine.debug: + # print(e) return None finally: if config.refine.debug: @@ -216,19 +317,21 @@ def single_molecule_generator( optimized_molecule, postprocess_engine, # type: ignore config.postprocess, - config.general.verbosity, + free_cores, + enough_cores, + verbosity=config.general.verbosity, ) except RuntimeError as e: - if config.general.verbosity > 0: - print(f"Postprocessing failed for cycle {cycle + 1}.") - if config.general.verbosity > 1 or config.postprocess.debug: - print(e) + # if config.general.verbosity > 0: + # print(f"Postprocessing failed for cycle {cycle + 1}.") + # if config.general.verbosity > 1 or config.postprocess.debug: + # print(e) return None finally: if config.postprocess.debug: stop_event.set() # Stop further runs if debugging of this step is enabled - if config.general.verbosity > 1: - print("Postprocessing successful.") + # if config.general.verbosity > 1: + # print("Postprocessing successful.") if not stop_event.is_set(): stop_event.set() # Signal other processes to stop @@ -300,3 +403,45 @@ def setup_engines( return GXTB(path) else: raise NotImplementedError("Engine not implemented.") + + +def setup_blocks(ncores: int, num_molecules: int) -> list[Block]: + blocks: list[Block] = [] + + # Maximum and minimum number of parallel processes possible + maxcores = ncores + mincores = 4 # TODO: 4 as a placeholder for an actual setting + maxprocs = ncores // mincores + minprocs = ncores // maxcores + + # Distribute number of molecules among blocks + # First (if possible) create the maximum number of parallel blocks (maxprocs) and distribute as many molecules as possible + molecules_left = num_molecules + if molecules_left >= maxprocs: + p = maxprocs + molecules_per_block = molecules_left // p + for _ in range(p): + blocks.append(Block(molecules_per_block, ncores // p)) + molecules_left -= molecules_per_block * p + + # While there are more than minprocs (1) molecules left find the optimal number of parallel blocks + # Again distribute as many molecules per block as possible + while molecules_left >= minprocs: + p = max( + [ + j + for j in range(minprocs, maxprocs) + if ncores % j == 0 and j <= molecules_left + ] + ) + molecules_per_block = molecules_left // p + for _ in range(p): + blocks.append(Block(molecules_per_block, ncores // p)) + molecules_left -= molecules_per_block * p + + # NOTE: using minprocs = 1 this is probably never true + if molecules_left > 0: + blocks.append(Block(molecules_left, maxcores)) + molecules_left -= molecules_left + + return blocks diff --git a/src/mindlessgen/molecules/postprocess.py b/src/mindlessgen/molecules/postprocess.py index 17f6ae4..2a2c1b5 100644 --- a/src/mindlessgen/molecules/postprocess.py +++ b/src/mindlessgen/molecules/postprocess.py @@ -8,7 +8,12 @@ def postprocess_mol( - mol: Molecule, engine: QMMethod, config: PostProcessConfig, verbosity: int = 1 + mol: Molecule, + engine: QMMethod, + config: PostProcessConfig, + free_cores, + enough_cores, + verbosity: int = 1, ) -> Molecule: """ Postprocess the generated molecule. diff --git a/src/mindlessgen/molecules/refinement.py b/src/mindlessgen/molecules/refinement.py index a520ebd..aabf940 100644 --- a/src/mindlessgen/molecules/refinement.py +++ b/src/mindlessgen/molecules/refinement.py @@ -30,6 +30,8 @@ def iterative_optimization( engine: QMMethod, config_generate: GenerateConfig, config_refine: RefineConfig, + free_cores, + enough_cores, verbosity: int = 1, ) -> Molecule: """ @@ -44,6 +46,7 @@ def iterative_optimization( for cycle in range(config_refine.max_frag_cycles): # Optimize the current molecule try: + # TODO: run single points first, start optimization if scf converges rev_mol = engine.optimize(rev_mol, None, verbosity) except RuntimeError as e: raise RuntimeError( diff --git a/src/mindlessgen/qm/orca.py b/src/mindlessgen/qm/orca.py index 3225335..375f604 100644 --- a/src/mindlessgen/qm/orca.py +++ b/src/mindlessgen/qm/orca.py @@ -175,6 +175,7 @@ def _gen_input( orca_input += ( f"%scf\n\tMaxIter {self.cfg.scf_cycles}\n\tConvergence Medium\nend\n" ) + # TODO: variable number of threads orca_input += "%pal nprocs 1 end\n\n" orca_input += f"* xyzfile {molecule.charge} {molecule.uhf + 1} {xyzfile}\n" return orca_input diff --git a/src/mindlessgen/qm/xtb.py b/src/mindlessgen/qm/xtb.py index 3fe4435..f45d037 100644 --- a/src/mindlessgen/qm/xtb.py +++ b/src/mindlessgen/qm/xtb.py @@ -204,6 +204,7 @@ def _run(self, temp_path: Path, arguments: list[str]) -> tuple[str, str, int]: tuple[str, str, int]: The output of the xtb calculation (stdout and stderr) and the return code """ + # TODO: variable number of threads non_parallel = ["-P", "1"] arguments += non_parallel try: From 0069a6e6673012e89435ce177e41839f65dd6670 Mon Sep 17 00:00:00 2001 From: lmseidler Date: Wed, 15 Jan 2025 12:48:10 +0100 Subject: [PATCH 2/7] new parallelization preliminary done --- src/mindlessgen/generator/main.py | 147 +++++++++++------------ src/mindlessgen/molecules/postprocess.py | 12 +- src/mindlessgen/molecules/refinement.py | 21 +++- src/mindlessgen/prog/__init__.py | 6 +- src/mindlessgen/prog/config.py | 2 + src/mindlessgen/prog/parallel.py | 38 ++++++ 6 files changed, 139 insertions(+), 87 deletions(-) create mode 100644 src/mindlessgen/prog/parallel.py diff --git a/src/mindlessgen/generator/main.py b/src/mindlessgen/generator/main.py index 061156f..b0ca7e1 100644 --- a/src/mindlessgen/generator/main.py +++ b/src/mindlessgen/generator/main.py @@ -5,7 +5,7 @@ from __future__ import annotations from collections.abc import Callable -from concurrent.futures import ProcessPoolExecutor, as_completed, wait +from concurrent.futures import Future, ProcessPoolExecutor, as_completed, wait from multiprocessing.managers import ValueProxy from pathlib import Path import multiprocessing as mp @@ -16,7 +16,8 @@ from ..molecules import generate_random_molecule, Molecule from ..qm import XTB, get_xtb_path, QMMethod, ORCA, get_orca_path, GXTB, get_gxtb_path from ..molecules import iterative_optimization, postprocess_mol -from ..prog import ConfigManager +from ..prog import ConfigManager, ParallelManager +from ..prog.config import MINCORES_PLACEHOLDER from ..__version__ import __version__ @@ -96,39 +97,29 @@ def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]: blocks.sort(key=lambda x: x.ncores) # Set up parallel blocks environment - with mp.Manager() as manager: - stop_event = manager.Event() - free_cores = manager.Value(int, num_cores) - enough_cores = manager.Condition() - - with ProcessPoolExecutor(max_workers=num_cores // 4) as executor: - # NOTE: The following creates a queue of futures which acquire the enough_cores lock successively. - # By using notify instead of notify_all in increase_cores we make sure that only the next future in - # order of submission is notified and the rest remain waiting. - tasks = [] - for block in blocks: - for _ in range(block.num_molecules): - tasks.append( - executor.submit( - single_molecule_generator, - len(tasks) + 1, - config, - refine_engine, - postprocess_engine, - block.ncores, - stop_event, - ) - ) - tasks[-1].add_done_callback( - lambda _, ncores=block.ncores: increase_cores( - ncores, free_cores, enough_cores - ) + with ParallelManager(num_cores // MINCORES_PLACEHOLDER, num_cores) as parallel: + # The following creates a queue of futures which occupy a certain number of cores each + # as defined by each block + # Each future represents the generation of one molecule + # NOTE: proceeding this way assures that each molecule gets a static number of cores + # a dynamic setting would also be thinkable and straightforward to implement + tasks: list[Future[Molecule | None]] = [] + for block in blocks: + for _ in range(block.num_molecules): + tasks.append( + parallel.executor.submit( + single_molecule_generator, + len(tasks) + 1, + config, + parallel, + refine_engine, + postprocess_engine, + block.ncores, ) + ) - # Collect results of all tries to create a molecule - results: list[Molecule | None] = [ - task.result() for task in as_completed(tasks) - ] + # Collect results of all tries to create a molecule + results: list[Molecule | None] = [task.result() for task in as_completed(tasks)] # Restore verbosity level if it was changed if backup_verbosity is not None: @@ -165,18 +156,17 @@ def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]: def single_molecule_generator( molcount: int, config: ConfigManager, + parallel: ParallelManager, refine_engine: QMMethod, postprocess_engine: QMMethod | None, ncores: int, - free_cores: ValueProxy[int], - enough_cores: Condition, ) -> Molecule | None: """ Generate a single molecule (from start to finish). """ # Wait for enough cores - decrease_cores(ncores, free_cores, enough_cores) + parallel.occupy_cores(ncores) # print a decent header for each molecule iteration # if config.general.verbosity > 0: @@ -187,66 +177,67 @@ def single_molecule_generator( # ) # print(f"{'='*80}") - with mp.Manager() as manager: - stop_event = manager.Event() - free_cores = manager.Value(int, ncores) - enough_cores = manager.Condition() + with ParallelManager(ncores, ncores) as parallel_local: + stop_event = parallel_local.manager.Event() # Launch worker processes to find molecule # if config.general.verbosity == 0: # print("Cycle... ", end="", flush=True) - with ProcessPoolExecutor(ncores // 4) as executor: - cycles = range(config.general.max_cycles) - tasks = [ - executor.submit( - single_molecule_step, - config, - refine_engine, - postprocess_engine, - cycle, - stop_event, - free_cores, - enough_cores, - ) - for cycle in cycles - ] + cycles = range(config.general.max_cycles) + tasks = [ + parallel_local.executor.submit( + single_molecule_step, + config, + parallel_local, + refine_engine, + postprocess_engine, + cycle, + stop_event, + ) + for cycle in cycles + ] - # Finally, add a future to set the stop_event if all jobs are completed - executor.submit(lambda: stop_event.set() if wait(tasks) else None) + # Finally, add a future to set the stop_event if all jobs are completed + parallel_local.executor.submit( + lambda: stop_event.set() if wait(tasks) else None + ) - stop_event.wait() - # TODO: kill all workers on receiving stop signal instead of waiting + stop_event.wait() + # TODO: kill all workers and cancel futures on receiving stop signal instead of waiting - results = [task.result() for task in as_completed(tasks)] + results = [task.result() for task in as_completed(tasks)] - # if config.general.verbosity == 0: - # print("") + # if config.general.verbosity == 0: + # print("") - optimized_molecule: Molecule | None = None - for i, result in enumerate(results): - if result is not None: - cycles_needed = i + 1 - optimized_molecule = result - break + optimized_molecule: Molecule | None = None + for i, result in enumerate(results): + if result is not None: + cycles_needed = i + 1 + optimized_molecule = result + break - # if config.general.verbosity > 0: - # print(f"Optimized mindless molecule found in {cycles_needed} cycles.") - # print(optimized_molecule) + # if config.general.verbosity > 0: + # print(f"Optimized mindless molecule found in {cycles_needed} cycles.") + # print(optimized_molecule) - return optimized_molecule + # Free up the cores + parallel.free_cores(ncores) + + return optimized_molecule def single_molecule_step( config: ConfigManager, + parallel: ParallelManager, refine_engine: QMMethod, postprocess_engine: QMMethod | None, cycle: int, stop_event: Event, - free_cores: ValueProxy[int], - enough_cores: Condition, ) -> Molecule | None: """Execute one step in a single molecule generation""" + # TODO: this might not be necessary anymore but could still be included as fallback if stop_event.is_set(): return None # Exit early if a molecule has already been found @@ -297,8 +288,7 @@ def single_molecule_step( refine_engine, config.generate, config.refine, - free_cores, - enough_cores, + parallel, verbosity=config.general.verbosity, ) except RuntimeError as e: @@ -317,8 +307,7 @@ def single_molecule_step( optimized_molecule, postprocess_engine, # type: ignore config.postprocess, - free_cores, - enough_cores, + parallel, verbosity=config.general.verbosity, ) except RuntimeError as e: @@ -410,7 +399,7 @@ def setup_blocks(ncores: int, num_molecules: int) -> list[Block]: # Maximum and minimum number of parallel processes possible maxcores = ncores - mincores = 4 # TODO: 4 as a placeholder for an actual setting + mincores = MINCORES_PLACEHOLDER maxprocs = ncores // mincores minprocs = ncores // maxcores diff --git a/src/mindlessgen/molecules/postprocess.py b/src/mindlessgen/molecules/postprocess.py index 2a2c1b5..6459857 100644 --- a/src/mindlessgen/molecules/postprocess.py +++ b/src/mindlessgen/molecules/postprocess.py @@ -4,15 +4,15 @@ from .molecule import Molecule from ..qm import QMMethod -from ..prog import PostProcessConfig +from ..prog import PostProcessConfig, ParallelManager +from ..prog.config import MINCORES_PLACEHOLDER def postprocess_mol( mol: Molecule, engine: QMMethod, config: PostProcessConfig, - free_cores, - enough_cores, + parallel: ParallelManager, verbosity: int = 1, ) -> Molecule: """ @@ -31,17 +31,23 @@ def postprocess_mol( print("Postprocessing molecule...") if config.optimize: try: + parallel.occupy_cores(MINCORES_PLACEHOLDER) postprocmol = engine.optimize( mol, max_cycles=config.opt_cycles, verbosity=verbosity ) except RuntimeError as e: raise RuntimeError("Optimization in postprocessing failed.") from e + finally: + parallel.free_cores(MINCORES_PLACEHOLDER) else: try: + parallel.occupy_cores(MINCORES_PLACEHOLDER) engine.singlepoint(mol, verbosity=verbosity) postprocmol = mol except RuntimeError as e: raise RuntimeError( "Single point calculation in postprocessing failed." ) from e + finally: + parallel.free_cores(MINCORES_PLACEHOLDER) return postprocmol diff --git a/src/mindlessgen/molecules/refinement.py b/src/mindlessgen/molecules/refinement.py index aabf940..cae894b 100644 --- a/src/mindlessgen/molecules/refinement.py +++ b/src/mindlessgen/molecules/refinement.py @@ -7,7 +7,7 @@ import networkx as nx # type: ignore import numpy as np from ..qm.base import QMMethod -from ..prog import GenerateConfig, RefineConfig +from ..prog import GenerateConfig, RefineConfig, ParallelManager from .molecule import Molecule from .miscellaneous import ( set_random_charge, @@ -17,6 +17,7 @@ get_lanthanides, get_actinides, ) +from ..prog.config import MINCORES_PLACEHOLDER COV_RADII = "pyykko" BOHR2AA = ( @@ -30,8 +31,7 @@ def iterative_optimization( engine: QMMethod, config_generate: GenerateConfig, config_refine: RefineConfig, - free_cores, - enough_cores, + parallel: ParallelManager, verbosity: int = 1, ) -> Molecule: """ @@ -44,14 +44,27 @@ def iterative_optimization( verbosity = 3 for cycle in range(config_refine.max_frag_cycles): + # Run single points first, start optimization if scf converges + try: + parallel.occupy_cores(1) + _ = engine.singlepoint(rev_mol, verbosity) + except RuntimeError as e: + raise RuntimeError( + f"Single-point calculation failed at fragmentation cycle {cycle}: {e}" + ) from e + finally: + parallel.free_cores(1) + # Optimize the current molecule try: - # TODO: run single points first, start optimization if scf converges + parallel.occupy_cores(MINCORES_PLACEHOLDER) rev_mol = engine.optimize(rev_mol, None, verbosity) except RuntimeError as e: raise RuntimeError( f"Optimization failed at fragmentation cycle {cycle}: {e}" ) from e + finally: + parallel.free_cores(MINCORES_PLACEHOLDER) if verbosity > 2: # Print coordinates of optimized molecule diff --git a/src/mindlessgen/prog/__init__.py b/src/mindlessgen/prog/__init__.py index 8dee820..b977cb3 100644 --- a/src/mindlessgen/prog/__init__.py +++ b/src/mindlessgen/prog/__init__.py @@ -1,5 +1,6 @@ """ -This module contains the classes and functions for all configuration-related tasks. +This module contains the classes and functions for all configuration-related tasks, +as well as utilities concerned with parallelization. """ from .config import ( @@ -12,6 +13,8 @@ PostProcessConfig, ) +from .parallel import ParallelManager + __all__ = [ "ConfigManager", "GeneralConfig", @@ -20,4 +23,5 @@ "GenerateConfig", "RefineConfig", "PostProcessConfig", + "ParallelManager", ] diff --git a/src/mindlessgen/prog/config.py b/src/mindlessgen/prog/config.py index 22713f6..f972faa 100644 --- a/src/mindlessgen/prog/config.py +++ b/src/mindlessgen/prog/config.py @@ -16,6 +16,8 @@ from ..molecules import PSE_NUMBERS +MINCORES_PLACEHOLDER = 4 + # abstract base class for configuration class BaseConfig(ABC): diff --git a/src/mindlessgen/prog/parallel.py b/src/mindlessgen/prog/parallel.py new file mode 100644 index 0000000..d8c231f --- /dev/null +++ b/src/mindlessgen/prog/parallel.py @@ -0,0 +1,38 @@ +from concurrent.futures import ProcessPoolExecutor +from multiprocessing.managers import SyncManager +from multiprocessing import Manager +from typing import Any + + +class ParallelManager: + def __init__(self, max_workers: int, ncores: int): + self.executor: ProcessPoolExecutor = ProcessPoolExecutor( + max_workers=max_workers + ) + self.manager: SyncManager = Manager() + + self.__free_cores = self.manager.Value(int, ncores) + self.__enough_cores = self.manager.Condition() + + def __enter__(self): + return self + + def __exit__(self, exc_type: Exception, exc_val: Any, exc_tb: Any): + self.executor.shutdown(False, cancel_futures=True) + self.manager.shutdown() + + return False # Don't supress any exceptions + + def shutdown(self): + self.executor.shutdown(False, cancel_futures=True) + self.manager.shutdown() + + def occupy_cores(self, ncores: int): + with self.__enough_cores: + self.__enough_cores.wait_for(lambda: self.__free_cores.value >= ncores) + self.__free_cores.value -= ncores + + def free_cores(self, ncores: int): + with self.__enough_cores: + self.__free_cores.value += ncores + self.__enough_cores.notify() # TODO: try this with notify_all instead From 0534777a7402df4940b2e9ebb7b48db18dc2853f Mon Sep 17 00:00:00 2001 From: lmseidler Date: Wed, 15 Jan 2025 13:53:16 +0100 Subject: [PATCH 3/7] small fix in tests --- src/mindlessgen/generator/main.py | 39 +++++++++++++------------- test/test_molecules/test_refinement.py | 18 +++++++----- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/mindlessgen/generator/main.py b/src/mindlessgen/generator/main.py index b0ca7e1..7c91d4e 100644 --- a/src/mindlessgen/generator/main.py +++ b/src/mindlessgen/generator/main.py @@ -5,11 +5,10 @@ from __future__ import annotations from collections.abc import Callable -from concurrent.futures import Future, ProcessPoolExecutor, as_completed, wait -from multiprocessing.managers import ValueProxy +from concurrent.futures import Future, as_completed, wait from pathlib import Path import multiprocessing as mp -from threading import Condition, Event +from threading import Event import warnings from dataclasses import dataclass @@ -30,7 +29,7 @@ class Block: ncores: int -def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]: +def generator(config: ConfigManager) -> tuple[list[Molecule], int]: """ Generate a molecule. """ @@ -49,7 +48,7 @@ def generator(config: ConfigManager) -> tuple[list[Molecule] | None, int]: if config.general.print_config: print(config) - return None, 0 + return [], 0 # Import and set up required engines refine_engine: QMMethod = setup_engines( @@ -184,18 +183,19 @@ def single_molecule_generator( # if config.general.verbosity == 0: # print("Cycle... ", end="", flush=True) cycles = range(config.general.max_cycles) - tasks = [ - parallel_local.executor.submit( - single_molecule_step, - config, - parallel_local, - refine_engine, - postprocess_engine, - cycle, - stop_event, + tasks: list[Future[Molecule | None]] = [] + for cycle in cycles: + tasks.append( + parallel_local.executor.submit( + single_molecule_step, + config, + parallel_local, + refine_engine, + postprocess_engine, + cycle, + stop_event, + ) ) - for cycle in cycles - ] # Finally, add a future to set the stop_event if all jobs are completed parallel_local.executor.submit( @@ -203,7 +203,6 @@ def single_molecule_generator( ) stop_event.wait() - # TODO: kill all workers and cancel futures on receiving stop signal instead of waiting results = [task.result() for task in as_completed(tasks)] @@ -237,7 +236,7 @@ def single_molecule_step( ) -> Molecule | None: """Execute one step in a single molecule generation""" - # TODO: this might not be necessary anymore but could still be included as fallback + # NOTE: this might not be necessary anymore but could still be included as fallback if stop_event.is_set(): return None # Exit early if a molecule has already been found @@ -400,8 +399,8 @@ def setup_blocks(ncores: int, num_molecules: int) -> list[Block]: # Maximum and minimum number of parallel processes possible maxcores = ncores mincores = MINCORES_PLACEHOLDER - maxprocs = ncores // mincores - minprocs = ncores // maxcores + maxprocs = max(1, ncores // mincores) + minprocs = max(1, ncores // maxcores) # Distribute number of molecules among blocks # First (if possible) create the maximum number of parallel blocks (maxprocs) and distribute as many molecules as possible diff --git a/test/test_molecules/test_refinement.py b/test/test_molecules/test_refinement.py index 4011e3c..dfbe319 100644 --- a/test/test_molecules/test_refinement.py +++ b/test/test_molecules/test_refinement.py @@ -11,6 +11,8 @@ from mindlessgen.molecules import detect_fragments # type: ignore from mindlessgen.molecules import Molecule # type: ignore from mindlessgen.molecules import iterative_optimization # type: ignore +from mindlessgen.prog.config import MINCORES_PLACEHOLDER +from mindlessgen.prog.parallel import ParallelManager from mindlessgen.qm import XTB, get_xtb_path # type: ignore TESTSDIR = Path(__file__).resolve().parents[1] @@ -142,13 +144,15 @@ def test_iterative_optimization(mol_C13H14: Molecule, mol_C7H8: Molecule) -> Non else: raise NotImplementedError("Engine not implemented.") mol = mol_C13H14 - mol_opt = iterative_optimization( - mol, - engine=engine, - config_generate=config.generate, - config_refine=config.refine, - verbosity=2, - ) + with ParallelManager(1, MINCORES_PLACEHOLDER) as parallel: + mol_opt = iterative_optimization( + mol, + engine, + config.generate, + config.refine, + parallel, + verbosity=2, + ) mol_ref = mol_C7H8 # assert number of atoms in mol_opt is equal to number of atoms in mol_ref From ef08fe5a934c538c994e82986057a75719c5386b Mon Sep 17 00:00:00 2001 From: lmseidler Date: Wed, 15 Jan 2025 14:44:17 +0100 Subject: [PATCH 4/7] commented unnecessary code --- src/mindlessgen/generator/main.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/mindlessgen/generator/main.py b/src/mindlessgen/generator/main.py index 7c91d4e..286010e 100644 --- a/src/mindlessgen/generator/main.py +++ b/src/mindlessgen/generator/main.py @@ -198,11 +198,11 @@ def single_molecule_generator( ) # Finally, add a future to set the stop_event if all jobs are completed - parallel_local.executor.submit( - lambda: stop_event.set() if wait(tasks) else None - ) - - stop_event.wait() + # parallel_local.executor.submit( + # lambda: stop_event.set() if wait(tasks) else None + # ) + # + # stop_event.wait() results = [task.result() for task in as_completed(tasks)] From 872fd2ec9cf254806194b82559affc8b99f9eb37 Mon Sep 17 00:00:00 2001 From: lmseidler Date: Wed, 15 Jan 2025 15:38:20 +0100 Subject: [PATCH 5/7] updated construction sites --- src/mindlessgen/generator/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mindlessgen/generator/main.py b/src/mindlessgen/generator/main.py index 286010e..184b49f 100644 --- a/src/mindlessgen/generator/main.py +++ b/src/mindlessgen/generator/main.py @@ -105,6 +105,7 @@ def generator(config: ConfigManager) -> tuple[list[Molecule], int]: tasks: list[Future[Molecule | None]] = [] for block in blocks: for _ in range(block.num_molecules): + # TODO: remove objects from future call that cannot be pickled (basically everything that doesn't consist only of primitives) tasks.append( parallel.executor.submit( single_molecule_generator, @@ -185,6 +186,7 @@ def single_molecule_generator( cycles = range(config.general.max_cycles) tasks: list[Future[Molecule | None]] = [] for cycle in cycles: + # TODO: remove objects from future call that cannot be pickled (basically everything that doesn't consist only of primitives) tasks.append( parallel_local.executor.submit( single_molecule_step, @@ -236,7 +238,6 @@ def single_molecule_step( ) -> Molecule | None: """Execute one step in a single molecule generation""" - # NOTE: this might not be necessary anymore but could still be included as fallback if stop_event.is_set(): return None # Exit early if a molecule has already been found From e2b0a51f2c9878731d3b37d52ba588ef800fc543 Mon Sep 17 00:00:00 2001 From: lmseidler Date: Thu, 16 Jan 2025 11:31:23 +0100 Subject: [PATCH 6/7] hopefully fixed parallelization --- src/mindlessgen/generator/main.py | 99 ++++++++++++------------ src/mindlessgen/molecules/postprocess.py | 20 ++--- src/mindlessgen/molecules/refinement.py | 17 ++-- src/mindlessgen/prog/__init__.py | 5 +- src/mindlessgen/prog/parallel.py | 54 ++++++------- 5 files changed, 92 insertions(+), 103 deletions(-) diff --git a/src/mindlessgen/generator/main.py b/src/mindlessgen/generator/main.py index 184b49f..d724e09 100644 --- a/src/mindlessgen/generator/main.py +++ b/src/mindlessgen/generator/main.py @@ -12,10 +12,11 @@ import warnings from dataclasses import dataclass + from ..molecules import generate_random_molecule, Molecule from ..qm import XTB, get_xtb_path, QMMethod, ORCA, get_orca_path, GXTB, get_gxtb_path from ..molecules import iterative_optimization, postprocess_mol -from ..prog import ConfigManager, ParallelManager +from ..prog import ConfigManager, setup_managers, ResourceMonitor from ..prog.config import MINCORES_PLACEHOLDER from ..__version__ import __version__ @@ -96,7 +97,11 @@ def generator(config: ConfigManager) -> tuple[list[Molecule], int]: blocks.sort(key=lambda x: x.ncores) # Set up parallel blocks environment - with ParallelManager(num_cores // MINCORES_PLACEHOLDER, num_cores) as parallel: + with setup_managers(num_cores // MINCORES_PLACEHOLDER, num_cores) as ( + executor, + _, + resources, + ): # The following creates a queue of futures which occupy a certain number of cores each # as defined by each block # Each future represents the generation of one molecule @@ -107,11 +112,11 @@ def generator(config: ConfigManager) -> tuple[list[Molecule], int]: for _ in range(block.num_molecules): # TODO: remove objects from future call that cannot be pickled (basically everything that doesn't consist only of primitives) tasks.append( - parallel.executor.submit( + executor.submit( single_molecule_generator, len(tasks) + 1, config, - parallel, + resources, refine_engine, postprocess_engine, block.ncores, @@ -156,7 +161,7 @@ def generator(config: ConfigManager) -> tuple[list[Molecule], int]: def single_molecule_generator( molcount: int, config: ConfigManager, - parallel: ParallelManager, + resources: ResourceMonitor, refine_engine: QMMethod, postprocess_engine: QMMethod | None, ncores: int, @@ -165,48 +170,45 @@ def single_molecule_generator( Generate a single molecule (from start to finish). """ - # Wait for enough cores - parallel.occupy_cores(ncores) - - # print a decent header for each molecule iteration - # if config.general.verbosity > 0: - # print(f"\n{'='*80}") - # print( - # f"{'='*22} Generating molecule {molcount + 1:<4} of " - # + f"{config.general.num_molecules:<4} {'='*24}" - # ) - # print(f"{'='*80}") - - with ParallelManager(ncores, ncores) as parallel_local: - stop_event = parallel_local.manager.Event() - - # Launch worker processes to find molecule - # if config.general.verbosity == 0: - # print("Cycle... ", end="", flush=True) - cycles = range(config.general.max_cycles) - tasks: list[Future[Molecule | None]] = [] - for cycle in cycles: - # TODO: remove objects from future call that cannot be pickled (basically everything that doesn't consist only of primitives) - tasks.append( - parallel_local.executor.submit( - single_molecule_step, - config, - parallel_local, - refine_engine, - postprocess_engine, - cycle, - stop_event, + # Wait for enough cores (cores freed automatically upon leaving managed context) + with resources.occupy_cores(ncores): + # print a decent header for each molecule iteration + # if config.general.verbosity > 0: + # print(f"\n{'='*80}") + # print( + # f"{'='*22} Generating molecule {molcount + 1:<4} of " + # + f"{config.general.num_molecules:<4} {'='*24}" + # ) + # print(f"{'='*80}") + + with setup_managers(ncores, ncores) as (executor, manager, resources_local): + stop_event = manager.Event() + # Launch worker processes to find molecule + # if config.general.verbosity == 0: + # print("Cycle... ", end="", flush=True) + cycles = range(config.general.max_cycles) + tasks: list[Future[Molecule | None]] = [] + for cycle in cycles: + tasks.append( + executor.submit( + single_molecule_step, + config, + resources_local, + refine_engine, + postprocess_engine, + cycle, + stop_event, + ) ) - ) - # Finally, add a future to set the stop_event if all jobs are completed - # parallel_local.executor.submit( - # lambda: stop_event.set() if wait(tasks) else None - # ) - # - # stop_event.wait() + # Finally, add a future to set the stop_event if all jobs are completed + # parallel_local.executor.submit( + # lambda: stop_event.set() if wait(tasks) else None + # ) + # + # stop_event.wait() - results = [task.result() for task in as_completed(tasks)] + results = [task.result() for task in as_completed(tasks)] # if config.general.verbosity == 0: # print("") @@ -222,15 +224,12 @@ def single_molecule_generator( # print(f"Optimized mindless molecule found in {cycles_needed} cycles.") # print(optimized_molecule) - # Free up the cores - parallel.free_cores(ncores) - return optimized_molecule def single_molecule_step( config: ConfigManager, - parallel: ParallelManager, + resources_local: ResourceMonitor, refine_engine: QMMethod, postprocess_engine: QMMethod | None, cycle: int, @@ -288,7 +287,7 @@ def single_molecule_step( refine_engine, config.generate, config.refine, - parallel, + resources_local, verbosity=config.general.verbosity, ) except RuntimeError as e: @@ -307,7 +306,7 @@ def single_molecule_step( optimized_molecule, postprocess_engine, # type: ignore config.postprocess, - parallel, + resources_local, verbosity=config.general.verbosity, ) except RuntimeError as e: diff --git a/src/mindlessgen/molecules/postprocess.py b/src/mindlessgen/molecules/postprocess.py index 6459857..6984c34 100644 --- a/src/mindlessgen/molecules/postprocess.py +++ b/src/mindlessgen/molecules/postprocess.py @@ -4,7 +4,7 @@ from .molecule import Molecule from ..qm import QMMethod -from ..prog import PostProcessConfig, ParallelManager +from ..prog import PostProcessConfig, ResourceMonitor from ..prog.config import MINCORES_PLACEHOLDER @@ -12,7 +12,7 @@ def postprocess_mol( mol: Molecule, engine: QMMethod, config: PostProcessConfig, - parallel: ParallelManager, + resources_local: ResourceMonitor, verbosity: int = 1, ) -> Molecule: """ @@ -31,23 +31,19 @@ def postprocess_mol( print("Postprocessing molecule...") if config.optimize: try: - parallel.occupy_cores(MINCORES_PLACEHOLDER) - postprocmol = engine.optimize( - mol, max_cycles=config.opt_cycles, verbosity=verbosity - ) + with resources_local.occupy_cores(MINCORES_PLACEHOLDER): + postprocmol = engine.optimize( + mol, max_cycles=config.opt_cycles, verbosity=verbosity + ) except RuntimeError as e: raise RuntimeError("Optimization in postprocessing failed.") from e - finally: - parallel.free_cores(MINCORES_PLACEHOLDER) else: try: - parallel.occupy_cores(MINCORES_PLACEHOLDER) - engine.singlepoint(mol, verbosity=verbosity) + with resources_local.occupy_cores(MINCORES_PLACEHOLDER): + engine.singlepoint(mol, verbosity=verbosity) postprocmol = mol except RuntimeError as e: raise RuntimeError( "Single point calculation in postprocessing failed." ) from e - finally: - parallel.free_cores(MINCORES_PLACEHOLDER) return postprocmol diff --git a/src/mindlessgen/molecules/refinement.py b/src/mindlessgen/molecules/refinement.py index cae894b..386bfba 100644 --- a/src/mindlessgen/molecules/refinement.py +++ b/src/mindlessgen/molecules/refinement.py @@ -6,8 +6,9 @@ from pathlib import Path import networkx as nx # type: ignore import numpy as np + from ..qm.base import QMMethod -from ..prog import GenerateConfig, RefineConfig, ParallelManager +from ..prog import GenerateConfig, RefineConfig, ResourceMonitor from .molecule import Molecule from .miscellaneous import ( set_random_charge, @@ -31,7 +32,7 @@ def iterative_optimization( engine: QMMethod, config_generate: GenerateConfig, config_refine: RefineConfig, - parallel: ParallelManager, + resources_local: ResourceMonitor, verbosity: int = 1, ) -> Molecule: """ @@ -46,25 +47,21 @@ def iterative_optimization( for cycle in range(config_refine.max_frag_cycles): # Run single points first, start optimization if scf converges try: - parallel.occupy_cores(1) - _ = engine.singlepoint(rev_mol, verbosity) + with resources_local.occupy_cores(1): + _ = engine.singlepoint(rev_mol, verbosity) except RuntimeError as e: raise RuntimeError( f"Single-point calculation failed at fragmentation cycle {cycle}: {e}" ) from e - finally: - parallel.free_cores(1) # Optimize the current molecule try: - parallel.occupy_cores(MINCORES_PLACEHOLDER) - rev_mol = engine.optimize(rev_mol, None, verbosity) + with resources_local.occupy_cores(MINCORES_PLACEHOLDER): + rev_mol = engine.optimize(rev_mol, None, verbosity) except RuntimeError as e: raise RuntimeError( f"Optimization failed at fragmentation cycle {cycle}: {e}" ) from e - finally: - parallel.free_cores(MINCORES_PLACEHOLDER) if verbosity > 2: # Print coordinates of optimized molecule diff --git a/src/mindlessgen/prog/__init__.py b/src/mindlessgen/prog/__init__.py index b977cb3..41d8d45 100644 --- a/src/mindlessgen/prog/__init__.py +++ b/src/mindlessgen/prog/__init__.py @@ -13,7 +13,7 @@ PostProcessConfig, ) -from .parallel import ParallelManager +from .parallel import setup_managers, ResourceMonitor __all__ = [ "ConfigManager", @@ -23,5 +23,6 @@ "GenerateConfig", "RefineConfig", "PostProcessConfig", - "ParallelManager", + "setup_managers", + "ResourceMonitor", ] diff --git a/src/mindlessgen/prog/parallel.py b/src/mindlessgen/prog/parallel.py index d8c231f..a0b8291 100644 --- a/src/mindlessgen/prog/parallel.py +++ b/src/mindlessgen/prog/parallel.py @@ -1,38 +1,34 @@ from concurrent.futures import ProcessPoolExecutor from multiprocessing.managers import SyncManager from multiprocessing import Manager -from typing import Any +from contextlib import contextmanager -class ParallelManager: - def __init__(self, max_workers: int, ncores: int): - self.executor: ProcessPoolExecutor = ProcessPoolExecutor( - max_workers=max_workers - ) - self.manager: SyncManager = Manager() +@contextmanager +def setup_managers(max_workers: int, ncores: int): + executor: ProcessPoolExecutor = ProcessPoolExecutor(max_workers=max_workers) + manager: SyncManager = Manager() + resource_manager: ResourceMonitor = ResourceMonitor(manager, ncores) + try: + yield executor, manager, resource_manager + finally: + executor.shutdown(False, cancel_futures=True) + manager.shutdown() - self.__free_cores = self.manager.Value(int, ncores) - self.__enough_cores = self.manager.Condition() - def __enter__(self): - return self - - def __exit__(self, exc_type: Exception, exc_val: Any, exc_tb: Any): - self.executor.shutdown(False, cancel_futures=True) - self.manager.shutdown() - - return False # Don't supress any exceptions - - def shutdown(self): - self.executor.shutdown(False, cancel_futures=True) - self.manager.shutdown() +class ResourceMonitor: + def __init__(self, manager: SyncManager, ncores: int): + self.__free_cores = manager.Value(int, ncores) + self.__enough_cores = manager.Condition() + @contextmanager def occupy_cores(self, ncores: int): - with self.__enough_cores: - self.__enough_cores.wait_for(lambda: self.__free_cores.value >= ncores) - self.__free_cores.value -= ncores - - def free_cores(self, ncores: int): - with self.__enough_cores: - self.__free_cores.value += ncores - self.__enough_cores.notify() # TODO: try this with notify_all instead + try: + with self.__enough_cores: + self.__enough_cores.wait_for(lambda: self.__free_cores.value >= ncores) + self.__free_cores.value -= ncores + yield + finally: + with self.__enough_cores: + self.__free_cores.value += ncores + self.__enough_cores.notify() # TODO: try this with notify_all instead From 3383173ed92dcc6a627bee9986619ddd97f4bda1 Mon Sep 17 00:00:00 2001 From: lmseidler Date: Thu, 16 Jan 2025 11:45:04 +0100 Subject: [PATCH 7/7] small test fix --- test/test_molecules/test_refinement.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_molecules/test_refinement.py b/test/test_molecules/test_refinement.py index dfbe319..70e0174 100644 --- a/test/test_molecules/test_refinement.py +++ b/test/test_molecules/test_refinement.py @@ -12,7 +12,7 @@ from mindlessgen.molecules import Molecule # type: ignore from mindlessgen.molecules import iterative_optimization # type: ignore from mindlessgen.prog.config import MINCORES_PLACEHOLDER -from mindlessgen.prog.parallel import ParallelManager +from mindlessgen.prog.parallel import setup_managers from mindlessgen.qm import XTB, get_xtb_path # type: ignore TESTSDIR = Path(__file__).resolve().parents[1] @@ -144,13 +144,13 @@ def test_iterative_optimization(mol_C13H14: Molecule, mol_C7H8: Molecule) -> Non else: raise NotImplementedError("Engine not implemented.") mol = mol_C13H14 - with ParallelManager(1, MINCORES_PLACEHOLDER) as parallel: + with setup_managers(1, MINCORES_PLACEHOLDER) as (_, _, resources): mol_opt = iterative_optimization( mol, engine, config.generate, config.refine, - parallel, + resources, verbosity=2, ) mol_ref = mol_C7H8