Skip to content

Commit

Permalink
feat: add cmd to print the fields of a pydantic spec to simplify config
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Apr 8, 2024
1 parent 2c13bf3 commit 926df27
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 22 deletions.
58 changes: 58 additions & 0 deletions src/cdf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,30 @@
import sys
import typing as t
from contextvars import Token
from enum import Enum
from pathlib import Path

import pydantic
import rich
import typer

import cdf.core.constants as c
import cdf.core.context as context
from cdf.core.feature_flag import FlagProvider
from cdf.core.project import Workspace, load_project
from cdf.core.runtime import (
execute_notebook_specification,
execute_pipeline_specification,
execute_publisher_specification,
execute_script_specification,
)
from cdf.core.specification import (
NotebookSpecification,
PipelineSpecification,
PublisherSpecification,
ScriptSpecification,
SinkSpecification,
)

app = typer.Typer(
rich_markup_mode="rich",
Expand Down Expand Up @@ -368,6 +378,54 @@ def jupyter_lab(
context.active_project.reset(token)


class _SpecType(str, Enum):
"""An enum of specs which can be described via the `spec` command."""

pipeline = "pipeline"
publisher = "publisher"
script = "script"
notebook = "notebook"
sink = "sink"
feature_flags = "feature_flags"


@app.command(rich_help_panel="Develop")
def spec(name: _SpecType) -> None:
    """:mag: Print the fields for a given spec type.
    \f
    Args:
        name: The name of the spec to print.

    Raises:
        ValueError: If the spec type is not a known `_SpecType` member.
    """

    def _print_spec(spec_cls: t.Type[pydantic.BaseModel]) -> None:
        """Render one pydantic model's fields: name, type, description, default."""
        console.print(f"[bold]{spec_cls.__name__}:[/bold]")
        for field_name, info in spec_cls.model_fields.items():
            # Generic annotations (e.g. t.Optional[...]) have no __name__, so
            # fall back to the raw annotation object for display.
            typ = getattr(info.annotation, "__name__", info.annotation)
            desc = info.description or "No description provided."
            d = f"- [blue]{field_name}[/blue] ({typ!s}): {desc}"
            # PydanticUndefined stringifies to a name containing "Undefined";
            # only advertise defaults that were actually set on the field.
            if "Undefined" not in str(info.default):
                # Fixed: the suffix previously ended with a stray unmatched ")".
                d += f" Defaults to `{info.default}`."
            console.print(d)
        console.print()

    if name == _SpecType.pipeline:
        _print_spec(PipelineSpecification)
    elif name == _SpecType.publisher:
        _print_spec(PublisherSpecification)
    elif name == _SpecType.script:
        _print_spec(ScriptSpecification)
    elif name == _SpecType.notebook:
        _print_spec(NotebookSpecification)
    elif name == _SpecType.sink:
        _print_spec(SinkSpecification)
    elif name == _SpecType.feature_flags:
        # FlagProvider is a union type; describe each concrete provider model.
        for provider_spec in t.get_args(FlagProvider):
            _print_spec(provider_spec)
    else:
        raise ValueError(f"Invalid spec type {name}.")


def _unwrap_workspace(workspace_name: str, path: Path) -> t.Tuple["Workspace", "Token"]:
"""Unwrap the workspace from the context."""
workspace = (
Expand Down
8 changes: 6 additions & 2 deletions src/cdf/core/feature_flag/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ class FileFlagProvider(BaseFlagProvider, extra="allow", arbitrary_types_allowed=
description="The path to the file where the feature flags are stored in the configured filesystem."
)
storage: fsspec.AbstractFileSystem = pydantic.Field(
default=fsspec.filesystem("file")
default=fsspec.filesystem("file"),
description="This leverages the configured filesystem and can be used to store the flags in S3, GCS, etc. It should not be set directly.",
exclude=True,
)

provider: t.Literal["file"] = "file"
provider: t.Literal["file"] = pydantic.Field(
"file", frozen=True, description="The feature flag provider."
)

_lock: Lock = pydantic.PrivateAttr(default_factory=Lock)

Expand Down
4 changes: 3 additions & 1 deletion src/cdf/core/feature_flag/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class HarnessFlagProvider(BaseFlagProvider, extra="allow"):
description="The harness project ID.",
)

provider: t.Literal["harness"] = "harness"
provider: t.Literal["harness"] = pydantic.Field(
"harness", frozen=True, description="The feature flag provider."
)

_client: t.Optional[CfClient] = None

Expand Down
9 changes: 7 additions & 2 deletions src/cdf/core/feature_flag/launchdarkly.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import typing as t

import pydantic
from dlt.sources import DltSource

from cdf.core.feature_flag.base import BaseFlagProvider
Expand All @@ -10,9 +11,13 @@
class LaunchDarklyFlagProvider(BaseFlagProvider, extra="allow"):
"""LaunchDarkly feature flag provider."""

sdk_key: str
sdk_key: str = pydantic.Field(
description="The LaunchDarkly SDK key used to connect to the LaunchDarkly service."
)

provider: t.Literal["launchdarkly"] = "launchdarkly"
provider: t.Literal["launchdarkly"] = pydantic.Field(
"launchdarkly", frozen=True, description="The feature flag provider."
)

def apply_source(self, source: DltSource) -> DltSource:
raise NotImplementedError("LaunchDarkly feature flags are not yet supported")
Expand Down
5 changes: 4 additions & 1 deletion src/cdf/core/feature_flag/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import typing as t

import pydantic
from dlt.sources import DltSource

from cdf.core.feature_flag.base import BaseFlagProvider
Expand All @@ -10,7 +11,9 @@
class NoopFlagProvider(BaseFlagProvider, extra="allow"):
"""LaunchDarkly feature flag provider."""

provider: t.Literal["noop"] = "noop"
provider: t.Literal["noop"] = pydantic.Field(
"noop", frozen=True, description="The feature flag provider."
)

def apply_source(self, source: DltSource) -> DltSource:
return source
Expand Down
70 changes: 59 additions & 11 deletions src/cdf/core/specification/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,45 @@ class BaseComponent(pydantic.BaseModel):
default_factory=_gen_anon_name,
pattern=r"^[a-zA-Z0-9_-]+$",
max_length=64,
description="The name of the component. Must be unique within the workspace.",
),
]
"""The name of the component."""
version: t.Annotated[int, pydantic.Field(1, ge=1, le=999, frozen=True)] = 1
version: t.Annotated[
int,
pydantic.Field(
1,
ge=1,
le=999,
frozen=True,
description="The version of the component. Used internally to version datasets and serves as an external signal to dependees that something has changed in a breaking way. All components are versioned.",
),
] = 1
"""The version of the component."""
owner: str | None = None
owner: str | None = pydantic.Field(
None,
description="The owners of the component.",
)
"""The owners of the component."""
description: str = _NO_DESCRIPTION
description: str = pydantic.Field(
_NO_DESCRIPTION,
description="The description of the component. This should help users understand the purpose of the component. For scripts and entrypoints, we will attempt to extract the relevant docstring.",
)
"""The description of the component."""
tags: t.List[str] = []
tags: t.List[str] = pydantic.Field(
default_factory=list,
description="Tags for the component. Used for queries and integrations.",
)
"""Tags for this component used for component queries and integrations."""
enabled: bool = True
enabled: bool = pydantic.Field(
True,
description="Whether this component is enabled. Respected in cdf operations.",
)
"""Whether this component is enabled."""
meta: t.Dict[str, t.Any] = {}
meta: t.Dict[str, t.Any] = pydantic.Field(
default_factory=dict,
description="Arbitrary user-defined metadata for this component. Used for user-specific integrations and automation.",
)
"""Arbitrary user-defined metadata for this component."""

@property
Expand Down Expand Up @@ -124,9 +149,18 @@ def _setup_base(self):
class WorkspaceComponent(BaseComponent):
"""A component within a workspace."""

workspace_path: Path = Path(".")
workspace_path: Path = pydantic.Field(
Path("."),
exclude=True,
description="The fully qualified path to the workspace containing the component. This changes from system to system and should not be set by the user or serialized.",
)
"""The path to the workspace containing the component."""
component_path: Path = pydantic.Field(Path("."), alias="path")
component_path: Path = pydantic.Field(
Path("."),
alias="path",
frozen=True,
description="The path to the component within the workspace folder.",
)
"""The path to the component within the workspace folder."""

_folder: str = "."
Expand Down Expand Up @@ -159,7 +193,12 @@ def _component_path_validator(cls, component_path: t.Any) -> Path:
class Schedulable(pydantic.BaseModel):
"""A mixin for schedulable components."""

cron_string: str = pydantic.Field("@daily", serialization_alias="cron")
cron_string: str = pydantic.Field(
"@daily",
serialization_alias="cron",
frozen=True,
description="A cron expression for scheduling the primary action associated with the component. This is intended to be leveraged by libraries like Airflow.",
)
"""A cron expression for scheduling the primary action associated with the component."""

@property
Expand Down Expand Up @@ -202,7 +241,11 @@ def _cron_validator(cls, cron_string: t.Any) -> str:
class InstallableRequirements(pydantic.BaseModel):
"""A mixin for components that support installation of requirements."""

requirements: t.List[t.Annotated[str, pydantic.Field(..., frozen=True)]] = []
requirements: t.List[str] = pydantic.Field(
[],
frozen=True,
description="The full set of requirements needed to run this component independently. Used for packaging.",
)
"""The requirements for the component."""

@pydantic.field_validator("requirements", mode="before")
Expand Down Expand Up @@ -314,7 +357,12 @@ class PythonEntrypoint(BaseComponent, InstallableRequirements):

entrypoint: t.Annotated[
str,
pydantic.Field(..., frozen=True, pattern=r"^[a-zA-Z0-9_\.]+:[a-zA-Z0-9_\.]+$"),
pydantic.Field(
...,
frozen=True,
pattern=r"^[a-zA-Z0-9_\.]+:[a-zA-Z0-9_\.]+$",
description="The entrypoint function in the format module:func.",
),
]
"""The entrypoint of the component."""

Expand Down
25 changes: 20 additions & 5 deletions src/cdf/core/specification/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ def __call__(
class PipelineMetricSpecification(PythonEntrypoint):
"""Defines metrics which can be captured during pipeline execution"""

options: t.Dict[str, t.Any] = {}
options: t.Dict[str, t.Any] = pydantic.Field(
default_factory=dict,
description="Kwargs to pass to the metric function if it is a callable that returns a metric interface. If the metric is already a metric interface, this should be left empty.",
)
"""
Kwargs to pass to the metric function.
Expand Down Expand Up @@ -89,7 +92,10 @@ def __call__(self, item: TDataItem) -> bool: ...
class PipelineFilterSpecification(PythonEntrypoint):
"""Defines filters which can be applied to pipeline execution"""

options: t.Dict[str, t.Any] = {}
options: t.Dict[str, t.Any] = pydantic.Field(
default_factory=dict,
description="Kwargs to pass to the filter function if it is a callable that returns a filter interface. If the filter is already a filter interface, this should be left empty.",
)
"""
Kwargs to pass to the filter function.
Expand Down Expand Up @@ -121,15 +127,21 @@ class SchemaOptions(pydantic.BaseModel):
class PipelineSpecification(PythonScript, Schedulable):
"""A pipeline specification."""

metrics: InlineMetricSpecifications = {}
metrics: InlineMetricSpecifications = pydantic.Field(
default_factory=dict,
description="A dict of resource name glob patterns to metric definitions.",
)
"""
A dict of resource name glob patterns to metric definitions.
Metrics are captured on a per resource basis during pipeline execution and are
accumulated into the metric_state dict. The metric definitions are callables that
take the current item and the current metric value and return the new metric value.
"""
filters: InlineFilterSpecifications = {}
filters: InlineFilterSpecifications = pydantic.Field(
default_factory=dict,
description="A dict of resource name glob patterns to filter definitions.",
)
"""
A dict of resource name glob patterns to filter definitions.
Expand All @@ -138,7 +150,10 @@ class PipelineSpecification(PythonScript, Schedulable):
whether the item should be filtered out.
"""

dataset_name: str = ""
dataset_name: str = pydantic.Field(
"{name}_v{version}",
description="The name of the dataset associated with the pipeline. Defaults to the versioned name. This string is formatted with the pipeline name, version, meta, and tags.",
)
"""The name of the dataset associated with the pipeline."""

_metric_state: t.Dict[str, t.Dict[str, Metric]] = {}
Expand Down

0 comments on commit 926df27

Please sign in to comment.