
Commit

Update hive.py
acrylJonny committed Nov 13, 2024
1 parent 9454851 commit 4795cf2
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
@@ -173,8 +173,8 @@ def parse_storage_location(location: str) -> Optional[Tuple[StoragePlatform, str

return platform, path

except Exception as e:
logger.warning(f"Failed to parse storage location {location}: {e}")
except Exception as exp:
logger.warning(f"Failed to parse storage location {location}: {exp}")
return None

@staticmethod
@@ -259,10 +259,13 @@ def __init__(
config: HiveStorageLineageConfig,
env: str,
platform_instance: Optional[str] = None,
convert_urns_to_lowercase: bool = False,
):
self.config = config
self.env = env
self.platform_instance = platform_instance
if self.config.storage_platform_instance is None:
self.config.platform_instance = platform_instance
self.convert_urns_to_lowercase = convert_urns_to_lowercase
self.report = HiveStorageSourceReport()

def _make_dataset_platform_instance(
@@ -290,16 +293,29 @@ def _make_storage_dataset_urn(

storage_info = StoragePathParser.parse_storage_location(storage_location)
if not storage_info:
logger.debug(f"Could not parse storage location: {storage_location}")
return None

platform, path = storage_info
storage_urn = make_dataset_urn_with_platform_instance(
platform=StoragePathParser.get_platform_name(platform),
name=path,
env=self.env,
platform_instance=self.config.storage_platform_instance,
)
return storage_urn, StoragePathParser.get_platform_name(platform)
platform_name = StoragePathParser.get_platform_name(platform)

if self.convert_urns_to_lowercase:
platform_name = platform_name.lower()
path = path.lower()
if self.config.storage_platform_instance:
self.config.platform_instance = self.config.storage_platform_instance.lower()

try:
storage_urn = make_dataset_urn_with_platform_instance(
platform=platform_name,
name=path,
env=self.env,
platform_instance=self.config.storage_platform_instance,
)
return storage_urn, platform_name
except Exception as exp:
logger.error(f"Failed to create URN for {platform_name}:{path}: {exp}")
return None

def _get_fine_grained_lineages(
self,
@@ -526,8 +542,8 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
DatabricksPyhiveDialect.get_columns = dbapi_get_columns_patched
except ModuleNotFoundError:
pass
except Exception as e:
logger.warning(f"Failed to patch method due to {e}")
except Exception as exp:
logger.warning(f"Failed to patch method due to {exp}")


@reflection.cache # type: ignore
@@ -592,6 +608,7 @@ def __init__(self, config, ctx):
config=config.storage_lineage,
env=config.env,
platform_instance=config.platform_instance,
convert_urns_to_lowercase=config.convert_urns_to_lowercase,
)

@classmethod
@@ -612,8 +629,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
dataset_urn = wu.get_urn()
dataset_props = wu.get_aspect_of_type(DatasetPropertiesClass)
schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
except Exception as e:
logger.warning(f"Failed to process workunit {wu.id}: {e}")
except Exception as exp:
logger.warning(f"Failed to process workunit {wu.id}: {exp}")
continue

# Only proceed if we have the necessary properties

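The diff above threads a new convert_urns_to_lowercase flag from the Hive source config into the storage-lineage helper, lowercasing the storage platform name and path before the dataset URN is built. Below is a minimal, self-contained sketch of that behaviour, assuming DataHub's make_dataset_urn_with_platform_instance helper; the build_storage_urn function and the example S3 path are invented here for illustration and are not part of the commit.

from typing import Optional

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance


def build_storage_urn(
    platform_name: str,
    path: str,
    env: str,
    platform_instance: Optional[str],
    convert_urns_to_lowercase: bool,
) -> str:
    # When the flag is set, lowercase the URN components before building the
    # URN, mirroring what the patched _make_storage_dataset_urn does.
    if convert_urns_to_lowercase:
        platform_name = platform_name.lower()
        path = path.lower()
        if platform_instance:
            platform_instance = platform_instance.lower()
    return make_dataset_urn_with_platform_instance(
        platform=platform_name,
        name=path,
        env=env,
        platform_instance=platform_instance,
    )


# Hypothetical S3-backed table location, shown only to illustrate the lowercasing:
# build_storage_urn("s3", "My-Bucket/warehouse/Orders", "PROD", None, True)
# -> urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/warehouse/orders,PROD)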