From 11bd9fe00ad6a86233a01116dfdad04a2471f1cc Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Fri, 9 Aug 2024 15:53:00 +0100 Subject: [PATCH 01/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 552 +++++++++++++++++- 1 file changed, 543 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 5c9c8f063a1a9..906f7ff2e2469 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -1,8 +1,53 @@ -from typing import Dict +import datetime +import re +import logging +import time -import pydantic +from typing import Optional, Dict, List, Iterable, Union, Tuple, Any +from pydantic.fields import Field +from concurrent.futures import ThreadPoolExecutor, as_completed + +import xml.etree.ElementTree + +from sqlalchemy import create_engine, inspect +from sqlalchemy.engine.reflection import Inspector + +from datahub.configuration.common import AllowDenyPattern +from datahub.sql_parsing.sql_parsing_common import QueryType +from datahub.utilities.urns.corpuser_urn import CorpuserUrn +from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance, make_data_platform_urn, make_user_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.usage.usage_common import BaseUsageConfig +from datahub.metadata.schema_classes import ( + DatasetLineageTypeClass, + SchemaMetadataClass, + SchemaFieldClass, + ViewPropertiesClass, + MySqlDDLClass, + AuditStampClass, + BooleanTypeClass, + NumberTypeClass, + StringTypeClass, + BytesTypeClass, + ArrayTypeClass, + RecordTypeClass, + DateTypeClass, + TimeTypeClass, + SchemaFieldDataTypeClass, + SubTypesClass, + TimeStampClass +) +from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator, KnownQueryLineageInfo from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig +from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, SqlWorkUnit, SQLSourceReport +from datahub.metadata.schema_classes import ( + DatasetPropertiesClass, + UpstreamClass, + UpstreamLineageClass, +) from datahub.ingestion.api.decorators import ( SourceCapability, SupportStatus, @@ -11,14 +56,77 @@ platform_name, support_status, ) -from datahub.ingestion.source.sql.sql_common import SQLAlchemySource -from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig +logger = logging.getLogger(__name__) + +HANA_TYPES_MAP: Dict[str, Any] = { + "BOOLEAN": BooleanTypeClass(), + "TINYINT": NumberTypeClass(), + "SMALLINT": NumberTypeClass(), + "INTEGER": NumberTypeClass(), + "BIGINT": NumberTypeClass(), + "SMALLDECIMAL": NumberTypeClass(), + "DECIMAL": NumberTypeClass(), + "REAL": NumberTypeClass(), + "DOUBLE": NumberTypeClass(), + + "VARCHAR": StringTypeClass(), + "NVARCHAR": StringTypeClass(), + "ALPHANUM": StringTypeClass(), + "SHORTTEXT": StringTypeClass(), + + "VARBINARY": BytesTypeClass(), + + "BLOB": BytesTypeClass(), + "CLOB": StringTypeClass(), + "NCLOB": StringTypeClass(), + "TEXT": StringTypeClass(), + + "ARRAY": ArrayTypeClass(), + + "ST_GEOMETRY": RecordTypeClass(), + "ST_POINT": RecordTypeClass(), + + "DATE": DateTypeClass(), + "TIME": TimeTypeClass(), + "SECONDDATE": TimeTypeClass(), + "TIMESTAMP": TimeTypeClass(), +} + + +def preprocess_sap_type(sap_type): + 
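[Note] The map above is keyed on bare HANA type names, while SQLAlchemy reflection yields type objects whose rendered form can carry parameters such as DECIMAL(10, 2). A standalone sketch of the normalization the helper beginning here performs (the DECIMAL class below is an illustrative stand-in, not a real SQLAlchemy type):

    import re

    class DECIMAL:
        """Stand-in for a type object reflected by SQLAlchemy."""

    def normalize_type_name(sap_type) -> str:
        # Use the class name (e.g. "DECIMAL") and strip any parenthesized
        # parameters such as "(10, 2)" that a rendered type could carry,
        # so the bare name can be looked up in HANA_TYPES_MAP.
        type_str = sap_type.__class__.__name__
        return re.sub(r"\(.*\)", "", type_str).strip()

    assert normalize_type_name(DECIMAL()) == "DECIMAL"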
type_str = sap_type.__class__.__name__ + return re.sub(r'\(.*\)', '', type_str).strip() + + +def get_pegasus_type(sap_type): + processed_type = preprocess_sap_type(sap_type) + return HANA_TYPES_MAP.get(processed_type) + + +class BaseHanaConfig(BasicSQLAlchemyConfig): + scheme: str = Field(default="hana", hidden_from_docs=True) + schema_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern(deny=["sys"]) + ) + max_workers: int = Field( + default=5, + description="Maximum concurrent SQL connections to the SAP Hana instance." + ) -class HanaConfig(BasicSQLAlchemyConfig): - # Override defaults - host_port: str = pydantic.Field(default="localhost:39041") - scheme: str = pydantic.Field(default="hana+hdbcli") + +class HanaConfig(BaseHanaConfig): + database_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description=( + "Regex patterns for databases to filter in ingestion. " + "Note: this is not used if `database` or `sqlalchemy_uri` are provided." + ), + ) + database: Optional[str] = Field( + default=None, + description="database (catalog). If set to Null, all databases will be considered for ingestion.", + ) @platform_name("SAP HANA", id="hana") @@ -29,10 +137,436 @@ class HanaConfig(BasicSQLAlchemyConfig): @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") class HanaSource(SQLAlchemySource): + config: HanaConfig + report: SQLSourceReport = SQLSourceReport() + aggregator: SqlParsingAggregator + def __init__(self, config: HanaConfig, ctx: PipelineContext): - super().__init__(config, ctx, "hana") + super().__init__(config, ctx, self.get_platform()) + self.config = config + self.engine = self._create_engine() + self.aggregator = SqlParsingAggregator( + platform=self.get_platform(), + platform_instance=self.config.platform_instance if self.config.platform_instance else None, + env=self.config.env, + graph=ctx.graph, + generate_lineage=True, + generate_queries=True, + generate_usage_statistics=True, + generate_operations=True, + format_queries=True, + usage_config=BaseUsageConfig() + ) + + def get_platform(self): + return "hana" @classmethod def create(cls, config_dict: Dict, ctx: PipelineContext) -> "HanaSource": config = HanaConfig.parse_obj(config_dict) return cls(config, ctx) + + def _create_engine(self): + url = self.config.get_sql_alchemy_url() + return create_engine(url) + + def get_table_properties(self, inspector: Inspector, schema: str, table: str) -> Optional[DatasetPropertiesClass]: + description = inspector.get_table_comment(table, schema).get("text", "") + return DatasetPropertiesClass(name=table, description=description) + + def get_view_properties(self, schema: str, view: str) -> Tuple[str, str, datetime.datetime, str]: + query = f""" + SELECT DEFINITION, + COMMENTS, + CREATE_TIME, + VIEW_TYPE + FROM SYS.VIEWS + WHERE LOWER(SCHEMA_NAME) = '{schema}' AND LOWER(VIEW_NAME) = '{view}' + """ + result = self.engine.execute(query).fetchone() + return result if result else ("", "", datetime.datetime(year=1970, month=1, day=1), "") + + def get_view_info(self, view: str, definition_and_description: Tuple) -> Optional[DatasetPropertiesClass]: + return DatasetPropertiesClass( + name=view, + description=definition_and_description[1], + created=TimeStampClass( + time=int(definition_and_description[2].timestamp())*1000 + ), + customProperties={ + "View Type": definition_and_description[3], + } + ) + + def get_lineage(self, schema: str, 
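[Note] get_view_properties, like the dependency lookup that follows, interpolates schema and view names directly into the SQL string. A parameterized variant of the same SYS.VIEWS query (a sketch assuming SQLAlchemy 1.4-style text() bind parameters; the DSN and names are illustrative):

    from sqlalchemy import create_engine, text

    engine = create_engine("hana://user:secret@localhost:39041")  # illustrative DSN

    query = text(
        """
        SELECT DEFINITION, COMMENTS, CREATE_TIME, VIEW_TYPE
        FROM SYS.VIEWS
        WHERE LOWER(SCHEMA_NAME) = :schema AND LOWER(VIEW_NAME) = :view
        """
    )

    # Bound parameters let the driver handle quoting, so a view name
    # containing a quote cannot break or inject into the statement.
    with engine.connect() as conn:
        row = conn.execute(query, {"schema": "sflight", "view": "v_bookings"}).fetchone()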
table_or_view: str) -> list: + query = f""" + SELECT LOWER(BASE_SCHEMA_NAME) as BASE_SCHEMA_NAME, + LOWER(BASE_OBJECT_NAME) AS BASE_OBJECT_NAME + FROM SYS.OBJECT_DEPENDENCIES + WHERE LOWER(DEPENDENT_SCHEMA_NAME) = '{schema.lower()}' AND LOWER(DEPENDENT_OBJECT_NAME) = '{table_or_view.lower()}' + """ + return self.engine.execute(query).fetchall() + + def construct_lineage(self, upstreams: List[Tuple[str, str]]) -> Optional[UpstreamLineageClass]: + + if not upstreams: + return None + + upstream = [ + UpstreamClass( + dataset=make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", + platform_instance=self.config.platform_instance, + env=self.config.env + ), + type=DatasetLineageTypeClass.VIEW, + ) + for row in upstreams + ] + return UpstreamLineageClass(upstreams=upstream) + + def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> List[SchemaFieldClass]: + columns = inspector.get_columns(table_or_view, schema) + return [ + SchemaFieldClass( + fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('type'))}].{col.get('name')}", + type=SchemaFieldDataTypeClass(type=get_pegasus_type(col.get('type'))), + nativeDataType=preprocess_sap_type(col.get('type')), + description=col.get('comment', "") + ) + for col in columns + ] + + def get_query_logs(self) -> List[Dict]: + query = f""" + SELECT STATEMENT_STRING, + USER_NAME, LAST_EXECUTION_TIMESTAMP + FROM SYS.M_SQL_PLAN_CACHE""" + result = self.engine.execute(query).fetchall() + return [dict(row) for row in result] + + def get_package_names(self) -> List[dict]: + query = f""" + SELECT + PACKAGE_ID, + OBJECT_NAME, + CDATA + FROM _SYS_REPO.ACTIVE_OBJECT + WHERE OBJECT_SUFFIX='calculationview'""" + result = self.engine.execute(query).fetchall() + return [dict(row) for row in result] if result else [] + + def get_schema_names(self, inspector): + return inspector.get_schema_names() + + def add_information_for_schema(self, inspector: Inspector, schema: str) -> None: + pass + + def get_allowed_schemas(self, inspector: Inspector, db_name: str) -> Iterable[str]: + # this function returns the schema names which are filtered by schema_pattern. 
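[Note] This filter is driven by the schema_pattern declared on BaseHanaConfig, which denies "sys" by default; patterns are regular expressions checked by AllowDenyPattern.allowed. A minimal illustration:

    from datahub.configuration.common import AllowDenyPattern

    pattern = AllowDenyPattern(deny=["sys"])  # mirrors the BaseHanaConfig default

    assert not pattern.allowed("sys")        # system schema is dropped
    assert pattern.allowed("sales_data")     # ordinary schemas pass through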
+ for schema in self.get_schema_names(inspector): + if not self.config.schema_pattern.allowed(schema): + self.report.report_dropped(f"{schema}.*") + continue + else: + self.add_information_for_schema(inspector, schema) + yield schema + + def get_workunits_internal(self): + inspector = inspect(self.engine) + schemas = self.get_allowed_schemas(inspector, self.config.database) + with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor: + futures = [] + + for schema in schemas: + if schema.lower() == "_sys_bic": + views = self.get_package_names() + for view in views: + futures.append( + executor.submit( + self._process_calculation_view, + dataset_path=view.get("PACKAGE_ID"), + dataset_name=view.get("OBJECT_NAME"), + dataset_definition=view.get("CDATA"), + inspector=inspector, + schema=schema, + ) + ) + + else: + tables = inspector.get_table_names(schema) + for table in tables: + futures.append( + executor.submit( + self._process_table, + dataset_name=table, + inspector=inspector, + schema=schema, + ) + ) + + views = inspector.get_view_names(schema) + for view in views: + futures.append( + executor.submit( + self._process_view, + dataset_name=view, + inspector=inspector, + schema=schema + ) + ) + + queries = self.get_query_logs() + for query in queries: + self.aggregator.add_observed_query( + query=query.get("statement_string"), + query_timestamp=query.get("last_execution_timestamp"), + user=CorpuserUrn( + make_user_urn( + query.get("user_name") + ) + ), + ) + + for future in as_completed(futures): + yield from future.result() + + for mcp in self.aggregator.gen_metadata(): + self.report.report_workunit(mcp.as_workunit()) + yield mcp.as_workunit() + + def _process_table(self, + dataset_name: str, + inspector: Inspector, + schema: str, + ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: + entity = make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database + '.') if self.config.database else ''}{schema}.{dataset_name}", + platform_instance=self.config.platform_instance, + env=self.config.env + ) + description = self.get_table_properties(inspector=inspector, schema=schema, table=dataset_name) + columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) + schema_metadata = SchemaMetadataClass( + schemaName=f"{schema}.{dataset_name}", + platform=make_data_platform_urn(self.get_platform()), + version=0, + fields=columns, + hash="", + platformSchema=MySqlDDLClass(""), + lastModified=AuditStampClass( + int(time.time() * 1000), "urn:li:corpuser:admin" + ), + ) + + dataset_snapshot = MetadataChangeProposalWrapper.construct_many( + entityUrn=entity, + aspects=[ + description, + schema_metadata, + ] + ) + + self.aggregator.register_schema( + urn=entity, + schema=schema_metadata + ) + + for mcp in dataset_snapshot: + self.report.report_workunit(mcp.as_workunit()) + yield mcp.as_workunit() + + def _process_view(self, + dataset_name: str, + inspector: Inspector, + schema: str, + ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: + entity = make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database + '.') if self.config.database else ''}{schema}.{dataset_name}", + platform_instance=self.config.platform_instance, + env=self.config.env + ) + + view_details = self.get_view_properties(schema=schema, view=dataset_name) + view_definition = view_details[0] + properties = self.get_view_info(view=dataset_name, definition_and_description=view_details) + + columns = 
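[Note] The work-unit loop above fans each table, view, and calculation view out to a thread pool and re-joins the results with as_completed. The control flow, reduced to a standalone sketch (process_dataset is an illustrative stand-in that returns a list so the pool genuinely does the work):

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from typing import Iterable, List

    def process_dataset(name: str) -> List[str]:
        # Stand-in for _process_table/_process_view. Note the real methods
        # are generator functions: executor.submit() then only *creates*
        # the generator, and its body runs when the consumer iterates the
        # result, so their work is not actually parallelized by the pool.
        return [f"workunit:{name}"]

    def run(names: Iterable[str], max_workers: int = 5) -> Iterable[str]:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(process_dataset, n) for n in names]
            for future in as_completed(futures):
                yield from future.result()

    print(list(run(["sflight.sbook", "sflight.scarr"])))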
self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) + constructed_lineage = self.get_lineage(schema=schema, table_or_view=dataset_name) + lineage = self.construct_lineage(upstreams=constructed_lineage) + subtype = SubTypesClass(["View"]) + + schema_metadata = SchemaMetadataClass( + schemaName=f"{schema}.{dataset_name}", + platform=make_data_platform_urn(self.get_platform()), + version=0, + fields=columns, + hash="", + platformSchema=MySqlDDLClass(""), + lastModified=AuditStampClass( + int(time.time() * 1000), "urn:li:corpuser:admin" + ), + ) + + view_properties = ViewPropertiesClass( + materialized=False, + viewLanguage="SQL", + viewLogic=view_definition, + ) + + aspects = [ + properties, + schema_metadata, + view_properties, + subtype + ] + + if lineage: + aspects.append(lineage) + + dataset_snapshot = MetadataChangeProposalWrapper.construct_many( + entityUrn=entity, + aspects=aspects + ) + + self.aggregator.register_schema( + urn=entity, + schema=schema_metadata + ) + + self.aggregator.add_view_definition( + view_urn=entity, + view_definition=view_definition, + default_db=self.config.database if self.config.database else "", + default_schema=schema + ) + + if constructed_lineage: + self.aggregator.add_known_query_lineage( + KnownQueryLineageInfo( + query_text=view_definition, + upstreams=[make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", + platform_instance=self.config.platform_instance, + env=self.config.env + ) for row in constructed_lineage], + downstream=entity, + query_type=QueryType.SELECT + ), + merge_lineage=True + ) + + for mcp in dataset_snapshot: + self.report.report_workunit(mcp.as_workunit()) + yield mcp.as_workunit() + + def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, dataset_path: str, dataset_name: str): + upstreams = [] + data_sources = root.find('dataSources', ns) + + try: + if data_sources: + for data_source in data_sources.findall('DataSource', ns): + data_source_type = data_source.get('type') + if data_source_type == "CALCULATION_VIEW": + upstreams.append(f"_sys_bic.{data_source.find('resourceUri', ns).text}") + else: + column_object = data_source.find('columnObject', ns) + upstreams.append(f"{column_object.get('schemaName')}.{column_object.get('columnObjectName')}") + + except Exception as e: + logging.warning( + f"No lineage found for Calculation View {dataset_path}/{dataset_path}. 
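[Note] The branch above distinguishes the two kinds of DataSource entries in a calculation view's XML; note also that the warning it logs repeats {dataset_path} twice where {dataset_name} was presumably intended. The extraction logic against a trimmed document (namespace handling elided; the XML is illustrative, not a full SAP export):

    import xml.etree.ElementTree as ET

    XML = """
    <calculationScenario>
      <dataSources>
        <DataSource type="DATA_BASE_TABLE">
          <columnObject schemaName="SFLIGHT" columnObjectName="SBOOK"/>
        </DataSource>
      </dataSources>
    </calculationScenario>
    """

    root = ET.fromstring(XML)
    upstreams = []
    for ds in root.find("dataSources").findall("DataSource"):
        if (ds.get("type") or "").upper() == "CALCULATION_VIEW":
            # Other calculation views are referenced by resourceUri.
            upstreams.append("_sys_bic." + ds.find("resourceUri").text)
        else:
            # Tables and plain views carry an explicit schema/object pair.
            co = ds.find("columnObject")
            upstreams.append(f"{co.get('schemaName')}.{co.get('columnObjectName')}")

    assert upstreams == ["SFLIGHT.SBOOK"]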
Parsing error: {e}" + ) + + if not upstreams: + return None + + upstream = [ + UpstreamClass( + dataset=make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database + '.') if self.config.database else ''}{row.lower()}", + platform_instance=self.config.platform_instance, + env=self.config.env + ), + type=DatasetLineageTypeClass.VIEW, + ) + for row in upstreams + ] + return UpstreamLineageClass(upstreams=upstream) + + + + def _process_calculation_view( + self, + dataset_path: str, + dataset_name: str, + dataset_definition: str, + inspector: Inspector, + schema: str + ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: + entity = make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database + '.') if self.config.database else ''}{dataset_path}.{dataset_name}", + platform_instance=self.config.platform_instance, + env=self.config.env + ) + root = xml.etree.ElementTree.fromstring(dataset_definition) + ns = { + 'Calculation': 'http://www.sap.com/ndb/BiModelCalculation.ecore', + 'xsi': 'http://www.w3.org/2001/XMLSchema-instance' + } + lineage = self.get_calculation_view_lineage(root=root, ns=ns, dataset_path=dataset_path, dataset_name=dataset_name) + + columns = self.get_columns(inspector=inspector, schema="SFLIGHT", table_or_view="SFLIGHT") + + schema_metadata = SchemaMetadataClass( + schemaName=f"{schema}.{dataset_name}", + platform=make_data_platform_urn(self.get_platform()), + version=0, + fields=columns, + hash="", + platformSchema=MySqlDDLClass(""), + lastModified=AuditStampClass( + int(time.time() * 1000), "urn:li:corpuser:admin" + ), + ) + + dataset_details = DatasetPropertiesClass(name=dataset_name) + + view_properties = ViewPropertiesClass( + materialized=False, + viewLanguage="XML", + viewLogic=dataset_definition, + ) + + subtype = SubTypesClass(["View"]) + + aspects = [ + schema_metadata, + subtype, + dataset_details, + view_properties + ] + + if lineage: + aspects.append(lineage) + + dataset_snapshot = MetadataChangeProposalWrapper.construct_many( + entityUrn=entity, + aspects=aspects + ) + + self.aggregator.register_schema( + urn=entity, + schema=schema_metadata + ) + + for mcp in dataset_snapshot: + self.report.report_workunit(mcp.as_workunit()) + yield mcp.as_workunit() From 9b9ec6c5c36d997dd86bbf6b92a08e7def29506d Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Fri, 9 Aug 2024 15:55:10 +0100 Subject: [PATCH 02/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 906f7ff2e2469..500bb133ffc97 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -523,7 +523,7 @@ def _process_calculation_view( } lineage = self.get_calculation_view_lineage(root=root, ns=ns, dataset_path=dataset_path, dataset_name=dataset_name) - columns = self.get_columns(inspector=inspector, schema="SFLIGHT", table_or_view="SFLIGHT") + columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) schema_metadata = SchemaMetadataClass( schemaName=f"{schema}.{dataset_name}", From 0aa5d6f5ce7b37d1371c8cb8859a6c23193e6a1a Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Tue, 3 Sep 2024 10:38:28 +0100 Subject: [PATCH 03/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 57 
+++++++++++++------ 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 500bb133ffc97..6bf02ca600980 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -16,7 +16,8 @@ from datahub.configuration.common import AllowDenyPattern from datahub.sql_parsing.sql_parsing_common import QueryType from datahub.utilities.urns.corpuser_urn import CorpuserUrn -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance, make_data_platform_urn, make_user_urn +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.usage.usage_common import BaseUsageConfig @@ -39,10 +40,20 @@ SubTypesClass, TimeStampClass ) -from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator, KnownQueryLineageInfo -from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig -from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, SqlWorkUnit, SQLSourceReport +from datahub.emitter.mce_builder import ( + make_dataset_urn_with_platform_instance, + make_data_platform_urn, + make_user_urn +) +from datahub.sql_parsing.sql_parsing_aggregator import ( + SqlParsingAggregator, + KnownQueryLineageInfo +) +from datahub.ingestion.source.sql.sql_common import ( + SQLAlchemySource, + SqlWorkUnit, + SQLSourceReport +) from datahub.metadata.schema_classes import ( DatasetPropertiesClass, UpstreamClass, @@ -111,7 +122,9 @@ class BaseHanaConfig(BasicSQLAlchemyConfig): ) max_workers: int = Field( default=5, - description="Maximum concurrent SQL connections to the SAP Hana instance." + description=( + "Maximum concurrent SQL connections to the SAP Hana instance." + ), ) @@ -125,7 +138,9 @@ class HanaConfig(BaseHanaConfig): ) database: Optional[str] = Field( default=None, - description="database (catalog). If set to Null, all databases will be considered for ingestion.", + description=( + "database (catalog). If set to Null, all databases will be considered for ingestion." 
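[Note] A convention used throughout these aspects: DataHub timestamps are integer milliseconds since the epoch, hence the * 1000 applied to datetime.timestamp() in get_view_info. A quick standard-library check of the conversion:

    import datetime

    created = datetime.datetime(2024, 8, 9, 15, 53, 0, tzinfo=datetime.timezone.utc)
    # datetime.timestamp() yields seconds as a float; aspects such as
    # TimeStampClass expect integer milliseconds since the epoch.
    millis = int(created.timestamp()) * 1000
    assert millis == 1723218780000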
+ ), ) @@ -191,7 +206,7 @@ def get_view_info(self, view: str, definition_and_description: Tuple) -> Optiona name=view, description=definition_and_description[1], created=TimeStampClass( - time=int(definition_and_description[2].timestamp())*1000 + time=int(definition_and_description[2].timestamp()) * 1000 ), customProperties={ "View Type": definition_and_description[3], @@ -203,7 +218,8 @@ def get_lineage(self, schema: str, table_or_view: str) -> list: SELECT LOWER(BASE_SCHEMA_NAME) as BASE_SCHEMA_NAME, LOWER(BASE_OBJECT_NAME) AS BASE_OBJECT_NAME FROM SYS.OBJECT_DEPENDENCIES - WHERE LOWER(DEPENDENT_SCHEMA_NAME) = '{schema.lower()}' AND LOWER(DEPENDENT_OBJECT_NAME) = '{table_or_view.lower()}' + WHERE LOWER(DEPENDENT_SCHEMA_NAME) = '{schema.lower()}' + AND LOWER(DEPENDENT_OBJECT_NAME) = '{table_or_view.lower()}' """ return self.engine.execute(query).fetchall() @@ -239,7 +255,7 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> ] def get_query_logs(self) -> List[Dict]: - query = f""" + query = """ SELECT STATEMENT_STRING, USER_NAME, LAST_EXECUTION_TIMESTAMP FROM SYS.M_SQL_PLAN_CACHE""" @@ -247,7 +263,7 @@ def get_query_logs(self) -> List[Dict]: return [dict(row) for row in result] def get_package_names(self) -> List[dict]: - query = f""" + query = """ SELECT PACKAGE_ID, OBJECT_NAME, @@ -263,7 +279,7 @@ def get_schema_names(self, inspector): def add_information_for_schema(self, inspector: Inspector, schema: str) -> None: pass - def get_allowed_schemas(self, inspector: Inspector, db_name: str) -> Iterable[str]: + def get_allowed_schemas(self, inspector: Inspector) -> Iterable[str]: # this function returns the schema names which are filtered by schema_pattern. for schema in self.get_schema_names(inspector): if not self.config.schema_pattern.allowed(schema): @@ -422,7 +438,7 @@ def _process_view(self, schema_metadata, view_properties, subtype - ] + ] if lineage: aspects.append(lineage) @@ -500,8 +516,6 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da ] return UpstreamLineageClass(upstreams=upstream) - - def _process_calculation_view( self, dataset_path: str, @@ -516,12 +530,21 @@ def _process_calculation_view( platform_instance=self.config.platform_instance, env=self.config.env ) - root = xml.etree.ElementTree.fromstring(dataset_definition) + + try: + logging.info(f"Dataset definition for {dataset_path}.{dataset_name}: {dataset_definition}") + root = xml.etree.ElementTree.fromstring(dataset_definition) + except Exception as e: + logging.error(e) + root = None + ns = { 'Calculation': 'http://www.sap.com/ndb/BiModelCalculation.ecore', 'xsi': 'http://www.w3.org/2001/XMLSchema-instance' } - lineage = self.get_calculation_view_lineage(root=root, ns=ns, dataset_path=dataset_path, dataset_name=dataset_name) + if root: + lineage = self.get_calculation_view_lineage(root=root, ns=ns, dataset_path=dataset_path, + dataset_name=dataset_name) columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) From 53b80f53694c3d52a5ea29b0205845ac43949079 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 09:41:02 +0100 Subject: [PATCH 04/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 6bf02ca600980..ff65b1adeaccb 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -291,7 +291,9 @@ def get_allowed_schemas(self, inspector: Inspector) -> Iterable[str]: def get_workunits_internal(self): inspector = inspect(self.engine) - schemas = self.get_allowed_schemas(inspector, self.config.database) + schemas = self.get_allowed_schemas( + inspector=inspector, + ) with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor: futures = [] From 1e744d6f4ecc5af5b692d4401c9dde507194a5d3 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 10:21:05 +0100 Subject: [PATCH 05/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index ff65b1adeaccb..dec8acd47f31c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -13,6 +13,7 @@ from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector +import datahub.sql_parsing.sqlglot_utils from datahub.configuration.common import AllowDenyPattern from datahub.sql_parsing.sql_parsing_common import QueryType from datahub.utilities.urns.corpuser_urn import CorpuserUrn @@ -70,6 +71,8 @@ logger = logging.getLogger(__name__) +datahub.sql_parsing.sqlglot_utils._get_dialect_str = "tsql" + HANA_TYPES_MAP: Dict[str, Any] = { "BOOLEAN": BooleanTypeClass(), "TINYINT": NumberTypeClass(), From 3b3040dd3182c811c6774c811893d8869aabc6ab Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 11:10:03 +0100 Subject: [PATCH 06/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index dec8acd47f31c..9862f9dae1239 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -71,8 +71,6 @@ logger = logging.getLogger(__name__) -datahub.sql_parsing.sqlglot_utils._get_dialect_str = "tsql" - HANA_TYPES_MAP: Dict[str, Any] = { "BOOLEAN": BooleanTypeClass(), "TINYINT": NumberTypeClass(), @@ -598,3 +596,9 @@ def _process_calculation_view( for mcp in dataset_snapshot: self.report.report_workunit(mcp.as_workunit()) yield mcp.as_workunit() + +def _sql_dialect(platform: str) -> str: + return "tsql" + + +datahub.sql_parsing.sqlglot_utils._get_dialect_str = _sql_dialect From 1719d91f55a9409653c1c1444d6305fa3e055276 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 11:48:49 +0100 Subject: [PATCH 07/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 9862f9dae1239..995edc784c1e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -272,7 +272,9 @@ def get_package_names(self) -> List[dict]: FROM _SYS_REPO.ACTIVE_OBJECT WHERE OBJECT_SUFFIX='calculationview'""" result = self.engine.execute(query).fetchall() - return [dict(row) for row in result] if result else [] + packages = 
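[Note] Patches 5 and 6 force sqlglot to parse HANA statements with its tsql dialect by rebinding datahub.sql_parsing.sqlglot_utils._get_dialect_str. The first attempt assigned the bare string "tsql", which would fail as soon as the hook is called; the follow-up swaps in a callable of the same shape. The corrected pattern in isolation (assuming the private hook keeps this signature in the installed datahub version, as the patch itself relies on):

    import datahub.sql_parsing.sqlglot_utils as sqlglot_utils

    def _sql_dialect(platform: str) -> str:
        # Treat every platform as tsql, the closest sqlglot dialect the
        # series settles on for HANA SQL.
        return "tsql"

    # Rebind the module-level hook with a callable, not a string: call
    # sites invoke it as _get_dialect_str(platform).
    sqlglot_utils._get_dialect_str = _sql_dialect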
[dict(row) for row in result] if result else [] + log.info(packages) + return packages def get_schema_names(self, inspector): return inspector.get_schema_names() From d9a446045983d97c3903eca7e9dc9f3d3741a77e Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 11:56:29 +0100 Subject: [PATCH 08/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 995edc784c1e8..c706a791ac577 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -268,7 +268,7 @@ def get_package_names(self) -> List[dict]: SELECT PACKAGE_ID, OBJECT_NAME, - CDATA + TO_VARCHAR(CDATA) AS CDATA FROM _SYS_REPO.ACTIVE_OBJECT WHERE OBJECT_SUFFIX='calculationview'""" result = self.engine.execute(query).fetchall() From a3211bd31ca9a5b9bf0a9b848039342826ead1db Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 12:29:01 +0100 Subject: [PATCH 09/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index c706a791ac577..9825af19725a9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -270,10 +270,10 @@ def get_package_names(self) -> List[dict]: OBJECT_NAME, TO_VARCHAR(CDATA) AS CDATA FROM _SYS_REPO.ACTIVE_OBJECT - WHERE OBJECT_SUFFIX='calculationview'""" + WHERE LOWER(OBJECT_SUFFIX)='calculationview'""" result = self.engine.execute(query).fetchall() packages = [dict(row) for row in result] if result else [] - log.info(packages) + logging.info(packages) return packages def get_schema_names(self, inspector): @@ -551,53 +551,53 @@ def _process_calculation_view( lineage = self.get_calculation_view_lineage(root=root, ns=ns, dataset_path=dataset_path, dataset_name=dataset_name) - columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) - - schema_metadata = SchemaMetadataClass( - schemaName=f"{schema}.{dataset_name}", - platform=make_data_platform_urn(self.get_platform()), - version=0, - fields=columns, - hash="", - platformSchema=MySqlDDLClass(""), - lastModified=AuditStampClass( - int(time.time() * 1000), "urn:li:corpuser:admin" - ), - ) + columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) + + schema_metadata = SchemaMetadataClass( + schemaName=f"{schema}.{dataset_name}", + platform=make_data_platform_urn(self.get_platform()), + version=0, + fields=columns, + hash="", + platformSchema=MySqlDDLClass(""), + lastModified=AuditStampClass( + int(time.time() * 1000), "urn:li:corpuser:admin" + ), + ) - dataset_details = DatasetPropertiesClass(name=dataset_name) + dataset_details = DatasetPropertiesClass(name=dataset_name) - view_properties = ViewPropertiesClass( - materialized=False, - viewLanguage="XML", - viewLogic=dataset_definition, - ) + view_properties = ViewPropertiesClass( + materialized=False, + viewLanguage="XML", + viewLogic=dataset_definition, + ) - subtype = SubTypesClass(["View"]) + subtype = SubTypesClass(["View"]) - aspects = [ - schema_metadata, - subtype, - dataset_details, - view_properties - ] + aspects = [ + 
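[Note] Two small fixes land in this stretch: patch 8 wraps CDATA in TO_VARCHAR so the calculation-view XML comes back as a plain string (working around LOB handling, on the assumption the hdbcli driver would otherwise hand back a LOB object), and patch 9 corrects log.info to logging.info, since log was never bound and would raise a NameError at runtime. Routing through the module's own logger would keep records attributed to this source (sketch):

    import logging

    logger = logging.getLogger(__name__)  # binding the module already declares

    def log_packages(packages: list) -> list:
        # logging.info writes via the root logger; the module logger tags
        # records with this source's name, which is friendlier in pipelines.
        logger.info("fetched %d calculation view packages", len(packages))
        return packages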
schema_metadata, + subtype, + dataset_details, + view_properties + ] - if lineage: - aspects.append(lineage) + if lineage: + aspects.append(lineage) - dataset_snapshot = MetadataChangeProposalWrapper.construct_many( - entityUrn=entity, - aspects=aspects - ) + dataset_snapshot = MetadataChangeProposalWrapper.construct_many( + entityUrn=entity, + aspects=aspects + ) - self.aggregator.register_schema( - urn=entity, - schema=schema_metadata - ) + self.aggregator.register_schema( + urn=entity, + schema=schema_metadata + ) - for mcp in dataset_snapshot: - self.report.report_workunit(mcp.as_workunit()) - yield mcp.as_workunit() + for mcp in dataset_snapshot: + self.report.report_workunit(mcp.as_workunit()) + yield mcp.as_workunit() def _sql_dialect(platform: str) -> str: return "tsql" From 0ff784789b480831265317626ced26fab81d3911 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 13:17:30 +0100 Subject: [PATCH 10/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 9825af19725a9..8b1b069bf5699 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -248,9 +248,9 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> return [ SchemaFieldClass( fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('type'))}].{col.get('name')}", - type=SchemaFieldDataTypeClass(type=get_pegasus_type(col.get('type'))), - nativeDataType=preprocess_sap_type(col.get('type')), - description=col.get('comment', "") + type=SchemaFieldDataTypeClass(type=get_pegasus_type(col.get("type"))), + nativeDataType=preprocess_sap_type(col.get("type")), + description=col.get("comment", "") ) for col in columns ] @@ -307,9 +307,9 @@ def get_workunits_internal(self): futures.append( executor.submit( self._process_calculation_view, - dataset_path=view.get("PACKAGE_ID"), - dataset_name=view.get("OBJECT_NAME"), - dataset_definition=view.get("CDATA"), + dataset_path=view.get("package_id"), + dataset_name=view.get("object_name"), + dataset_definition=view.get("cdata"), inspector=inspector, schema=schema, ) @@ -487,16 +487,16 @@ def _process_view(self, def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, dataset_path: str, dataset_name: str): upstreams = [] - data_sources = root.find('dataSources', ns) + data_sources = root.find("dataSources", ns) try: if data_sources: - for data_source in data_sources.findall('DataSource', ns): - data_source_type = data_source.get('type') - if data_source_type == "CALCULATION_VIEW": + for data_source in data_sources.findall("DataSource", ns): + data_source_type = data_source.get("type") + if data_source_type.upper() == "CALCULATION_VIEW": upstreams.append(f"_sys_bic.{data_source.find('resourceUri', ns).text}") else: - column_object = data_source.find('columnObject', ns) + column_object = data_source.find("columnObject", ns) upstreams.append(f"{column_object.get('schemaName')}.{column_object.get('columnObjectName')}") except Exception as e: @@ -544,8 +544,8 @@ def _process_calculation_view( root = None ns = { - 'Calculation': 'http://www.sap.com/ndb/BiModelCalculation.ecore', - 'xsi': 'http://www.w3.org/2001/XMLSchema-instance' + "Calculation": "http://www.sap.com/ndb/BiModelCalculation.ecore", + "xsi": 
"http://www.w3.org/2001/XMLSchema-instance" } if root: lineage = self.get_calculation_view_lineage(root=root, ns=ns, dataset_path=dataset_path, From 2d6ef59b37d14678baeddceb8199dcf8fbaa7fce Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 13:52:25 +0100 Subject: [PATCH 11/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 8b1b069bf5699..f03e28c511b29 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -548,10 +548,18 @@ def _process_calculation_view( "xsi": "http://www.w3.org/2001/XMLSchema-instance" } if root: - lineage = self.get_calculation_view_lineage(root=root, ns=ns, dataset_path=dataset_path, - dataset_name=dataset_name) + lineage = self.get_calculation_view_lineage( + root=root, + ns=ns, + dataset_path=dataset_path, + dataset_name=dataset_name + ) - columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) + columns = self.get_columns( + inspector=inspector, + schema=schema, + table_or_view=f"{dataset_path}.{dataset_name}" + ) schema_metadata = SchemaMetadataClass( schemaName=f"{schema}.{dataset_name}", From 5e174012079a555d64e2dfb17228c302b2d5455d Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 13:55:07 +0100 Subject: [PATCH 12/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index f03e28c511b29..5eff7966dfa4b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -558,7 +558,7 @@ def _process_calculation_view( columns = self.get_columns( inspector=inspector, schema=schema, - table_or_view=f"{dataset_path}.{dataset_name}" + table_or_view=f"{dataset_path}/{dataset_name}" ) schema_metadata = SchemaMetadataClass( From d947ae9af6ee472a87848010a561f1cdb5f08149 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 16:31:29 +0100 Subject: [PATCH 13/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 5eff7966dfa4b..805b070462277 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -248,13 +248,44 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> return [ SchemaFieldClass( fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('type'))}].{col.get('name')}", - type=SchemaFieldDataTypeClass(type=get_pegasus_type(col.get("type"))), - nativeDataType=preprocess_sap_type(col.get("type")), + type=SchemaFieldDataTypeClass( + type=get_pegasus_type( + col.get("type") + ) + ), + nativeDataType=preprocess_sap_type( + col.get("type") + ), description=col.get("comment", "") ) for col in columns ] + def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: + query = f""" + SELECT PROPERTY_NAME, + PROPERTY_CAPTION, + DATA_TYPE + FROM 
"_SYS_BI"."BIMC_PROPERTIES" + WHERE "SCHEMA_NAME" = '_SYS_BIC' + AND "PROPERTY_TYPE" IN ('1', '2', '6', '7') + AND LOWER(SCHEMA_NAME) = '_sys_bic' AND LOWER(CUBE_NAME) = '{view.lower()}' + """ + query_result = [dict(row) for row in self.engine.execute(query).fetchall()] + logger.info(f"View: {view.lower()} definition: {query_result}") + + columns = query_result + + return [ + SchemaFieldClass( + fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('data_type'))}].{col.get('property_name')}", + type=SchemaFieldDataTypeClass(type=get_pegasus_type(col.get('data_type'))), + nativeDataType=preprocess_sap_type(col.get('data_type')), + description=col.get('property_caption', "") + ) + for col in columns + ] + def get_query_logs(self) -> List[Dict]: query = """ SELECT STATEMENT_STRING, @@ -555,10 +586,8 @@ def _process_calculation_view( dataset_name=dataset_name ) - columns = self.get_columns( - inspector=inspector, - schema=schema, - table_or_view=f"{dataset_path}/{dataset_name}" + columns = self.get_calculation_view_columns( + view=f"{dataset_path}/{dataset_name}" ) schema_metadata = SchemaMetadataClass( From 8dd805c7b75aea5fe7abf10bd462c48318438de8 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 17:30:02 +0100 Subject: [PATCH 14/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 805b070462277..73687680fb9b3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -263,9 +263,9 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: query = f""" - SELECT PROPERTY_NAME, - PROPERTY_CAPTION, - DATA_TYPE + SELECT PROPERTY_NAME as property_name, + PROPERTY_CAPTION as property_caption, + DATA_TYPE as data_type FROM "_SYS_BI"."BIMC_PROPERTIES" WHERE "SCHEMA_NAME" = '_SYS_BIC' AND "PROPERTY_TYPE" IN ('1', '2', '6', '7') From c9c3de5f8c09c538f00e80d3aad213264f261c8a Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 9 Sep 2024 19:42:00 +0100 Subject: [PATCH 15/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 73687680fb9b3..1a2427d9827d8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -263,13 +263,13 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: query = f""" - SELECT PROPERTY_NAME as property_name, - PROPERTY_CAPTION as property_caption, - DATA_TYPE as data_type - FROM "_SYS_BI"."BIMC_PROPERTIES" + SELECT COLUMN_NAME as column_name, + COMMENTS as comments, + DATA_TYPE_NAME as data_type + FROM SYS.VIEW_COLUMNS WHERE "SCHEMA_NAME" = '_SYS_BIC' - AND "PROPERTY_TYPE" IN ('1', '2', '6', '7') - AND LOWER(SCHEMA_NAME) = '_sys_bic' AND LOWER(CUBE_NAME) = '{view.lower()}' + AND LOWER(VIEW_NAME) = '{view.lower()}' + ORDER BY POSITION ASC """ query_result = [dict(row) for row in self.engine.execute(query).fetchall()] logger.info(f"View: 
{view.lower()} definition: {query_result}") @@ -278,10 +278,10 @@ def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: return [ SchemaFieldClass( - fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('data_type'))}].{col.get('property_name')}", + fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('data_type'))}].{col.get('column_name')}", type=SchemaFieldDataTypeClass(type=get_pegasus_type(col.get('data_type'))), nativeDataType=preprocess_sap_type(col.get('data_type')), - description=col.get('property_caption', "") + description=col.get('comments', "") ) for col in columns ] @@ -500,12 +500,14 @@ def _process_view(self, self.aggregator.add_known_query_lineage( KnownQueryLineageInfo( query_text=view_definition, - upstreams=[make_dataset_urn_with_platform_instance( - platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", - platform_instance=self.config.platform_instance, - env=self.config.env - ) for row in constructed_lineage], + upstreams=[ + make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", + platform_instance=self.config.platform_instance, + env=self.config.env + ) for row in constructed_lineage + ], downstream=entity, query_type=QueryType.SELECT ), @@ -542,7 +544,7 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{row.lower()}", + name=row.lower(), platform_instance=self.config.platform_instance, env=self.config.env ), @@ -562,7 +564,7 @@ def _process_calculation_view( ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{dataset_path}.{dataset_name}", + name=f"_sys_bic.{dataset_path}.{dataset_name}", platform_instance=self.config.platform_instance, env=self.config.env ) From b590d8d563b13c3c63288e48ccf44667bcc0f03d Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Tue, 10 Sep 2024 09:50:50 +0100 Subject: [PATCH 16/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 91 +++++++++++-------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 1a2427d9827d8..b2108c2fe8e96 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -3,7 +3,8 @@ import logging import time -from typing import Optional, Dict, List, Iterable, Union, Tuple, Any +from typing import Optional, Dict, List, Iterable, Union, Tuple, Any, Collection + from pydantic.fields import Field from concurrent.futures import ThreadPoolExecutor, as_completed @@ -27,7 +28,7 @@ SchemaMetadataClass, SchemaFieldClass, ViewPropertiesClass, - MySqlDDLClass, + OtherSchemaClass, AuditStampClass, BooleanTypeClass, NumberTypeClass, @@ -106,14 +107,14 @@ } -def preprocess_sap_type(sap_type): - type_str = sap_type.__class__.__name__ - return re.sub(r'\(.*\)', '', type_str).strip() - - -def get_pegasus_type(sap_type): - processed_type = preprocess_sap_type(sap_type) - return HANA_TYPES_MAP.get(processed_type) +# def preprocess_sap_type(sap_type): 
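[Note] Column aspects throughout the series use DataHub's v2 field-path encoding, [version=2.0].[type=<NATIVE_TYPE>].<column>. Building one field from a SYS.VIEW_COLUMNS row (the row dict is illustrative; the classes are the DataHub ones the patch already imports):

    from datahub.metadata.schema_classes import (
        SchemaFieldClass,
        SchemaFieldDataTypeClass,
        StringTypeClass,
    )

    row = {"column_name": "CARRID", "data_type_name": "NVARCHAR", "comments": "Carrier"}

    field = SchemaFieldClass(
        fieldPath=f"[version=2.0].[type={row['data_type_name']}].{row['column_name']}",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),  # NVARCHAR maps to a string type
        nativeDataType=row["data_type_name"],
        description=row["comments"],
    )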
+# type_str = sap_type.__class__.__name__ +# return re.sub(r'\(.*\)', '', type_str).strip() +# +# +# def get_pegasus_type(sap_type): +# processed_type = preprocess_sap_type(sap_type) +# return HANA_TYPES_MAP.get(processed_type) class BaseHanaConfig(BasicSQLAlchemyConfig): @@ -161,6 +162,7 @@ def __init__(self, config: HanaConfig, ctx: PipelineContext): super().__init__(config, ctx, self.get_platform()) self.config = config self.engine = self._create_engine() + # self.discovered_tables: Optional[Collection[str]] = None self.aggregator = SqlParsingAggregator( platform=self.get_platform(), platform_instance=self.config.platform_instance if self.config.platform_instance else None, @@ -171,7 +173,8 @@ def __init__(self, config: HanaConfig, ctx: PipelineContext): generate_usage_statistics=True, generate_operations=True, format_queries=True, - usage_config=BaseUsageConfig() + usage_config=BaseUsageConfig(), + # is_allowed_table=self.is_allowed_table, ) def get_platform(self): @@ -247,16 +250,16 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> columns = inspector.get_columns(table_or_view, schema) return [ SchemaFieldClass( - fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('type'))}].{col.get('name')}", + fieldPath=f"[version=2.0].[type={col.get('data_type_name')}].{col.get('column_name')}", type=SchemaFieldDataTypeClass( - type=get_pegasus_type( - col.get("type") + type=HANA_TYPES_MAP.get( + col.get('data_type_name') ) ), - nativeDataType=preprocess_sap_type( - col.get("type") - ), - description=col.get("comment", "") + nativeDataType=col.get('data_type_name'), + description=col.get("comment", ""), + nullable=col.get("is_nullable"), + ) for col in columns ] @@ -265,7 +268,8 @@ def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: query = f""" SELECT COLUMN_NAME as column_name, COMMENTS as comments, - DATA_TYPE_NAME as data_type + DATA_TYPE_NAME as data_type_name, + IS_NULLABLE as is_nullable FROM SYS.VIEW_COLUMNS WHERE "SCHEMA_NAME" = '_SYS_BIC' AND LOWER(VIEW_NAME) = '{view.lower()}' @@ -278,10 +282,15 @@ def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: return [ SchemaFieldClass( - fieldPath=f"[version=2.0].[type={preprocess_sap_type(col.get('data_type'))}].{col.get('column_name')}", - type=SchemaFieldDataTypeClass(type=get_pegasus_type(col.get('data_type'))), - nativeDataType=preprocess_sap_type(col.get('data_type')), - description=col.get('comments', "") + fieldPath=f"[version=2.0].[type={col.get('data_type_name')}].{col.get('column_name')}", + type=SchemaFieldDataTypeClass( + type=HANA_TYPES_MAP.get( + col.get('data_type_name') + ) + ), + nativeDataType=col.get('data_type_name'), + description=col.get('comments', ""), + nullable=col.get("is_nullable"), ) for col in columns ] @@ -407,10 +416,11 @@ def _process_table(self, version=0, fields=columns, hash="", - platformSchema=MySqlDDLClass(""), - lastModified=AuditStampClass( - int(time.time() * 1000), "urn:li:corpuser:admin" - ), + platformSchema=OtherSchemaClass(""), + # lastModified=AuditStampClass( + # time=int(time.time() * 1000), + # actor="urn:li:corpuser:admin", + # ), ) dataset_snapshot = MetadataChangeProposalWrapper.construct_many( @@ -423,7 +433,7 @@ def _process_table(self, self.aggregator.register_schema( urn=entity, - schema=schema_metadata + schema=schema_metadata, ) for mcp in dataset_snapshot: @@ -457,10 +467,11 @@ def _process_view(self, version=0, fields=columns, hash="", - platformSchema=MySqlDDLClass(""), - 
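[Note] With the helpers above now commented out, patch 16 looks native DATA_TYPE_NAME strings up in HANA_TYPES_MAP directly. dict.get returns None for anything unmapped, which would propagate an empty type into SchemaFieldDataTypeClass, so new HANA types need an explicit entry (illustration using the same DataHub classes):

    from datahub.metadata.schema_classes import NumberTypeClass, StringTypeClass

    HANA_TYPES_MAP = {"DECIMAL": NumberTypeClass(), "NVARCHAR": StringTypeClass()}

    assert isinstance(HANA_TYPES_MAP.get("DECIMAL"), NumberTypeClass)
    # Types absent from the map silently fall through to None here; an
    # explicit default (or a reported warning) would make gaps visible.
    assert HANA_TYPES_MAP.get("BINTEXT") is None  # BINTEXT has no entry in the patch's map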
lastModified=AuditStampClass( - int(time.time() * 1000), "urn:li:corpuser:admin" - ), + platformSchema=OtherSchemaClass(""), + # lastModified=AuditStampClass( + # time=int(time.time() * 1000), + # actor="urn:li:corpuser:admin", + # ), ) view_properties = ViewPropertiesClass( @@ -473,7 +484,7 @@ def _process_view(self, properties, schema_metadata, view_properties, - subtype + subtype, ] if lineage: @@ -511,7 +522,7 @@ def _process_view(self, downstream=entity, query_type=QueryType.SELECT ), - merge_lineage=True + merge_lineage=True, ) for mcp in dataset_snapshot: @@ -598,10 +609,11 @@ def _process_calculation_view( version=0, fields=columns, hash="", - platformSchema=MySqlDDLClass(""), - lastModified=AuditStampClass( - int(time.time() * 1000), "urn:li:corpuser:admin" - ), + platformSchema=OtherSchemaClass(""), + # lastModified=AuditStampClass( + # time=int(time.time() * 1000), + # actor="urn:li:corpuser:admin", + # ), ) dataset_details = DatasetPropertiesClass(name=dataset_name) @@ -618,7 +630,7 @@ def _process_calculation_view( schema_metadata, subtype, dataset_details, - view_properties + view_properties, ] if lineage: @@ -638,6 +650,7 @@ def _process_calculation_view( self.report.report_workunit(mcp.as_workunit()) yield mcp.as_workunit() + def _sql_dialect(platform: str) -> str: return "tsql" From 554a2f97cab510b17e3ac21ab38244f1a37bc05f Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Tue, 10 Sep 2024 11:05:10 +0100 Subject: [PATCH 17/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index b2108c2fe8e96..6a9a48c0b9316 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -258,7 +258,7 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> ), nativeDataType=col.get('data_type_name'), description=col.get("comment", ""), - nullable=col.get("is_nullable"), + nullable=bool(col.get("is_nullable")), ) for col in columns @@ -290,7 +290,7 @@ def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: ), nativeDataType=col.get('data_type_name'), description=col.get('comments', ""), - nullable=col.get("is_nullable"), + nullable=bool(col.get("is_nullable")), ) for col in columns ] From 53853a74a9c384e27890cd3c7f437612a4201afc Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Tue, 10 Sep 2024 12:08:45 +0100 Subject: [PATCH 18/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 28 +------------------ 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 6a9a48c0b9316..4efa07716b3ab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -1,7 +1,5 @@ import datetime -import re import logging -import time from typing import Optional, Dict, List, Iterable, Union, Tuple, Any, Collection @@ -29,7 +27,6 @@ SchemaFieldClass, ViewPropertiesClass, OtherSchemaClass, - AuditStampClass, BooleanTypeClass, NumberTypeClass, StringTypeClass, @@ -107,16 +104,6 @@ } -# def preprocess_sap_type(sap_type): -# type_str = sap_type.__class__.__name__ -# return re.sub(r'\(.*\)', '', type_str).strip() -# -# -# def get_pegasus_type(sap_type): 
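[Note] Patch 17 wraps IS_NULLABLE in bool(), which only behaves if the driver returns a real boolean: HANA system views commonly report 'TRUE'/'FALSE' as strings, and any non-empty string is truthy. A defensive parse (illustrative helper, not part of the patch):

    def parse_hana_bool(value) -> bool:
        # bool("FALSE") is True, so compare the text when a string arrives.
        if isinstance(value, str):
            return value.strip().upper() == "TRUE"
        return bool(value)

    assert parse_hana_bool("FALSE") is False
    assert bool("FALSE") is True  # the pitfall the helper avoids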
-# processed_type = preprocess_sap_type(sap_type) -# return HANA_TYPES_MAP.get(processed_type) - - class BaseHanaConfig(BasicSQLAlchemyConfig): scheme: str = Field(default="hana", hidden_from_docs=True) schema_pattern: AllowDenyPattern = Field( @@ -417,10 +404,6 @@ def _process_table(self, fields=columns, hash="", platformSchema=OtherSchemaClass(""), - # lastModified=AuditStampClass( - # time=int(time.time() * 1000), - # actor="urn:li:corpuser:admin", - # ), ) dataset_snapshot = MetadataChangeProposalWrapper.construct_many( @@ -468,10 +451,6 @@ def _process_view(self, fields=columns, hash="", platformSchema=OtherSchemaClass(""), - # lastModified=AuditStampClass( - # time=int(time.time() * 1000), - # actor="urn:li:corpuser:admin", - # ), ) view_properties = ViewPropertiesClass( @@ -538,7 +517,7 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da for data_source in data_sources.findall("DataSource", ns): data_source_type = data_source.get("type") if data_source_type.upper() == "CALCULATION_VIEW": - upstreams.append(f"_sys_bic.{data_source.find('resourceUri', ns).text}") + upstreams.append(f"_sys_bic.{data_source.find('resourceUri', ns).text[1:]}") else: column_object = data_source.find("columnObject", ns) upstreams.append(f"{column_object.get('schemaName')}.{column_object.get('columnObjectName')}") @@ -581,7 +560,6 @@ def _process_calculation_view( ) try: - logging.info(f"Dataset definition for {dataset_path}.{dataset_name}: {dataset_definition}") root = xml.etree.ElementTree.fromstring(dataset_definition) except Exception as e: logging.error(e) @@ -610,10 +588,6 @@ def _process_calculation_view( fields=columns, hash="", platformSchema=OtherSchemaClass(""), - # lastModified=AuditStampClass( - # time=int(time.time() * 1000), - # actor="urn:li:corpuser:admin", - # ), ) dataset_details = DatasetPropertiesClass(name=dataset_name) From bfe7c936f4d54676153af7efa9d7360480ad2808 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Tue, 10 Sep 2024 12:45:53 +0100 Subject: [PATCH 19/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 4efa07716b3ab..c8698d89dcdaa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -223,7 +223,7 @@ def construct_lineage(self, upstreams: List[Tuple[str, str]]) -> Optional[Upstre UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", platform_instance=self.config.platform_instance, env=self.config.env ), @@ -391,7 +391,7 @@ def _process_table(self, ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{schema}.{dataset_name}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{schema.lower()}.{dataset_name.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -430,7 +430,7 @@ def _process_view(self, ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = 
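[Note] Beyond dropping the commented-out helpers, patch 18 trims the leading '/' from resourceUri values. One hazard worth flagging in the parse guard that has been in place since patch 3: when fromstring raises, root is None and lineage is never assigned, so the later "if lineage:" check would hit an UnboundLocalError. A shape that binds the name up front (sketch; extract_lineage stands in for get_calculation_view_lineage, and ParseError narrows the patch's broad except):

    import logging
    import xml.etree.ElementTree as ET

    def extract_lineage(root):
        # Stand-in for get_calculation_view_lineage.
        return [el.tag for el in root]

    def parse_lineage(dataset_definition: str):
        lineage = None  # bound up front so the final guard never sees an unbound name
        try:
            root = ET.fromstring(dataset_definition)
        except ET.ParseError as e:
            logging.error("calculation view XML did not parse: %s", e)
            root = None
        if root is not None:
            lineage = extract_lineage(root)
        return lineage

    assert parse_lineage("<not valid xml") is None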
make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{schema}.{dataset_name}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{schema.lower()}.{dataset_name.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -493,7 +493,7 @@ def _process_view(self, upstreams=[ make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) for row in constructed_lineage @@ -517,7 +517,9 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da for data_source in data_sources.findall("DataSource", ns): data_source_type = data_source.get("type") if data_source_type.upper() == "CALCULATION_VIEW": - upstreams.append(f"_sys_bic.{data_source.find('resourceUri', ns).text[1:]}") + upstreams.append( + f"_sys_bic.{data_source.find('resourceUri', ns).text[1:].replace('/calculationviews/','.')}" + ) else: column_object = data_source.find("columnObject", ns) upstreams.append(f"{column_object.get('schemaName')}.{column_object.get('columnObjectName')}") @@ -554,7 +556,7 @@ def _process_calculation_view( ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"_sys_bic.{dataset_path}.{dataset_name}", + name=f"_sys_bic.{dataset_path.lower()}.{dataset_name.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) From 0c39f5ce1c70f61b75beb8b994061127495e4a16 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Tue, 10 Sep 2024 13:35:21 +0100 Subject: [PATCH 20/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index c8698d89dcdaa..61a6d0be71770 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -223,7 +223,7 @@ def construct_lineage(self, upstreams: List[Tuple[str, str]]) -> Optional[Upstre UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", + name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", platform_instance=self.config.platform_instance, env=self.config.env ), @@ -258,12 +258,11 @@ def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]: DATA_TYPE_NAME as data_type_name, IS_NULLABLE as is_nullable FROM SYS.VIEW_COLUMNS - WHERE "SCHEMA_NAME" = '_SYS_BIC' + WHERE LOWER("SCHEMA_NAME") = '_sys_bic' AND LOWER(VIEW_NAME) = '{view.lower()}' ORDER BY POSITION ASC """ query_result = [dict(row) for row in self.engine.execute(query).fetchall()] - logger.info(f"View: {view.lower()} definition: {query_result}") columns = query_result @@ -391,7 +390,7 @@ def _process_table(self, ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') 
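[Note] Patch 19 completes the calculation-view reference cleanup begun in patch 18: strip the resourceUri's leading slash, then collapse the '/calculationviews/' path segment into a dot so the upstream name lines up with the _sys_bic.<package>.<view> dataset naming. On a representative URI (value illustrative):

    resource_uri = "/mypackage/calculationviews/CV_BOOKINGS"  # illustrative

    upstream = "_sys_bic." + resource_uri[1:].replace("/calculationviews/", ".")
    assert upstream == "_sys_bic.mypackage.CV_BOOKINGS"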
if self.config.database else ''}{schema.lower()}.{dataset_name.lower()}", + name=f"{(self.config.database + '.') if self.config.database else ''}{schema}.{dataset_name}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -430,7 +429,7 @@ def _process_view(self, ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{schema.lower()}.{dataset_name.lower()}", + name=f"{(self.config.database + '.') if self.config.database else ''}{schema}.{dataset_name}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -493,7 +492,7 @@ def _process_view(self, upstreams=[ make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", + name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", platform_instance=self.config.platform_instance, env=self.config.env ) for row in constructed_lineage @@ -518,7 +517,7 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da data_source_type = data_source.get("type") if data_source_type.upper() == "CALCULATION_VIEW": upstreams.append( - f"_sys_bic.{data_source.find('resourceUri', ns).text[1:].replace('/calculationviews/','.')}" + f"_SYS_BIC.{data_source.find('resourceUri', ns).text[1:].replace('/calculationviews/','.')}" ) else: column_object = data_source.find("columnObject", ns) @@ -536,7 +535,7 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=row.lower(), + name=row, platform_instance=self.config.platform_instance, env=self.config.env ), @@ -556,7 +555,7 @@ def _process_calculation_view( ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"_sys_bic.{dataset_path.lower()}.{dataset_name.lower()}", + name=f"_SYS_BIC.{dataset_path}.{dataset_name}", platform_instance=self.config.platform_instance, env=self.config.env ) From 8251d3e2ce2470435d7b782b40a5c11f138771ac Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Tue, 10 Sep 2024 14:40:46 +0100 Subject: [PATCH 21/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 61a6d0be71770..0707567d6747f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -223,7 +223,7 @@ def construct_lineage(self, upstreams: List[Tuple[str, str]]) -> Optional[Upstre UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", platform_instance=self.config.platform_instance, env=self.config.env ), @@ -390,7 +390,7 @@ def _process_table(self, ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + 
'.') if self.config.database else ''}{schema}.{dataset_name}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{schema.lower()}.{dataset_name.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -429,7 +429,7 @@ def _process_view(self, ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{schema}.{dataset_name}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{schema.lower()}.{dataset_name.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -492,7 +492,7 @@ def _process_view(self, upstreams=[ make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database + '.') if self.config.database else ''}{row[0]}.{row[1]}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) for row in constructed_lineage @@ -517,7 +517,7 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da data_source_type = data_source.get("type") if data_source_type.upper() == "CALCULATION_VIEW": upstreams.append( - f"_SYS_BIC.{data_source.find('resourceUri', ns).text[1:].replace('/calculationviews/','.')}" + f"_sys_bic.{data_source.find('resourceUri', ns).text[1:].replace('/calculationviews/','/')}" ) else: column_object = data_source.find("columnObject", ns) @@ -535,7 +535,7 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=row, + name=row.lower(), platform_instance=self.config.platform_instance, env=self.config.env ), @@ -555,7 +555,7 @@ def _process_calculation_view( ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"_SYS_BIC.{dataset_path}.{dataset_name}", + name=f"_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -583,7 +583,7 @@ def _process_calculation_view( ) schema_metadata = SchemaMetadataClass( - schemaName=f"{schema}.{dataset_name}", + schemaName=f"{schema}/{dataset_name}", platform=make_data_platform_urn(self.get_platform()), version=0, fields=columns, From d8fc1c2906a396ec25050df38f9324a972f6adc6 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Wed, 11 Sep 2024 12:12:57 +0100 Subject: [PATCH 22/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 137 +++++++++++++++++- 1 file changed, 131 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 0707567d6747f..ee6529860a9ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -9,6 +9,8 @@ import xml.etree.ElementTree +import uuid + from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector @@ -37,12 +39,17 @@ TimeTypeClass, SchemaFieldDataTypeClass, SubTypesClass, - TimeStampClass + TimeStampClass, + BrowsePathsV2Class, + BrowsePathEntryClass, + ContainerPropertiesClass, + DataPlatformInstanceClass, ) from datahub.emitter.mce_builder 
import (
     make_dataset_urn_with_platform_instance,
     make_data_platform_urn,
-    make_user_urn
+    make_user_urn,
+    make_container_urn,
 )
 from datahub.sql_parsing.sql_parsing_aggregator import (
     SqlParsingAggregator,
     KnownQueryLineageInfo
 )
@@ -176,6 +183,86 @@ def _create_engine(self):
         url = self.config.get_sql_alchemy_url()
         return create_engine(url)
 
+    def create_container(self, schema: str) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
+
+        browsePath: BrowsePathsV2Class = self.get_browse_path(
+            schema=schema,
+        )
+
+        container_urn = self.get_container_urn(
+            schema=schema,
+        )
+
+        container_name = ContainerPropertiesClass(
+            name=schema,
+        )
+
+        container_type = SubTypesClass(
+            typeNames=["Schema"],
+        )
+
+        platform = self.get_platform_instance()
+
+        container_snapshot = MetadataChangeProposalWrapper.construct_many(
+            entityUrn=container_urn,
+            aspects=[
+                container_name,
+                container_type,
+                browsePath,
+                platform
+            ]
+        )
+
+        for mcp in container_snapshot:
+            self.report.report_workunit(mcp.as_workunit())
+            yield mcp.as_workunit()
+
+    def get_browse_path(self, schema: str) -> BrowsePathsV2Class:
+        container_path: List[BrowsePathEntryClass] = []
+
+        if self.config.database:
+            container_path.append(
+                BrowsePathEntryClass(
+                    id=self.config.database.lower()
+                )
+            )
+        if self.config.platform_instance:
+            container_path.append(
+                BrowsePathEntryClass(
+                    id=self.config.platform_instance.lower()
+                )
+            )
+
+        container_path.append(
+            BrowsePathEntryClass(
+                id=schema
+            )
+        )
+
+        return BrowsePathsV2Class(
+            path=container_path
+        )
+
+    def get_container_urn(self, schema: str) -> str:
+        namespace = uuid.NAMESPACE_DNS
+
+        return make_container_urn(
+            guid=str(
+                uuid.uuid5(
+                    namespace,
+                    self.get_platform() + \
+                    self.config.platform_instance + \
+                    schema,
+                )
+            )
+        )
+
+    def get_platform_instance(self):
+        return DataPlatformInstanceClass(
+            platform=f"urn:li:dataPlatform:{self.get_platform()}",
+            instance=self.config.platform_instance
+        )
+
     def get_table_properties(self, inspector: Inspector, schema: str, table: str) -> Optional[DatasetPropertiesClass]:
         description = inspector.get_table_comment(table, schema).get("text", "")
         return DatasetPropertiesClass(name=table, description=description)
@@ -237,7 +324,7 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) ->
         columns = inspector.get_columns(table_or_view, schema)
         return [
             SchemaFieldClass(
-                fieldPath=f"[version=2.0].[type={col.get('data_type_name')}].{col.get('column_name')}",
+                fieldPath=col.get('column_name'),
                 type=SchemaFieldDataTypeClass(
                     type=HANA_TYPES_MAP.get(
                         col.get('data_type_name')
@@ -268,7 +355,7 @@ def get_calculation_view_columns(self, view: str) -> List[SchemaFieldClass]:
 
         return [
             SchemaFieldClass(
-                fieldPath=f"[version=2.0].[type={col.get('data_type_name')}].{col.get('column_name')}",
+                fieldPath=col.get('column_name'),
                 type=SchemaFieldDataTypeClass(
                     type=HANA_TYPES_MAP.get(
                         col.get('data_type_name')
@@ -327,6 +414,11 @@ def get_workunits_internal(self):
         futures = []
 
         for schema in schemas:
+
+            yield from self.create_container(
+                schema=schema,
+            )
+
             if schema.lower() == "_sys_bic":
                 views = self.get_package_names()
                 for view in views:
@@ -405,11 +497,19 @@ def _process_table(self,
             platformSchema=OtherSchemaClass(""),
         )
 
+        platform = self.get_platform_instance()
+
+        browsePath = self.get_browse_path(
+            schema=schema,
+        )
+
         dataset_snapshot = MetadataChangeProposalWrapper.construct_many(
             entityUrn=entity,
             aspects=[
                 description,
                 schema_metadata,
+                platform,
+                browsePath,
             ]
         )
 
@@ -458,11 +558,19 @@ def _process_view(self,
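For reference, the container-emission pattern that create_container implements, reduced to a standalone sketch. The platform, instance, and schema values are placeholders, and the uuid5 key concatenation assumes platform_instance is set (with the config default of None, plain string concatenation would raise a TypeError):

    import uuid

    from datahub.emitter.mce_builder import make_container_urn
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import (
        BrowsePathEntryClass,
        BrowsePathsV2Class,
        ContainerPropertiesClass,
        SubTypesClass,
    )

    platform, platform_instance, schema = "hana", "prod", "sales"  # placeholders

    # Deterministic GUID: the same platform/instance/schema always maps
    # to the same container urn across ingestion runs.
    container_urn = make_container_urn(
        guid=str(uuid.uuid5(uuid.NAMESPACE_DNS, platform + platform_instance + schema))
    )

    for mcp in MetadataChangeProposalWrapper.construct_many(
        entityUrn=container_urn,
        aspects=[
            ContainerPropertiesClass(name=schema),
            SubTypesClass(typeNames=["Schema"]),
            BrowsePathsV2Class(path=[BrowsePathEntryClass(id=schema)]),
        ],
    ):
        print(mcp.as_workunit().id)  # yielded to the framework in the source
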
viewLogic=view_definition, ) + platform = self.get_platform_instance() + + browsePath = self.get_browse_path( + schema=schema, + ) + aspects = [ properties, schema_metadata, view_properties, subtype, + platform, + browsePath, ] if lineage: @@ -517,7 +625,7 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da data_source_type = data_source.get("type") if data_source_type.upper() == "CALCULATION_VIEW": upstreams.append( - f"_sys_bic.{data_source.find('resourceUri', ns).text[1:].replace('/calculationviews/','/')}" + f"_sys_bic.{data_source.find('resourceUri', ns).text[1:].replace('/calculationviews/', '/')}" ) else: column_object = data_source.find("columnObject", ns) @@ -543,6 +651,13 @@ def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, da ) for row in upstreams ] + + # FineGrainedLineageClass( + # upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + # downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD + # + # ) + return UpstreamLineageClass(upstreams=upstream) def _process_calculation_view( @@ -591,7 +706,9 @@ def _process_calculation_view( platformSchema=OtherSchemaClass(""), ) - dataset_details = DatasetPropertiesClass(name=dataset_name) + dataset_details = DatasetPropertiesClass( + name=f"{schema}/{dataset_name}" + ) view_properties = ViewPropertiesClass( materialized=False, @@ -599,6 +716,12 @@ def _process_calculation_view( viewLogic=dataset_definition, ) + platform = self.get_platform_instance() + + browsePath = self.get_browse_path( + schema=schema, + ) + subtype = SubTypesClass(["View"]) aspects = [ @@ -606,6 +729,8 @@ def _process_calculation_view( subtype, dataset_details, view_properties, + platform, + browsePath, ] if lineage: From 4f66b22b4304b98e6225c8276f57cc34653390f9 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Wed, 11 Sep 2024 18:57:23 +0100 Subject: [PATCH 23/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 47 ++++++++++++++----- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index ee6529860a9ad..3b7e8d4d869bc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -7,7 +7,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed -import xml.etree.ElementTree +import xml.etree.ElementTree as ET import uuid @@ -44,6 +44,7 @@ BrowsePathEntryClass, ContainerPropertiesClass, DataPlatformInstanceClass, + ContainerClass, ) from datahub.emitter.mce_builder import ( make_dataset_urn_with_platform_instance, @@ -156,7 +157,6 @@ def __init__(self, config: HanaConfig, ctx: PipelineContext): super().__init__(config, ctx, self.get_platform()) self.config = config self.engine = self._create_engine() - # self.discovered_tables: Optional[Collection[str]] = None self.aggregator = SqlParsingAggregator( platform=self.get_platform(), platform_instance=self.config.platform_instance if self.config.platform_instance else None, @@ -168,7 +168,6 @@ def __init__(self, config: HanaConfig, ctx: PipelineContext): generate_operations=True, format_queries=True, usage_config=BaseUsageConfig(), - # is_allowed_table=self.is_allowed_table, ) def get_platform(self): @@ -185,7 +184,7 @@ def _create_engine(self): def create_container(self, schema: str) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: - browsePath: BrowsePathsV2Class = self.get_browse_path( + browse_path: 
BrowsePathsV2Class = self.get_browse_path(
             schema=schema,
         )
@@ -208,7 +207,7 @@ def create_container(self, schema: str) -> Iterable[Union[SqlWorkUnit, MetadataW
             aspects=[
                 container_name,
                 container_type,
-                browsePath,
+                browse_path,
                 platform
             ]
         )
@@ -257,6 +256,13 @@ def get_container_urn(self, schema: str) -> str:
             )
         )
 
+    def get_container_class(self, schema: str) -> ContainerClass:
+        return ContainerClass(
+            container=self.get_container_urn(
+                schema=schema,
+            )
+        )
+
     def get_platform_instance(self):
         return DataPlatformInstanceClass(
             platform=f"urn:li:dataPlatform:{self.get_platform()}",
@@ -499,7 +505,11 @@ def _process_table(self,
 
         platform = self.get_platform_instance()
 
-        browsePath = self.get_browse_path(
+        browse_path = self.get_browse_path(
+            schema=schema,
+        )
+
+        container = self.get_container_class(
             schema=schema,
         )
 
@@ -509,7 +519,8 @@ def _process_table(self,
                 description,
                 schema_metadata,
                 platform,
-                browsePath,
+                browse_path,
+                container,
             ]
         )
 
@@ -560,7 +571,11 @@ def _process_view(self,
 
         platform = self.get_platform_instance()
 
-        browsePath = self.get_browse_path(
+        browse_path = self.get_browse_path(
+            schema=schema,
+        )
+
+        container = self.get_container_class(
             schema=schema,
         )
 
@@ -570,7 +585,8 @@ def _process_view(self,
             view_properties,
             subtype,
             platform,
-            browsePath,
+            browse_path,
+            container
         ]
 
         if lineage:
@@ -615,7 +631,7 @@ def _process_view(self,
         self.report.report_workunit(mcp.as_workunit())
         yield mcp.as_workunit()
 
-    def get_calculation_view_lineage(self, root: xml.etree.ElementTree, ns: Dict, dataset_path: str, dataset_name: str):
+    def get_calculation_view_lineage(self, root: ET, ns: Dict, dataset_path: str, dataset_name: str):
         upstreams = []
 
         data_sources = root.find("dataSources", ns)
@@ -676,7 +692,7 @@ def _process_calculation_view(
         )
 
         try:
-            root = xml.etree.ElementTree.fromstring(dataset_definition)
+            root = ET.fromstring(dataset_definition)
         except Exception as e:
             logging.error(e)
             root = None
@@ -718,7 +734,11 @@ def _process_calculation_view(
 
         platform = self.get_platform_instance()
 
-        browsePath = self.get_browse_path(
+        browse_path = self.get_browse_path(
+            schema=schema,
+        )
+
+        container = self.get_container_class(
             schema=schema,
         )
 
@@ -730,7 +750,8 @@ def _process_calculation_view(
             dataset_details,
             view_properties,
             platform,
-            browsePath,
+            browse_path,
+            container,
         ]
 
         if lineage:

From f50f12dbab3ee7d390102e15725126de7b7150c0 Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Thu, 12 Sep 2024 08:56:49 +0100
Subject: [PATCH 24/43] Update hana.py

---
 .../src/datahub/ingestion/source/sql/hana.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
index 3b7e8d4d869bc..c92906569738c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
@@ -51,6 +51,7 @@
     make_data_platform_urn,
     make_user_urn,
     make_container_urn,
+    make_dataplatform_instance_urn
 )
 from datahub.sql_parsing.sql_parsing_aggregator import (
@@ -266,7 +267,13 @@ def get_container_class(self, schema: str) -> ContainerClass:
     def get_platform_instance(self):
         return DataPlatformInstanceClass(
             platform=f"urn:li:dataPlatform:{self.get_platform()}",
-            instance=self.config.platform_instance
+            instance=(
+                make_dataplatform_instance_urn(
+                    self.get_platform(), self.config.platform_instance
+                )
+                if self.config.platform_instance
+                else None
+            )
         )
 
     def get_table_properties(self,
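Patches 23 and 24 converge on two dataset-level aspects: a ContainerClass pointer at the schema container, and a DataPlatformInstanceClass whose instance field is a dataPlatformInstance urn rather than the raw config string. A hedged sketch with placeholder identifiers:

    from datahub.emitter.mce_builder import (
        make_container_urn,
        make_dataplatform_instance_urn,
        make_dataset_urn_with_platform_instance,
    )
    from datahub.metadata.schema_classes import ContainerClass, DataPlatformInstanceClass

    dataset_urn = make_dataset_urn_with_platform_instance(
        platform="hana", name="hxe.sales.orders", platform_instance="prod", env="PROD"
    )
    aspects = [
        # Points the dataset at the schema container emitted separately.
        ContainerClass(container=make_container_urn(guid="placeholder-guid")),
        # instance must be a dataPlatformInstance urn, not the raw string.
        DataPlatformInstanceClass(
            platform="urn:li:dataPlatform:hana",
            instance=make_dataplatform_instance_urn("hana", "prod"),
        ),
    ]
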
inspector: Inspector, schema: str, table: str) -> Optional[DatasetPropertiesClass]: From 8679638840c476ba658242cf6e24ba111f81e005 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 09:19:03 +0100 Subject: [PATCH 25/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index c92906569738c..584813facbaf6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -470,15 +470,9 @@ def get_workunits_internal(self): ) queries = self.get_query_logs() - for query in queries: + for query in queries: self.aggregator.add_observed_query( - query=query.get("statement_string"), - query_timestamp=query.get("last_execution_timestamp"), - user=CorpuserUrn( - make_user_urn( - query.get("user_name") - ) - ), + observed=query.get("statement_string"), ) for future in as_completed(futures): From f0df8327920b2aecc40e1eceecafb93c79fa1d95 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 10:00:25 +0100 Subject: [PATCH 26/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 584813facbaf6..927f0d78b895c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -470,7 +470,7 @@ def get_workunits_internal(self): ) queries = self.get_query_logs() - for query in queries: + for query in queries: self.aggregator.add_observed_query( observed=query.get("statement_string"), ) From 4afb5210f3bf9e12f50bda84104189d0e24efc56 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 10:10:06 +0100 Subject: [PATCH 27/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 927f0d78b895c..dd7062191d1ac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -55,7 +55,8 @@ ) from datahub.sql_parsing.sql_parsing_aggregator import ( SqlParsingAggregator, - KnownQueryLineageInfo + KnownQueryLineageInfo, + ObservedQuery, ) from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, @@ -472,7 +473,15 @@ def get_workunits_internal(self): queries = self.get_query_logs() for query in queries: self.aggregator.add_observed_query( - observed=query.get("statement_string"), + observed=ObservedQuery( + query=query.get("statement_string"), + timestamp=query.get("last_execution_timestamp"), + user=CorpuserUrn( + make_user_urn( + query.get("user_name") + ) + ) + ), ) for future in as_completed(futures): From 9fdeff9436b44b1f37f2e4361a82e44a0d0af567 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 11:29:05 +0100 Subject: [PATCH 28/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py 
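Patches 25 through 27 settle on wrapping each query-log row in an ObservedQuery before handing it to the aggregator. A sketch with a hypothetical row shape (the real rows come from get_query_logs()); note that CorpuserUrn takes the bare username, while make_user_urn already returns a full urn, so combining the two would double the prefix:

    from datetime import datetime, timezone

    from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery
    from datahub.utilities.urns.corpuser_urn import CorpuserUrn

    row = {  # hypothetical record; field names mirror the source's usage
        "statement_string": 'SELECT * FROM "SALES"."ORDERS"',
        "last_execution_timestamp": datetime.now(tz=timezone.utc),
        "user_name": "ANALYST01",
    }

    observed = ObservedQuery(
        query=row["statement_string"],
        timestamp=row["last_execution_timestamp"],
        user=CorpuserUrn(row["user_name"].lower()),  # bare username, not a full urn
    )
    # aggregator.add_observed_query(observed)
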
b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index dd7062191d1ac..93147d0f66b4b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -733,7 +733,7 @@ def _process_calculation_view( ) dataset_details = DatasetPropertiesClass( - name=f"{schema}/{dataset_name}" + name=f"{dataset_path}/{dataset_name}" ) view_properties = ViewPropertiesClass( From 5d86836ec8ed152e77c30a459871aed003ffe5df Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 11:57:50 +0100 Subject: [PATCH 29/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 93147d0f66b4b..8df18f990ed79 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -334,8 +334,37 @@ def construct_lineage(self, upstreams: List[Tuple[str, str]]) -> Optional[Upstre ] return UpstreamLineageClass(upstreams=upstream) - def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> List[SchemaFieldClass]: - columns = inspector.get_columns(table_or_view, schema) + def get_columns(self, schema: str, table_or_view: str) -> List[SchemaFieldClass]: + query = f""" + SELECT COLUMN_NAME as column_name, + COMMENTS as comments, + DATA_TYPE_NAME as data_type_name, + IS_NULLABLE as is_nullable + FROM ( + SELECT POSITION, + COLUMN_NAME, + COMMENTS, + DATA_TYPE_NAME, + IS_NULLABLE, + SCHEMA_NAME, + TABLE_NAME + FROM SYS.TABLE_COLUMNS UNION ALL + SELECT POSITION, + COLUMN_NAME, + COMMENTS, + DATA_TYPE_NAME, + IS_NULLABLE, + SCHEMA_NAME, + VIEW_NAME AS TABLE_NAME + FROM SYS.VIEW_COLUMNS ) AS COLUMNS + WHERE LOWER("SCHEMA_NAME") = '{schema.lower()}' + AND LOWER(TABLE_NAME) = '{table_or_view.lower()}' + ORDER BY POSITION ASC + """ + query_result = [dict(row) for row in self.engine.execute(query).fetchall()] + + columns = query_result + return [ SchemaFieldClass( fieldPath=col.get('column_name'), @@ -345,9 +374,8 @@ def get_columns(self, inspector: Inspector, schema: str, table_or_view: str) -> ) ), nativeDataType=col.get('data_type_name'), - description=col.get("comment", ""), + description=col.get('comments', ""), nullable=bool(col.get("is_nullable")), - ) for col in columns ] @@ -503,7 +531,7 @@ def _process_table(self, env=self.config.env ) description = self.get_table_properties(inspector=inspector, schema=schema, table=dataset_name) - columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) + columns = self.get_columns(schema=schema, table_or_view=dataset_name) schema_metadata = SchemaMetadataClass( schemaName=f"{schema}.{dataset_name}", platform=make_data_platform_urn(self.get_platform()), @@ -559,7 +587,7 @@ def _process_view(self, view_definition = view_details[0] properties = self.get_view_info(view=dataset_name, definition_and_description=view_details) - columns = self.get_columns(inspector=inspector, schema=schema, table_or_view=dataset_name) + columns = self.get_columns(schema=schema, table_or_view=dataset_name) constructed_lineage = self.get_lineage(schema=schema, table_or_view=dataset_name) lineage = self.construct_lineage(upstreams=constructed_lineage) subtype = SubTypesClass(["View"]) From aa4e45dd21fcfd10f24422130f78d4cf0d14e3a3 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 
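Patch 29 replaces inspector reflection with a direct catalog query over SYS.TABLE_COLUMNS and SYS.VIEW_COLUMNS. The same lookup with bound parameters, assuming the SQLAlchemy 1.x-style engine this module already uses, avoids interpolating identifiers into the SQL string:

    from sqlalchemy import create_engine, text

    engine = create_engine("hana://user:secret@localhost:39041")  # placeholder DSN

    COLUMN_SQL = text(
        """
        SELECT COLUMN_NAME, COMMENTS, DATA_TYPE_NAME, IS_NULLABLE
        FROM (
            SELECT POSITION, COLUMN_NAME, COMMENTS, DATA_TYPE_NAME,
                   IS_NULLABLE, SCHEMA_NAME, TABLE_NAME
            FROM SYS.TABLE_COLUMNS
            UNION ALL
            SELECT POSITION, COLUMN_NAME, COMMENTS, DATA_TYPE_NAME,
                   IS_NULLABLE, SCHEMA_NAME, VIEW_NAME AS TABLE_NAME
            FROM SYS.VIEW_COLUMNS
        ) AS COLS
        WHERE LOWER(SCHEMA_NAME) = :schema AND LOWER(TABLE_NAME) = :name
        ORDER BY POSITION ASC
        """
    )

    rows = engine.execute(COLUMN_SQL, schema="sales", name="orders").fetchall()
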
12 Sep 2024 12:06:17 +0100 Subject: [PATCH 30/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 8df18f990ed79..7f2e0cf3354a6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -334,7 +334,7 @@ def construct_lineage(self, upstreams: List[Tuple[str, str]]) -> Optional[Upstre ] return UpstreamLineageClass(upstreams=upstream) - def get_columns(self, schema: str, table_or_view: str) -> List[SchemaFieldClass]: + def get_columns(self, schema: str, table_or_view: str) -> List[SchemaFieldClass]: query = f""" SELECT COLUMN_NAME as column_name, COMMENTS as comments, From c1e73eaeebd60eb4ef65c9c4a58aa38b9f423749 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 15:49:36 +0100 Subject: [PATCH 31/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 7f2e0cf3354a6..f394c41fb72f3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -470,7 +470,6 @@ def get_workunits_internal(self): dataset_path=view.get("package_id"), dataset_name=view.get("object_name"), dataset_definition=view.get("cdata"), - inspector=inspector, schema=schema, ) ) @@ -493,11 +492,13 @@ def get_workunits_internal(self): executor.submit( self._process_view, dataset_name=view, - inspector=inspector, schema=schema ) ) + for future in as_completed(futures): + yield from future.result() + queries = self.get_query_logs() for query in queries: self.aggregator.add_observed_query( @@ -512,9 +513,6 @@ def get_workunits_internal(self): ), ) - for future in as_completed(futures): - yield from future.result() - for mcp in self.aggregator.gen_metadata(): self.report.report_workunit(mcp.as_workunit()) yield mcp.as_workunit() @@ -567,13 +565,14 @@ def _process_table(self, schema=schema_metadata, ) + self.aggregator.is_allowed_table(entity) + for mcp in dataset_snapshot: self.report.report_workunit(mcp.as_workunit()) yield mcp.as_workunit() def _process_view(self, dataset_name: str, - inspector: Inspector, schema: str, ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( @@ -640,6 +639,8 @@ def _process_view(self, schema=schema_metadata ) + self.aggregator.is_allowed_table(entity) + self.aggregator.add_view_definition( view_urn=entity, view_definition=view_definition, @@ -697,7 +698,7 @@ def get_calculation_view_lineage(self, root: ET, ns: Dict, dataset_path: str, da UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=row.lower(), + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ), @@ -719,12 +720,11 @@ def _process_calculation_view( dataset_path: str, dataset_name: str, dataset_definition: str, - inspector: Inspector, schema: str ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: entity = make_dataset_urn_with_platform_instance( platform=self.get_platform(), - 
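Patch 31 moves the as_completed() drain ahead of the query-log pass, so every schema future is exhausted before the aggregator generates its metadata. The submit/drain shape in isolation, with a toy process function standing in for _process_table and friends:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def process(name):  # stand-in for the real per-dataset workers
        return [f"workunit:{name}"]

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(process, n) for n in ("t1", "t2", "t3")]
        # Drain every future before any aggregator output is produced,
        # mirroring the reordering in this patch.
        for future in as_completed(futures):
            for workunit in future.result():
                print(workunit)
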
name=f"_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}", platform_instance=self.config.platform_instance, env=self.config.env ) @@ -805,6 +805,8 @@ def _process_calculation_view( schema=schema_metadata ) + self.aggregator.is_allowed_table(entity) + for mcp in dataset_snapshot: self.report.report_workunit(mcp.as_workunit()) yield mcp.as_workunit() From b1fbbe00971309b84118f2d50e40702ae4921c75 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 16:29:51 +0100 Subject: [PATCH 32/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index f394c41fb72f3..a40a83494a7f3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -186,7 +186,7 @@ def _create_engine(self): def create_container(self, schema: str) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: - browse_path: BrowsePathsV2Class = self.get_browse_path( + browse_path = self.get_browse_path( schema=schema, ) @@ -227,12 +227,6 @@ def get_browse_path(self, schema: str) -> BrowsePathsV2Class: id=self.config.database.lower() ) ) - if self.config.platform_instance: - container_path.append( - BrowsePathEntryClass( - id=self.config.platform_instance.lower() - ) - ) container_path.append( BrowsePathEntryClass( From 776b8007d24f8924a1a6432946aaa459cb5387a1 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 23:46:56 +0100 Subject: [PATCH 33/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 267 +++++++++++++++++- 1 file changed, 254 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index a40a83494a7f3..5fc9d2b5b683c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -1,7 +1,9 @@ import datetime import logging +import uuid +import re -from typing import Optional, Dict, List, Iterable, Union, Tuple, Any, Collection +from typing import Optional, Dict, List, Iterable, Union, Tuple, Any, Set from pydantic.fields import Field @@ -9,8 +11,6 @@ import xml.etree.ElementTree as ET -import uuid - from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector @@ -45,13 +45,17 @@ ContainerPropertiesClass, DataPlatformInstanceClass, ContainerClass, + FineGrainedLineageClass, + FineGrainedLineageUpstreamTypeClass, + FineGrainedLineageDownstreamTypeClass, ) from datahub.emitter.mce_builder import ( make_dataset_urn_with_platform_instance, make_data_platform_urn, make_user_urn, make_container_urn, - make_dataplatform_instance_urn + make_dataplatform_instance_urn, + make_schema_field_urn, ) from datahub.sql_parsing.sql_parsing_aggregator import ( SqlParsingAggregator, @@ -114,6 +118,205 @@ } +class SAPCalculationViewParser: + def __init__(self): + pass + + @staticmethod + def _parseCalc(view: str, xml: ET) -> Dict: + + sources = {} + for child in xml.iter('DataSource'): + source = {'type': child.attrib['type']} + for grandchild in child: + if grandchild.tag == 'columnObject': + source['name'] = grandchild.attrib['columnObjectName'] + source['path'] = 
grandchild.attrib['schemaName'] + elif grandchild.tag == 'resourceUri': + source['name'] = grandchild.text.split(sep="/")[-1] + source['path'] = "/".join(grandchild.text.split(sep="/")[:-1]) + sources[child.attrib['id']] = source + + outputs = {} + for child in xml.findall(".//logicalModel/attributes/attribute"): + output = { + 'source': child.find('keyMapping').attrib['columnName'], + 'node': child.find('keyMapping').attrib['columnObjectName'], + 'type': 'attribute', + } + outputs[child.attrib['id']] = output + + for child in xml.findall(".//logicalModel/baseMeasures/measure"): + output = { + 'source': child.find('measureMapping').attrib['columnName'], + 'node': child.find('measureMapping').attrib['columnObjectName'], + 'type': 'measure', + } + outputs[child.attrib['id']] = output + + nodes = {} + for child in xml.iter('calculationView'): + node = { + 'type': child.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'], + 'sources': {}, + 'id': child.attrib['id'], + } + + if node['type'] == 'Calculation:UnionView': + node['unions'] = [] + for grandchild in child.iter('input'): + union_source = grandchild.attrib['node'].split('#')[-1] + node['unions'].append(union_source) + node['sources'][union_source] = {} + for mapping in grandchild.iter('mapping'): + if 'source' in mapping.attrib: + node['sources'][union_source][mapping.attrib['target']] = { + 'type': 'column', + 'source': mapping.attrib['source'] + } + else: + for grandchild in child.iter('input'): + input_source = grandchild.attrib['node'].split('#')[-1] + node['sources'][input_source] = {} + for mapping in grandchild.iter('mapping'): + if 'source' in mapping.attrib: + node['sources'][input_source][mapping.attrib['target']] = { + 'type': 'column', + 'source': mapping.attrib['source'] + } + + for grandchild in child.iter('calculatedViewAttribute'): + formula = grandchild.find('formula') + if formula is not None: + node['sources'][grandchild.attrib['id']] = { + 'type': 'formula', + 'source': formula.text + } + + nodes[node['id']] = node + + return { + 'viewName': view, + 'sources': sources, + 'outputs': outputs, + 'nodes': nodes + } + + @staticmethod + def _extract_columns_from_formula(formula: str) -> List[str]: + return re.findall(r'\"([^\"]*)\"', formula) + + def _find_all_sources( + self, + calc_view: Dict, + column: str, + node: str, + visited: Set[str] + ) -> List[Dict]: + if node in visited: + return [] + visited.add(node) + + sources = [] + node_info = calc_view['nodes'].get(node) + + if not node_info: + return [] + + if node_info['type'] == 'Calculation:UnionView': + for union_source in node_info['unions']: + for col, col_info in node_info['sources'][union_source].items(): + if col == column: + new_sources = self._find_all_sources( + calc_view, + col_info['source'], + union_source, + visited.copy() + ) + if not new_sources and union_source in calc_view['sources']: + new_sources = [{ + 'column': col_info['source'], + 'source': f"{calc_view['sources'][union_source]['path']}/{calc_view['sources'][union_source]['name']}" if + calc_view['sources'][union_source]['type'] == 'CALCULATION_VIEW' else + calc_view['sources'][union_source]['name'], + 'sourceType': calc_view['sources'][union_source]['type'], + 'sourcePath': calc_view['sources'][union_source]['path'], + }] + sources.extend(new_sources) + else: + for source, columns in node_info['sources'].items(): + if column in columns: + col_info = columns[column] + if col_info['type'] == 'formula': + formula_columns = self._extract_columns_from_formula(col_info['source']) + for 
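_parseCalc above walks the calculation-view XML for DataSource nodes, distinguishing plain tables (columnObject) from other calculation views (resourceUri). A self-contained toy run against a synthetic, namespace-free snippet (real HANA exports may carry namespaces, which is why the source threads an ns mapping through):

    import xml.etree.ElementTree as ET

    SNIPPET = """
    <scenario>
      <dataSources>
        <DataSource id="ORDERS" type="DATA_BASE_TABLE">
          <columnObject schemaName="SALES" columnObjectName="ORDERS"/>
        </DataSource>
        <DataSource id="CV_BASE" type="CALCULATION_VIEW">
          <resourceUri>/pkg/calculationviews/CV_BASE</resourceUri>
        </DataSource>
      </dataSources>
    </scenario>
    """

    for ds in ET.fromstring(SNIPPET).iter("DataSource"):
        column_object = ds.find("columnObject")
        if column_object is not None:
            name = f"{column_object.get('schemaName')}.{column_object.get('columnObjectName')}"
        else:
            name = ds.find("resourceUri").text.rsplit("/", 1)[-1]
        print(ds.get("type"), name)
    # DATA_BASE_TABLE SALES.ORDERS
    # CALCULATION_VIEW CV_BASE
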
formula_column in formula_columns: + sources.extend(self._find_all_sources(calc_view, formula_column, node, visited.copy())) + elif source in calc_view['sources']: + sources.append({ + 'column': col_info['source'], + 'source': f"{calc_view['sources'][source]['path']}/{calc_view['sources'][source]['name']}" if + calc_view['sources'][source]['type'] == 'CALCULATION_VIEW' else + calc_view['sources'][source][ + 'name'], + 'sourceType': calc_view['sources'][source]['type'], + 'sourcePath': calc_view['sources'][source]['path'], + }) + else: + sources.extend( + self._find_all_sources(calc_view, col_info['source'], source, visited.copy()) + ) + + return sources + + def _allColumnsOrigin(self, view: str, definition: str) -> Dict[str, List[Dict]]: + calc_view = self._parseCalc(view, definition) + columns_lineage = {} + + for output, output_info in calc_view['outputs'].items(): + sources = self._find_all_sources(calc_view, output_info['source'], output_info['node'], set()) + columns_lineage[output] = sources + + # Handle calculated attributes + for node in calc_view['nodes'].values(): + for column, col_info in node['sources'].items(): + if isinstance(col_info, dict) and col_info.get('type') == 'formula': + formula_columns = self._extract_columns_from_formula(col_info['source']) + sources = [] + for formula_column in formula_columns: + sources.extend(self._find_all_sources(calc_view, formula_column, node['id'], set())) + columns_lineage[column] = sources + + return columns_lineage + + def format_column_dictionary( + self, + view_name: str, + view_definition: str, + ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: + output_columns = self._allColumnsOrigin( + view_name, + view_definition, + ) + column_dicts: List[Dict[str, List[Dict[str, str]]]] = [] + for cols, src in output_columns.items(): + column_dicts.append( + { + "downstream_column": cols, + "upstream": [ + { + "upstream_table": + f"{src_col.get('sourcePath')}.{src_col.get('source')}".lower() + if src_col.get('sourceType') == "DATA_BASE_TABLE" else + f"_sys_bic.{src_col.get('source')[1:].replace('/calculationviews/', '/')}".lower(), + "upstream_column": src_col.get('column') + } + for src_col in src + ] + } + ) + return column_dicts + + class BaseHanaConfig(BasicSQLAlchemyConfig): scheme: str = Field(default="hana", hidden_from_docs=True) schema_pattern: AllowDenyPattern = Field( @@ -245,8 +448,8 @@ def get_container_urn(self, schema: str) -> str: guid=str( uuid.uuid5( namespace, - self.get_platform() + \ - self.config.platform_instance + \ + self.get_platform() + + self.config.platform_instance + schema, ) ) @@ -682,7 +885,7 @@ def get_calculation_view_lineage(self, root: ET, ns: Dict, dataset_path: str, da except Exception as e: logging.warning( - f"No lineage found for Calculation View {dataset_path}/{dataset_path}. Parsing error: {e}" + f"No lineage found for Calculation View {dataset_path}/{dataset_name}. 
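_extract_columns_from_formula relies on column references being double-quoted inside calculated-attribute formulas. The regex in isolation:

    import re

    def extract_columns_from_formula(formula):
        # Column references inside calculated-attribute formulas are
        # double-quoted, e.g. "NET" + "TAX".
        return re.findall(r'"([^"]*)"', formula)

    assert extract_columns_from_formula('"NET" + "TAX"') == ["NET", "TAX"]
    assert extract_columns_from_formula('if("QTY" > 0, "QTY", 0)') == ["QTY", "QTY"]
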
Parsing error: {e}" ) if not upstreams: @@ -701,13 +904,51 @@ def get_calculation_view_lineage(self, root: ET, ns: Dict, dataset_path: str, da for row in upstreams ] - # FineGrainedLineageClass( - # upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, - # downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD - # - # ) + fine_grained_lineage: List[FineGrainedLineageClass] = [] + + for column_lineage in SAPCalculationViewParser().format_column_dictionary( + view_name=f"{dataset_path}/{dataset_name}", + view_definition=root, + ): + downstream_column = column_lineage.get("downstream_column") + upstream_columns: List[str] = [] + for column in column_lineage.get("upstream"): + upstream_columns.append( + make_schema_field_urn( + parent_urn=make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{column.get('upstream_table')}", + platform_instance=self.config.platform_instance, + env=self.config.env + ), + field_path=column.get("upstream_column"), + ) + ) + + fine_grained_lineage.append( + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=upstream_columns, + downstreams=[ + make_schema_field_urn( + parent_urn=make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}", + platform_instance=self.config.platform_instance, + env=self.config.env + ), + field_path=downstream_column, + ) + ], + confidenceScore=1.0, + ) + ) - return UpstreamLineageClass(upstreams=upstream) + return UpstreamLineageClass( + upstreams=upstream, + fineGrainedLineages=fine_grained_lineage + ) def _process_calculation_view( self, From 7d1b5f8ee99c94e350ec9e531b7be176817989cd Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Thu, 12 Sep 2024 23:52:53 +0100 Subject: [PATCH 34/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 5fc9d2b5b683c..beccbfa57257f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -433,7 +433,10 @@ def get_browse_path(self, schema: str) -> BrowsePathsV2Class: container_path.append( BrowsePathEntryClass( - id=schema + id=schema, + urn=self.get_container_urn( + schema=schema + ) ) ) From 606e31bc8564bd077e22e8a63bfddfb049bd6481 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Fri, 13 Sep 2024 11:26:51 +0100 Subject: [PATCH 35/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index beccbfa57257f..54a0520837d3b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -516,21 +516,25 @@ def get_lineage(self, schema: str, table_or_view: str) -> list: return self.engine.execute(query).fetchall() def construct_lineage(self, upstreams: List[Tuple[str, str]]) -> Optional[UpstreamLineageClass]: + upstream_tables: List[str] = [] + for 
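The fine-grained lineage assembled above reduces to this pattern: one FineGrainedLineageClass per downstream column, with schema-field urns on both sides. A sketch with placeholder table and column names:

    from datahub.emitter.mce_builder import (
        make_dataset_urn_with_platform_instance,
        make_schema_field_urn,
    )
    from datahub.metadata.schema_classes import (
        FineGrainedLineageClass,
        FineGrainedLineageDownstreamTypeClass,
        FineGrainedLineageUpstreamTypeClass,
        UpstreamLineageClass,
    )

    def field_urn(dataset_name, column):  # illustrative helper
        parent = make_dataset_urn_with_platform_instance(
            platform="hana", name=dataset_name, platform_instance=None, env="PROD"
        )
        return make_schema_field_urn(parent_urn=parent, field_path=column)

    lineage = UpstreamLineageClass(
        upstreams=[],
        fineGrainedLineages=[
            FineGrainedLineageClass(
                upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
                downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
                upstreams=[field_urn("sales.orders", "net"), field_urn("sales.orders", "tax")],
                downstreams=[field_urn("_sys_bic.pkg/cv_orders", "gross")],
                confidenceScore=1.0,
            )
        ],
    )
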
row_item in upstreams: + if isinstance(row_item[0], str) and isinstance(row_item[1], str): + upstream_tables.append(f"{row_item[0].lower()}.{row_item[1].lower()}") - if not upstreams: + if not upstream_tables: return None upstream = [ UpstreamClass( dataset=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row}", platform_instance=self.config.platform_instance, env=self.config.env ), type=DatasetLineageTypeClass.VIEW, ) - for row in upstreams + for row in upstream_tables ] return UpstreamLineageClass(upstreams=upstream) @@ -849,16 +853,21 @@ def _process_view(self, ) if constructed_lineage: + upstream_tables: List[str] = [] + for row_item in constructed_lineage: + if isinstance(row_item[0], str) and isinstance(row_item[1], str): + upstream_tables.append(f"{row_item[0].lower()}.{row_item[1].lower()}") + self.aggregator.add_known_query_lineage( KnownQueryLineageInfo( query_text=view_definition, upstreams=[ make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row[0].lower()}.{row[1].lower()}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row}", platform_instance=self.config.platform_instance, env=self.config.env - ) for row in constructed_lineage + ) for row in upstream_tables ], downstream=entity, query_type=QueryType.SELECT From 5c71192f2856e23b0af2ff28826257d6f91f66cd Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Fri, 13 Sep 2024 11:58:13 +0100 Subject: [PATCH 36/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 54a0520837d3b..dca42712e6fd0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -221,6 +221,14 @@ def _find_all_sources( node_info = calc_view['nodes'].get(node) if not node_info: + source = calc_view['sources'].get(node) + if source: + return [{ + 'column': column, + 'source': source['name'], + 'sourceType': source['type'], + 'sourcePath': source.get('path', '') + }] return [] if node_info['type'] == 'Calculation:UnionView': From d715132d70bccfa806f108e8840d4e325e9f4fbd Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Fri, 13 Sep 2024 12:38:56 +0100 Subject: [PATCH 37/43] Update hana.py --- metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index dca42712e6fd0..fbbaddf26275e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -315,6 +315,8 @@ def format_column_dictionary( "upstream_table": f"{src_col.get('sourcePath')}.{src_col.get('source')}".lower() if src_col.get('sourceType') == "DATA_BASE_TABLE" else + f"_sys_bic.{src_col.get('source').replace('::', '/').lower()}" + if src_col.get('sourceType') == "TABLE_FUNCTION" else f"_sys_bic.{src_col.get('source')[1:].replace('/calculationviews/', '/')}".lower(), "upstream_column": src_col.get('column') } From 
2a3013cd150a9e7836d96db1cbe6312339d3d88a Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Fri, 13 Sep 2024 13:35:10 +0100 Subject: [PATCH 38/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index fbbaddf26275e..74a19e2d21dfc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -932,40 +932,41 @@ def get_calculation_view_lineage(self, root: ET, ns: Dict, dataset_path: str, da view_name=f"{dataset_path}/{dataset_name}", view_definition=root, ): - downstream_column = column_lineage.get("downstream_column") - upstream_columns: List[str] = [] - for column in column_lineage.get("upstream"): - upstream_columns.append( - make_schema_field_urn( - parent_urn=make_dataset_urn_with_platform_instance( - platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{column.get('upstream_table')}", - platform_instance=self.config.platform_instance, - env=self.config.env - ), - field_path=column.get("upstream_column"), - ) - ) - - fine_grained_lineage.append( - FineGrainedLineageClass( - upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, - downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, - upstreams=upstream_columns, - downstreams=[ + if column_lineage.get("upstream"): + downstream_column = column_lineage.get("downstream_column") + upstream_columns: List[str] = [] + for column in column_lineage.get("upstream"): + upstream_columns.append( make_schema_field_urn( parent_urn=make_dataset_urn_with_platform_instance( platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}", + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{column.get('upstream_table')}", platform_instance=self.config.platform_instance, env=self.config.env ), - field_path=downstream_column, + field_path=column.get("upstream_column"), ) - ], - confidenceScore=1.0, + ) + + fine_grained_lineage.append( + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + upstreams=upstream_columns, + downstreams=[ + make_schema_field_urn( + parent_urn=make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}", + platform_instance=self.config.platform_instance, + env=self.config.env + ), + field_path=downstream_column, + ) + ], + confidenceScore=1.0, + ) ) - ) return UpstreamLineageClass( upstreams=upstream, From 61989accc852a258f5d0bfdfd6b1a465c4884527 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Fri, 13 Sep 2024 14:43:14 +0100 Subject: [PATCH 39/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 393 +++++++++--------- 1 file changed, 207 insertions(+), 186 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 74a19e2d21dfc..0fb784fe8ba70 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -124,76 
+124,79 @@ def __init__(self): @staticmethod def _parseCalc(view: str, xml: ET) -> Dict: - sources = {} - for child in xml.iter('DataSource'): - source = {'type': child.attrib['type']} - for grandchild in child: - if grandchild.tag == 'columnObject': - source['name'] = grandchild.attrib['columnObjectName'] - source['path'] = grandchild.attrib['schemaName'] - elif grandchild.tag == 'resourceUri': - source['name'] = grandchild.text.split(sep="/")[-1] - source['path'] = "/".join(grandchild.text.split(sep="/")[:-1]) - sources[child.attrib['id']] = source - outputs = {} - for child in xml.findall(".//logicalModel/attributes/attribute"): - output = { - 'source': child.find('keyMapping').attrib['columnName'], - 'node': child.find('keyMapping').attrib['columnObjectName'], - 'type': 'attribute', - } - outputs[child.attrib['id']] = output + nodes = {} - for child in xml.findall(".//logicalModel/baseMeasures/measure"): - output = { - 'source': child.find('measureMapping').attrib['columnName'], - 'node': child.find('measureMapping').attrib['columnObjectName'], - 'type': 'measure', - } - outputs[child.attrib['id']] = output + try: + for child in xml.iter('DataSource'): + source = {'type': child.attrib['type']} + for grandchild in child: + if grandchild.tag == 'columnObject': + source['name'] = grandchild.attrib['columnObjectName'] + source['path'] = grandchild.attrib['schemaName'] + elif grandchild.tag == 'resourceUri': + source['name'] = grandchild.text.split(sep="/")[-1] + source['path'] = "/".join(grandchild.text.split(sep="/")[:-1]) + sources[child.attrib['id']] = source + + for child in xml.findall(".//logicalModel/attributes/attribute"): + output = { + 'source': child.find('keyMapping').attrib['columnName'], + 'node': child.find('keyMapping').attrib['columnObjectName'], + 'type': 'attribute', + } + outputs[child.attrib['id']] = output - nodes = {} - for child in xml.iter('calculationView'): - node = { - 'type': child.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'], - 'sources': {}, - 'id': child.attrib['id'], - } + for child in xml.findall(".//logicalModel/baseMeasures/measure"): + output = { + 'source': child.find('measureMapping').attrib['columnName'], + 'node': child.find('measureMapping').attrib['columnObjectName'], + 'type': 'measure', + } + outputs[child.attrib['id']] = output - if node['type'] == 'Calculation:UnionView': - node['unions'] = [] - for grandchild in child.iter('input'): - union_source = grandchild.attrib['node'].split('#')[-1] - node['unions'].append(union_source) - node['sources'][union_source] = {} - for mapping in grandchild.iter('mapping'): - if 'source' in mapping.attrib: - node['sources'][union_source][mapping.attrib['target']] = { - 'type': 'column', - 'source': mapping.attrib['source'] - } - else: - for grandchild in child.iter('input'): - input_source = grandchild.attrib['node'].split('#')[-1] - node['sources'][input_source] = {} - for mapping in grandchild.iter('mapping'): - if 'source' in mapping.attrib: - node['sources'][input_source][mapping.attrib['target']] = { - 'type': 'column', - 'source': mapping.attrib['source'] - } + for child in xml.iter('calculationView'): + node = { + 'type': child.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'], + 'sources': {}, + 'id': child.attrib['id'], + } - for grandchild in child.iter('calculatedViewAttribute'): - formula = grandchild.find('formula') - if formula is not None: - node['sources'][grandchild.attrib['id']] = { - 'type': 'formula', - 'source': formula.text - } + if node['type'] == 
'Calculation:UnionView': + node['unions'] = [] + for grandchild in child.iter('input'): + union_source = grandchild.attrib['node'].split('#')[-1] + node['unions'].append(union_source) + node['sources'][union_source] = {} + for mapping in grandchild.iter('mapping'): + if 'source' in mapping.attrib: + node['sources'][union_source][mapping.attrib['target']] = { + 'type': 'column', + 'source': mapping.attrib['source'] + } + else: + for grandchild in child.iter('input'): + input_source = grandchild.attrib['node'].split('#')[-1] + node['sources'][input_source] = {} + for mapping in grandchild.iter('mapping'): + if 'source' in mapping.attrib: + node['sources'][input_source][mapping.attrib['target']] = { + 'type': 'column', + 'source': mapping.attrib['source'] + } + + for grandchild in child.iter('calculatedViewAttribute'): + formula = grandchild.find('formula') + if formula is not None: + node['sources'][grandchild.attrib['id']] = { + 'type': 'formula', + 'source': formula.text + } - nodes[node['id']] = node + nodes[node['id']] = node + except Exception as e: + logger.error(e) return { 'viewName': view, @@ -213,66 +216,70 @@ def _find_all_sources( node: str, visited: Set[str] ) -> List[Dict]: - if node in visited: - return [] - visited.add(node) - sources = [] - node_info = calc_view['nodes'].get(node) - - if not node_info: - source = calc_view['sources'].get(node) - if source: - return [{ - 'column': column, - 'source': source['name'], - 'sourceType': source['type'], - 'sourcePath': source.get('path', '') - }] - return [] - - if node_info['type'] == 'Calculation:UnionView': - for union_source in node_info['unions']: - for col, col_info in node_info['sources'][union_source].items(): - if col == column: - new_sources = self._find_all_sources( - calc_view, - col_info['source'], - union_source, - visited.copy() - ) - if not new_sources and union_source in calc_view['sources']: - new_sources = [{ + + try: + if node in visited: + return [] + visited.add(node) + + node_info = calc_view['nodes'].get(node) + + if not node_info: + source = calc_view['sources'].get(node) + if source: + return [{ + 'column': column, + 'source': source['name'], + 'sourceType': source['type'], + 'sourcePath': source.get('path', '') + }] + return [] + + if node_info['type'] == 'Calculation:UnionView': + for union_source in node_info['unions']: + for col, col_info in node_info['sources'][union_source].items(): + if col == column: + new_sources = self._find_all_sources( + calc_view, + col_info['source'], + union_source, + visited.copy() + ) + if not new_sources and union_source in calc_view['sources']: + new_sources = [{ + 'column': col_info['source'], + 'source': f"{calc_view['sources'][union_source]['path']}/{calc_view['sources'][union_source]['name']}" if + calc_view['sources'][union_source]['type'] == 'CALCULATION_VIEW' else + calc_view['sources'][union_source]['name'], + 'sourceType': calc_view['sources'][union_source]['type'], + 'sourcePath': calc_view['sources'][union_source]['path'], + }] + sources.extend(new_sources) + else: + for source, columns in node_info['sources'].items(): + if column in columns: + col_info = columns[column] + if col_info['type'] == 'formula': + formula_columns = self._extract_columns_from_formula(col_info['source']) + for formula_column in formula_columns: + sources.extend(self._find_all_sources(calc_view, formula_column, node, visited.copy())) + elif source in calc_view['sources']: + sources.append({ 'column': col_info['source'], - 'source': 
f"{calc_view['sources'][union_source]['path']}/{calc_view['sources'][union_source]['name']}" if - calc_view['sources'][union_source]['type'] == 'CALCULATION_VIEW' else - calc_view['sources'][union_source]['name'], - 'sourceType': calc_view['sources'][union_source]['type'], - 'sourcePath': calc_view['sources'][union_source]['path'], - }] - sources.extend(new_sources) - else: - for source, columns in node_info['sources'].items(): - if column in columns: - col_info = columns[column] - if col_info['type'] == 'formula': - formula_columns = self._extract_columns_from_formula(col_info['source']) - for formula_column in formula_columns: - sources.extend(self._find_all_sources(calc_view, formula_column, node, visited.copy())) - elif source in calc_view['sources']: - sources.append({ - 'column': col_info['source'], - 'source': f"{calc_view['sources'][source]['path']}/{calc_view['sources'][source]['name']}" if - calc_view['sources'][source]['type'] == 'CALCULATION_VIEW' else - calc_view['sources'][source][ - 'name'], - 'sourceType': calc_view['sources'][source]['type'], - 'sourcePath': calc_view['sources'][source]['path'], - }) - else: - sources.extend( - self._find_all_sources(calc_view, col_info['source'], source, visited.copy()) - ) + 'source': f"{calc_view['sources'][source]['path']}/{calc_view['sources'][source]['name']}" if + calc_view['sources'][source]['type'] == 'CALCULATION_VIEW' else + calc_view['sources'][source][ + 'name'], + 'sourceType': calc_view['sources'][source]['type'], + 'sourcePath': calc_view['sources'][source]['path'], + }) + else: + sources.extend( + self._find_all_sources(calc_view, col_info['source'], source, visited.copy()) + ) + except Exception as e: + logger.error(e) return sources @@ -280,19 +287,22 @@ def _allColumnsOrigin(self, view: str, definition: str) -> Dict[str, List[Dict]] calc_view = self._parseCalc(view, definition) columns_lineage = {} - for output, output_info in calc_view['outputs'].items(): - sources = self._find_all_sources(calc_view, output_info['source'], output_info['node'], set()) - columns_lineage[output] = sources - - # Handle calculated attributes - for node in calc_view['nodes'].values(): - for column, col_info in node['sources'].items(): - if isinstance(col_info, dict) and col_info.get('type') == 'formula': - formula_columns = self._extract_columns_from_formula(col_info['source']) - sources = [] - for formula_column in formula_columns: - sources.extend(self._find_all_sources(calc_view, formula_column, node['id'], set())) - columns_lineage[column] = sources + try: + for output, output_info in calc_view['outputs'].items(): + sources = self._find_all_sources(calc_view, output_info['source'], output_info['node'], set()) + columns_lineage[output] = sources + + # Handle calculated attributes + for node in calc_view['nodes'].values(): + for column, col_info in node['sources'].items(): + if isinstance(col_info, dict) and col_info.get('type') == 'formula': + formula_columns = self._extract_columns_from_formula(col_info['source']) + sources = [] + for formula_column in formula_columns: + sources.extend(self._find_all_sources(calc_view, formula_column, node['id'], set())) + columns_lineage[column] = sources + except Exception as e: + logger.error(e) return columns_lineage @@ -301,29 +311,36 @@ def format_column_dictionary( view_name: str, view_definition: str, ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: - output_columns = self._allColumnsOrigin( - view_name, - view_definition, - ) column_dicts: List[Dict[str, List[Dict[str, str]]]] = [] - for 
-            column_dicts.append(
-                {
-                    "downstream_column": cols,
-                    "upstream": [
-                        {
-                            "upstream_table":
-                                f"{src_col.get('sourcePath')}.{src_col.get('source')}".lower()
-                                if src_col.get('sourceType') == "DATA_BASE_TABLE" else
-                                f"_sys_bic.{src_col.get('source').replace('::', '/').lower()}"
-                                if src_col.get('sourceType') == "TABLE_FUNCTION" else
-                                f"_sys_bic.{src_col.get('source')[1:].replace('/calculationviews/', '/')}".lower(),
-                            "upstream_column": src_col.get('column')
-                        }
-                        for src_col in src
-                    ]
-                }
+
+        try:
+            output_columns = self._allColumnsOrigin(
+                view_name,
+                view_definition,
             )
+
+            for cols, src in output_columns.items():
+                column_dicts.append(
+                    {
+                        "downstream_column": cols,
+                        "upstream": [
+                            {
+                                "upstream_table":
+                                    f"{src_col.get('sourcePath')}.{src_col.get('source')}".lower()
+                                    if src_col.get('sourceType') == "DATA_BASE_TABLE" else
+                                    f"_sys_bic.{src_col.get('source').replace('::', '/').lower()}"
+                                    if src_col.get('sourceType') == "TABLE_FUNCTION" else
+                                    f"_sys_bic.{src_col.get('source')[1:].replace('/calculationviews/', '/')}".lower(),
+                                "upstream_column": src_col.get('column')
+                            }
+                            for src_col in src
+                        ]
+                    }
+                )
+
+        except Exception as e:
+            logger.error(f"Failed to format column lineage for {view_name}: {e}")
+
         return column_dicts
 
@@ -928,45 +945,49 @@ def get_calculation_view_lineage(self, root: ET, ns: Dict, dataset_path: str, da
 
         fine_grained_lineage: List[FineGrainedLineageClass] = []
 
-        for column_lineage in SAPCalculationViewParser().format_column_dictionary(
-            view_name=f"{dataset_path}/{dataset_name}",
-            view_definition=root,
-        ):
-            if column_lineage.get("upstream"):
-                downstream_column = column_lineage.get("downstream_column")
-                upstream_columns: List[str] = []
-                for column in column_lineage.get("upstream"):
-                    upstream_columns.append(
-                        make_schema_field_urn(
-                            parent_urn=make_dataset_urn_with_platform_instance(
-                                platform=self.get_platform(),
-                                name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{column.get('upstream_table')}",
-                                platform_instance=self.config.platform_instance,
-                                env=self.config.env
-                            ),
-                            field_path=column.get("upstream_column"),
-                        )
-                    )
-
-                fine_grained_lineage.append(
-                    FineGrainedLineageClass(
-                        upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
-                        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
-                        upstreams=upstream_columns,
-                        downstreams=[
+        try:
+            for column_lineage in SAPCalculationViewParser().format_column_dictionary(
+                view_name=f"{dataset_path}/{dataset_name}",
+                view_definition=root,
+            ):
+                if column_lineage.get("upstream"):
+                    downstream_column = column_lineage.get("downstream_column")
+                    upstream_columns: List[str] = []
+                    for column in column_lineage.get("upstream"):
+                        upstream_columns.append(
                             make_schema_field_urn(
                                 parent_urn=make_dataset_urn_with_platform_instance(
                                     platform=self.get_platform(),
-                                    name=f"{(self.config.database.lower() + '.') if self.config.database else ''}_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}",
+                                    name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{column.get('upstream_table')}",
                                     platform_instance=self.config.platform_instance,
                                     env=self.config.env
                                 ),
-                                field_path=downstream_column,
+                                field_path=column.get("upstream_column"),
                             )
-                        ],
-                        confidenceScore=1.0,
+                        )
+
+                    fine_grained_lineage.append(
+                        FineGrainedLineageClass(
+                            upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
+                            downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
+                            upstreams=upstream_columns,
+                            downstreams=[
+                                make_schema_field_urn(
+                                    parent_urn=make_dataset_urn_with_platform_instance(
+                                        platform=self.get_platform(),
+                                        name=f"{(self.config.database.lower() + '.') if self.config.database else ''}_sys_bic.{dataset_path.lower()}/{dataset_name.lower()}",
+                                        platform_instance=self.config.platform_instance,
+                                        env=self.config.env
+                                    ),
+                                    field_path=downstream_column,
+                                )
+                            ],
+                            confidenceScore=1.0,
+                        )
                     )
-                )
+
+        except Exception as e:
+            logger.error(f"Failed to build fine-grained lineage for {dataset_path}/{dataset_name}: {e}")
 
         return UpstreamLineageClass(
             upstreams=upstream,

From 17e6d5b14fdedc0847d8e9b36e91218941033818 Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Sun, 15 Sep 2024 21:25:57 +0100
Subject: [PATCH 40/43] Update hana.py

---
 metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
index 0fb784fe8ba70..eb0174c32c342 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
@@ -780,6 +780,8 @@ def _process_table(self,
             schema=schema,
         )
 
+        subtype = SubTypesClass(typeNames=["Table"])
+
         dataset_snapshot = MetadataChangeProposalWrapper.construct_many(
             entityUrn=entity,
             aspects=[
@@ -788,6 +790,7 @@ def _process_table(self,
                 platform,
                 browse_path,
                 container,
+                subtype,
             ]
         )
 

From 54ce516fea5cbdc5cd6962f52d92ce73166b5941 Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Sun, 15 Sep 2024 23:25:00 +0100
Subject: [PATCH 41/43] Update hana.py

---
 .../src/datahub/ingestion/source/sql/hana.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
index eb0174c32c342..863befca768b4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
@@ -23,6 +23,7 @@
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
+from datahub.ingestion.api.source import SourceReport
 from datahub.metadata.schema_classes import (
     DatasetLineageTypeClass,
     SchemaMetadataClass,
@@ -1093,6 +1094,12 @@ def _process_calculation_view(
             self.report.report_workunit(mcp.as_workunit())
             yield mcp.as_workunit()
 
+    def get_report(self) -> SourceReport:
+        return self.report
+
+    def close(self) -> None:
+        pass
+
 
 def _sql_dialect(platform: str) -> str:
     return "tsql"

From 5fe26b3a2226856206be77fe036c3c8533d7c52a Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Mon, 16 Sep 2024 09:45:24 +0100
Subject: [PATCH 42/43] Update hana.py

---
 metadata-ingestion/src/datahub/ingestion/source/sql/hana.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
index 863befca768b4..bf1c9681ad526 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py
@@ -927,9 +927,7 @@ def get_calculation_view_lineage(self, root: ET, ns: Dict, dataset_path: str, da
                 upstreams.append(f"{column_object.get('schemaName')}.{column_object.get('columnObjectName')}")
 
         except Exception as e:
-            logging.warning(
-                f"No lineage found for Calculation View {dataset_path}/{dataset_name}. Parsing error: {e}"
-            )
+            logger.error(f"No lineage found for Calculation View {dataset_path}/{dataset_name}: {e}")
 
         if not upstreams:
             return None
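The conditional catalog prefix used when building dataset URN names, `(self.config.database.lower() + '.') if self.config.database else ''`, is repeated inline in the lineage hunks above and again in patch 43 below. A possible follow-up refactor, not part of this patch series, would hoist it into a single helper; the sketch below is illustrative only, and the name `qualified_dataset_name` is hypothetical:

    from typing import Optional


    def qualified_dataset_name(database: Optional[str], name: str) -> str:
        # Prefix the catalog (database) only when one is configured, mirroring
        # the inline conditional f-strings used to build dataset URN names.
        prefix = f"{database.lower()}." if database else ""
        return f"{prefix}{name}"


    # Example: qualified_dataset_name("HXE", "_sys_bic.pkg/view") -> "hxe._sys_bic.pkg/view"

Call sites would then pass `name=qualified_dataset_name(self.config.database, ...)` into `make_dataset_urn_with_platform_instance`, keeping the prefixing rule in one place.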
Parsing error: {e}" - ) + logging.error(e) if not upstreams: return None From 94ac539f1719e5a3cc507ef747b3da29e902d9b1 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Mon, 16 Sep 2024 10:54:17 +0100 Subject: [PATCH 43/43] Update hana.py --- .../src/datahub/ingestion/source/sql/hana.py | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index bf1c9681ad526..f5c8c4e7325f5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -889,22 +889,25 @@ def _process_view(self, if isinstance(row_item[0], str) and isinstance(row_item[1], str): upstream_tables.append(f"{row_item[0].lower()}.{row_item[1].lower()}") - self.aggregator.add_known_query_lineage( - KnownQueryLineageInfo( - query_text=view_definition, - upstreams=[ - make_dataset_urn_with_platform_instance( - platform=self.get_platform(), - name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row}", - platform_instance=self.config.platform_instance, - env=self.config.env - ) for row in upstream_tables - ], - downstream=entity, - query_type=QueryType.SELECT - ), - merge_lineage=True, - ) + try: + self.aggregator.add_known_query_lineage( + KnownQueryLineageInfo( + query_text=view_definition, + upstreams=[ + make_dataset_urn_with_platform_instance( + platform=self.get_platform(), + name=f"{(self.config.database.lower() + '.') if self.config.database else ''}{row}", + platform_instance=self.config.platform_instance, + env=self.config.env + ) for row in upstream_tables + ], + downstream=entity, + query_type=QueryType.SELECT + ), + merge_lineage=True, + ) + except Exception as e: + logging.error(e) for mcp in dataset_snapshot: self.report.report_workunit(mcp.as_workunit())