diff --git a/Cargo.lock b/Cargo.lock index 57e35d5e9..c72b274d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -749,6 +749,7 @@ dependencies = [ "tokio", "tracing", "typify", + "unicase", ] [[package]] diff --git a/arroyo-api/migrations/V19__case_insensitive_connection_names.sql b/arroyo-api/migrations/V19__case_insensitive_connection_names.sql new file mode 100644 index 000000000..fc2f6565b --- /dev/null +++ b/arroyo-api/migrations/V19__case_insensitive_connection_names.sql @@ -0,0 +1 @@ +CREATE UNIQUE INDEX case_insensitive_connection_unique ON connection_tables (organization_id, LOWER(name)); \ No newline at end of file diff --git a/arroyo-api/src/connection_tables.rs b/arroyo-api/src/connection_tables.rs index d24f8537f..3396045fa 100644 --- a/arroyo-api/src/connection_tables.rs +++ b/arroyo-api/src/connection_tables.rs @@ -261,7 +261,7 @@ pub async fn create_connection_table( .to_string(); if let Some(schema) = &schema { - if schema.definition.is_none() { + if schema.definition.is_none() && schema.inferred != Some(true) { return Err(required_field("schema.definition")); } } diff --git a/arroyo-connectors/src/blackhole.rs b/arroyo-connectors/src/blackhole.rs index bafb28925..5b0fc503d 100644 --- a/arroyo-connectors/src/blackhole.rs +++ b/arroyo-connectors/src/blackhole.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use arroyo_rpc::api_types::connections::{ConnectionSchema, ConnectionType, TestSourceMessage}; use arroyo_rpc::OperatorConfig; use axum::response::sse::Event; @@ -35,7 +36,7 @@ impl Connector for BlackholeConnector { } fn table_type(&self, _: Self::ProfileT, _: Self::TableT) -> ConnectionType { - return ConnectionType::Source; + return ConnectionType::Sink; } fn get_schema( @@ -98,13 +99,9 @@ impl Connector for BlackholeConnector { id, name: name.to_string(), connection_type: ConnectionType::Sink, - schema: s.cloned().unwrap_or_else(|| ConnectionSchema { - format: None, - framing: None, - struct_name: None, - fields: vec![], - definition: None, - }), + schema: s + .cloned() + .ok_or_else(|| anyhow!("no schema for blackhole sink"))?, operator: "connectors::blackhole::BlackholeSinkFunc::<#in_k, #in_t>".to_string(), config: serde_json::to_string(&config).unwrap(), description, diff --git a/arroyo-connectors/src/impulse.rs b/arroyo-connectors/src/impulse.rs index 8f7eb3868..81d625a78 100644 --- a/arroyo-connectors/src/impulse.rs +++ b/arroyo-connectors/src/impulse.rs @@ -16,8 +16,6 @@ import_types!(schema = "../connector-schemas/impulse/table.json"); const ICON: &str = include_str!("../resources/impulse.svg"); pub fn impulse_schema() -> ConnectionSchema { - // use grpc::api::PrimitiveType::*; - // use source_field_type::Type::Primitive; ConnectionSchema { format: None, framing: None, @@ -27,6 +25,7 @@ pub fn impulse_schema() -> ConnectionSchema { source_field("subtask_index", Primitive(PrimitiveType::UInt64)), ], definition: None, + inferred: None, } } @@ -113,7 +112,7 @@ impl Connector for ImpulseConnector { // validate the schema if let Some(s) = s { - if s.fields != impulse_schema().fields { + if !s.fields.is_empty() && s.fields != impulse_schema().fields { bail!("invalid schema for impulse source"); } } diff --git a/arroyo-connectors/src/nexmark.rs b/arroyo-connectors/src/nexmark.rs index db3f8d87c..09ea50888 100644 --- a/arroyo-connectors/src/nexmark.rs +++ b/arroyo-connectors/src/nexmark.rs @@ -1,4 +1,4 @@ -use anyhow::bail; +use anyhow::{anyhow, bail}; use arroyo_rpc::api_types::connections::FieldType::Primitive; use arroyo_rpc::api_types::connections::{ ConnectionSchema, ConnectionType, FieldType, SourceFieldType, StructType, TestSourceMessage, @@ -7,9 +7,10 @@ use arroyo_rpc::OperatorConfig; use axum::response::sse::Event; use serde::{Deserialize, Serialize}; use std::convert::Infallible; +use std::str::FromStr; use typify::import_types; -use crate::{nullable_field, source_field, Connection, Connector, EmptyConfig}; +use crate::{nullable_field, pull_opt, source_field, Connection, Connector, EmptyConfig}; const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/nexmark/table.json"); const ICON: &str = include_str!("../resources/nexmark.svg"); @@ -83,6 +84,7 @@ pub fn nexmark_schema() -> ConnectionSchema { ), ], definition: None, + inferred: None, } } @@ -148,24 +150,35 @@ impl Connector for NexmarkConnector { fn from_options( &self, - _: &str, - _: &mut std::collections::HashMap, - _: Option<&ConnectionSchema>, + name: &str, + options: &mut std::collections::HashMap, + schema: Option<&ConnectionSchema>, ) -> anyhow::Result { - // let event_rate = - // f64::from_str(&pull_opt("event_rate", options)?) - // .map_err(|_| anyhow!("invalid value for event_rate; expected float"))?; - - // let runtime = options.remove("runtime") - // .map(|t| f64::from_str(&t)) - // .transpose() - // .map_err(|_| anyhow!("invalid value for runtime; expected float"))?; - - // self.from_config(None, name, EmptyConfig {}, NexmarkTable { - // event_rate, - // runtime, - // }, None) - bail!("Nexmark sources cannot currently be created in SQL; create using the web ui instead") + let event_rate = f64::from_str(&pull_opt("event_rate", options)?) + .map_err(|_| anyhow!("invalid value for event_rate; expected float"))?; + + let runtime = options + .remove("runtime") + .map(|t| f64::from_str(&t)) + .transpose() + .map_err(|_| anyhow!("invalid value for runtime; expected float"))?; + + if let Some(schema) = schema { + if !schema.fields.is_empty() && schema.fields != nexmark_schema().fields { + bail!("invalid schema for nexmark source; omit fields to rely on inference"); + } + } + + self.from_config( + None, + name, + EmptyConfig {}, + NexmarkTable { + event_rate, + runtime, + }, + None, + ) } fn from_config( diff --git a/arroyo-console/src/gen/api-types.ts b/arroyo-console/src/gen/api-types.ts index 2fbb3774d..6e37ccdb1 100644 --- a/arroyo-console/src/gen/api-types.ts +++ b/arroyo-console/src/gen/api-types.ts @@ -243,6 +243,7 @@ export interface components { fields: (components["schemas"]["SourceField"])[]; format?: components["schemas"]["Format"] | null; framing?: components["schemas"]["Framing"] | null; + inferred?: boolean | null; structName?: string | null; }; ConnectionTable: { diff --git a/arroyo-console/src/lib/util.ts b/arroyo-console/src/lib/util.ts index 97aee7ad1..6b926d9bf 100644 --- a/arroyo-console/src/lib/util.ts +++ b/arroyo-console/src/lib/util.ts @@ -189,3 +189,10 @@ export const generate_udf_id = () => { } return result; }; + +export function capitalize(string: string): string { + if (string == 'json') { + return 'JSON'; + } + return string.charAt(0).toUpperCase() + string.slice(1); +} diff --git a/arroyo-console/src/routes/connections/DefineSchema.tsx b/arroyo-console/src/routes/connections/DefineSchema.tsx index b9298570d..edd0c95d4 100644 --- a/arroyo-console/src/routes/connections/DefineSchema.tsx +++ b/arroyo-console/src/routes/connections/DefineSchema.tsx @@ -20,6 +20,7 @@ import { } from '../../lib/data_fetching'; import { ConfluentSchemaEditor } from './ConfluentSchemaEditor'; import { components } from '../../gen/api-types'; +import { capitalize } from '../../lib/util'; const SchemaFormatEditor = ({ connector, @@ -37,7 +38,10 @@ const SchemaFormatEditor = ({ format: 'json' | 'avro'; }) => { type SchemaTypeOption = { name: string; value: string }; - let schemaTypeOptions: SchemaTypeOption[] = [{ name: format + ' schema', value: 'schema' }]; + let schemaTypeOptions: SchemaTypeOption[] = [ + { name: 'Infer schema', value: 'inferred' }, + { name: capitalize(format) + ' schema', value: 'schema' }, + ]; if (format == 'json') { schemaTypeOptions.push({ name: 'Unstructured JSON', value: 'unstructured' }); @@ -91,6 +95,17 @@ const SchemaFormatEditor = ({ } else if ((state.schema?.definition || {})[def_name] != undefined) { editor = ; value = 'schema'; + } else if (state.schema?.inferred) { + editor = ( + + + The schema for this connection will be inferred from context in the SQL query. This option + should generally just be used for sinks. + + + + ); + value = 'inferred'; } else { editor = null; value = undefined; @@ -137,6 +152,19 @@ const SchemaFormatEditor = ({ }, }); break; + case 'inferred': + setState({ + ...state, + schema: { + ...state.schema, + definition: null, + fields: [], + format: { json: { unstructured: false, confluentSchemaRegistry: false } }, + inferred: true, + }, + }); + break; + default: setState({ ...state, diff --git a/arroyo-rpc/src/api_types/connections.rs b/arroyo-rpc/src/api_types/connections.rs index 3da3083ed..e0210ab4e 100644 --- a/arroyo-rpc/src/api_types/connections.rs +++ b/arroyo-rpc/src/api_types/connections.rs @@ -132,6 +132,7 @@ pub struct ConnectionSchema { pub struct_name: Option, pub fields: Vec, pub definition: Option, + pub inferred: Option, } impl ConnectionSchema { @@ -141,6 +142,7 @@ impl ConnectionSchema { struct_name: Option, fields: Vec, definition: Option, + inferred: Option, ) -> anyhow::Result { let s = ConnectionSchema { format, @@ -148,6 +150,7 @@ impl ConnectionSchema { struct_name, fields, definition, + inferred, }; s.validate() diff --git a/arroyo-sql-macro/src/connectors.rs b/arroyo-sql-macro/src/connectors.rs index b5970f858..f54d3e937 100644 --- a/arroyo-sql-macro/src/connectors.rs +++ b/arroyo-sql-macro/src/connectors.rs @@ -85,6 +85,7 @@ pub fn get_json_schema_source() -> Result { .map(|field| field.try_into().unwrap()) .collect(), Some(definition), + None, )?; let config = KafkaConfig { authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {}, @@ -189,6 +190,7 @@ pub fn get_avro_source() -> Result { .map(|field| field.try_into().unwrap()) .collect(), Some(definition), + None, )?; let config = KafkaConfig { authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {}, diff --git a/arroyo-sql-testing/src/full_query_tests.rs b/arroyo-sql-testing/src/full_query_tests.rs index 7e95229df..3ab4910b9 100644 --- a/arroyo-sql-testing/src/full_query_tests.rs +++ b/arroyo-sql-testing/src/full_query_tests.rs @@ -495,3 +495,29 @@ full_pipeline_codegen! { "kafka_avro_source", "SELECT * FROM kafka_avro_schema" } + +full_pipeline_codegen! { + "source_inferred_schema", + "CREATE TABLE n WITH ( + connector = 'nexmark', + event_rate = '10' + ); + + select * from n; + " +} + +full_pipeline_codegen! { + "sink_inferred_schema", + "CREATE TABLE schemaless_sink WITH ( + connector = 'kafka', + bootstrap_servers = 'localhost:9092', + type = 'sink', + topic = 'outputs', + format = 'json' +); + +INSERT INTO schemaless_sink +select bid.url +from nexmark;" +} diff --git a/arroyo-sql/Cargo.toml b/arroyo-sql/Cargo.toml index db2c88a17..424fd98fd 100644 --- a/arroyo-sql/Cargo.toml +++ b/arroyo-sql/Cargo.toml @@ -34,3 +34,4 @@ schemars = "0.8" serde_json_path = "0.6.3" apache-avro = "0.16.0" prettyplease = "0.2.4" +unicase = "2.7.0" diff --git a/arroyo-sql/src/lib.rs b/arroyo-sql/src/lib.rs index cc050c334..fc6fe8aff 100644 --- a/arroyo-sql/src/lib.rs +++ b/arroyo-sql/src/lib.rs @@ -48,10 +48,12 @@ use datafusion_common::DataFusionError; use prettyplease::unparse; use regex::Regex; use std::collections::HashSet; + use std::time::{Duration, SystemTime}; use std::{collections::HashMap, sync::Arc}; use syn::{parse_file, parse_quote, parse_str, FnArg, Item, ReturnType, Visibility}; use tracing::warn; +use unicase::UniCase; const DEFAULT_IDLE_TIME: Option = Some(Duration::from_secs(5 * 60)); @@ -69,7 +71,7 @@ pub struct UdfDef { #[derive(Debug, Clone, Default)] pub struct ArroyoSchemaProvider { pub source_defs: HashMap, - tables: HashMap, + tables: HashMap, Table>, pub functions: HashMap>, pub aggregate_functions: HashMap>, pub connections: HashMap, @@ -207,17 +209,22 @@ impl ArroyoSchemaProvider { } self.tables.insert( - connection.name.clone(), + UniCase::new(connection.name.clone()), Table::ConnectorTable(connection.into()), ); } fn insert_table(&mut self, table: Table) { - self.tables.insert(table.name().to_string(), table); + self.tables + .insert(UniCase::new(table.name().to_string()), table); + } + + pub fn get_table(&self, table_name: impl Into) -> Option<&Table> { + self.tables.get(&UniCase::new(table_name.into())) } - fn get_table(&self, table_name: &String) -> Option<&Table> { - self.tables.get(table_name) + pub fn get_table_mut(&mut self, table_name: impl Into) -> Option<&mut Table> { + self.tables.get_mut(&UniCase::new(table_name.into())) } fn vec_inner_type(ty: &syn::Type) -> Option { @@ -382,15 +389,12 @@ impl ContextProvider for ArroyoSchemaProvider { &self, name: TableReference, ) -> datafusion_common::Result> { - let table = self.tables.get(&name.to_string()).ok_or_else(|| { + let table = self.get_table(name.to_string()).ok_or_else(|| { datafusion::error::DataFusionError::Plan(format!("Table {} not found", name)) })?; - let fields = table.get_fields().map_err(|err| { - datafusion::error::DataFusionError::Plan(format!( - "Table {} failed to get fields with {}", - name, err - )) - })?; + + let fields = table.get_fields(); + Ok(create_table_source(fields)) } @@ -455,7 +459,10 @@ pub fn parse_and_get_program_sync( if let Some(table) = Table::try_from_statement(&statement, &schema_provider)? { schema_provider.insert_table(table); } else { - inserts.push(Insert::try_from_statement(&statement, &schema_provider)?); + inserts.push(Insert::try_from_statement( + &statement, + &mut schema_provider, + )?); }; } @@ -495,6 +502,7 @@ pub fn parse_and_get_program_sync( event_time_field: None, watermark_field: None, idle_time: DEFAULT_IDLE_TIME, + inferred_fields: None, }); plan_graph.add_sql_operator(sink.as_sql_sink(insert)?); @@ -670,6 +678,7 @@ pub fn get_test_expression( .map(|s| s.clone().try_into().unwrap()) .collect(), definition: None, + inferred: None, }; let mut schema_provider = ArroyoSchemaProvider::new(); @@ -706,7 +715,7 @@ pub fn get_test_expression( if let Some(table) = Table::try_from_statement(&statement, &schema_provider).unwrap() { schema_provider.insert_table(table); } else { - inserts.push(Insert::try_from_statement(&statement, &schema_provider).unwrap()); + inserts.push(Insert::try_from_statement(&statement, &mut schema_provider).unwrap()); }; } diff --git a/arroyo-sql/src/tables.rs b/arroyo-sql/src/tables.rs index d5593c343..d438a3eba 100644 --- a/arroyo-sql/src/tables.rs +++ b/arroyo-sql/src/tables.rs @@ -9,6 +9,7 @@ use arroyo_rpc::api_types::connections::{ ConnectionSchema, ConnectionType, SchemaDefinition, SourceField, }; use arroyo_rpc::formats::{Format, Framing}; +use datafusion::sql::sqlparser::ast::Query; use datafusion::{ optimizer::{analyzer::Analyzer, optimizer::Optimizer, OptimizerContext}, sql::{ @@ -49,6 +50,8 @@ pub struct ConnectorTable { pub event_time_field: Option, pub watermark_field: Option, pub idle_time: Option, + + pub inferred_fields: Option>, } #[derive(Debug, Clone)] @@ -147,6 +150,7 @@ impl From for ConnectorTable { event_time_field: None, watermark_field: None, idle_time: DEFAULT_IDLE_TIME, + inferred_fields: None, } } } @@ -202,12 +206,21 @@ impl ConnectorTable { }) .collect(); - let schema = ConnectionSchema::try_new(format, framing, None, schema_fields?, None)?; + let schema = ConnectionSchema::try_new( + format, + framing, + None, + schema_fields?, + None, + Some(fields.is_empty()), + )?; let connection = connector.from_options(name, options, Some(&schema))?; let mut table: ConnectorTable = connection.into(); - table.fields = fields; + if !fields.is_empty() { + table.fields = fields; + } table.event_time_field = options.remove("event_time_field"); table.watermark_field = options.remove("watermark_field"); @@ -495,6 +508,10 @@ fn value_to_inner_string(value: &Value) -> Result { } } +fn qualified_field(f: &DFField) -> Field { + Field::new(f.qualified_name(), f.data_type().clone(), f.is_nullable()) +} + impl Table { fn schema_from_columns( columns: &Vec, @@ -635,14 +652,16 @@ impl Table { ))), } } else { - match &produce_optimized_plan(statement, schema_provider)? { + match &produce_optimized_plan(statement, schema_provider) { // views and memory tables are the same now. - LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { name, input, .. })) - | LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { + Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView { + name, input, .. + }))) + | Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable { name, input, .. - })) => { + }))) => { // Return a TableFromQuery Ok(Some(Table::TableFromQuery { name: name.to_string(), @@ -661,31 +680,56 @@ impl Table { } } - pub fn get_fields(&self) -> Result> { + pub fn set_inferred_fields(&mut self, fields: Vec) -> Result<()> { + let Table::ConnectorTable(t) = self else { + bail!("can only infer schema for connector tables"); + }; + + if !t.fields.is_empty() { + return Ok(()); + } + + if let Some(existing) = &t.inferred_fields { + let matches = existing.len() == fields.len() + && existing + .iter() + .zip(&fields) + .all(|(a, b)| a.name() == b.name() && a.data_type() == b.data_type()); + + if !matches { + bail!("all inserts into a table must share the same schema"); + } + } + + t.inferred_fields.replace(fields); + + Ok(()) + } + + pub fn get_fields(&self) -> Vec { match self { - Table::MemoryTable { fields, .. } => fields - .iter() - .map(|field| { - let field: Field = field.clone().into(); - Ok(field) - }) - .collect::>>(), - Table::ConnectorTable(ConnectorTable { fields, .. }) => fields - .iter() - .map(|field| { - let field: Field = field.struct_field().clone().into(); - Ok(field) - }) - .collect::>>(), + Table::MemoryTable { fields, .. } => { + fields.iter().map(|field| field.clone().into()).collect() + } + Table::ConnectorTable(ConnectorTable { + fields, + inferred_fields, + .. + }) => inferred_fields + .as_ref() + .map(|fs| fs.iter().map(|f| qualified_field(&*f)).collect()) + .unwrap_or_else(|| { + fields + .iter() + .map(|field| field.struct_field().clone().into()) + .collect() + }), Table::TableFromQuery { logical_plan, .. } => logical_plan .schema() .fields() .iter() - .map(|field| { - let field: Field = (**field.field()).clone(); - Ok(field) - }) - .collect::>>(), + .map(qualified_field) + .collect(), } } @@ -729,11 +773,38 @@ pub enum Insert { }, } +fn infer_sink_schema( + source: &Box, + table_name: String, + schema_provider: &mut ArroyoSchemaProvider, +) -> Result<()> { + let plan = produce_optimized_plan(&Statement::Query(source.clone()), schema_provider)?; + let table = schema_provider + .get_table_mut(&table_name) + .ok_or_else(|| anyhow!("table {} not found", table_name))?; + + table.set_inferred_fields(plan.schema().fields().iter().map(|f| f.clone()).collect())?; + + Ok(()) +} + impl Insert { pub fn try_from_statement( statement: &Statement, - schema_provider: &ArroyoSchemaProvider, + schema_provider: &mut ArroyoSchemaProvider, ) -> Result { + if let Statement::Insert { + source, + into, + table_name, + .. + } = statement + { + if *into { + infer_sink_schema(source, table_name.to_string(), schema_provider)?; + } + } + let logical_plan = produce_optimized_plan(statement, schema_provider)?; match &logical_plan {