Skip to content

Commit

Permalink
arroyo-worker: refactor FilenamingStrategy behavior to share code. (#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackson Newhouse authored Nov 14, 2023
1 parent 571da91 commit a47e63e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 51 deletions.
59 changes: 23 additions & 36 deletions arroyo-worker/src/connectors/filesystem/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::{
use anyhow::{bail, Result};

use super::{
delta, get_partitioner_from_table, CommitState, CommitStyle, FileSystemTable, FilenameStrategy,
Filenaming, MultiPartWriterStats, RollingPolicy,
add_suffix_prefix, delta, get_partitioner_from_table, CommitState, CommitStyle,
FileSystemTable, FilenameStrategy, Filenaming, MultiPartWriterStats, RollingPolicy,
};

pub struct LocalFileSystemWriter<K: Key, D: Data + Sync, V: LocalWriter<D>> {
Expand Down Expand Up @@ -58,12 +58,15 @@ impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D>> LocalFileSystemWrite
CommitStyle::Direct => CommitState::VanillaParquet,
};

let filenaming = table_properties
let mut filenaming = table_properties
.file_settings
.clone()
.unwrap()
.filenaming
.unwrap();
if filenaming.suffix.is_none() {
filenaming.suffix = Some(V::file_suffix().to_string());
}

Self {
writers: HashMap::new(),
Expand All @@ -90,49 +93,33 @@ impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D>> LocalFileSystemWrite
None => FilenameStrategy::Serial,
};

// This allows us to override the file suffix (extension)
let file_suffix = if self.filenaming.suffix.is_some() {
self.filenaming.suffix.as_ref().unwrap()
} else {
V::file_suffix()
};

// This forms the base for naming files depending on strategy
let filename_base = if filename_strategy == FilenameStrategy::Uuid {
Uuid::new_v4().to_string()
} else {
format!("{:>05}-{:>03}", self.next_file_index, self.subtask_id)
};

// This allows us to manipulate the filename_base
let filename_core = if self.filenaming.prefix.is_some() {
format!(
"{}-{}",
self.filenaming.prefix.as_ref().unwrap(),
filename_base
)
} else {
filename_base
};
if !self.writers.contains_key(partition) {
let file_name = match partition {
// This forms the base for naming files depending on strategy
let filename_base = if filename_strategy == FilenameStrategy::Uuid {
Uuid::new_v4().to_string()
} else {
format!("{:>05}-{:>03}", self.next_file_index, self.subtask_id)
};
let filename = add_suffix_prefix(
filename_base,
self.filenaming.prefix.as_ref(),
self.filenaming.suffix.as_ref().unwrap(),
);

let filename = match partition {
Some(partition) => {
// make sure the partition directory exists in tmp and final
create_dir_all(&format!("{}/{}", self.tmp_dir, partition)).unwrap();
create_dir_all(&format!("{}/{}", self.final_dir, partition)).unwrap();
if filename_strategy == FilenameStrategy::Uuid {
format!("{}/{}.{}", partition, filename_core, file_suffix)
} else {
format!("{}/{}.{}", partition, filename_core, file_suffix)
}
format!("{}/{}", partition, filename)
}
None => format!("{}.{}", filename_core, V::file_suffix()),
None => filename,
};
self.writers.insert(
partition.clone(),
V::new(
format!("{}/{}", self.tmp_dir, file_name),
format!("{}/{}", self.final_dir, file_name),
format!("{}/{}", self.tmp_dir, filename),
format!("{}/{}", self.final_dir, filename),
&self.table_properties,
),
);
Expand Down
43 changes: 28 additions & 15 deletions arroyo-worker/src/connectors/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ pub trait MultiPartWriter {

fn name(&self) -> String;

/// Returns the suffix string for this writer type. It is an associated function
/// (no `self`), so it can be called before any writer instance exists.
// NOTE(review): appears to serve as the fallback for `filenaming.suffix` when
// none is configured — confirm against the constructor that calls `R::suffix()`.
fn suffix() -> String;

fn partition(&self) -> Option<String>;

async fn insert_value(
Expand Down Expand Up @@ -637,13 +639,16 @@ where
CommitStyle::DeltaLake => CommitState::DeltaLake { last_version: -1 },
CommitStyle::Direct => CommitState::VanillaParquet,
};
let filenaming = writer_properties
let mut filenaming = writer_properties
.file_settings
.as_ref()
.unwrap()
.filenaming
.clone()
.unwrap();
if filenaming.suffix.is_none() {
filenaming.suffix = Some(R::suffix());
}
Self {
path,
active_writers: HashMap::new(),
Expand Down Expand Up @@ -776,21 +781,15 @@ where
} else {
format!("{:>05}-{:>03}", self.max_file_index, self.subtask_id)
};

// This allows us to manipulate the filename_base
let filename_core = if self.filenaming.prefix.is_some() {
format!(
"{}-{}",
self.filenaming.prefix.as_ref().unwrap(),
filename_base
)
} else {
filename_base
};
let filename = add_suffix_prefix(
filename_base,
self.filenaming.prefix.as_ref(),
self.filenaming.suffix.as_ref().unwrap(),
);

let path = match partition {
Some(sub_bucket) => format!("{}/{}/{}", self.path, sub_bucket, filename_core),
None => format!("{}/{}", self.path, filename_core),
Some(sub_bucket) => format!("{}/{}/{}", self.path, sub_bucket, filename),
None => format!("{}/{}", self.path, filename),
};
R::new(
self.object_store.clone(),
Expand Down Expand Up @@ -1319,7 +1318,6 @@ impl<BB: BatchBuilder, BBW: BatchBufferingWriter<BatchData = BB::BatchData>> Mul
) -> Self {
let batch_builder = BB::new(config);
let batch_buffering_writer = BBW::new(config);
let path = format!("{}.{}", path, BBW::suffix()).into();
Self {
batch_builder,
batch_buffering_writer,
Expand All @@ -1332,6 +1330,10 @@ impl<BB: BatchBuilder, BBW: BatchBufferingWriter<BatchData = BB::BatchData>> Mul
self.multipart_manager.name()
}

// Forwards to the buffering writer type `BBW`, returning its suffix unchanged.
fn suffix() -> String {
BBW::suffix()
}

fn partition(&self) -> Option<String> {
self.multipart_manager.partition.clone()
}
Expand Down Expand Up @@ -1641,3 +1643,14 @@ impl<K: Key, T: Data + Sync, R: MultiPartWriter<InputType = T> + Send + 'static>
bail!("checkpoint receiver closed unexpectedly")
}
}

/// Builds a complete file name from a base name, an optional prefix, and a suffix.
///
/// * `filename` - the base name (consumed).
/// * `prefix` - when `Some`, it is prepended and joined to the base name with `-`.
/// * `suffix` - appended after a `.` separator.
///
/// Returns `{filename}.{suffix}` when `prefix` is `None`, and
/// `{prefix}-{filename}.{suffix}` otherwise.
///
/// `suffix` is accepted as `&str` rather than `&String`: callers that hold a
/// `&String` keep compiling unchanged through deref coercion, while callers with
/// a plain string slice no longer need to allocate a `String` first.
pub(crate) fn add_suffix_prefix(
    filename: String,
    prefix: Option<&String>,
    suffix: &str,
) -> String {
    match prefix {
        None => format!("{}.{}", filename, suffix),
        Some(prefix) => format!("{}-{}.{}", prefix, filename, suffix),
    }
}

0 comments on commit a47e63e

Please sign in to comment.