From 53e376eff610129581679eb2fa2c085faa351958 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 7 Mar 2023 16:01:25 -0800 Subject: [PATCH 1/9] Run container and pod task in map task Signed-off-by: Kevin Su --- flytekit/core/map_task.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 48d0f0b335..45a878977b 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -8,7 +8,7 @@ from contextlib import contextmanager from itertools import count from typing import Any, Dict, List, Optional - +from flytekit import ContainerTask from flytekit.configuration import SerializationSettings from flytekit.core import tracker from flytekit.core.base_task import PythonTask @@ -267,8 +267,9 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = 0, min_succes successfully before terminating this task and marking it successful. """ - if not isinstance(task_function, PythonFunctionTask): - raise ValueError( - f"Only Flyte python task types are supported in map tasks currently, received {type(task_function)}" - ) - return MapPythonTask(task_function, concurrency=concurrency, min_success_ratio=min_success_ratio, **kwargs) + if task_function.task_type in ["python-task", "raw-container", "sidecar"]: + return MapPythonTask(task_function, concurrency=concurrency, min_success_ratio=min_success_ratio, **kwargs) + + raise ValueError( + f"Only Flyte python-task, raw-container, and sidecar types are supported in map tasks currently, received {type(task_function)}" + ) From 2750fd3bd4f8b126dbacf42a7f7f54a70e4c4d39 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 11:37:10 -0800 Subject: [PATCH 2/9] wip Signed-off-by: Kevin Su --- flytekit/core/container_task.py | 2 +- flytekit/core/map_task.py | 21 ++++++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/flytekit/core/container_task.py b/flytekit/core/container_task.py index 677142736c..2001f450ae 100644 --- a/flytekit/core/container_task.py +++ b/flytekit/core/container_task.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any, Dict, List, Optional, Tuple, Type +from typing import Any, Callable, Dict, List, Optional, Tuple, Type from flytekit.configuration import SerializationSettings from flytekit.core.base_task import PythonTask, TaskMetadata diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 45a878977b..b8cc95b870 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -8,6 +8,7 @@ from contextlib import contextmanager from itertools import count from typing import Any, Dict, List, Optional + from flytekit import ContainerTask from flytekit.configuration import SerializationSettings from flytekit.core import tracker @@ -55,8 +56,11 @@ def __init__( collection_interface = transform_interface_to_list_interface(python_function_task.python_interface) instance = next(self._ids) - _, mod, f, _ = tracker.extract_task_module(python_function_task.task_function) - name = f"{mod}.mapper_{f}_{instance}" + if isinstance(python_function_task, ContainerTask): + name = f"raw_container_task.mapper_{python_function_task.name}_{instance}" + else: + _, mod, f, _ = tracker.extract_task_module(python_function_task.task_function) + name = f"{mod}.mapper_{f}_{instance}" self._cmd_prefix = None self._run_task = python_function_task @@ -114,14 +118,20 @@ def prepare_target(self): self._run_task.reset_command_fn() def get_container(self, settings: SerializationSettings) -> Container: + if isinstance(self._run_task, ContainerTask): + return self._run_task.get_container(settings) with self.prepare_target(): return self._run_task.get_container(settings) def get_k8s_pod(self, settings: SerializationSettings) -> K8sPod: + if isinstance(self._run_task, ContainerTask): + return self._run_task.get_k8s_pod(settings) with self.prepare_target(): return self._run_task.get_k8s_pod(settings) def get_sql(self, settings: SerializationSettings) -> Sql: + if isinstance(self._run_task, ContainerTask): + return self._run_task.get_sql(settings) with self.prepare_target(): return self._run_task.get_sql(settings) @@ -221,7 +231,12 @@ def _raw_execute(self, **kwargs) -> Any: return outputs -def map_task(task_function: PythonFunctionTask, concurrency: int = 0, min_success_ratio: float = 1.0, **kwargs): +def map_task( + task_function: typing.Union[PythonFunctionTask, ContainerTask], + concurrency: int = 0, + min_success_ratio: float = 1.0, + **kwargs, +): """ Use a map task for parallelizable tasks that run across a list of an input type. A map task can be composed of any individual :py:class:`flytekit.PythonFunctionTask`. From 5692e7bed26cb3eae8a585c591a59472651282b5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 12:01:39 -0800 Subject: [PATCH 3/9] wip Signed-off-by: Kevin Su --- flytekit/core/map_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index b8cc95b870..cfe88818ec 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -36,7 +36,7 @@ class MapPythonTask(PythonTask): def __init__( self, - python_function_task: PythonFunctionTask, + python_function_task: typing.Union[PythonFunctionTask, ContainerTask], concurrency: Optional[int] = None, min_success_ratio: Optional[float] = None, **kwargs, @@ -282,7 +282,7 @@ def map_task( successfully before terminating this task and marking it successful. """ - if task_function.task_type in ["python-task", "raw-container", "sidecar"]: + if isinstance(task_function, PythonFunctionTask) or isinstance(task_function, ContainerTask): return MapPythonTask(task_function, concurrency=concurrency, min_success_ratio=min_success_ratio, **kwargs) raise ValueError( From 17d45113e973890e83f72b94b134f44dc307ee3a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 15:25:23 -0800 Subject: [PATCH 4/9] nit Signed-off-by: Kevin Su --- flytekit/core/map_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index cfe88818ec..26f64d8e84 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -286,5 +286,5 @@ def map_task( return MapPythonTask(task_function, concurrency=concurrency, min_success_ratio=min_success_ratio, **kwargs) raise ValueError( - f"Only Flyte python-task, raw-container, and sidecar types are supported in map tasks currently, received {type(task_function)}" + f"Only Flyte python-task, and raw-container types are supported in map tasks currently, received {type(task_function)}" ) From be94c68a0ce37525cd66c696ab1e7f5d6bbd7db5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 15:32:06 -0800 Subject: [PATCH 5/9] add tests Signed-off-by: Kevin Su --- flytekit/core/container_task.py | 4 +-- tests/flytekit/unit/core/test_map_task.py | 34 ++++++++++++++++++++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/flytekit/core/container_task.py b/flytekit/core/container_task.py index 2001f450ae..90e3eff530 100644 --- a/flytekit/core/container_task.py +++ b/flytekit/core/container_task.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple, Type +from typing import Any, Callable, Dict, List, Optional, OrderedDict, Tuple, Type, Union from flytekit.configuration import SerializationSettings from flytekit.core.base_task import PythonTask, TaskMetadata @@ -36,7 +36,7 @@ def __init__( name: str, image: str, command: List[str], - inputs: Optional[Dict[str, Tuple[Type, Any]]] = None, + inputs: Optional[Union[Dict[str, Tuple[Type, Any]], OrderedDict[str, Type]]] = None, metadata: Optional[TaskMetadata] = None, arguments: Optional[List[str]] = None, outputs: Optional[Dict[str, Type]] = None, diff --git a/tests/flytekit/unit/core/test_map_task.py b/tests/flytekit/unit/core/test_map_task.py index 95927873d0..d16bfac54c 100644 --- a/tests/flytekit/unit/core/test_map_task.py +++ b/tests/flytekit/unit/core/test_map_task.py @@ -4,7 +4,7 @@ import pytest import flytekit.configuration -from flytekit import LaunchPlan, map_task +from flytekit import ContainerTask, LaunchPlan, kwtypes, map_task from flytekit.configuration import Image, ImageConfig from flytekit.core.map_task import MapPythonTask from flytekit.core.task import TaskMetadata, task @@ -24,6 +24,22 @@ def serialization_settings(): ) +raw_container = ContainerTask( + name="ellipse-area-metadata-python", + input_data_dir="/var/inputs", + output_data_dir="/var/outputs", + inputs=kwtypes(a=int), + outputs=kwtypes(area=float), + image="flyte/raw-container:v1", + command=[ + "python", + "test.py", + "{{.inputs.a}}", + "/var/outputs", + ], +) + + @task def t1(a: int) -> str: b = a + 2 @@ -96,6 +112,22 @@ def test_serialization(serialization_settings): ] +def test_serialization_with_raw_container(serialization_settings): + maptask = map_task(raw_container, metadata=TaskMetadata(retries=1)) + task_spec = get_serializable(OrderedDict(), serialization_settings, maptask) + + # By default all map_task tasks will have their custom fields set. + assert task_spec.template.custom["minSuccessRatio"] == 1.0 + assert task_spec.template.type == "container_array" + assert task_spec.template.task_type_version == 1 + assert task_spec.template.container.command == [ + "python", + "test.py", + "{{.inputs.a}}", + "/var/outputs", + ] + + @pytest.mark.parametrize( "custom_fields_dict, expected_custom_fields", [ From ab5ede1ccaffb36b8ad66f865fb18aed2bd44494 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 15:33:13 -0800 Subject: [PATCH 6/9] nit Signed-off-by: Kevin Su --- flytekit/core/map_task.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 26f64d8e84..6fcaf8aa63 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -124,14 +124,10 @@ def get_container(self, settings: SerializationSettings) -> Container: return self._run_task.get_container(settings) def get_k8s_pod(self, settings: SerializationSettings) -> K8sPod: - if isinstance(self._run_task, ContainerTask): - return self._run_task.get_k8s_pod(settings) with self.prepare_target(): return self._run_task.get_k8s_pod(settings) def get_sql(self, settings: SerializationSettings) -> Sql: - if isinstance(self._run_task, ContainerTask): - return self._run_task.get_sql(settings) with self.prepare_target(): return self._run_task.get_sql(settings) From a48e7f0dae35f68d40c87330e99f2292e3472680 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 16:11:08 -0800 Subject: [PATCH 7/9] nit Signed-off-by: Kevin Su --- flytekit/core/container_task.py | 2 +- flytekit/core/map_task.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flytekit/core/container_task.py b/flytekit/core/container_task.py index 90e3eff530..847ff951fd 100644 --- a/flytekit/core/container_task.py +++ b/flytekit/core/container_task.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any, Callable, Dict, List, Optional, OrderedDict, Tuple, Type, Union +from typing import Any, Dict, List, Optional, OrderedDict, Tuple, Type, Union from flytekit.configuration import SerializationSettings from flytekit.core.base_task import PythonTask, TaskMetadata diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 6fcaf8aa63..26f64d8e84 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -124,10 +124,14 @@ def get_container(self, settings: SerializationSettings) -> Container: return self._run_task.get_container(settings) def get_k8s_pod(self, settings: SerializationSettings) -> K8sPod: + if isinstance(self._run_task, ContainerTask): + return self._run_task.get_k8s_pod(settings) with self.prepare_target(): return self._run_task.get_k8s_pod(settings) def get_sql(self, settings: SerializationSettings) -> Sql: + if isinstance(self._run_task, ContainerTask): + return self._run_task.get_sql(settings) with self.prepare_target(): return self._run_task.get_sql(settings) From bf25229149d5ca80b648f0adf8b84f7b408b26ad Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 30 Mar 2023 16:29:45 -0700 Subject: [PATCH 8/9] support fast register Signed-off-by: Kevin Su --- flytekit/tools/translator.py | 5 ++++- tests/flytekit/unit/core/test_map_task.py | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flytekit/tools/translator.py b/flytekit/tools/translator.py index 8b30fc4d36..9e13ed34bb 100644 --- a/flytekit/tools/translator.py +++ b/flytekit/tools/translator.py @@ -180,7 +180,10 @@ def get_serializable_task( if settings.should_fast_serialize(): # This handles container tasks. - if container and isinstance(entity, (PythonAutoContainerTask, MapPythonTask)): + if container and ( + isinstance(entity, PythonAutoContainerTask) + or (isinstance(entity, MapPythonTask) and isinstance(entity.run_task, PythonAutoContainerTask)) + ): # For fast registration, we'll need to muck with the command, but on # ly for certain kinds of tasks. Specifically, # tasks that rely on user code defined in the container. This should be encapsulated by the auto container diff --git a/tests/flytekit/unit/core/test_map_task.py b/tests/flytekit/unit/core/test_map_task.py index d16bfac54c..ec5bb278ed 100644 --- a/tests/flytekit/unit/core/test_map_task.py +++ b/tests/flytekit/unit/core/test_map_task.py @@ -120,6 +120,7 @@ def test_serialization_with_raw_container(serialization_settings): assert task_spec.template.custom["minSuccessRatio"] == 1.0 assert task_spec.template.type == "container_array" assert task_spec.template.task_type_version == 1 + assert task_spec.template.container.args is None assert task_spec.template.container.command == [ "python", "test.py", From fcfad210b443d7f40fd384cdf6db1ff86a07073b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 9 Apr 2023 16:53:52 -0700 Subject: [PATCH 9/9] fixed test Signed-off-by: Kevin Su --- flytekit/core/map_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index dac28a1a18..c8895f2d91 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -64,8 +64,8 @@ def __init__( else: actual_task = python_function_task - if not isinstance(actual_task, PythonFunctionTask): - raise ValueError("Map tasks can only compose of Python Functon Tasks currently") + if not isinstance(actual_task, (PythonFunctionTask, ContainerTask)): + raise ValueError("Map tasks can only compose of Python Function or Container Tasks currently") if len(actual_task.python_interface.outputs.keys()) > 1: raise ValueError("Map tasks only accept python function tasks with 0 or 1 outputs")