Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add map_to_list processor #3945

Merged
merged 5 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,63 @@ the last element will be kept:
* `flattened_element` - (optional) - Valid options are "first" and "last", default is "first". This specifies which element, first one or last one, to keep if `flatten` option is true.


## map_to_list Processor
A processor that converts a map of key-value pairs to a list of objects, each contains the key and value in separate fields.

For example, if the input event has the following data:
```json
{
"my-map": {
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
}
```
with `map_to_list` processor configured to:
```yaml
...
processor:
- map_to_list:
source: "my-map"
target: "my-list"
...
```
The processed event will have the following data:
```json
{
"my-list": [
{
"key": "key1",
"value": "value1"
},
{
"key": "key2",
"value": "value2"
},
{
"key": "key3",
"value": "value3"
}
],
"my-map": {
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
}
```

### Configuration
* `source` - (required): the source map to perform the operation
* `target` - (required): the target list to put the converted list
* `key_name` - (optional): the key name of the field to hold the original key, default is "key"
* `value_name` - (optional): the key name of the field to hold the original value, default is "value"
* `exclude_keys` - (optional): the keys in source map that will be excluded from processing, default is empty list
* `remove_processed_fields` - (optional): default is false; if true, will remove processed fields from source map
* `map_to_list_when` - (optional): used to configure a condition for event processing based on certain property of the incoming event. Default is null (all events will be processed).


## Developer Guide
This plugin is compatible with Java 11 and 17. Refer to the following developer guides for plugin development:
- [Developer Guide](https://github.com/opensearch-project/data-prepper/blob/main/docs/developer_guide.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

@DataPrepperPlugin(name = "map_to_list", pluginType = Processor.class, pluginConfigurationType = MapToListProcessorConfig.class)
public class MapToListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(MapToListProcessor.class);
private final MapToListProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;
private final Set<String> excludeKeySet = new HashSet<>();

@DataPrepperPluginConstructor
public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.config = config;
this.expressionEvaluator = expressionEvaluator;
excludeKeySet.addAll(config.getExcludeKeys());
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();

if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) {
continue;
}

try {
final Map<String, Object> sourceMap = recordEvent.get(config.getSource(), Map.class);
final List<Map<String, Object>> targetList = new ArrayList<>();

Map<String, Object> modifiedSourceMap = new HashMap<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (excludeKeySet.contains(entry.getKey())) {
if (config.getRemoveProcessedFields()) {
modifiedSourceMap.put(entry.getKey(), entry.getValue());
}
continue;
}
targetList.add(Map.of(
config.getKeyName(), entry.getKey(),
config.getValueName(), entry.getValue()
));
}

if (config.getRemoveProcessedFields()) {
recordEvent.put(config.getSource(), modifiedSourceMap);
}

recordEvent.put(config.getTarget(), targetList);
} catch (Exception e) {
LOG.error("Fail to perform Map to List operation", e);
//TODO: add tagging on failure
}
}
return records;
}

@Override
public void prepareForShutdown() {
}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.ArrayList;
import java.util.List;

public class MapToListProcessorConfig {
private static final String DEFAULT_KEY_NAME = "key";
private static final String DEFAULT_VALUE_NAME = "value";
private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false;

@NotEmpty
@NotNull
@JsonProperty("source")
private String source;

@NotEmpty
@NotNull
@JsonProperty("target")
private String target;

@JsonProperty("key_name")
private String keyName = DEFAULT_KEY_NAME;

@JsonProperty("value_name")
private String valueName = DEFAULT_VALUE_NAME;

@JsonProperty("map_to_list_when")
private String mapToListWhen;

@JsonProperty("exclude_keys")
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

@JsonProperty("remove_processed_fields")
private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS;

public String getSource() {
return source;
}

public String getTarget() {
return target;
}

public String getKeyName() {
return keyName;
}

public String getValueName() {
return valueName;
}

public String getMapToListWhen() {
return mapToListWhen;
}

public List<String> getExcludeKeys() {
return excludeKeys;
}

public boolean getRemoveProcessedFields() {
return removeProcessedFields;
}
}
Loading
Loading