Skip to content

Commit

Permalink
feat: add edit schema command
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed May 15, 2024
1 parent f7e3c56 commit ba7b6dd
Showing 1 changed file with 62 additions and 3 deletions.
65 changes: 62 additions & 3 deletions src/cdf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import subprocess
import sys
import tempfile
import typing as t
from contextvars import Token
from enum import Enum
Expand Down Expand Up @@ -475,9 +476,7 @@ def dump_schema(
_ExportFormat, typer.Option(help="The format to dump the schema in.")
] = _ExportFormat.json,
) -> None:
""":wrench: Prints the first N rows of a [b green]Resource[/b green] within a [b blue]pipeline[/b blue]. Defaults to [cyan]5[/cyan].
This is useful for quickly inspecting data :detective: and verifying that it is coming over the wire correctly.
""":mag: Dump the schema of a pipeline:sink combination.
\f
Args:
Expand Down Expand Up @@ -514,6 +513,66 @@ def dump_schema(
context.active_project.reset(token)


@app.command(rich_help_panel="Develop")
def edit_schema(
ctx: typer.Context,
pipeline_to_sink: t.Annotated[
str,
typer.Argument(
help="The pipeline:sink combination from which to fetch the schema."
),
],
) -> None:
""":mag: Edit the schema of a pipeline:sink combination using the system editor.
\f
Args:
ctx: The CLI context.
pipeline_to_sink: The pipeline:sink combination from which to fetch the schema.
Raises:
typer.BadParameter: If the pipeline or sink are not found.
"""
workspace, token = _unwrap_workspace(*ctx.obj)
try:
source, destination = pipeline_to_sink.split(":", 1)
sink, _ = (
workspace.get_sink(destination)
.map(lambda s: s.get_ingest_config())
.unwrap_or((destination, None))
)
spec = workspace.get_pipeline(source).unwrap()
logger.info(f"Clearing local schema and state for {source}.")
pipe = spec.create_pipeline(dlt.Pipeline, destination=sink, staging=None)
pipe.drop()
logger.info(f"Syncing schema for {source}:{destination}.")
rv = execute_pipeline_specification(
spec, sink, dry_run=True, quiet=True
).unwrap()
schema = rv.pipeline.default_schema.clone()
with tempfile.TemporaryDirectory() as tmpdir:
fname = f"{schema.name}.schema.yaml"
with open(os.path.join(tmpdir, fname), "w") as f:
f.write(schema.to_pretty_yaml())
logger.info(f"Editing schema {schema.name}.")
subprocess.run([os.environ.get("EDITOR", "vi"), f.name], check=True)
pipe_mut = spec.create_pipeline(
dlt.Pipeline, import_schema_path=tmpdir, destination=sink, staging=None
)
schema_mut = pipe_mut.default_schema
if schema_mut.version > schema.version:
with pipe_mut.destination_client() as client:
logger.info(
f"Updating schema {schema.name} to version {schema_mut.version} in {destination}."
)
client.update_stored_schema()
logger.info("Schema updated.")
else:
logger.info("Schema not updated.")
finally:
context.active_project.reset(token)


def _unwrap_workspace(workspace_name: str, path: Path) -> t.Tuple["Workspace", "Token"]:
"""Unwrap the workspace from the context."""
workspace = (
Expand Down

0 comments on commit ba7b6dd

Please sign in to comment.