From 894fe9e7dfbb98e77ccbd62e86bb1549c685ea89 Mon Sep 17 00:00:00 2001 From: Divyansh Bokadia Date: Wed, 30 Oct 2024 09:48:32 -0500 Subject: [PATCH] Support for dynamic renaming of keys (#5074) Dynamic renaming of keys Signed-off-by: Divyansh Bokadia Co-authored-by: Divyansh Bokadia --- .../model/event/JacksonEventKey.java | 8 +- .../model/event/JacksonEventTest.java | 1 - .../mutateevent/RenameKeyProcessor.java | 33 +++++- .../mutateevent/RenameKeyProcessorConfig.java | 31 ++++- .../mutateevent/RenameKeyProcessorTests.java | 108 +++++++++++++++--- 5 files changed, 152 insertions(+), 29 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java index f81edc34fc..28dbe1a29e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java @@ -125,7 +125,10 @@ public String toString() { } private String checkAndTrimKey(final String key) { - checkKey(key); + if(!supportedActions.equals(Collections.singleton(EventKeyFactory.EventAction.DELETE))) + { + checkKey(key); + } return trimTrailingSlashInKey(key); } @@ -162,7 +165,8 @@ private static boolean isValidKey(final String key) { || c == '@' || c == '/' || c == '[' - || c == ']')) { + || c == ']' + )) { return false; } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index d9813846fa..d16bc345c8 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -526,7 +526,6 @@ public void testKey_withNullKey_throwsNullPointerException() { private void assertThrowsForKeyCheck(final Class expectedThrowable, final String key) { assertThrows(expectedThrowable, () -> event.put(key, UUID.randomUUID())); assertThrows(expectedThrowable, () -> event.get(key, String.class)); - assertThrows(expectedThrowable, () -> event.delete(key)); } @Test diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java index b75aef2f52..b4bddc6acb 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessor.java @@ -21,7 +21,9 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.regex.Pattern; @DataPrepperPlugin(name = "rename_keys", pluginType = Processor.class, pluginConfigurationType = RenameKeyProcessorConfig.class) public class RenameKeyProcessor extends AbstractProcessor, Record> { @@ -44,6 +46,12 @@ public RenameKeyProcessor(final PluginMetrics pluginMetrics, final RenameKeyProc String.format("rename_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getRenameWhen())); } + if (entry.getFromKey() == null && entry.getFromKeyPattern() == null) { + throw new InvalidPluginConfigurationException("Either from_key or from_key_pattern must be specified. Both cannot be set together."); + } + if (entry.getFromKey() != null && entry.getFromKeyPattern() != null) { + throw new InvalidPluginConfigurationException("Only one of from_key or from_key_pattern should be specified."); + } }); } @@ -59,14 +67,29 @@ public Collection> doExecute(final Collection> recor continue; } - if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) { + if (Objects.nonNull(entry.getFromKey()) && (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey()))) { continue; } - if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) { - final Object source = recordEvent.get(entry.getFromKey(), Object.class); - recordEvent.put(entry.getToKey(), source); - recordEvent.delete(entry.getFromKey()); + if(Objects.nonNull(entry.getFromKey())) { + final Object source = recordEvent.get(entry.getFromKey(), Object.class); + recordEvent.put(entry.getToKey(), source); + recordEvent.delete(entry.getFromKey()); + } + if(Objects.nonNull(entry.getFromKeyCompiledPattern())) { + Map eventMap = recordEvent.toMap(); + Pattern fromKeyCompiledPattern = entry.getFromKeyCompiledPattern(); + for (Map.Entry eventEntry : eventMap.entrySet()) { + final String key = eventEntry.getKey(); + final Object value = eventEntry.getValue(); + if (fromKeyCompiledPattern.matcher(key).matches()) { + recordEvent.put(entry.getToKey(), value); + recordEvent.delete(key); + if(!entry.getOverwriteIfToKeyExists()) break; + + } + } + } } } } catch (final Exception e) { diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java index 3969b8b23f..261c814791 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorConfig.java @@ -5,10 +5,11 @@ package org.opensearch.dataprepper.plugins.processor.mutateevent; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -17,19 +18,22 @@ import org.opensearch.dataprepper.model.event.EventKeyFactory; import java.util.List; +import java.util.regex.Pattern; @JsonPropertyOrder @JsonClassDescription("The rename_keys processor renames keys in an event.") public class RenameKeyProcessorConfig { @JsonPropertyOrder public static class Entry { - @NotEmpty - @NotNull @JsonProperty("from_key") @JsonPropertyDescription("The key of the entry to be renamed.") @EventKeyConfiguration({EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE}) private EventKey fromKey; + @JsonProperty("from_key_regex") + @JsonPropertyDescription("The regex pattern of the key of the entry to be renamed.") + private String fromKeyRegex; + @NotEmpty @NotNull @JsonProperty("to_key") @@ -47,10 +51,16 @@ public static class Entry { "run on the event. By default, all events will be processed unless otherwise stated.") private String renameWhen; + private Pattern fromKeyCompiledPattern; + public EventKey getFromKey() { return fromKey; } + public String getFromKeyPattern() { + return fromKeyRegex; + } + public EventKey getToKey() { return toKey; } @@ -59,10 +69,21 @@ public boolean getOverwriteIfToKeyExists() { return overwriteIfToKeyExists; } - public String getRenameWhen() { return renameWhen; } + public String getRenameWhen() { + return renameWhen; + } + + @JsonIgnore + public Pattern getFromKeyCompiledPattern() { + if (fromKeyRegex != null && fromKeyCompiledPattern == null) { + fromKeyCompiledPattern = Pattern.compile(fromKeyRegex); + } + return fromKeyCompiledPattern; + } - public Entry(final EventKey fromKey, final EventKey toKey, final boolean overwriteIfKeyExists, final String renameWhen) { + public Entry(final EventKey fromKey, final String fromKeyPattern, final EventKey toKey, final boolean overwriteIfKeyExists, final String renameWhen) { this.fromKey = fromKey; + this.fromKeyRegex = fromKeyPattern; this.toKey = toKey; this.overwriteIfToKeyExists = overwriteIfKeyExists; this.renameWhen = renameWhen; diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java index 3cdf47e344..d9d0c67dc4 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/RenameKeyProcessorTests.java @@ -19,13 +19,15 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.util.Arrays; +import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.UUID; +import java.util.LinkedList; +import java.util.Arrays; +import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -49,7 +51,7 @@ public class RenameKeyProcessorTests { @Test void invalid_rename_when_throws_InvalidPluginConfigurationException() { final String renameWhen = UUID.randomUUID().toString(); - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, renameWhen))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", true, renameWhen))); when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(false); @@ -57,9 +59,20 @@ void invalid_rename_when_throws_InvalidPluginConfigurationException() { assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); } + @Test + void invalid_config_when_both_from_key_empty_throws_InvalidPluginConfigurationException() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,null, "newMessage", true, null))); + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void invalid_config_when_both_from_key_set_throws_InvalidPluginConfigurationException() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message","m.*", "newMessage", true, null))); + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } @Test public void testSingleOverwriteRenameProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", true, null))); final RenameKeyProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -73,7 +86,7 @@ public void testSingleOverwriteRenameProcessorTests() { @Test public void testSingleNoOverwriteRenameProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", false, null))); final RenameKeyProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -85,9 +98,55 @@ public void testSingleNoOverwriteRenameProcessorTests() { assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("test2")); } + @Test + public void testFromKeyPatternNoOverwriteRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "(detailed_timestamp|detail_timestamp).*","detailed_timestamp", false, null))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("detailed_timestamp_1004", "test2"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp_1004"), is(false)); + assertThat(editedRecords.get(0).getData().get("detailed_timestamp", Object.class), equalTo("test2")); + } + @Test + public void testFromKeyPatternGroupingPatternRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "(detailed_timestamp|detail_timestamp).*","detailed_timestamp", false, null))); + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("detailed_timestamp_1004", "test2"); + record.getData().put("test_key","test_value"); + final Record second_record = getEvent("thisisanewmessage"); + second_record.getData().put("detail_timestamp-123", "test3"); + Collection> records = new ArrayList<>(); + records.add(record); + records.add(second_record); + final List> editedRecords = (List>) processor.doExecute(records); + assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("test_key"), is(true)); + assertThat(editedRecords.get(1).getData().containsKey("detailed_timestamp"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp_1004"), is(false)); + assertThat(editedRecords.get(1).getData().containsKey("detail_timestamp-123"), is(false)); + assertThat(editedRecords.get(0).getData().get("detailed_timestamp", Object.class), equalTo("test2")); + assertThat(editedRecords.get(1).getData().get("detailed_timestamp", Object.class), equalTo("test3")); + } + @Test + public void testFromKeyPatternOverwriteRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "me.*","newMessage", true, null))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + record.getData().put("newMessage", "test2"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(false)); + assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage")); + } + @Test public void testFromKeyDneRenameProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2", "newMessage", false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2",null, "newMessage", false, null))); final RenameKeyProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -100,8 +159,8 @@ public void testFromKeyDneRenameProcessorTests() { @Test public void testMultiMixedOverwriteRenameProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null), - createEntry("message2", "existingMessage", false, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", true, null), + createEntry("message2",null, "existingMessage", false, null))); final RenameKeyProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -118,8 +177,23 @@ public void testMultiMixedOverwriteRenameProcessorTests() { @Test public void testChainRenamingRenameProcessorTests() { - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null), - createEntry("newMessage", "message3", true, null))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", true, null), + createEntry("newMessage", null,"message3", true, null))); + + final RenameKeyProcessor processor = createObjectUnderTest(); + final Record record = getEvent("thisisamessage"); + final List> editedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + + assertThat(editedRecords.get(0).getData().containsKey("message3"), is(true)); + assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(false)); + assertThat(editedRecords.get(0).getData().containsKey("message"), is(false)); + assertThat(editedRecords.get(0).getData().get("message3", Object.class), equalTo("thisisamessage")); + } + + @Test + public void testChainRenamingFromKeyPatternRenameProcessorTests() { + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,"me.*", "newMessage", true, null), + createEntry(null, "new.*","message3", true, null))); final RenameKeyProcessor processor = createObjectUnderTest(); final Record record = getEvent("thisisamessage"); @@ -135,7 +209,7 @@ public void testChainRenamingRenameProcessorTests() { public void testNoRename_when_RenameWhen_returns_false() { final String renameWhen = UUID.randomUUID().toString(); - when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, renameWhen))); + when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", false, renameWhen))); when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(true); final RenameKeyProcessor processor = createObjectUnderTest(); @@ -150,14 +224,16 @@ public void testNoRename_when_RenameWhen_returns_false() { assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage")); } + + private RenameKeyProcessor createObjectUnderTest() { return new RenameKeyProcessor(pluginMetrics, mockConfig, expressionEvaluator); } - private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) { - final EventKey fromEventKey = eventKeyFactory.createEventKey(fromKey); + private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String fromKeyPattern, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) { + final EventKey fromEventKey = (fromKey == null) ? null : eventKeyFactory.createEventKey(fromKey); final EventKey toEventKey = eventKeyFactory.createEventKey(toKey); - return new RenameKeyProcessorConfig.Entry(fromEventKey, toEventKey, overwriteIfToKeyExists, renameWhen); + return new RenameKeyProcessorConfig.Entry(fromEventKey,fromKeyPattern, toEventKey, overwriteIfToKeyExists, renameWhen); } private List createListOfEntries(final RenameKeyProcessorConfig.Entry... entries) {