From e54c838d9e0b89548e61f3ad23e5645084676062 Mon Sep 17 00:00:00 2001 From: Jonah Calvo Date: Thu, 10 Aug 2023 15:33:20 -0500 Subject: [PATCH] Adding cardinality key support for AD processor (#3073) * Adding cardinality key support for AD processor Signed-off-by: Jonah Calvo * Refactor hash function to common package. Add metrics for RCF instances. Implement optional verbose mode for RCF Signed-off-by: Jonah Calvo --------- Signed-off-by: Jonah Calvo --- .../AggregateActionSynchronizer.java | 5 +- .../aggregate/AggregateGroupManager.java | 15 +- .../aggregate/AggregateProcessor.java | 15 +- .../AggregateActionSynchronizerTest.java | 3 +- .../aggregate/AggregateGroupManagerTest.java | 17 +- .../aggregate/AggregateProcessorTest.java | 17 +- .../anomaly-detector-processor/build.gradle | 9 +- .../anomalydetector/AnomalyDetectorMode.java | 4 +- .../AnomalyDetectorProcessor.java | 40 ++++- .../AnomalyDetectorProcessorConfig.java | 14 ++ .../modes/RandomCutForestMode.java | 5 +- .../AnomalyDetectorProcessorTests.java | 159 +++++++++++------- .../modes/RandomCutForestModeTests.java | 47 +++++- .../hasher/IdentificationKeysHasher.java} | 12 +- .../hasher/IdentificationKeysHasherTest.java} | 34 ++-- 15 files changed, 261 insertions(+), 135 deletions(-) rename data-prepper-plugins/{aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java => common/src/main/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasher.java} (76%) rename data-prepper-plugins/{aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java => common/src/test/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasherTest.java} (65%) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizer.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizer.java index 14eeae80f2..c60e1da6e7 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizer.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizer.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ private AggregateActionSynchronizer(final AggregateAction aggregateAction, final this.actionConcludeGroupEventsProcessingErrors = pluginMetrics.counter(ACTION_CONCLUDE_GROUP_EVENTS_PROCESSING_ERRORS); } - AggregateActionOutput concludeGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) { + AggregateActionOutput concludeGroup(final IdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) { final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock(); final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock(); @@ -74,7 +75,7 @@ AggregateActionOutput concludeGroup(final AggregateIdentificationKeysHasher.Iden return actionOutput; } - AggregateActionResponse handleEventForGroup(final Event event, final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup) { + AggregateActionResponse handleEventForGroup(final Event event, final IdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup) { final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock(); final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock(); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java index 19431e131d..9d271aa40b 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; import com.google.common.collect.Maps; +import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; import java.time.Duration; import java.util.ArrayList; @@ -14,20 +15,20 @@ class AggregateGroupManager { - private final Map allGroups = Maps.newConcurrentMap(); + private final Map allGroups = Maps.newConcurrentMap(); private final Duration groupDuration; AggregateGroupManager(final Duration groupDuration) { this.groupDuration = groupDuration; } - AggregateGroup getAggregateGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) { + AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) { return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap())); } - List> getGroupsToConclude(final boolean forceConclude) { - final List> groupsToConclude = new ArrayList<>(); - for (final Map.Entry groupEntry : allGroups.entrySet()) { + List> getGroupsToConclude(final boolean forceConclude) { + final List> groupsToConclude = new ArrayList<>(); + for (final Map.Entry groupEntry : allGroups.entrySet()) { if (groupEntry.getValue().shouldConcludeGroup(groupDuration) || forceConclude) { groupsToConclude.add(groupEntry); } @@ -35,12 +36,12 @@ List, Record< private final AggregateProcessorConfig aggregateProcessorConfig; private final AggregateGroupManager aggregateGroupManager; private final AggregateActionSynchronizer aggregateActionSynchronizer; - private final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; + private final IdentificationKeysHasher identificationKeysHasher; private final AggregateAction aggregateAction; private boolean forceConclude = false; @@ -51,15 +52,15 @@ public class AggregateProcessor extends AbstractProcessor, Record< @DataPrepperPluginConstructor public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final ExpressionEvaluator expressionEvaluator) { this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(aggregateProcessorConfig.getGroupDuration()), - new AggregateIdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator); + new IdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator); } public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final AggregateGroupManager aggregateGroupManager, - final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher, final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider, final ExpressionEvaluator expressionEvaluator) { + final IdentificationKeysHasher identificationKeysHasher, final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.aggregateProcessorConfig = aggregateProcessorConfig; this.aggregateGroupManager = aggregateGroupManager; this.expressionEvaluator = expressionEvaluator; - this.aggregateIdentificationKeysHasher = aggregateIdentificationKeysHasher; + this.identificationKeysHasher = identificationKeysHasher; this.aggregateAction = loadAggregateAction(pluginFactory); this.aggregateActionSynchronizer = aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics); @@ -82,8 +83,8 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) { public Collection> doExecute(Collection> records) { final List> recordsOut = new LinkedList<>(); - final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude); - for (final Map.Entry groupEntry : groupsToConclude) { + final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude); + for (final Map.Entry groupEntry : groupsToConclude) { final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude); final List concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null; @@ -105,7 +106,7 @@ public Collection> doExecute(Collection> records) { handleEventsDropped++; continue; } - final AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event); + final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap); final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizerTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizerTest.java index 4b6e71611a..ed8195d0fb 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizerTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionSynchronizerTest.java @@ -17,6 +17,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; import java.time.Duration; import java.util.List; @@ -47,7 +48,7 @@ public class AggregateActionSynchronizerTest { private AggregateGroup aggregateGroup; @Mock - private AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap; + private IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap; @Mock private AggregateActionResponse aggregateActionResponse; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java index 3ddca61aa9..03d3c634b6 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManagerTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; import java.time.Duration; import java.util.Collections; @@ -28,7 +29,7 @@ public class AggregateGroupManagerTest { private AggregateGroupManager aggregateGroupManager; - private AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap; + private IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap; private static final Duration TEST_GROUP_DURATION = Duration.ofSeconds(new Random().nextInt(10) + 10); @@ -37,7 +38,7 @@ void setup() { final Map identificationKeysHash = new HashMap<>(); identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - identificationKeysMap = new AggregateIdentificationKeysHasher.IdentificationKeysMap(identificationKeysHash); + identificationKeysMap = new IdentificationKeysHasher.IdentificationKeysMap(identificationKeysHash); } private AggregateGroupManager createObjectUnderTest() { @@ -92,16 +93,16 @@ void getGroupsToConclude_returns_correct_group() { final AggregateGroup groupToConclude = mock(AggregateGroup.class); when(groupToConclude.shouldConcludeGroup(TEST_GROUP_DURATION)).thenReturn(true); - final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class); + final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude = mock(IdentificationKeysHasher.IdentificationKeysMap.class); final AggregateGroup groupToNotConclude = mock(AggregateGroup.class); when(groupToNotConclude.shouldConcludeGroup(TEST_GROUP_DURATION)).thenReturn(false); - final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToNotConclude = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class); + final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToNotConclude = mock(IdentificationKeysHasher.IdentificationKeysMap.class); aggregateGroupManager.putGroupWithHash(hashForGroupToConclude, groupToConclude); aggregateGroupManager.putGroupWithHash(hashForGroupToNotConclude, groupToNotConclude); - final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(false); + final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(false); assertThat(groupsToConclude.size(), equalTo(1)); assertThat(groupsToConclude.get(0), notNullValue()); @@ -114,15 +115,15 @@ void getGroupsToConclude_with_force_conclude_return_all() { aggregateGroupManager = createObjectUnderTest(); final AggregateGroup groupToConclude1 = mock(AggregateGroup.class); - final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude1 = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class); + final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude1 = mock(IdentificationKeysHasher.IdentificationKeysMap.class); final AggregateGroup groupToConclude2 = mock(AggregateGroup.class); - final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude2 = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class); + final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude2 = mock(IdentificationKeysHasher.IdentificationKeysMap.class); aggregateGroupManager.putGroupWithHash(hashForGroupToConclude1, groupToConclude1); aggregateGroupManager.putGroupWithHash(hashForGroupToConclude2, groupToConclude2); - final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(true); + final List> groupsToConclude = aggregateGroupManager.getGroupsToConclude(true); assertThat(groupsToConclude.size(), equalTo(2)); assertThat(groupsToConclude.get(0), notNullValue()); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index 90d912cbd8..ad0d763078 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; import java.util.AbstractMap; import java.util.Collection; @@ -53,10 +54,10 @@ public class AggregateProcessorTest { private PluginFactory pluginFactory; @Mock - private AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; + private IdentificationKeysHasher identificationKeysHasher; @Mock - private AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap; + private IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap; @Mock private AggregateProcessorConfig aggregateProcessorConfig; @@ -115,7 +116,7 @@ public class AggregateProcessorTest { private Event event; private AggregateProcessor createObjectUnderTest() { - return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, aggregateGroupManager, aggregateIdentificationKeysHasher, aggregateActionSynchronizerProvider, expressionEvaluator); + return new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, aggregateGroupManager, identificationKeysHasher, aggregateActionSynchronizerProvider, expressionEvaluator); } @BeforeEach @@ -160,7 +161,7 @@ void getIdentificationKeys_should_return_configured_identification_keys() { class TestDoExecute { @BeforeEach void setup() { - when(aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event)) + when(identificationKeysHasher.createIdentificationKeysMapFromEvent(event)) .thenReturn(identificationKeysMap); when(aggregateGroupManager.getAggregateGroup(identificationKeysMap)).thenReturn(aggregateGroup); when(aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroup)).thenReturn(aggregateActionResponse); @@ -209,7 +210,7 @@ void handleEvent_returning_with_condition_eliminates_one_record() { .build(); - when(aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent)) + when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent)) .thenReturn(identificationKeysMap); when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse); when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true); @@ -267,7 +268,7 @@ void handleEvent_returning_with_event_adds_event_to_records_out() { void concludeGroup_returning_with_no_event_does_not_add_event_to_records_out() { final AggregateProcessor objectUnderTest = createObjectUnderTest(); - final Map.Entry groupEntry = new AbstractMap.SimpleEntry(identificationKeysMap, aggregateGroup); + final Map.Entry groupEntry = new AbstractMap.SimpleEntry(identificationKeysMap, aggregateGroup); when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.singletonList(groupEntry)); when(aggregateActionResponse.getEvent()).thenReturn(null); when(aggregateActionSynchronizer.concludeGroup(identificationKeysMap, aggregateGroup, false)).thenReturn(new AggregateActionOutput(List.of())); @@ -289,7 +290,7 @@ void concludeGroup_returning_with_no_event_does_not_add_event_to_records_out() { void concludeGroup_returning_with_event_adds_event_to_records_out() { final AggregateProcessor objectUnderTest = createObjectUnderTest(); - final Map.Entry groupEntry = new AbstractMap.SimpleEntry(identificationKeysMap, aggregateGroup); + final Map.Entry groupEntry = new AbstractMap.SimpleEntry(identificationKeysMap, aggregateGroup); when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.singletonList(groupEntry)); when(aggregateActionResponse.getEvent()).thenReturn(null); when(aggregateActionSynchronizer.concludeGroup(identificationKeysMap, aggregateGroup, false)).thenReturn(new AggregateActionOutput(List.of(event))); @@ -314,7 +315,7 @@ void concludeGroup_after_prepare_for_shutdown() { final AggregateProcessor objectUnderTest = createObjectUnderTest(); objectUnderTest.prepareForShutdown(); - final Map.Entry groupEntry = new AbstractMap.SimpleEntry(identificationKeysMap, aggregateGroup); + final Map.Entry groupEntry = new AbstractMap.SimpleEntry(identificationKeysMap, aggregateGroup); when(aggregateGroupManager.getGroupsToConclude(eq(true))).thenReturn(Collections.singletonList(groupEntry)); when(aggregateActionResponse.getEvent()).thenReturn(null); when(aggregateActionSynchronizer.concludeGroup(identificationKeysMap, aggregateGroup, true)).thenReturn(new AggregateActionOutput(List.of(event))); diff --git a/data-prepper-plugins/anomaly-detector-processor/build.gradle b/data-prepper-plugins/anomaly-detector-processor/build.gradle index e76b92a960..76f5ae9513 100644 --- a/data-prepper-plugins/anomaly-detector-processor/build.gradle +++ b/data-prepper-plugins/anomaly-detector-processor/build.gradle @@ -10,12 +10,13 @@ plugins { dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-test-common') + implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'io.micrometer:micrometer-core' - implementation 'software.amazon.randomcutforest:randomcutforest-testutils:3.7.0' - implementation 'software.amazon.randomcutforest:randomcutforest-core:3.7.0' - implementation 'software.amazon.randomcutforest:randomcutforest-examples:3.7.0' - implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.7.0' + implementation 'software.amazon.randomcutforest:randomcutforest-testutils:3.8.0' + implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0' + implementation 'software.amazon.randomcutforest:randomcutforest-examples:3.8.0' + implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0' implementation 'software.amazon.randomcutforest:randomcutforest-serialization-json:1.0' testImplementation libs.commons.lang3 } diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorMode.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorMode.java index a2fc9e6773..911b587c58 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorMode.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorMode.java @@ -20,8 +20,10 @@ public interface AnomalyDetectorMode { * * @param keys List of keys which are used as dimensions in the anomaly detector * @since 2.1 + * + * @param verbose Optional, when true, RCF will turn off Auto-Adjust, and anomalies will be continually detected after a level shift */ - void initialize(List keys); + void initialize(List keys, boolean verbose); /** * handles a collection of records diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java index 74e7ba8501..57872c7ecd 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.processor.anomalydetector; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -15,26 +16,38 @@ import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Objects; @DataPrepperPlugin(name = "anomaly_detector", pluginType = Processor.class, pluginConfigurationType = AnomalyDetectorProcessorConfig.class) public class AnomalyDetectorProcessor extends AbstractProcessor, Record> { public static final String DEVIATION_KEY = "deviation_from_expected"; public static final String GRADE_KEY = "grade"; + static final String NUMBER_RCF_INSTANCES = "numberRCFInstances"; + private final Boolean verbose; + private final IdentificationKeysHasher identificationKeysHasher; + private final Counter numberRCFInstances; private final List keys; - private final AnomalyDetectorMode mode; + private final PluginFactory pluginFactory; + private final HashMap forestMap; private final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig; @DataPrepperPluginConstructor public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { super(pluginMetrics); + this.identificationKeysHasher = new IdentificationKeysHasher(anomalyDetectorProcessorConfig.getIdentificationKeys()); this.anomalyDetectorProcessorConfig = anomalyDetectorProcessorConfig; - keys = anomalyDetectorProcessorConfig.getKeys(); - mode = loadAnomalyDetectorMode(pluginFactory); - mode.initialize(keys); + this.pluginFactory = pluginFactory; + this.numberRCFInstances = pluginMetrics.counter(NUMBER_RCF_INSTANCES); + this.keys = anomalyDetectorProcessorConfig.getKeys(); + this.verbose = anomalyDetectorProcessorConfig.getVerbose(); + forestMap = new HashMap<>(); } private AnomalyDetectorMode loadAnomalyDetectorMode(final PluginFactory pluginFactory) { @@ -45,7 +58,24 @@ private AnomalyDetectorMode loadAnomalyDetectorMode(final PluginFactory pluginFa @Override public Collection> doExecute(Collection> records) { - return mode.handleEvents(records); + final List> recordsOut = new LinkedList<>(); + + for (final Record record : records) { + final Event event = record.getData(); + // If user has not configured IdentificationKeys, the empty set will always hash to "31", + // so the same forest will be used, and we don't need to write a special case. + final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); + AnomalyDetectorMode forest = forestMap.get(identificationKeysMap.hashCode()); + + if (Objects.isNull(forest)) { + forest = loadAnomalyDetectorMode(pluginFactory); + forest.initialize(keys, verbose); + forestMap.put(identificationKeysMap.hashCode(), forest); + this.numberRCFInstances.increment(); + } + recordsOut.addAll(forestMap.get(identificationKeysMap.hashCode()).handleEvents(List.of(record))); + } + return recordsOut; } diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java index 08bf2455c1..c92fdb9000 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java @@ -21,6 +21,12 @@ public class AnomalyDetectorProcessorConfig { @NotEmpty private List keys; + @JsonProperty("identification_keys") + private List identificationKeys; + + @JsonProperty("verbose") + private Boolean verbose = false; + public PluginModel getDetectorMode() { return detectorMode; } @@ -34,4 +40,12 @@ public List getKeys() { return keys; } + public List getIdentificationKeys() { + return identificationKeys; + } + public boolean getVerbose() { + return verbose; + } + + } diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestMode.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestMode.java index 9a8f6c0b8f..c3be98d136 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestMode.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestMode.java @@ -58,7 +58,7 @@ public RandomCutForestMode(final RandomCutForestModeConfig randomCutForestModeCo } @Override - public void initialize(List keys) { + public void initialize(List keys, boolean verbose) { this.keys = keys; baseDimensions = keys.size(); Precision precision = Precision.FLOAT_32; @@ -79,7 +79,8 @@ public void initialize(List keys) { .transformMethod(transformMethod) .outputAfter(outputAfter) .timeDecay(timeDecay / sampleSize) - .initialAcceptFraction(INITIAL_ACCEPT_FRACTION).build(); + .initialAcceptFraction(INITIAL_ACCEPT_FRACTION) + .autoAdjust(!verbose).build(); forest.setLowerThreshold(LOWER_THRESHOLD); forest.setHorizon(HORIZON_VALUE); } diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java index 90336ee09d..65c39518d2 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java @@ -5,6 +5,10 @@ package org.opensearch.dataprepper.plugins.processor.anomalydetector; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; +import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -43,6 +47,16 @@ public class AnomalyDetectorProcessorTests { @Mock private PluginMetrics pluginMetrics; + @Mock + private Counter numberRCFInstances; + @Mock + private Counter recordsIn; + + @Mock + private Counter recordsOut; + + @Mock + private Timer timeElapsed; @Mock private AnomalyDetectorProcessorConfig mockConfig; @@ -55,48 +69,29 @@ public class AnomalyDetectorProcessorTests { private AnomalyDetectorProcessor anomalyDetectorProcessor; - static Record buildRecordWithEvent(final Map data) { - return new Record<>(JacksonEvent.builder() - .withData(data) - .withEventType("event") - .build()); - } - private Record getBytesStringMessage(String message, String bytes) { - final Map testData = new HashMap(); - testData.put("message", message); - testData.put("bytes", bytes); - return buildRecordWithEvent(testData); - } + @BeforeEach + void setUp() { + when(mockConfig.getKeys()).thenReturn(new ArrayList(Collections.singleton("latency"))); + RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig(); - private Record getLatencyMessage(String message, Object latency) { - final Map testData = new HashMap(); - testData.put("message", message); - testData.put("latency", latency); - return buildRecordWithEvent(testData); - } + when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration); + when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); + when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); + when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class))) + .thenAnswer(invocation -> new RandomCutForestMode(randomCutForestModeConfig)); + + when(pluginMetrics.counter(AnomalyDetectorProcessor.NUMBER_RCF_INSTANCES)).thenReturn(numberRCFInstances); + when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(recordsIn); + when(pluginMetrics.counter(MetricNames.RECORDS_OUT)).thenReturn(recordsOut); + when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed); - private Record getLatencyBytesMessage(String message, double latency, long bytes) { - final Map testData = new HashMap(); - testData.put("message", message); - testData.put("latency", latency); - testData.put("bytes", bytes); - return buildRecordWithEvent(testData); } @ParameterizedTest @ValueSource(ints = {1, 2, 3, 4, 5, 6}) void testAnomalyDetectorProcessor(int type) { - when(mockConfig.getKeys()).thenReturn(new ArrayList(Collections.singleton("latency"))); - RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig(); - AnomalyDetectorMode anomalyDetectorMode = new RandomCutForestMode(randomCutForestModeConfig); - when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration); - when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); - when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); - - when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class))) - .thenReturn(anomalyDetectorMode); anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory); final int numSamples = 1024; final List> records = new ArrayList>(); @@ -146,19 +141,7 @@ void testAnomalyDetectorProcessor(int type) { @Test void testAnomalyDetectorProcessorTwoKeys() { - List keyList = new ArrayList(); - keyList.add("latency"); - keyList.add("bytes"); - when(mockConfig.getKeys()).thenReturn(keyList); - RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig(); - AnomalyDetectorMode anomalyDetectorMode = new RandomCutForestMode(randomCutForestModeConfig); - when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration); - when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); - when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); - - when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class))) - .thenReturn(anomalyDetectorMode); anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory); final int numSamples = 1024; final List> records = new ArrayList>(); @@ -170,7 +153,7 @@ void testAnomalyDetectorProcessorTwoKeys() { assertThat(recordsWithAnomaly.size(), equalTo(1)); Event event = recordsWithAnomaly.get(0).getData(); List deviation = event.get(AnomalyDetectorProcessor.DEVIATION_KEY, List.class); - for (int i = 0; i < keyList.size(); i++) { + for (int i = 0; i < mockConfig.getKeys().size(); i++) { assertThat((double)deviation.get(i), greaterThan(9.0)); } double grade = (double)event.get(AnomalyDetectorProcessor.GRADE_KEY, Double.class); @@ -180,15 +163,6 @@ void testAnomalyDetectorProcessorTwoKeys() { @Test void testAnomalyDetectorProcessorNoMatchingKeys() { when(mockConfig.getKeys()).thenReturn(new ArrayList(Collections.singleton("bytes"))); - RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig(); - AnomalyDetectorMode anomalyDetectorMode = new RandomCutForestMode(randomCutForestModeConfig); - - when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration); - when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); - when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); - - when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class))) - .thenReturn(anomalyDetectorMode); anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory); final int numSamples = 1024; final List> records = new ArrayList>(); @@ -204,15 +178,6 @@ void testAnomalyDetectorProcessorNoMatchingKeys() { @Test void testAnomalyDetectorProcessorInvalidTypeKeys() { when(mockConfig.getKeys()).thenReturn(new ArrayList(Collections.singleton("bytes"))); - RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig(); - AnomalyDetectorMode anomalyDetectorMode = new RandomCutForestMode(randomCutForestModeConfig); - - when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration); - when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); - when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); - - when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class))) - .thenReturn(anomalyDetectorMode); anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory); final int numSamples = 1024; final List> records = new ArrayList>(); @@ -221,4 +186,70 @@ void testAnomalyDetectorProcessorInvalidTypeKeys() { } assertThrows(RuntimeException.class, () -> anomalyDetectorProcessor.doExecute(records)); } + + @Test + void testAnomalyDetectorCardinality() { + List identificationKeyList = new ArrayList(); + identificationKeyList.add("ip"); + when(mockConfig.getIdentificationKeys()).thenReturn(identificationKeyList); + + anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory); + final int numSamples = 1024; + final List> records = new ArrayList>(); + for (int i = 0; i < numSamples; i++) { + if (i % 2 == 0) { + records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(0.5, 0.6), ThreadLocalRandom.current().nextLong(100, 110), "1.1.1.1")); + } else { + records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(15.5, 15.6), ThreadLocalRandom.current().nextLong(1000, 1110), "255.255.255.255")); + } + } + + anomalyDetectorProcessor.doExecute(records); + + final List> slowRecordFromFastIp = (List>) anomalyDetectorProcessor.doExecute(Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(15.5, 15.8), ThreadLocalRandom.current().nextLong(1000, 1110), "1.1.1.1"))); + assertThat(slowRecordFromFastIp.size(), equalTo(1)); + + final List> slowRecordFromSlowIp = (List>) anomalyDetectorProcessor.doExecute(Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(15.5, 15.6), ThreadLocalRandom.current().nextLong(1000, 1110), "255.255.255.255"))); + assertThat(slowRecordFromSlowIp.size(), equalTo(0)); + + } + + static Record buildRecordWithEvent(final Map data) { + return new Record<>(JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build()); + } + + private Record getBytesStringMessage(String message, String bytes) { + final Map testData = new HashMap(); + testData.put("message", message); + testData.put("bytes", bytes); + return buildRecordWithEvent(testData); + } + + private Record getLatencyMessage(String message, Object latency) { + final Map testData = new HashMap(); + testData.put("message", message); + testData.put("latency", latency); + return buildRecordWithEvent(testData); + } + + private Record getLatencyBytesMessage(String message, double latency, long bytes) { + final Map testData = new HashMap(); + testData.put("message", message); + testData.put("latency", latency); + testData.put("bytes", bytes); + return buildRecordWithEvent(testData); + } + + private Record getLatencyBytesMessageWithIp(String message, double latency, long bytes, String ip) { + final Map testData = new HashMap(); + testData.put("message", message); + testData.put("latency", latency); + testData.put("bytes", bytes); + testData.put("ip", ip); + + return buildRecordWithEvent(testData); + } } diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java index be3afe1fad..c52788433f 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; + import static org.mockito.Mockito.mock; import org.mockito.Mock; import static org.mockito.Mockito.when; @@ -92,7 +93,7 @@ private RandomCutForestMode createObjectUnderTest() { void testRandomCutForestMode() { randomCutForestMode = createObjectUnderTest(); List keys = new ArrayList(Collections.singleton("latency")); - randomCutForestMode.initialize(keys); + randomCutForestMode.initialize(keys, false); final int numSamples = 1024; List> records = new ArrayList>(); for (int i = 0; i < numSamples; i++) { @@ -118,7 +119,7 @@ void testRandomCutForestModeMultipleKeys() { String longFieldName = "bytes"; keyList.add(floatFieldName); keyList.add(longFieldName); - randomCutForestMode.initialize(keyList); + randomCutForestMode.initialize(keyList, false); final int numSamples = 1024; List> records = new ArrayList>(); for (int i = 0; i < numSamples; i++) { @@ -151,7 +152,7 @@ void testRandomCutForestModeWithOutputAfter() { String longFieldName = "hour"; keyList.add(floatFieldName); keyList.add(longFieldName); - randomCutForestMode.initialize(keyList); + randomCutForestMode.initialize(keyList, false); final int numSamples = (365+200)*24+4; // number of samples more than a year List> records = new ArrayList>(); long hour = 0; @@ -166,4 +167,44 @@ void testRandomCutForestModeWithOutputAfter() { final List> anomalyRecords = randomCutForestMode.handleEvents(recordsWithAnomaly).stream().collect(toList());; assertThat(anomalyRecords.size(), equalTo(1)); } + + @Test + void testRandomCutForestModeVerboseTrue() { + randomCutForestMode = createObjectUnderTest(); + List keys = new ArrayList(Collections.singleton("latency")); + randomCutForestMode.initialize(keys, true); + final int numSamples = 1024; + List> records = new ArrayList>(); + for (int i = 0; i < numSamples; i++) { + records.add(getLatencyMessage(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(0.5, 0.6))); + } + randomCutForestMode.handleEvents(records); + final List> recordsWithAnomaly = new ArrayList>(); + for (int i = 0; i < numSamples; i++) { + recordsWithAnomaly.add(getLatencyMessage(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(1, 1.1))); + } + + final List> anomalyRecords = randomCutForestMode.handleEvents(recordsWithAnomaly).stream().collect(toList());; + assertThat(anomalyRecords.size(), greaterThan(5)); + } + + @Test + void testRandomCutForestModeVerboseFalse() { + randomCutForestMode = createObjectUnderTest(); + List keys = new ArrayList(Collections.singleton("latency")); + randomCutForestMode.initialize(keys, false); + final int numSamples = 1024; + List> records = new ArrayList>(); + for (int i = 0; i < numSamples; i++) { + records.add(getLatencyMessage(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(0.5, 0.6))); + } + randomCutForestMode.handleEvents(records); + final List> recordsWithAnomaly = new ArrayList>(); + for (int i = 0; i < numSamples; i++) { + recordsWithAnomaly.add(getLatencyMessage(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(1, 1.1))); + } + + final List> anomalyRecords = randomCutForestMode.handleEvents(recordsWithAnomaly).stream().collect(toList());; + assertThat(anomalyRecords.size(), equalTo(1)); + } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasher.java similarity index 76% rename from data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java rename to data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasher.java index cc2a7ca580..2dd86ae893 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasher.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasher.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.aggregate; +package org.opensearch.dataprepper.plugins.hasher; import org.opensearch.dataprepper.model.event.Event; @@ -12,13 +12,13 @@ import java.util.Map; import java.util.Objects; -class AggregateIdentificationKeysHasher { +public class IdentificationKeysHasher { private final List identificationKeys; - AggregateIdentificationKeysHasher(final List identificationKeys) { + public IdentificationKeysHasher(final List identificationKeys) { this.identificationKeys = identificationKeys; } - IdentificationKeysMap createIdentificationKeysMapFromEvent(final Event event) { + public IdentificationKeysMap createIdentificationKeysMapFromEvent(final Event event) { final Map identificationKeysMap = new HashMap<>(); for (final String identificationKey : identificationKeys) { identificationKeysMap.put(identificationKey, event.get(identificationKey, Object.class)); @@ -29,7 +29,7 @@ IdentificationKeysMap createIdentificationKeysMapFromEvent(final Event event) { public static class IdentificationKeysMap { private final Map keyMap; - IdentificationKeysMap(final Map keyMap) { + public IdentificationKeysMap(final Map keyMap) { this.keyMap = keyMap; } @@ -46,7 +46,7 @@ public int hashCode() { return Objects.hash(keyMap); } - Map getKeyMap() { + public Map getKeyMap() { return keyMap; } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasherTest.java similarity index 65% rename from data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java rename to data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasherTest.java index 63221811f6..aac85d02b8 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateIdentificationKeysHasherTest.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/hasher/IdentificationKeysHasherTest.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.processor.aggregate; +package org.opensearch.dataprepper.plugins.hasher; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; @@ -21,10 +21,10 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; -public class AggregateIdentificationKeysHasherTest { +public class IdentificationKeysHasherTest { private Event event; private List identificationKeys; - private AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher; + private IdentificationKeysHasher identificationKeysHasher; @BeforeEach void setup() { @@ -33,18 +33,18 @@ void setup() { identificationKeys.add("secondIdentificationKey"); } - private AggregateIdentificationKeysHasher createObjectUnderTest() { - return new AggregateIdentificationKeysHasher(identificationKeys); + private IdentificationKeysHasher createObjectUnderTest() { + return new IdentificationKeysHasher(identificationKeys); } @Test void createIdentificationKeysMapFromEvent_returns_expected_IdentficationKeysMap() { - aggregateIdentificationKeysHasher = createObjectUnderTest(); + identificationKeysHasher = createObjectUnderTest(); final Map eventMap = new HashMap<>(); eventMap.put("firstIdentificationKey", UUID.randomUUID().toString()); eventMap.put("secondIdentificationKey", UUID.randomUUID().toString()); - final AggregateIdentificationKeysHasher.IdentificationKeysMap expectedResult = new AggregateIdentificationKeysHasher.IdentificationKeysMap(new HashMap<>(eventMap)); + final IdentificationKeysHasher.IdentificationKeysMap expectedResult = new IdentificationKeysHasher.IdentificationKeysMap(new HashMap<>(eventMap)); eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); @@ -53,20 +53,20 @@ void createIdentificationKeysMapFromEvent_returns_expected_IdentficationKeysMap( .withData(eventMap) .build(); - final AggregateIdentificationKeysHasher.IdentificationKeysMap result = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event); + final IdentificationKeysHasher.IdentificationKeysMap result = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); assertThat(result, equalTo(expectedResult)); } @Test void createIdentificationKeysMapFromEvent_where_Event_does_not_contain_one_of_the_identification_keys_returns_expected_Map() { - aggregateIdentificationKeysHasher = createObjectUnderTest(); + identificationKeysHasher = createObjectUnderTest(); final Map eventMap = new HashMap<>(); eventMap.put("firstIdentificationKey", UUID.randomUUID().toString()); final Map mapForExpectedHash = new HashMap<>(eventMap); mapForExpectedHash.put("secondIdentificationKey", null); - final AggregateIdentificationKeysHasher.IdentificationKeysMap expectedResult = new AggregateIdentificationKeysHasher.IdentificationKeysMap(mapForExpectedHash); + final IdentificationKeysHasher.IdentificationKeysMap expectedResult = new IdentificationKeysHasher.IdentificationKeysMap(mapForExpectedHash); eventMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); @@ -75,13 +75,13 @@ void createIdentificationKeysMapFromEvent_where_Event_does_not_contain_one_of_th .withData(eventMap) .build(); - final AggregateIdentificationKeysHasher.IdentificationKeysMap result = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event); + final IdentificationKeysHasher.IdentificationKeysMap result = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); assertThat(result, equalTo(expectedResult)); } @Test void identical_identification_hashes_but_different_objects_are_considered_equal() { - aggregateIdentificationKeysHasher = createObjectUnderTest(); + identificationKeysHasher = createObjectUnderTest(); final Map eventMap = new HashMap<>(); eventMap.put("firstIdentificationKey", UUID.randomUUID().toString()); eventMap.put("secondIdentificationKey", UUID.randomUUID().toString()); @@ -92,15 +92,15 @@ void identical_identification_hashes_but_different_objects_are_considered_equal( .withData(eventMap) .build(); - final AggregateIdentificationKeysHasher.IdentificationKeysMap result = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event); - final AggregateIdentificationKeysHasher.IdentificationKeysMap secondResult = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event); + final IdentificationKeysHasher.IdentificationKeysMap result = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); + final IdentificationKeysHasher.IdentificationKeysMap secondResult = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); assertThat(result, equalTo(secondResult)); } @Test void different_identification_hashes_are_not_considered_equal() { - aggregateIdentificationKeysHasher = createObjectUnderTest(); + identificationKeysHasher = createObjectUnderTest(); final Map eventMap = new HashMap<>(); eventMap.put("firstIdentificationKey", UUID.randomUUID().toString()); eventMap.put("secondIdentificationKey", UUID.randomUUID().toString()); @@ -119,8 +119,8 @@ void different_identification_hashes_are_not_considered_equal() { .withData(secondEventMap) .build(); - final AggregateIdentificationKeysHasher.IdentificationKeysMap result = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event); - final AggregateIdentificationKeysHasher.IdentificationKeysMap secondResult = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(secondEvent); + final IdentificationKeysHasher.IdentificationKeysMap result = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); + final IdentificationKeysHasher.IdentificationKeysMap secondResult = identificationKeysHasher.createIdentificationKeysMapFromEvent(secondEvent); assertThat(result, is(not(equalTo(secondResult)))); }