Skip to content

Commit

Permalink
Adding lineage/usage/operation aspect generation
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es committed Oct 11, 2023
1 parent 246b28d commit ff4a65e
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 2 deletions.
10 changes: 10 additions & 0 deletions metadata-ingestion/docs/sources/teradata/teradata_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
GRANT SELECT ON DBC.IndicesV TO datahub;
GRANT SELECT ON dbc.TableTextV TO datahub;
GRANT SELECT ON dbc.TablesV TO datahub;
GRANT SELECT ON dbc.dbqlogtbl TO datahub; -- if lineage or usage extraction is enabled
```

If you want to run profiling, you need to grant select permission on all the tables you want to profile.

3. If linege or usage extraction is enabled, please, check if query logging is enabled and it is set to size which
will fit for your queries (the default query text size Teradata captures is max 200 chars)
An example how you can set it for all users:
```sql
REPLACE QUERY LOGGING LIMIT SQLTEXT=2000 ON ALL;
```
See more here about query logging:
[https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs]()
144 changes: 142 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import logging
from dataclasses import dataclass
from typing import Iterable, Union, Optional, Set

# This import verifies that the dependencies are available.
import teradatasqlalchemy # noqa: F401
import teradatasqlalchemy.types as custom_types
from pydantic.fields import Field
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand All @@ -15,15 +21,22 @@
platform_name,
support_status,
)
from datahub.ingestion.source.sql.sql_common import register_custom_type
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.sql.sql_common import register_custom_type, SqlWorkUnit
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
)
from datahub.utilities.sqlglot_lineage import sqlglot_lineage, SchemaResolver

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -48,15 +61,47 @@
register_custom_type(custom_types.XML, BytesTypeClass)


@dataclass
class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
num_queries_parsed: int = 0
num_table_parse_failures: int = 0


class BaseTeradataConfig(TwoTierSQLAlchemyConfig):
scheme = Field(default="teradatasql", description="database scheme")


class TeradataConfig(BaseTeradataConfig):
class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
database_pattern = Field(
default=AllowDenyPattern(deny=["dbc"]),
description="Regex patterns for databases to filter in ingestion.",
)
include_table_lineage = Field(
default=False,
description="Whether to include table lineage in the ingestion. "
"This requires to have the table lineage feature enabled.",
)

usage: BaseUsageConfig = Field(
description="The usage config to use when generating usage statistics",
default=BaseUsageConfig(),
)

use_schema_resolver: bool = Field(
default=True,
description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
hidden_from_docs=True,
)

default_db: Optional[str] = Field(
default=None,
description="The default database to use for unqualified table names",
)

include_usage_statistics: bool = Field(
default=False,
description="Generate usage statistic.",
)


@platform_name("Teradata")
Expand All @@ -67,6 +112,8 @@ class TeradataConfig(BaseTeradataConfig):
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DELETION_DETECTION, "Optionally enabled via configuration")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration")
@capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration")
class TeradataSource(TwoTierSQLAlchemySource):
"""
This plugin extracts the following:
Expand All @@ -78,10 +125,103 @@ class TeradataSource(TwoTierSQLAlchemySource):

config: TeradataConfig

LINEAGE_QUERY: str = """SELECT ProcID, UserName as "user", StartTime AT TIME ZONE 'GMT' as "timestamp", DefaultDatabase as default_database, QueryText as query
FROM "DBC".DBQLogTbl
where ErrorCode = 0
and QueryText like 'create table demo_user.test_lineage%'
and "timestamp" >= TIMESTAMP '{start_time}'
and "timestamp" < TIMESTAMP '{end_time}'
"""
urns: Optional[Set[str]]

def __init__(self, config: TeradataConfig, ctx: PipelineContext):
super().__init__(config, ctx, "teradata")

self.report: TeradataReport = TeradataReport()
self.graph: Optional[DataHubGraph] = ctx.graph

if self.graph:
if self.config.use_schema_resolver:
self.schema_resolver = (
self.graph.initialize_schema_resolver_from_datahub(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
)
self.urns = self.schema_resolver.get_urns()
else:
self.schema_resolver = self.graph._make_schema_resolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.urns = None
else:
self.schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
graph=None,
env=self.config.env,
)
self.urns = None

self.builder: SqlParsingBuilder = SqlParsingBuilder(
usage_config=self.config.usage
if self.config.include_usage_statistics
else None,
generate_lineage=self.config.include_table_lineage,
generate_usage_statistics=self.config.include_usage_statistics,
generate_operations=self.config.usage.include_operational_stats,
)

@classmethod
def create(cls, config_dict, ctx):
config = TeradataConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
engine = self.get_metadata_engine()
for entry in engine.execute(
self.LINEAGE_QUERY.format(
start_time=self.config.start_time, end_time=self.config.end_time
)
):
self.report.num_queries_parsed += 1
if self.report.num_queries_parsed % 1000 == 0:
logger.info(f"Parsed {self.report.num_queries_parsed} queries")

result = sqlglot_lineage(
sql=entry.query,
schema_resolver=self.schema_resolver,
default_db=None,
default_schema=entry.default_database
if entry.default_database
else self.config.default_db,
)
if result.debug_info.table_error:
logger.debug(
f"Error parsing table lineage, {result.debug_info.table_error}"
)
self.report.num_table_parse_failures += 1
continue

yield from self.builder.process_sql_parsing_result(
result,
query=entry.query,
query_timestamp=entry.timestamp,
user=f"urn:li:corpuser:{entry.user}",
include_urns=self.urns,
)

def get_metadata_engine(self) -> Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
return create_engine(url, **self.config.options)

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()
if self.config.include_table_lineage or self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("audit log extraction")
yield from self.get_audit_log_mcps()
yield from self.builder.gen_workunits()

0 comments on commit ff4a65e

Please sign in to comment.