Skip to content

Commit

Permalink
Support for dynamic renaming of keys
Browse files Browse the repository at this point in the history
Signed-off-by: Divyansh Bokadia <[email protected]>

Dynamic renaming of keys

Signed-off-by: Divyansh Bokadia <[email protected]>
  • Loading branch information
Divyansh Bokadia committed Oct 29, 2024
1 parent 967c956 commit 86ba9b7
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ public String toString() {
}

private String checkAndTrimKey(final String key) {
checkKey(key);
if(!supportedActions.equals(Collections.singleton(EventKeyFactory.EventAction.DELETE)))
{
checkKey(key);
}
return trimTrailingSlashInKey(key);
}

Expand Down Expand Up @@ -161,7 +164,8 @@ private static boolean isValidKey(final String key) {
|| c == '@'
|| c == '/'
|| c == '['
|| c == ']')) {
|| c == ']'
)) {

return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ public void testKey_withNullKey_throwsNullPointerException() {
private <T extends Throwable> void assertThrowsForKeyCheck(final Class<T> expectedThrowable, final String key) {
assertThrows(expectedThrowable, () -> event.put(key, UUID.randomUUID()));
assertThrows(expectedThrowable, () -> event.get(key, String.class));
assertThrows(expectedThrowable, () -> event.delete(key));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

@DataPrepperPlugin(name = "rename_keys", pluginType = Processor.class, pluginConfigurationType = RenameKeyProcessorConfig.class)
public class RenameKeyProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
Expand All @@ -44,6 +46,12 @@ public RenameKeyProcessor(final PluginMetrics pluginMetrics, final RenameKeyProc
String.format("rename_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
entry.getRenameWhen()));
}
if (entry.getFromKey() == null && entry.getFromKeyPattern() == null) {
throw new InvalidPluginConfigurationException("Either from_key or from_key_pattern must be specified. Both cannot be set together.");
}
if (entry.getFromKey() != null && entry.getFromKeyPattern() != null) {
throw new InvalidPluginConfigurationException("Only one of from_key or from_key_pattern should be specified.");
}
});
}

Expand All @@ -59,14 +67,29 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
continue;
}

if (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey())) {
if (Objects.nonNull(entry.getFromKey()) && (entry.getFromKey().equals(entry.getToKey()) || !recordEvent.containsKey(entry.getFromKey()))) {
continue;
}

if (!recordEvent.containsKey(entry.getToKey()) || entry.getOverwriteIfToKeyExists()) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
recordEvent.delete(entry.getFromKey());
if(Objects.nonNull(entry.getFromKey())) {
final Object source = recordEvent.get(entry.getFromKey(), Object.class);
recordEvent.put(entry.getToKey(), source);
recordEvent.delete(entry.getFromKey());
}
if(Objects.nonNull(entry.getFromKeyCompiledPattern())) {
Map<String,Object> eventMap = recordEvent.toMap();
Pattern fromKeyCompiledPattern = entry.getFromKeyCompiledPattern();
for (Map.Entry<String, Object> eventEntry : eventMap.entrySet()) {
final String key = eventEntry.getKey();
final Object value = eventEntry.getValue();
if (fromKeyCompiledPattern.matcher(key).matches()) {
recordEvent.put(entry.getToKey(), value);
recordEvent.delete(key);
if(!entry.getOverwriteIfToKeyExists()) break;

}
}
}
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
Expand All @@ -17,19 +18,22 @@
import org.opensearch.dataprepper.model.event.EventKeyFactory;

import java.util.List;
import java.util.regex.Pattern;

@JsonPropertyOrder
@JsonClassDescription("The <code>rename_keys</code> processor renames keys in an event.")
public class RenameKeyProcessorConfig {
@JsonPropertyOrder
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("from_key")
@JsonPropertyDescription("The key of the entry to be renamed.")
@EventKeyConfiguration({EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE})
private EventKey fromKey;

@JsonProperty("from_key_regex")
@JsonPropertyDescription("The regex pattern of the key of the entry to be renamed.")
private String fromKeyRegex;

@NotEmpty
@NotNull
@JsonProperty("to_key")
Expand All @@ -47,10 +51,16 @@ public static class Entry {
"run on the event. By default, all events will be processed unless otherwise stated.")
private String renameWhen;

private Pattern fromKeyCompiledPattern;

public EventKey getFromKey() {
return fromKey;
}

public String getFromKeyPattern() {
return fromKeyRegex;
}

public EventKey getToKey() {
return toKey;
}
Expand All @@ -59,10 +69,21 @@ public boolean getOverwriteIfToKeyExists() {
return overwriteIfToKeyExists;
}

public String getRenameWhen() { return renameWhen; }
public String getRenameWhen() {
return renameWhen;
}

@JsonIgnore
public Pattern getFromKeyCompiledPattern() {
if (fromKeyRegex != null && fromKeyCompiledPattern == null) {
fromKeyCompiledPattern = Pattern.compile(fromKeyRegex);
}
return fromKeyCompiledPattern;
}

public Entry(final EventKey fromKey, final EventKey toKey, final boolean overwriteIfKeyExists, final String renameWhen) {
public Entry(final EventKey fromKey, final String fromKeyPattern, final EventKey toKey, final boolean overwriteIfKeyExists, final String renameWhen) {
this.fromKey = fromKey;
this.fromKeyRegex = fromKeyPattern;
this.toKey = toKey;
this.overwriteIfToKeyExists = overwriteIfKeyExists;
this.renameWhen = renameWhen;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.LinkedList;
import java.util.Arrays;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
Expand All @@ -49,17 +51,28 @@ public class RenameKeyProcessorTests {
@Test
void invalid_rename_when_throws_InvalidPluginConfigurationException() {
final String renameWhen = UUID.randomUUID().toString();
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, renameWhen)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", true, renameWhen)));


when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(false);

assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
void invalid_config_when_both_from_key_empty_throws_InvalidPluginConfigurationException() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,null, "newMessage", true, null)));
assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
void invalid_config_when_both_from_key_set_throws_InvalidPluginConfigurationException() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message","m.*", "newMessage", true, null)));
assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}
@Test
public void testSingleOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -73,7 +86,7 @@ public void testSingleOverwriteRenameProcessorTests() {

@Test
public void testSingleNoOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", null,"newMessage", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -85,9 +98,55 @@ public void testSingleNoOverwriteRenameProcessorTests() {
assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("test2"));
}

@Test
public void testFromKeyPatternNoOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "(detailed_timestamp|detail_timestamp).*","detailed_timestamp", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("detailed_timestamp_1004", "test2");
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp_1004"), is(false));
assertThat(editedRecords.get(0).getData().get("detailed_timestamp", Object.class), equalTo("test2"));
}
@Test
public void testFromKeyPatternGroupingPatternRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "(detailed_timestamp|detail_timestamp).*","detailed_timestamp", false, null)));
final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("detailed_timestamp_1004", "test2");
record.getData().put("test_key","test_value");
final Record<Event> second_record = getEvent("thisisanewmessage");
second_record.getData().put("detail_timestamp-123", "test3");
Collection<Record<Event>> records = new ArrayList<>();
records.add(record);
records.add(second_record);
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(records);
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("test_key"), is(true));
assertThat(editedRecords.get(1).getData().containsKey("detailed_timestamp"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("detailed_timestamp_1004"), is(false));
assertThat(editedRecords.get(1).getData().containsKey("detail_timestamp-123"), is(false));
assertThat(editedRecords.get(0).getData().get("detailed_timestamp", Object.class), equalTo("test2"));
assertThat(editedRecords.get(1).getData().get("detailed_timestamp", Object.class), equalTo("test3"));
}
@Test
public void testFromKeyPatternOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null, "me.*","newMessage", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("newMessage", "test2");
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("message"), is(false));
assertThat(editedRecords.get(0).getData().get("newMessage", Object.class), equalTo("thisisamessage"));
}

@Test
public void testFromKeyDneRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2", "newMessage", false, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message2",null, "newMessage", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -100,8 +159,8 @@ public void testFromKeyDneRenameProcessorTests() {

@Test
public void testMultiMixedOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null),
createEntry("message2", "existingMessage", false, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", true, null),
createEntry("message2",null, "existingMessage", false, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -118,8 +177,23 @@ public void testMultiMixedOverwriteRenameProcessorTests() {

@Test
public void testChainRenamingRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null),
createEntry("newMessage", "message3", true, null)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", true, null),
createEntry("newMessage", null,"message3", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("message3"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(false));
assertThat(editedRecords.get(0).getData().containsKey("message"), is(false));
assertThat(editedRecords.get(0).getData().get("message3", Object.class), equalTo("thisisamessage"));
}

@Test
public void testChainRenamingFromKeyPatternRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry(null,"me.*", "newMessage", true, null),
createEntry(null, "new.*","message3", true, null)));

final RenameKeyProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
Expand All @@ -135,7 +209,7 @@ public void testChainRenamingRenameProcessorTests() {
public void testNoRename_when_RenameWhen_returns_false() {
final String renameWhen = UUID.randomUUID().toString();

when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", false, renameWhen)));
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message",null, "newMessage", false, renameWhen)));
when(expressionEvaluator.isValidExpressionStatement(renameWhen)).thenReturn(true);

final RenameKeyProcessor processor = createObjectUnderTest();
Expand All @@ -150,14 +224,16 @@ public void testNoRename_when_RenameWhen_returns_false() {
assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("thisisamessage"));
}



private RenameKeyProcessor createObjectUnderTest() {
return new RenameKeyProcessor(pluginMetrics, mockConfig, expressionEvaluator);
}

private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) {
final EventKey fromEventKey = eventKeyFactory.createEventKey(fromKey);
private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String fromKeyPattern, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) {
final EventKey fromEventKey = (fromKey == null) ? null : eventKeyFactory.createEventKey(fromKey);
final EventKey toEventKey = eventKeyFactory.createEventKey(toKey);
return new RenameKeyProcessorConfig.Entry(fromEventKey, toEventKey, overwriteIfToKeyExists, renameWhen);
return new RenameKeyProcessorConfig.Entry(fromEventKey,fromKeyPattern, toEventKey, overwriteIfToKeyExists, renameWhen);
}

private List<RenameKeyProcessorConfig.Entry> createListOfEntries(final RenameKeyProcessorConfig.Entry... entries) {
Expand Down

0 comments on commit 86ba9b7

Please sign in to comment.