From ff4a65ea00ab1b01d4b0628d9c5975a34edb59e9 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Wed, 11 Oct 2023 16:35:41 +0200
Subject: [PATCH] Adding lineage/usage/operation aspect generation

---
 .../docs/sources/teradata/teradata_pre.md     |  10 ++
 .../datahub/ingestion/source/sql/teradata.py  | 143 +++++++++++++++++-
 2 files changed, 151 insertions(+), 2 deletions(-)

diff --git a/metadata-ingestion/docs/sources/teradata/teradata_pre.md b/metadata-ingestion/docs/sources/teradata/teradata_pre.md
index 2f134c4dde778..eb59caa29eb52 100644
--- a/metadata-ingestion/docs/sources/teradata/teradata_pre.md
+++ b/metadata-ingestion/docs/sources/teradata/teradata_pre.md
@@ -13,6 +13,16 @@ GRANT SELECT ON DBC.IndicesV TO datahub;
 GRANT SELECT ON dbc.TableTextV TO datahub;
 GRANT SELECT ON dbc.TablesV TO datahub;
+GRANT SELECT ON dbc.dbqlogtbl TO datahub; -- if lineage or usage extraction is enabled
 ```
 
 If you want to run profiling, you need to grant select permission on all the tables you want to profile.
+
+3. If lineage or usage extraction is enabled, please check that query logging is enabled and that it is set to a size
+   that will fit your queries (the default query text size Teradata captures is at most 200 characters).
+   An example of how you can set it for all users:
+   ```sql
+   REPLACE QUERY LOGGING LIMIT SQLTEXT=2000 ON ALL;
+   ```
+   See more about query logging here:
+   [Tracking Query Behavior with Database Query Logging](https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
index caec47cdce9f0..f19f00c718582 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
@@ -1,11 +1,17 @@
 import logging
+from dataclasses import dataclass
+from typing import Iterable, Union, Optional, Set
 
 # This import verifies that the dependencies are available.
 import teradatasqlalchemy  # noqa: F401
 import teradatasqlalchemy.types as custom_types
 from pydantic.fields import Field
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
 
 from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.time_window_config import BaseTimeWindowConfig
+from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SourceCapability,
@@ -15,15 +21,22 @@ platform_name,
     support_status,
 )
-from datahub.ingestion.source.sql.sql_common import register_custom_type
+from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.graph.client import DataHubGraph
+from datahub.ingestion.source.sql.sql_common import register_custom_type, SqlWorkUnit
+from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source.sql.two_tier_sql_source import (
     TwoTierSQLAlchemyConfig,
     TwoTierSQLAlchemySource,
 )
+from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
+from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
+from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     BytesTypeClass,
     TimeTypeClass,
 )
+from datahub.utilities.sqlglot_lineage import sqlglot_lineage, SchemaResolver
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -48,15 +61,47 @@ register_custom_type(custom_types.XML, BytesTypeClass)
 
 
+@dataclass
+class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
+    num_queries_parsed: int = 0
+    num_table_parse_failures: int = 0
+
+
 class BaseTeradataConfig(TwoTierSQLAlchemyConfig):
     scheme = Field(default="teradatasql", description="database scheme")
 
 
-class TeradataConfig(BaseTeradataConfig):
+class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
     database_pattern = Field(
         default=AllowDenyPattern(deny=["dbc"]),
         description="Regex patterns for databases to filter in ingestion.",
     )
+    include_table_lineage = Field(
+        default=False,
+        description="Whether to include table lineage in the ingestion. "
+        "This requires the table lineage feature to be enabled.",
+    )
+
+    usage: BaseUsageConfig = Field(
+        description="The usage config to use when generating usage statistics",
+        default=BaseUsageConfig(),
+    )
+
+    use_schema_resolver: bool = Field(
+        default=True,
+        description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
+        hidden_from_docs=True,
+    )
+
+    default_db: Optional[str] = Field(
+        default=None,
+        description="The default database to use for unqualified table names",
+    )
+
+    include_usage_statistics: bool = Field(
+        default=False,
+        description="Generate usage statistics.",
+    )
 
 
 @platform_name("Teradata")
@@ -67,6 +112,8 @@ class TeradataConfig(BaseTeradataConfig):
 @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
 @capability(SourceCapability.DELETION_DETECTION, "Optionally enabled via configuration")
 @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
+@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration")
+@capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration")
 class TeradataSource(TwoTierSQLAlchemySource):
     """
     This plugin extracts the following:
@@ -78,10 +125,102 @@ class TeradataSource(TwoTierSQLAlchemySource):
     config: TeradataConfig
 
+    LINEAGE_QUERY: str = """SELECT ProcID, UserName as "user", StartTime AT TIME ZONE 'GMT' as "timestamp", DefaultDatabase as default_database, QueryText as query
+        FROM "DBC".DBQLogTbl
+        where ErrorCode = 0
+        and "timestamp" >= TIMESTAMP '{start_time}'
+        and "timestamp" < TIMESTAMP '{end_time}'
+        """
+    urns: Optional[Set[str]]
+
     def __init__(self, config: TeradataConfig, ctx: PipelineContext):
         super().__init__(config, ctx, "teradata")
+        self.report: TeradataReport = TeradataReport()
+        self.graph: Optional[DataHubGraph] = ctx.graph
+
+        if self.graph:
+            if self.config.use_schema_resolver:
+                self.schema_resolver = (
+                    self.graph.initialize_schema_resolver_from_datahub(
+                        platform=self.platform,
+                        platform_instance=self.config.platform_instance,
+                        env=self.config.env,
+                    )
+                )
+                self.urns = self.schema_resolver.get_urns()
+            else:
+                self.schema_resolver = self.graph._make_schema_resolver(
+                    platform=self.platform,
+                    platform_instance=self.config.platform_instance,
+                    env=self.config.env,
+                )
+                self.urns = None
+        else:
+            self.schema_resolver = SchemaResolver(
+                platform=self.platform,
+                platform_instance=self.config.platform_instance,
+                graph=None,
+                env=self.config.env,
+            )
+            self.urns = None
+
+        self.builder: SqlParsingBuilder = SqlParsingBuilder(
+            usage_config=self.config.usage
+            if self.config.include_usage_statistics
+            else None,
+            generate_lineage=self.config.include_table_lineage,
+            generate_usage_statistics=self.config.include_usage_statistics,
+            generate_operations=self.config.usage.include_operational_stats,
+        )
+
     @classmethod
     def create(cls, config_dict, ctx):
         config = TeradataConfig.parse_obj(config_dict)
         return cls(config, ctx)
+
+    def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
+        engine = self.get_metadata_engine()
+        for entry in engine.execute(
+            self.LINEAGE_QUERY.format(
+                start_time=self.config.start_time, end_time=self.config.end_time
+            )
+        ):
+            self.report.num_queries_parsed += 1
+            if self.report.num_queries_parsed % 1000 == 0:
+                logger.info(f"Parsed {self.report.num_queries_parsed} queries")
+
+            result = sqlglot_lineage(
+                sql=entry.query,
+                schema_resolver=self.schema_resolver,
+                default_db=None,
+                default_schema=entry.default_database
+                if entry.default_database
+                else self.config.default_db,
+            )
+            if result.debug_info.table_error:
+                logger.debug(
+                    f"Error parsing table lineage, {result.debug_info.table_error}"
+                )
+                self.report.num_table_parse_failures += 1
+                continue
+
+            yield from self.builder.process_sql_parsing_result(
+                result,
+                query=entry.query,
+                query_timestamp=entry.timestamp,
+                user=f"urn:li:corpuser:{entry.user}",
+                include_urns=self.urns,
+            )
+
+    def get_metadata_engine(self) -> Engine:
+        url = self.config.get_sql_alchemy_url()
+        logger.debug(f"sql_alchemy_url={url}")
+        return create_engine(url, **self.config.options)
+
+    def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
+        yield from super().get_workunits_internal()
+        if self.config.include_table_lineage or self.config.include_usage_statistics:
+            self.report.report_ingestion_stage_start("audit log extraction")
+            yield from self.get_audit_log_mcps()
+            yield from self.builder.gen_workunits()
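
To illustrate what the new audit-log path does with each row from `DBC.DBQLogTbl`, here is a minimal sketch that runs DataHub's `sqlglot_lineage` helper over a single query the way `get_audit_log_mcps` does, but offline and without the builder step. The table names and default schema are hypothetical, the resolver is built with `graph=None` as in the no-graph branch of `__init__`, and the `in_tables`/`out_tables` fields are assumed from `SqlParsingResult`; the real source feeds the parse result into `SqlParsingBuilder.process_sql_parsing_result` instead of printing it.

```python
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

# Offline resolver, mirroring the no-graph branch of TeradataSource.__init__:
# without SchemaMetadata from DataHub, column-level lineage may be incomplete.
schema_resolver = SchemaResolver(
    platform="teradata",
    platform_instance=None,
    env="PROD",
    graph=None,
)

# Hypothetical query of the kind returned by the DBC.DBQLogTbl lineage query.
query = "INSERT INTO sales.daily_totals SELECT order_id, amount FROM sales.orders"

result = sqlglot_lineage(
    sql=query,
    schema_resolver=schema_resolver,
    default_db=None,
    default_schema="sales",  # plays the role of entry.default_database
)

if result.debug_info.table_error:
    print(f"Error parsing table lineage, {result.debug_info.table_error}")
else:
    print(result.in_tables)   # assumed field: upstream dataset URNs
    print(result.out_tables)  # assumed field: downstream dataset URNs
```

Passing the database as `default_schema` mirrors the two-tier mapping the source relies on, where a Teradata database fills the schema slot of the dataset URN.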