diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java index 0eb33f979f..ad4bb8d2c7 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.anomalydetector; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -17,6 +18,8 @@ import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; @@ -30,14 +33,19 @@ public class AnomalyDetectorProcessor extends AbstractProcessor, R public static final String DEVIATION_KEY = "deviation_from_expected"; public static final String GRADE_KEY = "grade"; static final String NUMBER_RCF_INSTANCES = "RCFInstances"; + static final String CARDINALITY_OVERFLOW = "cardinalityOverflow"; private final Boolean verbose; + private final int cardinalityLimit; private final IdentificationKeysHasher identificationKeysHasher; private final List keys; private final PluginFactory pluginFactory; private final HashMap forestMap; private final AtomicInteger cardinality; private final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig; + private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorProcessor.class); + private final Counter cardinalityOverflowCounter; + private boolean overflowWarned = false; @DataPrepperPluginConstructor public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { @@ -48,6 +56,8 @@ public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDete this.keys = anomalyDetectorProcessorConfig.getKeys(); this.verbose = anomalyDetectorProcessorConfig.getVerbose(); this.cardinality = pluginMetrics.gauge(NUMBER_RCF_INSTANCES, new AtomicInteger()); + this.cardinalityLimit = anomalyDetectorProcessorConfig.getCardinalityLimit(); + this.cardinalityOverflowCounter = pluginMetrics.counter(CARDINALITY_OVERFLOW); forestMap = new HashMap<>(); } @@ -68,12 +78,20 @@ public Collection> doExecute(Collection> records) { final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); AnomalyDetectorMode forest = forestMap.get(identificationKeysMap.hashCode()); - if (Objects.isNull(forest)) { + if (Objects.nonNull(forest)) { + recordsOut.addAll(forest.handleEvents(List.of(record))); + } else if (forestMap.size() < cardinalityLimit) { forest = loadAnomalyDetectorMode(pluginFactory); forest.initialize(keys, verbose); forestMap.put(identificationKeysMap.hashCode(), forest); + recordsOut.addAll(forest.handleEvents(List.of(record))); + } else { + if (!overflowWarned) { + LOG.warn("Cardinality limit reached, see cardinalityOverflow metric for count of skipped records"); + overflowWarned = true; + } + cardinalityOverflowCounter.increment(); } - recordsOut.addAll(forestMap.get(identificationKeysMap.hashCode()).handleEvents(List.of(record))); } cardinality.set(forestMap.size()); return recordsOut; diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java index 7e796e660a..6331ee1f21 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java @@ -28,6 +28,9 @@ public class AnomalyDetectorProcessorConfig { @JsonProperty("verbose") private Boolean verbose = false; + @JsonProperty("cardinality_limit") + private int cardinalityLimit = 5000; + public PluginModel getDetectorMode() { return detectorMode; } @@ -47,6 +50,9 @@ public List getIdentificationKeys() { public boolean getVerbose() { return verbose; } + public int getCardinalityLimit() { + return cardinalityLimit; + } } diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java index 302a692dd7..c7ab98a4d4 100644 --- a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java +++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java @@ -55,6 +55,8 @@ public class AnomalyDetectorProcessorTests { @Mock private Counter recordsOut; + @Mock + private Counter cardinalityOverflow; @Mock private Timer timeElapsed; @@ -77,6 +79,7 @@ void setUp() { RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig(); when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration); + when(mockConfig.getCardinalityLimit()).thenReturn(5000); when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class))) @@ -215,6 +218,43 @@ void testAnomalyDetectorCardinality() { } + @Test + void testAnomalyDetectorMaxCardinality() { + List identificationKeyList = new ArrayList(); + identificationKeyList.add("ip"); + when(mockConfig.getIdentificationKeys()).thenReturn(identificationKeyList); + when(mockConfig.getCardinalityLimit()).thenReturn(2); + when(pluginMetrics.counter(AnomalyDetectorProcessor.CARDINALITY_OVERFLOW)).thenReturn(cardinalityOverflow); + + anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory); + final int numSamples = 1024; + final List> records = new ArrayList>(); + for (int i = 0; i < numSamples; i++) { + if (i % 2 == 0) { + records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(0.5, 0.6), ThreadLocalRandom.current().nextLong(100, 110), "1.1.1.1")); + } else { + records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(15.5, 15.6), ThreadLocalRandom.current().nextLong(1000, 1110), "255.255.255.255")); + } + } + // Since limit is 2, the next two IPs will not have anomaly detection + for (int i = 0; i < numSamples; i++) { + if (i % 2 == 0) { + records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(0.5, 0.6), ThreadLocalRandom.current().nextLong(100, 110), "2.2.2.2")); + } else { + records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(15.5, 15.6), ThreadLocalRandom.current().nextLong(1000, 1110), "254.254.254.254")); + } + } + + anomalyDetectorProcessor.doExecute(records); + + final List> newIpOne = (List>) anomalyDetectorProcessor.doExecute(Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(1000.5, 1000.6), ThreadLocalRandom.current().nextLong(1000, 1110), "3.3.3.3"))); + assertThat(newIpOne.size(), equalTo(0)); + + final List> newIpTwo = (List>) anomalyDetectorProcessor.doExecute(Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(), ThreadLocalRandom.current().nextDouble(1500.5, 1500.6), ThreadLocalRandom.current().nextLong(1000, 1110), "144.123.412.123"))); + assertThat(newIpTwo.size(), equalTo(0)); + + } + static Record buildRecordWithEvent(final Map data) { return new Record<>(JacksonEvent.builder() .withData(data)