Reapply "storage/copy-to-s3: emit empty file even if input is empty"
This reverts commit b1b2c28.
benesch committed Dec 16, 2024
1 parent 79d8655 commit 9a6ef72
Showing 6 changed files with 71 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/aws-util/Cargo.toml
@@ -20,7 +20,7 @@ bytes = "1.3.0"
 bytesize = "1.1.0"
 http = "1.1.0"
 hyper-tls = "0.5.0"
-mz-ore = { path = "../ore", default-features = false }
+mz-ore = { path = "../ore", default-features = true }
 thiserror = "1.0.37"
 tokio = { version = "1.38.0", default-features = false, features = ["macros"] }
 uuid = { version = "1.7.0", features = ["v4"] }
45 changes: 19 additions & 26 deletions src/aws-util/src/s3_uploader.rs
@@ -51,10 +51,6 @@ pub struct S3MultiPartUploader {
     upload_handles: Vec<JoinHandle<Result<(Option<String>, i32), S3MultiPartUploadError>>>,
 }
 
-/// The smallest allowable part number (inclusive).
-///
-/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
-const AWS_S3_MIN_PART_COUNT: i32 = 1;
 /// The largest allowable part number (inclusive).
 ///
 /// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
@@ -218,18 +214,12 @@ impl S3MultiPartUploader {
         }
     }
 
-    /// Method to finish the multi part upload. If the buffer is not empty,
-    /// it flushes the buffer first and then makes a call to `complete_multipart_upload`.
+    /// Finishes the multi part upload.
     ///
    /// Returns the number of parts and number of bytes uploaded.
     pub async fn finish(mut self) -> Result<CompletedUpload, S3MultiPartUploadError> {
-        if self.buffer.len() > 0 {
-            let remaining = self.buffer.split();
-            self.upload_part_internal(remaining.freeze())?;
-        }
-
-        if self.part_count < AWS_S3_MIN_PART_COUNT {
-            return Err(S3MultiPartUploadError::AtLeastMinPartNumber);
-        }
+        let remaining = self.buffer.split();
+        self.upload_part_internal(remaining.freeze())?;
 
         let mut parts: Vec<CompletedPart> = Vec::with_capacity(self.upload_handles.len());
         for handle in self.upload_handles {
@@ -336,11 +326,6 @@ pub enum S3MultiPartUploadError {
         AWS_S3_MAX_PART_COUNT
     )]
     ExceedsMaxPartNumber,
-    #[error(
-        "multi-part upload should have at least {} part",
-        AWS_S3_MIN_PART_COUNT
-    )]
-    AtLeastMinPartNumber,
     #[error("multi-part upload will exceed configured file_size_limit: {} bytes", .0)]
     UploadExceedsMaxFileLimit(u64),
     #[error("{}", .0.display_with_causes())]
@@ -503,7 +488,7 @@ mod tests {
     #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
     #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
     #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
-    async fn multi_part_upload_error() -> Result<(), S3MultiPartUploadError> {
+    async fn multi_part_upload_no_data() -> Result<(), S3MultiPartUploadError> {
         let sdk_config = defaults().load().await;
         let (bucket, key) = match s3_bucket_key_for_test() {
             Some(tuple) => tuple,
@@ -514,12 +499,20 @@ mod tests {
         let uploader =
             S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;
 
-        // Calling finish without adding any data should error
-        let err = uploader.finish().await.unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "multi-part upload should have at least 1 part"
-        );
+        // Calling finish without adding any data should succeed.
+        uploader.finish().await.unwrap();
+
+        // The file should exist but have no content.
+        let s3_client = s3::new_client(&sdk_config);
+        let uploaded_object = s3_client
+            .get_object()
+            .bucket(bucket)
+            .key(key)
+            .send()
+            .await
+            .unwrap();
+
+        assert_eq!(uploaded_object.content_length(), Some(0));
 
         Ok(())
     }
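A minimal caller-level sketch of the new `finish()` contract, mirroring the `multi_part_upload_no_data` test above (an illustrative sketch, not code from the patch; the `S3MultiPartUploaderConfig` type name, the module path, and the exact `try_new` parameter types are assumed and may differ):

use mz_aws_util::s3_uploader::{
    S3MultiPartUploadError, S3MultiPartUploader, S3MultiPartUploaderConfig,
};

// Finishing an uploader that never received any data now succeeds and leaves
// a zero-byte object in S3 instead of failing with AtLeastMinPartNumber.
async fn upload_empty_object(
    sdk_config: &aws_types::SdkConfig, // parameter type assumed
    bucket: String,
    key: String,
    config: S3MultiPartUploaderConfig, // type name assumed
) -> Result<(), S3MultiPartUploadError> {
    let uploader = S3MultiPartUploader::try_new(sdk_config, bucket, key, config).await?;
    // No data is appended, so the internal buffer is empty. finish() now
    // flushes that empty buffer as a single zero-byte part and completes
    // the multipart upload.
    let _completed = uploader.finish().await?;
    Ok(())
}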
20 changes: 20 additions & 0 deletions src/storage-operators/src/s3_oneshot_sink.rs
@@ -425,6 +425,23 @@ where

// Map of an uploader per batch.
let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

// As a special case, the 0th worker always forces a file to be
// created for batch 0, even if it never sees any data for batch 0.
// This ensures that we always write at least one file to S3, even
// if the input is empty. See database-issue#8599.
if worker_id == 0 {
let mut uploader = T::new(
sdk_config.clone(),
connection_details.clone(),
&sink_id,
0,
params.clone(),
)?;
uploader.force_new_file().await?;
s3_uploaders.insert(0, uploader);
}

let mut row_count = 0;
let mut last_row = None;
while let Some(event) = input_handle.next().await {
@@ -580,6 +597,9 @@ trait CopyToS3Uploader: Sized {
batch: u64,
params: CopyToParameters,
) -> Result<Self, anyhow::Error>;
/// Force the start of a new file, even if no rows have yet been appended or
/// if the current file has not yet reached the configured `max_file_size`.
async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
/// Append a row to the internal buffer, and optionally flush the buffer to S3.
async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
/// Flush the full remaining internal buffer to S3, and close all open resources.
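A rough sketch of how the per-worker logic above drives the new trait method (a simplified illustration, not the operator's actual code: the `T::new` arguments are folded into a `new_uploader` callback, the surrounding operator plumbing is omitted, and the final flush of each uploader is elided):

use std::collections::{btree_map, BTreeMap};

use anyhow::Result;
use mz_repr::Row; // path assumed

async fn handle_row<T: CopyToS3Uploader>(
    s3_uploaders: &mut BTreeMap<u64, T>,
    worker_id: usize,
    batch: u64,
    row: &Row,
    mut new_uploader: impl FnMut(u64) -> Result<T>, // stands in for T::new(...)
) -> Result<()> {
    // Worker 0 eagerly creates batch 0's uploader and forces an (initially
    // empty) file, so an empty input still yields at least one S3 object.
    if worker_id == 0 && !s3_uploaders.contains_key(&0) {
        let mut uploader = new_uploader(0)?;
        uploader.force_new_file().await?;
        s3_uploaders.insert(0, uploader);
    }
    // Every other uploader is created lazily, when its batch first sees data.
    let uploader = match s3_uploaders.entry(batch) {
        btree_map::Entry::Occupied(e) => e.into_mut(),
        btree_map::Entry::Vacant(e) => e.insert(new_uploader(batch)?),
    };
    uploader.append_row(row).await
}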
5 changes: 5 additions & 0 deletions src/storage-operators/src/s3_oneshot_sink/parquet.rs
@@ -212,6 +212,11 @@ impl CopyToS3Uploader for ParquetUploader {
}
Ok(())
}

async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
self.start_new_file().await?;
Ok(())
}
}

impl ParquetUploader {
4 changes: 4 additions & 0 deletions src/storage-operators/src/s3_oneshot_sink/pgcopy.rs
@@ -119,6 +119,10 @@ impl CopyToS3Uploader for PgCopyUploader {
Err(e) => Err(e.into()),
}
}

async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
self.start_new_file_upload().await
}
}

impl PgCopyUploader {
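Note that both implementations satisfy the new trait method by delegating to their existing start-new-file path, so even a COPY whose input is empty ends up writing at least one object; the testdrive additions below check that a matching `.csv` and `.parquet` key exists when the query returns no rows.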
22 changes: 22 additions & 0 deletions test/testdrive/copy-to-s3-minio.td
@@ -157,6 +157,13 @@ contains:S3 bucket path is not empty
HEADER = true
)

> COPY (SELECT 1 WHERE FALSE) TO 's3://copytos3/test/5'
WITH (
AWS CONNECTION = aws_conn,
MAX FILE SIZE = "100MB",
FORMAT = 'csv'
);

$ set-from-sql var=key-1
SELECT TO_CHAR(now(), 'YYYY-MM-DD')

@@ -168,8 +175,10 @@ $ s3-verify-data bucket=copytos3 key=test/2 sort-rows=true
1
2

# The double `a` here is a result of the header being written once per file.
$ s3-verify-data bucket=copytos3 key=test/2_5 sort-rows=true
a
a
1

$ s3-verify-data bucket=copytos3 key=test/3 sort-rows=true
@@ -179,6 +188,9 @@ $ s3-verify-data bucket=copytos3 key=test/4_5 sort-rows=true
array;int4;jsonb;timestamp
{1,2};83647;`{"s":"ab``c"}`;2010-10-10 10:10:10

# Ensure that at least one file is written even when the input is empty.
$ s3-verify-keys bucket=copytos3 prefix-path=test/5 key-pattern=^test/5/mz.*\.csv$

# Copy a large amount of data in the background and check to see that the INCOMPLETE
# sentinel object is written during the copy

@@ -212,6 +224,13 @@ array;int4;jsonb;timestamp
FORMAT = 'parquet'
);

> COPY (SELECT 1 WHERE FALSE) TO 's3://copytos3/parquet_test/4'
WITH (
AWS CONNECTION = aws_conn,
MAX FILE SIZE = "100MB",
FORMAT = 'parquet'
);

$ s3-verify-data bucket=copytos3 key=parquet_test/1/${key-1} sort-rows=true
1
2
@@ -223,6 +242,9 @@ $ s3-verify-data bucket=copytos3 key=parquet_test/2 sort-rows=true
$ s3-verify-data bucket=copytos3 key=parquet_test/3 sort-rows=true
{items: [1, 2], dimensions: 1} {items: [1, 2, , 4], dimensions: 2} false inf {"s":"abc"} 85907cb9ac9b4e3584b860dc69368aca 1 32767 2147483647 9223372036854775807 1234567890123456789012.4567890123456789 2010-10-10 10:10:10 2010-10-10T10:10:10 2010-10-10T08:10:10Z aaaa 5c7841414141 това е d182d0b5d0bad181d182

# Ensure that at least one file is written even when the input is empty.
$ s3-verify-keys bucket=copytos3 prefix-path=parquet_test/4 key-pattern=^parquet_test/4/mz.*\.parquet$

# Confirm that unimplemented types will early exit before writing to s3
! COPY (SELECT '0 day'::interval) TO 's3://copytos3/parquet_test/5'
WITH (