Feat: implements new nextflow_version parameter for job submission #171

Merged · 11 commits · Nov 28, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
## lifebit-ai/cloudos-cli: changelog

## v2.12.0 (2024-11-27)

### Feature

- Adds the new parameter `--nextflow-version` to select the Nextflow version for job submissions.
- `--cloudos-url` now also accepts URLs with a trailing `/`.

## v2.11.2 (2024-11-6)

### Fix
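The changelog also notes that `--cloudos-url` now tolerates a trailing `/`; in the diff below this is done by normalising the URL with `rstrip('/')` at the start of each command. A minimal standalone sketch of that normalisation (the helper name and example URL are illustrative, not part of the package):

```python
def normalize_cloudos_url(cloudos_url: str) -> str:
    """Drop any trailing '/' so endpoint strings built from the base URL stay valid."""
    return cloudos_url.rstrip('/')


# Both spellings resolve to the same base URL (example URL only).
assert normalize_cloudos_url('https://cloudos.lifebit.ai/') == 'https://cloudos.lifebit.ai'
assert normalize_cloudos_url('https://cloudos.lifebit.ai') == 'https://cloudos.lifebit.ai'
```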
5 changes: 5 additions & 0 deletions README.md
@@ -96,6 +96,11 @@ Options:
to include.
--nextflow-profile TEXT A comma separated string indicating the
nextflow profile/s to use with your job.
--nextflow-version [22.10.8|24.04.4|latest]
Nextflow version to use when executing the
workflow in CloudOS. Please, note that
versions above 22.10.8 are only DSL2
compatible. Default=22.10.8.
--git-commit TEXT The exact whole 40 character commit hash to
run for the selected pipeline. If not
specified it defaults to the last commit of
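The new `--nextflow-version` flag documented above is defined as a `click.Choice`, so values other than `22.10.8`, `24.04.4` or `latest` are rejected before any request is sent. A self-contained sketch of the same option shape (an illustration only, not the project's actual command):

```python
import click
from click.testing import CliRunner


@click.command()
@click.option('--nextflow-version',
              type=click.Choice(['22.10.8', '24.04.4', 'latest']),
              default='22.10.8',
              help='Nextflow version to use when executing the workflow.')
def run(nextflow_version):
    click.echo(f'Nextflow version: {nextflow_version}')


runner = CliRunner()
print(runner.invoke(run, ['--nextflow-version', '24.04.4']).output)  # accepted
print(runner.invoke(run, ['--nextflow-version', '23.04.1']).output)  # usage error: invalid choice
```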
48 changes: 40 additions & 8 deletions cloudos/__main__.py
@@ -122,6 +122,12 @@ def queue():
@click.option('--nextflow-profile',
help=('A comma separated string indicating the nextflow profile/s ' +
'to use with your job.'))
@click.option('--nextflow-version',
help=('Nextflow version to use when executing the workflow in CloudOS. ' +
'Please, note that versions above 22.10.8 are only DSL2 compatible. ' +
'Default=22.10.8.'),
type=click.Choice(['22.10.8', '24.04.4', 'latest']),
default='22.10.8')
@click.option('--git-commit',
help=('The exact whole 40 character commit hash to run for ' +
'the selected pipeline. ' +
@@ -237,6 +243,7 @@ def run(apikey,
ignite,
job_queue,
nextflow_profile,
nextflow_version,
instance_type,
instance_disk,
storage_mode,
@@ -255,11 +262,11 @@
disable_ssl_verification,
ssl_cert):
"""Submit a job to CloudOS."""
print('Executing run...')
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
if spot:
print('\n[Message] You have specified spot instances but they are no longer available ' +
'in CloudOS. Option ignored.\n')
print('[Message] You have specified spot instances but they are no longer available ' +
'in CloudOS. Option ignored.')
if do_not_save_logs:
save_logs = False
else:
@@ -275,13 +282,13 @@
batch = None
elif ignite:
batch = None
print('\n[Warning] You have specified ignite executor. Please, note that ignite is being ' +
print('[Warning] You have specified ignite executor. Please, note that ignite is being ' +
'removed from CloudOS, so the command may fail. Check ignite availability in your ' +
'CloudOS\n')
'CloudOS')
else:
batch = True
if execution_platform == 'hpc':
print('\nHPC execution platform selected')
print('\n[Message] HPC execution platform selected')
if hpc_id is None:
raise ValueError('Please, specify your HPC ID using --hpc parameter')
print('[Message] Please, take into account that HPC execution do not support ' +
@@ -304,7 +311,7 @@
raise ValueError(f'The workflow {workflow_name} is a WDL workflow. ' +
'WDL is not supported on HPC execution platform.')
if workflow_type == 'wdl':
print('\tWDL workflow detected\n')
print('[Message] WDL workflow detected')
if wdl_mainfile is None:
raise ValueError('Please, specify WDL mainFile using --wdl-mainfile <mainFile>.')
c_status = cl.get_cromwell_status(workspace_id, verify_ssl)
@@ -346,15 +353,29 @@
print('\t...Sending job to CloudOS\n')
if is_module:
if job_queue is not None:
print(f'\tIgnoring job queue "{job_queue}" for ' +
print(f'[Message] Ignoring job queue "{job_queue}" for ' +
f'Platform Workflow "{workflow_name}". Platform Workflows ' +
'use their own predetermined queues.')
job_queue_id = None
if nextflow_version != '22.10.8':
print(f'[Message] The selected workflow \'{workflow_name}\' ' +
'is a CloudOS module. CloudOS modules only work with ' +
'Nextflow version 22.10.8. Switching to use 22.10.8')
nextflow_version = '22.10.8'
else:
queue = Queue(cloudos_url=cloudos_url, apikey=apikey, cromwell_token=cromwell_token,
workspace_id=workspace_id, verify=verify_ssl)
job_queue_id = queue.fetch_job_queue_id(workflow_type=workflow_type, batch=batch,
job_queue=job_queue)
if nextflow_version == 'latest':
nextflow_version = '24.04.4'
print('[Message] You have specified Nextflow version \'latest\'. The workflow will use the ' +
f'latest version available on CloudOS: {nextflow_version}.')
if nextflow_version != '22.10.8':
print(f'[Warning] You have specified Nextflow version {nextflow_version}. This version requires the pipeline ' +
'to be written in DSL2 and does not support DSL1.')
print('\nExecuting run...')
print(f'\tNextflow version: {nextflow_version}')
j_id = j.send_job(job_config=job_config,
parameter=parameter,
git_commit=git_commit,
@@ -365,6 +386,7 @@
batch=batch,
job_queue_id=job_queue_id,
nextflow_profile=nextflow_profile,
nextflow_version=nextflow_version,
instance_type=instance_type,
instance_disk=instance_disk,
storage_mode=storage_mode,
@@ -515,6 +537,7 @@ def run_curated_examples(apikey,

NOTE that currently, only Nextflow workflows are supported.
"""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
cl = Cloudos(cloudos_url, apikey, None)
curated_workflows = cl.get_curated_workflow_list(workspace_id, verify=verify_ssl)
@@ -628,6 +651,7 @@ def job_status(apikey,
disable_ssl_verification,
ssl_cert):
"""Check job status in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
print('Executing status...')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
if verbose:
@@ -702,6 +726,7 @@ def list_jobs(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all your jobs from a CloudOS workspace in CSV format."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
@@ -783,6 +808,7 @@ def list_workflows(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all workflows from a CloudOS workspace in CSV format."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
@@ -859,6 +885,7 @@ def import_workflows(apikey,
disable_ssl_verification,
ssl_cert):
"""Imports workflows to CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
print('Executing workflow import...\n')
print('\t[Message] Only Nextflow workflows are currently supported.\n')
@@ -920,6 +947,7 @@ def list_projects(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all projects from a CloudOS workspace in CSV format."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
@@ -985,6 +1013,7 @@ def cromwell_status(apikey,
disable_ssl_verification,
ssl_cert):
"""Check Cromwell server status in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
if apikey is None and cromwell_token is None:
raise ValueError("Please, use one of the following tokens: '--apikey', '--cromwell_token'")
print('Executing status...')
@@ -1040,6 +1069,7 @@ def cromwell_restart(apikey,
disable_ssl_verification,
ssl_cert):
"""Restart Cromwell server in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
if apikey is None and cromwell_token is None:
raise ValueError("Please, use one of the following tokens: '--apikey', '--cromwell_token'")
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
@@ -1112,6 +1142,7 @@ def cromwell_stop(apikey,
disable_ssl_verification,
ssl_cert):
"""Stop Cromwell server in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
if apikey is None and cromwell_token is None:
raise ValueError("Please, use one of the following tokens: '--apikey', '--cromwell_token'")
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
@@ -1172,6 +1203,7 @@ def list_queues(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all available job queues from a CloudOS workspace."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
2 changes: 1 addition & 1 deletion cloudos/_version.py
@@ -1 +1 @@
__version__ = '2.11.2'
__version__ = '2.12.0'
12 changes: 10 additions & 2 deletions cloudos/jobs/job.py
@@ -197,6 +197,7 @@ def convert_nextflow_to_json(self,
batch,
job_queue_id,
nextflow_profile,
nextflow_version,
instance_type,
instance_disk,
storage_mode,
@@ -242,8 +243,10 @@
Job queue Id to use in the batch job.
nextflow_profile: string
A comma separated string with the profiles to be used.
nextflow_version: string
Nextflow version to use when executing the workflow in CloudOS.
instance_type : string
Name of the AMI to choose.
Name of the instance type to be used for the job master node, for example AWS EC2 c5.xlarge.
instance_disk : int
The disk space of the instance, in GB.
storage_mode : string
@@ -383,6 +386,7 @@ def convert_nextflow_to_json(self,
"project": project_id,
"workflow": workflow_id,
"name": job_name,
"nextflowVersion": nextflow_version,
"resumable": resumable,
"saveProcessLogs": save_logs,
"batch": {
@@ -425,6 +429,7 @@ def send_job(self,
batch=True,
job_queue_id=None,
nextflow_profile=None,
nextflow_version='22.10.8',
instance_type='c5.xlarge',
instance_disk=500,
storage_mode='regular',
@@ -467,8 +472,10 @@
Job queue Id to use in the batch job.
nextflow_profile: string
A comma separated string with the profiles to be used.
nextflow_version: string
Nextflow version to use when executing the workflow in CloudOS.
instance_type : string
Type of the AMI to choose.
Name of the instance type to be used for the job master node, for example AWS EC2 c5.xlarge.
instance_disk : int
The disk space of the instance, in GB.
storage_mode : string
@@ -520,6 +527,7 @@
batch,
job_queue_id,
nextflow_profile,
nextflow_version,
instance_type,
instance_disk,
storage_mode,
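Taken together, `run()` resolves the effective Nextflow version (CloudOS modules are pinned to 22.10.8, and `latest` currently maps to 24.04.4) and `convert_nextflow_to_json()` forwards it to the API as the `nextflowVersion` field. A standalone sketch of that flow, with the helper names and the reduced payload being illustrative only:

```python
def resolve_nextflow_version(requested: str, is_module: bool) -> str:
    """Mirror the selection rules added to cloudos/__main__.py in this PR."""
    if is_module and requested != '22.10.8':
        # CloudOS modules only work with Nextflow 22.10.8.
        return '22.10.8'
    if requested == 'latest':
        # 'latest' resolves to the newest version available on CloudOS.
        return '24.04.4'
    return requested


def build_partial_payload(job_name: str, nextflow_version: str) -> dict:
    """Only the fields relevant to this PR; the real request body has many more keys."""
    return {"name": job_name, "nextflowVersion": nextflow_version}


version = resolve_nextflow_version('latest', is_module=False)
print(build_partial_payload('new_job', version))
# {'name': 'new_job', 'nextflowVersion': '24.04.4'}
```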
4 changes: 2 additions & 2 deletions cloudos/queue/queue.py
@@ -128,12 +128,12 @@ def fetch_job_queue_id(self, workflow_type, batch=True, job_queue=None):
default_queue_name = available_queues[-1]['label']
queue_as_default = 'most recent suitable'
if job_queue is None:
print(f'\tNo job_queue was specified, using the {queue_as_default} queue: ' +
print(f'[Message] No job_queue was specified, using the {queue_as_default} queue: ' +
f'{default_queue_name}.')
return default_queue_id
selected_queue = [q for q in available_queues if q['label'] == job_queue]
if len(selected_queue) == 0:
print(f'\tQueue \'{job_queue}\' you specified was not found, using the {queue_as_default} ' +
print(f'[Message] Queue \'{job_queue}\' you specified was not found, using the {queue_as_default} ' +
f'queue instead: {default_queue_name}.')
return default_queue_id
return selected_queue[0]['id']
2 changes: 1 addition & 1 deletion tests/test_data/convert_nextflow_to_json_params.json
@@ -1 +1 @@
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job", "resumable": true, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job","nextflowVersion": "22.10.8", "resumable": true, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
3 changes: 3 additions & 0 deletions tests/test_jobs/test_convert_nextflow_to_json.py
@@ -19,6 +19,7 @@
"batch": False,
"job_queue_id": None,
"nextflow_profile": None,
"nextflow_version": '22.10.8',
"instance_type": "c5.xlarge",
"instance_disk": 500,
"storage_mode": 'regular',
@@ -46,6 +47,7 @@ def test_convert_nextflow_to_json_output_correct():
batch=param_dict["batch"],
job_queue_id=param_dict["job_queue_id"],
nextflow_profile=param_dict["nextflow_profile"],
nextflow_version=param_dict["nextflow_version"],
instance_type=param_dict["instance_type"],
instance_disk=param_dict["instance_disk"],
storage_mode=param_dict["storage_mode"],
@@ -78,6 +80,7 @@ def test_convert_nextflow_to_json_badly_formed_config():
batch=param_dict["batch"],
job_queue_id=param_dict["job_queue_id"],
nextflow_profile=param_dict["nextflow_profile"],
nextflow_version=param_dict["nextflow_version"],
instance_type=param_dict["instance_type"],
instance_disk=param_dict["instance_disk"],
storage_mode=param_dict["storage_mode"],