Skip to content

Commit

Permalink
docs(ingest): inherit capabilities from superclasses (#9174)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Nov 6, 2023
1 parent 4a4c290 commit 86d2b08
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
4 changes: 4 additions & 0 deletions metadata-ingestion-modules/airflow-plugin/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def get_long_description():
f"acryl-datahub[testing-utils]{_self_pin}",
# Extra requirements for loading our test dags.
"apache-airflow[snowflake]>=2.0.2",
# Connexion's new version breaks Airflow:
# See https://github.com/apache/airflow/issues/35234.
# TODO: We should transition to using Airflow's constraints file.
"connexion<3",
# https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350
# Eventually we want to set this to "snowflake-sqlalchemy>=1.4.3".
# However, that doesn't work with older versions of Airflow. Instead
Expand Down
12 changes: 11 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,20 @@ def capability(
"""

def wrapper(cls: Type) -> Type:
if not hasattr(cls, "__capabilities"):
if not hasattr(cls, "__capabilities") or any(
# It's from this class and not a superclass.
cls.__capabilities is getattr(base, "__capabilities", None)
for base in cls.__bases__
):
cls.__capabilities = {}
cls.get_capabilities = lambda: cls.__capabilities.values()

# If the superclasses have capability annotations, copy those over.
for base in cls.__bases__:
base_caps = getattr(base, "__capabilities", None)
if base_caps:
cls.__capabilities.update(base_caps)

cls.__capabilities[capability_name] = CapabilitySetting(
capability=capability_name, description=description, supported=supported
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import capability
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
IngestionCheckpointingProviderBase,
JobId,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.source.state.checkpoint import Checkpoint, StateType
from datahub.ingestion.source.state.use_case_handler import (
StatefulIngestionUsecaseHandlerBase,
Expand Down Expand Up @@ -177,6 +178,11 @@ class StatefulIngestionReport(SourceReport):
pass


@capability(
SourceCapability.DELETION_DETECTION,
"Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
supported=True,
)
class StatefulIngestionSourceBase(Source):
"""
Defines the base class for all stateful sources.
Expand Down

0 comments on commit 86d2b08

Please sign in to comment.