Skip to content

Commit

Permalink
[resources] support async yield_for_execution
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Feb 28, 2025
1 parent 463883b commit 2ac2079
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 42 deletions.
36 changes: 33 additions & 3 deletions python_modules/dagster/dagster/_config/pythonic_config/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@
has_at_least_one_parameter,
)
from dagster._core.definitions.resource_requirement import ResourceRequirement
from dagster._core.errors import DagsterInvalidConfigError, DagsterInvalidDefinitionError
from dagster._core.errors import (
DagsterError,
DagsterInvalidConfigError,
DagsterInvalidDefinitionError,
)
from dagster._core.execution.context.init import InitResourceContext, build_init_resource_context
from dagster._model.pydantic_compat_layer import model_fields
from dagster._record import record
Expand Down Expand Up @@ -405,16 +409,42 @@ def _initialize_and_run(self, context: InitResourceContext) -> TResValue:
updated_resource.setup_for_execution(context)
return updated_resource.create_resource(context)

@contextlib.contextmanager
def _async_to_sync_cm(
self,
context: InitResourceContext,
async_cm: contextlib.AbstractAsyncContextManager,
):
aio_exit_stack = contextlib.AsyncExitStack()
loop = context.event_loop
if loop is None:
raise DagsterError(
"Unable to handle resource with async def yield_for_execution in the current context. "
"If using direct execution utilities like build_context, pass an event loop in and use "
"the same event loop to execute your asset/op."
)

try:
value = loop.run_until_complete(aio_exit_stack.enter_async_context(async_cm))
yield value
finally:
loop.run_until_complete(aio_exit_stack.aclose())

@contextlib.contextmanager
def _initialize_and_run_cm(
self, context: InitResourceContext
self,
context: InitResourceContext,
) -> Generator[TResValue, None, None]:
with self._resolve_and_update_nested_resources(context) as has_nested_resource:
updated_resource = has_nested_resource.with_replaced_resource_context( # noqa: SLF001
context
)._with_updated_values(context.resource_config)

with updated_resource.yield_for_execution(context) as value:
resource_cm = updated_resource.yield_for_execution(context)
if isinstance(resource_cm, contextlib.AbstractAsyncContextManager):
resource_cm = self._async_to_sync_cm(context, resource_cm)

with resource_cm as value:
yield value

def setup_for_execution(self, context: InitResourceContext) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from asyncio import AbstractEventLoop
from collections.abc import Generator, Mapping
from contextlib import contextmanager
from typing import Any, Optional, cast
Expand Down Expand Up @@ -45,6 +46,7 @@ def build_resources(
resource_config: Optional[Mapping[str, Any]] = None,
dagster_run: Optional[DagsterRun] = None,
log_manager: Optional[DagsterLogManager] = None,
event_loop: Optional[AbstractEventLoop] = None,
) -> Generator[Resources, None, None]:
"""Context manager that yields resources using provided resource definitions and run config.
Expand All @@ -67,6 +69,8 @@ def build_resources(
teardown, this must be provided, or initialization will fail.
log_manager (Optional[DagsterLogManager]): Log Manager to use during resource
initialization. Defaults to system log manager.
event_loop (Optional[AbstractEventLoop]): An event loop for handling resources
with async context managers.
Examples:
.. code-block:: python
Expand Down Expand Up @@ -99,6 +103,7 @@ def the_resource():
resource_keys_to_init=set(resource_defs.keys()),
instance=dagster_instance,
emit_persistent_events=False,
event_loop=event_loop,
)
try:
list(resources_manager.generate_setup_events())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from collections.abc import Mapping
from typing import Any, Optional, Union

Expand Down Expand Up @@ -38,6 +39,7 @@ def __init__(
instance: Optional[DagsterInstance] = None,
dagster_run: Optional[DagsterRun] = None,
log_manager: Optional[DagsterLogManager] = None,
event_loop: Optional[asyncio.AbstractEventLoop] = None,
):
self._resource_config = resource_config
self._resource_def = resource_def
Expand All @@ -46,6 +48,7 @@ def __init__(
self._instance = instance
self._resources = resources
self._dagster_run = dagster_run
self._event_loop = event_loop

@public
@property
Expand Down Expand Up @@ -115,6 +118,10 @@ def replace_config(self, config: Any) -> "InitResourceContext":
log_manager=self.log,
)

@property
def event_loop(self) -> Optional[asyncio.AbstractEventLoop]:
return self._event_loop


class UnboundInitResourceContext(InitResourceContext):
"""Resource initialization context outputted by ``build_init_resource_context``.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import abstractmethod
from asyncio import AbstractEventLoop
from collections.abc import Mapping, Sequence
from contextlib import ExitStack
from typing import AbstractSet, Any, NamedTuple, Optional, Union, cast # noqa: UP035
Expand Down Expand Up @@ -177,6 +178,7 @@ def __init__(
partition_key_range: Optional[PartitionKeyRange],
mapping_key: Optional[str],
run_tags: Mapping[str, str],
event_loop: Optional[AbstractEventLoop],
):
from dagster._core.execution.api import ephemeral_instance_if_missing
from dagster._core.execution.context_creation_job import initialize_console_manager
Expand All @@ -198,6 +200,7 @@ def __init__(
resources=self._resource_defs,
instance=self._instance,
resource_config=resources_config,
event_loop=event_loop,
)
)
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)
Expand All @@ -212,6 +215,7 @@ def __init__(
self._partition_key = partition_key
self._partition_key_range = partition_key_range
self._run_tags = run_tags
self._event_loop = event_loop

# Maintains the properties on the context that are bound to a particular invocation
# of an op
Expand Down Expand Up @@ -298,7 +302,11 @@ def bind(
resource_defs = wrap_resources_for_execution(resources_from_args)
# add new resources context to the stack to be cleared on exit
resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance)
build_resources(
resource_defs,
self.instance,
event_loop=self._event_loop,
)
)
elif assets_def and assets_def.resource_defs:
for key in sorted(list(assets_def.resource_defs.keys())):
Expand All @@ -313,7 +321,12 @@ def bind(
)
# add new resources context to the stack to be cleared on exit
resources = self._exit_stack.enter_context(
build_resources(resource_defs, self.instance, self._resources_config)
build_resources(
resource_defs,
self.instance,
self._resources_config,
event_loop=self._event_loop,
)
)
else:
# this runs the check in resources() to ensure we are in a context manager if necessary
Expand Down Expand Up @@ -863,6 +876,7 @@ def build_op_context(
partition_key_range: Optional[PartitionKeyRange] = None,
mapping_key: Optional[str] = None,
run_tags: Optional[Mapping[str, str]] = None,
event_loop: Optional[AbstractEventLoop] = None,
) -> DirectOpExecutionContext:
"""Builds op execution context from provided parameters.
Expand All @@ -883,6 +897,8 @@ def build_op_context(
partition_key (Optional[str]): String value representing partition key to execute with.
partition_key_range (Optional[PartitionKeyRange]): Partition key range to execute with.
run_tags: Optional[Mapping[str, str]]: The tags for the executing run.
event_loop: Optional[AbstractEventLoop]: An event loop for handling resources
with async context managers.
Examples:
.. code-block:: python
Expand Down Expand Up @@ -913,6 +929,7 @@ def build_op_context(
),
mapping_key=check.opt_str_param(mapping_key, "mapping_key"),
run_tags=check.opt_mapping_param(run_tags, "run_tags", key_type=str),
event_loop=event_loop,
)


Expand All @@ -924,6 +941,7 @@ def build_asset_context(
partition_key: Optional[str] = None,
partition_key_range: Optional[PartitionKeyRange] = None,
run_tags: Optional[Mapping[str, str]] = None,
event_loop: Optional[AbstractEventLoop] = None,
) -> DirectAssetExecutionContext:
"""Builds asset execution context from provided parameters.
Expand All @@ -942,6 +960,9 @@ def build_asset_context(
partition_key (Optional[str]): String value representing partition key to execute with.
partition_key_range (Optional[PartitionKeyRange]): Partition key range to execute with.
run_tags: Optional[Mapping[str, str]]: The tags for the executing run.
event_loop: Optional[AbstractEventLoop]: An event loop for handling resources
with async context managers.
Examples:
.. code-block:: python
Expand All @@ -960,6 +981,7 @@ def build_asset_context(
partition_key_range=partition_key_range,
instance=instance,
run_tags=run_tags,
event_loop=event_loop,
)

return DirectAssetExecutionContext(op_execution_context=op_context)
13 changes: 12 additions & 1 deletion python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop
from collections.abc import Iterable, Mapping
from functools import cached_property
from typing import TYPE_CHECKING, AbstractSet, Any, NamedTuple, Optional, Union, cast # noqa: UP035
Expand Down Expand Up @@ -302,12 +303,14 @@ def __init__(
plan_data: PlanData,
execution_data: ExecutionData,
log_manager: DagsterLogManager,
output_capture: Optional[dict[StepOutputHandle, Any]] = None,
output_capture: Optional[dict[StepOutputHandle, Any]],
event_loop: AbstractEventLoop,
):
self._plan_data = plan_data
self._execution_data = execution_data
self._log_manager = log_manager
self._output_capture = output_capture
self._event_loop = event_loop

@property
def plan_data(self) -> PlanData:
Expand All @@ -317,6 +320,10 @@ def plan_data(self) -> PlanData:
def output_capture(self) -> Optional[dict[StepOutputHandle, Any]]:
return self._output_capture

@property
def event_loop(self) -> AbstractEventLoop:
return self._event_loop

def for_step(
self,
step: ExecutionStep,
Expand All @@ -333,6 +340,7 @@ def for_step(
step=step,
output_capture=self.output_capture,
known_state=known_state,
event_loop=self.event_loop,
)

@property
Expand Down Expand Up @@ -408,6 +416,7 @@ def __init__(
step: ExecutionStep,
output_capture: Optional[dict[StepOutputHandle, Any]],
known_state: Optional["KnownExecutionState"],
event_loop,
):
from dagster._core.execution.resources_init import get_required_resource_keys_for_step

Expand All @@ -416,6 +425,7 @@ def __init__(
execution_data=execution_data,
log_manager=log_manager,
output_capture=output_capture,
event_loop=event_loop,
)
self._step = step
self._required_resource_keys = get_required_resource_keys_for_step(
Expand Down Expand Up @@ -1263,6 +1273,7 @@ def get_type_loader_context(self) -> "DagsterTypeLoaderContext":
step=self.step,
output_capture=self._output_capture,
known_state=self._known_state,
event_loop=None,
)

def output_observes_source_asset(self, output_name: str) -> bool:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import sys
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -215,31 +216,40 @@ def execution_context_event_generator(

log_manager = create_log_manager(context_creation_data)
resource_defs = job_def.get_required_resource_defs()
event_loop = asyncio.new_event_loop()
try:
resources_manager = scoped_resources_builder_cm(
resource_defs=resource_defs,
resource_configs=context_creation_data.resolved_run_config.resources,
log_manager=log_manager,
execution_plan=execution_plan,
dagster_run=context_creation_data.dagster_run,
resource_keys_to_init=context_creation_data.resource_keys_to_init,
instance=instance,
emit_persistent_events=True,
event_loop=event_loop,
)
yield from resources_manager.generate_setup_events()
scoped_resources_builder = check.inst(
resources_manager.get_object(), ScopedResourcesBuilder
)

resources_manager = scoped_resources_builder_cm(
resource_defs=resource_defs,
resource_configs=context_creation_data.resolved_run_config.resources,
log_manager=log_manager,
execution_plan=execution_plan,
dagster_run=context_creation_data.dagster_run,
resource_keys_to_init=context_creation_data.resource_keys_to_init,
instance=instance,
emit_persistent_events=True,
)
yield from resources_manager.generate_setup_events()
scoped_resources_builder = check.inst(resources_manager.get_object(), ScopedResourcesBuilder)

execution_context = PlanExecutionContext(
plan_data=create_plan_data(context_creation_data, raise_on_error, retry_mode),
execution_data=create_execution_data(context_creation_data, scoped_resources_builder),
log_manager=log_manager,
output_capture=output_capture,
)
execution_context = PlanExecutionContext(
plan_data=create_plan_data(context_creation_data, raise_on_error, retry_mode),
execution_data=create_execution_data(context_creation_data, scoped_resources_builder),
log_manager=log_manager,
output_capture=output_capture,
event_loop=event_loop,
)

_validate_plan_with_context(execution_context, execution_plan)
_validate_plan_with_context(execution_context, execution_plan)

yield execution_context
yield from resources_manager.generate_teardown_events()

yield execution_context
yield from resources_manager.generate_teardown_events()
finally:
event_loop.run_until_complete(event_loop.shutdown_asyncgens())
event_loop.close()


class PlanOrchestrationContextManager(ExecutionContextManager[PlanOrchestrationContext]):
Expand Down
Loading

0 comments on commit 2ac2079

Please sign in to comment.