Skip to content

Commit

Permalink
Merge release 0.7.1 (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored May 8, 2020
1 parent 6a322e7 commit 597a7b6
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions tests/flytekit/unit/models/test_dynamic_wfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,76 @@ def test_dynamic_launch_plan_yielding():
assert dj_spec.outputs[0].var == "out"
assert dj_spec.outputs[0].binding.promise.node_id == node_id
assert dj_spec.outputs[0].binding.promise.var == "task_output"


@_tasks.python_task
def empty_task(wf_params):
wf_params.logging.info("Running empty task")


@_workflow.workflow_class()
class EmptyWorkflow(object):
empty_task_task_execution = empty_task()


constant_workflow_lp = EmptyWorkflow.create_launch_plan()


@_tasks.outputs(out=_Types.Integer)
@_tasks.dynamic_task
def lp_yield_empty_wf(wf_params, out):
wf_params.logging.info("Running inner task... yielding a launchplan for empty workflow")
constant_lp_yielding_task_execution = constant_workflow_lp()
yield constant_lp_yielding_task_execution
out.set(42)


def test_dynamic_launch_plan_yielding_of_constant_workflow():
outputs = lp_yield_empty_wf.unit_test()
# TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected
# in the output of a dynamic task.
dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME]

assert len(dj_spec.nodes) == 1
assert len(dj_spec.outputs) == 1
assert dj_spec.outputs[0].var == "out"
assert len(outputs.keys()) == 2


@_tasks.inputs(num=_Types.Integer)
@_tasks.python_task
def log_only_task(wf_params, num):
wf_params.logging.info("{} was called".format(num))


@_workflow.workflow_class()
class InputOnlyWorkflow(object):
a = _workflow.Input(_Types.Integer, default=5, help="Input for inner workflow")
log_only_task_execution = log_only_task(num=a)


input_only_workflow_lp = InputOnlyWorkflow.create_launch_plan()


@_tasks.dynamic_task
def lp_yield_input_only_wf(wf_params):
wf_params.logging.info("Running inner task... yielding a launchplan for input only workflow")
input_only_workflow_lp_execution = input_only_workflow_lp()
yield input_only_workflow_lp_execution


def test_dynamic_launch_plan_yielding_of_input_only_workflow():
outputs = lp_yield_input_only_wf.unit_test()
# TODO: Currently, Flytekit will not return early and not do anything if there are any workflow nodes detected
# in the output of a dynamic task.
dj_spec = outputs[_sdk_constants.FUTURES_FILE_NAME]

assert len(dj_spec.nodes) == 1
assert len(dj_spec.outputs) == 0
assert len(outputs.keys()) == 2

# Using the id of the launch plan node, and then appending /inputs.pb to the string, should give you in the outputs
# map the LiteralMap of the inputs of that node
input_key = "{}/inputs.pb".format(dj_spec.nodes[0].id)
lp_input_map = outputs[input_key]
assert lp_input_map.literals['a'] is not None

0 comments on commit 597a7b6

Please sign in to comment.