diff --git a/awswrangler/s3/_fs.py b/awswrangler/s3/_fs.py
index 72f1a7ae3..4abcb31f4 100644
--- a/awswrangler/s3/_fs.py
+++ b/awswrangler/s3/_fs.py
@@ -406,39 +406,44 @@ def flush(self, force: bool = False) -> None:
                 return None
             if total_size == 0:
                 return None
-            _logger.debug("Flushing: %s bytes", total_size)
-            self._mpu = self._mpu or _utils.try_it(
-                f=self._client.create_multipart_upload,  # type: ignore[arg-type]
-                ex=_S3_RETRYABLE_ERRORS,
-                base=0.5,
-                max_num_tries=6,
-                Bucket=self._bucket,
-                Key=self._key,
-                **get_botocore_valid_kwargs(
-                    function_name="create_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs
-                ),
-            )
-            self._buffer.seek(0)
-            for chunk_size in _utils.get_even_chunks_sizes(
-                total_size=total_size, chunk_size=_MIN_WRITE_BLOCK, upper_bound=False
-            ):
-                _logger.debug("chunk_size: %s bytes", chunk_size)
-                self._parts_count += 1
-                self._upload_proxy.upload(
-                    bucket=self._bucket,
-                    key=self._key,
-                    part=self._parts_count,
-                    upload_id=self._mpu["UploadId"],
-                    data=self._buffer.read(chunk_size),
-                    s3_client=self._client,
-                    boto3_kwargs=get_botocore_valid_kwargs(
-                        function_name="upload_part", s3_additional_kwargs=self._s3_additional_kwargs
+
+            try:
+                _logger.debug("Flushing: %s bytes", total_size)
+                self._mpu = self._mpu or _utils.try_it(
+                    f=self._client.create_multipart_upload,  # type: ignore[arg-type]
+                    ex=_S3_RETRYABLE_ERRORS,
+                    base=0.5,
+                    max_num_tries=6,
+                    Bucket=self._bucket,
+                    Key=self._key,
+                    **get_botocore_valid_kwargs(
+                        function_name="create_multipart_upload", s3_additional_kwargs=self._s3_additional_kwargs
                     ),
                 )
-            self._buffer.seek(0)
-            self._buffer.truncate(0)
-            self._buffer.close()
-            self._buffer = io.BytesIO()
+                self._buffer.seek(0)
+                for chunk_size in _utils.get_even_chunks_sizes(
+                    total_size=total_size, chunk_size=_MIN_WRITE_BLOCK, upper_bound=False
+                ):
+                    _logger.debug("chunk_size: %s bytes", chunk_size)
+                    self._parts_count += 1
+                    self._upload_proxy.upload(
+                        bucket=self._bucket,
+                        key=self._key,
+                        part=self._parts_count,
+                        upload_id=self._mpu["UploadId"],
+                        data=self._buffer.read(chunk_size),
+                        s3_client=self._client,
+                        boto3_kwargs=get_botocore_valid_kwargs(
+                            function_name="upload_part", s3_additional_kwargs=self._s3_additional_kwargs
+                        ),
+                    )
+            finally:
+                # Ensure that the buffer is cleared (even in the event of an exception) so that
+                # any partial data doesn't get written when close() is called.
+                self._buffer.seek(0)
+                self._buffer.truncate(0)
+                self._buffer.close()
+                self._buffer = io.BytesIO()
         return None

     def readable(self) -> bool:
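
For context, a minimal, hypothetical sketch of the invariant this change enforces. BufferedUploader and _send are illustrative stand-ins, not awswrangler APIs: if flush() fails partway through an upload, the finally block still resets the buffer, so a subsequent close() cannot re-send stale partial data.

import io


class BufferedUploader:
    """Toy writer; _send() stands in for the S3 multipart-upload calls."""

    def __init__(self) -> None:
        self._buffer = io.BytesIO()

    def write(self, data: bytes) -> int:
        return self._buffer.write(data)

    def _send(self, data: bytes) -> None:
        # Assumption for the demo: the network call fails mid-flush.
        raise ConnectionError("simulated upload failure")

    def flush(self) -> None:
        total_size = self._buffer.tell()
        if total_size == 0:  # mirrors the early `return None` in the diff
            return
        try:
            self._buffer.seek(0)
            self._send(self._buffer.read(total_size))
        finally:
            # As in the diff: clear the buffer even when the upload raised,
            # otherwise close() would try to flush the same partial data again.
            self._buffer.seek(0)
            self._buffer.truncate(0)
            self._buffer.close()
            self._buffer = io.BytesIO()

    def close(self) -> None:
        self.flush()


uploader = BufferedUploader()
uploader.write(b"some bytes")
try:
    uploader.flush()  # raises ConnectionError from _send()...
except ConnectionError:
    pass
assert uploader._buffer.getvalue() == b""  # ...yet the buffer is already clean
uploader.close()  # safe: nothing buffered, so no stale data is re-sent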