Skip to content

Commit

Permalink
Add support for inferring schemas (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Nov 22, 2023
1 parent bc962c5 commit f8a88dd
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE UNIQUE INDEX case_insensitive_connection_unique ON connection_tables (organization_id, LOWER(name));
2 changes: 1 addition & 1 deletion arroyo-api/src/connection_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub async fn create_connection_table(
.to_string();

if let Some(schema) = &schema {
if schema.definition.is_none() {
if schema.definition.is_none() && schema.inferred != Some(true) {
return Err(required_field("schema.definition"));
}
}
Expand Down
13 changes: 5 additions & 8 deletions arroyo-connectors/src/blackhole.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use arroyo_rpc::api_types::connections::{ConnectionSchema, ConnectionType, TestSourceMessage};
use arroyo_rpc::OperatorConfig;
use axum::response::sse::Event;
Expand Down Expand Up @@ -35,7 +36,7 @@ impl Connector for BlackholeConnector {
}

fn table_type(&self, _: Self::ProfileT, _: Self::TableT) -> ConnectionType {
return ConnectionType::Source;
return ConnectionType::Sink;
}

fn get_schema(
Expand Down Expand Up @@ -98,13 +99,9 @@ impl Connector for BlackholeConnector {
id,
name: name.to_string(),
connection_type: ConnectionType::Sink,
schema: s.cloned().unwrap_or_else(|| ConnectionSchema {
format: None,
framing: None,
struct_name: None,
fields: vec![],
definition: None,
}),
schema: s
.cloned()
.ok_or_else(|| anyhow!("no schema for blackhole sink"))?,
operator: "connectors::blackhole::BlackholeSinkFunc::<#in_k, #in_t>".to_string(),
config: serde_json::to_string(&config).unwrap(),
description,
Expand Down
5 changes: 2 additions & 3 deletions arroyo-connectors/src/impulse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import_types!(schema = "../connector-schemas/impulse/table.json");
const ICON: &str = include_str!("../resources/impulse.svg");

pub fn impulse_schema() -> ConnectionSchema {
// use grpc::api::PrimitiveType::*;
// use source_field_type::Type::Primitive;
ConnectionSchema {
format: None,
framing: None,
Expand All @@ -27,6 +25,7 @@ pub fn impulse_schema() -> ConnectionSchema {
source_field("subtask_index", Primitive(PrimitiveType::UInt64)),
],
definition: None,
inferred: None,
}
}

Expand Down Expand Up @@ -113,7 +112,7 @@ impl Connector for ImpulseConnector {

// validate the schema
if let Some(s) = s {
if s.fields != impulse_schema().fields {
if !s.fields.is_empty() && s.fields != impulse_schema().fields {
bail!("invalid schema for impulse source");
}
}
Expand Down
51 changes: 32 additions & 19 deletions arroyo-connectors/src/nexmark.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::bail;
use anyhow::{anyhow, bail};
use arroyo_rpc::api_types::connections::FieldType::Primitive;
use arroyo_rpc::api_types::connections::{
ConnectionSchema, ConnectionType, FieldType, SourceFieldType, StructType, TestSourceMessage,
Expand All @@ -7,9 +7,10 @@ use arroyo_rpc::OperatorConfig;
use axum::response::sse::Event;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::str::FromStr;
use typify::import_types;

use crate::{nullable_field, source_field, Connection, Connector, EmptyConfig};
use crate::{nullable_field, pull_opt, source_field, Connection, Connector, EmptyConfig};

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/nexmark/table.json");
const ICON: &str = include_str!("../resources/nexmark.svg");
Expand Down Expand Up @@ -83,6 +84,7 @@ pub fn nexmark_schema() -> ConnectionSchema {
),
],
definition: None,
inferred: None,
}
}

Expand Down Expand Up @@ -148,24 +150,35 @@ impl Connector for NexmarkConnector {

fn from_options(
&self,
_: &str,
_: &mut std::collections::HashMap<String, String>,
_: Option<&ConnectionSchema>,
name: &str,
options: &mut std::collections::HashMap<String, String>,
schema: Option<&ConnectionSchema>,
) -> anyhow::Result<Connection> {
// let event_rate =
// f64::from_str(&pull_opt("event_rate", options)?)
// .map_err(|_| anyhow!("invalid value for event_rate; expected float"))?;

// let runtime = options.remove("runtime")
// .map(|t| f64::from_str(&t))
// .transpose()
// .map_err(|_| anyhow!("invalid value for runtime; expected float"))?;

// self.from_config(None, name, EmptyConfig {}, NexmarkTable {
// event_rate,
// runtime,
// }, None)
bail!("Nexmark sources cannot currently be created in SQL; create using the web ui instead")
let event_rate = f64::from_str(&pull_opt("event_rate", options)?)
.map_err(|_| anyhow!("invalid value for event_rate; expected float"))?;

let runtime = options
.remove("runtime")
.map(|t| f64::from_str(&t))
.transpose()
.map_err(|_| anyhow!("invalid value for runtime; expected float"))?;

if let Some(schema) = schema {
if !schema.fields.is_empty() && schema.fields != nexmark_schema().fields {
bail!("invalid schema for nexmark source; omit fields to rely on inference");
}
}

self.from_config(
None,
name,
EmptyConfig {},
NexmarkTable {
event_rate,
runtime,
},
None,
)
}

fn from_config(
Expand Down
1 change: 1 addition & 0 deletions arroyo-console/src/gen/api-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ export interface components {
fields: (components["schemas"]["SourceField"])[];
format?: components["schemas"]["Format"] | null;
framing?: components["schemas"]["Framing"] | null;
inferred?: boolean | null;
structName?: string | null;
};
ConnectionTable: {
Expand Down
7 changes: 7 additions & 0 deletions arroyo-console/src/lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,10 @@ export const generate_udf_id = () => {
}
return result;
};

/**
 * Produces a display-friendly label from a lowercase identifier.
 *
 * The exact input `'json'` is rendered as the acronym `'JSON'`; any other
 * input is returned with only its first character upper-cased and the
 * remainder left untouched. An empty string is returned unchanged.
 */
export function capitalize(string: string): string {
  // 'json' is an acronym, so it is fully upper-cased rather than title-cased.
  return string === 'json'
    ? 'JSON'
    : string.substring(0, 1).toUpperCase() + string.substring(1);
}
30 changes: 29 additions & 1 deletion arroyo-console/src/routes/connections/DefineSchema.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from '../../lib/data_fetching';
import { ConfluentSchemaEditor } from './ConfluentSchemaEditor';
import { components } from '../../gen/api-types';
import { capitalize } from '../../lib/util';

const SchemaFormatEditor = ({
connector,
Expand All @@ -37,7 +38,10 @@ const SchemaFormatEditor = ({
format: 'json' | 'avro';
}) => {
type SchemaTypeOption = { name: string; value: string };
let schemaTypeOptions: SchemaTypeOption[] = [{ name: format + ' schema', value: 'schema' }];
let schemaTypeOptions: SchemaTypeOption[] = [
{ name: 'Infer schema', value: 'inferred' },
{ name: capitalize(format) + ' schema', value: 'schema' },
];

if (format == 'json') {
schemaTypeOptions.push({ name: 'Unstructured JSON', value: 'unstructured' });
Expand Down Expand Up @@ -91,6 +95,17 @@ const SchemaFormatEditor = ({
} else if ((state.schema?.definition || {})[def_name] != undefined) {
editor = <SchemaEditor state={state} setState={setState} next={next} format={format} />;
value = 'schema';
} else if (state.schema?.inferred) {
editor = (
<Stack spacing={4} maxW={'lg'}>
<Text>
The schema for this connection will be inferred from context in the SQL query. This option
should generally just be used for sinks.
</Text>
<Button onClick={next}>Continue</Button>
</Stack>
);
value = 'inferred';
} else {
editor = null;
value = undefined;
Expand Down Expand Up @@ -137,6 +152,19 @@ const SchemaFormatEditor = ({
},
});
break;
case 'inferred':
setState({
...state,
schema: {
...state.schema,
definition: null,
fields: [],
format: { json: { unstructured: false, confluentSchemaRegistry: false } },
inferred: true,
},
});
break;

default:
setState({
...state,
Expand Down
3 changes: 3 additions & 0 deletions arroyo-rpc/src/api_types/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub struct ConnectionSchema {
pub struct_name: Option<String>,
pub fields: Vec<SourceField>,
pub definition: Option<SchemaDefinition>,
pub inferred: Option<bool>,
}

impl ConnectionSchema {
Expand All @@ -141,13 +142,15 @@ impl ConnectionSchema {
struct_name: Option<String>,
fields: Vec<SourceField>,
definition: Option<SchemaDefinition>,
inferred: Option<bool>,
) -> anyhow::Result<Self> {
let s = ConnectionSchema {
format,
framing,
struct_name,
fields,
definition,
inferred,
};

s.validate()
Expand Down
2 changes: 2 additions & 0 deletions arroyo-sql-macro/src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub fn get_json_schema_source() -> Result<Connection> {
.map(|field| field.try_into().unwrap())
.collect(),
Some(definition),
None,
)?;
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
Expand Down Expand Up @@ -189,6 +190,7 @@ pub fn get_avro_source() -> Result<Connection> {
.map(|field| field.try_into().unwrap())
.collect(),
Some(definition),
None,
)?;
let config = KafkaConfig {
authentication: arroyo_connectors::kafka::KafkaConfigAuthentication::None {},
Expand Down
26 changes: 26 additions & 0 deletions arroyo-sql-testing/src/full_query_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,29 @@ full_pipeline_codegen! {
"kafka_avro_source",
"SELECT * FROM kafka_avro_schema"
}

// Test case "source_inferred_schema": the CREATE TABLE statement declares a
// nexmark table using only WITH options (connector and event_rate) and no
// column list, then selects every column from it. Presumably this exercises
// the connector supplying its own schema when none is written in SQL.
// NOTE(review): the exact checks performed by `full_pipeline_codegen!` are not
// visible in this hunk -- confirm against the macro definition.
full_pipeline_codegen! {
"source_inferred_schema",
"CREATE TABLE n WITH (
connector = 'nexmark',
event_rate = '10'
);
select * from n;
"
}

// Test case "sink_inferred_schema": declares a Kafka table of type 'sink'
// with JSON format and no column list, then inserts `bid.url` selected from
// `nexmark` into it. Presumably the sink's schema is meant to be inferred from
// the inserting query.
// NOTE(review): the `nexmark` table is not created in this query, so it is
// assumed to be provided by the test harness -- confirm.
full_pipeline_codegen! {
"sink_inferred_schema",
"CREATE TABLE schemaless_sink WITH (
connector = 'kafka',
bootstrap_servers = 'localhost:9092',
type = 'sink',
topic = 'outputs',
format = 'json'
);
INSERT INTO schemaless_sink
select bid.url
from nexmark;"
}
}
1 change: 1 addition & 0 deletions arroyo-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ schemars = "0.8"
serde_json_path = "0.6.3"
apache-avro = "0.16.0"
prettyplease = "0.2.4"
unicase = "2.7.0"
Loading

0 comments on commit f8a88dd

Please sign in to comment.