Commit

integrate latest code + table definition improvements
rcjmurillo committed Oct 20, 2023
1 parent f4b0f6a commit 9d7aea8
Showing 5 changed files with 160 additions and 216 deletions.
33 changes: 18 additions & 15 deletions arroyo-connectors/src/filesystem.rs
@@ -91,7 +91,7 @@ impl Connector for FileSystemConnector {
ref write_target,
..
} => {
- let backend_config = BackendConfig::parse_url(&table.write_target.path, true)?;
+ let backend_config = BackendConfig::parse_url(&write_target.path, true)?;
let is_local = match &backend_config {
BackendConfig::Local { .. } => true,
_ => false,
@@ -115,7 +115,7 @@ impl Connector for FileSystemConnector {
),
(None, _) => bail!("have to have some format settings"),
};
- (desc, op, ConnectionType::Sink)
+ (description, operator, ConnectionType::Sink)
}
};

@@ -168,19 +168,20 @@ impl Connector for FileSystemConnector {
EmptyConfig {},
FileSystemTable {
type_: TableType::Source {
- read_source: Some(S3 {
- bucket: Some(bucket),
- compression_format: Some(
- compression_format.as_str().try_into().map_err(|_| {
+ read_source: S3 {
+ bucket,
+ compression_format: compression_format
+ .as_str()
+ .try_into()
+ .map_err(|_| {
anyhow::anyhow!(
"unsupported compression format: {}",
compression_format
)
})?,
- ),
- prefix: Some(prefix),
- region: Some(region),
- }),
+ prefix,
+ region,
+ },
},
},
schema,
@@ -266,12 +267,14 @@ impl Connector for FileSystemConnector {
name,
EmptyConfig {},
FileSystemTable {
- write_target: FolderUrl {
- path: storage_url,
- storage_options,
+ type_: TableType::Sink {
+ write_target: FolderUrl {
+ path: storage_url,
+ storage_options,
+ },
+ file_settings,
+ format_settings,
},
- file_settings,
- format_settings,
},
schema,
)
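
For orientation, the table shape implied by these hunks is roughly the following minimal sketch. It is inferred from the diff, not the actual Arroyo definitions: the leaf types (the storage options map, FileSettings, FormatSettings, the compression enum) are stand-ins, and the real structs also derive serde/JSON-schema traits.

use std::collections::HashMap;

// Placeholder leaf types; the real crate defines richer versions of these.
#[derive(Debug, Clone)]
pub struct FolderUrl {
    pub path: String,
    pub storage_options: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub struct S3 {
    pub bucket: String,
    pub region: String,
    pub prefix: String,
    pub compression_format: String, // the real field is an enum parsed via TryFrom<&str>
}

// Source- and sink-specific settings now hang off one enum instead of being
// Option-wrapped fields directly on the table struct.
#[derive(Debug, Clone)]
pub enum TableType {
    Source {
        read_source: S3,
    },
    Sink {
        write_target: FolderUrl,
        file_settings: Option<String>,   // stand-in for FileSettings
        format_settings: Option<String>, // stand-in for FormatSettings
    },
}

#[derive(Debug, Clone)]
pub struct FileSystemTable {
    pub type_: TableType,
}

fn main() {
    // A sink table built the way the updated connector code above builds it.
    let table = FileSystemTable {
        type_: TableType::Sink {
            write_target: FolderUrl {
                path: "s3://my-bucket/outputs".to_string(),
                storage_options: HashMap::new(),
            },
            file_settings: None,
            format_settings: None,
        },
    };
    println!("{table:?}");
}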
14 changes: 6 additions & 8 deletions arroyo-worker/src/connectors/filesystem/local.rs
@@ -13,7 +13,9 @@ use crate::connectors::two_phase_committer::TwoPhaseCommitter;

use anyhow::{bail, Result};

- use super::{get_partitioner_from_table, FileSystemTable, MultiPartWriterStats, RollingPolicy, TableType};
+ use super::{
+ get_partitioner_from_table, FileSystemTable, MultiPartWriterStats, RollingPolicy, TableType,
+ };

pub struct LocalFileSystemWriter<K: Key, D: Data + Sync, V: LocalWriter<D>> {
// writer to a local tmp file
@@ -36,12 +38,10 @@ impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D>> LocalFileSystemWrite
// make sure final_dir and tmp_dir exists
create_dir_all(&tmp_dir).unwrap();

- let file_settings = if let TableType::Sink {
+ let TableType::Sink {
ref file_settings, ..
} = table_properties.type_
- {
- file_settings.as_ref().unwrap()
- } else {
+ else {
unreachable!("LocalFileSystemWriter can only be used as a sink")
};

@@ -53,9 +53,7 @@ impl<K: Key, D: Data + Sync + Serialize, V: LocalWriter<D>> LocalFileSystemWrite
subtask_id: 0,
partitioner: get_partitioner_from_table(&table_properties),
finished_files: Vec::new(),
- rolling_policy: RollingPolicy::from_file_settings(
- table_properties.file_settings.as_ref().unwrap(),
- ),
+ rolling_policy: RollingPolicy::from_file_settings(file_settings.as_ref().unwrap()),
table_properties,
phantom: PhantomData,
}
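
The constructor above swaps an if let ... else block for let ... else (stable since Rust 1.65), binding the Sink variant's fields or diverging otherwise. A toy, self-contained illustration of the pattern with stand-in types:

// Stand-in types, not the real Arroyo definitions.
enum TableType {
    Source,
    Sink { file_settings: Option<u32> }, // stand-in for Option<FileSettings>
}

fn rolling_interval(table: &TableType) -> u32 {
    // Bind file_settings out of the Sink variant, or diverge in the else branch.
    let TableType::Sink { file_settings } = table else {
        unreachable!("only usable as a sink")
    };
    file_settings.unwrap_or(0)
}

fn main() {
    let sink = TableType::Sink { file_settings: Some(7) };
    println!("{}", rolling_interval(&sink));
    let _unused = TableType::Source; // keep the Source variant exercised
}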
28 changes: 19 additions & 9 deletions arroyo-worker/src/connectors/filesystem/mod.rs
@@ -69,7 +69,9 @@ impl<K: Key, T: Data + Sync + Serialize, V: LocalWriter<T>> LocalFileSystemWrite
let table: FileSystemTable =
serde_json::from_value(config.table).expect("Invalid table config for FileSystemSink");
let final_dir = match table.type_ {
- TableType::Sink { write_target, .. } => write_target.path.clone(),
+ TableType::Sink {
+ ref write_target, ..
+ } => write_target.path.clone(),
TableType::Source { .. } => {
unreachable!("shouldn't be using local writer for source");
}
@@ -88,23 +90,23 @@ impl<K: Key, T: Data + Sync + Serialize, R: MultiPartWriter<InputType = T> + Sen
let table: FileSystemTable =
serde_json::from_value(config.table).expect("Invalid table config for FileSystemSink");

- let path: &Path = match table.type_ {
+ let write_target = match table.type_ {
TableType::Sink {
ref write_target, ..
- } => write_target.path.as_ref(),
+ } => write_target.clone(),
TableType::Source { .. } => {
- unreachable!("shouldn't be using local writer for source");
+ unreachable!("multi-part writer can only be used as sink");
}
};

let (sender, receiver) = tokio::sync::mpsc::channel(10000);
let (checkpoint_sender, checkpoint_receiver) = tokio::sync::mpsc::channel(10000);
let partition_func = get_partitioner_from_table(&table);
tokio::spawn(async move {
- let path: Path = StorageProvider::get_key(path).unwrap().into();
+ let path: Path = StorageProvider::get_key(&write_target.path).unwrap().into();
let provider = StorageProvider::for_url_with_options(
- &table.write_target.path,
- table.write_target.storage_options.clone(),
+ &write_target.path,
+ write_target.storage_options.clone(),
)
.await
.unwrap();
@@ -129,8 +131,16 @@ impl<K: Key, T: Data + Sync + Serialize, R: MultiPartWriter<InputType = T> + Sen
fn get_partitioner_from_table<K: Key, T: Data + Serialize>(
table: &FileSystemTable,
) -> Option<Box<dyn Fn(&Record<K, T>) -> String + Send>> {
- let Some(partitions) = table.file_settings.as_ref().unwrap().partitioning.clone() else {
- return None;
+ let partitions = match table.type_ {
+ TableType::Sink {
+ file_settings:
+ Some(FileSettings {
+ partitioning: Some(ref partitions),
+ ..
+ }),
+ ..
+ } => partitions.clone(),
+ _ => return None,
};
match (
partitions.time_partition_pattern,
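
The reworked get_partitioner_from_table reaches through the Sink variant and two layers of Option in a single match pattern. A rough, self-contained sketch of that shape with simplified stand-in types (the real partitioning config is richer than a String):

// Stand-in types, not the real Arroyo definitions.
struct FileSettings {
    partitioning: Option<String>, // the real partitioning config is a struct
}

enum TableType {
    Source,
    Sink { file_settings: Option<FileSettings> },
}

// Pull the partitioning config out of a sink table, if one is configured.
fn partitioning(table: &TableType) -> Option<String> {
    match table {
        TableType::Sink {
            file_settings:
                Some(FileSettings {
                    partitioning: Some(partitions),
                    ..
                }),
            ..
        } => Some(partitions.clone()),
        _ => None,
    }
}

fn main() {
    let table = TableType::Sink {
        file_settings: Some(FileSettings {
            partitioning: Some("year=%Y/month=%m".to_string()),
        }),
    };
    println!("{:?}", partitioning(&table));
    let _unused = TableType::Source;
}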
37 changes: 6 additions & 31 deletions arroyo-worker/src/connectors/filesystem/source/mod.rs
@@ -82,48 +82,23 @@ impl<K: Data, T: SchemaData + Data> FileSystemSourceFunc<K, T> {

let (storage_provider, compression_format, prefix) = match &self.table {
TableType::Source { read_source } => match read_source {
- Some(S3 {
+ S3 {
bucket,
compression_format,
ref prefix,
region,
- }) => {
+ } => {
let storage_provider = StorageProvider::for_url(&format!(
"https://{}.amazonaws.com/{}/{}",
- region.as_ref().ok_or(UserError::new(
- "invalid table config",
- "region must be set for S3 source".to_string()
- ))?,
- bucket.as_ref().ok_or(UserError::new(
- "invalid table config",
- "bucket must be set for S3 source"
- ))?,
- prefix.as_ref().ok_or(UserError::new(
- "invalid table config",
- "prefix must be set for S3 source"
- ))?
+ region,
+ bucket,
+ prefix.clone()
))
.await
.map_err(|err| {
UserError::new("failed to create storage provider", err.to_string())
})?;
- (
- storage_provider,
- compression_format.ok_or(UserError::new(
- "invalid table config",
- "compression_format must be set for S3 source".to_string(),
- ))?,
- prefix.clone().ok_or(UserError::new(
- "invalid table config",
- "prefix must be set for S3 source".to_string(),
- ))?,
- )
- }
- None => {
- return Err(UserError::new(
- "invalid table config",
- "no read source specified for filesystem source".to_string(),
- ))
+ (storage_provider, compression_format.clone(), prefix.clone())
}
},
TableType::Sink { .. } => {
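
With bucket, region, prefix, and compression_format now required on the S3 config instead of Option-wrapped, the per-field ok_or(UserError::new(...)) checks above become unnecessary and the provider URL can be formatted directly. A hedged sketch of that simplified shape (the URL format string is copied from the diff; the types and values are illustrative):

// Stand-in config, mirroring the now-required S3 fields.
struct S3 {
    bucket: String,
    region: String,
    prefix: String,
    compression_format: String, // stand-in for the real compression enum
}

// Format the storage URL the same way the updated source code above does.
fn source_url(s3: &S3) -> String {
    format!(
        "https://{}.amazonaws.com/{}/{}",
        s3.region, s3.bucket, s3.prefix
    )
}

fn main() {
    let s3 = S3 {
        bucket: "my-bucket".to_string(),
        region: "us-east-1".to_string(),
        prefix: "input/".to_string(),
        compression_format: "none".to_string(),
    };
    println!("{}", source_url(&s3));
    println!("compression: {}", s3.compression_format);
}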
