Skip to content

Commit

Permalink
Truncate Processor: Add support to truncate all fields in an event (o…
Browse files Browse the repository at this point in the history
…pensearch-project#4317)

Truncate Processor: Add support to truncate all fields in an event

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Mar 25, 2024
1 parent 8f139a0 commit 8a7132d
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;

Expand All @@ -41,14 +42,38 @@ public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProces
}

private String getTruncatedValue(final String value, final int startIndex, final Integer length) {
String truncatedValue =
(length == null || startIndex+length >= value.length()) ?
value.substring(startIndex) :
String truncatedValue =
(length == null || startIndex+length >= value.length()) ?
value.substring(startIndex) :
value.substring(startIndex, startIndex + length);

return truncatedValue;
}

private void truncateKey(Event event, String key, Object value, TruncateProcessorConfig.Entry entryConfig) {
final boolean recurse = entryConfig.getRecurse();
final int startIndex = entryConfig.getStartAt() == null ? 0 : entryConfig.getStartAt();
final Integer length = entryConfig.getLength();
if (value instanceof String) {
event.put(key, getTruncatedValue((String) value, startIndex, length));
} else if (value instanceof List) {
List<Object> result = new ArrayList<>();
for (Object listItem : (List) value) {
if (listItem instanceof String) {
result.add(getTruncatedValue((String) listItem, startIndex, length));
} else {
result.add(listItem);
}
}
event.put(key, result);
} else if (recurse && (value instanceof Map)) {
Map<String, Object> valueMap = (Map<String, Object>)value;
for (Map.Entry<String, Object> mapEntry: valueMap.entrySet()) {
truncateKey(event, key+"/"+mapEntry.getKey(), mapEntry.getValue(), entryConfig);
}
}
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
Expand All @@ -58,30 +83,26 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
for (TruncateProcessorConfig.Entry entry : entries) {
final List<String> sourceKeys = entry.getSourceKeys();
final String truncateWhen = entry.getTruncateWhen();
final boolean recurse = entry.getRecurse();
final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt();
final Integer length = entry.getLength();
if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) {
continue;
}
if (sourceKeys == null) {
for (Map.Entry<String, Object> mapEntry: recordEvent.toMap().entrySet()) {
truncateKey(recordEvent, mapEntry.getKey(), mapEntry.getValue(), entry);
}
continue;
}

for (String sourceKey : sourceKeys) {
if (!recordEvent.containsKey(sourceKey)) {
continue;
}

final Object value = recordEvent.get(sourceKey, Object.class);
if (value instanceof String) {
recordEvent.put(sourceKey, getTruncatedValue((String) value, startIndex, length));
} else if (value instanceof List) {
List<Object> result = new ArrayList<>();
for (Object listItem : (List) value) {
if (listItem instanceof String) {
result.add(getTruncatedValue((String) listItem, startIndex, length));
} else {
result.add(listItem);
}
}
recordEvent.put(sourceKey, result);
}
truncateKey(recordEvent, sourceKey, value, entry);
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

public class TruncateProcessorConfig {
public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("source_keys")
private List<String> sourceKeys;

Expand All @@ -26,14 +24,18 @@ public static class Entry {
@JsonProperty("length")
private Integer length;

@JsonProperty("recursive")
private Boolean recurse = false;

@JsonProperty("truncate_when")
private String truncateWhen;

public Entry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) {
public Entry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen, final Boolean recurse) {
this.sourceKeys = sourceKeys;
this.startAt = startAt;
this.length = length;
this.truncateWhen = truncateWhen;
this.recurse = recurse;
}

public Entry() {}
Expand All @@ -46,6 +48,10 @@ public Integer getStartAt() {
return startAt;
}

public Boolean getRecurse() {
return recurse;
}

public Integer getLength() {
return length;
}
Expand All @@ -54,7 +60,7 @@ public String getTruncateWhen() {
return truncateWhen;
}

@AssertTrue(message = "source_keys must be specified and at least one of start_at or length or both must be specified and the values must be positive integers")
@AssertTrue(message = "At least one of start_at or length or both must be specified and the values must be positive integers")
public boolean isValidConfig() {
if (length == null && startAt == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import com.google.common.collect.ImmutableMap;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -53,7 +54,7 @@ private TruncateProcessor createObjectUnderTest() {
@ArgumentsSource(TruncateArgumentsProvider.class)
void testTruncateProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) {

when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), startAt, truncateLength, null)));
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), startAt, truncateLength, null, false)));
final TruncateProcessor truncateProcessor = createObjectUnderTest();
final Record<Event> record = createEvent("message", messageValue);
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateProcessor.doExecute(Collections.singletonList(record));
Expand All @@ -64,8 +65,8 @@ void testTruncateProcessor(final Object messageValue, final Integer startAt, fin
@ParameterizedTest
@ArgumentsSource(MultipleTruncateArgumentsProvider.class)
void testTruncateProcessorMultipleEntries(final Object messageValue, final Integer startAt1, final Integer truncateLength1, final Integer startAt2, final Integer truncateLength2, final Object truncatedMessage1, final Object truncatedMessage2) {
TruncateProcessorConfig.Entry entry1 = createEntry(List.of("message1"), startAt1, truncateLength1, null);
TruncateProcessorConfig.Entry entry2 = createEntry(List.of("message2"), startAt2, truncateLength2, null);
TruncateProcessorConfig.Entry entry1 = createEntry(List.of("message1"), startAt1, truncateLength1, null, false);
TruncateProcessorConfig.Entry entry2 = createEntry(List.of("message2"), startAt2, truncateLength2, null, false);
when(config.getEntries()).thenReturn(List.of(entry1, entry2));
final Record<Event> record1 = createEvent("message1", messageValue);
final Record<Event> record2 = createEvent("message2", messageValue);
Expand All @@ -82,7 +83,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() {
final String truncateWhen = UUID.randomUUID().toString();
final String message = UUID.randomUUID().toString();

when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen)));
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen, false)));

final TruncateProcessor truncateProcessor = createObjectUnderTest();
final Record<Event> record = createEvent("message", message);
Expand All @@ -92,8 +93,32 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() {
assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap()));
}

private TruncateProcessorConfig.Entry createEntry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) {
return new TruncateProcessorConfig.Entry(sourceKeys, startAt, length, truncateWhen);
@Test
void test_event_with_all_fields_truncated() {
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(null, null, 5, null, false)));
final TruncateProcessor truncateProcessor = createObjectUnderTest();
final Record<Event> record = createEventWithMultipleKeys(Map.of("key1", "aaaaa12345", "key2", "bbbbb12345", "key3", "ccccccc12345"));
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateProcessor.doExecute(Collections.singletonList(record));
Event event = truncatedRecords.get(0).getData();
assertThat(event.get("key1", String.class), equalTo("aaaaa"));
assertThat(event.get("key2", String.class), equalTo("bbbbb"));
assertThat(event.get("key3", String.class), equalTo("ccccc"));
}

@Test
void test_event_with_all_fields_truncated_recursively() {
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(null, null, 5, null, true)));
final TruncateProcessor truncateProcessor = createObjectUnderTest();
final Record<Event> record = createEventWithMultipleKeys(ImmutableMap.of("key1", "aaaaa12345", "key2", ImmutableMap.of("key3", "bbbbb12345", "key4", ImmutableMap.of("key5", "ccccccc12345"))));
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateProcessor.doExecute(Collections.singletonList(record));
Event event = truncatedRecords.get(0).getData();
assertThat(event.get("key1", String.class), equalTo("aaaaa"));
assertThat(event.get("key2/key3", String.class), equalTo("bbbbb"));
assertThat(event.get("key2/key4/key5", String.class), equalTo("ccccc"));
}

private TruncateProcessorConfig.Entry createEntry(final List<String> sourceKeys, final Integer startAt, final Integer length, final String truncateWhen, final boolean recurse) {
return new TruncateProcessorConfig.Entry(sourceKeys, startAt, length, truncateWhen, recurse);
}

private Record<Event> createEvent(final String key, final Object value) {
Expand All @@ -105,6 +130,13 @@ private Record<Event> createEvent(final String key, final Object value) {
.build());
}

private Record<Event> createEventWithMultipleKeys(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withEventType("event")
.withData(data)
.build());
}

static class TruncateArgumentsProvider implements ArgumentsProvider {

@Override
Expand Down

0 comments on commit 8a7132d

Please sign in to comment.