Skip to content

Commit

Permalink
Clean up file system table schema and support maps in UI
Browse files Browse the repository at this point in the history
Remove some unnecessary nesting in the file system table schema,
standardize the field capitalization, and add a MapWidget component to
support a Storage Options string map. Also renders all helper text in
the JsonForm as markdown so it can include links.
  • Loading branch information
jbeisen authored and jacksonrnewhouse committed Nov 21, 2023
1 parent f76fb1e commit 2f816cc
Show file tree
Hide file tree
Showing 12 changed files with 771 additions and 179 deletions.
7 changes: 4 additions & 3 deletions arroyo-connectors/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ impl Connector for DeltaLakeConnector {
schema: Option<&ConnectionSchema>,
) -> anyhow::Result<crate::Connection> {
let TableType::Sink {
write_path,
file_settings,
format_settings,
write_target,
} = &table.type_
..
} = &table.table_type
else {
bail!("Delta Lake connector only supports sink tables");
};
Expand All @@ -94,7 +95,7 @@ impl Connector for DeltaLakeConnector {
bail!("commit_style must be DeltaLake");
}

let backend_config = BackendConfig::parse_url(&write_target.path, true)?;
let backend_config = BackendConfig::parse_url(&write_path, true)?;
let is_local = match &backend_config {
BackendConfig::Local { .. } => true,
_ => false,
Expand Down
38 changes: 18 additions & 20 deletions arroyo-connectors/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Connector for FileSystemConnector {
}

fn table_type(&self, _: Self::ProfileT, table: Self::TableT) -> ConnectionType {
match table.type_ {
match table.table_type {
TableType::Source { .. } => ConnectionType::Source,
TableType::Sink { .. } => ConnectionType::Sink,
}
Expand All @@ -82,15 +82,15 @@ impl Connector for FileSystemConnector {
table: Self::TableT,
schema: Option<&ConnectionSchema>,
) -> anyhow::Result<crate::Connection> {
let (description, operator, connection_type) = match table.type_ {
let (description, operator, connection_type) = match table.table_type {
TableType::Source { .. } => (
"FileSystem".to_string(),
"connectors::filesystem::source::FileSystemSourceFunc",
ConnectionType::Source,
),
TableType::Sink {
ref write_path,
ref format_settings,
ref write_target,
ref file_settings,
..
} => {
Expand All @@ -103,7 +103,7 @@ impl Connector for FileSystemConnector {
bail!("commit_style must be Direct");
};

let backend_config = BackendConfig::parse_url(&write_target.path, true)?;
let backend_config = BackendConfig::parse_url(&write_path, true)?;
let is_local = match &backend_config {
BackendConfig::Local { .. } => true,
_ => false,
Expand All @@ -117,11 +117,11 @@ impl Connector for FileSystemConnector {
"FileSystem<Parquet>".to_string(),
"connectors::filesystem::ParquetFileSystemSink::<#in_k, #in_t, #in_tRecordBatchBuilder>"
),
(Some(FormatSettings::Json { }), true) => (
(Some(FormatSettings::Json { ..}), true) => (
"LocalFileSystem<JSON>".to_string(),
"connectors::filesystem::LocalJsonFileSystemSink::<#in_k, #in_t>"
),
(Some(FormatSettings::Json { }), false) => (
(Some(FormatSettings::Json {.. }), false) => (
"FileSystem<JSON>".to_string(),
"connectors::filesystem::JsonFileSystemSink::<#in_k, #in_t>"
),
Expand Down Expand Up @@ -180,11 +180,9 @@ impl Connector for FileSystemConnector {
name,
EmptyConfig {},
FileSystemTable {
type_: TableType::Source {
read_source: Source {
path: storage_url,
storage_options,
},
table_type: TableType::Source {
path: storage_url,
storage_options,
compression_format: Some(compression_format),
regex_pattern: matching_pattern,
},
Expand Down Expand Up @@ -255,8 +253,8 @@ pub fn file_system_sink_from_options(
None
};

let filenaming = if prefix.is_some() || suffix.is_some() || strategy.is_some() {
Some(Filenaming {
let file_naming = if prefix.is_some() || suffix.is_some() || strategy.is_some() {
Some(FileNaming {
prefix,
suffix,
strategy,
Expand All @@ -273,7 +271,7 @@ pub fn file_system_sink_from_options(
target_part_size,
partitioning,
commit_style: Some(commit_style),
filenaming,
file_naming,
});

let format_settings = match schema
Expand All @@ -300,17 +298,17 @@ pub fn file_system_sink_from_options(
row_group_size,
})
}
Format::Json(..) => Some(FormatSettings::Json {}),
Format::Json(..) => Some(FormatSettings::Json {
json_format: JsonFormat::Json,
}),
other => bail!("Unsupported format: {:?}", other),
};
Ok(FileSystemTable {
type_: TableType::Sink {
table_type: TableType::Sink {
file_settings,
format_settings,
write_target: FolderUrl {
path: storage_url,
storage_options,
},
write_path: storage_url,
storage_options,
},
})
}
1 change: 1 addition & 0 deletions arroyo-console/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-icons": "^4.11.0",
"react-markdown": "^9.0.1",
"react-resizable-panels": "^0.0.55",
"react-router-dom": "^6.13.0",
"react-syntax-highlighter": "^15.5.0",
Expand Down
Loading

0 comments on commit 2f816cc

Please sign in to comment.