Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename container_image to image for improved UX #3094

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions flytekit/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import datetime
import typing
import warnings
from typing import Any, Dict, List, Optional, Union
from typing import Literal as L

Expand Down Expand Up @@ -68,7 +69,7 @@ def __init__(
self._outputs = None
self._resources: typing.Optional[_resources_model] = None
self._extended_resources: typing.Optional[tasks_pb2.ExtendedResources] = None
self._container_image: typing.Optional[str] = None
self._image: typing.Optional[str] = None
self._pod_template: typing.Optional[PodTemplate] = None

def runs_before(self, other: Node):
Expand Down Expand Up @@ -189,7 +190,7 @@ def with_overrides(
interruptible: Optional[bool] = None,
name: Optional[str] = None,
task_config: Optional[Any] = None,
container_image: Optional[str] = None,
image: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
cache: Optional[bool] = None,
cache_version: Optional[str] = None,
Expand Down Expand Up @@ -236,9 +237,24 @@ def with_overrides(
raise ValueError("can't change the type of the task config")
self.run_entity._task_config = task_config

if container_image is not None:
assert_not_promise(container_image, "container_image")
self._container_image = container_image
# Rename the `container_image` parameter to `image` for improved user experience.
# Currently, both `image` and `container_image` are supported to maintain backward compatibility.
# For more details, please refer to https://github.com/flyteorg/flyte/issues/6140.
if image is not None and kwargs.get("container_image") is not None:
raise ValueError(
"Cannot specify both image and container_image. "
"Please use image because container_image is deprecated and will be removed in the future."
)
elif kwargs.get("container_image") is not None:
warnings.warn(
"container_image is deprecated and will be removed in the future. Please use image instead.",
DeprecationWarning,
)
assert_not_promise(kwargs["container_image"], "container_image")
self._image = kwargs["container_image"]
else:
assert_not_promise(image, "image")
self._image = image

if accelerator is not None:
assert_not_promise(accelerator, "accelerator")
Expand All @@ -256,6 +272,11 @@ def with_overrides(

return self

@property
def _container_image(self) -> typing.Optional[str]:
"""Deprecated, please use `_image` instead."""
return self._image
Comment on lines +275 to +278
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding deprecation warning for _container_image

Consider enhancing the deprecation notice for _container_image by adding a warning using the warnings module. This would help users transition to using _image instead.

Code suggestion
Check the AI-generated fix before applying
Suggested change
@property
def _container_image(self) -> typing.Optional[str]:
"""Deprecated, please use `_image` instead."""
return self._image
def _container_image(self) -> typing.Optional[str]:
"""Deprecated, please use `_image` instead."""
warnings.warn("_container_image is deprecated, use _image instead", DeprecationWarning, stacklevel=2)
return self._image

Code Review Run #4f136e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged



def _convert_resource_overrides(
resources: typing.Optional[Resources], resource_name: str
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ def with_overrides(
interruptible: Optional[bool] = None,
name: Optional[str] = None,
task_config: Optional[Any] = None,
container_image: Optional[str] = None,
image: Optional[str] = None,
accelerator: Optional[BaseAccelerator] = None,
cache: Optional[bool] = None,
cache_version: Optional[str] = None,
Expand All @@ -611,7 +611,7 @@ def with_overrides(
interruptible=interruptible,
name=name,
task_config=task_config,
container_image=container_image,
image=image,
accelerator=accelerator,
cache=cache,
cache_version=cache_version,
Expand Down
45 changes: 41 additions & 4 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import importlib
import re
import warnings
from abc import ABC
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, TypeVar, Union
Expand Down Expand Up @@ -43,7 +44,7 @@ def __init__(
name: str,
task_config: T,
task_type="python-task",
container_image: Optional[Union[str, ImageSpec]] = None,
image: Optional[Union[str, ImageSpec]] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
environment: Optional[Dict[str, str]] = None,
Expand All @@ -59,7 +60,7 @@ def __init__(
:param name: unique name for the task, usually the function's module and name.
:param task_config: Configuration object for Task. Should be a unique type for that specific Task.
:param task_type: String task type to be associated with this Task
:param container_image: String FQN for the image.
:param image: String FQN or ImageSpec for the image.
:param requests: custom resource request settings.
:param limits: custom resource limit settings.
:param environment: Environment variables you want the task to have when run.
Expand All @@ -82,6 +83,7 @@ def __init__(
:param accelerator: The accelerator to use for this task.
:param shared_memory: If True, then shared memory will be attached to the container where the size is equal
to the allocated memory. If str, then the shared memory is set to that size.
:param container_image: Deprecated, please use `image` instead.
"""
sec_ctx = None
if secret_requests:
Expand All @@ -94,7 +96,23 @@ def __init__(
kwargs["metadata"] = kwargs["metadata"] if "metadata" in kwargs else TaskMetadata()
kwargs["metadata"].pod_template_name = pod_template_name

self._container_image = container_image
# Rename the `container_image` parameter to `image` for improved user experience.
# Currently, both `image` and `container_image` are supported to maintain backward compatibility.
# For more details, please refer to https://github.com/flyteorg/flyte/issues/6140.
if image is not None and kwargs.get("container_image") is not None:
raise ValueError(
"Cannot specify both image and container_image. "
"Please use image because container_image is deprecated and will be removed in the future."
)
elif kwargs.get("container_image") is not None:
warnings.warn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just make this info? warning is a little too annoying I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is not annoying enough. DeprecationWarning does not really show up with default filter warnings. (FutureWarning would be more annoying)

If we really want users to change their code, then I think warnings should be a bit annoying. Because when we remove this feature, breaking code will be even more annoying.

"container_image is deprecated and will be removed in the future. Please use image instead.",
DeprecationWarning,
)
self._image = kwargs["container_image"]
else:
self._image = image

# TODO(katrogan): Implement resource overrides
self._resources = ResourceSpec(
requests=requests if requests else Resources(), limits=limits if limits else Resources()
Expand Down Expand Up @@ -138,9 +156,28 @@ def __init__(
def task_resolver(self) -> TaskResolverMixin:
return self._task_resolver

@property
def image(self) -> Optional[Union[str, ImageSpec]]:
return self._image

@property
def container_image(self) -> Optional[Union[str, ImageSpec]]:
return self._container_image
"""Deprecated, please use `image` instead."""
return self._image

@property
def _container_image(self) -> Optional[Union[str, ImageSpec]]:
"""Deprecated, please use `image` instead."""
return self._image

@_container_image.setter
def _container_image(self, image: Optional[Union[str, ImageSpec]]):
"""Deprecated, please use `image` instead.

This setter is for backward compatibility, so that setting `_container_image`
will adjust the new `_image` parameter directly.
"""
self._image = image

@property
def resources(self) -> ResourceSpec:
Expand Down
19 changes: 10 additions & 9 deletions flytekit/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def task(
interruptible: Optional[bool] = ...,
deprecated: str = ...,
timeout: Union[datetime.timedelta, int] = ...,
container_image: Optional[Union[str, ImageSpec]] = ...,
image: Optional[Union[str, ImageSpec]] = ...,
environment: Optional[Dict[str, str]] = ...,
requests: Optional[Resources] = ...,
limits: Optional[Resources] = ...,
Expand Down Expand Up @@ -141,7 +141,7 @@ def task(
interruptible: Optional[bool] = ...,
deprecated: str = ...,
timeout: Union[datetime.timedelta, int] = ...,
container_image: Optional[Union[str, ImageSpec]] = ...,
image: Optional[Union[str, ImageSpec]] = ...,
environment: Optional[Dict[str, str]] = ...,
requests: Optional[Resources] = ...,
limits: Optional[Resources] = ...,
Expand Down Expand Up @@ -178,7 +178,7 @@ def task(
interruptible: Optional[bool] = None,
deprecated: str = "",
timeout: Union[datetime.timedelta, int] = 0,
container_image: Optional[Union[str, ImageSpec]] = None,
image: Optional[Union[str, ImageSpec]] = None,
environment: Optional[Dict[str, str]] = None,
requests: Optional[Resources] = None,
limits: Optional[Resources] = None,
Expand Down Expand Up @@ -273,7 +273,7 @@ def my_task(x: int, y: typing.Dict[str, str]) -> str:
indicates that the task is active and not deprecated
:param timeout: the max amount of time for which one execution of this task should be executed for. The execution
will be terminated if the runtime exceeds the given timeout (approximately).
:param container_image: By default the configured FLYTE_INTERNAL_IMAGE is used for every task. This directive can be
:param image: By default the configured FLYTE_INTERNAL_IMAGE is used for every task. This directive can be
used to provide an alternate image for a specific task. This is useful for the cases in which images
bloat because of various dependencies and a dependency is only required for this or a set of tasks,
and they vary from the default.
Expand All @@ -283,13 +283,13 @@ def my_task(x: int, y: typing.Dict[str, str]) -> str:
# Use default image name `fqn` and alter the tag to `tag-{{default.tag}}` tag of the default image
# with a prefix. In this case, it is assumed that the image like
# flytecookbook:tag-gitsha is published alongwith the default of flytecookbook:gitsha
@task(container_image='{{.images.default.fqn}}:tag-{{images.default.tag}}')
@task(image='{{.images.default.fqn}}:tag-{{images.default.tag}}')
def foo():
...

# Refer to configurations to configure fqns for other images besides default. In this case it will
# lookup for an image named xyz
@task(container_image='{{.images.xyz.fqn}}:{{images.default.tag}}')
@task(image='{{.images.xyz.fqn}}:{{images.default.tag}}')
def foo2():
...
:param environment: Environment variables that should be added for this tasks execution
Expand Down Expand Up @@ -344,6 +344,7 @@ def launch_dynamically():
:param pickle_untyped: Boolean that indicates if the task allows unspecified data types.
:param shared_memory: If True, then shared memory will be attached to the container where the size is equal
to the allocated memory. If int, then the shared memory is set to that size.
:param container_image: Deprecated, please use `image` instead.
"""
# Maintain backwards compatibility with the old cache parameters, while cleaning up the task function definition.
cache_serialize = kwargs.get("cache_serialize")
Expand All @@ -370,7 +371,7 @@ def wrapper(fn: Callable[P, FuncOut]) -> PythonFunctionTask[T]:
cache_version = cache.get_version(
VersionParameters(
func=fn,
container_image=container_image,
container_image=image or kwargs.get("container_image"),
pod_template=pod_template,
pod_template_name=pod_template_name,
)
Expand Down Expand Up @@ -418,7 +419,7 @@ def wrapper(fn: Callable[P, FuncOut]) -> PythonFunctionTask[T]:
task_config,
decorated_fn,
metadata=_metadata,
container_image=container_image,
image=image,
environment=environment,
requests=requests,
limits=limits,
Expand All @@ -435,6 +436,7 @@ def wrapper(fn: Callable[P, FuncOut]) -> PythonFunctionTask[T]:
accelerator=accelerator,
pickle_untyped=pickle_untyped,
shared_memory=shared_memory,
**kwargs,
)
update_wrapper(task_instance, decorated_fn)
return task_instance
Expand Down Expand Up @@ -629,7 +631,6 @@ async def eager_workflow(x: int) -> int:
async def eager_workflow(x: int) -> int:
...
"""

if _fn is None:
return partial(
eager,
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ class DC:
a: Union[int, bool, str, float]
b: Union[int, bool, str, float]

@task(container_image=custom_image)
@task(image=custom_image)
def add(a: Union[int, bool, str, float], b: Union[int, bool, str, float]) -> Union[int, bool, str, float]:
return a + b

Expand Down
4 changes: 2 additions & 2 deletions tests/flytekit/integration/jupyter/test_notebook_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ def execute_code_in_kernel(kc: BlockingKernelClient, code: str):
interactive_mode_enabled=True,
)

@task(container_image="{IMAGE}")
@task(image="{IMAGE}")
def hello(name: str) -> str:
return f"Hello {{name}}"

@task(container_image="{IMAGE}")
@task(image="{IMAGE}")
def world(pre: str) -> str:
return f"{{pre}}, Welcome to the world!"

Expand Down
4 changes: 2 additions & 2 deletions tests/flytekit/unit/cli/pyflyte/image_spec_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
image_spec = ImageSpec(packages=["numpy", "pandas"], apt_packages=["git"], registry="", builder="test")


@task(container_image=image_spec)
@task(image=image_spec)
def t2() -> str:
return "flyte"


@task(container_image=image_spec)
@task(image=image_spec)
def t1() -> str:
return "flyte"

Expand Down
12 changes: 0 additions & 12 deletions tests/flytekit/unit/core/test_array_node_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,6 @@ def my_wf1() -> typing.List[typing.Optional[int]]:
assert my_wf1() == [1, None, 3, 4]


def test_map_task_override(serialization_settings):
@task
def my_mappable_task(a: int) -> typing.Optional[str]:
return str(a)

@workflow
def wf(x: typing.List[int]):
map_task(my_mappable_task)(a=x).with_overrides(container_image="random:image")

assert wf.nodes[0]._container_image == "random:image"


def test_serialization_metadata(serialization_settings):
@task(interruptible=True)
def t1(a: int) -> int:
Expand Down
5 changes: 5 additions & 0 deletions tests/flytekit/unit/core/test_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,14 @@ def my_mappable_task(a: int) -> typing.Optional[str]:

@workflow
def wf(x: typing.List[int]):
map_task(my_mappable_task)(a=x).with_overrides(image="random:image")
map_task(my_mappable_task)(a=x).with_overrides(container_image="random:image")

assert wf.nodes[0]._image == "random:image"
assert wf.nodes[0]._container_image == "random:image"
assert wf.nodes[1]._image == "random:image"
assert wf.nodes[1]._container_image == "random:image"


def test_map_task_pod_template_override(serialization_settings):
@task
Expand Down
40 changes: 40 additions & 0 deletions tests/flytekit/unit/core/test_node_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,15 @@ def bar():

@workflow
def wf() -> str:
bar().with_overrides(image="hello/world")
bar().with_overrides(container_image="hello/world")
return "hi"

assert wf.nodes[0]._image == "hello/world"
assert wf.nodes[0]._container_image == "hello/world"
assert wf.nodes[1]._image == "hello/world"
assert wf.nodes[1]._container_image == "hello/world"


def test_pod_template_override():
@task
Expand Down Expand Up @@ -575,3 +580,38 @@ def my_wf(a: str) -> str:
assert wf_spec.template.nodes[0].metadata.cache_serializable
assert wf_spec.template.nodes[0].metadata.cacheable
assert wf_spec.template.nodes[0].metadata.cache_version == "foo"


def test_override_image_behavior():
# Define expected warning and error messages
WARN_MSG = "container_image is deprecated and will be removed in the future. Please use image instead."
ERR_MSG = (
"Cannot specify both image and container_image. "
"Please use image because container_image is deprecated and will be removed in the future."
)

@task
def dummy_task():
print("hello")


# Specify both image and container_image
@workflow
def wf1():
create_node(dummy_task).with_overrides(image="hello/world", container_image="hello/world:v2")
with pytest.raises(ValueError, match=ERR_MSG):
wf1()

# Specify only container_image
@workflow
def wf2():
create_node(dummy_task).with_overrides(container_image="hello/world")
with pytest.warns(DeprecationWarning, match=WARN_MSG):
wf2()
assert wf2.nodes[0]._image == wf2.nodes[0]._container_image == "hello/world"

# Specify only image
@workflow
def wf3():
create_node(dummy_task).with_overrides(image="hello/world")
assert wf3.nodes[0]._image == wf3.nodes[0]._container_image == "hello/world"
Loading
Loading