diff --git a/arroyo-worker/src/connectors/filesystem/local.rs b/arroyo-worker/src/connectors/filesystem/local.rs index c244abcad..6ecc2d6bd 100644 --- a/arroyo-worker/src/connectors/filesystem/local.rs +++ b/arroyo-worker/src/connectors/filesystem/local.rs @@ -20,8 +20,8 @@ use crate::{ use anyhow::{bail, Result}; use super::{ - delta, get_partitioner_from_table, CommitState, CommitStyle, FileSystemTable, FilenameStrategy, - Filenaming, MultiPartWriterStats, RollingPolicy, + add_suffix_prefix, delta, get_partitioner_from_table, CommitState, CommitStyle, + FileSystemTable, FilenameStrategy, Filenaming, MultiPartWriterStats, RollingPolicy, }; pub struct LocalFileSystemWriter> { @@ -58,12 +58,15 @@ impl> LocalFileSystemWrite CommitStyle::Direct => CommitState::VanillaParquet, }; - let filenaming = table_properties + let mut filenaming = table_properties .file_settings .clone() .unwrap() .filenaming .unwrap(); + if filenaming.suffix.is_none() { + filenaming.suffix = Some(V::file_suffix().to_string()); + } Self { writers: HashMap::new(), @@ -90,49 +93,33 @@ impl> LocalFileSystemWrite None => FilenameStrategy::Serial, }; - // This allows us to override the file suffix (extension) - let file_suffix = if self.filenaming.suffix.is_some() { - self.filenaming.suffix.as_ref().unwrap() - } else { - V::file_suffix() - }; - - // This forms the base for naming files depending on strategy - let filename_base = if filename_strategy == FilenameStrategy::Uuid { - Uuid::new_v4().to_string() - } else { - format!("{:>05}-{:>03}", self.next_file_index, self.subtask_id) - }; - - // This allows us to manipulate the filename_base - let filename_core = if self.filenaming.prefix.is_some() { - format!( - "{}-{}", - self.filenaming.prefix.as_ref().unwrap(), - filename_base - ) - } else { - filename_base - }; if !self.writers.contains_key(partition) { - let file_name = match partition { + // This forms the base for naming files depending on strategy + let filename_base = if filename_strategy == FilenameStrategy::Uuid { + Uuid::new_v4().to_string() + } else { + format!("{:>05}-{:>03}", self.next_file_index, self.subtask_id) + }; + let filename = add_suffix_prefix( + filename_base, + self.filenaming.prefix.as_ref(), + self.filenaming.suffix.as_ref().unwrap(), + ); + + let filename = match partition { Some(partition) => { // make sure the partition directory exists in tmp and final create_dir_all(&format!("{}/{}", self.tmp_dir, partition)).unwrap(); create_dir_all(&format!("{}/{}", self.final_dir, partition)).unwrap(); - if filename_strategy == FilenameStrategy::Uuid { - format!("{}/{}.{}", partition, filename_core, file_suffix) - } else { - format!("{}/{}.{}", partition, filename_core, file_suffix) - } + format!("{}/{}", partition, filename) } - None => format!("{}.{}", filename_core, V::file_suffix()), + None => filename, }; self.writers.insert( partition.clone(), V::new( - format!("{}/{}", self.tmp_dir, file_name), - format!("{}/{}", self.final_dir, file_name), + format!("{}/{}", self.tmp_dir, filename), + format!("{}/{}", self.final_dir, filename), &self.table_properties, ), ); diff --git a/arroyo-worker/src/connectors/filesystem/mod.rs b/arroyo-worker/src/connectors/filesystem/mod.rs index 69daa53a7..e5c443f36 100644 --- a/arroyo-worker/src/connectors/filesystem/mod.rs +++ b/arroyo-worker/src/connectors/filesystem/mod.rs @@ -377,6 +377,8 @@ pub trait MultiPartWriter { fn name(&self) -> String; + fn suffix() -> String; + fn partition(&self) -> Option; async fn insert_value( @@ -637,13 +639,16 @@ where CommitStyle::DeltaLake => CommitState::DeltaLake { last_version: -1 }, CommitStyle::Direct => CommitState::VanillaParquet, }; - let filenaming = writer_properties + let mut filenaming = writer_properties .file_settings .as_ref() .unwrap() .filenaming .clone() .unwrap(); + if filenaming.suffix.is_none() { + filenaming.suffix = Some(R::suffix()); + } Self { path, active_writers: HashMap::new(), @@ -776,21 +781,15 @@ where } else { format!("{:>05}-{:>03}", self.max_file_index, self.subtask_id) }; - - // This allows us to manipulate the filename_base - let filename_core = if self.filenaming.prefix.is_some() { - format!( - "{}-{}", - self.filenaming.prefix.as_ref().unwrap(), - filename_base - ) - } else { - filename_base - }; + let filename = add_suffix_prefix( + filename_base, + self.filenaming.prefix.as_ref(), + self.filenaming.suffix.as_ref().unwrap(), + ); let path = match partition { - Some(sub_bucket) => format!("{}/{}/{}", self.path, sub_bucket, filename_core), - None => format!("{}/{}", self.path, filename_core), + Some(sub_bucket) => format!("{}/{}/{}", self.path, sub_bucket, filename), + None => format!("{}/{}", self.path, filename), }; R::new( self.object_store.clone(), @@ -1319,7 +1318,6 @@ impl> Mul ) -> Self { let batch_builder = BB::new(config); let batch_buffering_writer = BBW::new(config); - let path = format!("{}.{}", path, BBW::suffix()).into(); Self { batch_builder, batch_buffering_writer, @@ -1332,6 +1330,10 @@ impl> Mul self.multipart_manager.name() } + fn suffix() -> String { + BBW::suffix() + } + fn partition(&self) -> Option { self.multipart_manager.partition.clone() } @@ -1641,3 +1643,14 @@ impl + Send + 'static> bail!("checkpoint receiver closed unexpectedly") } } + +pub(crate) fn add_suffix_prefix( + filename: String, + prefix: Option<&String>, + suffix: &String, +) -> String { + match prefix { + None => format!("{}.{}", filename, suffix), + Some(prefix) => format!("{}-{}.{}", prefix, filename, suffix), + } +}