Skip to content

Commit

Permalink
add: filesystem write configurations (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshit2283 authored Nov 14, 2023
1 parent a64a894 commit 0eb6114
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 23 deletions.
20 changes: 20 additions & 0 deletions arroyo-connectors/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ pub fn file_system_table_from_options(
let rollover_seconds = pull_option_to_i64("rollover_seconds", opts)?;
let target_file_size = pull_option_to_i64("target_file_size", opts)?;
let target_part_size = pull_option_to_i64("target_part_size", opts)?;
let prefix = opts.remove("filename.prefix");
let suffix = opts.remove("filename.suffix");
let strategy = opts
.remove("filename.strategy")
.map(|value| {
FilenameStrategy::try_from(&value)
.map_err(|_err| anyhow!("{} is not a valid Filenaming Strategy", value))
})
.transpose()?;

let partition_fields: Vec<_> = opts
.remove("partition_fields")
Expand All @@ -192,6 +201,16 @@ pub fn file_system_table_from_options(
None
};

let filenaming = if prefix.is_some() || suffix.is_some() || strategy.is_some() {
Some(Filenaming {
prefix,
suffix,
strategy,
})
} else {
None
};

let file_settings = Some(FileSettings {
inactivity_rollover_seconds,
max_parts,
Expand All @@ -200,6 +219,7 @@ pub fn file_system_table_from_options(
target_part_size,
partitioning,
commit_style: Some(commit_style),
filenaming,
});

let format_settings = match schema
Expand Down
63 changes: 48 additions & 15 deletions arroyo-worker/src/connectors/filesystem/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bincode::{Decode, Encode};
use serde::Serialize;
use tokio::{fs::OpenOptions, io::AsyncWriteExt};
use tracing::info;
use uuid::Uuid;

use crate::{
connectors::{filesystem::FinishedFile, two_phase_committer::TwoPhaseCommitter},
Expand All @@ -19,8 +20,8 @@ use crate::{
use anyhow::{bail, Result};

use super::{
delta, get_partitioner_from_table, CommitState, CommitStyle, FileSystemTable,
MultiPartWriterStats, RollingPolicy,
delta, get_partitioner_from_table, CommitState, CommitStyle, FileSystemTable, FilenameStrategy,
Filenaming, MultiPartWriterStats, RollingPolicy,
};

pub struct LocalFileSystemWriter<K: Key, D: Data + Sync, V: LocalWriter<D>> {
Expand All @@ -36,6 +37,7 @@ pub struct LocalFileSystemWriter<K: Key, D: Data + Sync, V: LocalWriter<D>> {
table_properties: FileSystemTable,
commit_state: CommitState,
phantom: PhantomData<(K, D)>,
filenaming: Filenaming,
}

impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D>> LocalFileSystemWriter<K, D, V> {
Expand All @@ -56,6 +58,13 @@ impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D>> LocalFileSystemWrite
CommitStyle::Direct => CommitState::VanillaParquet,
};

let filenaming = table_properties
.file_settings
.clone()
.unwrap()
.filenaming
.unwrap();

Self {
writers: HashMap::new(),
tmp_dir,
Expand All @@ -70,30 +79,54 @@ impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D>> LocalFileSystemWrite
table_properties,
commit_state,
phantom: PhantomData,
filenaming,
}
}

fn get_or_insert_writer(&mut self, partition: &Option<String>) -> &mut V {
let filename_strategy = match self.filenaming.strategy {
Some(FilenameStrategy::Uuid) => FilenameStrategy::Uuid,
Some(FilenameStrategy::Serial) => FilenameStrategy::Serial,
None => FilenameStrategy::Serial,
};

// This allows us to override the file suffix (extension)
let file_suffix = if self.filenaming.suffix.is_some() {
self.filenaming.suffix.as_ref().unwrap()
} else {
V::file_suffix()
};

// This forms the base for naming files depending on strategy
let filename_base = if filename_strategy == FilenameStrategy::Uuid {
Uuid::new_v4().to_string()
} else {
format!("{:>05}-{:>03}", self.next_file_index, self.subtask_id)
};

// This allows us to manipulate the filename_base
let filename_core = if self.filenaming.prefix.is_some() {
format!(
"{}-{}",
self.filenaming.prefix.as_ref().unwrap(),
filename_base
)
} else {
filename_base
};
if !self.writers.contains_key(partition) {
let file_name = match partition {
Some(partition) => {
// make sure the partition directory exists in tmp and final
create_dir_all(&format!("{}/{}", self.tmp_dir, partition)).unwrap();
create_dir_all(&format!("{}/{}", self.final_dir, partition)).unwrap();
format!(
"{}/{:>05}-{:>03}.{}",
partition,
self.next_file_index,
self.subtask_id,
V::file_suffix()
)
if filename_strategy == FilenameStrategy::Uuid {
format!("{}/{}.{}", partition, filename_core, file_suffix)
} else {
format!("{}/{}.{}", partition, filename_core, file_suffix)
}
}
None => format!(
"{:>05}-{:>03}.{}",
self.next_file_index,
self.subtask_id,
V::file_suffix()
),
None => format!("{}.{}", filename_core, V::file_suffix()),
};
self.writers.insert(
partition.clone(),
Expand Down
46 changes: 38 additions & 8 deletions arroyo-worker/src/connectors/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::warn;
use typify::import_types;
use uuid::Uuid;

import_types!(schema = "../connector-schemas/filesystem/table.json");

Expand Down Expand Up @@ -355,6 +356,7 @@ struct AsyncMultipartFileSystemWriter<T: Data + SchemaData + Sync, R: MultiPartW
properties: FileSystemTable,
rolling_policy: RollingPolicy,
commit_state: CommitState,
filenaming: Filenaming,
}

#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
Expand Down Expand Up @@ -635,6 +637,13 @@ where
CommitStyle::DeltaLake => CommitState::DeltaLake { last_version: -1 },
CommitStyle::Direct => CommitState::VanillaParquet,
};
let filenaming = writer_properties
.file_settings
.as_ref()
.unwrap()
.filenaming
.clone()
.unwrap();
Self {
path,
active_writers: HashMap::new(),
Expand All @@ -652,6 +661,7 @@ where
),
properties: writer_properties,
commit_state,
filenaming,
}
}

Expand Down Expand Up @@ -752,15 +762,35 @@ where
}

fn new_writer(&mut self, partition: &Option<String>) -> R {
let filename_strategy = match self.filenaming.strategy {
Some(FilenameStrategy::Uuid) => FilenameStrategy::Uuid,
Some(FilenameStrategy::Serial) => FilenameStrategy::Serial,
None => FilenameStrategy::Serial,
};

// TODO: File suffix

// This forms the base for naming files depending on strategy
let filename_base = if filename_strategy == FilenameStrategy::Uuid {
Uuid::new_v4().to_string()
} else {
format!("{:>05}-{:>03}", self.max_file_index, self.subtask_id)
};

// This allows us to manipulate the filename_base
let filename_core = if self.filenaming.prefix.is_some() {
format!(
"{}-{}",
self.filenaming.prefix.as_ref().unwrap(),
filename_base
)
} else {
filename_base
};

let path = match partition {
Some(sub_bucket) => format!(
"{}/{}/{:0>5}-{:0>3}",
self.path, sub_bucket, self.max_file_index, self.subtask_id
),
None => format!(
"{}/{:0>5}-{:0>3}",
self.path, self.max_file_index, self.subtask_id
),
Some(sub_bucket) => format!("{}/{}/{}", self.path, sub_bucket, filename_core),
None => format!("{}/{}", self.path, filename_core),
};
R::new(
self.object_store.clone(),
Expand Down
24 changes: 24 additions & 0 deletions connector-schemas/filesystem/table.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,30 @@
"direct",
"delta_lake"
]
},
"filenaming": {
"title": "Filenaming",
"type": "object",
"properties": {
"prefix": {
"title": "Filename Prefix",
"type": "string",
"description": "The prefix to use in file name. i.e prefix-<uuid>.parquet"
},
"suffix": {
"title": "Filename Suffix",
"type": "string",
"description": "This will overwrite the default file suffix. i.e .parquet, use with caution"
},
"strategy":{
"title": "Filename Strategy",
"type": "string",
"enum": [
"serial",
"uuid"
]
}
}
}
},
"additionalProperties": false
Expand Down

0 comments on commit 0eb6114

Please sign in to comment.