Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/teradata): Add option to not use file backed dict for view definitions #9024

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Set, Union
from typing import Iterable, MutableMapping, Optional, Union

# This import verifies that the dependencies are available.
import teradatasqlalchemy # noqa: F401
Expand All @@ -12,7 +12,6 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand All @@ -34,11 +33,7 @@
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata._schema_classes import (
MetadataChangeEventClass,
SchemaMetadataClass,
ViewPropertiesClass,
)
from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
Expand Down Expand Up @@ -112,6 +107,11 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
description="Generate usage statistic.",
)

use_file_backed_cache: bool = Field(
default=True,
description="Whether to use a file backed cache for the view definitions.",
)


@platform_name("Teradata")
@config_class(TeradataConfig)
Expand Down Expand Up @@ -142,7 +142,8 @@ class TeradataSource(TwoTierSQLAlchemySource):
and "timestamp" >= TIMESTAMP '{start_time}'
and "timestamp" < TIMESTAMP '{end_time}'
"""
urns: Optional[Set[str]]

_view_definition_cache: MutableMapping[str, str]

def __init__(self, config: TeradataConfig, ctx: PipelineContext):
super().__init__(config, ctx, "teradata")
Expand All @@ -166,7 +167,10 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
env=self.config.env,
)

self._view_definition_cache: FileBackedDict[str] = FileBackedDict()
if self.config.use_file_backed_cache:
self._view_definition_cache = FileBackedDict[str]()
else:
self._view_definition_cache = {}

@classmethod
def create(cls, config_dict, ctx):
Expand Down Expand Up @@ -249,24 +253,13 @@ def get_metadata_engine(self) -> Engine:
def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
# Add all schemas to the schema resolver
for wu in super().get_workunits_internal():
if isinstance(wu.metadata, MetadataChangeEventClass):
if wu.metadata.proposedSnapshot:
for aspect in wu.metadata.proposedSnapshot.aspects:
if isinstance(aspect, SchemaMetadataClass):
self.schema_resolver.add_schema_metadata(
wu.metadata.proposedSnapshot.urn,
aspect,
)
break
if isinstance(wu.metadata, MetadataChangeProposalWrapper):
if (
wu.metadata.entityUrn
and isinstance(wu.metadata.aspect, ViewPropertiesClass)
and wu.metadata.aspect.viewLogic
):
self._view_definition_cache[
wu.metadata.entityUrn
] = wu.metadata.aspect.viewLogic
urn = wu.get_urn()
schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
if schema_metadata:
self.schema_resolver.add_schema_metadata(urn, schema_metadata)
view_properties = wu.get_aspect_of_type(ViewPropertiesClass)
if view_properties and self.config.include_view_lineage:
self._view_definition_cache[urn] = view_properties.viewLogic
yield wu

if self.config.include_view_lineage:
Expand Down
Loading