Skip to content

Commit

Permalink
update boilerplate end2end tests (#4393)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Nov 9, 2023
1 parent fee63b6 commit 243a8cb
Showing 1 changed file with 88 additions and 23 deletions.
111 changes: 88 additions & 23 deletions boilerplate/flyte/end2end/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import time
import traceback
from typing import Dict, List, Mapping, Tuple
from typing import Dict, List, Mapping, Tuple, Optional

import click
import requests
Expand Down Expand Up @@ -50,15 +50,17 @@
("basics.named_outputs.simple_wf_with_named_outputs", {}),
# # Getting a 403 for the wikipedia image
# # ("basics.reference_task.wf", {}),
("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}),
("data_types_and_io.dataclass.dataclass_wf", {"x": 10, "y": 20}),
# Enums are not supported in flyteremote
# ("type_system.enums.enum_wf", {"c": "red"}),
("data_types_and_io.schema.df_wf", {"a": 42}),
("data_types_and_io.typed_schema.wf", {}),
("data_types_and_io.structured_dataset.simple_sd_wf", {"a": 42}),
# ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
],
"integrations-k8s-spark": [
("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}),
(
"k8s_spark_plugin.pyspark_pi.my_spark",
{"triggered_date": datetime.datetime.now()},
),
],
"integrations-kfpytorch": [
("kfpytorch_plugin.pytorch_mnist.pytorch_training_wf", {}),
Expand Down Expand Up @@ -89,20 +91,30 @@
}


def execute_workflow(remote, version, workflow_name, inputs):
def execute_workflow(
remote: FlyteRemote,
version,
workflow_name,
inputs,
cluster_pool_name: Optional[str] = None,
):
print(f"Fetching workflow={workflow_name} and version={version}")
wf = remote.fetch_workflow(name=workflow_name, version=version)
return remote.execute(wf, inputs=inputs, wait=False)
return remote.execute(wf, inputs=inputs, wait=False, cluster_pool=cluster_pool_name)


def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool:
def executions_finished(
executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]
) -> bool:
for executions in executions_by_wfgroup.values():
if not all([execution.is_done for execution in executions]):
return False
return True


def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
def sync_executions(
remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]
):
try:
for executions in executions_by_wfgroup.values():
for execution in executions:
Expand All @@ -125,6 +137,7 @@ def schedule_workflow_groups(
workflow_groups: List[str],
remote: FlyteRemote,
terminate_workflow_on_failure: bool,
cluster_pool_name: Optional[str] = None,
) -> Dict[str, bool]:
"""
Schedule workflows executions for all workflow groups and return True if all executions succeed, otherwise
Expand All @@ -135,14 +148,19 @@ def schedule_workflow_groups(
for wf_group in workflow_groups:
workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, [])
executions_by_wfgroup[wf_group] = [
execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows
execute_workflow(remote, tag, workflow[0], workflow[1], cluster_pool_name)
for workflow in workflows
]

# Wait for all executions to finish
attempt = 0
while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS):
while attempt == 0 or (
not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS
):
attempt += 1
print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s")
print(
f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s"
)
time.sleep(WAIT_TIME)
sync_executions(remote, executions_by_wfgroup)

Expand All @@ -158,9 +176,13 @@ def schedule_workflow_groups(
if len(non_succeeded_executions) != 0:
print(f"Failed executions for {wf_group}:")
for execution in non_succeeded_executions:
print(f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}")
print(
f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}"
)
if terminate_workflow_on_failure:
remote.terminate(execution, "aborting execution scheduled in functional test")
remote.terminate(
execution, "aborting execution scheduled in functional test"
)
# A workflow group succeeds iff all of its executions succeed
results[wf_group] = len(non_succeeded_executions) == 0
return results
Expand All @@ -179,25 +201,33 @@ def run(
priorities: List[str],
config_file_path,
terminate_workflow_on_failure: bool,
test_project_name: str,
test_project_domain: str,
cluster_pool_name: Optional[str] = None,
) -> List[Dict[str, str]]:
remote = FlyteRemote(
Config.auto(config_file=config_file_path),
default_project="flytesnacks",
default_domain="development",
test_project_name,
test_project_domain,
)

# For a given release tag and priority, this function filters the workflow groups from the flytesnacks
# manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ].
manifest_url = (
"https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
"https://raw.githubusercontent.com/flyteorg/flytesnacks/"
f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
)
r = requests.get(manifest_url)
parsed_manifest = r.json()
workflow_groups = []
workflow_groups = (
["lite"]
if "lite" in priorities
else [group["name"] for group in parsed_manifest if group["priority"] in priorities]
else [
group["name"]
for group in parsed_manifest
if group["priority"] in priorities
]
)

results = []
Expand All @@ -215,7 +245,11 @@ def run(
valid_workgroups.append(workflow_group)

results_by_wfgroup = schedule_workflow_groups(
flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure
flytesnacks_release_tag,
valid_workgroups,
remote,
terminate_workflow_on_failure,
cluster_pool_name,
)

for workflow_group, succeeded in results_by_wfgroup.items():
Expand Down Expand Up @@ -246,6 +280,9 @@ def run(


@click.command()
@click.argument("flytesnacks_release_tag")
@click.argument("priorities")
@click.argument("config_file")
@click.option(
"--return_non_zero_on_failure",
default=False,
Expand All @@ -258,18 +295,46 @@ def run(
is_flag=True,
help="Abort failing workflows upon exit",
)
@click.argument("flytesnacks_release_tag")
@click.argument("priorities")
@click.argument("config_file")
@click.option(
"--test_project_name",
default="flytesnacks",
type=str,
is_flag=False,
help="Name of project to run functional tests on",
)
@click.option(
"--test_project_domain",
default="development",
type=str,
is_flag=False,
help="Name of domain in project to run functional tests on",
)
@click.argument(
"cluster_pool_name",
required=False,
type=str,
default=None,
)
def cli(
flytesnacks_release_tag,
priorities,
config_file,
return_non_zero_on_failure,
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
):
print(f"return_non_zero_on_failure={return_non_zero_on_failure}")
results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure)
results = run(
flytesnacks_release_tag,
priorities,
config_file,
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
)

# Write a json object in its own line describing the result of this run to stdout
print(f"Result of run:\n{json.dumps(results)}")
Expand Down

0 comments on commit 243a8cb

Please sign in to comment.