diff --git a/data-prepper-plugins/mutate-event-processors/README.md b/data-prepper-plugins/mutate-event-processors/README.md index eb1398e154..e22c2195df 100644 --- a/data-prepper-plugins/mutate-event-processors/README.md +++ b/data-prepper-plugins/mutate-event-processors/README.md @@ -494,6 +494,63 @@ the last element will be kept: * `flattened_element` - (optional) - Valid options are "first" and "last", default is "first". This specifies which element, first one or last one, to keep if `flatten` option is true. +## map_to_list Processor +A processor that converts a map of key-value pairs to a list of objects, each contains the key and value in separate fields. + +For example, if the input event has the following data: +```json +{ + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` +with `map_to_list` processor configured to: +```yaml +... +processor: + - map_to_list: + source: "my-map" + target: "my-list" +... +``` +The processed event will have the following data: +```json +{ + "my-list": [ + { + "key": "key1", + "value": "value1" + }, + { + "key": "key2", + "value": "value2" + }, + { + "key": "key3", + "value": "value3" + } + ], + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +### Configuration +* `source` - (required): the source map to perform the operation +* `target` - (required): the target list to put the converted list +* `key_name` - (optional): the key name of the field to hold the original key, default is "key" +* `value_name` - (optional): the key name of the field to hold the original value, default is "value" +* `exclude_keys` - (optional): the keys in source map that will be excluded from processing, default is empty list +* `remove_processed_fields` - (optional): default is false; if true, will remove processed fields from source map +* `map_to_list_when` - (optional): used to configure a condition for event processing based on certain property of the incoming event. Default is null (all events will be processed). + + ## Developer Guide This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development: - [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md) diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java new file mode 100644 index 0000000000..49e74680fa --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@DataPrepperPlugin(name = "map_to_list", pluginType = Processor.class, pluginConfigurationType = MapToListProcessorConfig.class) +public class MapToListProcessor extends AbstractProcessor, Record> { + private static final Logger LOG = LoggerFactory.getLogger(MapToListProcessor.class); + private final MapToListProcessorConfig config; + private final ExpressionEvaluator expressionEvaluator; + private final Set excludeKeySet = new HashSet<>(); + + @DataPrepperPluginConstructor + public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { + super(pluginMetrics); + this.config = config; + this.expressionEvaluator = expressionEvaluator; + excludeKeySet.addAll(config.getExcludeKeys()); + } + + @Override + public Collection> doExecute(final Collection> records) { + for (final Record record : records) { + final Event recordEvent = record.getData(); + + if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) { + continue; + } + + try { + final Map sourceMap = recordEvent.get(config.getSource(), Map.class); + final List> targetList = new ArrayList<>(); + + Map modifiedSourceMap = new HashMap<>(); + for (final Map.Entry entry : sourceMap.entrySet()) { + if (excludeKeySet.contains(entry.getKey())) { + if (config.getRemoveProcessedFields()) { + modifiedSourceMap.put(entry.getKey(), entry.getValue()); + } + continue; + } + targetList.add(Map.of( + config.getKeyName(), entry.getKey(), + config.getValueName(), entry.getValue() + )); + } + + if (config.getRemoveProcessedFields()) { + recordEvent.put(config.getSource(), modifiedSourceMap); + } + + recordEvent.put(config.getTarget(), targetList); + } catch (Exception e) { + LOG.error("Fail to perform Map to List operation", e); + //TODO: add tagging on failure + } + } + return records; + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java new file mode 100644 index 0000000000..3d863ea784 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +import java.util.ArrayList; +import java.util.List; + +public class MapToListProcessorConfig { + private static final String DEFAULT_KEY_NAME = "key"; + private static final String DEFAULT_VALUE_NAME = "value"; + private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); + private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false; + + @NotEmpty + @NotNull + @JsonProperty("source") + private String source; + + @NotEmpty + @NotNull + @JsonProperty("target") + private String target; + + @JsonProperty("key_name") + private String keyName = DEFAULT_KEY_NAME; + + @JsonProperty("value_name") + private String valueName = DEFAULT_VALUE_NAME; + + @JsonProperty("map_to_list_when") + private String mapToListWhen; + + @JsonProperty("exclude_keys") + private List excludeKeys = DEFAULT_EXCLUDE_KEYS; + + @JsonProperty("remove_processed_fields") + private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS; + + public String getSource() { + return source; + } + + public String getTarget() { + return target; + } + + public String getKeyName() { + return keyName; + } + + public String getValueName() { + return valueName; + } + + public String getMapToListWhen() { + return mapToListWhen; + } + + public List getExcludeKeys() { + return excludeKeys; + } + + public boolean getRemoveProcessedFields() { + return removeProcessedFields; + } +} diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java new file mode 100644 index 0000000000..c748e676f4 --- /dev/null +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.mutateevent; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class MapToListProcessorTest { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private MapToListProcessorConfig mockConfig; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @BeforeEach + void setUp() { + lenient().when(mockConfig.getSource()).thenReturn("my-map"); + lenient().when(mockConfig.getTarget()).thenReturn("my-list"); + lenient().when(mockConfig.getKeyName()).thenReturn("key"); + lenient().when(mockConfig.getValueName()).thenReturn("value"); + lenient().when(mockConfig.getMapToListWhen()).thenReturn(null); + lenient().when(mockConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); + lenient().when(mockConfig.getRemoveProcessedFields()).thenReturn(false); + } + + @Test + void testMapToListSuccessWithDefaultOptions() { + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(3)); + assertThat(resultList, containsInAnyOrder( + Map.of("key", "key1", "value", "value1"), + Map.of("key", "key2", "value", "value2"), + Map.of("key", "key3", "value", "value3") + )); + assertThat(resultEvent.containsKey("my-map"), is(true)); + assertSourceMapUnchanged(resultEvent); + } + + @Test + void testMapToListSuccessWithCustomKeyNameValueName() { + final String keyName = "custom-key-name"; + final String valueName = "custom-value-name"; + when(mockConfig.getKeyName()).thenReturn(keyName); + when(mockConfig.getValueName()).thenReturn(valueName); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(3)); + assertThat(resultList, containsInAnyOrder( + Map.of(keyName, "key1", valueName, "value1"), + Map.of(keyName, "key2", valueName, "value2"), + Map.of(keyName, "key3", valueName, "value3") + )); + assertSourceMapUnchanged(resultEvent); + } + + @Test + void testEventNotProcessedWhenSourceNotExistInEvent() { + when(mockConfig.getSource()).thenReturn("my-other-map"); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + assertThat(resultEvent.containsKey("my-list"), is(false)); + assertSourceMapUnchanged(resultEvent); + } + + @Test + void testExcludedKeysAreNotProcessed() { + when(mockConfig.getExcludeKeys()).thenReturn(List.of("key1", "key3", "key5")); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(1)); + assertThat(resultList.get(0), is(Map.of("key", "key2", "value", "value2"))); + assertSourceMapUnchanged(resultEvent); + } + + @Test + void testRemoveProcessedFields() { + when(mockConfig.getExcludeKeys()).thenReturn(List.of("key1", "key3", "key5")); + when(mockConfig.getRemoveProcessedFields()).thenReturn(true); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(1)); + assertThat(resultList.get(0), is(Map.of("key", "key2", "value", "value2"))); + + assertThat(resultEvent.containsKey("my-map"), is(true)); + assertThat(resultEvent.containsKey("my-map/key1"), is(true)); + assertThat(resultEvent.get("my-map/key1", String.class), is("value1")); + assertThat(resultEvent.containsKey("my-map/key2"), is(false)); + assertThat(resultEvent.containsKey("my-map/key3"), is(true)); + assertThat(resultEvent.get("my-map/key3", String.class), is("value3")); + } + + @Test + public void testEventNotProcessedWhenTheWhenConditionIsFalse() { + final String whenCondition = UUID.randomUUID().toString(); + when(mockConfig.getMapToListWhen()).thenReturn(whenCondition); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + when(expressionEvaluator.evaluateConditional(whenCondition, testRecord.getData())).thenReturn(false); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + assertThat(resultEvent.containsKey("my-list"), is(false)); + assertSourceMapUnchanged(resultEvent); + } + + private MapToListProcessor createObjectUnderTest() { + return new MapToListProcessor(pluginMetrics, mockConfig, expressionEvaluator); + } + + private Record createTestRecord() { + final Map> data = Map.of("my-map", Map.of( + "key1", "value1", + "key2", "value2", + "key3", "value3")); + final Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + return new Record<>(event); + } + + private void assertSourceMapUnchanged(final Event resultEvent) { + assertThat(resultEvent.containsKey("my-map"), is(true)); + assertThat(resultEvent.get("my-map/key1", String.class), is("value1")); + assertThat(resultEvent.get("my-map/key2", String.class), is("value2")); + assertThat(resultEvent.get("my-map/key3", String.class), is("value3")); + } +}