Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VADC-785 #119

Merged
merged 14 commits into from
Nov 22, 2023
52 changes: 33 additions & 19 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "argowrapper"
version = "1.8.0"
version = "1.8.1"
description = "argo wrapper for va workflow"
authors = ["UchicagoZchen138 <[email protected]>"]
repository = "https://github.com/uc-cdis/argo-wrapper"
Expand All @@ -21,10 +21,12 @@ importlib-resources = "^5.4.0"
requests = "^2.27.1"
PyJWT = "^2.4.0"
pytest-cov = "^3.0.0"
freezegun = "^1.2.2"
tianj7 marked this conversation as resolved.
Show resolved Hide resolved

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"


[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
3 changes: 3 additions & 0 deletions src/argowrapper/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
WORKFLOW_KIND: Final = "workflow"
GEN3_USER_METADATA_LABEL: Final = "gen3username"
GEN3_TEAM_PROJECT_METADATA_LABEL: Final = "gen3teamproject"
GEN3_WORKFLOW_PHASE_LABEL: Final = "phase"
GEN3_SUBMIT_TIMESTAMP_LABEL: Final = "submittedAt"
GEN3_NON_VA_WORKFLOW_MONTHLY_CAP: Final = 20


class POD_COMPLETION_STRATEGY(Enum):
Expand Down
38 changes: 38 additions & 0 deletions src/argowrapper/engine/argo_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@
WORKFLOW,
GEN3_USER_METADATA_LABEL,
GEN3_TEAM_PROJECT_METADATA_LABEL,
GEN3_WORKFLOW_PHASE_LABEL,
GEN3_SUBMIT_TIMESTAMP_LABEL,
)
from argowrapper.engine.helpers import argo_engine_helper
from argowrapper.engine.helpers.workflow_factory import WorkflowFactory
from argowrapper.workflows.argo_workflows.gwas import GWAS

from datetime import datetime


class ArgoEngine:
"""
Expand Down Expand Up @@ -378,6 +382,40 @@ def get_workflows_for_user(self, auth_header: str) -> List[Dict]:
user_only_workflows.append(workflow)
return user_only_workflows

def get_user_workflows_for_current_month(self, auth_header: str) -> List[Dict]:
"""
Get the list of all succeeded and running workflows the current user owns in the current month.
Each item in the list contains the workflow name, its status, start and end time.

Args:
auth_header: authorization header that contains the user's jwt token

Returns:
List[Dict]: List of workflow dictionaries with details of workflows
that the user has ran.

Raises:
raises Exception in case of any error.
"""
username = argo_engine_helper.get_username_from_token(auth_header)
user_label = argo_engine_helper.convert_gen3username_to_pod_label(username)
label_selector = f"{GEN3_USER_METADATA_LABEL}={user_label}"
all_user_workflows = self.get_workflows_for_label_selector(
label_selector=label_selector
)
user_monthly_workflows = []
for workflow in all_user_workflows:
if workflow[GEN3_WORKFLOW_PHASE_LABEL] in {"Running", "Succeeded"}:
submitted_time_str = workflow[GEN3_SUBMIT_TIMESTAMP_LABEL]
submitted_time = datetime.strptime(
submitted_time_str, "%Y-%m-%dT%H:%M:%SZ"
)
first_day_of_month = datetime.today().replace(day=1)
if submitted_time.date() >= first_day_of_month.date():
user_monthly_workflows.append(workflow)

return user_monthly_workflows

def get_workflows_for_label_selector(self, label_selector: str) -> List[Dict]:
try:
workflow_list_return = self.api_instance.list_workflows(
Expand Down
57 changes: 53 additions & 4 deletions src/argowrapper/routes/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
from starlette.status import (
HTTP_200_OK,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_500_INTERNAL_SERVER_ERROR,
)
from argowrapper.constants import (
TEAM_PROJECT_FIELD_NAME,
TEAM_PROJECT_LIST_FIELD_NAME,
GEN3_TEAM_PROJECT_METADATA_LABEL,
GEN3_USER_METADATA_LABEL,
GEN3_NON_VA_WORKFLOW_MONTHLY_CAP,
)

from argowrapper import logger
Expand Down Expand Up @@ -152,6 +154,7 @@ def check_user_billing_id(request):
"""

header = {"Authorization": request.headers.get("Authorization")}
# TODO: Make this configurable
url = "http://fence-service/user"
try:
r = requests.get(url=url, headers=header)
Expand All @@ -173,6 +176,38 @@ def check_user_billing_id(request):
return None


def check_user_reached_monthly_workflow_cap(request_token):
"""
Query Argo service to see how many successful run user already
have in the current calendar month. If the number is greater than
the threshold, return error.
"""

try:
current_month_workflows = argo_engine.get_user_workflows_for_current_month(
request_token
)

if len(current_month_workflows) >= GEN3_NON_VA_WORKFLOW_MONTHLY_CAP:
logger.info(
"User already executed {} workflows this month and cannot create new ones anymore.".format(
len(current_month_workflows)
)
)
logger.info(
"The currently monthly cap is {}.".format(
GEN3_NON_VA_WORKFLOW_MONTHLY_CAP
)
)
return True

return False
except Exception as e:
logger.error(e)
traceback.print_exc()
raise e


@router.get("/test")
def test():
"""route to test that the argo-workflow is correctly running"""
Expand All @@ -188,13 +223,27 @@ def submit_workflow(
) -> str:
"""route to submit workflow"""
try:
reached_monthly_cap = False

# check if user has a billing id tag:
billing_id = check_user_billing_id(request)
# submit workflow:
return argo_engine.workflow_submission(
request_body, request.headers.get("Authorization"), billing_id
)

# if user has billing_id (non-VA user), check if they already reached the monthly cap
if billing_id:
reached_monthly_cap = check_user_reached_monthly_workflow_cap(
request.headers.get("Authorization")
)

# submit workflow:
if not reached_monthly_cap:
return argo_engine.workflow_submission(
request_body, request.headers.get("Authorization"), billing_id
)
else:
return HTMLResponse(
content="You have reached the workflow monthly cap.",
status_code=HTTP_403_FORBIDDEN,
)
except Exception as exception:
return HTMLResponse(
content=str(exception),
Expand Down
51 changes: 51 additions & 0 deletions test/test_argo_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from test.constants import EXAMPLE_AUTH_HEADER
from argowrapper.workflows.argo_workflows.gwas import *
from unittest.mock import patch
from freezegun import freeze_time


class WorkFlow:
Expand Down Expand Up @@ -777,3 +778,53 @@ def test_get_archived_workflow_wf_name_and_team_project():
) = engine._get_archived_workflow_wf_name_and_team_project("dummy_uid")
assert given_name == "dummy_wf_name"
assert team_project == "dummy_team_project_label"


@freeze_time("Nov 16th, 2023")
def test_get_user_workflows_for_current_month(monkeypatch):

engine = ArgoEngine()
workflows_mock_response = [
{
"uid": "uid_1",
"phase": "Running",
"submittedAt": "2023-11-14T16:44:02Z",
},
{
"uid": "uid_2",
"phase": "Succeeded",
"submittedAt": "2023-11-15T17:52:52Z",
},
{
"uid": "uid_3",
"phase": "Failed",
"submittedAt": "2023-11-02T00:00:00Z",
},
{
"uid": "uid_4",
"phase": "Succeeded",
"submittedAt": "2023-10-31T00:00:00Z",
},
]

expected_workflow_reponse = [
{
"uid": "uid_1",
"phase": "Running",
"submittedAt": "2023-11-14T16:44:02Z",
},
{
"uid": "uid_2",
"phase": "Succeeded",
"submittedAt": "2023-11-15T17:52:52Z",
},
]
engine.get_workflows_for_label_selector = mock.MagicMock(
return_value=workflows_mock_response
)

user_monthly_workflow = engine.get_user_workflows_for_current_month(
EXAMPLE_AUTH_HEADER
)

assert user_monthly_workflow == expected_workflow_reponse
Loading
Loading