Skip to content

Commit

Permalink
fix: ensure get_workflows_for_user returns only the user workflows
Browse files Browse the repository at this point in the history
...meaning only workflows executed by the user OUTSIDE of a "team project"
  • Loading branch information
pieterlukasse committed Nov 10, 2023
1 parent febe655 commit 173f21c
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 32 deletions.
28 changes: 20 additions & 8 deletions src/argowrapper/engine/argo_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,11 @@ def retry_workflow(self, workflow_name: str, uid: str) -> str:
)
return f"archived {workflow_name} retried sucessfully"

def _get_archived_workflow_given_name(self, archived_workflow_uid) -> str:
def _get_archived_workflow_given_name_and_team_project(
self, archived_workflow_uid
) -> (str, str):
"""
Gets the name details for the given archived workflow
Gets the name and team project details for the given archived workflow
It tries to get it from cache first. If not in cache, it will query the
argo endpoint for archived workflows and parse out the 'workflow_name'
Expand All @@ -301,8 +303,12 @@ def _get_archived_workflow_given_name(self, archived_workflow_uid) -> str:
workflow_details = self.get_workflow_details(None, archived_workflow_uid)
# get the workflow given name from the parsed details:
given_name = workflow_details["wf_name"]
self.workflow_given_names_cache[archived_workflow_uid] = given_name
return given_name
team_project = workflow_details[GEN3_TEAM_PROJECT_METADATA_LABEL]
self.workflow_given_names_cache[archived_workflow_uid] = (
given_name,
team_project,
)
return given_name, team_project

def get_workflows_for_team_projects_and_user(
self, team_projects: List[str], auth_header: str
Expand Down Expand Up @@ -340,14 +346,14 @@ def get_workflows_for_team_project(self, team_project: str) -> List[Dict]:
)
label_selector = f"{GEN3_TEAM_PROJECT_METADATA_LABEL}={team_project_label}"
workflows = self.get_workflows_for_label_selector(label_selector=label_selector)
for workflow in workflows:
workflow["team_project"] = team_project
return workflows

def get_workflows_for_user(self, auth_header: str) -> List[Dict]:
"""
Get the list of all workflows for the current user. Each item in the list
contains the workflow name, its status, start and end time.
Considers solely the workflows that are labelled with ONLY the user name (so no
team project label)
Args:
auth_header: authorization header that contains the user's jwt token
Expand All @@ -362,9 +368,15 @@ def get_workflows_for_user(self, auth_header: str) -> List[Dict]:
username = argo_engine_helper.get_username_from_token(auth_header)
user_label = argo_engine_helper.convert_gen3username_to_pod_label(username)
label_selector = f"{GEN3_USER_METADATA_LABEL}={user_label}"
return self.get_workflows_for_label_selector(
all_user_workflows = self.get_workflows_for_label_selector(
label_selector=label_selector
) # TODO - this part would benefit from a system test
user_only_workflows = []
for workflow in all_user_workflows:
# keep only workflows that have an empty team project:
if not workflow[GEN3_TEAM_PROJECT_METADATA_LABEL]:
user_only_workflows.append(workflow)
return user_only_workflows

def get_workflows_for_label_selector(self, label_selector: str) -> List[Dict]:
try:
Expand Down Expand Up @@ -402,7 +414,7 @@ def get_workflows_for_label_selector(self, label_selector: str) -> List[Dict]:
argo_engine_helper.parse_list_item(
workflow,
workflow_type="archived_workflow",
get_archived_workflow_given_name=self._get_archived_workflow_given_name,
get_archived_workflow_given_name_and_team_project=self._get_archived_workflow_given_name_and_team_project,
)
for workflow in archived_workflow_list_return.items
]
Expand Down
16 changes: 12 additions & 4 deletions src/argowrapper/engine/helpers/argo_engine_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,30 @@ def parse_details(
def parse_list_item(
    workflow_details: Dict[str, any],
    workflow_type: str,
    get_archived_workflow_given_name_and_team_project: Callable = None,
) -> Dict[str, any]:
    """Parse the return of workflow list view.

    Args:
        workflow_details: one raw item of the workflow list; must contain a
            "metadata" mapping.
        workflow_type: passed through unchanged to parse_common_details.
        get_archived_workflow_given_name_and_team_project: optional callable
            that receives the workflow uid and returns a
            (wf_name, team_project) pair. When omitted, both values are read
            directly from the item's metadata annotations / labels.

    Returns:
        The dict produced by parse_common_details, extended with "wf_name",
        GEN3_TEAM_PROJECT_METADATA_LABEL and "uid" entries.
    """
    result = parse_common_details(
        workflow_details=workflow_details, workflow_type=workflow_type
    )
    metadata = workflow_details["metadata"]
    if get_archived_workflow_given_name_and_team_project is None:
        # annotations / labels can be missing (or None) on a list item, so
        # fall back to an empty mapping instead of failing on None.get(...):
        annotations = metadata.get("annotations") or {}
        labels = metadata.get("labels") or {}
        result["wf_name"] = annotations.get("workflow_name")
        result[GEN3_TEAM_PROJECT_METADATA_LABEL] = labels.get(
            GEN3_TEAM_PROJECT_METADATA_LABEL
        )
    else:
        # this is needed because archived list items do not have
        # metadata.annotations or metadata.labels returned by the list
        # service...so we need to call another service to get it:
        wf_name, team_project = get_archived_workflow_given_name_and_team_project(
            metadata.get("uid")
        )
        result["wf_name"] = wf_name
        result[GEN3_TEAM_PROJECT_METADATA_LABEL] = team_project

    result["uid"] = metadata.get("uid")
    return result

Expand Down
4 changes: 1 addition & 3 deletions src/argowrapper/routes/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ def get_workflows(
auth_header=request.headers.get("Authorization"),
)
else:
# no team_projects, so fall back to default behavior of returning just the user workflows.
# (this does mean that users will still have access to the workflows _they_ executed, even
# if after get kicked out of a "team project"):
# no team_projects, so fall back to querying the workflows that belong just to the user (no team project):
return argo_engine.get_workflows_for_user(
request.headers.get("Authorization")
)
Expand Down
56 changes: 42 additions & 14 deletions test/test_argo_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,7 @@ def test_argo_engine_get_workflows_for_user_and_team_projects_suceeded():
"creationTimestamp": "2023-03-22T16:48:51Z",
"labels": {
GEN3_USER_METADATA_LABEL: "dummyuser",
GEN3_TEAM_PROJECT_METADATA_LABEL: argo_engine_helper.convert_gen3teamproject_to_pod_label(
"dummyteam"
),
GEN3_TEAM_PROJECT_METADATA_LABEL: "",
},
},
"spec": {"arguments": {}, "shutdown": "Terminate"},
Expand Down Expand Up @@ -371,18 +369,11 @@ def test_argo_engine_get_workflows_for_user_and_team_projects_suceeded():
"argowrapper.engine.argo_engine.argo_engine_helper.convert_gen3username_to_pod_label"
):
uniq_workflow_list = engine.get_workflows_for_user("test_jwt_token")
assert len(uniq_workflow_list) == 3
assert len(uniq_workflow_list) == 1
# assert on values as mapped in argo_engine_helper.parse_details():
assert "Canceled" == uniq_workflow_list[0]["phase"]
assert "workflow_three" == uniq_workflow_list[2]["name"]
assert "custom_name_active1" == uniq_workflow_list[0]["wf_name"]
assert "2023-03-22T16:48:51Z" == uniq_workflow_list[0]["submittedAt"]
# preference is given to active workflows, so we expect [1] to have this name instead of custom_name_archived:
assert "custom_name_active2" == uniq_workflow_list[1]["wf_name"]
assert "2023-03-22T17:47:51Z" == uniq_workflow_list[1]["submittedAt"]
# workflow [2] is archived:
assert "custom_name_archived" == uniq_workflow_list[2]["wf_name"]
assert "2023-03-22T19:59:59Z" == uniq_workflow_list[2]["submittedAt"]
assert (
GEN3_USER_METADATA_LABEL
in engine.api_instance.list_workflows.call_args[1][
Expand All @@ -396,9 +387,13 @@ def test_argo_engine_get_workflows_for_user_and_team_projects_suceeded():
]
)

# leave out the one that has no team project, to simulate the argo query:
engine.api_instance.list_workflows = mock.MagicMock(
return_value=WorkFlow(argo_workflows_mock_raw_response[1:])
)
# test also the get_workflows_for_team_project:
uniq_workflow_list = engine.get_workflows_for_team_project("dummyteam")
assert len(uniq_workflow_list) == 3
assert len(uniq_workflow_list) == 2
assert (
engine.api_instance.list_workflows.call_args[1][
"list_options_label_selector"
Expand All @@ -411,9 +406,14 @@ def test_argo_engine_get_workflows_for_user_and_team_projects_suceeded():
]
== f"{GEN3_TEAM_PROJECT_METADATA_LABEL}={argo_engine_helper.convert_gen3teamproject_to_pod_label('dummyteam')}"
)
# get_workflows_for_team_projects should return the same items as get_workflows_for_team_project if queried with just the same team:
# get_workflows_for_team_projects should return the same items as get_workflows_for_team_project if queried with just the one team:
# (actually we need a smarter mock method to make the team project name count in this test...TODO - write better mock methods that simulate the
# underlying filtering by Argo and returning different results for different team project queries)
uniq_workflow_list = engine.get_workflows_for_team_projects(["dummyteam"])
assert len(uniq_workflow_list) == 3
assert len(uniq_workflow_list) == 2
assert "custom_name_active2" == uniq_workflow_list[0]["wf_name"]
assert "custom_name_archived" == uniq_workflow_list[1]["wf_name"]
assert "2023-03-22T19:59:59Z" == uniq_workflow_list[1]["submittedAt"]


def test_argo_engine_get_workflows_for_user_failed():
Expand Down Expand Up @@ -713,3 +713,31 @@ def test_argo_engine_get_workflow_log_succeeded():
== "Timeout occurred while fetching attrition table information. Please retry."
)
assert workflow_errors[0]["error_message"] == "Error (exit code 126)"


def test_get_archived_workflow_given_name_and_team_project():
    """Check that the helper returns the expected values, and that a second
    call for the same workflow is answered from the cache."""
    engine = ArgoEngine()
    mock_return_wf = {
        "wf_name": "dummy_wf_name",
        GEN3_TEAM_PROJECT_METADATA_LABEL: "dummy_team_project_label",
    }

    engine.get_workflow_details = mock.MagicMock(return_value=mock_return_wf)
    (
        given_name,
        team_project,
    ) = engine._get_archived_workflow_given_name_and_team_project("dummy_uid")
    assert given_name == "dummy_wf_name"
    assert team_project == "dummy_team_project_label"

    # test the internal caching that happens at
    # _get_archived_workflow_given_name_and_team_project, by setting
    # get_workflow_details to return None and checking that the result is
    # still the previous one:
    engine.get_workflow_details = mock.MagicMock(return_value=None)
    (
        given_name,
        team_project,
    ) = engine._get_archived_workflow_given_name_and_team_project("dummy_uid")
    assert given_name == "dummy_wf_name"
    assert team_project == "dummy_team_project_label"
    # make the cache hit explicit: the details service must not be queried
    # again for a uid that was already resolved
    engine.get_workflow_details.assert_not_called()
9 changes: 6 additions & 3 deletions test/test_argo_engine_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,16 @@ def test_parse_list_item():
assert parsed_item.get("wf_name") == "custom_name"
assert parsed_item.get("uid") == "test_uid"

def dummy_get_archived_workflow_given_name(workflow_uid):
return "dummy_wf_name"
def dummy_get_archived_workflow_given_name_and_team_project(workflow_uid):
return "dummy_wf_name", "dummy_team_project"

parsed_item = argo_engine_helper.parse_list_item(
workflow_item, "archived_workflow", dummy_get_archived_workflow_given_name
workflow_item,
"archived_workflow",
dummy_get_archived_workflow_given_name_and_team_project,
)
assert parsed_item.get("wf_name") == "dummy_wf_name"
assert parsed_item.get(GEN3_TEAM_PROJECT_METADATA_LABEL) == "dummy_team_project"


def test_remove_list_duplicates():
Expand Down

0 comments on commit 173f21c

Please sign in to comment.