Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tools] Parallel integration #289

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
8 changes: 8 additions & 0 deletions .github/workflows/c-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,12 @@ jobs:
run: |
cd tools/integration
python3 run_integration.py --ignore ignored_tests.txt
cd ../..

- uses: actions/upload-artifact@v4
with:
name: integration-report
path: |
integration-test/*/out/sim/report.txt
integration-test/*/out/dynamatic_*.txt

195 changes: 138 additions & 57 deletions tools/integration/run_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import subprocess
import re
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor


class CLIHandler:
Expand Down Expand Up @@ -35,6 +36,10 @@ def add_arguments(self):
"-t", "--timeout",
help="Custom timeout value for a single test. If not given, 500 seconds is used."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also can you specify the default timeout time here?
then you can remove the description from help

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the argument default=500, but this does not automatically add it to the help description if I remove it.

)
self.parser.add_argument(
"-w", "--workers",
help="Number of workers to run in parallel for testing. Default is os.cpu_count()."
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work?

        self.parser.add_argument(
            "-w", "--workers",
            nargs="?",
            type=int
            help="Number of workers to run in parallel for testing. Default is os.cpu_count()."
        )

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it does, thanks! I changed it for the timeout argument as well.


def parse_args(self, args=None):
"""
Expand All @@ -59,15 +64,12 @@ def parse_args(self, args=None):
simulate
exit
"""
DYN_FILE = DYNAMATIC_ROOT / "build" / "run_test.dyn"

# Note: Must use --exit-on-failure in order for run_command_with_timeout
# to be able to detect the status code properly
DYNAMATIC_COMMAND = str(DYNAMATIC_ROOT / "bin" / "dynamatic") + \
" --exit-on-failure --run {script_path}"

# Class to have different colors while writing in terminal


class TermColors:
"""
Expand Down Expand Up @@ -138,13 +140,15 @@ def read_file(file_path):
return file.read()


def run_command_with_timeout(command, timeout=500):
def run_command_with_timeout(command, timeout, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
"""
Runs a command with a time limit for execution.

Arguments:
`command` -- Shell command to be executed
`timeout` -- Execution time limit
`stdout` -- File to redirect stdout to
`stderr` -- File to redirect stderr to

Returns: An integer representing the execution result
0 -- if process completed without errors
Expand All @@ -157,8 +161,8 @@ def run_command_with_timeout(command, timeout=500):
shell=True,
timeout=timeout,
check=True,
stdout=subprocess.PIPE, # Suppress standard output
stderr=subprocess.PIPE, # Suppress standard error
stdout=stdout,
stderr=stderr
)
return 0
except subprocess.CalledProcessError:
Expand Down Expand Up @@ -217,13 +221,88 @@ def get_sim_time(log_path):
raise ValueError("Log file does not contain simulation time!")


def run_test(c_file, id, timeout):
    """
    Runs the specified integration test.

    Arguments:
    `c_file` -- Path to .c source file of integration test.
    `id` -- Index used to identify the test. Must be unique across
            concurrently running tests, since it names the per-test
            .dyn script file.
    `timeout` -- Timeout in seconds for running the test.

    Returns:
    Dictionary with the following keys:
    `id` -- Index of the test that was given as argument.
    `msg` -- Message indicating the result of the test.
    `status` -- One of 'pass', 'fail' or 'timeout'.
    """

    # Write a per-test .dyn script so that parallel workers do not
    # clobber each other's script files.
    dyn_file = DYNAMATIC_ROOT / "build" / f"test_{id}.dyn"
    write_string_to_file(SCRIPT_CONTENT.format(src_path=c_file), dyn_file)

    # Recreate the test's output directory from scratch so stale results
    # from a previous run cannot leak into this one.
    out_dir = Path(replace_filename_with(c_file, "out"))
    if out_dir.is_dir():
        shutil.rmtree(out_dir)
    out_dir.mkdir()

    # Redirect Dynamatic's stdout/stderr to per-test files so output from
    # parallel tests does not interleave; the files double as CI artifacts.
    with open(out_dir / "dynamatic_out.txt", "w") as stdout, \
            open(out_dir / "dynamatic_err.txt", "w") as stderr:
        exit_code = run_command_with_timeout(
            DYNAMATIC_COMMAND.format(script_path=dyn_file),
            timeout=timeout,
            stdout=stdout,
            stderr=stderr
        )

    # Path.stem is more robust than slicing off the last two characters,
    # which silently mangles names without a ".c" suffix.
    name = Path(c_file).stem

    if exit_code == 0:
        sim_log_path = out_dir / "sim" / "report.txt"
        try:
            sim_time = get_sim_time(sim_log_path)
            # sim_time is divided by 4 to convert to cycles — presumably
            # 4 time units per clock period; confirm against the simulator.
            msg = (f"[PASS] {name} (simulation duration: "
                   f"{round(sim_time / 4)} cycles)")
        except ValueError:
            # Should never happen: a passing run is expected to write its
            # simulation time to the report file.
            msg = f"[PASS] {name} (simulation duration: NOT FOUND)"
        return {"id": id, "msg": msg, "status": "pass"}
    elif exit_code == 1:
        return {"id": id, "msg": f"[FAIL] {name}", "status": "fail"}
    else:
        # Any other exit code from run_command_with_timeout indicates the
        # command exceeded its time limit.
        return {"id": id, "msg": f"[TIMEOUT] {name}", "status": "timeout"}


def main():
"""
Entry point for the script.
"""
cli = CLIHandler()
args = cli.parse_args() # Parse the CLI arguments

# If timeout is not given, use default of 500 seconds
timeout = int(args.timeout or 500)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then you don't need this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

c_files = find_files_ext(INTEGRATION_FOLDER, ".c")
test_names = []
if args.list:
Expand Down Expand Up @@ -253,60 +332,62 @@ def main():
passed_cnt = 0
ignored_cnt = 0

for c_file in c_files:
# Write .dyn script with appropriate source file name
write_string_to_file(SCRIPT_CONTENT.format(src_path=c_file), DYN_FILE)

# Get out dir name
out_dir = replace_filename_with(c_file, "out")

# Check if test is supposed to be ignored
if Path(c_file).name[:-2] in ignored_tests:
ignored_cnt += 1
color_print(f"[IGNORED] {c_file}", TermColors.OKGREEN)
continue

# One more test to handle
test_cnt += 1

# Remove previous out directory
if os.path.isdir(out_dir):
shutil.rmtree(out_dir)

# Run test and output result
if args.timeout:
result = run_command_with_timeout(
DYNAMATIC_COMMAND.format(script_path=DYN_FILE),
timeout=int(args.timeout)
)
else:
result = run_command_with_timeout(
DYNAMATIC_COMMAND.format(script_path=DYN_FILE)
)
workers = None
if args.workers:
workers = int(args.workers)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then you don't need this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed


# ProcessPoolExecutor creates subprocesses and not threads, and as such,
# is not limited by Python's Global Interpreter Lock.
# Hence, it allows the full performance gain from parallelism to be achieved,
# unlike ThreadPoolExecutor, which would not make execution any faster whatsoever.
with ProcessPoolExecutor(max_workers=workers) as executor:
shundroid marked this conversation as resolved.
Show resolved Hide resolved
# Note: _max_workers is a private variable, as indicated by the underscore.
# However, we access it to get the number of workers used, since the default
# number (which is used when the ctor is called with max_workers=None) is
# os.cpu_count() or os.process_cpu_count(), depending on the Python version.
# This is "cheating", but is independent of the Python version.
color_print(
f"[INFO] Running with {executor._max_workers} worker(s).",
shundroid marked this conversation as resolved.
Show resolved Hide resolved
TermColors.OKBLUE
)

if result == 0:
sim_log_path = os.path.join(out_dir, "sim", "report.txt")
try:
sim_time = get_sim_time(sim_log_path)
color_print(
f"[PASS] {c_file} (simulation duration: "
f"{round(sim_time / 4)} cycles)",
TermColors.OKGREEN
)
except ValueError:
# This should never happen
color_print(
f"[PASS] {c_file} (simulation duration: NOT FOUND)",
TermColors.OKGREEN
processes = []
for idx, c_file in enumerate(c_files):
# Check if test is supposed to be ignored
name = Path(c_file).name[:-2]
if name in ignored_tests:
ignored_cnt += 1
color_print(f"[IGNORED] {name}", TermColors.OKGREEN)
continue

# One more test to handle
test_cnt += 1

# Run the test
processes.append(
executor.submit(
run_test,
c_file, idx, timeout
)
)

passed_cnt += 1
elif result == 1:
color_print(f"[FAIL] {c_file}", TermColors.FAIL)
else:
color_print(f"[TIMEOUT] {c_file}", TermColors.WARNING)

sys.stdout.flush()
color_print(
f"[INFO] Submitted {name} for execution",
TermColors.OKBLUE
)
sys.stdout.flush()

for p in processes:
result = p.result()
if result["status"] == "pass":
color_print(result["msg"], TermColors.OKGREEN)
passed_cnt += 1
elif result["status"] == "fail":
color_print(result["msg"], TermColors.FAIL)
else:
color_print(result["msg"], TermColors.WARNING)

sys.stdout.flush()

print(
f"** Integration testing finished: "
Expand Down