diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index 6080cf7b371e3..e628e4dbd3446 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -1,7 +1,7 @@ import logging from dataclasses import dataclass from datetime import datetime -from typing import Iterable, Optional, Set, Union +from typing import Iterable, MutableMapping, Optional, Union # This import verifies that the dependencies are available. import teradatasqlalchemy # noqa: F401 @@ -12,7 +12,6 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.time_window_config import BaseTimeWindowConfig -from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.sql_parsing_builder import SqlParsingBuilder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( @@ -34,11 +33,7 @@ from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.ingestion.source_report.time_window import BaseTimeWindowReport -from datahub.metadata._schema_classes import ( - MetadataChangeEventClass, - SchemaMetadataClass, - ViewPropertiesClass, -) +from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BytesTypeClass, TimeTypeClass, @@ -112,6 +107,11 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig): description="Generate usage statistic.", ) + use_file_backed_cache: bool = Field( + default=True, + description="Whether to use a file backed cache for the view definitions.", + ) + @platform_name("Teradata") @config_class(TeradataConfig) @@ -142,7 +142,8 @@ class TeradataSource(TwoTierSQLAlchemySource): and "timestamp" >= TIMESTAMP '{start_time}' and "timestamp" < TIMESTAMP '{end_time}' """ - urns: Optional[Set[str]] + + _view_definition_cache: MutableMapping[str, str] def __init__(self, config: TeradataConfig, ctx: PipelineContext): super().__init__(config, ctx, "teradata") @@ -166,7 +167,10 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): env=self.config.env, ) - self._view_definition_cache: FileBackedDict[str] = FileBackedDict() + if self.config.use_file_backed_cache: + self._view_definition_cache = FileBackedDict[str]() + else: + self._view_definition_cache = {} @classmethod def create(cls, config_dict, ctx): @@ -249,24 +253,13 @@ def get_metadata_engine(self) -> Engine: def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: # Add all schemas to the schema resolver for wu in super().get_workunits_internal(): - if isinstance(wu.metadata, MetadataChangeEventClass): - if wu.metadata.proposedSnapshot: - for aspect in wu.metadata.proposedSnapshot.aspects: - if isinstance(aspect, SchemaMetadataClass): - self.schema_resolver.add_schema_metadata( - wu.metadata.proposedSnapshot.urn, - aspect, - ) - break - if isinstance(wu.metadata, MetadataChangeProposalWrapper): - if ( - wu.metadata.entityUrn - and isinstance(wu.metadata.aspect, ViewPropertiesClass) - and wu.metadata.aspect.viewLogic - ): - self._view_definition_cache[ - wu.metadata.entityUrn - ] = wu.metadata.aspect.viewLogic + urn = wu.get_urn() + schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass) + if schema_metadata: + self.schema_resolver.add_schema_metadata(urn, schema_metadata) + view_properties = wu.get_aspect_of_type(ViewPropertiesClass) + if view_properties and self.config.include_view_lineage: + self._view_definition_cache[urn] = view_properties.viewLogic yield wu if self.config.include_view_lineage: