From 54f7985dbf4413caf4c3132e4effd77f9433b9fc Mon Sep 17 00:00:00 2001
From: Abdallah Serghine <76706155+KylixSerg@users.noreply.github.com>
Date: Sat, 14 Dec 2024 11:40:23 +0100
Subject: [PATCH] ISSUE-16094: fix s3 storage parquet structureFormat
 ingestion (#18660)

This fixes the S3 ingestion for parquet files. The current behaviour is
that the pipeline breaks if it encounters a file in the container that
is not valid parquet. This is not great, as containers may contain
non-parquet files on purpose, for example the _SUCCESS marker files
created by Spark.

To address that, do not fail the whole pipeline when a single container
fails: count it as a failure and move on with the remaining containers.
This is already an improvement, but ideally the ingestion should try a
few more files under the given prefix before giving up; additionally,
we could allow users to specify file patterns to be ignored.

Co-authored-by: Abdallah Serghine
Co-authored-by: Pere Miquel Brull
---
 .../ingestion/source/storage/s3/metadata.py | 33 ++++++++++++-------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
index 9949f6163301..6220433d72b5 100644
--- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
@@ -294,15 +294,26 @@ def _generate_container_details(
         )
         # if we have a sample file to fetch a schema from
         if sample_key:
-            columns = self._get_columns(
-                container_name=bucket_name,
-                sample_key=sample_key,
-                metadata_entry=metadata_entry,
-                config_source=S3Config(
-                    securityConfig=self.service_connection.awsConfig
-                ),
-                client=self.s3_client,
-            )
+            try:
+                columns = self._get_columns(
+                    container_name=bucket_name,
+                    sample_key=sample_key,
+                    metadata_entry=metadata_entry,
+                    config_source=S3Config(
+                        securityConfig=self.service_connection.awsConfig
+                    ),
+                    client=self.s3_client,
+                )
+            except Exception as err:
+                logger.warning(f"Error extracting columns from [{bucket_name}/{sample_key}]: {err}")
+                self.status.failed(
+                    error=StackTraceError(
+                        name=f"{bucket_name}/{sample_key}",
+                        error=f"Error extracting columns from [{bucket_name}/{sample_key}] due to: [{err}]",
+                        stackTrace=traceback.format_exc(),
+                    )
+                )
+                return None
             if columns:
                 prefix = (
                     f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
@@ -460,7 +471,7 @@ def _yield_nested_unstructured_containers(
         candidate_keys = [
             entry["Key"]
             for entry in response[S3_CLIENT_ROOT_RESPONSE]
-            if entry and entry.get("Key")
+            if entry and entry.get("Key") and not entry.get("Key").endswith("/")
         ]
         for key in candidate_keys:
             if self.is_valid_unstructured_file(metadata_entry.unstructuredFormats, key):
@@ -669,7 +680,7 @@ def _get_sample_file_path(
         candidate_keys = [
             entry["Key"]
             for entry in response[S3_CLIENT_ROOT_RESPONSE]
-            if entry and entry.get("Key")
+            if entry and entry.get("Key") and not entry.get("Key").endswith("/")
         ]
         # pick a random key out of the candidates if any were returned
         if candidate_keys:
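
For illustration, a minimal self-contained sketch of the pattern the patch applies, outside the OpenMetadata codebase. The names ingest_containers, extract_columns, and report_failure are hypothetical stand-ins, not the real API: "directory" placeholder keys ending in "/" are skipped when picking a sample file, and a sample file that cannot be parsed (e.g. a Spark _SUCCESS marker that is not valid parquet) fails only its own container rather than the whole pipeline.

import logging
import random
import traceback

logger = logging.getLogger(__name__)


def pick_sample_key(keys):
    """Pick a random sample key, skipping empty keys and "directory" keys ending in "/"."""
    candidates = [key for key in keys if key and not key.endswith("/")]
    return random.choice(candidates) if candidates else None


def ingest_containers(containers, extract_columns, report_failure):
    """Yield (container, columns) pairs from a {container: [keys]} mapping.

    A sample file that cannot be parsed fails only its own container:
    the failure is reported and ingestion moves on to the next one.
    """
    for name, keys in containers.items():
        sample_key = pick_sample_key(keys)
        if not sample_key:
            continue
        try:
            columns = extract_columns(name, sample_key)
        except Exception as err:  # e.g. a _SUCCESS marker that is not valid parquet
            logger.warning(f"Error extracting columns from [{name}/{sample_key}]: {err}")
            report_failure(f"{name}/{sample_key}", traceback.format_exc())
            continue  # move on with the remaining containers
        yield name, columns

The design point mirrors the diff above: the except branch records the failure and continues, so one unreadable file no longer aborts ingestion of the remaining containers.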