diff --git a/src/cdf/cli.py b/src/cdf/cli.py
index e0ce9c2..411ba10 100644
--- a/src/cdf/cli.py
+++ b/src/cdf/cli.py
@@ -7,13 +7,16 @@
 import sys
 import typing as t
 from contextvars import Token
+from enum import Enum
 from pathlib import Path
 
+import pydantic
 import rich
 import typer
 
 import cdf.core.constants as c
 import cdf.core.context as context
+from cdf.core.feature_flag import FlagProvider
 from cdf.core.project import Workspace, load_project
 from cdf.core.runtime import (
     execute_notebook_specification,
@@ -21,6 +24,13 @@
     execute_publisher_specification,
     execute_script_specification,
 )
+from cdf.core.specification import (
+    NotebookSpecification,
+    PipelineSpecification,
+    PublisherSpecification,
+    ScriptSpecification,
+    SinkSpecification,
+)
 
 app = typer.Typer(
     rich_markup_mode="rich",
@@ -368,6 +378,54 @@ def jupyter_lab(
         context.active_project.reset(token)
 
 
+class _SpecType(str, Enum):
+    """An enum of specs which can be described via the `spec` command."""
+
+    pipeline = "pipeline"
+    publisher = "publisher"
+    script = "script"
+    notebook = "notebook"
+    sink = "sink"
+    feature_flags = "feature_flags"
+
+
+@app.command(rich_help_panel="Develop")
+def spec(name: _SpecType) -> None:
+    """:mag: Print the fields for a given spec type.
+
+    \f
+    Args:
+        name: The name of the spec to print.
+    """
+
+    def _print_spec(spec: t.Type[pydantic.BaseModel]) -> None:
+        console.print(f"[bold]{spec.__name__}:[/bold]")
+        for name, info in spec.model_fields.items():
+            typ = getattr(info.annotation, "__name__", info.annotation)
+            desc = info.description or "No description provided."
+            d = f"- [blue]{name}[/blue] ({typ!s}): {desc}"
+            if "Undefined" not in str(info.default):
+                d += f" Defaults to `{info.default}`."
+            console.print(d)
+        console.print()
+
+    if name == _SpecType.pipeline:
+        _print_spec(PipelineSpecification)
+    elif name == _SpecType.publisher:
+        _print_spec(PublisherSpecification)
+    elif name == _SpecType.script:
+        _print_spec(ScriptSpecification)
+    elif name == _SpecType.notebook:
+        _print_spec(NotebookSpecification)
+    elif name == _SpecType.sink:
+        _print_spec(SinkSpecification)
+    elif name == _SpecType.feature_flags:
+        for spec in t.get_args(FlagProvider):
+            _print_spec(spec)
+    else:
+        raise ValueError(f"Invalid spec type {name}.")
+
+
 def _unwrap_workspace(workspace_name: str, path: Path) -> t.Tuple["Workspace", "Token"]:
     """Unwrap the workspace from the context."""
     workspace = (
diff --git a/src/cdf/core/feature_flag/file.py b/src/cdf/core/feature_flag/file.py
index c862f74..dc24d4d 100644
--- a/src/cdf/core/feature_flag/file.py
+++ b/src/cdf/core/feature_flag/file.py
@@ -19,10 +19,14 @@ class FileFlagProvider(BaseFlagProvider, extra="allow", arbitrary_types_allowed=
         description="The path to the file where the feature flags are stored in the configured filesystem."
     )
     storage: fsspec.AbstractFileSystem = pydantic.Field(
-        default=fsspec.filesystem("file")
+        default=fsspec.filesystem("file"),
+        description="This leverages the configured filesystem and can be used to store the flags in S3, GCS, etc. It should not be set directly.",
+        exclude=True,
     )
 
-    provider: t.Literal["file"] = "file"
+    provider: t.Literal["file"] = pydantic.Field(
+        "file", frozen=True, description="The feature flag provider."
+    )
 
     _lock: Lock = pydantic.PrivateAttr(default_factory=Lock)
 
diff --git a/src/cdf/core/feature_flag/harness.py b/src/cdf/core/feature_flag/harness.py
index f05b5b4..af551f7 100644
--- a/src/cdf/core/feature_flag/harness.py
+++ b/src/cdf/core/feature_flag/harness.py
@@ -63,7 +63,9 @@ class HarnessFlagProvider(BaseFlagProvider, extra="allow"):
         description="The harness project ID.",
     )
 
-    provider: t.Literal["harness"] = "harness"
+    provider: t.Literal["harness"] = pydantic.Field(
+        "harness", frozen=True, description="The feature flag provider."
+    )
 
     _client: t.Optional[CfClient] = None
 
diff --git a/src/cdf/core/feature_flag/launchdarkly.py b/src/cdf/core/feature_flag/launchdarkly.py
index ee4b93c..f4f51a9 100644
--- a/src/cdf/core/feature_flag/launchdarkly.py
+++ b/src/cdf/core/feature_flag/launchdarkly.py
@@ -2,6 +2,7 @@
 
 import typing as t
 
+import pydantic
 from dlt.sources import DltSource
 
 from cdf.core.feature_flag.base import BaseFlagProvider
@@ -10,9 +11,13 @@
 class LaunchDarklyFlagProvider(BaseFlagProvider, extra="allow"):
     """LaunchDarkly feature flag provider."""
 
-    sdk_key: str
+    sdk_key: str = pydantic.Field(
+        description="The LaunchDarkly SDK key used to connect to the LaunchDarkly service."
+    )
 
-    provider: t.Literal["launchdarkly"] = "launchdarkly"
+    provider: t.Literal["launchdarkly"] = pydantic.Field(
+        "launchdarkly", frozen=True, description="The feature flag provider."
+    )
 
     def apply_source(self, source: DltSource) -> DltSource:
         raise NotImplementedError("LaunchDarkly feature flags are not yet supported")
diff --git a/src/cdf/core/feature_flag/noop.py b/src/cdf/core/feature_flag/noop.py
index 6294f50..be578a8 100644
--- a/src/cdf/core/feature_flag/noop.py
+++ b/src/cdf/core/feature_flag/noop.py
@@ -2,6 +2,7 @@
 
 import typing as t
 
+import pydantic
 from dlt.sources import DltSource
 
 from cdf.core.feature_flag.base import BaseFlagProvider
@@ -10,7 +11,9 @@
 class NoopFlagProvider(BaseFlagProvider, extra="allow"):
     """LaunchDarkly feature flag provider."""
 
-    provider: t.Literal["noop"] = "noop"
+    provider: t.Literal["noop"] = pydantic.Field(
+        "noop", frozen=True, description="The feature flag provider."
+    )
 
     def apply_source(self, source: DltSource) -> DltSource:
         return source
diff --git a/src/cdf/core/specification/base.py b/src/cdf/core/specification/base.py
index 1253b48..2340f45 100644
--- a/src/cdf/core/specification/base.py
+++ b/src/cdf/core/specification/base.py
@@ -52,20 +52,45 @@ class BaseComponent(pydantic.BaseModel):
             default_factory=_gen_anon_name,
             pattern=r"^[a-zA-Z0-9_-]+$",
             max_length=64,
+            description="The name of the component. Must be unique within the workspace.",
         ),
     ]
     """The name of the component."""
-    version: t.Annotated[int, pydantic.Field(1, ge=1, le=999, frozen=True)] = 1
+    version: t.Annotated[
+        int,
+        pydantic.Field(
+            1,
+            ge=1,
+            le=999,
+            frozen=True,
+            description="The version of the component. Used internally to version datasets and serves as an external signal to dependees that something has changed in a breaking way. All components are versioned.",
+        ),
+    ] = 1
     """The version of the component."""
-    owner: str | None = None
+    owner: str | None = pydantic.Field(
+        None,
+        description="The owners of the component.",
+    )
     """The owners of the component."""
-    description: str = _NO_DESCRIPTION
+    description: str = pydantic.Field(
+        _NO_DESCRIPTION,
+        description="The description of the component. This should help users understand the purpose of the component. For scripts and entrypoints, we will attempt to extract the relevant docstring.",
+    )
     """The description of the component."""
-    tags: t.List[str] = []
+    tags: t.List[str] = pydantic.Field(
+        default_factory=list,
+        description="Tags for the component. Used for queries and integrations.",
+    )
     """Tags for this component used for component queries and integrations."""
-    enabled: bool = True
+    enabled: bool = pydantic.Field(
+        True,
+        description="Whether this component is enabled. Respected in cdf operations.",
+    )
     """Whether this component is enabled."""
-    meta: t.Dict[str, t.Any] = {}
+    meta: t.Dict[str, t.Any] = pydantic.Field(
+        default_factory=dict,
+        description="Arbitrary user-defined metadata for this component. Used for user-specific integrations and automation.",
+    )
     """Arbitrary user-defined metadata for this component."""
 
     @property
@@ -124,9 +149,18 @@ def _setup_base(self):
 class WorkspaceComponent(BaseComponent):
     """A component within a workspace."""
 
-    workspace_path: Path = Path(".")
+    workspace_path: Path = pydantic.Field(
+        Path("."),
+        exclude=True,
+        description="The fully qualified path to the workspace containing the component. This changes from system to system and should not be set by the user or serialized.",
+    )
     """The path to the workspace containing the component."""
-    component_path: Path = pydantic.Field(Path("."), alias="path")
+    component_path: Path = pydantic.Field(
+        Path("."),
+        alias="path",
+        frozen=True,
+        description="The path to the component within the workspace folder.",
+    )
     """The path to the component within the workspace folder."""
 
     _folder: str = "."
@@ -159,7 +193,12 @@ def _component_path_validator(cls, component_path: t.Any) -> Path:
 class Schedulable(pydantic.BaseModel):
     """A mixin for schedulable components."""
 
-    cron_string: str = pydantic.Field("@daily", serialization_alias="cron")
+    cron_string: str = pydantic.Field(
+        "@daily",
+        serialization_alias="cron",
+        frozen=True,
+        description="A cron expression for scheduling the primary action associated with the component. This is intended to be leveraged by libraries like Airflow.",
+    )
     """A cron expression for scheduling the primary action associated with the component."""
 
     @property
@@ -202,7 +241,11 @@ def _cron_validator(cls, cron_string: t.Any) -> str:
 class InstallableRequirements(pydantic.BaseModel):
     """A mixin for components that support installation of requirements."""
 
-    requirements: t.List[t.Annotated[str, pydantic.Field(..., frozen=True)]] = []
+    requirements: t.List[str] = pydantic.Field(
+        [],
+        frozen=True,
+        description="The full set of requirements needed to run this component independently. Used for packaging.",
+    )
     """The requirements for the component."""
 
     @pydantic.field_validator("requirements", mode="before")
@@ -314,7 +357,12 @@ class PythonEntrypoint(BaseComponent, InstallableRequirements):
 
     entrypoint: t.Annotated[
         str,
-        pydantic.Field(..., frozen=True, pattern=r"^[a-zA-Z0-9_\.]+:[a-zA-Z0-9_\.]+$"),
+        pydantic.Field(
+            ...,
+            frozen=True,
+            pattern=r"^[a-zA-Z0-9_\.]+:[a-zA-Z0-9_\.]+$",
+            description="The entrypoint function in the format module:func.",
+        ),
     ]
     """The entrypoint of the component."""
 
diff --git a/src/cdf/core/specification/pipeline.py b/src/cdf/core/specification/pipeline.py
index a6a0bf6..3b49216 100644
--- a/src/cdf/core/specification/pipeline.py
+++ b/src/cdf/core/specification/pipeline.py
@@ -29,7 +29,10 @@ def __call__(
 class PipelineMetricSpecification(PythonEntrypoint):
     """Defines metrics which can be captured during pipeline execution"""
 
-    options: t.Dict[str, t.Any] = {}
+    options: t.Dict[str, t.Any] = pydantic.Field(
+        default_factory=dict,
+        description="Kwargs to pass to the metric function if it is a callable that returns a metric interface. If the metric is already a metric interface, this should be left empty.",
+    )
     """
     Kwargs to pass to the metric function.
 
@@ -89,7 +92,10 @@ def __call__(self, item: TDataItem) -> bool: ...
 class PipelineFilterSpecification(PythonEntrypoint):
     """Defines filters which can be applied to pipeline execution"""
 
-    options: t.Dict[str, t.Any] = {}
+    options: t.Dict[str, t.Any] = pydantic.Field(
+        default_factory=dict,
+        description="Kwargs to pass to the filter function if it is a callable that returns a filter interface. If the filter is already a filter interface, this should be left empty.",
+    )
     """
     Kwargs to pass to the filter function.
 
@@ -121,7 +127,10 @@ class SchemaOptions(pydantic.BaseModel):
 class PipelineSpecification(PythonScript, Schedulable):
     """A pipeline specification."""
 
-    metrics: InlineMetricSpecifications = {}
+    metrics: InlineMetricSpecifications = pydantic.Field(
+        default_factory=dict,
+        description="A dict of resource name glob patterns to metric definitions.",
+    )
     """
     A dict of resource name glob patterns to metric definitions.
 
@@ -129,7 +138,10 @@ class PipelineSpecification(PythonScript, Schedulable):
     accumulated into the metric_state dict. The metric definitions are callables that take
     the current item and the current metric value and return the new metric value.
     """
-    filters: InlineFilterSpecifications = {}
+    filters: InlineFilterSpecifications = pydantic.Field(
+        default_factory=dict,
+        description="A dict of resource name glob patterns to filter definitions.",
+    )
     """
     A dict of resource name glob patterns to filter definitions.
 
@@ -138,7 +150,10 @@ class PipelineSpecification(PythonScript, Schedulable):
     whether the item should be filtered out.
     """
-    dataset_name: str = ""
+    dataset_name: str = pydantic.Field(
+        "{name}_v{version}",
+        description="The name of the dataset associated with the pipeline. Defaults to the versioned name. This string is formatted with the pipeline name, version, meta, and tags.",
+    )
     """The name of the dataset associated with the pipeline."""
 
     _metric_state: t.Dict[str, t.Dict[str, Metric]] = {}