Skip to content

Commit

Permalink
Add include_performance_metadata parameter and track total grok patte…
Browse files Browse the repository at this point in the history
…rns attempted in the grok processor (opensearch-project#4197)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Feb 29, 2024
1 parent 60db08e commit 337eb71
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY;


@SingleThread
Expand Down Expand Up @@ -228,6 +229,8 @@ private void compileMatchPatterns() {
private void matchAndMerge(final Event event) {
final Map<String, Object> grokkedCaptures = new HashMap<>();

int patternsAttempted = 0;

for (final Map.Entry<String, List<Grok>> entry : fieldToGrok.entrySet()) {
for (final Grok grok : entry.getValue()) {
final String value = event.get(entry.getKey(), String.class);
Expand All @@ -238,6 +241,8 @@ private void matchAndMerge(final Event event) {
final Map<String, Object> captures = match.capture();
mergeCaptures(grokkedCaptures, captures);

patternsAttempted++;

if (shouldBreakOnMatch(grokkedCaptures)) {
break;
}
Expand All @@ -262,6 +267,15 @@ private void matchAndMerge(final Event event) {
} else {
grokProcessingMatchCounter.increment();
}

if (grokProcessorConfig.getIncludePerformanceMetadata()) {
Integer totalPatternsAttemptedForEvent = (Integer) event.getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY);
if (totalPatternsAttemptedForEvent == null) {
totalPatternsAttemptedForEvent = 0;
}

event.getMetadata().setAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY, totalPatternsAttemptedForEvent + patternsAttempted);
}
}

private void mergeCaptures(final Map<String, Object> original, final Map<String, Object> updates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import java.util.Map;

public class GrokProcessorConfig {

static final String TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY = "_total_grok_patterns_attempted";

static final String BREAK_ON_MATCH = "break_on_match";
static final String KEEP_EMPTY_CAPTURES = "keep_empty_captures";
static final String MATCH = "match";
Expand All @@ -25,6 +28,8 @@ public class GrokProcessorConfig {
static final String TAGS_ON_MATCH_FAILURE = "tags_on_match_failure";
static final String TAGS_ON_TIMEOUT = "tags_on_timeout";

static final String INCLUDE_PERFORMANCE_METADATA = "include_performance_metadata";

static final boolean DEFAULT_BREAK_ON_MATCH = true;
static final boolean DEFAULT_KEEP_EMPTY_CAPTURES = false;
static final boolean DEFAULT_NAMED_CAPTURES_ONLY = true;
Expand All @@ -46,6 +51,8 @@ public class GrokProcessorConfig {
private final List<String> tagsOnMatchFailure;
private final List<String> tagsOnTimeout;

private final boolean includePerformanceMetadata;

private GrokProcessorConfig(final boolean breakOnMatch,
final boolean keepEmptyCaptures,
final Map<String, List<String>> match,
Expand All @@ -58,7 +65,8 @@ private GrokProcessorConfig(final boolean breakOnMatch,
final String targetKey,
final String grokWhen,
final List<String> tagsOnMatchFailure,
final List<String> tagsOnTimeout) {
final List<String> tagsOnTimeout,
final boolean includePerformanceMetadata) {

this.breakOnMatch = breakOnMatch;
this.keepEmptyCaptures = keepEmptyCaptures;
Expand All @@ -73,6 +81,7 @@ private GrokProcessorConfig(final boolean breakOnMatch,
this.grokWhen = grokWhen;
this.tagsOnMatchFailure = tagsOnMatchFailure;
this.tagsOnTimeout = tagsOnTimeout.isEmpty() ? tagsOnMatchFailure : tagsOnTimeout;
this.includePerformanceMetadata = includePerformanceMetadata;
}

public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting) {
Expand All @@ -88,7 +97,8 @@ public static GrokProcessorConfig buildConfig(final PluginSetting pluginSetting)
pluginSetting.getStringOrDefault(TARGET_KEY, DEFAULT_TARGET_KEY),
pluginSetting.getStringOrDefault(GROK_WHEN, null),
pluginSetting.getTypedList(TAGS_ON_MATCH_FAILURE, String.class),
pluginSetting.getTypedList(TAGS_ON_TIMEOUT, String.class));
pluginSetting.getTypedList(TAGS_ON_TIMEOUT, String.class),
pluginSetting.getBooleanOrDefault(INCLUDE_PERFORMANCE_METADATA, true));
}

public boolean isBreakOnMatch() {
Expand Down Expand Up @@ -140,4 +150,6 @@ public List<String> getTagsOnMatchFailure() {
/**
 * Returns the tag list stored for the {@code tags_on_timeout} setting.
 * The constructor substitutes the match-failure tags when the configured
 * timeout tag list is empty, so this may be the same list that
 * {@code getTagsOnMatchFailure()} returns.
 *
 * @return the tag list held in the {@code tagsOnTimeout} field
 */
public List<String> getTagsOnTimeout() {
    return this.tagsOnTimeout;
}

/**
 * Returns the value read from the {@code include_performance_metadata} setting.
 * When this is {@code true}, the grok processor adds the number of patterns it
 * attempted for an event to the event-metadata attribute
 * {@code _total_grok_patterns_attempted}.
 *
 * @return {@code true} if performance metadata should be written to event metadata
 */
public boolean getIncludePerformanceMetadata() {
    return this.includePerformanceMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void testDefault() {
assertThat(grokProcessorConfig.getGrokWhen(), equalTo(null));
assertThat(grokProcessorConfig.getTagsOnMatchFailure(), equalTo(Collections.emptyList()));
assertThat(grokProcessorConfig.getTagsOnTimeout(), equalTo(Collections.emptyList()));
assertThat(grokProcessorConfig.getIncludePerformanceMetadata(), equalTo(true));
}

@Test
Expand All @@ -91,7 +92,8 @@ public void testValidConfig() {
TEST_PATTERNS_FILES_GLOB,
TEST_PATTERN_DEFINITIONS,
TEST_TIMEOUT_MILLIS,
TEST_TARGET_KEY);
TEST_TARGET_KEY,
false);

final GrokProcessorConfig grokProcessorConfig = GrokProcessorConfig.buildConfig(validPluginSetting);

Expand All @@ -105,6 +107,7 @@ public void testValidConfig() {
assertThat(grokProcessorConfig.getTargetKey(), equalTo(TEST_TARGET_KEY));
assertThat(grokProcessorConfig.isNamedCapturesOnly(), equalTo(false));
assertThat(grokProcessorConfig.getTimeoutMillis(), equalTo(TEST_TIMEOUT_MILLIS));
assertThat(grokProcessorConfig.getIncludePerformanceMetadata(), equalTo(false));
}

@Test
Expand All @@ -119,7 +122,8 @@ public void testInvalidConfig() {
TEST_PATTERNS_FILES_GLOB,
TEST_PATTERN_DEFINITIONS,
TEST_TIMEOUT_MILLIS,
TEST_TARGET_KEY);
TEST_TARGET_KEY,
false);

invalidPluginSetting.getSettings().put(GrokProcessorConfig.MATCH, TEST_INVALID_MATCH);

Expand All @@ -135,7 +139,8 @@ private PluginSetting completePluginSettingForGrokProcessor(final boolean breakO
final String patternsFilesGlob,
final Map<String, String> patternDefinitions,
final int timeoutMillis,
final String targetKey) {
final String targetKey,
final boolean includePerformanceMetadata) {
final Map<String, Object> settings = new HashMap<>();
settings.put(GrokProcessorConfig.BREAK_ON_MATCH, breakOnMatch);
settings.put(GrokProcessorConfig.NAMED_CAPTURES_ONLY, namedCapturesOnly);
Expand All @@ -147,6 +152,7 @@ private PluginSetting completePluginSettingForGrokProcessor(final boolean breakO
settings.put(GrokProcessorConfig.PATTERNS_FILES_GLOB, patternsFilesGlob);
settings.put(GrokProcessorConfig.TIMEOUT_MILLIS, timeoutMillis);
settings.put(GrokProcessorConfig.TARGET_KEY, targetKey);
settings.put(GrokProcessorConfig.INCLUDE_PERFORMANCE_METADATA, includePerformanceMetadata);

return new PluginSetting(PLUGIN_NAME, settings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessor.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT;
import static org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig.TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY;
import static org.opensearch.dataprepper.test.matcher.MapEquals.isEqualWithoutTimestamp;


Expand Down Expand Up @@ -160,6 +161,8 @@ private GrokProcessor createObjectUnderTest() {

@Test
public void testMatchMerge() throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
pluginSetting.getSettings().put(GrokProcessorConfig.INCLUDE_PERFORMANCE_METADATA, false);

grokProcessor = createObjectUnderTest();

capture.put("key_capture_1", "value_capture_1");
Expand All @@ -182,7 +185,11 @@ public void testMatchMerge() throws JsonProcessingException, ExecutionException,

assertThat(grokkedRecords.size(), equalTo(1));
assertThat(grokkedRecords.get(0), notNullValue());
assertThat(grokkedRecords.get(0).getData(), notNullValue());
assertThat(grokkedRecords.get(0).getData().getMetadata(), notNullValue());
assertThat(grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY), equalTo(null));
assertRecordsAreEqual(grokkedRecords.get(0), resultRecord);

verify(grokProcessingMatchCounter, times(1)).increment();
verify(grokProcessingTime, times(1)).record(any(Runnable.class));
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMismatchCounter, grokProcessingTimeoutsCounter);
Expand Down Expand Up @@ -516,6 +523,62 @@ public void testNoCaptures() throws JsonProcessingException {
verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
}

/**
 * With performance metadata enabled, an event whose metadata has no prior
 * attempt count must end up with {@code TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY}
 * equal to 2, i.e. one attempt for each of the two stubbed grok patterns.
 */
@Test
public void testMatchOnSecondPattern() throws JsonProcessingException {
    pluginSetting.getSettings().put(GrokProcessorConfig.INCLUDE_PERFORMANCE_METADATA, true);

    // First pattern yields no captures; second pattern yields the shared `capture` map.
    // NOTE(review): `capture` is not populated in this test, so it is presumably empty,
    // which would explain why the mismatch counter (not the match counter) is verified below.
    when(match.capture()).thenReturn(Collections.emptyMap());
    when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch);
    when(secondMatch.capture()).thenReturn(capture);

    grokProcessor = createObjectUnderTest();

    // Diamond operator instead of the raw HashMap type, avoiding an unchecked conversion.
    final Map<String, Object> testData = new HashMap<>();
    testData.put("message", messageInput);
    final Record<Event> record = buildRecordWithEvent(testData);

    final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

    assertThat(grokkedRecords.size(), equalTo(1));
    assertThat(grokkedRecords.get(0), notNullValue());
    assertThat(grokkedRecords.get(0).getData(), notNullValue());
    assertThat(grokkedRecords.get(0).getData().getMetadata(), notNullValue());
    // No pre-existing count on the event, so the total is just this run's attempts.
    assertThat(grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY), equalTo(2));
    assertRecordsAreEqual(grokkedRecords.get(0), record);
    verify(grokProcessingMismatchCounter, times(1)).increment();
    verify(grokProcessingTime, times(1)).record(any(Runnable.class));
    verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
}

/**
 * With performance metadata enabled, an event that already carries a
 * {@code TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY} value of 1 must end up with 3:
 * the existing count plus the two patterns attempted in this run.
 */
@Test
public void testMatchOnSecondPatternWithExistingMetadataForTotalPatternMatches() throws JsonProcessingException {
    pluginSetting.getSettings().put(GrokProcessorConfig.INCLUDE_PERFORMANCE_METADATA, true);

    // First pattern yields no captures; second pattern yields the shared `capture` map.
    // NOTE(review): `capture` is not populated in this test, so it is presumably empty,
    // which would explain why the mismatch counter (not the match counter) is verified below.
    when(match.capture()).thenReturn(Collections.emptyMap());
    when(grokSecondMatch.match(messageInput)).thenReturn(secondMatch);
    when(secondMatch.capture()).thenReturn(capture);

    grokProcessor = createObjectUnderTest();

    // Diamond operator instead of the raw HashMap type, avoiding an unchecked conversion.
    final Map<String, Object> testData = new HashMap<>();
    testData.put("message", messageInput);
    final Record<Event> record = buildRecordWithEvent(testData);

    // Seed the event with a count as if an earlier grok processor had already run.
    record.getData().getMetadata().setAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY, 1);

    final List<Record<Event>> grokkedRecords = (List<Record<Event>>) grokProcessor.doExecute(Collections.singletonList(record));

    assertThat(grokkedRecords.size(), equalTo(1));
    assertThat(grokkedRecords.get(0), notNullValue());
    assertThat(grokkedRecords.get(0).getData(), notNullValue());
    assertThat(grokkedRecords.get(0).getData().getMetadata(), notNullValue());
    // 1 (seeded) + 2 (attempted in this run) = 3.
    assertThat(grokkedRecords.get(0).getData().getMetadata().getAttribute(TOTAL_PATTERNS_ATTEMPTED_METADATA_KEY), equalTo(3));
    assertRecordsAreEqual(grokkedRecords.get(0), record);
    verify(grokProcessingMismatchCounter, times(1)).increment();
    verify(grokProcessingTime, times(1)).record(any(Runnable.class));
    verifyNoInteractions(grokProcessingErrorsCounter, grokProcessingMatchCounter, grokProcessingTimeoutsCounter);
}

@Nested
class WithTags {
private String tagOnMatchFailure1;
Expand Down

0 comments on commit 337eb71

Please sign in to comment.