diff --git a/src/aws-util/Cargo.toml b/src/aws-util/Cargo.toml
index 002f20e087766..56dcf397bc796 100644
--- a/src/aws-util/Cargo.toml
+++ b/src/aws-util/Cargo.toml
@@ -20,7 +20,7 @@ bytes = "1.3.0"
 bytesize = "1.1.0"
 http = "1.1.0"
 hyper-tls = "0.5.0"
-mz-ore = { path = "../ore", default-features = false }
+mz-ore = { path = "../ore", default-features = true }
 thiserror = "1.0.37"
 tokio = { version = "1.38.0", default-features = false, features = ["macros"] }
 uuid = { version = "1.7.0", features = ["v4"] }
diff --git a/src/aws-util/src/s3_uploader.rs b/src/aws-util/src/s3_uploader.rs
index a3e80c1956fbc..f574fdc7778f9 100644
--- a/src/aws-util/src/s3_uploader.rs
+++ b/src/aws-util/src/s3_uploader.rs
@@ -51,10 +51,6 @@ pub struct S3MultiPartUploader {
     upload_handles: Vec<JoinHandle<Result<(Option<String>, i32), S3MultiPartUploadError>>>,
 }
 
-/// The smallest allowable part number (inclusive).
-///
-/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
-const AWS_S3_MIN_PART_COUNT: i32 = 1;
 /// The largest allowable part number (inclusive).
 ///
 /// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
@@ -218,18 +214,12 @@ impl S3MultiPartUploader {
         }
     }
 
-    /// Method to finish the multi part upload. If the buffer is not empty,
-    /// it flushes the buffer first and then makes a call to `complete_multipart_upload`.
+    /// Finishes the multi-part upload.
+    ///
     /// Returns the number of parts and number of bytes uploaded.
     pub async fn finish(mut self) -> Result<CompletedUpload, S3MultiPartUploadError> {
-        if self.buffer.len() > 0 {
-            let remaining = self.buffer.split();
-            self.upload_part_internal(remaining.freeze())?;
-        }
-
-        if self.part_count < AWS_S3_MIN_PART_COUNT {
-            return Err(S3MultiPartUploadError::AtLeastMinPartNumber);
-        }
+        let remaining = self.buffer.split();
+        self.upload_part_internal(remaining.freeze())?;
 
         let mut parts: Vec<CompletedPart> = Vec::with_capacity(self.upload_handles.len());
         for handle in self.upload_handles {
@@ -336,11 +326,6 @@ pub enum S3MultiPartUploadError {
         AWS_S3_MAX_PART_COUNT
     )]
     ExceedsMaxPartNumber,
-    #[error(
-        "multi-part upload should have at least {} part",
-        AWS_S3_MIN_PART_COUNT
-    )]
-    AtLeastMinPartNumber,
     #[error("multi-part upload will exceed configured file_size_limit: {} bytes", .0)]
     UploadExceedsMaxFileLimit(u64),
     #[error("{}", .0.display_with_causes())]
@@ -503,7 +488,7 @@ mod tests {
     #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
     #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
-    async fn multi_part_upload_error() -> Result<(), S3MultiPartUploadError> {
+    async fn multi_part_upload_no_data() -> Result<(), S3MultiPartUploadError> {
         let sdk_config = defaults().load().await;
         let (bucket, key) = match s3_bucket_key_for_test() {
             Some(tuple) => tuple,
@@ -514,12 +499,20 @@ mod tests {
         let uploader =
             S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;
 
-        // Calling finish without adding any data should error
-        let err = uploader.finish().await.unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "multi-part upload should have at least 1 part"
-        );
+        // Calling finish without adding any data should succeed.
+        uploader.finish().await.unwrap();
+
+        // The file should exist but have no content.
+        let s3_client = s3::new_client(&sdk_config);
+        let uploaded_object = s3_client
+            .get_object()
+            .bucket(bucket)
+            .key(key)
+            .send()
+            .await
+            .unwrap();
+
+        assert_eq!(uploaded_object.content_length(), Some(0));
 
         Ok(())
     }
diff --git a/src/storage-operators/src/s3_oneshot_sink.rs b/src/storage-operators/src/s3_oneshot_sink.rs
index f54a3a2ca7f52..92ba0950d538d 100644
--- a/src/storage-operators/src/s3_oneshot_sink.rs
+++ b/src/storage-operators/src/s3_oneshot_sink.rs
@@ -425,6 +425,23 @@ where
 
         // Map of an uploader per batch.
         let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();
+
+        // As a special case, the 0th worker always forces a file to be
+        // created for batch 0, even if it never sees any data for batch 0.
+        // This ensures that we always write at least one file to S3, even
+        // if the input is empty. See database-issues#8599.
+        if worker_id == 0 {
+            let mut uploader = T::new(
+                sdk_config.clone(),
+                connection_details.clone(),
+                &sink_id,
+                0,
+                params.clone(),
+            )?;
+            uploader.force_new_file().await?;
+            s3_uploaders.insert(0, uploader);
+        }
+
         let mut row_count = 0;
         let mut last_row = None;
         while let Some(event) = input_handle.next().await {
@@ -580,6 +597,9 @@ trait CopyToS3Uploader: Sized {
         batch: u64,
         params: CopyToParameters,
     ) -> Result<Self, anyhow::Error>;
+    /// Force the start of a new file, even if no rows have yet been appended or
+    /// if the current file has not yet reached the configured `max_file_size`.
+    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
     /// Append a row to the internal buffer, and optionally flush the buffer to S3.
     async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
     /// Flush the full remaining internal buffer to S3, and close all open resources.
diff --git a/src/storage-operators/src/s3_oneshot_sink/parquet.rs b/src/storage-operators/src/s3_oneshot_sink/parquet.rs
index 194e96f045fd2..34eb24a9a4d50 100644
--- a/src/storage-operators/src/s3_oneshot_sink/parquet.rs
+++ b/src/storage-operators/src/s3_oneshot_sink/parquet.rs
@@ -212,6 +212,11 @@ impl CopyToS3Uploader for ParquetUploader {
         }
         Ok(())
     }
+
+    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
+        self.start_new_file().await?;
+        Ok(())
+    }
 }
 
 impl ParquetUploader {
diff --git a/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs b/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs
index a99840d7978ab..59b6e668fa1fa 100644
--- a/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs
+++ b/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs
@@ -119,6 +119,10 @@ impl CopyToS3Uploader for PgCopyUploader {
             Err(e) => Err(e.into()),
         }
     }
+
+    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
+        self.start_new_file_upload().await
+    }
 }
 
 impl PgCopyUploader {
diff --git a/test/testdrive/copy-to-s3-minio.td b/test/testdrive/copy-to-s3-minio.td
index 671a9284a3c20..831b23ae8fdfa 100644
--- a/test/testdrive/copy-to-s3-minio.td
+++ b/test/testdrive/copy-to-s3-minio.td
@@ -157,6 +157,13 @@ contains:S3 bucket path is not empty
     HEADER = true
   )
 
+> COPY (SELECT 1 WHERE FALSE) TO 's3://copytos3/test/5'
+  WITH (
+    AWS CONNECTION = aws_conn,
+    MAX FILE SIZE = "100MB",
+    FORMAT = 'csv'
+  );
+
 $ set-from-sql var=key-1
 SELECT TO_CHAR(now(), 'YYYY-MM-DD')
 
@@ -168,8 +175,10 @@ $ s3-verify-data bucket=copytos3 key=test/2 sort-rows=true
 1
 2
 
+# The double `a` here is a result of the header being written once per file.
$ s3-verify-data bucket=copytos3 key=test/2_5 sort-rows=true a +a 1 $ s3-verify-data bucket=copytos3 key=test/3 sort-rows=true @@ -179,6 +188,9 @@ $ s3-verify-data bucket=copytos3 key=test/4_5 sort-rows=true array;int4;jsonb;timestamp {1,2};83647;`{"s":"ab``c"}`;2010-10-10 10:10:10 +# Ensure that at least one file is written even when the input is empty. +$ s3-verify-keys bucket=copytos3 prefix-path=test/5 key-pattern=^test/5/mz.*\.csv$ + # Copy a large amount of data in the background and check to see that the INCOMPLETE # sentinel object is written during the copy @@ -212,6 +224,13 @@ array;int4;jsonb;timestamp FORMAT = 'parquet' ); +> COPY (SELECT 1 WHERE FALSE) TO 's3://copytos3/parquet_test/4' + WITH ( + AWS CONNECTION = aws_conn, + MAX FILE SIZE = "100MB", + FORMAT = 'parquet' + ); + $ s3-verify-data bucket=copytos3 key=parquet_test/1/${key-1} sort-rows=true 1 2 @@ -223,6 +242,9 @@ $ s3-verify-data bucket=copytos3 key=parquet_test/2 sort-rows=true $ s3-verify-data bucket=copytos3 key=parquet_test/3 sort-rows=true {items: [1, 2], dimensions: 1} {items: [1, 2, , 4], dimensions: 2} false inf {"s":"abc"} 85907cb9ac9b4e3584b860dc69368aca 1 32767 2147483647 9223372036854775807 1234567890123456789012.4567890123456789 2010-10-10 10:10:10 2010-10-10T10:10:10 2010-10-10T08:10:10Z aaaa 5c7841414141 това е d182d0b5d0bad181d182 +# Ensure that at least one file is written even when the input is empty. +$ s3-verify-keys bucket=copytos3 prefix-path=parquet_test/4 key-pattern=^parquet_test/4/mz.*\.parquet$ + # Confirm that unimplemented types will early exit before writing to s3 ! COPY (SELECT '0 day'::interval) TO 's3://copytos3/parquet_test/5' WITH (
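
Reviewer note, not part of the patch: a minimal caller-side sketch of the new `finish()` contract. It assumes the crate is used as `mz_aws_util::s3_uploader` and that `S3MultiPartUploaderConfig` exposes `part_size_limit` and `file_size_limit` fields; those names are inferred from the error messages in the diff and may not match the real definition exactly.

use aws_types::sdk_config::SdkConfig;
use mz_aws_util::s3_uploader::{
    S3MultiPartUploadError, S3MultiPartUploader, S3MultiPartUploaderConfig,
};

/// Uploads `key` to `bucket` even when there is no data to write.
///
/// With `AWS_S3_MIN_PART_COUNT` removed, `finish()` flushes the (possibly
/// empty) buffer as the final part and completes the upload, so the object
/// always exists afterwards; previously this path returned
/// `AtLeastMinPartNumber`.
async fn upload_even_if_empty(
    sdk_config: &SdkConfig,
    bucket: String,
    key: String,
) -> Result<(), S3MultiPartUploadError> {
    // Field names here are assumptions, mirroring the limits referenced by
    // `S3MultiPartUploadError` above; values are in bytes.
    let config = S3MultiPartUploaderConfig {
        part_size_limit: 16 * 1024 * 1024,
        file_size_limit: 100 * 1024 * 1024,
    };
    let uploader = S3MultiPartUploader::try_new(sdk_config, bucket, key, config).await?;
    // No data is appended; finishing still succeeds and produces a zero-byte
    // object, matching the `multi_part_upload_no_data` test above.
    let _completed = uploader.finish().await?;
    Ok(())
}

This is the behavior the testdrive changes rely on: even for `COPY (SELECT 1 WHERE FALSE)`, the batch-0 uploader on worker 0 forces one file, and the uploader can complete it despite having buffered zero bytes.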