From 62d18e423739e24e447b3d87f4fbced1da2446e1 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Thu, 18 Jul 2024 13:38:36 -0700 Subject: [PATCH] Add partial support of advanced input transformations (ingest) (#220) Signed-off-by: Tyler Ohlsen --- common/constants.ts | 2 + common/interfaces.ts | 50 ++++- public/configs/ml_processor.ts | 10 - .../workflow_inputs/config_field_list.tsx | 26 +-- .../ingest_inputs/source_data.tsx | 1 + .../input_fields/map_field.tsx | 11 +- .../workflow_inputs/processor_inputs/index.ts | 7 + .../input_transform_modal.tsx | 198 ++++++++++++++++++ .../processor_inputs/ml_processor_inputs.tsx | 138 ++++++++++++ .../output_transform_modal.tsx | 45 ++++ .../processor_inputs/processor_inputs.tsx | 57 +++++ .../workflow_inputs/processors_list.tsx | 9 +- .../workflow_inputs/workflow_inputs.tsx | 35 +++- .../workflows/workflow_list/workflow_list.tsx | 4 +- public/route_service.ts | 38 ++++ public/store/reducers/opensearch_reducer.ts | 46 +++- public/utils/config_to_form_utils.ts | 5 +- public/utils/config_to_schema_utils.ts | 17 +- public/utils/config_to_template_utils.ts | 2 +- public/utils/form_to_pipeline_utils.ts | 56 +++++ public/utils/index.ts | 1 + public/utils/utils.ts | 6 +- server/routes/opensearch_routes_service.ts | 83 ++++++++ 23 files changed, 791 insertions(+), 56 deletions(-) create mode 100644 public/pages/workflow_detail/workflow_inputs/processor_inputs/index.ts create mode 100644 public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx create mode 100644 public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx create mode 100644 public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx create mode 100644 public/pages/workflow_detail/workflow_inputs/processor_inputs/processor_inputs.tsx create mode 100644 public/utils/form_to_pipeline_utils.ts diff --git a/common/constants.ts b/common/constants.ts index ffed2a0f..10ada745 100644 --- a/common/constants.ts +++ b/common/constants.ts @@ -32,6 +32,8 @@ export const BASE_OPENSEARCH_NODE_API_PATH = `${BASE_NODE_API_PATH}/opensearch`; export const CAT_INDICES_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/catIndices`; export const SEARCH_INDEX_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/search`; export const INGEST_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/ingest`; +export const BULK_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/bulk`; +export const SIMULATE_PIPELINE_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/simulatePipeline`; // Flow Framework node APIs export const BASE_WORKFLOW_NODE_API_PATH = `${BASE_NODE_API_PATH}/workflow`; diff --git a/common/interfaces.ts b/common/interfaces.ts index 90311dca..205707ae 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -15,10 +15,15 @@ export type Index = { /** ********** WORKFLOW TYPES/INTERFACES ********** -TODO: over time these can become less generic as the form inputs & UX becomes finalized */ -export type ConfigFieldType = 'string' | 'json' | 'select' | 'model' | 'map'; +export type ConfigFieldType = + | 'string' + | 'json' + | 'jsonArray' + | 'select' + | 'model' + | 'map'; export type ConfigFieldValue = string | {}; export interface IConfigField { type: ConfigFieldType; @@ -46,6 +51,12 @@ export type ProcessorsConfig = { processors: IProcessorConfig[]; }; +export type IngestPipelineConfig = ProcessorsConfig & { + description?: string; +}; + +export type SearchPipelineConfig = ProcessorsConfig; + export type IndexConfig = { name: IConfigField; mappings: IConfigField; @@ -384,13 +395,6 @@ export type ModelFormValue = { ********** MISC TYPES/INTERFACES ************ */ -// TODO: finalize how we have the launch data model -export type WorkflowLaunch = { - id: string; - state: WORKFLOW_STATE; - lastUpdated: number; -}; - // Based off of https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/model/State.java export enum WORKFLOW_STATE { NOT_STARTED = 'Not started', @@ -431,3 +435,31 @@ export enum WORKFLOW_STEP_TO_RESOURCE_TYPE_MAP { export type WorkflowDict = { [workflowId: string]: Workflow; }; + +/** + ********** OPENSEARCH TYPES/INTERFACES ************ + */ + +// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path +export type SimulateIngestPipelineDoc = { + _index: string; + _id: string; + _source: {}; +}; + +// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path +export type SimulateIngestPipelineDocResponse = { + doc: SimulateIngestPipelineDoc & { + _ingest: { + timestamp: string; + }; + }; + error?: { + reason: string; + }; +}; + +// from https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest/#example-specify-a-pipeline-in-the-path +export type SimulateIngestPipelineResponse = { + docs: SimulateIngestPipelineDocResponse[]; +}; diff --git a/public/configs/ml_processor.ts b/public/configs/ml_processor.ts index ce797307..53a6bef2 100644 --- a/public/configs/ml_processor.ts +++ b/public/configs/ml_processor.ts @@ -21,22 +21,12 @@ export abstract class MLProcessor extends Processor { type: 'model', }, { - label: 'Input Map', id: 'inputMap', type: 'map', - // TODO: move these fields directly into the component once design is finalized - helpText: `An array specifying how to map fields from the ingested document to the model’s input.`, - helpLink: - 'https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/#configuration-parameters', }, { - label: 'Output Map', id: 'outputMap', type: 'map', - // TODO: move these fields directly into the component once design is finalized - helpText: `An array specifying how to map the model’s output to new fields.`, - helpLink: - 'https://opensearch.org/docs/latest/ingest-pipelines/processors/ml-inference/#configuration-parameters', }, ]; } diff --git a/public/pages/workflow_detail/workflow_inputs/config_field_list.tsx b/public/pages/workflow_detail/workflow_inputs/config_field_list.tsx index 8daf1022..01db981b 100644 --- a/public/pages/workflow_detail/workflow_inputs/config_field_list.tsx +++ b/public/pages/workflow_detail/workflow_inputs/config_field_list.tsx @@ -55,19 +55,19 @@ export function ConfigFieldList(props: ConfigFieldListProps) { ); break; } - case 'map': { - el = ( - - - - - ); - break; - } + // case 'map': { + // el = ( + // + // + // + // + // ); + // break; + // } // case 'json': { // el = ( // diff --git a/public/pages/workflow_detail/workflow_inputs/ingest_inputs/source_data.tsx b/public/pages/workflow_detail/workflow_inputs/ingest_inputs/source_data.tsx index d6ba2a8e..2e8a1937 100644 --- a/public/pages/workflow_detail/workflow_inputs/ingest_inputs/source_data.tsx +++ b/public/pages/workflow_detail/workflow_inputs/ingest_inputs/source_data.tsx @@ -65,6 +65,7 @@ export function SourceData(props: SourceDataProps) { {}} diff --git a/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx b/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx index 23714314..a27071e3 100644 --- a/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx +++ b/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx @@ -25,6 +25,9 @@ import { interface MapFieldProps { field: IConfigField; fieldPath: string; // the full path in string-form to the field (e.g., 'ingest.enrich.processors.text_embedding_processor.inputField') + label: string; + helpLink?: string; + helpText?: string; onFormChange: () => void; } @@ -60,17 +63,17 @@ export function MapField(props: MapFieldProps) { return ( - + Learn more ) : undefined } - helpText={props.field.helpText || undefined} + helpText={props.helpText || undefined} error={ getIn(errors, field.name) !== undefined && getIn(errors, field.name).length > 0 diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/index.ts b/public/pages/workflow_detail/workflow_inputs/processor_inputs/index.ts new file mode 100644 index 00000000..95257a19 --- /dev/null +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +export * from './ml_processor_inputs'; +export * from './processor_inputs'; diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx b/public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx new file mode 100644 index 00000000..9995303e --- /dev/null +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx @@ -0,0 +1,198 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import React, { useState } from 'react'; +import { useFormikContext } from 'formik'; +import { + EuiButton, + EuiButtonEmpty, + EuiCodeBlock, + EuiCodeEditor, + EuiFlexGroup, + EuiFlexItem, + EuiModal, + EuiModalBody, + EuiModalFooter, + EuiModalHeader, + EuiModalHeaderTitle, + EuiSpacer, + EuiText, +} from '@elastic/eui'; +import { + IProcessorConfig, + IngestPipelineConfig, + PROCESSOR_CONTEXT, + SimulateIngestPipelineDoc, + SimulateIngestPipelineResponse, + WorkflowConfig, + WorkflowFormValues, +} from '../../../../../common'; +import { formikToIngestPipeline, generateId } from '../../../../utils'; +import { simulatePipeline, useAppDispatch } from '../../../../store'; +import { getCore } from '../../../../services'; + +interface InputTransformModalProps { + uiConfig: WorkflowConfig; + config: IProcessorConfig; + context: PROCESSOR_CONTEXT; + onClose: () => void; + onConfirm: () => void; +} + +/** + * A modal to configure advanced JSON-to-JSON transforms into a model's expected input + */ +export function InputTransformModal(props: InputTransformModalProps) { + const dispatch = useAppDispatch(); + const { values } = useFormikContext(); + + // source input / transformed output state + const [sourceInput, setSourceInput] = useState('[]'); + const [transformedOutput, setTransformedOutput] = useState('TODO'); + + return ( + + + +

{`Configure input transform`}

+
+
+ + + + <> + Expected input + { + switch (props.context) { + case PROCESSOR_CONTEXT.INGEST: { + const curIngestPipeline = formikToIngestPipeline( + values, + props.uiConfig, + props.config.id + ); + // if there are preceding processors, we need to generate the ingest pipeline + // up to this point and simulate, in order to get the latest transformed + // version of the docs + if (curIngestPipeline !== undefined) { + const curDocs = prepareDocsForSimulate( + values.ingest.docs, + values.ingest.index.name + ); + await dispatch( + simulatePipeline({ + pipeline: curIngestPipeline as IngestPipelineConfig, + docs: curDocs, + }) + ) + .unwrap() + .then((resp: SimulateIngestPipelineResponse) => { + setSourceInput(unwrapTransformedDocs(resp)); + }) + .catch((error: any) => { + getCore().notifications.toasts.addDanger( + `Failed to fetch input schema` + ); + }); + } else { + setSourceInput(values.ingest.docs); + } + break; + } + // TODO: complete for search request / search response contexts + } + }} + > + Fetch + + + + {sourceInput} + + + + + <> + Define transform with JSONPath + + + + + + <> + Expected output + + + {transformedOutput} + + + + + + + Cancel + + Save + + +
+ ); +} + +// docs are expected to be in a certain format to be passed to the simulate ingest pipeline API. +// for details, see https://opensearch.org/docs/latest/ingest-pipelines/simulate-ingest +function prepareDocsForSimulate( + docs: string, + indexName: string +): SimulateIngestPipelineDoc[] { + const preparedDocs = [] as SimulateIngestPipelineDoc[]; + const docObjs = JSON.parse(docs) as {}[]; + docObjs.forEach((doc) => { + preparedDocs.push({ + _index: indexName, + _id: generateId(), + _source: doc, + }); + }); + return preparedDocs; +} + +// docs are returned in a certain format from the simulate ingest pipeline API. We want +// to format them into a more readable string to display +function unwrapTransformedDocs( + simulatePipelineResponse: SimulateIngestPipelineResponse +) { + let errorDuringSimulate = undefined as string | undefined; + const transformedDocsSources = simulatePipelineResponse.docs.map( + (transformedDoc) => { + if (transformedDoc.error !== undefined) { + errorDuringSimulate = transformedDoc.error.reason || ''; + } else { + return transformedDoc.doc._source; + } + } + ); + + // there is an edge case where simulate may fail if there is some server-side or OpenSearch issue when + // running ingest (e.g., hitting rate limits on remote model) + // We pull out any returned error from a document and propagate it to the user. + if (errorDuringSimulate !== undefined) { + getCore().notifications.toasts.addDanger( + `Failed to simulate ingest on all documents: ${errorDuringSimulate}` + ); + } + return JSON.stringify(transformedDocsSources, undefined, 2); +} diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx new file mode 100644 index 00000000..0db21fc4 --- /dev/null +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import React, { useState } from 'react'; +import { getIn, useFormikContext } from 'formik'; +import { EuiButton, EuiRadioGroup, EuiSpacer, EuiText } from '@elastic/eui'; +import { + WorkspaceFormValues, + IProcessorConfig, + IConfigField, + PROCESSOR_CONTEXT, + WorkflowConfig, +} from '../../../../../common'; +import { MapField, ModelField } from '../input_fields'; +import { isEmpty } from 'lodash'; +import { InputTransformModal } from './input_transform_modal'; +import { OutputTransformModal } from './output_transform_modal'; + +interface MLProcessorInputsProps { + uiConfig: WorkflowConfig; + config: IProcessorConfig; + baseConfigPath: string; // the base path of the nested config, if applicable. e.g., 'ingest.enrich' + onFormChange: () => void; + context: PROCESSOR_CONTEXT; +} + +/** + * Component to render ML processor inputs. Offers simple and advanced flows for configuring data transforms + * before and after executing an ML inference request + */ +export function MLProcessorInputs(props: MLProcessorInputsProps) { + const { values } = useFormikContext(); + + // extracting field info from the ML processor config + // TODO: have a better mechanism for guaranteeing the expected fields/config instead of hardcoding them here + const modelField = props.config.fields.find( + (field) => field.type === 'model' + ) as IConfigField; + const modelFieldPath = `${props.baseConfigPath}.${props.config.id}.${modelField.id}`; + const inputMapField = props.config.fields.find( + (field) => field.id === 'inputMap' + ) as IConfigField; + const inputMapFieldPath = `${props.baseConfigPath}.${props.config.id}.${inputMapField.id}`; + const outputMapField = props.config.fields.find( + (field) => field.id === 'outputMap' + ) as IConfigField; + const outputMapFieldPath = `${props.baseConfigPath}.${props.config.id}.${outputMapField.id}`; + + // advanced transformations modal state + const [isInputTransformModalOpen, setIsInputTransformModalOpen] = useState< + boolean + >(false); + const [isOutputTransformModalOpen, setIsOutputTransformModalOpen] = useState< + boolean + >(false); + + return ( + <> + {isInputTransformModalOpen && ( + setIsInputTransformModalOpen(false)} + onConfirm={() => { + console.log('saving transform input configuration...'); + setIsInputTransformModalOpen(false); + }} + /> + )} + {isOutputTransformModalOpen && ( + setIsOutputTransformModalOpen(false)} + onConfirm={() => { + console.log('saving transform output configuration...'); + setIsOutputTransformModalOpen(false); + }} + /> + )} + + {!isEmpty(getIn(values, modelFieldPath)?.id) && ( + <> + + {`Configure data transformations (optional)`} + + + { + setIsInputTransformModalOpen(true); + }} + > + Advanced input configuration + + + + + { + setIsOutputTransformModalOpen(true); + }} + > + Advanced output configuration + + + + + )} + + ); +} diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx b/public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx new file mode 100644 index 00000000..3e8403ee --- /dev/null +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import React from 'react'; +import { + EuiButton, + EuiButtonEmpty, + EuiModal, + EuiModalBody, + EuiModalFooter, + EuiModalHeader, + EuiModalHeaderTitle, + EuiText, +} from '@elastic/eui'; + +interface OutputTransformModalProps { + onClose: () => void; + onConfirm: () => void; +} + +/** + * A modal to configure advanced JSON-to-JSON transforms from a model's expected output + */ +export function OutputTransformModal(props: OutputTransformModalProps) { + return ( + + + +

{`Configure output transform`}

+
+
+ + TODO TODO TODO + + + Cancel + + Save + + +
+ ); +} diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/processor_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/processor_inputs/processor_inputs.tsx new file mode 100644 index 00000000..ce64962b --- /dev/null +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/processor_inputs.tsx @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import React from 'react'; +import { EuiFlexItem, EuiSpacer } from '@elastic/eui'; +import { + IProcessorConfig, + PROCESSOR_CONTEXT, + PROCESSOR_TYPE, + WorkflowConfig, +} from '../../../../../common'; +import { MLProcessorInputs } from './ml_processor_inputs'; + +/** + * Base component for rendering processor form inputs based on the processor type + */ + +interface ProcessorInputsProps { + uiConfig: WorkflowConfig; + config: IProcessorConfig; + baseConfigPath: string; // the base path of the nested config, if applicable. e.g., 'ingest.enrich' + onFormChange: () => void; + context: PROCESSOR_CONTEXT; +} + +const PROCESSOR_INPUTS_SPACER_SIZE = 'm'; + +export function ProcessorInputs(props: ProcessorInputsProps) { + const configType = props.config.type; + return ( + + {(() => { + let el; + switch (configType) { + case PROCESSOR_TYPE.ML: { + el = ( + + + + + ); + break; + } + } + return el; + })()} + + ); +} diff --git a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx index 225a3a17..d53f1937 100644 --- a/public/pages/workflow_detail/workflow_inputs/processors_list.tsx +++ b/public/pages/workflow_detail/workflow_inputs/processors_list.tsx @@ -18,19 +18,18 @@ import { import { cloneDeep } from 'lodash'; import { useFormikContext } from 'formik'; import { - IConfig, IProcessorConfig, PROCESSOR_CONTEXT, WorkflowConfig, WorkflowFormValues, } from '../../../../common'; -import { ConfigFieldList } from './config_field_list'; import { formikToUiConfig } from '../../../utils'; import { MLIngestProcessor, MLSearchRequestProcessor, MLSearchResponseProcessor, } from '../../../configs'; +import { ProcessorInputs } from './processor_inputs'; interface ProcessorsListProps { onFormChange: () => void; @@ -133,7 +132,7 @@ export function ProcessorsList(props: ProcessorsListProps) { return ( - {processors.map((processor: IConfig, processorIndex) => { + {processors.map((processor: IProcessorConfig, processorIndex) => { return ( @@ -153,7 +152,8 @@ export function ProcessorsList(props: ProcessorsListProps) { -
diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index bf6b2f78..e6dc6322 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -36,9 +36,9 @@ import { IngestInputs } from './ingest_inputs'; import { SearchInputs } from './search_inputs'; import { AppState, + bulk, deprovisionWorkflow, getWorkflow, - ingest, provisionWorkflow, removeDirty, searchIndex, @@ -52,6 +52,7 @@ import { configToTemplateFlows, hasProvisionedIngestResources, hasProvisionedSearchResources, + generateId, } from '../../../utils'; import { BooleanField } from './input_fields'; import { ExportOptions } from './export_options'; @@ -209,16 +210,16 @@ export function WorkflowInputs(props: WorkflowInputsProps) { try { let ingestDocsObjs = [] as {}[]; try { - // TODO: test with multiple objs, make sure parsing logic works - const ingestDocObj = JSON.parse(props.ingestDocs); - ingestDocsObjs = [ingestDocObj]; + ingestDocsObjs = JSON.parse(props.ingestDocs); } catch (e) {} if (ingestDocsObjs.length > 0 && !isEmpty(ingestDocsObjs[0])) { success = await validateAndUpdateWorkflow(); if (success) { - const indexName = values.ingest.index.name; - const doc = ingestDocsObjs[0]; - dispatch(ingest({ index: indexName, doc })) + const bulkBody = prepareBulkBody( + values.ingest.index.name, + ingestDocsObjs + ); + dispatch(bulk({ body: bulkBody })) .unwrap() .then(async (resp) => { props.setIngestResponse(JSON.stringify(resp, undefined, 2)); @@ -230,7 +231,9 @@ export function WorkflowInputs(props: WorkflowInputsProps) { }); } } else { - getCore().notifications.toasts.addDanger('No valid document provided'); + getCore().notifications.toasts.addDanger( + 'No valid document provided. Ensure it is a valid JSON array.' + ); } } catch (error) { console.error('Error ingesting documents: ', error); @@ -559,3 +562,19 @@ export function WorkflowInputs(props: WorkflowInputsProps) { ); } + +// ingesting multiple documents must follow the proper format for the bulk API. +// see https://opensearch.org/docs/latest/api-reference/document-apis/bulk/#request-body +function prepareBulkBody(indexName: string, docObjs: {}[]): {} { + const bulkBody = [] as any[]; + docObjs.forEach((doc) => { + bulkBody.push({ + index: { + _index: indexName, + _id: generateId(), + }, + }); + bulkBody.push(doc); + }); + return bulkBody; +} diff --git a/public/pages/workflows/workflow_list/workflow_list.tsx b/public/pages/workflows/workflow_list/workflow_list.tsx index a196722b..9e7c0a2a 100644 --- a/public/pages/workflows/workflow_list/workflow_list.tsx +++ b/public/pages/workflows/workflow_list/workflow_list.tsx @@ -152,7 +152,7 @@ export function WorkflowList(props: WorkflowListProps) { ); }) .catch((err: any) => { - getCore().notifications.toasts.addSuccess( + getCore().notifications.toasts.addDanger( `Failed to delete ${selectedWorkflow.name}` ); console.error( @@ -194,7 +194,7 @@ export function WorkflowList(props: WorkflowListProps) { diff --git a/public/route_service.ts b/public/route_service.ts index 782de01f..9e110c38 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -19,6 +19,10 @@ import { WorkflowTemplate, SEARCH_INDEX_NODE_API_PATH, INGEST_NODE_API_PATH, + SIMULATE_PIPELINE_NODE_API_PATH, + IngestPipelineConfig, + SimulateIngestPipelineDoc, + BULK_NODE_API_PATH, } from '../common'; /** @@ -48,7 +52,12 @@ export interface RouteService { searchPipeline?: string ) => Promise; ingest: (index: string, doc: {}) => Promise; + bulk: (body: {}, ingestPipeline?: string) => Promise; searchModels: (body: {}) => Promise; + simulatePipeline: (body: { + pipeline: IngestPipelineConfig; + docs: SimulateIngestPipelineDoc[]; + }) => Promise; } export function configureRoutes(core: CoreStart): RouteService { @@ -192,6 +201,19 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, + bulk: async (body: {}, ingestPipeline?: string) => { + try { + const path = ingestPipeline + ? `${BULK_NODE_API_PATH}/${ingestPipeline}` + : BULK_NODE_API_PATH; + const response = await core.http.post<{ respString: string }>(path, { + body: JSON.stringify(body), + }); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, searchModels: async (body: {}) => { try { const response = await core.http.post<{ respString: string }>( @@ -205,5 +227,21 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, + simulatePipeline: async (body: { + pipeline: IngestPipelineConfig; + docs: SimulateIngestPipelineDoc[]; + }) => { + try { + const response = await core.http.post<{ respString: string }>( + SIMULATE_PIPELINE_NODE_API_PATH, + { + body: JSON.stringify(body), + } + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, }; } diff --git a/public/store/reducers/opensearch_reducer.ts b/public/store/reducers/opensearch_reducer.ts index 44a92b06..c977a05b 100644 --- a/public/store/reducers/opensearch_reducer.ts +++ b/public/store/reducers/opensearch_reducer.ts @@ -5,7 +5,11 @@ import { createAsyncThunk, createSlice } from '@reduxjs/toolkit'; import { getRouteService } from '../../services'; -import { Index } from '../../../common'; +import { + Index, + IngestPipelineConfig, + SimulateIngestPipelineDoc, +} from '../../../common'; import { HttpFetchError } from '../../../../../src/core/public'; const initialState = { @@ -18,6 +22,8 @@ const OPENSEARCH_PREFIX = 'opensearch'; const CAT_INDICES_ACTION = `${OPENSEARCH_PREFIX}/catIndices`; const SEARCH_INDEX_ACTION = `${OPENSEARCH_PREFIX}/search`; const INGEST_ACTION = `${OPENSEARCH_PREFIX}/ingest`; +const BULK_ACTION = `${OPENSEARCH_PREFIX}/bulk`; +const SIMULATE_PIPELINE_ACTION = `${OPENSEARCH_PREFIX}/simulatePipeline`; export const catIndices = createAsyncThunk( CAT_INDICES_ACTION, @@ -75,6 +81,44 @@ export const ingest = createAsyncThunk( } ); +export const bulk = createAsyncThunk( + BULK_ACTION, + async ( + bulkInfo: { body: {}; ingestPipeline?: string }, + { rejectWithValue } + ) => { + const { body, ingestPipeline } = bulkInfo; + const response: any | HttpFetchError = await getRouteService().bulk( + body, + ingestPipeline + ); + if (response instanceof HttpFetchError) { + return rejectWithValue('Error performing bulk: ' + response.body.message); + } else { + return response; + } + } +); + +export const simulatePipeline = createAsyncThunk( + SIMULATE_PIPELINE_ACTION, + async ( + body: { pipeline: IngestPipelineConfig; docs: SimulateIngestPipelineDoc[] }, + { rejectWithValue } + ) => { + const response: + | any + | HttpFetchError = await getRouteService().simulatePipeline(body); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error simulating ingest pipeline: ' + response.body.message + ); + } else { + return response; + } + } +); + const opensearchSlice = createSlice({ name: OPENSEARCH_PREFIX, initialState, diff --git a/public/utils/config_to_form_utils.ts b/public/utils/config_to_form_utils.ts index fa6a3dc3..afc51414 100644 --- a/public/utils/config_to_form_utils.ts +++ b/public/utils/config_to_form_utils.ts @@ -42,7 +42,7 @@ function ingestConfigToFormik( let ingestFormikValues = {} as FormikValues; if (ingestConfig) { ingestFormikValues['enabled'] = ingestConfig.enabled; - ingestFormikValues['docs'] = ingestDocs || getInitialValue('json'); + ingestFormikValues['docs'] = ingestDocs || getInitialValue('jsonArray'); ingestFormikValues['enrich'] = processorsConfigToFormik( ingestConfig.enrich ); @@ -120,5 +120,8 @@ export function getInitialValue(fieldType: ConfigFieldType): ConfigFieldValue { case 'json': { return '{}'; } + case 'jsonArray': { + return '[]'; + } } } diff --git a/public/utils/config_to_schema_utils.ts b/public/utils/config_to_schema_utils.ts index 72d8055a..9ccb60c0 100644 --- a/public/utils/config_to_schema_utils.ts +++ b/public/utils/config_to_schema_utils.ts @@ -32,7 +32,7 @@ function ingestConfigToSchema( ): ObjectSchema { const ingestSchemaObj = {} as { [key: string]: Schema }; if (ingestConfig) { - ingestSchemaObj['docs'] = getFieldSchema('json'); + ingestSchemaObj['docs'] = getFieldSchema('jsonArray'); ingestSchemaObj['enrich'] = processorsConfigToSchema(ingestConfig.enrich); ingestSchemaObj['index'] = indexConfigToSchema(ingestConfig.index); } @@ -118,6 +118,21 @@ function getFieldSchema(fieldType: ConfigFieldType): Schema { } }); + break; + } + case 'jsonArray': { + baseSchema = yup + .string() + .test('jsonArray', 'Invalid JSON array', (value) => { + try { + // @ts-ignore + return Array.isArray(JSON.parse(value)); + return true; + } catch (error) { + return false; + } + }); + break; } } diff --git a/public/utils/config_to_template_utils.ts b/public/utils/config_to_template_utils.ts index 3ff21934..641ffa65 100644 --- a/public/utils/config_to_template_utils.ts +++ b/public/utils/config_to_template_utils.ts @@ -131,7 +131,7 @@ function searchConfigToTemplateNodes( // General fn to process all processor configs and convert them // into a final list of template-formatted IngestProcessor/SearchProcessors. -function processorConfigsToTemplateProcessors( +export function processorConfigsToTemplateProcessors( processorConfigs: IProcessorConfig[] ): (IngestProcessor | SearchProcessor)[] { const processorsList = [] as (IngestProcessor | SearchProcessor)[]; diff --git a/public/utils/form_to_pipeline_utils.ts b/public/utils/form_to_pipeline_utils.ts new file mode 100644 index 00000000..21f7f957 --- /dev/null +++ b/public/utils/form_to_pipeline_utils.ts @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { isEmpty } from 'lodash'; +import { + IProcessorConfig, + IngestPipelineConfig, + SearchPipelineConfig, + WorkflowConfig, + WorkflowFormValues, +} from '../../common'; +import { formikToUiConfig } from './form_to_config_utils'; +import { processorConfigsToTemplateProcessors } from './config_to_template_utils'; + +/* + **************** Form -> pipeline utils ********************** + Collection of utility fns for converting the current form state + to partial/in-progress ingest and search pipelines to run + and collect their current outputs. Primarily used for determining + the input schema at a certain stage of a pipeline. + */ + +export function formikToIngestPipeline( + values: WorkflowFormValues, + existingConfig: WorkflowConfig, + curProcessorId: string +): IngestPipelineConfig | SearchPipelineConfig | undefined { + const uiConfig = formikToUiConfig(values, existingConfig); + const precedingProcessors = getPrecedingProcessors( + uiConfig.ingest.enrich.processors, + curProcessorId + ); + if (!isEmpty(precedingProcessors)) { + return { + processors: processorConfigsToTemplateProcessors(precedingProcessors), + } as IngestPipelineConfig | SearchPipelineConfig; + } + return undefined; +} + +function getPrecedingProcessors( + allProcessors: IProcessorConfig[], + curProcessorId: string +): IProcessorConfig[] { + const precedingProcessors = [] as IProcessorConfig[]; + allProcessors.some((processor) => { + if (processor.id === curProcessorId) { + return true; + } else { + precedingProcessors.push(processor); + } + }); + return precedingProcessors; +} diff --git a/public/utils/index.ts b/public/utils/index.ts index 696b7021..1e340a10 100644 --- a/public/utils/index.ts +++ b/public/utils/index.ts @@ -10,3 +10,4 @@ export * from './config_to_form_utils'; export * from './config_to_workspace_utils'; export * from './config_to_schema_utils'; export * from './form_to_config_utils'; +export * from './form_to_pipeline_utils'; diff --git a/public/utils/utils.ts b/public/utils/utils.ts index d38347a5..80eaac7d 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -7,12 +7,14 @@ import yaml from 'js-yaml'; import { WORKFLOW_STEP_TYPE, Workflow } from '../../common'; // Append 16 random characters -export function generateId(prefix: string): string { +export function generateId(prefix?: string): string { const uniqueChar = () => { // eslint-disable-next-line no-bitwise return (((1 + Math.random()) * 0x10000) | 0).toString(16).substring(1); }; - return `${prefix}_${uniqueChar()}${uniqueChar()}${uniqueChar()}${uniqueChar()}`; + return `${ + prefix || '' + }_${uniqueChar()}${uniqueChar()}${uniqueChar()}${uniqueChar()}`; } export function hasProvisionedIngestResources( diff --git a/server/routes/opensearch_routes_service.ts b/server/routes/opensearch_routes_service.ts index b1440f97..084e2a3b 100644 --- a/server/routes/opensearch_routes_service.ts +++ b/server/routes/opensearch_routes_service.ts @@ -12,10 +12,15 @@ import { OpenSearchDashboardsResponseFactory, } from '../../../../src/core/server'; import { + BULK_NODE_API_PATH, CAT_INDICES_NODE_API_PATH, INGEST_NODE_API_PATH, Index, + IngestPipelineConfig, SEARCH_INDEX_NODE_API_PATH, + SIMULATE_PIPELINE_NODE_API_PATH, + SimulateIngestPipelineDoc, + SimulateIngestPipelineResponse, } from '../../common'; import { generateCustomError } from './helpers'; @@ -75,6 +80,39 @@ export function registerOpenSearchRoutes( }, opensearchRoutesService.ingest ); + router.post( + { + path: `${BULK_NODE_API_PATH}/{pipeline}`, + validate: { + params: schema.object({ + pipeline: schema.string(), + }), + body: schema.any(), + }, + }, + opensearchRoutesService.bulk + ); + router.post( + { + path: BULK_NODE_API_PATH, + validate: { + body: schema.any(), + }, + }, + opensearchRoutesService.bulk + ); + router.post( + { + path: SIMULATE_PIPELINE_NODE_API_PATH, + validate: { + body: schema.object({ + pipeline: schema.any(), + docs: schema.any(), + }), + }, + }, + opensearchRoutesService.simulatePipeline + ); } export class OpenSearchRoutesService { @@ -156,4 +194,49 @@ export class OpenSearchRoutesService { return generateCustomError(res, err); } }; + + bulk = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { pipeline } = req.params as { + pipeline: string | undefined; + }; + const body = req.body; + + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('bulk', { + body, + pipeline, + }); + + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + simulatePipeline = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { pipeline, docs } = req.body as { + pipeline: IngestPipelineConfig; + docs: SimulateIngestPipelineDoc[]; + }; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('ingest.simulate', { body: { pipeline, docs } }); + return res.ok({ + body: { docs: response.docs } as SimulateIngestPipelineResponse, + }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; }