Skip to content

Commit

Permalink
Add limit to cardinality key, and metric for cardinality overflow
Browse files Browse the repository at this point in the history
Signed-off-by: Jonah Calvo <[email protected]>
  • Loading branch information
JonahCalvo committed Aug 16, 2023
1 parent 252a0dd commit cca9e28
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.anomalydetector;


import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand All @@ -17,6 +18,8 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
Expand All @@ -30,14 +33,19 @@ public class AnomalyDetectorProcessor extends AbstractProcessor<Record<Event>, R
public static final String DEVIATION_KEY = "deviation_from_expected";
public static final String GRADE_KEY = "grade";
static final String NUMBER_RCF_INSTANCES = "RCFInstances";
static final String CARDINALITY_OVERFLOW = "cardinalityOverflow";

private final Boolean verbose;
private final int cardinalityLimit;
private final IdentificationKeysHasher identificationKeysHasher;
private final List<String> keys;
private final PluginFactory pluginFactory;
private final HashMap<Integer, AnomalyDetectorMode> forestMap;
private final AtomicInteger cardinality;
private final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig;
private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorProcessor.class);
private final Counter cardinalityOverflowCounter;
private boolean overflowWarned = false;

@DataPrepperPluginConstructor
public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) {
Expand All @@ -48,6 +56,8 @@ public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDete
this.keys = anomalyDetectorProcessorConfig.getKeys();
this.verbose = anomalyDetectorProcessorConfig.getVerbose();
this.cardinality = pluginMetrics.gauge(NUMBER_RCF_INSTANCES, new AtomicInteger());
this.cardinalityLimit = anomalyDetectorProcessorConfig.getCardinalityLimit();
this.cardinalityOverflowCounter = pluginMetrics.counter(CARDINALITY_OVERFLOW);
forestMap = new HashMap<>();
}

Expand All @@ -68,12 +78,20 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
AnomalyDetectorMode forest = forestMap.get(identificationKeysMap.hashCode());

if (Objects.isNull(forest)) {
if (Objects.nonNull(forest)) {
recordsOut.addAll(forest.handleEvents(List.of(record)));
} else if (forestMap.size() < cardinalityLimit) {
forest = loadAnomalyDetectorMode(pluginFactory);
forest.initialize(keys, verbose);
forestMap.put(identificationKeysMap.hashCode(), forest);
recordsOut.addAll(forest.handleEvents(List.of(record)));
} else {
if (!overflowWarned) {
LOG.warn("Cardinality limit reached, see cardinalityOverflow metric for count of skipped records");
overflowWarned = true;
}
cardinalityOverflowCounter.increment();
}
recordsOut.addAll(forestMap.get(identificationKeysMap.hashCode()).handleEvents(List.of(record)));
}
cardinality.set(forestMap.size());
return recordsOut;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class AnomalyDetectorProcessorConfig {
@JsonProperty("verbose")
private Boolean verbose = false;

@JsonProperty("cardinality_limit")
private int cardinalityLimit = 5000;

/**
 * Returns the plugin model describing which anomaly-detector mode plugin
 * (e.g. random_cut_forest) should be loaded for this processor.
 *
 * @return the configured detector-mode plugin model
 */
public PluginModel getDetectorMode() {
    return this.detectorMode;
}
Expand All @@ -47,6 +50,9 @@ public List<String> getIdentificationKeys() {
/**
 * Indicates whether verbose output is enabled for the detector.
 *
 * <p>The backing field is a boxed {@code Boolean} populated by Jackson; an
 * explicit {@code verbose: null} in the pipeline configuration would leave it
 * null, and the previous unboxing {@code return verbose;} would then throw a
 * {@code NullPointerException}. {@code Boolean.TRUE.equals} treats null as
 * false, matching the documented default.
 *
 * @return {@code true} when verbose mode is enabled; {@code false} otherwise
 *         (including when the value is absent or null)
 */
public boolean getVerbose() {
    return Boolean.TRUE.equals(verbose);
}
/**
 * Returns the maximum number of distinct identification-key combinations
 * (RCF instances) this processor will track before skipping new keys.
 *
 * @return the configured cardinality limit (default 5000)
 */
public int getCardinalityLimit() {
    return this.cardinalityLimit;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class AnomalyDetectorProcessorTests {

@Mock
private Counter recordsOut;
@Mock
private Counter cardinalityOverflow;

@Mock
private Timer timeElapsed;
Expand All @@ -77,6 +79,7 @@ void setUp() {
RandomCutForestModeConfig randomCutForestModeConfig = new RandomCutForestModeConfig();

when(mockConfig.getDetectorMode()).thenReturn(modeConfiguration);
when(mockConfig.getCardinalityLimit()).thenReturn(5000);
when(modeConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString());
when(modeConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap());
when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class)))
Expand Down Expand Up @@ -215,6 +218,43 @@ void testAnomalyDetectorCardinality() {

}

@Test
void testAnomalyDetectorMaxCardinality() {
    // Configure a single identification key and a cardinality limit of 2,
    // and stub the overflow counter the processor registers on construction.
    final List<String> identificationKeyList = new ArrayList<String>();
    identificationKeyList.add("ip");
    when(mockConfig.getIdentificationKeys()).thenReturn(identificationKeyList);
    when(mockConfig.getCardinalityLimit()).thenReturn(2);
    when(pluginMetrics.counter(AnomalyDetectorProcessor.CARDINALITY_OVERFLOW)).thenReturn(cardinalityOverflow);

    anomalyDetectorProcessor = new AnomalyDetectorProcessor(mockConfig, pluginMetrics, pluginFactory);

    final int numSamples = 1024;
    final List<Record<Event>> records = new ArrayList<Record<Event>>();
    // First numSamples records alternate between two IPs, filling both
    // cardinality slots; the next numSamples introduce two further IPs
    // that the processor must skip once the limit is reached.
    for (int i = 0; i < 2 * numSamples; i++) {
        final boolean withinLimit = i < numSamples;
        if (i % 2 == 0) {
            records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(),
                    ThreadLocalRandom.current().nextDouble(0.5, 0.6),
                    ThreadLocalRandom.current().nextLong(100, 110),
                    withinLimit ? "1.1.1.1" : "2.2.2.2"));
        } else {
            records.add(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(),
                    ThreadLocalRandom.current().nextDouble(15.5, 15.6),
                    ThreadLocalRandom.current().nextLong(1000, 1110),
                    withinLimit ? "255.255.255.255" : "254.254.254.254"));
        }
    }

    anomalyDetectorProcessor.doExecute(records);

    // Both slots are occupied, so records for previously unseen IPs are dropped.
    final List<Record<Event>> firstUnseenIp = (List<Record<Event>>) anomalyDetectorProcessor.doExecute(
            Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(),
                    ThreadLocalRandom.current().nextDouble(1000.5, 1000.6),
                    ThreadLocalRandom.current().nextLong(1000, 1110), "3.3.3.3")));
    assertThat(firstUnseenIp.size(), equalTo(0));

    final List<Record<Event>> secondUnseenIp = (List<Record<Event>>) anomalyDetectorProcessor.doExecute(
            Collections.singletonList(getLatencyBytesMessageWithIp(UUID.randomUUID().toString(),
                    ThreadLocalRandom.current().nextDouble(1500.5, 1500.6),
                    ThreadLocalRandom.current().nextLong(1000, 1110), "144.123.412.123")));
    assertThat(secondUnseenIp.size(), equalTo(0));
}

static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
Expand Down

0 comments on commit cca9e28

Please sign in to comment.