From 9b65742876825d5a4c7778f43d90616627badbec Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Mon, 24 Jun 2024 13:27:30 -0400 Subject: [PATCH 01/14] fix eager mode Signed-off-by: Niels Bantilan --- flytekit/bin/entrypoint.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 4b1dec78c6..d189973579 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -108,6 +108,9 @@ def _dispatch_execute( # Handle eager-mode (async) tasks logger.info("Output is a coroutine") outputs = asyncio.run(outputs) + # make sure an event loop exists for data persistence step + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) # Step3a if isinstance(outputs, VoidPromise): From 4e04b82c4242a24bdde135458d5029f8a86c1250 Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Wed, 3 Jul 2024 19:10:10 -0400 Subject: [PATCH 02/14] bind secret to env var Signed-off-by: Niels Bantilan --- flytekit/bin/entrypoint.py | 7 +++++-- flytekit/experimental/eager_function.py | 26 +++++++++++++++++-------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index d189973579..234c90ceb6 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -109,8 +109,11 @@ def _dispatch_execute( logger.info("Output is a coroutine") outputs = asyncio.run(outputs) # make sure an event loop exists for data persistence step - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + try: + asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) # Step3a if isinstance(outputs, VoidPromise): diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 7eec791726..3c05d3067b 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -1,5 +1,6 @@ import asyncio import inspect +import os import signal from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone @@ -382,6 +383,7 @@ def eager( timeout: Optional[timedelta] = None, poll_interval: Optional[timedelta] = None, local_entrypoint: bool = False, + bind_secret_to_env_var: Optional[str] = None, **kwargs, ): """Eager workflow decorator. @@ -396,6 +398,7 @@ def eager( :param local_entrypoint: If True, the eager workflow will can be executed locally but use the provided :py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local testing against a remote Flyte cluster. + :param bind_secret_to_env_var: if specified, binds the client secret to the specified environment variable. :param kwargs: keyword-arguments forwarded to :py:func:`~flytekit.task`. This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be @@ -488,7 +491,10 @@ async def eager_workflow(x: int) -> int: remote=remote, client_secret_group=client_secret_group, client_secret_key=client_secret_key, + timeout=timeout, + poll_interval=poll_interval, local_entrypoint=local_entrypoint, + bind_secret_to_env_var=bind_secret_to_env_var **kwargs, ) @@ -510,7 +516,7 @@ async def wrapper(*args, **kws): execution_id = exec_params.execution_id async_stack = AsyncStack(task_id, execution_id) - _remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint) + _remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint, bind_secret_to_env_var) # make sure sub-nodes as cleaned up on termination signal loop = asyncio.get_event_loop() @@ -533,8 +539,7 @@ async def wrapper(*args, **kws): await cleanup_fn() secret_requests = kwargs.pop("secret_requests", None) or [] - if client_secret_group is not None and client_secret_key is not None: - secret_requests.append(Secret(group=client_secret_group, key=client_secret_key)) + secret_requests.append(Secret(group=client_secret_group, key=client_secret_key)) return task( wrapper, @@ -551,6 +556,7 @@ def _prepare_remote( client_secret_group: Optional[str] = None, client_secret_key: Optional[str] = None, local_entrypoint: bool = False, + bind_secret_to_env_var: Optional[str] = None, ) -> Optional[FlyteRemote]: """Prepare FlyteRemote object for accessing Flyte cluster in a task running on the same cluster.""" @@ -576,7 +582,7 @@ def _prepare_remote( if remote.config.platform.endpoint.startswith("localhost"): # replace sandbox endpoints with internal dns, since localhost won't exist within the Flyte cluster return _internal_demo_remote(remote) - return _internal_remote(remote, client_secret_group, client_secret_key) + return _internal_remote(remote, client_secret_group, client_secret_key, bind_secret_to_env_var) def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: @@ -605,16 +611,20 @@ def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: def _internal_remote( remote: FlyteRemote, - client_secret_group: str, - client_secret_key: str, + client_secret_group: Optional[str], + client_secret_key: Optional[str], + bind_secret_to_env_var: Optional[str], ) -> FlyteRemote: """Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally.""" - assert client_secret_group is not None, "secret_group must be defined when using a remote cluster" - assert client_secret_key is not None, "secret_key must be defined a remote cluster" secrets_manager = current_context().secrets client_secret = secrets_manager.get(client_secret_group, client_secret_key) # get the raw output prefix from the context that's set from the pyflyte-execute entrypoint # (see flytekit/bin/entrypoint.py) + + if bind_secret_to_env_var is not None: + os.environ[bind_secret_to_env_var] = client_secret + return type(remote)() + ctx = FlyteContextManager.current_context() return FlyteRemote( config=remote.config.with_params( From 5abc9ad7faeca8fa3bee02f7e8c947813dc4f39c Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 10:29:01 -0400 Subject: [PATCH 03/14] add remote creation error handling Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 3c05d3067b..0262a3164f 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -622,8 +622,13 @@ def _internal_remote( # (see flytekit/bin/entrypoint.py) if bind_secret_to_env_var is not None: + # this creates a remote client where the env var client secret is sufficient for authentication os.environ[bind_secret_to_env_var] = client_secret - return type(remote)() + try: + remote_cls = type(remote) + return remote_cls() + except Exception as exc: + raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc ctx = FlyteContextManager.current_context() return FlyteRemote( From 9b9dd83463706cc916ff5611e40a769d977e430b Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 12:52:55 -0400 Subject: [PATCH 04/14] update new arg to client_secret_env_var Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 0262a3164f..0415e20753 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -383,7 +383,7 @@ def eager( timeout: Optional[timedelta] = None, poll_interval: Optional[timedelta] = None, local_entrypoint: bool = False, - bind_secret_to_env_var: Optional[str] = None, + client_secret_env_var: Optional[str] = None, **kwargs, ): """Eager workflow decorator. @@ -398,7 +398,8 @@ def eager( :param local_entrypoint: If True, the eager workflow will can be executed locally but use the provided :py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local testing against a remote Flyte cluster. - :param bind_secret_to_env_var: if specified, binds the client secret to the specified environment variable. + :param client_secret_env_var: if specified, binds the client secret to the specified environment variable for + remote authentication. :param kwargs: keyword-arguments forwarded to :py:func:`~flytekit.task`. This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be @@ -494,7 +495,7 @@ async def eager_workflow(x: int) -> int: timeout=timeout, poll_interval=poll_interval, local_entrypoint=local_entrypoint, - bind_secret_to_env_var=bind_secret_to_env_var + client_secret_env_var=client_secret_env_var **kwargs, ) @@ -516,7 +517,9 @@ async def wrapper(*args, **kws): execution_id = exec_params.execution_id async_stack = AsyncStack(task_id, execution_id) - _remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint, bind_secret_to_env_var) + _remote = _prepare_remote( + _remote, ctx, client_secret_group, client_secret_key, local_entrypoint, client_secret_env_var + ) # make sure sub-nodes as cleaned up on termination signal loop = asyncio.get_event_loop() @@ -556,7 +559,7 @@ def _prepare_remote( client_secret_group: Optional[str] = None, client_secret_key: Optional[str] = None, local_entrypoint: bool = False, - bind_secret_to_env_var: Optional[str] = None, + client_secret_env_var: Optional[str] = None, ) -> Optional[FlyteRemote]: """Prepare FlyteRemote object for accessing Flyte cluster in a task running on the same cluster.""" @@ -582,7 +585,7 @@ def _prepare_remote( if remote.config.platform.endpoint.startswith("localhost"): # replace sandbox endpoints with internal dns, since localhost won't exist within the Flyte cluster return _internal_demo_remote(remote) - return _internal_remote(remote, client_secret_group, client_secret_key, bind_secret_to_env_var) + return _internal_remote(remote, client_secret_group, client_secret_key, client_secret_env_var) def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: @@ -613,7 +616,7 @@ def _internal_remote( remote: FlyteRemote, client_secret_group: Optional[str], client_secret_key: Optional[str], - bind_secret_to_env_var: Optional[str], + client_secret_env_var: Optional[str], ) -> FlyteRemote: """Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally.""" secrets_manager = current_context().secrets @@ -621,9 +624,9 @@ def _internal_remote( # get the raw output prefix from the context that's set from the pyflyte-execute entrypoint # (see flytekit/bin/entrypoint.py) - if bind_secret_to_env_var is not None: + if client_secret_env_var is not None: # this creates a remote client where the env var client secret is sufficient for authentication - os.environ[bind_secret_to_env_var] = client_secret + os.environ[client_secret_env_var] = client_secret try: remote_cls = type(remote) return remote_cls() From f8f97b569d98f3c1c82bb595c2a56299fece823d Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 13:03:55 -0400 Subject: [PATCH 05/14] fix lint Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 0415e20753..ba2f2479e0 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -495,8 +495,7 @@ async def eager_workflow(x: int) -> int: timeout=timeout, poll_interval=poll_interval, local_entrypoint=local_entrypoint, - client_secret_env_var=client_secret_env_var - **kwargs, + client_secret_env_var=client_secret_env_var**kwargs, ) if local_entrypoint and remote is None: From 54c4471a56311ff8201ba76d9bda726bc5b4689b Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 15:00:38 -0400 Subject: [PATCH 06/14] fix bug Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index ba2f2479e0..2112fe244d 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -495,7 +495,7 @@ async def eager_workflow(x: int) -> int: timeout=timeout, poll_interval=poll_interval, local_entrypoint=local_entrypoint, - client_secret_env_var=client_secret_env_var**kwargs, + client_secret_env_var=client_secret_env_var, ) if local_entrypoint and remote is None: From b526f6fabd0da884f686426d1eb6ee42e7ed5631 Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 15:30:34 -0400 Subject: [PATCH 07/14] fix kwargs Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 2112fe244d..104324b7ef 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -496,6 +496,7 @@ async def eager_workflow(x: int) -> int: poll_interval=poll_interval, local_entrypoint=local_entrypoint, client_secret_env_var=client_secret_env_var, + **kwargs, ) if local_entrypoint and remote is None: From 66c6bd4b720a9f4c962515b28c0c433a8943175b Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 15:39:29 -0400 Subject: [PATCH 08/14] try creating secret Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 104324b7ef..f91daf3516 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -542,7 +542,10 @@ async def wrapper(*args, **kws): await cleanup_fn() secret_requests = kwargs.pop("secret_requests", None) or [] - secret_requests.append(Secret(group=client_secret_group, key=client_secret_key)) + try: + secret_requests.append(Secret(group=client_secret_group, key=client_secret_key)) + except ValueError: + pass return task( wrapper, From 9ec46fb2f4a6d3e0e4130c5e16b8d7481a64884b Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 15:44:38 -0400 Subject: [PATCH 09/14] add event loop if needed Signed-off-by: Niels Bantilan --- flytekit/bin/entrypoint.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 234c90ceb6..e26923c24e 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -68,6 +68,15 @@ def _compute_array_job_index(): return offset +def _reset_event_loop_if_needed(): + """Create new event loop if it doesn't exist.""" + try: + asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + def _dispatch_execute( ctx: FlyteContext, load_task: Callable[[], PythonTask], @@ -107,13 +116,14 @@ def _dispatch_execute( if inspect.iscoroutine(outputs): # Handle eager-mode (async) tasks logger.info("Output is a coroutine") + + # make sure event loop exists before running the coroutine + _reset_event_loop_if_needed() + outputs = asyncio.run(outputs) + # make sure an event loop exists for data persistence step - try: - asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + _reset_event_loop_if_needed() # Step3a if isinstance(outputs, VoidPromise): From d21559f8543471768ce8e0b2e88393f499364cce Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 15:58:44 -0400 Subject: [PATCH 10/14] debug Signed-off-by: Niels Bantilan --- flytekit/bin/entrypoint.py | 20 +++++--------------- flytekit/experimental/eager_function.py | 7 +++++++ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index e26923c24e..234c90ceb6 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -68,15 +68,6 @@ def _compute_array_job_index(): return offset -def _reset_event_loop_if_needed(): - """Create new event loop if it doesn't exist.""" - try: - asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - def _dispatch_execute( ctx: FlyteContext, load_task: Callable[[], PythonTask], @@ -116,14 +107,13 @@ def _dispatch_execute( if inspect.iscoroutine(outputs): # Handle eager-mode (async) tasks logger.info("Output is a coroutine") - - # make sure event loop exists before running the coroutine - _reset_event_loop_if_needed() - outputs = asyncio.run(outputs) - # make sure an event loop exists for data persistence step - _reset_event_loop_if_needed() + try: + asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) # Step3a if isinstance(outputs, VoidPromise): diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index f91daf3516..48b70cb411 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -528,6 +528,13 @@ async def wrapper(*args, **kws): signal.signal(signal.SIGTERM, partial(node_cleanup, loop=loop, async_stack=async_stack)) async with eager_context(_fn, _remote, ctx, async_stack, timeout, poll_interval, local_entrypoint): + # make sure an event loop exists + try: + asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: if _remote is not None: with _remote.remote_context(): From aec338a1a756166053c7cb667d8e362578bad4c8 Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 29 Aug 2024 16:15:47 -0400 Subject: [PATCH 11/14] debug Signed-off-by: Niels Bantilan --- flytekit/bin/entrypoint.py | 13 +++++++------ flytekit/experimental/eager_function.py | 7 ------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 234c90ceb6..e0c0c04138 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -108,12 +108,6 @@ def _dispatch_execute( # Handle eager-mode (async) tasks logger.info("Output is a coroutine") outputs = asyncio.run(outputs) - # make sure an event loop exists for data persistence step - try: - asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) # Step3a if isinstance(outputs, VoidPromise): @@ -183,6 +177,13 @@ def _dispatch_execute( for k, v in output_file_dict.items(): utils.write_proto_to_file(v.to_flyte_idl(), os.path.join(ctx.execution_state.engine_dir, k)) + # make sure an event loop exists for data persistence step + try: + asyncio.get_event_loop() + except Exception: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True) logger.info(f"Engine folder written successfully to the output prefix {output_prefix}") diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 48b70cb411..f91daf3516 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -528,13 +528,6 @@ async def wrapper(*args, **kws): signal.signal(signal.SIGTERM, partial(node_cleanup, loop=loop, async_stack=async_stack)) async with eager_context(_fn, _remote, ctx, async_stack, timeout, poll_interval, local_entrypoint): - # make sure an event loop exists - try: - asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: if _remote is not None: with _remote.remote_context(): From f0b5d679fd5e7b1c3f4fda62b6e22dde1b5c2757 Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Fri, 30 Aug 2024 11:22:13 -0400 Subject: [PATCH 12/14] update error Signed-off-by: Niels Bantilan --- flytekit/bin/entrypoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index e0c0c04138..7a1381757c 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -180,7 +180,7 @@ def _dispatch_execute( # make sure an event loop exists for data persistence step try: asyncio.get_event_loop() - except Exception: + except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) From 351a8c69db91106a781f9ae61da3a664f7a74c6d Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Fri, 30 Aug 2024 16:57:30 -0400 Subject: [PATCH 13/14] pass default domain and project to new remote Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index f91daf3516..e10db0c960 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -632,7 +632,10 @@ def _internal_remote( os.environ[client_secret_env_var] = client_secret try: remote_cls = type(remote) - return remote_cls() + return remote_cls( + default_domain=remote.default_domain, + default_project=remote.default_project, + ) except Exception as exc: raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc From 2425ed4c2ccdf1c437689b46e4ed305bba64a30e Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 5 Sep 2024 10:06:51 -0400 Subject: [PATCH 14/14] add client secret assertion Signed-off-by: Niels Bantilan --- flytekit/bin/entrypoint.py | 7 ------- flytekit/experimental/eager_function.py | 5 +++++ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 7a1381757c..4b1dec78c6 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -177,13 +177,6 @@ def _dispatch_execute( for k, v in output_file_dict.items(): utils.write_proto_to_file(v.to_flyte_idl(), os.path.join(ctx.execution_state.engine_dir, k)) - # make sure an event loop exists for data persistence step - try: - asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True) logger.info(f"Engine folder written successfully to the output prefix {output_prefix}") diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index e10db0c960..f5c0051de2 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -623,6 +623,11 @@ def _internal_remote( ) -> FlyteRemote: """Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally.""" secrets_manager = current_context().secrets + + assert ( + client_secret_group is not None or client_secret_key is not None + ), "One of client_secret_group or client_secret_key must be defined when using a remote cluster" + client_secret = secrets_manager.get(client_secret_group, client_secret_key) # get the raw output prefix from the context that's set from the pyflyte-execute entrypoint # (see flytekit/bin/entrypoint.py)