From 4795cf21718a7f42ffec92fb761e94cbaa2872af Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Wed, 13 Nov 2024 15:27:27 +0000
Subject: [PATCH] Update hive.py

---
 .../src/datahub/ingestion/source/sql/hive.py | 45 +++++++++++++------
 1 file changed, 31 insertions(+), 14 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
index 4495e0ed323ba..1d57188153bb4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
@@ -173,8 +173,8 @@ def parse_storage_location(location: str) -> Optional[Tuple[StoragePlatform, str
 
             return platform, path
 
-        except Exception as e:
-            logger.warning(f"Failed to parse storage location {location}: {e}")
+        except Exception as exp:
+            logger.warning(f"Failed to parse storage location {location}: {exp}")
             return None
 
     @staticmethod
@@ -259,10 +259,13 @@ def __init__(
         config: HiveStorageLineageConfig,
         env: str,
         platform_instance: Optional[str] = None,
+        convert_urns_to_lowercase: bool = False,
     ):
         self.config = config
         self.env = env
-        self.platform_instance = platform_instance
+        if self.config.storage_platform_instance is None:
+            self.config.storage_platform_instance = platform_instance
+        self.convert_urns_to_lowercase = convert_urns_to_lowercase
         self.report = HiveStorageSourceReport()
 
     def _make_dataset_platform_instance(
@@ -290,16 +293,29 @@ def _make_storage_dataset_urn(
 
         storage_info = StoragePathParser.parse_storage_location(storage_location)
        if not storage_info:
+            logger.debug(f"Could not parse storage location: {storage_location}")
             return None
 
         platform, path = storage_info
-        storage_urn = make_dataset_urn_with_platform_instance(
-            platform=StoragePathParser.get_platform_name(platform),
-            name=path,
-            env=self.env,
-            platform_instance=self.config.storage_platform_instance,
-        )
-        return storage_urn, StoragePathParser.get_platform_name(platform)
+        platform_name = StoragePathParser.get_platform_name(platform)
+
+        if self.convert_urns_to_lowercase:
+            platform_name = platform_name.lower()
+            path = path.lower()
+            if self.config.storage_platform_instance:
+                self.config.storage_platform_instance = self.config.storage_platform_instance.lower()
+
+        try:
+            storage_urn = make_dataset_urn_with_platform_instance(
+                platform=platform_name,
+                name=path,
+                env=self.env,
+                platform_instance=self.config.storage_platform_instance,
+            )
+            return storage_urn, platform_name
+        except Exception as exp:
+            logger.error(f"Failed to create URN for {platform_name}:{path}: {exp}")
+            return None
 
     def _get_fine_grained_lineages(
         self,
@@ -526,8 +542,8 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
     DatabricksPyhiveDialect.get_columns = dbapi_get_columns_patched
 except ModuleNotFoundError:
     pass
-except Exception as e:
-    logger.warning(f"Failed to patch method due to {e}")
+except Exception as exp:
+    logger.warning(f"Failed to patch method due to {exp}")
 
 
 @reflection.cache  # type: ignore
@@ -592,6 +608,7 @@ def __init__(self, config, ctx):
             config=config.storage_lineage,
             env=config.env,
             platform_instance=config.platform_instance,
+            convert_urns_to_lowercase=config.convert_urns_to_lowercase,
         )
 
     @classmethod
@@ -612,8 +629,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 dataset_urn = wu.get_urn()
                 dataset_props = wu.get_aspect_of_type(DatasetPropertiesClass)
                 schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
-            except Exception as e:
-                logger.warning(f"Failed to process workunit {wu.id}: {e}")
+            except Exception as exp:
+                logger.warning(f"Failed to process workunit {wu.id}: {exp}")
                 continue
 
             # Only proceed if we have the necessary properties