From 1a5545c4a595d4aed229d841655b8018c88e1fee Mon Sep 17 00:00:00 2001 From: Philipp Ross Date: Wed, 21 Feb 2024 09:02:26 +0100 Subject: [PATCH 1/6] Update setup-python and checkout --- .github/workflows/lint.yml | 4 ++-- .github/workflows/test.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 522abc57..69090a1a 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -21,10 +21,10 @@ jobs: shell: bash -el {0} steps: - name: Check out Git repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: '3.9.16' token: ${{ secrets.QUARK_GH_GITHUB_COM_TOKEN }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 124936e1..c3eb7e54 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,10 +17,10 @@ jobs: shell: bash -el {0} steps: - name: Check out Git repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: '3.9.16' cache: 'pip' # caching pip dependencies From 0d702ecc0aa651f8ac8a0f9552cfb16006e93489 Mon Sep 17 00:00:00 2001 From: Philipp Ross Date: Thu, 22 Feb 2024 10:41:43 +0100 Subject: [PATCH 2/6] Save results of current benchmark run when CTRL-C is detected --- src/BenchmarkManager.py | 111 +++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 53 deletions(-) diff --git a/src/BenchmarkManager.py b/src/BenchmarkManager.py index a6010cd1..fc4d5377 100644 --- a/src/BenchmarkManager.py +++ b/src/BenchmarkManager.py @@ -125,59 +125,64 @@ def run_benchmark(self, benchmark_backlog: list, repetitions: int): """ git_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ) git_revision_number, git_uncommitted_changes = get_git_revision(git_dir) - - try: - for idx_backlog, backlog_item in enumerate(benchmark_backlog): - benchmark_records: [BenchmarkRecord] = [] - path = f"{self.store_dir}/benchmark_{idx_backlog}" - Path(path).mkdir(parents=True, exist_ok=True) - with open(f"{path}/application_config.json", 'w') as filehandler: - json.dump(backlog_item["config"], filehandler, indent=2) - for i in range(1, repetitions + 1): - logging.info(f"Running backlog item {idx_backlog + 1}/{len(benchmark_backlog)}," - f" Iteration {i}/{repetitions}:") - try: - - self.benchmark_record_template = BenchmarkRecord(idx_backlog, - datetime.today().strftime('%Y-%m-%d-%H-%M-%S'), - git_revision_number, git_uncommitted_changes, - i, repetitions) - self.application.metrics.set_module_config(backlog_item["config"]) - problem, preprocessing_time = self.application.preprocess(None, backlog_item["config"], - store_dir=path, rep_count=i) - self.application.metrics.set_preprocessing_time(preprocessing_time) - self.application.save(path, i) - - processed_input, benchmark_record = self.traverse_config(backlog_item["submodule"], problem, - path, rep_count=i) - - _, postprocessing_time = self.application.postprocess(processed_input, None, store_dir=path, - rep_count=i) - self.application.metrics.set_postprocessing_time(postprocessing_time) - self.application.metrics.validate() - benchmark_record.append_module_record_left(deepcopy(self.application.metrics)) - benchmark_records.append(benchmark_record) - - except Exception as error: - logging.exception(f"Error during benchmark run: {error}", exc_info=True) - if 
self.fail_fast: - raise - - for record in benchmark_records: - record.sum_up_times() - - # Wait until all MPI processes have finished and save results on rank 0 - comm.Barrier() - if comm.Get_rank() == 0: - with open(f"{path}/results.json", 'w') as filehandler: - json.dump([x.get() for x in benchmark_records], filehandler, indent=2, cls=NumpyEncoder) - - logging.info("") - logging.info(" =============== Run finished =============== ") - logging.info("") - - except KeyboardInterrupt: - logging.warning("CTRL-C detected. Still trying to create results.json.") + break_flag = False + + for idx_backlog, backlog_item in enumerate(benchmark_backlog): + benchmark_records: [BenchmarkRecord] = [] + path = f"{self.store_dir}/benchmark_{idx_backlog}" + Path(path).mkdir(parents=True, exist_ok=True) + with open(f"{path}/application_config.json", 'w') as filehandler: + json.dump(backlog_item["config"], filehandler, indent=2) + for i in range(1, repetitions + 1): + logging.info(f"Running backlog item {idx_backlog + 1}/{len(benchmark_backlog)}," + f" Iteration {i}/{repetitions}:") + try: + + self.benchmark_record_template = BenchmarkRecord(idx_backlog, + datetime.today().strftime('%Y-%m-%d-%H-%M-%S'), + git_revision_number, git_uncommitted_changes, + i, repetitions) + self.application.metrics.set_module_config(backlog_item["config"]) + problem, preprocessing_time = self.application.preprocess(None, backlog_item["config"], + store_dir=path, rep_count=i) + self.application.metrics.set_preprocessing_time(preprocessing_time) + self.application.save(path, i) + + processed_input, benchmark_record = self.traverse_config(backlog_item["submodule"], problem, + path, rep_count=i) + + _, postprocessing_time = self.application.postprocess(processed_input, None, store_dir=path, + rep_count=i) + self.application.metrics.set_postprocessing_time(postprocessing_time) + self.application.metrics.validate() + benchmark_record.append_module_record_left(deepcopy(self.application.metrics)) + benchmark_records.append(benchmark_record) + + except KeyboardInterrupt: + logging.warning("CTRL-C detected during run_benchmark. Still trying to create results.json.") + break_flag = True + break + + except Exception as error: + logging.exception(f"Error during benchmark run: {error}", exc_info=True) + if self.fail_fast: + raise + + for record in benchmark_records: + record.sum_up_times() + + # Wait until all MPI processes have finished and save results on rank 0 + comm.Barrier() + if comm.Get_rank() == 0: + with open(f"{path}/results.json", 'w') as filehandler: + json.dump([x.get() for x in benchmark_records], filehandler, indent=2, cls=NumpyEncoder) + + logging.info("") + logging.info(" =============== Run finished =============== ") + logging.info("") + + if break_flag: + break def traverse_config(self, module: dict, input_data: any, path: str, rep_count: int) -> (any, BenchmarkRecord): """ From 6d71f427719b8f46e60cd9b7053a7fe535ed8140 Mon Sep 17 00:00:00 2001 From: "Marvin Erdmann (FG-231)" Date: Wed, 20 Mar 2024 14:45:13 +0100 Subject: [PATCH 3/6] Minor sphinx confic adjustment --- docs/conf.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/conf.py b/docs/conf.py index 4be74e10..c4710193 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -106,6 +106,9 @@ # # html_sidebars = {} +# If smartquotes is True, double dashes (--) are transformed to en-dashes (–) +# which could be confused with single dashes (-). 
+smartquotes = False
 
 # -- Options for HTMLHelp output ---------------------------------------------
 

From a35b4102e891012c54098889735d76c9653fdc28 Mon Sep 17 00:00:00 2001
From: jusschwitalla
Date: Thu, 21 Mar 2024 19:12:32 +0100
Subject: [PATCH 4/6] Add interrupt/resume feature
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* treatment of async jobs
* fixing treatment of unfinished jobs
* added demo application for the new QUARK instructions
* amended the README

---------

Co-authored-by: Jürgen Schwitalla
Co-authored-by: Stefan Maschek
---
 README.md                    |  17 +++
 docs/developer.rst           |  29 ++++
 src/BenchmarkManager.py      | 250 +++++++++++++++++++++++++++++------
 src/BenchmarkRecord.py       |  31 +++++
 src/demo/instruction_demo.py |  61 +++++++++
 src/main.py                  |  10 +-
 6 files changed, 359 insertions(+), 39 deletions(-)
 create mode 100644 src/demo/instruction_demo.py

diff --git a/README.md b/README.md
index c83e7c74..3671c180 100644
--- a/README.md
+++ b/README.md
@@ -126,6 +126,23 @@ Example run (You need to check at least one option with an ``X`` for the checkbo
 
 All used config files, logs and results are stored in a folder in the ```benchmark_runs``` directory.
 
+#### Interrupt/Resume
+The processing of backlog items may get interrupted, in which case you will see something like:
+```
+2024-03-13 10:25:20,201 [INFO] ================================================================================
+2024-03-13 10:25:20,201 [INFO] ====== Run 3 backlog items with 10 iterations - FINISHED:15 INTERRUPTED:15
+2024-03-13 10:25:20,201 [INFO] ====== There are interrupted jobs. You may resume them by running QUARK with
+2024-03-13 10:25:20,201 [INFO] ====== --resume-dir=benchmark_runs\tsp-2024-03-13-10-25-19
+2024-03-13 10:25:20,201 [INFO] ================================================================================
+```
+This happens if you press CTRL-C or if some QUARK module does its work asynchronously, e.g. by submitting its job to some
+batch system. Learn more about how to write asynchronous modules in the [developer guide](https://quark-framework.readthedocs.io/en/dev/).
+You can resume an interrupted QUARK run by calling:
+```
+python src/main.py --resume-dir=<result-dir>
+```
+Note that you can copy/paste the --resume-dir option from the QUARK output as shown in the above example.
+
 #### Non-Interactive Mode
 It is also possible to start the script with a config file instead of using the interactive mode:
 ```
diff --git a/docs/developer.rst b/docs/developer.rst
index 272c719e..00bea10e 100644
--- a/docs/developer.rst
+++ b/docs/developer.rst
@@ -192,6 +192,35 @@ Example for an application, which should reside under ``src/modules/applications
     def save(self, path, iter_count):
         save_your_application(self.application, f"{path}/application.txt")
 
+Writing an Asynchronous Module
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+A typical example of an asynchronous module is a solver that submits its job to
+the queue of some server and retrieves the result some time later. In QUARK this is
+supported via the interrupt/resume mechanism.
+
+QUARK modules may return instructions to the BenchmarkManager as the first entry in the return value of
+preprocess and postprocess. Currently, the following instructions are supported:
+    - PROCEED
+    - INTERRUPT
+
+PROCEED: If the BenchmarkManager gets the instruction "PROCEED" (or no instruction at all), it continues with the regular QUARK workflow.
+If the current job can be finished without getting an "INTERRUPT" instruction or an exception,
+the BenchmarkManager adds "quark_job_status"=FINISHED to the metrics.
+
+INTERRUPT: If the BenchmarkManager gets the instruction "INTERRUPT", it stops the current QUARK workflow,
+adds "quark_job_status"=INTERRUPTED to the metrics, saves all the metrics written so far to the BenchmarkRecord
+and continues with the configuration/repetition loop.
+
+QUARK Resume Mode:
+
+After running QUARK in its regular mode, QUARK can be run again on the same results directory in resume mode by
+specifying the existing results directory with the --resume-dir option. This can be done repeatedly for the same
+results directory.
+
+If QUARK is called in resume mode, the module which previously returned an INTERRUPT will be called again
+with the same input, supplemented by the keyword argument "previous_job_info", which contains all the information
+the module has written to the metrics on the previous run.
+
 Updating the Module Database
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/src/BenchmarkManager.py b/src/BenchmarkManager.py
index fc4d5377..4cb31cea 100644
--- a/src/BenchmarkManager.py
+++ b/src/BenchmarkManager.py
@@ -19,13 +19,14 @@
 import os.path
 from copy import deepcopy
 from datetime import datetime
+from enum import Enum
 from pathlib import Path
-from typing import List, Dict
+from typing import List, Dict, Optional
 
 import numpy as np
 
 from ConfigManager import ConfigManager
-from BenchmarkRecord import BenchmarkRecord
+from BenchmarkRecord import BenchmarkRecord, BenchmarkRecordStored
 from Plotter import Plotter
 from modules.Core import Core
 from utils import get_git_revision
@@ -35,6 +36,63 @@
 comm = get_comm()
 
 
+class Instruction(Enum):
+    PROCEED = 0
+    INTERRUPT = 1
+
+
+class JobStatus(Enum):
+    UNDEF = 0
+    INTERRUPTED = 1
+    FINISHED = 2
+    FAILED = 3
+
+
+def _prepend_instruction(result: tuple) -> tuple:
+    """
+    If the given tuple does not contain an Instruction as its first entry,
+    PROCEED is inserted at position 0, so that it is guaranteed that
+    the first entry of the returned tuple is an Instruction, with PROCEED
+    as the default.
+
+    :param result: the tuple to which the instruction is to be prepended
+    :type result: tuple
+    :return: the tuple with an Instruction as first entry
+    :rtype: tuple
+    """
+    if isinstance(result[0], Instruction):
+        return result
+    else:
+        return Instruction.PROCEED, *result
+
+
+def postprocess(module_instance: Core, *args, **kwargs) -> tuple:
+    """
+    Wraps module_instance.postprocess such that the first entry of the
+    result tuple is guaranteed to be an Instruction. See _prepend_instruction.
+
+    :param module_instance: the QUARK module on which to call postprocess
+    :type module_instance: Core
+    :return: the result tuple of module_instance.postprocess with an Instruction as first entry.
+    :rtype: tuple
+    """
+    result = module_instance.postprocess(*args, **kwargs)
+    return _prepend_instruction(result)
+
+
+def preprocess(module_instance: Core, *args, **kwargs) -> tuple:
+    """
+    Wraps module_instance.preprocess such that the first entry of the
+    result tuple is guaranteed to be an Instruction. See _prepend_instruction.
+
+    :param module_instance: the QUARK module on which to call preprocess
+    :type module_instance: Core
+    :return: the result tuple of module_instance.preprocess with an Instruction as first entry.
+ :rtype: tuple + """ + result = module_instance.preprocess(*args, **kwargs) + return _prepend_instruction(result) + class BenchmarkManager: """ @@ -56,6 +114,18 @@ def __init__(self, fail_fast: bool = False): self.results = [] self.store_dir = None self.benchmark_record_template = None + self.interrupted_results_path = None + + def load_interrupted_results(self) -> Optional[list]: + """ + :return: the content of the results file from the QUARK run to be resumed or None. + :rtype: Optional[list] + """ + if self.interrupted_results_path is None or not os.path.exists(self.interrupted_results_path): + return None + with open(self.interrupted_results_path, encoding='utf-8') as results_file : + results = json.load(results_file) + return results def _create_store_dir(self, store_dir: str = None, tag: str = None) -> None: """ @@ -73,7 +143,13 @@ def _create_store_dir(self, store_dir: str = None, tag: str = None) -> None: self.store_dir = f"{store_dir}/benchmark_runs/{tag + '-' if not None else ''}" \ f"{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}" Path(self.store_dir).mkdir(parents=True, exist_ok=True) + self._set_logger() + + def _resume_store_dir(self, store_dir) -> None: + self.store_dir = store_dir + self._set_logger() + def _set_logger(self) -> None: # Also store the log file to the benchmark dir logger = logging.getLogger() formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") @@ -82,7 +158,7 @@ def _create_store_dir(self, store_dir: str = None, tag: str = None) -> None: logger.addHandler(filehandler) def orchestrate_benchmark(self, benchmark_config_manager: ConfigManager, app_modules: list[dict], - store_dir: str = None) -> None: + store_dir: str = None, interrupted_results_path: str = None) -> None: """ Executes the benchmarks according to the given settings. @@ -92,10 +168,17 @@ def orchestrate_benchmark(self, benchmark_config_manager: ConfigManager, app_mod :type app_modules: list of dict :param store_dir: target directory to store the results of the benchmark (if you decided to store it) :type store_dir: str + :param interrupted_results_path: result file from which the information for the interrupted jobs will be read. + If store_dir is None the parent directory of interrupted_results_path will + be used as store_dir. 
+ :type interrupted_results_path: str :rtype: None """ - - self._create_store_dir(store_dir, tag=benchmark_config_manager.get_config()["application"]["name"].lower()) + self.interrupted_results_path = interrupted_results_path + if interrupted_results_path and not store_dir: + self._resume_store_dir(os.path.dirname(interrupted_results_path)) + else: + self._create_store_dir(store_dir, tag=benchmark_config_manager.get_config()["application"]["name"].lower()) benchmark_config_manager.save(self.store_dir) benchmark_config_manager.load_config(app_modules) self.application = benchmark_config_manager.get_app() @@ -127,15 +210,36 @@ def run_benchmark(self, benchmark_backlog: list, repetitions: int): git_revision_number, git_uncommitted_changes = get_git_revision(git_dir) break_flag = False + job_status_count_total = {} + interrupted_results = self.load_interrupted_results() for idx_backlog, backlog_item in enumerate(benchmark_backlog): benchmark_records: [BenchmarkRecord] = [] path = f"{self.store_dir}/benchmark_{idx_backlog}" Path(path).mkdir(parents=True, exist_ok=True) with open(f"{path}/application_config.json", 'w') as filehandler: json.dump(backlog_item["config"], filehandler, indent=2) + job_status_count = {} for i in range(1, repetitions + 1): logging.info(f"Running backlog item {idx_backlog + 1}/{len(benchmark_backlog)}," f" Iteration {i}/{repetitions}:") + # getting information of interrupted jobs + job_info_with_meta_data = {} + if interrupted_results: + for entry in interrupted_results: + if entry["benchmark_backlog_item_number"] == idx_backlog and entry["repetition"] == i: + job_info_with_meta_data = entry + break + job_info = job_info_with_meta_data['module'] if job_info_with_meta_data else {} + quark_job_status_name = job_info.get("quark_job_status") + if quark_job_status_name in (JobStatus.FINISHED.name, JobStatus.FAILED.name): + quark_job_status = JobStatus.FINISHED if quark_job_status_name == JobStatus.FINISHED.name \ + else JobStatus.FAILED + benchmark_records.append(BenchmarkRecordStored(job_info_with_meta_data)) + job_status_count[quark_job_status] = job_status_count.get(quark_job_status, 0) + 1 + job_status_count_total[quark_job_status] = job_status_count_total.get(quark_job_status, 0) + 1 + logging.info("job already %s - skip.", quark_job_status_name) + continue + try: self.benchmark_record_template = BenchmarkRecord(idx_backlog, @@ -143,20 +247,34 @@ def run_benchmark(self, benchmark_backlog: list, repetitions: int): git_revision_number, git_uncommitted_changes, i, repetitions) self.application.metrics.set_module_config(backlog_item["config"]) - problem, preprocessing_time = self.application.preprocess(None, backlog_item["config"], - store_dir=path, rep_count=i) + instruction, problem, preprocessing_time = preprocess(self.application, None, backlog_item["config"], + store_dir=path, rep_count=i, + previous_job_info=job_info) self.application.metrics.set_preprocessing_time(preprocessing_time) self.application.save(path, i) - processed_input, benchmark_record = self.traverse_config(backlog_item["submodule"], problem, - path, rep_count=i) + postprocessing_time = 0. 
+ benchmark_record = self.benchmark_record_template.copy() + if instruction == Instruction.PROCEED: + instruction, processed_input, benchmark_record = \ + self.traverse_config(backlog_item["submodule"], problem, + path, rep_count=i, previous_job_info=job_info) + if instruction == Instruction.PROCEED: + instruction, _, postprocessing_time = \ + postprocess(self.application, processed_input, backlog_item["config"], + store_dir=path, rep_count=i, previous_job_info=job_info) + + if instruction == Instruction.INTERRUPT: + quark_job_status = JobStatus.INTERRUPTED + else: + quark_job_status = JobStatus.FINISHED + self.application.metrics.add_metric("quark_job_status", quark_job_status.name) - _, postprocessing_time = self.application.postprocess(processed_input, None, store_dir=path, - rep_count=i) self.application.metrics.set_postprocessing_time(postprocessing_time) self.application.metrics.validate() - benchmark_record.append_module_record_left(deepcopy(self.application.metrics)) - benchmark_records.append(benchmark_record) + if benchmark_record is not None: + benchmark_record.append_module_record_left(deepcopy(self.application.metrics)) + benchmark_records.append(benchmark_record) except KeyboardInterrupt: logging.warning("CTRL-C detected during run_benchmark. Still trying to create results.json.") @@ -165,12 +283,28 @@ def run_benchmark(self, benchmark_backlog: list, repetitions: int): except Exception as error: logging.exception(f"Error during benchmark run: {error}", exc_info=True) + quark_job_status = JobStatus.FAILED + if job_info: + # restore results/infos from previous run + benchmark_records.append(job_info) if self.fail_fast: raise + job_status_count[quark_job_status] = job_status_count.get(quark_job_status, 0) + 1 + job_status_count_total[quark_job_status] = job_status_count_total.get(quark_job_status, 0) + 1 + for record in benchmark_records: record.sum_up_times() + for record in benchmark_records: + record.sum_up_times() + + status_report = " ".join([f"{status.name}:{count}" for status, count in job_status_count.items()]) + logging.info("") + logging.info(f" ==== Run backlog item {idx_backlog + 1}/{len(benchmark_backlog)} " + f"with {repetitions} iterations - {status_report} ==== ") + logging.info("") + # Wait until all MPI processes have finished and save results on rank 0 comm.Barrier() if comm.Get_rank() == 0: @@ -184,7 +318,26 @@ def run_benchmark(self, benchmark_backlog: list, repetitions: int): if break_flag: break - def traverse_config(self, module: dict, input_data: any, path: str, rep_count: int) -> (any, BenchmarkRecord): + # print overall status information + status_report = " ".join([f"{status.name}:{count}" for status, count in job_status_count_total.items()]) + logging.info(80 * "=") + logging.info(f"====== Run {len(benchmark_backlog)} backlog items " + f"with {repetitions} iterations - {status_report}") + + nb_interrupted = job_status_count_total.get(JobStatus.INTERRUPTED, 0) + nb_not_started = sum(job_status_count_total.values()) < len(benchmark_backlog) + if nb_interrupted + nb_not_started > 0: + try: + rel_path = Path(self.store_dir).relative_to(os.getcwd()) + except ValueError: + rel_path = self.store_dir + logging.info(f"====== There are interrupted jobs. 
You may resume them by running QUARK with") + logging.info(f"====== --resume-dir={rel_path}") + logging.info(80*"=") + logging.info("") + + + def traverse_config(self, module: dict, input_data: any, path: str, rep_count: int, previous_job_info: dict = None) -> (any, BenchmarkRecord): """ Executes a benchmark by traversing down the initialized config recursively until it reaches the end. Then traverses up again. Once it reaches the root/application, a benchmark run is finished. @@ -204,36 +357,59 @@ def traverse_config(self, module: dict, input_data: any, path: str, rep_count: i # Only the value of the dict is needed (dict has only one key) module = module[next(iter(module))] module_instance: Core = module["instance"] - + + submodule_job_info = None + if previous_job_info and previous_job_info.get("submodule"): + assert module['name'] == previous_job_info["submodule"]["module_name"], \ + f"asyncronous job info given, but no information about module {module['name']} stored in it" #TODO!! + if 'submodule' in previous_job_info and previous_job_info['submodule']: + submodule_job_info = previous_job_info['submodule'] + module_instance.metrics.set_module_config(module["config"]) - module_instance.preprocessed_input, preprocessing_time = module_instance.preprocess(input_data, - module["config"], - store_dir=path, - rep_count=rep_count) + instruction, module_instance.preprocessed_input, preprocessing_time\ + = preprocess(module_instance, input_data, + module["config"], + store_dir=path, + rep_count=rep_count, + previous_job_info=submodule_job_info) + module_instance.metrics.set_preprocessing_time(preprocessing_time) + output = None + benchmark_record = self.benchmark_record_template.copy() + postprocessing_time = 0.0 + if instruction == Instruction.PROCEED: + + # Check if end of the chain is reached + if not module["submodule"]: + # If we reach the end of the chain we create the benchmark record, fill it and then pass it up + instruction, module_instance.postprocessed_input, postprocessing_time = \ + postprocess( module_instance, + module_instance.preprocessed_input, + module["config"], store_dir=path, + rep_count=rep_count, + previous_job_info=submodule_job_info) + output = module_instance.postprocessed_input + else: + instruction, processed_input, benchmark_record = self.traverse_config(module["submodule"], + module_instance.preprocessed_input, path, + rep_count, previous_job_info=submodule_job_info) + + if instruction == Instruction.PROCEED: + instruction, module_instance.postprocessed_input, postprocessing_time = \ + postprocess(module_instance, processed_input, + module["config"], + store_dir=path, + rep_count=rep_count, + previous_job_info=submodule_job_info) + output = module_instance.postprocessed_input + else: + output = processed_input - # Check if end of the chain is reached - if not module["submodule"]: - # If we reach the end of the chain we create the benchmark record, fill it and then pass it up - benchmark_record = self.benchmark_record_template.copy() - module_instance.postprocessed_input, postprocessing_time = module_instance.postprocess( - module_instance.preprocessed_input, module["config"], store_dir=path, rep_count=rep_count) - - else: - processed_input, benchmark_record = self.traverse_config(module["submodule"], - module_instance.preprocessed_input, path, - rep_count) - module_instance.postprocessed_input, postprocessing_time = module_instance.postprocess(processed_input, - module["config"], - store_dir=path, - rep_count=rep_count) - - output = 
module_instance.postprocessed_input module_instance.metrics.set_postprocessing_time(postprocessing_time) module_instance.metrics.validate() benchmark_record.append_module_record_left(deepcopy(module_instance.metrics)) - return output, benchmark_record + return instruction, output, benchmark_record def _collect_all_results(self) -> List[Dict]: """ diff --git a/src/BenchmarkRecord.py b/src/BenchmarkRecord.py index 0aa505e8..0c749525 100644 --- a/src/BenchmarkRecord.py +++ b/src/BenchmarkRecord.py @@ -177,3 +177,34 @@ def copy(self) -> any: :rtype: BenchmarkRecord """ return deepcopy(self) + + +class BenchmarkRecordStored: + """ + This class can be used to store the BenchmarkRecord of a previous QUARK run as read from results.json. + It is a simple wrapper with the purpose to provide the same interface to the BenchmarkManager as the + BenchmarkRecord does. + """ + def __init__(self, record: dict): + """ + :param record: the record as dictionary + :type record: dict + """ + self.record = record + + def get(self) -> dict: + """ + Simply returns the dictionary as given to the constructor. + + :return: Dictionary as given to the constructor + :rtype: dict + """ + return self.record + + def sum_up_times(self) -> None: + """ + Dummy implementation which does nothing. + + :rtype: None + """ + pass diff --git a/src/demo/instruction_demo.py b/src/demo/instruction_demo.py new file mode 100644 index 00000000..612c0bf9 --- /dev/null +++ b/src/demo/instruction_demo.py @@ -0,0 +1,61 @@ +import logging + +from BenchmarkManager import Instruction +from modules.Core import Core +from modules.applications.Application import Application + + +class InstructionDemo(Application): + """ + A simple QUARK Application implementation showing the usage of instructions. + """ + def __init__(self, application_name: str = None): + super().__init__(application_name) + self.submodule_options = ["Dummy"] + + def preprocess(self, input_data: any, config: dict, **kwargs) -> (any, float): + logging.info("%s", kwargs.keys()) + logging.info("previous_job_info: %s", kwargs.get("previous_job_info")) + rep_count = kwargs["rep_count"] + instruction_name = config.get("instruction", Instruction.PROCEED.name) + instruction = Instruction.PROCEED + if instruction_name == Instruction.PROCEED.name: + instruction = Instruction.PROCEED + elif instruction_name == Instruction.INTERRUPT.name: + instruction = Instruction.INTERRUPT + if instruction_name == "mixed": + instruction = Instruction.PROCEED + if rep_count%2 == 1: + instruction = Instruction.INTERRUPT + elif instruction_name == "exception": + raise Exception("demo exception") + + logging.info("InstructionDemo iteration %s returns instruction %s", rep_count, instruction.name) + return instruction, "", 0. + + def get_parameter_options(self) -> dict: + return { + "instruction": {"values": [Instruction.PROCEED.name, + Instruction.INTERRUPT.name, + "exception", + "mixed"], + "description": "How should preprocess behave?"} + } + + def get_default_submodule(self, option: str) -> Core: + return Dummy() + + def save(self, path: str, iter_count: int) -> None: + pass + + +class Dummy(Core): + """ + Dummy QUARK module implementation which is used by the InstructionDemo. 
+ """ + + def get_parameter_options(self) -> dict: + return {} + + def get_default_submodule(self, option: str) -> Core: + pass diff --git a/src/main.py b/src/main.py index 2f6b9d8b..14b48327 100644 --- a/src/main.py +++ b/src/main.py @@ -126,8 +126,10 @@ def create_benchmark_parser(parser: argparse.ArgumentParser): parser.add_argument('-s', '--summarize', nargs='+', help='If you want to summarize multiple experiments', required=False) parser.add_argument('-m', '--modules', help="Provide a file listing the modules to be loaded") + parser.add_argument('-rd', '--resume-dir', nargs='?', help='Provide results directory of the job to be resumed') parser.add_argument('-ff', '--failfast', help='Flag whether a single failed benchmark run causes QUARK to fail', required=False, action=argparse.BooleanOptionalAction) + parser.set_defaults(goal='benchmark') @@ -175,7 +177,9 @@ def handle_benchmark_run(args: argparse.Namespace) -> None: installer = Installer() app_modules = installer.get_env(installer.get_active_env()) - if args.config: + if args.config or args.resume_dir: + if not args.config: + args.config = os.path.join(args.resume_dir, "config.yml") logging.info(f"Provided config file at {args.config}") # Loads config with open(args.config) as filehandler: @@ -194,7 +198,9 @@ def handle_benchmark_run(args: argparse.Namespace) -> None: logging.info("Selected config is:") config_manager.print() else: - benchmark_manager.orchestrate_benchmark(config_manager, app_modules) + interrupted_results_path = None if args.resume_dir is None else os.path.join(args.resume_dir, "results.json") + benchmark_manager.orchestrate_benchmark(config_manager, app_modules, + interrupted_results_path=interrupted_results_path) comm.Barrier() if comm.Get_rank() == 0: results = benchmark_manager.load_results() From 4eef3de91341c03cf76a8f5295c0387e1b80a886 Mon Sep 17 00:00:00 2001 From: chsowinski <153180328+chsowinski@users.noreply.github.com> Date: Tue, 26 Mar 2024 11:07:57 +0100 Subject: [PATCH 5/6] Added IBM Eagle Access to QiskitQAOA * Added IBM Eagle Access to QiskitQAOA * Adjusted range of iterations for QiskitQAOA in order to fit for IBM Eagle, since here a low number of iterations is necessary * Added instructions to tutorial.rst on how to add IBM API Token --- .settings/module_db.json | 16 +++++++++++++--- docs/tutorial.rst | 14 ++++++++++++++ src/modules/solvers/QiskitQAOA.py | 26 ++++++++++++++++++-------- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/.settings/module_db.json b/.settings/module_db.json index d783a2c0..2fc22e51 100644 --- a/.settings/module_db.json +++ b/.settings/module_db.json @@ -1,7 +1,7 @@ { - "build_number": 6, - "build_date": "22-11-2023 13:47:42", - "git_revision_number": "5ea1fcf56c438df4b32fcc318dcb6e2c8e58447b", + "build_number": 7, + "build_date": "20-03-2024 12:51:58", + "git_revision_number": "3be6f3847150d4bb8debee2451522b0b19fa205f", "modules": [ { "name": "PVC", @@ -1863,6 +1863,16 @@ "module": "modules.devices.HelperClass", "requirements": [], "submodules": [] + }, + { + "name": "ibm_eagle", + "class": "HelperClass", + "args": { + "device_name": "ibm_eagle" + }, + "module": "modules.devices.HelperClass", + "requirements": [], + "submodules": [] } ] } diff --git a/docs/tutorial.rst b/docs/tutorial.rst index f4597b47..29b38761 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -133,6 +133,20 @@ Example run (You need to check at least one option with an ``X`` for the checkbo All used config files, logs and results are stored in a folder in the ``benchmark_runs`` 
directory. +Access to IBM Eagle +^^^^^^^^^^^^^^^^^^^ + +In order to use the IBM Eagle device in QUARK you have to first save your API token. +This can be done similar to accessing AWS: + +.. code:: bash + + export ibm_quantum_token='Your Token' + python src/main.py + +:: + + Non-Interactive Mode ^^^^^^^^^^^^^^^^^^^^ diff --git a/src/modules/solvers/QiskitQAOA.py b/src/modules/solvers/QiskitQAOA.py index eca7bac9..8fbd1774 100644 --- a/src/modules/solvers/QiskitQAOA.py +++ b/src/modules/solvers/QiskitQAOA.py @@ -15,6 +15,7 @@ from typing import Tuple from typing import TypedDict +import os import numpy as np from qiskit import Aer from qiskit.algorithms import VQE, QAOA, NumPyMinimumEigensolver @@ -22,6 +23,7 @@ from qiskit.circuit.library import TwoLocal from qiskit.opflow import PauliSumOp from qiskit_optimization.applications import OptimizationApplication +from qiskit_ibm_runtime import QiskitRuntimeService from modules.solvers.Solver import * from utils import start_time_measurement, end_time_measurement @@ -37,7 +39,7 @@ def __init__(self): Constructor method """ super().__init__() - self.submodule_options = ["qasm_simulator", "qasm_simulator_gpu"] + self.submodule_options = ["qasm_simulator", "qasm_simulator_gpu", "ibm_eagle"] @staticmethod def get_requirements() -> list[dict]: @@ -69,6 +71,9 @@ def get_default_submodule(self, option: str) -> Core: elif option == "qasm_simulator_gpu": from modules.devices.HelperClass import HelperClass # pylint: disable=C0415 return HelperClass("qasm_simulator_gpu") + elif option == "ibm_eagle": + from modules.devices.HelperClass import HelperClass # pylint: disable=C0415 + return HelperClass("ibm_eagle") else: raise NotImplementedError(f"Device Option {option} not implemented") @@ -85,8 +90,8 @@ def get_parameter_options(self) -> dict: "description": "How many shots do you need?" }, "iterations": { # number measurements to make on circuit - "values": [10, 20, 50, 75], - "description": "How many iterations do you need?" + "values": [1, 5, 10, 20, 50, 75], + "description": "How many iterations do you need? Warning: When using the IBM Eagle Device you should only choose a lower number of iterations, since a high number would lead to a waiting time that could take up to mulitple days!" }, "depth": { "values": [2, 3, 4, 5, 10, 20], @@ -98,7 +103,7 @@ def get_parameter_options(self) -> dict: }, "optimizer": { "values": ["POWELL", "SPSA", "COBYLA"], - "description": "Which Qiskit solver should be used?" + "description": "Which Qiskit solver should be used? Warning: When using the IBM Eagle Device you should not use the SPSA optimizer, since it is not suited for only one evaluation!" } } @@ -109,8 +114,8 @@ def get_parameter_options(self) -> dict: "description": "How many shots do you need?" }, "iterations": { # number measurements to make on circuit - "values": [10, 20, 50, 75], - "description": "How many iterations do you need?" + "values": [1, 5, 10, 20, 50, 75], + "description": "How many iterations do you need? Warning: When using the IBM Eagle Device you should only choose a lower number of iterations, since a high number would lead to a waiting time that could take up to mulitple days!" }, "depth": { "values": [2, 3, 4, 5, 10, 20], @@ -122,7 +127,7 @@ def get_parameter_options(self) -> dict: }, "optimizer": { "values": ["POWELL", "SPSA", "COBYLA"], - "description": "Which Qiskit solver should be used?" + "description": "Which Qiskit solver should be used? 
Warning: When using the IBM Eagle Device you should not use the SPSA optimizer for a low number of iterations!" } } @@ -186,7 +191,7 @@ def run(self, mapped_problem: any, device_wrapper: any, config: Config, **kwargs if config["optimizer"] == "COBYLA": optimizer = COBYLA(maxiter=config["iterations"]) elif config["optimizer"] == "POWELL": - optimizer = POWELL(maxiter=config["iterations"]) + optimizer = POWELL(maxiter=config["iterations"], maxfev=config["iterations"] if device_wrapper.device == 'ibm_eagle' else None) elif config["optimizer"] == "SPSA": optimizer = SPSA(maxiter=config["iterations"]) if config["method"] == "vqe": @@ -208,6 +213,11 @@ def _get_quantum_instance(device_wrapper: any) -> any: logging.info("Using GPU simulator") backend.set_options(device='GPU') backend.set_options(method='statevector_gpu') + elif device_wrapper.device == 'ibm_eagle': + logging.info("Using IBM Eagle") + ibm_quantum_token = os.environ.get('ibm_quantum_token') + service = QiskitRuntimeService(channel="ibm_quantum", token=ibm_quantum_token) + backend = service.least_busy(operational=True, simulator=False, min_num_qubits=127) else: logging.info("Using CPU simulator") backend.set_options(device='CPU') From de5162ffce388c298ca6b5ec4f83dab23ae3d5c6 Mon Sep 17 00:00:00 2001 From: Marvin Erdmann <106394656+Marvmann@users.noreply.github.com> Date: Wed, 27 Mar 2024 07:50:04 +0100 Subject: [PATCH 6/6] Bugfix issue #49 (#117) * Ignore imaginary terms of problem matrix If the imaginary part of all matrix elements is zero, the matrix is transformed into a real matrix. If there are imaginary terms != 0, a warning is logged. * Correct reverse_mapping Updated the input type from dict to any. In all tests run, the input format was a numpy array. Corrected checking for -1 in the solution string. * Correct linting errors --- src/BenchmarkManager.py | 19 ++++++++++--------- src/main.py | 3 ++- .../optimization/TSP/mappings/ISING.py | 8 ++++---- src/modules/solvers/QAOA.py | 5 +++++ src/modules/solvers/QiskitQAOA.py | 19 ++++++++++++++----- 5 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/BenchmarkManager.py b/src/BenchmarkManager.py index 4cb31cea..817be578 100644 --- a/src/BenchmarkManager.py +++ b/src/BenchmarkManager.py @@ -144,7 +144,7 @@ def _create_store_dir(self, store_dir: str = None, tag: str = None) -> None: f"{datetime.today().strftime('%Y-%m-%d-%H-%M-%S')}" Path(self.store_dir).mkdir(parents=True, exist_ok=True) self._set_logger() - + def _resume_store_dir(self, store_dir) -> None: self.store_dir = store_dir self._set_logger() @@ -177,7 +177,7 @@ def orchestrate_benchmark(self, benchmark_config_manager: ConfigManager, app_mod self.interrupted_results_path = interrupted_results_path if interrupted_results_path and not store_dir: self._resume_store_dir(os.path.dirname(interrupted_results_path)) - else: + else: self._create_store_dir(store_dir, tag=benchmark_config_manager.get_config()["application"]["name"].lower()) benchmark_config_manager.save(self.store_dir) benchmark_config_manager.load_config(app_modules) @@ -196,7 +196,7 @@ def orchestrate_benchmark(self, benchmark_config_manager: ConfigManager, app_mod results = self._collect_all_results() self._save_as_json(results) - def run_benchmark(self, benchmark_backlog: list, repetitions: int): + def run_benchmark(self, benchmark_backlog: list, repetitions: int): # pylint: disable=R0915 """ Goes through the benchmark backlog, which contains all the benchmarks to execute. 
@@ -247,7 +247,8 @@ def run_benchmark(self, benchmark_backlog: list, repetitions: int): git_revision_number, git_uncommitted_changes, i, repetitions) self.application.metrics.set_module_config(backlog_item["config"]) - instruction, problem, preprocessing_time = preprocess(self.application, None, backlog_item["config"], + instruction, problem, preprocessing_time = preprocess(self.application, None, + backlog_item["config"], store_dir=path, rep_count=i, previous_job_info=job_info) self.application.metrics.set_preprocessing_time(preprocessing_time) @@ -331,13 +332,14 @@ def run_benchmark(self, benchmark_backlog: list, repetitions: int): rel_path = Path(self.store_dir).relative_to(os.getcwd()) except ValueError: rel_path = self.store_dir - logging.info(f"====== There are interrupted jobs. You may resume them by running QUARK with") + logging.info("====== There are interrupted jobs. You may resume them by running QUARK with") logging.info(f"====== --resume-dir={rel_path}") logging.info(80*"=") logging.info("") - def traverse_config(self, module: dict, input_data: any, path: str, rep_count: int, previous_job_info: dict = None) -> (any, BenchmarkRecord): + def traverse_config(self, module: dict, input_data: any, path: str, rep_count: int, previous_job_info: + dict = None) -> (any, BenchmarkRecord): """ Executes a benchmark by traversing down the initialized config recursively until it reaches the end. Then traverses up again. Once it reaches the root/application, a benchmark run is finished. @@ -357,14 +359,14 @@ def traverse_config(self, module: dict, input_data: any, path: str, rep_count: i # Only the value of the dict is needed (dict has only one key) module = module[next(iter(module))] module_instance: Core = module["instance"] - + submodule_job_info = None if previous_job_info and previous_job_info.get("submodule"): assert module['name'] == previous_job_info["submodule"]["module_name"], \ f"asyncronous job info given, but no information about module {module['name']} stored in it" #TODO!! 
if 'submodule' in previous_job_info and previous_job_info['submodule']: submodule_job_info = previous_job_info['submodule'] - + module_instance.metrics.set_module_config(module["config"]) instruction, module_instance.preprocessed_input, preprocessing_time\ = preprocess(module_instance, input_data, @@ -378,7 +380,6 @@ def traverse_config(self, module: dict, input_data: any, path: str, rep_count: i benchmark_record = self.benchmark_record_template.copy() postprocessing_time = 0.0 if instruction == Instruction.PROCEED: - # Check if end of the chain is reached if not module["submodule"]: # If we reach the end of the chain we create the benchmark record, fill it and then pass it up diff --git a/src/main.py b/src/main.py index 14b48327..de5445a4 100644 --- a/src/main.py +++ b/src/main.py @@ -198,7 +198,8 @@ def handle_benchmark_run(args: argparse.Namespace) -> None: logging.info("Selected config is:") config_manager.print() else: - interrupted_results_path = None if args.resume_dir is None else os.path.join(args.resume_dir, "results.json") + interrupted_results_path = None if args.resume_dir is None else os.path.join(args.resume_dir, + "results.json") benchmark_manager.orchestrate_benchmark(config_manager, app_modules, interrupted_results_path=interrupted_results_path) comm.Barrier() diff --git a/src/modules/applications/optimization/TSP/mappings/ISING.py b/src/modules/applications/optimization/TSP/mappings/ISING.py index 3f69bdbb..2fd6a663 100644 --- a/src/modules/applications/optimization/TSP/mappings/ISING.py +++ b/src/modules/applications/optimization/TSP/mappings/ISING.py @@ -346,17 +346,17 @@ def _map_qiskit(graph: nx.Graph, config: Config) -> (dict, float): return {"J": j_matrix, "t": t_matrix}, end_time_measurement(start) - def reverse_map(self, solution: dict) -> (dict, float): + def reverse_map(self, solution: any) -> (dict, float): """ Maps the solution back to the representation needed by the TSP class for validation/evaluation. - :param solution: dictionary containing the solution - :type solution: dict + :param solution: list or array containing the solution + :type solution: any :return: solution mapped accordingly, time it took to map it :rtype: tuple(dict, float) """ start = start_time_measurement() - if np.any(solution == "-1"): # ising model output from Braket QAOA + if -1 in solution: # ising model output from Braket QAOA solution = self._convert_ising_to_qubo(solution) elif self.config["mapping"] == "pyqubo" or self.config["mapping"] == "ocean": logging.debug("Flip bits in solutions to unify different mappings") diff --git a/src/modules/solvers/QAOA.py b/src/modules/solvers/QAOA.py index 15a5e9cd..d30ae5bf 100644 --- a/src/modules/solvers/QAOA.py +++ b/src/modules/solvers/QAOA.py @@ -151,6 +151,11 @@ def run(self, mapped_problem: any, device_wrapper: any, config: Config, **kwargs """ j = mapped_problem['J'] + if np.any(np.iscomplex(j)): + logging.warning("The problem matrix of the QAOA solver contains imaginary numbers." + "This may lead to an error later in the run.") + else: + j = np.real(j) # set up the problem n_qubits = j.shape[0] diff --git a/src/modules/solvers/QiskitQAOA.py b/src/modules/solvers/QiskitQAOA.py index 8fbd1774..fb8b1b35 100644 --- a/src/modules/solvers/QiskitQAOA.py +++ b/src/modules/solvers/QiskitQAOA.py @@ -91,7 +91,10 @@ def get_parameter_options(self) -> dict: }, "iterations": { # number measurements to make on circuit "values": [1, 5, 10, 20, 50, 75], - "description": "How many iterations do you need? 
Warning: When using the IBM Eagle Device you should only choose a lower number of iterations, since a high number would lead to a waiting time that could take up to mulitple days!" + "description": "How many iterations do you need? Warning: When using\ + the IBM Eagle Device you should only choose a lower number of\ + iterations, since a high number would lead to a waiting time that\ + could take up to mulitple days!" }, "depth": { "values": [2, 3, 4, 5, 10, 20], @@ -103,7 +106,9 @@ def get_parameter_options(self) -> dict: }, "optimizer": { "values": ["POWELL", "SPSA", "COBYLA"], - "description": "Which Qiskit solver should be used? Warning: When using the IBM Eagle Device you should not use the SPSA optimizer, since it is not suited for only one evaluation!" + "description": "Which Qiskit solver should be used? Warning: When\ + using the IBM Eagle Device you should not use the SPSA optimizer,\ + since it is not suited for only one evaluation!" } } @@ -115,7 +120,9 @@ def get_parameter_options(self) -> dict: }, "iterations": { # number measurements to make on circuit "values": [1, 5, 10, 20, 50, 75], - "description": "How many iterations do you need? Warning: When using the IBM Eagle Device you should only choose a lower number of iterations, since a high number would lead to a waiting time that could take up to mulitple days!" + "description": "How many iterations do you need? Warning: When using the IBM Eagle Device you\ + should only choose a lower number of iterations, since a high number would lead to a waiting \ + ime that could take up to mulitple days!" }, "depth": { "values": [2, 3, 4, 5, 10, 20], @@ -127,7 +134,8 @@ def get_parameter_options(self) -> dict: }, "optimizer": { "values": ["POWELL", "SPSA", "COBYLA"], - "description": "Which Qiskit solver should be used? Warning: When using the IBM Eagle Device you should not use the SPSA optimizer for a low number of iterations!" + "description": "Which Qiskit solver should be used? Warning: When using the IBM Eagle Device\ + you should not use the SPSA optimizer for a low number of iterations!" } } @@ -191,7 +199,8 @@ def run(self, mapped_problem: any, device_wrapper: any, config: Config, **kwargs if config["optimizer"] == "COBYLA": optimizer = COBYLA(maxiter=config["iterations"]) elif config["optimizer"] == "POWELL": - optimizer = POWELL(maxiter=config["iterations"], maxfev=config["iterations"] if device_wrapper.device == 'ibm_eagle' else None) + optimizer = POWELL(maxiter=config["iterations"], maxfev=config["iterations"] if + device_wrapper.device == 'ibm_eagle' else None) elif config["optimizer"] == "SPSA": optimizer = SPSA(maxiter=config["iterations"]) if config["method"] == "vqe":