Skip to content

Commit

Permalink
Refactored runners
Browse files Browse the repository at this point in the history
- added a decent API to add new types of runners
- batch() is now much simpler as it uses new classes: Job and RunContext
- iter_inputs now allows changing the platform for each batch
- added type hints for qaboard.runners
- added tests for the CLI
- added validation of the user's run() results
  • Loading branch information
arthur-flam committed Feb 25, 2020
1 parent ff7dff0 commit b7a43c3
Show file tree
Hide file tree
Showing 21 changed files with 825 additions and 456 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ npm-debug.log*
yarn-debug.log*
yarn-error.log*

.vscode

# generated by tests. TODO: use /tmp
qaboard/sample_project/image.batches.yaml

# Created by https://www.gitignore.io/api/linux,python,windows,sublimetext,visualstudio

Expand Down
25 changes: 3 additions & 22 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,12 @@ before_script:


unit:tests:
stage: test
script:
- python -munittest

cli:tests:
stage: test
retry: 2 # solves frequent filesystem sync issues
script:
# TODO: turn into a standalone script / tests.
- cd qatools/sample_project
- python -mqatools --help
- mkdir -p cli_tests/dir; touch cli_tests/a.jpg; touch cli_tests/b.jpg; touch cli_tests/dir/c.jpg
- export QA_DATABASE=$(pwd)
- export QA_OFFLINE=true
- python -mqatools run -i cli_tests/a.jpg 'echo "{absolute_input_path} => {output_directory}"'
# expect 1 run
- python -mqatools --dryrun batch cli_tests
# expect 3 runs
- sed -i 's/# globs:/globs:/' qatools.yaml
- python -mqatools --dryrun batch cli_tests --list | jq '.'
- python -mqatools batch cli_tests 'echo "{absolute_input_path} => {output_directory}"'
- python -mqatools batch --runner=local cli_tests 'echo "{absolute_input_path} => {output_directory}"'
# other CLI tests
# - python -mqatools save-artifacts # Gitlab: 404: Project not found
- python -mqatools get commit_id
- green -vvv
# - PYTHONPATH=$(pwd) python -munittest


deploy:qa:
stage: deploy
Expand Down
4 changes: 2 additions & 2 deletions backend/backend/api/export_to_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import label

from qatools.utils import copy
from qatools.conventions import deserialize_config, serialize_config
from qaboard.utils import copy
from qaboard.conventions import deserialize_config, serialize_config
from backend import app, db_session
from ..models import Project, CiCommit, Batch, slugify_config

Expand Down
26 changes: 14 additions & 12 deletions backend/backend/api/tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from flask import request, jsonify
from sqlalchemy.orm.exc import NoResultFound

from qatools.iterators import iter_inputs
from qatools.conventions import deserialize_config
from qaboard.iterators import iter_inputs
from qaboard.conventions import deserialize_config

from backend import app, db_session
from ..models import CiCommit, Project
Expand Down Expand Up @@ -163,16 +163,18 @@ def get_group():
try:
tests = list(
iter_inputs(
[request.args["name"]],
groups_paths,
project.database,
default_configuration,
{},
qatools_config,
[request.args["name"]], # groups
groups_paths, # groups_file,
project.database, # database
default_configuration, # default_configuration
'lsf', # platform
{"type": 'lsf'}, # default_lsf_configuration
qatools_config, # qatools_config
# inputs_settings=None
)
)
return jsonify({
"tests": [{"input_path": str(test.relative_to(database)), "configurations": configuration} for test, configuration, _, database, _ in tests],
"tests": [{"input_path": str(run_context.rel_input_path), "configurations": run_context.configurations} for run_context in tests],
"message": message,
})
except Exception as e:
Expand All @@ -197,7 +199,7 @@ def start_tuning(hexsha):
return jsonify("Sorry, the commit id was not found"), 404

if "qatools_config" not in ci_commit.project.data:
return jsonify("Please configure `qatools first`"), 404
return jsonify("Please create `qaboard.yaml`"), 404

ci_commit.latest_output_datetime = datetime.datetime.now()
ci_commit.latest_output_datetime = datetime.datetime.now()
Expand Down Expand Up @@ -279,8 +281,8 @@ def start_tuning(hexsha):

# Make sure qatools doesn't complain about not being in a git repository and knows where to save results
f"\nexport CI=true;\n",
f"export CI_COMMIT_SHA='{ci_commit.gitcommit.hexsha}';\n",
f"export QA_CI_COMMIT_DIR='{ci_commit.commit_dir}';\n\n",
f"export CI_COMMIT_SHA='{ci_commit.hexsha}';\n",
f"export QA_CI_CI_COMMIT_DIR='{ci_commit.commit_dir}';\n\n",
batch_command,
]
)
Expand Down
33 changes: 15 additions & 18 deletions backend/backend/models/Batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,26 +103,23 @@ def __repr__(self):


def stop(self):
stdouts = []
kill_commands = []
for _, command in self.data.get('commands', {}).items():
ssh = "LC_ALL=en_US.utf8 LANG=en_US.utf8 ssh -q -tt -i /home/arthurf/.ssh/ispq.id_rsa ispq@ispq-vdi"
bsub = f"bsub_su {command['user']} -I"
kill_command = f"{ssh} {bsub} bkill -J '{command['lsf_jobs_prefix']}/*'"
kill_commands.append(kill_command)
print(kill_command)
out = subprocess.run(kill_command, shell=True, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# TODO: after stop(), can we just mark all outputs as is_pending:False?
errors = []
for command_id, command in self.data.get('commands', {}).items():
from qaboard.runners.job import JobGroup
# Default to something reasonable, but it likely won't work out-of-the-box for all runners
# if stopping doesn't rely only on the command_id...
jobs = JobGroup(job_options={"type": "local", "command_id": command_id, **command})
try:
out.check_returncode()
print(out.stdout)
stdouts.append(str(out.stdout))
error = stop_command(command_id, command)
if error:
errors.append(error)
except:
# If LSF can't find the jobs, they are done already
if 'No match' not in str(out.stdout):
return {"error": str(out.stdout), "cmd": str(kill_command)}
# TODO: check it's enough to mark all outputs as is_pending:false !
return {"cmd": '\n'.join(kill_commands), "stdout": '\n\n'.join(stdouts)}

continue
if errors:
return {"error": errors}
else:
return {}

def delete(self, session):
"""
Expand Down
21 changes: 15 additions & 6 deletions qaboard/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@ def print_url(ctx, status="starting"):
if status == "starting":
click.echo(click.style("Results: ", bold=True) + click.style(commit_url, underline=True, bold=True), err=True)
elif status == "failure":
click.secho(f"Read Logs at: {commit_url}{'?' if batch_label == 'default' else '&'}selected_views=logs", fg='red', bold=True)
click.echo(
click.style("[FAILED] Read the full logs at: ", bold=True, fg='red') +
click.style(
f"{commit_url}{'?' if batch_label == 'default' else '&'}selected_views=logs",
fg='red',
underline=True,
bold=True,
),
err=True)



Expand Down Expand Up @@ -92,7 +100,7 @@ def notify_qa_database(object_type='output', **kwargs):
return

# we only update the output database if we're in a CI run, or if the user used `qa --ci`
if not is_ci and not kwargs['share']:
if not (is_ci or kwargs['share']):
return

# some light custom serialization for Path objects
Expand Down Expand Up @@ -147,9 +155,9 @@ def get_output(output_id):
except:
pass


@lru_cache()
def batch_info(reference, is_branch, batch):
# We used to use a cache but now we want to check run statuses before/after the batch
# @lru_cache()
def batch_info(reference, batch, is_branch=False):
"""Get data about a batch of outputs in the database"""
import requests
params = {
Expand All @@ -164,7 +172,8 @@ def batch_info(reference, is_branch, batch):
url = f'{api_prefix}/commit/{commit_id}'
r = requests.get(url, params=params)
if 'batches' not in r.json():
print(r.url)
click.secho('WARNING: We could not get the results for the "{batch}" batch {reference}', fg='yellow', bold=True)
click.secho(r.url, fg='yellow')
raise ValueError(f'We could not get the results for {batch}')
return r.json()['batches'][batch]

Expand Down
5 changes: 4 additions & 1 deletion qaboard/conventions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import yaml
import click
from requests.utils import unquote


def get_settings(inputs_type, config):
Expand Down Expand Up @@ -172,7 +173,7 @@ def make_hash(obj):


def batch_dir(commit_ci_dir, batch_label, tuning, save_with_ci=False):
from qatools.config import is_ci, subproject
from qaboard.config import is_ci, subproject
batch_folder = Path('output') if batch_label == 'default' else Path('tuning') / slugify(batch_label)
return commit_ci_dir / batch_folder if (is_ci or save_with_ci) else subproject / batch_folder

Expand All @@ -198,3 +199,5 @@ def tuning_foldername(batch_label, tuning_parameters_hash):
return parameters_folder


def url_to_dir(url):
return Path(unquote(url)[2:])
34 changes: 21 additions & 13 deletions qaboard/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from .conventions import make_hash, make_pretty_tuning_filename, get_settings
from .utils import input_metadata, entrypoint_module, cased_path

from .run import RunContext


def flatten(lst: Union[str, List, Tuple]):
Expand Down Expand Up @@ -132,13 +132,15 @@ def _iter_inputs(path, database, inputs_settings, qatools_config, only=None, exc



def iter_inputs(groups, groups_file, database, default_configuration, default_lsf_configuration, qatools_config, inputs_settings=None, debug=os.environ.get('QA_DEBUG_ITER_INPUTS', False)):
"""Returns an iterator over the (input_path, configurations, lsf-configuration) from the selected groups
def iter_inputs(groups, groups_file, database, default_configuration, default_platform, default_job_configuration, qatools_config, inputs_settings=None, debug=os.environ.get('QA_DEBUG_ITER_INPUTS', False)):
"""Returns an iterator over the (input_path, configurations, runner-configuration) from the selected groups
params:
- groups: array of group names or paths whose inputs you want to iterate
- groups_file: path to a yaml file, or an array of paths
- config: is none is specified
"""
# FIXME: can't change platform/entrypoint per-batch

if not (isinstance(groups_file, list) or isinstance(groups_file, tuple)):
groups_file = [groups_file]
available_batches = {}
Expand All @@ -161,6 +163,8 @@ def iter_inputs(groups, groups_file, database, default_configuration, default_ls
group_aliases = available_batches.get('groups', {})
groups = list(resolve_aliases(groups, group_aliases))

runner = default_job_configuration['type']

if not groups:
click.secho(f'WARNING: No group chosen.', fg='yellow', err=True)

Expand All @@ -173,15 +177,16 @@ def iter_inputs(groups, groups_file, database, default_configuration, default_ls
# Maybe we asked recordings from a location...
if debug: click.secho(str(group), bold=True, fg='cyan', err=True)
inputs_iter = _iter_inputs(group, database, inputs_settings, qatools_config)
yield from ((i, default_configuration , default_lsf_configuration, database, inputs_settings['type']) for i in inputs_iter)
yield from (RunContext(input_path=i, database=database, configurations=default_configuration, platform=default_platform, job_options=default_job_configuration, type=inputs_settings['type']) for i in inputs_iter)
return

# 2. Those defined in the groups_file
if available_batches[group] is None: continue # happens when there is an orphan "$group:" in in the yaml...
group_only = available_batches[group].get('only')
group_exclude = available_batches[group].get('exclude')
# Each group can define its own default runtime and LSF configuration
group_lsf_configuration = {**default_lsf_configuration, **available_batches[group].get('lsf', {})}
group_platform = available_batches[group].get('platform', default_platform)
# Each group can define its own default runtime and runner configuration
group_job_configuration = {**default_job_configuration, **available_batches[group].get(runner, {})}
group_configuration = available_batches[group].get('configurations', available_batches[group].get('configuration', default_configuration))
group_configuration = list(flatten(group_configuration))
group_database = Path(available_batches[group].get('database', {}).get('windows' if os.name=='nt' else 'linux', database))
Expand All @@ -195,7 +200,7 @@ def iter_inputs(groups, groups_file, database, default_configuration, default_ls
if not locations:
# run all inputs matching only/exclude
inputs_iter = _iter_inputs(None, group_database, group_inputs_settings, qatools_config, only=group_only, exclude=group_exclude)
yield from ((i, group_configuration, group_lsf_configuration, group_database, group_inputs_settings['type']) for i in inputs_iter)
yield from (RunContext(input_path=i, database=group_database, configurations=group_configuration, platform=group_platform, job_options=group_job_configuration, type=group_inputs_settings['type']) for i in inputs_iter)
return

# We also allow each input to have its settings...
Expand All @@ -211,21 +216,22 @@ def iter_inputs(groups, groups_file, database, default_configuration, default_ls

for location, location_configuration in locations.items():
if not location_configuration:
location_platform = group_platform
location_configuration = group_configuration
location_database = group_database
location_lsf_configuration = group_lsf_configuration
location_job_configuration = group_job_configuration
location_inputs_settings = group_inputs_settings
else:
if isinstance(location_configuration, dict):
location_lsf_configuration = {**group_lsf_configuration, **location_configuration.get('lsf', {})}
location_job_configuration = {**group_job_configuration, **location_configuration.get(runner, {})}
location_database = Path(location_configuration.get('database', {}).get('windows' if os.name=='nt' else 'linux', group_database))
if 'type' in location_configuration:
location_type = location_configuration['type']
location_inputs_settings = get_settings(location_type, qatools_config)
else:
location_inputs_settings = group_inputs_settings
location_inputs_settings.update(location_configuration)
for k in ['type', 'database', 'lsf', 'glob', 'globs', 'use_parent_folder']:
for k in ['type', 'database', runner, 'platform', 'glob', 'globs', 'use_parent_folder']:
if k in location_configuration:
del location_configuration[k]
if 'configurations' not in location_configuration and 'configurations' not in location_configuration:
Expand All @@ -236,17 +242,19 @@ def iter_inputs(groups, groups_file, database, default_configuration, default_ls
elif isinstance(location_configuration, list):
location_configuration = list(flatten(location_configuration))
location_configuration = [*group_configuration, *location_configuration]
location_platform = group_platform
location_database = group_database
location_lsf_configuration = group_lsf_configuration
location_job_configuration = group_job_configuration
location_inputs_settings = group_inputs_settings
else: # string?
location_platform = group_platform
location_configuration = [*group_configuration, location_configuration]
location_database = group_database
location_lsf_configuration = group_lsf_configuration
location_job_configuration = group_job_configuration
location_inputs_settings = group_inputs_settings
if debug: click.secho(str(location_database / location), bold=True, fg='cyan', err=True)
inputs_iter = _iter_inputs(location, location_database, location_inputs_settings, qatools_config, only=group_only, exclude=group_exclude)
yield from ((i, location_configuration, location_lsf_configuration, location_database, location_inputs_settings['type']) for i in inputs_iter)
yield from (RunContext(input_path=i, database=location_database, configurations=location_configuration, platform=location_platform, job_options=location_job_configuration, type=location_inputs_settings['type']) for i in inputs_iter)



Expand Down
Loading

0 comments on commit b7a43c3

Please sign in to comment.