Skip to content

Commit

Permalink
feat(ingest/teradata): Add option to not use file backed dict for vie…
Browse files Browse the repository at this point in the history
…w definitions (#9024)
  • Loading branch information
asikowitz authored Oct 16, 2023
1 parent 9ccd1d4 commit 6366b63
Showing 1 changed file with 20 additions and 27 deletions.
47 changes: 20 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Set, Union
from typing import Iterable, MutableMapping, Optional, Union

# This import verifies that the dependencies are available.
import teradatasqlalchemy # noqa: F401
Expand All @@ -12,7 +12,6 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand All @@ -34,11 +33,7 @@
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata._schema_classes import (
MetadataChangeEventClass,
SchemaMetadataClass,
ViewPropertiesClass,
)
from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
Expand Down Expand Up @@ -112,6 +107,11 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
description="Generate usage statistic.",
)

use_file_backed_cache: bool = Field(
default=True,
description="Whether to use a file backed cache for the view definitions.",
)


@platform_name("Teradata")
@config_class(TeradataConfig)
Expand Down Expand Up @@ -142,7 +142,8 @@ class TeradataSource(TwoTierSQLAlchemySource):
and "timestamp" >= TIMESTAMP '{start_time}'
and "timestamp" < TIMESTAMP '{end_time}'
"""
urns: Optional[Set[str]]

_view_definition_cache: MutableMapping[str, str]

def __init__(self, config: TeradataConfig, ctx: PipelineContext):
super().__init__(config, ctx, "teradata")
Expand All @@ -166,7 +167,10 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
env=self.config.env,
)

self._view_definition_cache: FileBackedDict[str] = FileBackedDict()
if self.config.use_file_backed_cache:
self._view_definition_cache = FileBackedDict[str]()
else:
self._view_definition_cache = {}

@classmethod
def create(cls, config_dict, ctx):
Expand Down Expand Up @@ -249,24 +253,13 @@ def get_metadata_engine(self) -> Engine:
def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
# Add all schemas to the schema resolver
for wu in super().get_workunits_internal():
if isinstance(wu.metadata, MetadataChangeEventClass):
if wu.metadata.proposedSnapshot:
for aspect in wu.metadata.proposedSnapshot.aspects:
if isinstance(aspect, SchemaMetadataClass):
self.schema_resolver.add_schema_metadata(
wu.metadata.proposedSnapshot.urn,
aspect,
)
break
if isinstance(wu.metadata, MetadataChangeProposalWrapper):
if (
wu.metadata.entityUrn
and isinstance(wu.metadata.aspect, ViewPropertiesClass)
and wu.metadata.aspect.viewLogic
):
self._view_definition_cache[
wu.metadata.entityUrn
] = wu.metadata.aspect.viewLogic
urn = wu.get_urn()
schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
if schema_metadata:
self.schema_resolver.add_schema_metadata(urn, schema_metadata)
view_properties = wu.get_aspect_of_type(ViewPropertiesClass)
if view_properties and self.config.include_view_lineage:
self._view_definition_cache[urn] = view_properties.viewLogic
yield wu

if self.config.include_view_lineage:
Expand Down

0 comments on commit 6366b63

Please sign in to comment.