Skip to content

Commit

Permalink
Modified to make truncate processor a top level processor, not specif…
Browse files Browse the repository at this point in the history
…ic to strings

Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jan 9, 2024
1 parent f73f553 commit 96c2568
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 114 deletions.
51 changes: 0 additions & 51 deletions data-prepper-plugins/mutate-string-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,57 +196,6 @@ When you run Data Prepper with this `pipeline.yaml`, you should see the followin
### Configuration
* `with_keys` - (required) - A list of keys to trim the whitespace from

## TruncateStringProcessor
A processor that truncates string by removing user configured number of characters at beginning or at the end or both sides of a string.

### Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- trucate_string:
entries:
- source: "message"
length: 5
sink:
- stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message": "hello,world"}
```
When you run Data Prepper with this `pipeline.yaml`, you should see the following output:

```json
{"message":["hello"]}
```

If the above yaml file has additional config of `start_at: 2`, then the output would be following:

```json
{"message":["llo,w"]}
```

If the above yaml file has additional config of `start_at: 2`, and does not have `length: 5` in the config, then the output would be following:

```json
{"message":["llo,world"]}
```

### Configuration
* `entries` - (required) - A list of entries to add to an event
* `source` - (required) - The key to be modified
* `start_at` - (optional) - starting index of the string. Defaults to 0.
* `length` - (optional) - length of the string after truncation. Defaults to end of the string.
Either `start_at` or `length` or both must be present

---

## Developer Guide
Expand Down

This file was deleted.

92 changes: 92 additions & 0 deletions data-prepper-plugins/truncate-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Truncate Processor

This is a processor that truncates key's value at the beginning or at the end or at both sides of a string as per the configuration. If the key's value is a list, then each of the string members of the list are truncated. Non-string members of the list are left untouched. If `truncate_when` option is provided, the truncation of the input is done only when the condition specified is true for the event being processed.

## Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- trucate_string:
entries:
- source: "message"
length: 5
sink:
- stdout:
```
Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message": "hello,world"}
```
When you run Data Prepper with this `pipeline.yaml`, you should see the following output:

```json
{"message":["hello"]}
```

If the above yaml file has additional config of `start_at: 2`, then the output would be following:

```json
{"message":["llo,w"]}
```

If the above yaml file has additional config of `start_at: 2`, and does not have `length: 5` in the config, then the output would be following:

```json
{"message":["llo,world"]}
```

If the source has an list of strings, then the result will be an array of strings where each of the member of the list is truncated. The following input
```json
{"message": ["hello_one", "hello_two", "hello_three"]}
```
is transformed to the following:

```json
{"message": ["hello", "hello", "hello"]}
```

Example configuration with `truncate_when` option:
```yaml
pipeline:
source:
file:
path: "/full/path/to/logs_json.log"
record_type: "event"
format: "json"
processor:
- trucate_string:
entries:
- source: "message"
length: 5
start_at: 7
truncate_when: '/id == 1'
sink:
- stdout:
```

When the pipeline started with the above configuration receives the following two events
```json
{"message": "hello, world", "id": 1}
{"message": "hello, world,not-truncated", "id": 2}
```
the output would be
```json
{"message": "world", "id": 1}
{"message": "hello, world,not-truncated", "id": 2}
```

### Configuration
* `entries` - (required) - A list of entries to add to an event
* `source` - (required) - The key to be modified
* `start_at` - (optional) - starting index of the string. Defaults to 0.
* `length` - (optional) - length of the string after truncation. Defaults to end of the string.
Either `start_at` or `length` or both must be present

17 changes: 17 additions & 0 deletions data-prepper-plugins/truncate-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-test-common')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation libs.commons.lang3
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.truncate;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.Collection;
import java.util.ArrayList;
import java.util.List;

/**
* This processor takes in a key and truncates its value to a string with
* characters from the front or at the end or at both removed.
* If the value is not a string, no action is performed.
*/
@DataPrepperPlugin(name = "truncate", pluginType = Processor.class, pluginConfigurationType = TruncateStringProcessorConfig.class)
public class TruncateStringProcessor extends AbstractProcessor<Record<Event>, Record<Event>>{
private final List<TruncateStringProcessorConfig.Entry> entries;
private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public TruncateStringProcessor(final PluginMetrics pluginMetrics, final TruncateStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.entries = config.getEntries();
this.expressionEvaluator = expressionEvaluator;
}

private String getTruncatedValue(final TruncateStringProcessorConfig.Entry entry, final String value) {
int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt();
Integer length = entry.getLength();
String truncatedValue = (length == null || startIndex+length >= value.length()) ? value.substring(startIndex) : value.substring(startIndex, startIndex + length);

return truncatedValue;
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for(final Record<Event> record : records) {
final Event recordEvent = record.getData();
for(TruncateStringProcessorConfig.Entry entry : entries) {
if (entry.getTruncateWhen() != null && !expressionEvaluator.evaluateConditional(entry.getTruncateWhen(), recordEvent)) {
continue;
}
final String key = entry.getSource();
if (!recordEvent.containsKey(key)) {
continue;
}

final Object value = recordEvent.get(key, Object.class);
if (value instanceof String) {
recordEvent.put(key, getTruncatedValue(entry, (String)value));
} else if (value instanceof List) {
List<Object> result = new ArrayList<>();
for (Object arrayObject: (List)value) {
if (arrayObject instanceof String) {
result.add(getTruncatedValue(entry, (String)arrayObject));
} else {
result.add(arrayObject);
}
}
recordEvent.put(key, result);
}
}
}

return records;
}

@Override
public void prepareForShutdown() {

}

@Override
public boolean isReadyForShutdown() {
return true;
}

@Override
public void shutdown() {

}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutatestring;
package org.opensearch.dataprepper.plugins.processor.truncate;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
Expand All @@ -14,7 +13,7 @@

import java.util.List;

public class TruncateStringProcessorConfig implements StringProcessorConfig<TruncateStringProcessorConfig.Entry> {
public class TruncateStringProcessorConfig {
public static class Entry {

@NotEmpty
Expand Down Expand Up @@ -68,12 +67,6 @@ public Entry(final String source, final Integer startAt, final Integer length, f
public Entry() {}
}

@Override
@JsonIgnore
public List<Entry> getIterativeConfig() {
return entries;
}

private List<@Valid Entry> entries;

public List<Entry> getEntries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutatestring;
package org.opensearch.dataprepper.plugins.processor.truncate;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand Down Expand Up @@ -51,12 +51,12 @@ private TruncateStringProcessor createObjectUnderTest() {

@ParameterizedTest
@ArgumentsSource(TruncateStringArgumentsProvider.class)
void testTruncateStringProcessor(final String message, final Integer startAt, final Integer truncateLength, final String truncatedMessage) {
void testTruncateStringProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) {

when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", startAt, truncateLength, null)));
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", startAt, truncateLength, null)));

final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest();
final Record<Event> record = createEvent(message);
final Record<Event> record = createEvent(messageValue);
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateStringProcessor.doExecute(Collections.singletonList(record));
assertThat(truncatedRecords.get(0).getData().get("message", Object.class), notNullValue());
assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage));
Expand All @@ -76,7 +76,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() {
final String truncateWhen = UUID.randomUUID().toString();
final String message = UUID.randomUUID().toString();

when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, 5, truncateWhen)));
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, 5, truncateWhen)));

final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest();
final Record<Event> record = createEvent(message);
Expand All @@ -91,7 +91,7 @@ private TruncateStringProcessorConfig.Entry createEntry(final String source, fin
return new TruncateStringProcessorConfig.Entry(source, startAt, length, truncateWhen);
}

private Record<Event> createEvent(final String message) {
private Record<Event> createEvent(final Object message) {
final Map<String, Object> eventData = new HashMap<>();
eventData.put("message", message);
return new Record<>(JacksonEvent.builder()
Expand All @@ -116,7 +116,9 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
arguments("hello,world,no-truncate", null, 100, "hello,world,no-truncate"),
arguments("hello,world,truncate", null, 11, "hello,world"),
arguments("hello,world", null, 1, "h"),
arguments("hello", null, 0, "")
arguments("hello", null, 0, ""),
arguments(List.of("hello_one", "hello_two", "hello_three"), null, 5, List.of("hello", "hello", "hello")),
arguments(List.of("hello_one", 2, "hello_three"), null, 5, List.of("hello", 2, "hello"))
);
}
}
Expand Down
Loading

0 comments on commit 96c2568

Please sign in to comment.