From 3e8cf654bfd55f1cab7d2e15c4ef454670c7e419 Mon Sep 17 00:00:00 2001 From: Jonah Eisen Date: Fri, 15 Dec 2023 15:10:37 -0500 Subject: [PATCH] Add bad data option to create connection form Add a bad data option selector to the Define schema stage of the create connection flow. --- arroyo-console/src/gen/api-types.ts | 12 ++++- .../src/routes/connections/DefineSchema.tsx | 44 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/arroyo-console/src/gen/api-types.ts b/arroyo-console/src/gen/api-types.ts index 6e37ccdb1..985265856 100644 --- a/arroyo-console/src/gen/api-types.ts +++ b/arroyo-console/src/gen/api-types.ts @@ -197,10 +197,17 @@ export interface components { schemas: { AvroFormat: { confluentSchemaRegistry?: boolean; - embeddedSchema?: boolean; intoUnstructuredJson?: boolean; + rawDatums?: boolean; readerSchema?: string; + /** Format: int32 */ + schemaId?: number | null; }; + BadData: OneOf<[{ + fail: Record; + }, { + drop: Record; + }]>; Checkpoint: { backend: string; /** Format: int32 */ @@ -239,6 +246,7 @@ export interface components { name: string; }; ConnectionSchema: { + badData?: components["schemas"]["BadData"] | null; definition?: components["schemas"]["SchemaDefinition"] | null; fields: (components["schemas"]["SourceField"])[]; format?: components["schemas"]["Format"] | null; @@ -363,6 +371,8 @@ export interface components { confluentSchemaRegistry?: boolean; debezium?: boolean; includeSchema?: boolean; + /** Format: int32 */ + schemaId?: number | null; timestampFormat?: components["schemas"]["TimestampFormat"]; unstructured?: boolean; }; diff --git a/arroyo-console/src/routes/connections/DefineSchema.tsx b/arroyo-console/src/routes/connections/DefineSchema.tsx index aec0ded91..54103500f 100644 --- a/arroyo-console/src/routes/connections/DefineSchema.tsx +++ b/arroyo-console/src/routes/connections/DefineSchema.tsx @@ -262,6 +262,7 @@ export const DefineSchema = ({ type DataFormatOption = { name: string; value: string; el?: ReactElement; disabled?: boolean }; const [selectedFormat, setSelectedFormat] = useState(undefined); const [selectedFraming, setSelectedFraming] = useState(undefined); + const [selectedBadData, setSelectedBadData] = useState(undefined); let { connectionProfiles, connectionProfilesLoading } = useConnectionProfiles(); @@ -334,6 +335,16 @@ export const DefineSchema = ({ }, ]; + type BadDataOption = { + name: string; + value: components['schemas']['BadData']; + }; + + const badDataOptions: BadDataOption[] = [ + { name: 'Fail', value: { fail: {} } }, + { name: 'Drop', value: { drop: {} } }, + ]; + const onFormatChange = (e: ChangeEvent) => { let format = String(e.target.value); setSelectedFormat(format); @@ -368,6 +379,18 @@ export const DefineSchema = ({ }); }; + const onBadDataChange: ChangeEventHandler = e => { + setSelectedBadData(e.target.value); + setState({ + ...state, + schema: { + ...state.schema, + fields: [], + badData: badDataOptions.find(f => f.name == e.target.value)?.value, + }, + }); + }; + return ( @@ -385,6 +408,27 @@ export const DefineSchema = ({ + + Bad Data + + + This option describes how the job should handle data that doesn't match the defined + schema. 'Fail' will cause the job to fail, while 'Drop' will cause the job to drop + (ignore) bad data. + + + Data format