From 6bf29f9a7f9496ebffaa6f50e5d61549d21d8e0f Mon Sep 17 00:00:00 2001 From: ElenaKhaustova <157851531+ElenaKhaustova@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:07:59 +0100 Subject: [PATCH] [DataCatalog2.0]: `Protocol` abstraction for `DataCatalog` (#4160) * Added a skeleton for AbstractDataCatalog and KedroDataCatalog Signed-off-by: Elena Khaustova * Removed from_config method Signed-off-by: Elena Khaustova * Implemented _init_datasets method Signed-off-by: Elena Khaustova * Implemented get dataset Signed-off-by: Elena Khaustova * Started resolve_patterns implementation Signed-off-by: Elena Khaustova * Implemented resolve_patterns Signed-off-by: Elena Khaustova * Fixed credentials resolving Signed-off-by: Elena Khaustova * Updated match pattern Signed-off-by: Elena Khaustova * Implemented add from dict method Signed-off-by: Elena Khaustova * Updated io __init__ Signed-off-by: Elena Khaustova * Added list method Signed-off-by: Elena Khaustova * Implemented _validate_missing_keys Signed-off-by: Elena Khaustova * Added datasets access logic Signed-off-by: Elena Khaustova * Added __contains__ and comments on lazy loading Signed-off-by: Elena Khaustova * Renamed dataset_name to ds_name Signed-off-by: Elena Khaustova * Updated some docstrings Signed-off-by: Elena Khaustova * Fixed _update_ds_configs Signed-off-by: Elena Khaustova * Fixed _init_datasets Signed-off-by: Elena Khaustova * Implemented add_runtime_patterns Signed-off-by: Elena Khaustova * Fixed runtime patterns usage Signed-off-by: Elena Khaustova * Moved pattern logic out of data catalog, implemented KedroDataCatalog Signed-off-by: Elena Khaustova * KedroDataCatalog updates Signed-off-by: Elena Khaustova * Added property to return config Signed-off-by: Elena Khaustova * Added list patterns method Signed-off-by: Elena Khaustova * Renamed and moved ConfigResolver Signed-off-by: Elena Khaustova * Renamed ConfigResolver Signed-off-by: Elena Khaustova * Cleaned KedroDataCatalog Signed-off-by: Elena Khaustova * Cleaned up DataCatalogConfigResolver Signed-off-by: Elena Khaustova * Docs build fix attempt Signed-off-by: Elena Khaustova * Removed KedroDataCatalog Signed-off-by: Elena Khaustova * Updated from_config method Signed-off-by: Elena Khaustova * Updated constructor and add methods Signed-off-by: Elena Khaustova * Updated _get_dataset method Signed-off-by: Elena Khaustova * Updated __contains__ Signed-off-by: Elena Khaustova * Updated __eq__ and shallow_copy Signed-off-by: Elena Khaustova * Added __iter__ and __getitem__ Signed-off-by: Elena Khaustova * Removed unused imports Signed-off-by: Elena Khaustova * Added TODO Signed-off-by: Elena Khaustova * Updated runner.run() Signed-off-by: Elena Khaustova * Updated session Signed-off-by: Elena Khaustova * Added confil_resolver property Signed-off-by: Elena Khaustova * Updated catalog list command Signed-off-by: Elena Khaustova * Updated catalog create command Signed-off-by: Elena Khaustova * Updated catalog rank command Signed-off-by: Elena Khaustova * Updated catalog resolve command Signed-off-by: Elena Khaustova * Remove some methods Signed-off-by: Elena Khaustova * Removed ds configs from catalog Signed-off-by: Elena Khaustova * Fixed lint Signed-off-by: Elena Khaustova * Fixed typo Signed-off-by: Elena Khaustova * Added module docstring Signed-off-by: Elena Khaustova * Removed None from Pattern type Signed-off-by: Elena Khaustova * Fixed docs failing to find class reference Signed-off-by: Elena Khaustova * Fixed docs failing to find class reference Signed-off-by: Elena 
Khaustova * Updated Patterns type Signed-off-by: Elena Khaustova * Fix tests (#4149) * Fix most tests Signed-off-by: Ankita Katiyar * Fix most tests Signed-off-by: Ankita Katiyar --------- Signed-off-by: Ankita Katiyar * Returned constants to avoid breaking changes Signed-off-by: Elena Khaustova * Minor fix Signed-off-by: Elena Khaustova * Updated test_sorting_order_with_other_dataset_through_extra_pattern Signed-off-by: Elena Khaustova * Removed odd properties Signed-off-by: Elena Khaustova * Updated tests Signed-off-by: Elena Khaustova * Removed None from _fetch_credentials input Signed-off-by: Elena Khaustova * Renamed DataCatalogConfigResolver to CatalogConfigResolver Signed-off-by: Elena Khaustova * Renamed _init_configs to _resolve_config_credentials Signed-off-by: Elena Khaustova * Moved functions to the class Signed-off-by: Elena Khaustova * Refactored resolve_dataset_pattern Signed-off-by: Elena Khaustova * Fixed refactored part Signed-off-by: Elena Khaustova * Changed the order of arguments for DataCatalog constructor Signed-off-by: Elena Khaustova * Replaced __getitem__ with .get() Signed-off-by: Elena Khaustova * Updated catalog commands Signed-off-by: Elena Khaustova * Moved warm up block outside of the try block Signed-off-by: Elena Khaustova * Fixed linter Signed-off-by: Elena Khaustova * Removed odd copying Signed-off-by: Elena Khaustova * Updated release notes Signed-off-by: Elena Khaustova * Returned DatasetError Signed-off-by: Elena Khaustova * Added _dataset_patterns and _default_pattern to _config_resolver to avoid breaking change Signed-off-by: Elena Khaustova * Made resolve_dataset_pattern return just dict Signed-off-by: Elena Khaustova * Fixed linter Signed-off-by: Elena Khaustova * Added Catalogprotocol draft Signed-off-by: Elena Khaustova * Implemented CatalogProtocol Signed-off-by: Elena Khaustova * Updated types Signed-off-by: Elena Khaustova * Fixed linter Signed-off-by: Elena Khaustova * Added _ImplementsCatalogProtocolValidator Signed-off-by: Elena Khaustova * Updated docstrings Signed-off-by: Elena Khaustova * Fixed tests Signed-off-by: Elena Khaustova * Fixed docs Signed-off-by: Elena Khaustova * Excluded Potocol from coverage Signed-off-by: Elena Khaustova * Fixed docs Signed-off-by: Elena Khaustova * Removed reference to DataCatalog in docstrings Signed-off-by: Elena Khaustova * Removed add_all from protocol Signed-off-by: Elena Khaustova * Updated docstrings Signed-off-by: Elena Khaustova * Updated docstrings Signed-off-by: Elena Khaustova * Fixed docstrings Signed-off-by: Elena Khaustova * Updated RELEASE.md Signed-off-by: Elena Khaustova --------- Signed-off-by: Elena Khaustova Signed-off-by: Ankita Katiyar Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> --- RELEASE.md | 1 + docs/source/conf.py | 2 + kedro/framework/context/context.py | 20 +++---- kedro/framework/hooks/specs.py | 28 ++++----- kedro/framework/project/__init__.py | 25 +++++++- kedro/io/__init__.py | 2 + kedro/io/core.py | 79 ++++++++++++++++++++++++- kedro/runner/parallel_runner.py | 18 +++--- kedro/runner/runner.py | 50 ++++++++-------- kedro/runner/sequential_runner.py | 8 +-- kedro/runner/thread_runner.py | 8 +-- pyproject.toml | 2 +- tests/framework/context/test_context.py | 2 +- tests/runner/test_sequential_runner.py | 4 +- 14 files changed, 178 insertions(+), 71 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index 6c126e8ab6..f2f24f9a77 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,6 +1,7 @@ # Upcoming Release ## Major features and improvements +* 
Implemented `Protocol` abstraction for the current `DataCatalog` to allow adding new catalog implementations. * Refactored `kedro run` and `kedro catalog` commands. * Moved pattern resolution logic from `DataCatalog` to a separate component - `CatalogConfigResolver`. Updated `DataCatalog` to use `CatalogConfigResolver` internally. * Made packaged Kedro projects return `session.run()` output to be used when running it in the interactive environment. diff --git a/docs/source/conf.py b/docs/source/conf.py index 2c3a2c4c00..a61ba1b08f 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -130,6 +130,7 @@ "kedro.io.catalog_config_resolver.CatalogConfigResolver", "kedro.io.core.AbstractDataset", "kedro.io.core.AbstractVersionedDataset", + "kedro.io.core.CatalogProtocol", "kedro.io.core.DatasetError", "kedro.io.core.Version", "kedro.io.data_catalog.DataCatalog", @@ -170,6 +171,7 @@ "None. Update D from mapping/iterable E and F.", "Patterns", "CatalogConfigResolver", + "CatalogProtocol", ), "py:data": ( "typing.Any", diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py index 3b61b747f6..5c14cbae38 100644 --- a/kedro/framework/context/context.py +++ b/kedro/framework/context/context.py @@ -14,7 +14,7 @@ from kedro.config import AbstractConfigLoader, MissingConfigException from kedro.framework.project import settings -from kedro.io import DataCatalog # noqa: TCH001 +from kedro.io import CatalogProtocol, DataCatalog # noqa: TCH001 from kedro.pipeline.transcoding import _transcode_split if TYPE_CHECKING: @@ -123,7 +123,7 @@ def _convert_paths_to_absolute_posix( return conf_dictionary -def _validate_transcoded_datasets(catalog: DataCatalog) -> None: +def _validate_transcoded_datasets(catalog: CatalogProtocol) -> None: """Validates transcoded datasets are correctly named Args: @@ -178,13 +178,13 @@ class KedroContext: ) @property - def catalog(self) -> DataCatalog: - """Read-only property referring to Kedro's ``DataCatalog`` for this context. + def catalog(self) -> CatalogProtocol: + """Read-only property referring to Kedro's catalog for this context. Returns: - DataCatalog defined in `catalog.yml`. + catalog defined in `catalog.yml`. Raises: - KedroContextError: Incorrect ``DataCatalog`` registered for the project. + KedroContextError: Incorrect catalog registered for the project. """ return self._get_catalog() @@ -213,13 +213,13 @@ def _get_catalog( self, save_version: str | None = None, load_versions: dict[str, str] | None = None, - ) -> DataCatalog: - """A hook for changing the creation of a DataCatalog instance. + ) -> CatalogProtocol: + """A hook for changing the creation of a catalog instance. Returns: - DataCatalog defined in `catalog.yml`. + catalog defined in `catalog.yml`. Raises: - KedroContextError: Incorrect ``DataCatalog`` registered for the project. + KedroContextError: Incorrect catalog registered for the project.
""" # '**/catalog*' reads modular pipeline configs diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py index b0037a0878..3b32eb294c 100644 --- a/kedro/framework/hooks/specs.py +++ b/kedro/framework/hooks/specs.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from kedro.framework.context import KedroContext - from kedro.io import DataCatalog + from kedro.io import CatalogProtocol from kedro.pipeline import Pipeline from kedro.pipeline.node import Node @@ -22,7 +22,7 @@ class DataCatalogSpecs: @hook_spec def after_catalog_created( # noqa: PLR0913 self, - catalog: DataCatalog, + catalog: CatalogProtocol, conf_catalog: dict[str, Any], conf_creds: dict[str, Any], feed_dict: dict[str, Any], @@ -53,7 +53,7 @@ class NodeSpecs: def before_node_run( self, node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, inputs: dict[str, Any], is_async: bool, session_id: str, @@ -63,7 +63,7 @@ def before_node_run( Args: node: The ``Node`` to run. - catalog: A ``DataCatalog`` containing the node's inputs and outputs. + catalog: An implemented instance of ``CatalogProtocol`` containing the node's inputs and outputs. inputs: The dictionary of inputs dataset. The keys are dataset names and the values are the actual loaded input data, not the dataset instance. @@ -81,7 +81,7 @@ def before_node_run( def after_node_run( # noqa: PLR0913 self, node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, inputs: dict[str, Any], outputs: dict[str, Any], is_async: bool, @@ -93,7 +93,7 @@ def after_node_run( # noqa: PLR0913 Args: node: The ``Node`` that ran. - catalog: A ``DataCatalog`` containing the node's inputs and outputs. + catalog: An implemented instance of ``CatalogProtocol`` containing the node's inputs and outputs. inputs: The dictionary of inputs dataset. The keys are dataset names and the values are the actual loaded input data, not the dataset instance. @@ -110,7 +110,7 @@ def on_node_error( # noqa: PLR0913 self, error: Exception, node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, inputs: dict[str, Any], is_async: bool, session_id: str, @@ -122,7 +122,7 @@ def on_node_error( # noqa: PLR0913 Args: error: The uncaught exception thrown during the node run. node: The ``Node`` to run. - catalog: A ``DataCatalog`` containing the node's inputs and outputs. + catalog: An implemented instance of ``CatalogProtocol`` containing the node's inputs and outputs. inputs: The dictionary of inputs dataset. The keys are dataset names and the values are the actual loaded input data, not the dataset instance. @@ -137,7 +137,7 @@ class PipelineSpecs: @hook_spec def before_pipeline_run( - self, run_params: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog + self, run_params: dict[str, Any], pipeline: Pipeline, catalog: CatalogProtocol ) -> None: """Hook to be invoked before a pipeline runs. @@ -164,7 +164,7 @@ def before_pipeline_run( } pipeline: The ``Pipeline`` that will be run. - catalog: The ``DataCatalog`` to be used during the run. + catalog: An implemented instance of ``CatalogProtocol`` to be used during the run. """ pass @@ -174,7 +174,7 @@ def after_pipeline_run( run_params: dict[str, Any], run_result: dict[str, Any], pipeline: Pipeline, - catalog: DataCatalog, + catalog: CatalogProtocol, ) -> None: """Hook to be invoked after a pipeline runs. @@ -202,7 +202,7 @@ def after_pipeline_run( run_result: The output of ``Pipeline`` run. pipeline: The ``Pipeline`` that was run. - catalog: The ``DataCatalog`` used during the run. 
+ catalog: An implemented instance of ``CatalogProtocol`` used during the run. """ pass @@ -212,7 +212,7 @@ def on_pipeline_error( error: Exception, run_params: dict[str, Any], pipeline: Pipeline, - catalog: DataCatalog, + catalog: CatalogProtocol, ) -> None: """Hook to be invoked if a pipeline run throws an uncaught Exception. The signature of this error hook should match the signature of ``before_pipeline_run`` @@ -242,7 +242,7 @@ def on_pipeline_error( } pipeline: The ``Pipeline`` that was run. - catalog: The ``DataCatalog`` used during the run. + catalog: An implemented instance of ``CatalogProtocol`` used during the run. """ pass diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py index a3248b9daf..195fa077f6 100644 --- a/kedro/framework/project/__init__.py +++ b/kedro/framework/project/__init__.py @@ -20,6 +20,7 @@ from dynaconf import LazySettings from dynaconf.validator import ValidationError, Validator +from kedro.io import CatalogProtocol from kedro.pipeline import Pipeline, pipeline if TYPE_CHECKING: @@ -59,6 +60,25 @@ def validate( ) +class _ImplementsCatalogProtocolValidator(Validator): + """A validator to check that the supplied setting value implements ``CatalogProtocol``.""" + + def validate( + self, settings: dynaconf.base.Settings, *args: Any, **kwargs: Any + ) -> None: + super().validate(settings, *args, **kwargs) + + protocol = CatalogProtocol + for name in self.names: + setting_value = getattr(settings, name) + if not isinstance(setting_value(), protocol): + raise ValidationError( + f"Invalid value '{setting_value.__module__}.{setting_value.__qualname__}' " + f"received for setting '{name}'. It must implement " + f"'{protocol.__module__}.{protocol.__qualname__}'." + ) + + class _HasSharedParentClassValidator(Validator): """A validator to check that the parent of the default class is an ancestor of the settings value.""" @@ -115,8 +135,9 @@ class _ProjectSettings(LazySettings): _CONFIG_LOADER_ARGS = Validator( "CONFIG_LOADER_ARGS", default={"base_env": "base", "default_run_env": "local"} ) - _DATA_CATALOG_CLASS = _IsSubclassValidator( - "DATA_CATALOG_CLASS", default=_get_default_class("kedro.io.DataCatalog") + _DATA_CATALOG_CLASS = _ImplementsCatalogProtocolValidator( + "DATA_CATALOG_CLASS", + default=_get_default_class("kedro.io.DataCatalog"), ) def __init__(self, *args: Any, **kwargs: Any): diff --git a/kedro/io/__init__.py b/kedro/io/__init__.py index 4b4a2e1b52..c4d968c2ba 100644 --- a/kedro/io/__init__.py +++ b/kedro/io/__init__.py @@ -9,6 +9,7 @@ from .core import ( AbstractDataset, AbstractVersionedDataset, + CatalogProtocol, DatasetAlreadyExistsError, DatasetError, DatasetNotFoundError, @@ -23,6 +24,7 @@ "AbstractDataset", "AbstractVersionedDataset", "CachedDataset", + "CatalogProtocol", "DataCatalog", "CatalogConfigResolver", "DatasetAlreadyExistsError", diff --git a/kedro/io/core.py b/kedro/io/core.py index f3975c9c3c..0b722444d4 100644 --- a/kedro/io/core.py +++ b/kedro/io/core.py @@ -17,7 +17,15 @@ from glob import iglob from operator import attrgetter from pathlib import Path, PurePath, PurePosixPath -from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Generic, + Protocol, + TypeVar, + runtime_checkable, +) from urllib.parse import urlsplit from cachetools import Cache, cachedmethod @@ -29,6 +37,8 @@ if TYPE_CHECKING: import os + from kedro.io.catalog_config_resolver import CatalogConfigResolver, Patterns + VERSION_FORMAT =
"%Y-%m-%dT%H.%M.%S.%fZ" VERSIONED_FLAG_KEY = "versioned" VERSION_KEY = "version" @@ -871,3 +881,70 @@ def validate_on_forbidden_chars(**kwargs: Any) -> None: raise DatasetError( f"Neither white-space nor semicolon are allowed in '{key}'." ) + + +_C = TypeVar("_C") + + +@runtime_checkable +class CatalogProtocol(Protocol[_C]): + _datasets: dict[str, AbstractDataset] + + def __contains__(self, ds_name: str) -> bool: + """Check if a dataset is in the catalog.""" + ... + + @property + def config_resolver(self) -> CatalogConfigResolver: + """Return a copy of the datasets dictionary.""" + ... + + @classmethod + def from_config(cls, catalog: dict[str, dict[str, Any]] | None) -> _C: + """Create a catalog instance from configuration.""" + ... + + def _get_dataset( + self, + dataset_name: str, + version: Any = None, + suggest: bool = True, + ) -> AbstractDataset: + """Retrieve a dataset by its name.""" + ... + + def list(self, regex_search: str | None = None) -> list[str]: + """List all dataset names registered in the catalog.""" + ... + + def save(self, name: str, data: Any) -> None: + """Save data to a registered dataset.""" + ... + + def load(self, name: str, version: str | None = None) -> Any: + """Load data from a registered dataset.""" + ... + + def add(self, ds_name: str, dataset: Any, replace: bool = False) -> None: + """Add a new dataset to the catalog.""" + ... + + def add_feed_dict(self, datasets: dict[str, Any], replace: bool = False) -> None: + """Add datasets to the catalog using the data provided through the `feed_dict`.""" + ... + + def exists(self, name: str) -> bool: + """Checks whether registered data set exists by calling its `exists()` method.""" + ... + + def release(self, name: str) -> None: + """Release any cached data associated with a dataset.""" + ... + + def confirm(self, name: str) -> None: + """Confirm a dataset by its name.""" + ... + + def shallow_copy(self, extra_dataset_patterns: Patterns | None = None) -> _C: + """Returns a shallow copy of the current object.""" + ... diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 62d7e1216b..d09601ff7e 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -22,7 +22,7 @@ ) from kedro.framework.project import settings from kedro.io import ( - DataCatalog, + CatalogProtocol, DatasetNotFoundError, MemoryDataset, SharedMemoryDataset, @@ -60,7 +60,7 @@ def _bootstrap_subprocess( def _run_node_synchronization( # noqa: PLR0913 node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, is_async: bool = False, session_id: str | None = None, package_name: str | None = None, @@ -73,7 +73,7 @@ def _run_node_synchronization( # noqa: PLR0913 Args: node: The ``Node`` to run. - catalog: A ``DataCatalog`` containing the node's inputs and outputs. + catalog: An implemented instance of ``CatalogProtocol`` containing the node's inputs and outputs. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. session_id: The session id of the pipeline run. @@ -118,7 +118,7 @@ def __init__( cannot be larger than 61 and will be set to min(61, max_workers). is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. - extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog + extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog during the run. 
This is used to set the default datasets to SharedMemoryDataset for `ParallelRunner`. @@ -168,7 +168,7 @@ def _validate_nodes(cls, nodes: Iterable[Node]) -> None: ) @classmethod - def _validate_catalog(cls, catalog: DataCatalog, pipeline: Pipeline) -> None: + def _validate_catalog(cls, catalog: CatalogProtocol, pipeline: Pipeline) -> None: """Ensure that all data sets are serialisable and that we do not have any non proxied memory data sets being used as outputs as their content will not be synchronized across threads. @@ -213,7 +213,9 @@ def _validate_catalog(cls, catalog: DataCatalog, pipeline: Pipeline) -> None: f"MemoryDatasets" ) - def _set_manager_datasets(self, catalog: DataCatalog, pipeline: Pipeline) -> None: + def _set_manager_datasets( + self, catalog: CatalogProtocol, pipeline: Pipeline + ) -> None: for dataset in pipeline.datasets(): try: catalog.exists(dataset) @@ -240,7 +242,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: def _run( self, pipeline: Pipeline, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager, session_id: str | None = None, ) -> None: @@ -248,7 +250,7 @@ def _run( Args: pipeline: The ``Pipeline`` to run. - catalog: The ``DataCatalog`` from which to fetch data. + catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. session_id: The id of the session. diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 6f165e87c0..f3a0889909 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -21,7 +21,7 @@ from more_itertools import interleave from kedro.framework.hooks.manager import _NullPluginManager -from kedro.io import DataCatalog, MemoryDataset +from kedro.io import CatalogProtocol, MemoryDataset from kedro.pipeline import Pipeline if TYPE_CHECKING: @@ -45,7 +45,7 @@ def __init__( Args: is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. - extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog + extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog during the run. This is used to set the default datasets on the Runner instances. """ @@ -59,7 +59,7 @@ def _logger(self) -> logging.Logger: def run( self, pipeline: Pipeline, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager | None = None, session_id: str | None = None, ) -> dict[str, Any]: @@ -68,7 +68,7 @@ def run( Args: pipeline: The ``Pipeline`` to run. - catalog: The ``DataCatalog`` from which to fetch data. + catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. session_id: The id of the session. @@ -76,7 +76,7 @@ def run( ValueError: Raised when ``Pipeline`` inputs cannot be satisfied. Returns: - Any node outputs that cannot be processed by the ``DataCatalog``. + Any node outputs that cannot be processed by the catalog. These are returned in a dictionary, where the keys are defined by the node outputs. 
@@ -94,7 +94,7 @@ def run( if unsatisfied: raise ValueError( - f"Pipeline input(s) {unsatisfied} not found in the DataCatalog" + f"Pipeline input(s) {unsatisfied} not found in the {catalog.__class__.__name__}" ) # Identify MemoryDataset in the catalog @@ -124,7 +124,7 @@ def run( return {ds_name: catalog.load(ds_name) for ds_name in free_outputs} def run_only_missing( - self, pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager + self, pipeline: Pipeline, catalog: CatalogProtocol, hook_manager: PluginManager ) -> dict[str, Any]: """Run only the missing outputs from the ``Pipeline`` using the datasets provided by ``catalog``, and save results back to the @@ -132,7 +132,7 @@ def run_only_missing( Args: pipeline: The ``Pipeline`` to run. - catalog: The ``DataCatalog`` from which to fetch data. + catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. Raises: ValueError: Raised when ``Pipeline`` inputs cannot be @@ -140,7 +140,7 @@ def run_only_missing( Returns: Any node outputs that cannot be processed by the - ``DataCatalog``. These are returned in a dictionary, where + catalog. These are returned in a dictionary, where the keys are defined by the node outputs. """ @@ -164,7 +164,7 @@ def run_only_missing( def _run( self, pipeline: Pipeline, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager, session_id: str | None = None, ) -> None: @@ -173,7 +173,7 @@ def _run( Args: pipeline: The ``Pipeline`` to run. - catalog: The ``DataCatalog`` from which to fetch data. + catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. session_id: The id of the session. @@ -184,7 +184,7 @@ def _suggest_resume_scenario( self, pipeline: Pipeline, done_nodes: Iterable[Node], - catalog: DataCatalog, + catalog: CatalogProtocol, ) -> None: """ Suggest a command to the user to resume a run after it fails. @@ -194,7 +194,7 @@ def _suggest_resume_scenario( Args: pipeline: the ``Pipeline`` of the run. done_nodes: the ``Node``s that executed successfully. - catalog: the ``DataCatalog`` of the run. + catalog: an implemented instance of ``CatalogProtocol`` of the run. """ remaining_nodes = set(pipeline.nodes) - set(done_nodes) @@ -223,7 +223,7 @@ def _suggest_resume_scenario( def _find_nodes_to_resume_from( - pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: DataCatalog + pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: CatalogProtocol ) -> set[str]: """Given a collection of unfinished nodes in a pipeline using a certain catalog, find the node names to pass to pipeline.from_nodes() @@ -233,7 +233,7 @@ def _find_nodes_to_resume_from( Args: pipeline: the ``Pipeline`` to find starting nodes for. unfinished_nodes: collection of ``Node``s that have not finished yet - catalog: the ``DataCatalog`` of the run. + catalog: an implemented instance of ``CatalogProtocol`` of the run. 
Returns: Set of node names to pass to pipeline.from_nodes() to continue @@ -251,7 +251,7 @@ def _find_nodes_to_resume_from( def _find_all_nodes_for_resumed_pipeline( - pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog + pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: CatalogProtocol ) -> set[Node]: """Breadth-first search approach to finding the complete set of ``Node``s which need to run to cover all unfinished nodes, @@ -261,7 +261,7 @@ def _find_all_nodes_for_resumed_pipeline( Args: pipeline: the ``Pipeline`` to analyze. unfinished_nodes: the iterable of ``Node``s which have not finished yet. - catalog: the ``DataCatalog`` of the run. + catalog: an implemented instance of ``CatalogProtocol`` of the run. Returns: A set containing all input unfinished ``Node``s and all remaining @@ -309,12 +309,12 @@ def _nodes_with_external_inputs(nodes_of_interest: Iterable[Node]) -> set[Node]: return set(p_nodes_with_external_inputs.nodes) -def _enumerate_non_persistent_inputs(node: Node, catalog: DataCatalog) -> set[str]: +def _enumerate_non_persistent_inputs(node: Node, catalog: CatalogProtocol) -> set[str]: """Enumerate non-persistent input datasets of a ``Node``. Args: node: the ``Node`` to check the inputs of. - catalog: the ``DataCatalog`` of the run. + catalog: an implemented instance of ``CatalogProtocol`` of the run. Returns: Set of names of non-persistent inputs of given ``Node``. @@ -379,7 +379,7 @@ def _find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[ def run_node( node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager, is_async: bool = False, session_id: str | None = None, @@ -388,7 +388,7 @@ def run_node( Args: node: The ``Node`` to run. - catalog: A ``DataCatalog`` containing the node's inputs and outputs. + catalog: An implemented instance of ``CatalogProtocol`` containing the node's inputs and outputs. hook_manager: The ``PluginManager`` to activate hooks. is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. @@ -422,7 +422,7 @@ def run_node( def _collect_inputs_from_hook( # noqa: PLR0913 node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, inputs: dict[str, Any], is_async: bool, hook_manager: PluginManager, @@ -455,7 +455,7 @@ def _collect_inputs_from_hook( # noqa: PLR0913 def _call_node_run( # noqa: PLR0913 node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, inputs: dict[str, Any], is_async: bool, hook_manager: PluginManager, @@ -486,7 +486,7 @@ def _call_node_run( # noqa: PLR0913 def _run_node_sequential( node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager, session_id: str | None = None, ) -> Node: @@ -533,7 +533,7 @@ def _run_node_sequential( def _run_node_async( node: Node, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager, session_id: str | None = None, ) -> Node: diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index 48dac3cd54..c888e737cf 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -14,7 +14,7 @@ if TYPE_CHECKING: from pluggy import PluginManager - from kedro.io import DataCatalog + from kedro.io import CatalogProtocol from kedro.pipeline import Pipeline @@ -34,7 +34,7 @@ def __init__( Args: is_async: If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False. 
- extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog + extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog during the run. This is used to set the default datasets to MemoryDataset for `SequentialRunner`. @@ -48,7 +48,7 @@ def __init__( def _run( self, pipeline: Pipeline, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager, session_id: str | None = None, ) -> None: @@ -56,7 +56,7 @@ def _run( Args: pipeline: The ``Pipeline`` to run. - catalog: The ``DataCatalog`` from which to fetch data. + catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. session_id: The id of the session. diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index b4751a602a..5ad13b9153 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -16,7 +16,7 @@ if TYPE_CHECKING: from pluggy import PluginManager - from kedro.io import DataCatalog + from kedro.io import CatalogProtocol from kedro.pipeline import Pipeline from kedro.pipeline.node import Node @@ -43,7 +43,7 @@ def __init__( is_async: If True, set to False, because `ThreadRunner` doesn't support loading and saving the node inputs and outputs asynchronously with threads. Defaults to False. - extra_dataset_patterns: Extra dataset factory patterns to be added to the DataCatalog + extra_dataset_patterns: Extra dataset factory patterns to be added to the catalog during the run. This is used to set the default datasets to MemoryDataset for `ThreadRunner`. @@ -87,7 +87,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: def _run( self, pipeline: Pipeline, - catalog: DataCatalog, + catalog: CatalogProtocol, hook_manager: PluginManager, session_id: str | None = None, ) -> None: @@ -95,7 +95,7 @@ def _run( Args: pipeline: The ``Pipeline`` to run. - catalog: The ``DataCatalog`` from which to fetch data. + catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. session_id: The id of the session. diff --git a/pyproject.toml b/pyproject.toml index 8b7b4cb09b..d9ebbfd70b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -134,7 +134,7 @@ omit = [ "kedro/runner/parallel_runner.py", "*/site-packages/*", ] -exclude_also = ["raise NotImplementedError", "if TYPE_CHECKING:"] +exclude_also = ["raise NotImplementedError", "if TYPE_CHECKING:", "class CatalogProtocol"] [tool.pytest.ini_options] addopts=""" diff --git a/tests/framework/context/test_context.py b/tests/framework/context/test_context.py index 61e4bbaa6f..ea62cb04c9 100644 --- a/tests/framework/context/test_context.py +++ b/tests/framework/context/test_context.py @@ -261,7 +261,7 @@ def test_wrong_catalog_type(self, mock_settings_file_bad_data_catalog_class): pattern = ( "Invalid value 'tests.framework.context.test_context.BadCatalog' received " "for setting 'DATA_CATALOG_CLASS'. " - "It must be a subclass of 'kedro.io.data_catalog.DataCatalog'." + "It must implement 'kedro.io.core.CatalogProtocol'." 
) mock_settings = _ProjectSettings( settings_file=str(mock_settings_file_bad_data_catalog_class) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index dbc73a30f0..4f22bab296 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -130,7 +130,9 @@ def test_conflict_feed_catalog( def test_unsatisfied_inputs(self, is_async, unfinished_outputs_pipeline, catalog): """ds1, ds2 and ds3 were not specified.""" - with pytest.raises(ValueError, match=r"not found in the DataCatalog"): + with pytest.raises( + ValueError, match=rf"not found in the {catalog.__class__.__name__}" + ): SequentialRunner(is_async=is_async).run( unfinished_outputs_pipeline, catalog )
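---

Usage notes (not part of the patch):

Because ``CatalogProtocol`` is decorated with ``@runtime_checkable``, any class that exposes the same member surface passes an ``isinstance()`` check, regardless of its base classes. Below is a minimal sketch of a conforming catalog; the class name ``MyCatalog`` and its internals are hypothetical illustrations of the protocol surface defined in kedro/io/core.py above, not a drop-in replacement for ``DataCatalog``, and it assumes ``CatalogConfigResolver`` can be constructed with no arguments.

from __future__ import annotations

from typing import Any

from kedro.io import AbstractDataset, CatalogProtocol, MemoryDataset
from kedro.io.catalog_config_resolver import CatalogConfigResolver, Patterns


class MyCatalog:
    def __init__(self, datasets: dict[str, AbstractDataset] | None = None) -> None:
        # ``_datasets`` is a required (non-method) protocol member.
        self._datasets: dict[str, AbstractDataset] = datasets or {}
        self._config_resolver = CatalogConfigResolver()

    def __contains__(self, ds_name: str) -> bool:
        return ds_name in self._datasets

    @property
    def config_resolver(self) -> CatalogConfigResolver:
        return self._config_resolver

    @classmethod
    def from_config(cls, catalog: dict[str, dict[str, Any]] | None) -> MyCatalog:
        # Build dataset instances from their YAML-style configuration.
        return cls(
            {
                name: AbstractDataset.from_config(name, config)
                for name, config in (catalog or {}).items()
            }
        )

    def _get_dataset(
        self, dataset_name: str, version: Any = None, suggest: bool = True
    ) -> AbstractDataset:
        return self._datasets[dataset_name]

    def list(self, regex_search: str | None = None) -> list[str]:
        return list(self._datasets)

    def save(self, name: str, data: Any) -> None:
        self._get_dataset(name).save(data)

    def load(self, name: str, version: str | None = None) -> Any:
        return self._get_dataset(name).load()

    def add(self, ds_name: str, dataset: Any, replace: bool = False) -> None:
        self._datasets[ds_name] = dataset

    def add_feed_dict(self, datasets: dict[str, Any], replace: bool = False) -> None:
        for name, data in datasets.items():
            self.add(name, MemoryDataset(data=data), replace)

    def exists(self, name: str) -> bool:
        return self._get_dataset(name).exists()

    def release(self, name: str) -> None:
        self._get_dataset(name).release()

    def confirm(self, name: str) -> None:
        # Not every dataset type supports confirmation.
        dataset = self._get_dataset(name)
        if hasattr(dataset, "confirm"):
            dataset.confirm()

    def shallow_copy(self, extra_dataset_patterns: Patterns | None = None) -> MyCatalog:
        return MyCatalog(dict(self._datasets))


# Structural check passes: every protocol member is present on the instance.
assert isinstance(MyCatalog(), CatalogProtocol)

Note that ``@runtime_checkable`` verifies only the presence of members, not their signatures, so this check is a structural smoke test rather than full type checking.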
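With ``DATA_CATALOG_CLASS`` now guarded by ``_ImplementsCatalogProtocolValidator`` instead of a subclass check, a project can register any conforming catalog in its ``settings.py``. A sketch, assuming the hypothetical ``MyCatalog`` above lives in a module ``my_project.catalogs``; note that the validator calls ``setting_value()``, so the registered class must be constructible with no arguments:

# settings.py — registering a custom catalog implementation
# (``my_project.catalogs`` is a hypothetical module path).
from my_project.catalogs import MyCatalog

DATA_CATALOG_CLASS = MyCatalog

A class missing any protocol member now fails settings validation with the message asserted in the updated test_wrong_catalog_type, e.g. "Invalid value 'tests.framework.context.test_context.BadCatalog' received for setting 'DATA_CATALOG_CLASS'. It must implement 'kedro.io.core.CatalogProtocol'."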
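The hook specs now annotate ``catalog`` as ``CatalogProtocol``, so plugin hooks should rely only on the protocol surface rather than on ``DataCatalog`` internals. A minimal sketch of such a hook (the class name and log message are hypothetical):

from kedro.framework.hooks import hook_impl
from kedro.io import CatalogProtocol


class CatalogLoggingHooks:
    @hook_impl
    def after_catalog_created(self, catalog: CatalogProtocol) -> None:
        # ``list()`` is part of the protocol, so this hook works with any
        # conforming catalog implementation, not only ``DataCatalog``.
        print(f"Registered datasets: {catalog.list()}")

pluggy allows a hook implementation to declare a subset of the spec's parameters, so accepting only ``catalog`` here is valid.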