Skip to content

Commit

Permalink
feat(ingest/teradata): Teradata source (#8977)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Oct 12, 2023
1 parent d04d25b commit a8f0080
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 1 deletion.
28 changes: 28 additions & 0 deletions metadata-ingestion/docs/sources/teradata/teradata_pre.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
### Prerequisites
1. Create a user which has access to the database you want to ingest.
```sql
CREATE USER datahub FROM <database> AS PASSWORD = <password> PERM = 20000000;
```
2. Create a user with the following privileges:
```sql
GRANT SELECT ON dbc.columns TO datahub;
GRANT SELECT ON dbc.databases TO datahub;
GRANT SELECT ON dbc.tables TO datahub;
GRANT SELECT ON DBC.All_RI_ChildrenV TO datahub;
GRANT SELECT ON DBC.ColumnsV TO datahub;
GRANT SELECT ON DBC.IndicesV TO datahub;
GRANT SELECT ON dbc.TableTextV TO datahub;
GRANT SELECT ON dbc.TablesV TO datahub;
GRANT SELECT ON dbc.dbqlogtbl TO datahub; -- if lineage or usage extraction is enabled
```

If you want to run profiling, you need to grant select permission on all the tables you want to profile.

3. If linege or usage extraction is enabled, please, check if query logging is enabled and it is set to size which
will fit for your queries (the default query text size Teradata captures is max 200 chars)
An example how you can set it for all users:
```sql
REPLACE QUERY LOGGING LIMIT SQLTEXT=2000 ON ALL;
```
See more here about query logging:
[https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs]()
17 changes: 17 additions & 0 deletions metadata-ingestion/docs/sources/teradata/teradata_recipe.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
pipeline_name: my-teradata-ingestion-pipeline
source:
type: teradata
config:
host_port: "myteradatainstance.teradata.com:1025"
#platform_instance: "myteradatainstance"
username: myuser
password: mypassword
#database_pattern:
# allow:
# - "demo_user"
# ignoreCase: true
include_table_lineage: true
include_usage_statistics: true
stateful_ingestion:
enabled: true
sink:
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@
# FIXME: I don't think tableau uses sqllineage anymore so we should be able
# to remove that dependency.
"tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib,
"teradata": sql_common | {"teradatasqlalchemy>=17.20.0.0"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
Expand Down Expand Up @@ -499,6 +500,7 @@
"s3",
"snowflake",
"tableau",
"teradata",
"trino",
"hive",
"starburst-trino-usage",
Expand Down Expand Up @@ -597,6 +599,7 @@
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
"teradata = datahub.ingestion.source.sql.teradata:TeradataSource",
"trino = datahub.ingestion.source.sql.trino:TrinoSource",
"starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
"nifi = datahub.ingestion.source.nifi:NifiSource",
Expand Down
228 changes: 228 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import logging
from dataclasses import dataclass
from typing import Iterable, Optional, Set, Union

# This import verifies that the dependencies are available.
import teradatasqlalchemy # noqa: F401
import teradatasqlalchemy.types as custom_types
from pydantic.fields import Field
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
)
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

logger: logging.Logger = logging.getLogger(__name__)

register_custom_type(custom_types.JSON, BytesTypeClass)
register_custom_type(custom_types.INTERVAL_DAY, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_DAY_TO_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_DAY_TO_MINUTE, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_DAY_TO_HOUR, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_MINUTE, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_MINUTE_TO_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_HOUR, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_HOUR_TO_MINUTE, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_HOUR_TO_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_MONTH, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_YEAR, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_YEAR_TO_MONTH, TimeTypeClass)
register_custom_type(custom_types.MBB, BytesTypeClass)
register_custom_type(custom_types.MBR, BytesTypeClass)
register_custom_type(custom_types.GEOMETRY, BytesTypeClass)
register_custom_type(custom_types.TDUDT, BytesTypeClass)
register_custom_type(custom_types.XML, BytesTypeClass)


@dataclass
class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
num_queries_parsed: int = 0
num_table_parse_failures: int = 0


class BaseTeradataConfig(TwoTierSQLAlchemyConfig):
scheme = Field(default="teradatasql", description="database scheme")


class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
database_pattern = Field(
default=AllowDenyPattern(deny=["dbc"]),
description="Regex patterns for databases to filter in ingestion.",
)
include_table_lineage = Field(
default=False,
description="Whether to include table lineage in the ingestion. "
"This requires to have the table lineage feature enabled.",
)

usage: BaseUsageConfig = Field(
description="The usage config to use when generating usage statistics",
default=BaseUsageConfig(),
)

use_schema_resolver: bool = Field(
default=True,
description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
hidden_from_docs=True,
)

default_db: Optional[str] = Field(
default=None,
description="The default database to use for unqualified table names",
)

include_usage_statistics: bool = Field(
default=False,
description="Generate usage statistic.",
)


@platform_name("Teradata")
@config_class(TeradataConfig)
@support_status(SupportStatus.TESTING)
@capability(SourceCapability.DOMAINS, "Enabled by default")
@capability(SourceCapability.CONTAINERS, "Enabled by default")
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DELETION_DETECTION, "Optionally enabled via configuration")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration")
@capability(SourceCapability.LINEAGE_FINE, "Optionally enabled via configuration")
@capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration")
class TeradataSource(TwoTierSQLAlchemySource):
"""
This plugin extracts the following:
- Metadata for databases, schemas, views, and tables
- Column types associated with each table
- Table, row, and column statistics via optional SQL profiling
"""

config: TeradataConfig

LINEAGE_QUERY: str = """SELECT ProcID, UserName as "user", StartTime AT TIME ZONE 'GMT' as "timestamp", DefaultDatabase as default_database, QueryText as query
FROM "DBC".DBQLogTbl
where ErrorCode = 0
and QueryText like 'create table demo_user.test_lineage%'
and "timestamp" >= TIMESTAMP '{start_time}'
and "timestamp" < TIMESTAMP '{end_time}'
"""
urns: Optional[Set[str]]

def __init__(self, config: TeradataConfig, ctx: PipelineContext):
super().__init__(config, ctx, "teradata")

self.report: TeradataReport = TeradataReport()
self.graph: Optional[DataHubGraph] = ctx.graph

if self.graph:
if self.config.use_schema_resolver:
self.schema_resolver = (
self.graph.initialize_schema_resolver_from_datahub(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
)
self.urns = self.schema_resolver.get_urns()
else:
self.schema_resolver = self.graph._make_schema_resolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.urns = None
else:
self.schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
graph=None,
env=self.config.env,
)
self.urns = None

self.builder: SqlParsingBuilder = SqlParsingBuilder(
usage_config=self.config.usage
if self.config.include_usage_statistics
else None,
generate_lineage=self.config.include_table_lineage,
generate_usage_statistics=self.config.include_usage_statistics,
generate_operations=self.config.usage.include_operational_stats,
)

@classmethod
def create(cls, config_dict, ctx):
config = TeradataConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
engine = self.get_metadata_engine()
for entry in engine.execute(
self.LINEAGE_QUERY.format(
start_time=self.config.start_time, end_time=self.config.end_time
)
):
self.report.num_queries_parsed += 1
if self.report.num_queries_parsed % 1000 == 0:
logger.info(f"Parsed {self.report.num_queries_parsed} queries")

result = sqlglot_lineage(
sql=entry.query,
schema_resolver=self.schema_resolver,
default_db=None,
default_schema=entry.default_database
if entry.default_database
else self.config.default_db,
)
if result.debug_info.table_error:
logger.debug(
f"Error parsing table lineage, {result.debug_info.table_error}"
)
self.report.num_table_parse_failures += 1
continue

yield from self.builder.process_sql_parsing_result(
result,
query=entry.query,
query_timestamp=entry.timestamp,
user=f"urn:li:corpuser:{entry.user}",
include_urns=self.urns,
)

def get_metadata_engine(self) -> Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
return create_engine(url, **self.config.options)

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()
if self.config.include_table_lineage or self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("audit log extraction")
yield from self.get_audit_log_mcps()
yield from self.builder.gen_workunits()
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ def assert_sql_result(
sql: str,
*,
dialect: str,
platform_instance: Optional[str] = None,
expected_file: pathlib.Path,
schemas: Optional[Dict[str, SchemaInfo]] = None,
**kwargs: Any,
) -> None:
schema_resolver = SchemaResolver(platform=dialect)
schema_resolver = SchemaResolver(
platform=dialect, platform_instance=platform_instance
)
if schemas:
for urn, schema in schemas.items():
schema_resolver.add_raw_schema_info(urn, schema)
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,11 @@ def _column_level_lineage( # noqa: C901
# Our snowflake source lowercases column identifiers, so we are forced
# to do fuzzy (case-insensitive) resolution instead of exact resolution.
"snowflake",
# Teradata column names are case-insensitive.
# A name, even when enclosed in double quotation marks, is not case sensitive. For example, CUSTOMER and Customer are the same.
# See more below:
# https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/acreldb/n0ejgx4895bofnn14rlguktfx5r3.htm
"teradata",
}

sqlglot_db_schema = sqlglot.MappingSchema(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"query_type": "CREATE",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)"
],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)"
],
"column_lineage": [
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)",
"column": "PatientId",
"native_column_type": "INTEGER()"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)",
"column": "PatientId"
}
]
},
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)",
"column": "BMI",
"native_column_type": "FLOAT()"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)",
"column": "BMI"
}
]
}
]
}
Loading

0 comments on commit a8f0080

Please sign in to comment.