Feat: Updates GET workflows to v3 #178

Merged · 6 commits · Jan 16, 2025
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
 ## lifebit-ai/cloudos-cli: changelog
 
+## v2.15.0 (2025-01-16)
+
+### Feature
+
+- Updates GET workflows endpoint to v3.
+
 ## v2.14.0 (2024-12-18)
 
 - Adds the new `--accelerate-file-staging` parameter to job submission to add support for AWS S3 mountpoint for quicker file staging.
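For reference, the API change behind this entry: the client now issues a plain GET against `/api/v3/workflows` with query parameters instead of POSTing a JSON filter payload to `/api/v1/workflows/getByType` (see the `cloudos/clos.py` diff below). A minimal sketch of the new request shape, with placeholder URL, workspace id and key:

```python
import requests

# Placeholder values, for illustration only.
cloudos_url = "https://cloudos.example.com"
workspace_id = "5c6d3e9bd954e800b23f8c62"
apikey = "MY_API_KEY"

# v2.15.0 style: one GET with query parameters replaces the old
# POST to /api/v1/workflows/getByType with a JSON filter payload.
r = requests.get(
    f"{cloudos_url}/api/v3/workflows",
    params={"teamId": workspace_id, "pageSize": 10, "page": 1,
            "archived.status": "false"},
    headers={"Content-type": "application/json", "apikey": apikey})
r.raise_for_status()
payload = r.json()
workflows = payload["workflows"]                           # v1 used 'pipelines'
total = payload["paginationMetadata"]["Pagination-Count"]  # total across all pages
```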
2 changes: 1 addition & 1 deletion cloudos/_version.py
@@ -1 +1 @@
-__version__ = '2.14.0'
+__version__ = '2.15.0'
120 changes: 74 additions & 46 deletions cloudos/clos.py
@@ -340,65 +340,54 @@ def get_curated_workflow_list(self, workspace_id, get_all=True, page=1, verify=T
         r : list
             A list of dicts, each corresponding to a workflow.
         """
-        data = {"search": "",
-                "page": page,
-                "filters": [
-                    [
-                        {
-                            "isPredefined": True,
-                            "isCurated": True,
-                            "isFeatured": False,
-                            "isModule": False
-                        },
-                        {
-                            "isPredefined": True,
-                            "isCurated": False,
-                            "isFeatured": False,
-                            "isModule": False
-                        },
-                        {
-                            "isPredefined": True,
-                            "isCurated": True,
-                            "isFeatured": True,
-                            "isModule": False
-                        }
-                    ]
-                ]
-                }
         headers = {
             "Content-type": "application/json",
             "apikey": self.apikey
         }
-        r = retry_requests_post("{}/api/v1/workflows/getByType?teamId={}".format(self.cloudos_url,
-                                                                                 workspace_id),
-                                json=data, headers=headers, verify=verify)
+        r = retry_requests_get(
+            "{}/api/v3/workflows?search=&groups[]=curated&groups[]=featured&groups[]=predefined&page={}&teamId={}".format(
+                self.cloudos_url, page, workspace_id),
+            headers=headers, verify=verify)
         if r.status_code >= 400:
             raise BadRequestException(r)
         content = json.loads(r.content)
         if get_all:
-            workflows_collected = len(content['pipelines'])
-            workflows_to_get = content['total']
+            workflows_collected = len(content['workflows'])
+            workflows_to_get = content['paginationMetadata']['Pagination-Count']
             if workflows_to_get <= workflows_collected or workflows_collected == 0:
-                return content['pipelines']
+                return content['workflows']
             if workflows_to_get > workflows_collected:
-                return content['pipelines'] + self.get_curated_workflow_list(workspace_id,
+                return content['workflows'] + self.get_curated_workflow_list(workspace_id,
                                                                              get_all=True,
                                                                              page=page+1,
                                                                              verify=verify)
         else:
-            return content['pipelines']
+            return content['workflows']

-    def get_workflow_list(self, workspace_id, verify=True):
+    def get_workflow_list(self, workspace_id, verify=True, get_all=True,
+                          page=1, page_size=10, max_page_size=1000,
+                          archived_status=False):
         """Get all the workflows from a CloudOS workspace.
 
         Parameters
         ----------
         workspace_id : string
             The CloudOS workspace id from which to collect the workflows.
-        verify: [bool|string]
+        verify : [bool|string]
             Whether to use SSL verification or not. Alternatively, if
             a string is passed, it will be interpreted as the path to
             the SSL certificate file.
+        get_all : bool
+            Whether to get all available workflows or just the
+            indicated page.
+        page : int
+            The page number to retrieve, from the paginated response.
+        page_size : int
+            The number of workflows per page, from 1 to 1000.
+        max_page_size : int
+            Max page size defined by the API server. It is currently 1000.
+        archived_status : bool
+            Whether to retrieve archived workflows or not.
 
         Returns
         -------
@@ -409,12 +398,41 @@ def get_workflow_list(self, workspace_id, verify=True):
         headers = {
             "Content-type": "application/json",
             "apikey": self.apikey
         }
-        r = retry_requests_get("{}/api/v1/workflows?teamId={}".format(self.cloudos_url,
-                                                                      workspace_id),
-                               headers=headers, verify=verify)
+        archived_status = str(archived_status).lower()
+        r = retry_requests_get(
+            "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
+                self.cloudos_url, workspace_id, page_size, page, archived_status),
+            headers=headers, verify=verify)
         if r.status_code >= 400:
             raise BadRequestException(r)
-        return json.loads(r.content)
+        content = json.loads(r.content)
+        if get_all:
+            total_workflows = content['paginationMetadata']['Pagination-Count']
+            if total_workflows <= max_page_size:
+                r = retry_requests_get(
+                    "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
+                        self.cloudos_url, workspace_id, total_workflows, 1, archived_status),
+                    headers=headers, verify=verify)
+                if r.status_code >= 400:
+                    raise BadRequestException(r)
+                return json.loads(r.content)['workflows']
+            else:
+                n_pages = (total_workflows // max_page_size) + int((total_workflows % max_page_size) > 0)
+                for p in range(n_pages):
+                    p += 1
+                    r = retry_requests_get(
+                        "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
+                            self.cloudos_url, workspace_id, max_page_size, p, archived_status),
+                        headers=headers, verify=verify)
+                    if r.status_code >= 400:
+                        raise BadRequestException(r)
+                    if p == 1:
+                        all_content = json.loads(r.content)['workflows']
+                    else:
+                        all_content += json.loads(r.content)['workflows']
+                return all_content
+        else:
+            return content['workflows']

    @staticmethod
    def process_workflow_list(r, all_fields=False):
@@ -435,11 +453,10 @@ def process_workflow_list(r, all_fields=False):
         """
         COLUMNS = ['_id',
                    'name',
-                   'isModule',
                    'archived.status',
                    'mainFile',
                    'workflowType',
-                   'parameters',
+                   'group',
                    'repository.name',
                    'repository.platform',
                    'repository.url',
@@ -511,14 +528,25 @@ def is_module(self, workflow_name, workspace_id, verify=True):
         """
         my_workflows_r = self.get_workflow_list(workspace_id, verify=verify)
         my_workflows = self.process_workflow_list(my_workflows_r)
-        is_module = my_workflows.loc[
+        group = my_workflows.loc[
             (my_workflows['name'] == workflow_name) & (my_workflows['archived.status'] == False),
-            'isModule']
-        if len(is_module) == 0:
+            'group']
+        if len(group) == 0:
             raise ValueError(f'No workflow found with name: {workflow_name}')
-        if len(is_module) > 1:
+        if len(group) > 1:
             raise ValueError(f'More than one workflow found with name: {workflow_name}')
-        return is_module.values[0]
+        module_groups = ['system-tools',
+                         'data-factory-data-connection-etl',
+                         'data-factory',
+                         'data-factory-omics-etl',
+                         'drug-discovery',
+                         'data-factory-omics-insights',
+                         'intermediate'
+                         ]
+        if group.values[0] in module_groups:
+            return True
+        else:
+            return False

    def get_project_list(self, workspace_id, verify=True):
        """Get all the projects from a CloudOS workspace.
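A note on the pagination added to `get_workflow_list` above: the page count is a ceiling division written as floor division plus a remainder check. A standalone sketch of that arithmetic (the helper name and values are invented):

```python
# Page-count arithmetic from get_workflow_list: ceiling division written as
# floor division plus a remainder check.
def n_pages(total_workflows: int, max_page_size: int = 1000) -> int:
    return (total_workflows // max_page_size) + int((total_workflows % max_page_size) > 0)

assert n_pages(999) == 1    # fits on a single page
assert n_pages(1000) == 1   # exactly one full page
assert n_pages(1001) == 2   # one extra workflow spills onto page 2
```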
14 changes: 3 additions & 11 deletions cloudos/jobs/job.py
@@ -143,18 +143,8 @@ def fetch_cloudos_id(self,
         if resource not in allowed_resources:
             raise ValueError('Your specified resource is not supported. ' +
                              f'Use one of the following: {allowed_resources}')
-        headers = {
-            "Content-type": "application/json",
-            "apikey": apikey
-        }
-        r = retry_requests_get("{}/api/v1/{}?teamId={}".format(cloudos_url,
-                                                               resource,
-                                                               workspace_id),
-                               headers=headers, verify=verify)
-        if r.status_code >= 400:
-            raise BadRequestException(r)
-        content = json.loads(r.content)
         if resource == 'workflows':
+            content = self.get_workflow_list(workspace_id, verify=verify)
             for element in content:
                 if (element["name"] == name and
                     element["repository"]["platform"] == repository_platform and
@@ -167,6 +157,8 @@
                 elif "importsFile" in element.keys() and element["importsFile"] == importsfile:
                     return element["_id"]
         elif resource == 'projects':
+            r = self.get_project_list(workspace_id, verify=verify)
+            content = json.loads(r.content)
             # New API projects endpoint spec
             if type(content) is dict:
                 for element in content["projects"]:
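With the raw v1 request removed, `fetch_cloudos_id` now delegates to `get_workflow_list` and keeps only the matching logic. Roughly, the workflows branch reduces to the following sketch; the workflow name, platform and workspace id are assumed example values:

```python
from cloudos.clos import Cloudos

# Assumed example values; only the field names come from the diff above.
clos = Cloudos(apikey="MY_API_KEY", cromwell_token=None,
               cloudos_url="https://cloudos.example.com")
workflows = clos.get_workflow_list("my-workspace-id")  # v3 fetch, all pages
matches = [w["_id"] for w in workflows
           if w["name"] == "rnatoy"
           and w["repository"]["platform"] == "github"]
workflow_id = matches[0] if matches else None
```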
12 changes: 9 additions & 3 deletions tests/test_clos/test_detect_workflow.py
@@ -9,6 +9,9 @@
 APIKEY = 'vnoiweur89u2ongs'
 CLOUDOS_URL = 'http://cloudos.lifebit.ai'
 WORKSPACE_ID = 'lv89ufc838sdig'
+PAGE_SIZE = 10
+PAGE = 1
+ARCHIVED_STATUS = "false"


@mock.patch('cloudos.clos', mock.MagicMock())
@@ -19,17 +22,20 @@ def test_detect_workflow():
     """
     Test 'detect_workflow' to work as intended
     API request is mocked and replicated with json files
     """
     json_data = load_json_file(INPUT)
-    params = {"teamId": WORKSPACE_ID}
+    params = {"teamId": WORKSPACE_ID,
+              "pageSize": PAGE_SIZE,
+              "page": PAGE,
+              "archived.status": ARCHIVED_STATUS}
     header = {
         "Accept": "application/json, text/plain, */*",
         "Content-Type": "application/json;charset=UTF-8",
         "apikey": APIKEY
     }
-    search_str = f"teamId={WORKSPACE_ID}"
+    search_str = f"teamId={WORKSPACE_ID}&pageSize={PAGE_SIZE}&page={PAGE}&archived.status={ARCHIVED_STATUS}"
     # mock GET method with the .json
     responses.add(
         responses.GET,
-        url=f"{CLOUDOS_URL}/api/v1/workflows?{search_str}",
+        url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
         body=json_data,
         headers=header,
         match=[matchers.query_param_matcher(params)],
23 changes: 17 additions & 6 deletions tests/test_clos/test_get_curated_workflow_list.py
@@ -2,6 +2,7 @@
 import json
 import pytest
 import responses
+from responses import matchers
 from cloudos.clos import Cloudos
 from cloudos.utils.errors import BadRequestException
 from tests.functions_for_pytest import load_json_file
@@ -10,6 +11,7 @@
 APIKEY = 'vnoiweur89u2ongs'
 CLOUDOS_URL = 'http://cloudos.lifebit.ai'
 WORKSPACE_ID = 'lv89ufc838sdig'
+PAGE = 1


@mock.patch('cloudos.clos', mock.MagicMock())
@@ -20,12 +22,21 @@ def test_get_curated_workflow_list_correct_response():
     """
     Test 'get_curated_workflow_list' to work as intended
     API request is mocked and replicated with json files
     """
     create_json = load_json_file(OUTPUT)
-    search_str = f"teamId={WORKSPACE_ID}"
+    params = {"teamId": WORKSPACE_ID,
+              "groups[]": ["curated", "featured", "predefined"],
+              "page": PAGE}
+    header = {
+        "Accept": "application/json, text/plain, */*",
+        "Content-Type": "application/json;charset=UTF-8",
+        "apikey": APIKEY}
+    search_str = f"search=&groups[]=curated&groups[]=featured&groups[]=predefined&page={PAGE}&teamId={WORKSPACE_ID}"
     # mock GET method with the .json
     responses.add(
-        responses.POST,
-        url=f"{CLOUDOS_URL}/api/v1/workflows/getByType?{search_str}",
+        responses.GET,
+        url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
         body=create_json,
+        headers=header,
+        match=[matchers.query_param_matcher(params)],
         status=200)
     # start cloudOS service
     clos = Cloudos(apikey=APIKEY, cromwell_token=None, cloudos_url=CLOUDOS_URL)
@@ -46,11 +57,11 @@ def test_get_curated_workflow_list_incorrect_response():
     error_message = {"statusCode": 400, "code": "BadRequest",
                      "message": "Bad Request.", "time": "2022-11-23_17:31:07"}
     error_json = json.dumps(error_message)
-    search_str = f"teamId={WORKSPACE_ID}"
+    search_str = f"search=&groups[]=curated&groups[]=featured&groups[]=predefined&page={PAGE}&teamId={WORKSPACE_ID}"
     # mock GET method with the .json
     responses.add(
-        responses.POST,
-        url=f"{CLOUDOS_URL}/api/v1/workflows/getByType?{search_str}",
+        responses.GET,
+        url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
         body=error_json,
         status=400)
     # raise 400 error
21 changes: 15 additions & 6 deletions tests/test_clos/test_get_workflow_list.py
@@ -11,6 +11,9 @@
 APIKEY = 'vnoiweur89u2ongs'
 CLOUDOS_URL = 'http://cloudos.lifebit.ai'
 WORKSPACE_ID = 'lv89ufc838sdig'
+PAGE_SIZE = 10
+PAGE = 1
+ARCHIVED_STATUS = "false"


@mock.patch('cloudos.clos', mock.MagicMock())
@@ -21,17 +24,20 @@ def test_get_workflow_list_correct_response():
     """
     Test 'workflow_list' to work as intended
     API request is mocked and replicated with json files
     """
     create_json = load_json_file(INPUT)
-    params = {"teamId": WORKSPACE_ID}
+    params = {"teamId": WORKSPACE_ID,
+              "pageSize": PAGE_SIZE,
+              "page": PAGE,
+              "archived.status": ARCHIVED_STATUS}
     header = {
         "Accept": "application/json, text/plain, */*",
         "Content-Type": "application/json;charset=UTF-8",
         "apikey": APIKEY
     }
-    search_str = f"teamId={WORKSPACE_ID}"
+    search_str = f"teamId={WORKSPACE_ID}&pageSize={PAGE_SIZE}&page={PAGE}&archived.status={ARCHIVED_STATUS}"
     # mock GET method with the .json
     responses.add(
         responses.GET,
-        url=f"{CLOUDOS_URL}/api/v1/workflows?{search_str}",
+        url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
         body=create_json,
         headers=header,
         match=[matchers.query_param_matcher(params)],
@@ -55,17 +61,20 @@ def test_get_workflow_list_incorrect_response():
     error_message = {"statusCode": 400, "code": "BadRequest",
                      "message": "Bad Request.", "time": "2022-11-23_17:31:07"}
     error_json = json.dumps(error_message)
-    params = {"teamId": WORKSPACE_ID}
+    params = {"teamId": WORKSPACE_ID,
+              "pageSize": PAGE_SIZE,
+              "page": PAGE,
+              "archived.status": ARCHIVED_STATUS}
     header = {
         "Accept": "application/json, text/plain, */*",
         "Content-Type": "application/json;charset=UTF-8",
         "apikey": APIKEY
     }
-    search_str = f"teamId={WORKSPACE_ID}"
+    search_str = f"teamId={WORKSPACE_ID}&pageSize={PAGE_SIZE}&page={PAGE}&archived.status={ARCHIVED_STATUS}"
     # mock GET method with the .json
     responses.add(
         responses.GET,
-        url=f"{CLOUDOS_URL}/api/v1/workflows?{search_str}",
+        url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
         body=error_json,
         headers=header,
         match=[matchers.query_param_matcher(params)],
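The test updates above all follow one pattern: the mocked v3 URL carries the new query string, and `matchers.query_param_matcher` makes the mock fire only when every expected parameter is present. A condensed, self-contained sketch of that pattern with dummy URL and ids:

```python
import requests
import responses
from responses import matchers


@responses.activate
def test_v3_workflows_mock():
    # Register a mock that only matches when every v3 query parameter is present.
    responses.add(
        responses.GET,
        url="http://cloudos.example.com/api/v3/workflows",
        json={"workflows": [], "paginationMetadata": {"Pagination-Count": 0}},
        match=[matchers.query_param_matcher(
            {"teamId": "team-123", "pageSize": "10",
             "page": "1", "archived.status": "false"})],
        status=200)
    # A request carrying the full parameter set hits the mock.
    r = requests.get(
        "http://cloudos.example.com/api/v3/workflows",
        params={"teamId": "team-123", "pageSize": 10,
                "page": 1, "archived.status": "false"})
    assert r.json()["workflows"] == []
```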