diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index a5af881022d8c..e88fc870cb333 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -101,6 +101,10 @@ def get_long_description(): f"acryl-datahub[testing-utils]{_self_pin}", # Extra requirements for loading our test dags. "apache-airflow[snowflake]>=2.0.2", + # Connexion's new version breaks Airflow: + # See https://github.com/apache/airflow/issues/35234. + # TODO: We should transition to using Airflow's constraints file. + "connexion<3", # https://github.com/snowflakedb/snowflake-sqlalchemy/issues/350 # Eventually we want to set this to "snowflake-sqlalchemy>=1.4.3". # However, that doesn't work with older versions of Airflow. Instead diff --git a/metadata-ingestion/src/datahub/ingestion/api/decorators.py b/metadata-ingestion/src/datahub/ingestion/api/decorators.py index 5e4427047104f..b390ffb9dd036 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/decorators.py +++ b/metadata-ingestion/src/datahub/ingestion/api/decorators.py @@ -93,10 +93,20 @@ def capability( """ def wrapper(cls: Type) -> Type: - if not hasattr(cls, "__capabilities"): + if not hasattr(cls, "__capabilities") or any( + # It's from this class and not a superclass. + cls.__capabilities is getattr(base, "__capabilities", None) + for base in cls.__bases__ + ): cls.__capabilities = {} cls.get_capabilities = lambda: cls.__capabilities.values() + # If the superclasses have capability annotations, copy those over. + for base in cls.__bases__: + base_caps = getattr(base, "__capabilities", None) + if base_caps: + cls.__capabilities.update(base_caps) + cls.__capabilities[capability_name] = CapabilitySetting( capability=capability_name, description=description, supported=supported ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index 7fb2cf9813cab..d11b1f9ad6a53 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -15,11 +15,12 @@ from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import capability from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import ( IngestionCheckpointingProviderBase, JobId, ) -from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.source import Source, SourceCapability, SourceReport from datahub.ingestion.source.state.checkpoint import Checkpoint, StateType from datahub.ingestion.source.state.use_case_handler import ( StatefulIngestionUsecaseHandlerBase, @@ -177,6 +178,11 @@ class StatefulIngestionReport(SourceReport): pass +@capability( + SourceCapability.DELETION_DETECTION, + "Optionally enabled via `stateful_ingestion.remove_stale_metadata`", + supported=True, +) class StatefulIngestionSourceBase(Source): """ Defines the base class for all stateful sources.