From 597a7b6c6c5ddd1d9e3b9c7e8b2db8aa2bf8f73c Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 8 May 2020 12:16:28 -0700 Subject: [PATCH] Merge release 0.7.1 (#113) --- .../flytekit/unit/models/test_dynamic_wfs.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/tests/flytekit/unit/models/test_dynamic_wfs.py b/tests/flytekit/unit/models/test_dynamic_wfs.py index f0e69a4bd0..a1ca54fa6d 100644 --- a/tests/flytekit/unit/models/test_dynamic_wfs.py +++ b/tests/flytekit/unit/models/test_dynamic_wfs.py @@ -52,3 +52,76 @@ def test_dynamic_launch_plan_yielding(): assert dj_spec.outputs[0].var == "out" assert dj_spec.outputs[0].binding.promise.node_id == node_id assert dj_spec.outputs[0].binding.promise.var == "task_output" + + +@_tasks.python_task +def empty_task(wf_params): + wf_params.logging.info("Running empty task") + + +@_workflow.workflow_class() +class EmptyWorkflow(object): + empty_task_task_execution = empty_task() + + +constant_workflow_lp = EmptyWorkflow.create_launch_plan() + + +@_tasks.outputs(out=_Types.Integer) +@_tasks.dynamic_task +def lp_yield_empty_wf(wf_params, out): + wf_params.logging.info("Running inner task... yielding a launchplan for empty workflow") + constant_lp_yielding_task_execution = constant_workflow_lp() + yield constant_lp_yielding_task_execution + out.set(42) + + +def test_dynamic_launch_plan_yielding_of_constant_workflow(): + outputs = lp_yield_empty_wf.unit_test() + # TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected + # in the output of a dynamic task. + dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME] + + assert len(dj_spec.nodes) == 1 + assert len(dj_spec.outputs) == 1 + assert dj_spec.outputs[0].var == "out" + assert len(outputs.keys()) == 2 + + +@_tasks.inputs(num=_Types.Integer) +@_tasks.python_task +def log_only_task(wf_params, num): + wf_params.logging.info("{} was called".format(num)) + + +@_workflow.workflow_class() +class InputOnlyWorkflow(object): + a = _workflow.Input(_Types.Integer, default=5, help="Input for inner workflow") + log_only_task_execution = log_only_task(num=a) + + +input_only_workflow_lp = InputOnlyWorkflow.create_launch_plan() + + +@_tasks.dynamic_task +def lp_yield_input_only_wf(wf_params): + wf_params.logging.info("Running inner task... yielding a launchplan for input only workflow") + input_only_workflow_lp_execution = input_only_workflow_lp() + yield input_only_workflow_lp_execution + + +def test_dynamic_launch_plan_yielding_of_input_only_workflow(): + outputs = lp_yield_input_only_wf.unit_test() + # TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected + # in the output of a dynamic task. + dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME] + + assert len(dj_spec.nodes) == 1 + assert len(dj_spec.outputs) == 0 + assert len(outputs.keys()) == 2 + + # Using the id of the launch plan node, and then appending /inputs.pb to the string, should give you in the outputs + # map the LiteralMap of the inputs of that node + input_key = "{}/inputs.pb".format(dj_spec.nodes[0].id) + lp_input_map = outputs[input_key] + assert lp_input_map.literals['a'] is not None