ISSUE-16094: fix s3 storage parquet structureFormat ingestion (#18660)

This aims at fixing the S3 ingestion for parquet files. The current behaviour
is that the pipeline breaks if it encounters a file in the container that is
not valid parquet. This is not great, as containers might contain non-parquet
files on purpose, for example the _SUCCESS files created by Spark.

To address that, do not fail the whole pipeline when a single container fails;
instead, count it as a failure and move on with the remainder of the containers.
This is already an improvement, but ideally the ingestion should try a couple
more files under the given prefix before giving up, and additionally we could
allow users to specify file patterns to be ignored. A minimal sketch of this
record-and-continue pattern follows below.
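
To illustrate the record-and-continue behaviour, here is a minimal,
self-contained sketch. The names extract_columns, Failure, and Status are
hypothetical stand-ins, not the OpenMetadata classes used in the diff below.

import traceback
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Failure:
    # hypothetical record of one failed sample file
    name: str
    error: str
    stack_trace: str

@dataclass
class Status:
    # hypothetical tracker that accumulates failures instead of raising
    failures: List[Failure] = field(default_factory=list)

    def failed(self, failure: Failure) -> None:
        self.failures.append(failure)

def extract_columns(bucket: str, key: str) -> List[str]:
    # stand-in for the real schema extraction; raises on non-parquet files
    if not key.endswith(".parquet"):
        raise ValueError("not a parquet file")
    return ["id", "value"]

def columns_for_sample(status: Status, bucket: str, key: str) -> Optional[List[str]]:
    # return columns for one sample file, or record the failure and move on
    try:
        return extract_columns(bucket, key)
    except Exception as err:
        status.failed(
            Failure(
                name=f"{bucket}/{key}",
                error=f"Error extracting columns from [{bucket}/{key}] due to: [{err}]",
                stack_trace=traceback.format_exc(),
            )
        )
        return None  # skip this container, keep the pipeline going

if __name__ == "__main__":
    status = Status()
    for key in ["data/part-0000.parquet", "data/_SUCCESS"]:
        print(key, "->", columns_for_sample(status, "my-bucket", key))
    print(f"{len(status.failures)} sample(s) failed, pipeline completed")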

Co-authored-by: Abdallah Serghine <[email protected]>
Co-authored-by: Pere Miquel Brull <[email protected]>
3 people authored and ulixius9 committed Jan 22, 2025
1 parent ff9f906 commit 54f7985
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
@@ -294,15 +294,26 @@ def _generate_container_details(
 )
 # if we have a sample file to fetch a schema from
 if sample_key:
-    columns = self._get_columns(
-        container_name=bucket_name,
-        sample_key=sample_key,
-        metadata_entry=metadata_entry,
-        config_source=S3Config(
-            securityConfig=self.service_connection.awsConfig
-        ),
-        client=self.s3_client,
-    )
+    try:
+        columns = self._get_columns(
+            container_name=bucket_name,
+            sample_key=sample_key,
+            metadata_entry=metadata_entry,
+            config_source=S3Config(
+                securityConfig=self.service_connection.awsConfig
+            ),
+            client=self.s3_client,
+        )
+    except Exception as err:
+        logger.warning(f"Error extracting columns from [{bucket_name}/{sample_key}] due to: [{err}]")
+        self.status.failed(
+            error=StackTraceError(
+                name=f"{bucket_name}/{sample_key}",
+                error=f"Error extracting columns from [{bucket_name}/{sample_key}] due to: [{err}]",
+                stackTrace=traceback.format_exc(),
+            )
+        )
+        return None
     if columns:
         prefix = (
             f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
@@ -460,7 +471,7 @@ def _yield_nested_unstructured_containers(
 candidate_keys = [
     entry["Key"]
     for entry in response[S3_CLIENT_ROOT_RESPONSE]
-    if entry and entry.get("Key")
+    if entry and entry.get("Key") and not entry.get("Key").endswith("/")
 ]
 for key in candidate_keys:
     if self.is_valid_unstructured_file(metadata_entry.unstructuredFormats, key):
@@ -669,7 +680,7 @@ def _get_sample_file_path(
 candidate_keys = [
     entry["Key"]
     for entry in response[S3_CLIENT_ROOT_RESPONSE]
-    if entry and entry.get("Key")
+    if entry and entry.get("Key") and not entry.get("Key").endswith("/")
 ]
 # pick a random key out of the candidates if any were returned
 if candidate_keys:
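
A side change in the two candidate_keys hunks above is that keys ending in "/"
are now skipped. Listing an S3 prefix can return zero-byte "folder" placeholder
objects whose keys end with a slash, and those can never serve as sample files.
Below is a small sketch of that filter; treating S3_CLIENT_ROOT_RESPONSE as
"Contents" (the key under which boto3's list_objects_v2 returns object entries)
is an assumption here.

from typing import Dict, List

# assumed value of S3_CLIENT_ROOT_RESPONSE used in the diff above
S3_CLIENT_ROOT_RESPONSE = "Contents"

def candidate_keys(response: Dict) -> List[str]:
    # keep only real object keys, dropping "folder" placeholders ending in "/"
    return [
        entry["Key"]
        for entry in response.get(S3_CLIENT_ROOT_RESPONSE, [])
        if entry and entry.get("Key") and not entry["Key"].endswith("/")
    ]

if __name__ == "__main__":
    listing = {
        "Contents": [
            {"Key": "transactions/"},  # folder placeholder, skipped
            {"Key": "transactions/part-0000.parquet"},
            {"Key": "transactions/_SUCCESS"},
        ]
    }
    print(candidate_keys(listing))
    # ['transactions/part-0000.parquet', 'transactions/_SUCCESS']

Note that non-parquet objects such as _SUCCESS still pass this filter; those
are handled by the try/except added in the first hunk.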
