Skip to content

Commit

Permalink
Adding cardinality key support for AD processor (#3073)
Browse files Browse the repository at this point in the history
* Adding cardinality key support for AD processor

Signed-off-by: Jonah Calvo <[email protected]>

* Refactor hash function to common package. Add metrics for RCF instances. Implement optional verbose mode for RCF

Signed-off-by: Jonah Calvo <[email protected]>

---------

Signed-off-by: Jonah Calvo <[email protected]>
  • Loading branch information
JonahCalvo authored Aug 10, 2023
1 parent 000c39d commit e54c838
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -49,7 +50,7 @@ private AggregateActionSynchronizer(final AggregateAction aggregateAction, final
this.actionConcludeGroupEventsProcessingErrors = pluginMetrics.counter(ACTION_CONCLUDE_GROUP_EVENTS_PROCESSING_ERRORS);
}

AggregateActionOutput concludeGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) {
AggregateActionOutput concludeGroup(final IdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup, final boolean forceConclude) {
final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock();
final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock();

Expand All @@ -74,7 +75,7 @@ AggregateActionOutput concludeGroup(final AggregateIdentificationKeysHasher.Iden
return actionOutput;
}

AggregateActionResponse handleEventForGroup(final Event event, final AggregateIdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup) {
AggregateActionResponse handleEventForGroup(final Event event, final IdentificationKeysHasher.IdentificationKeysMap hash, final AggregateGroup aggregateGroup) {
final Lock concludeGroupLock = aggregateGroup.getConcludeGroupLock();
final Lock handleEventForGroupLock = aggregateGroup.getHandleEventForGroupLock();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import com.google.common.collect.Maps;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -14,33 +15,33 @@

class AggregateGroupManager {

private final Map<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> allGroups = Maps.newConcurrentMap();
private final Map<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> allGroups = Maps.newConcurrentMap();
private final Duration groupDuration;

AggregateGroupManager(final Duration groupDuration) {
this.groupDuration = groupDuration;
}

AggregateGroup getAggregateGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) {
AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap) {
return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap()));
}

List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
if (groupEntry.getValue().shouldConcludeGroup(groupDuration) || forceConclude) {
groupsToConclude.add(groupEntry);
}
}
return groupsToConclude;
}

void closeGroup(final AggregateIdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
void closeGroup(final IdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
allGroups.remove(hashKeyMap, group);
group.resetGroup();
}

void putGroupWithHash(final AggregateIdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
void putGroupWithHash(final IdentificationKeysHasher.IdentificationKeysMap hashKeyMap, final AggregateGroup group) {
allGroups.put(hashKeyMap, group);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.util.Collection;
import java.util.LinkedList;
Expand All @@ -41,7 +42,7 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
private final AggregateProcessorConfig aggregateProcessorConfig;
private final AggregateGroupManager aggregateGroupManager;
private final AggregateActionSynchronizer aggregateActionSynchronizer;
private final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher;
private final IdentificationKeysHasher identificationKeysHasher;
private final AggregateAction aggregateAction;

private boolean forceConclude = false;
Expand All @@ -51,15 +52,15 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
@DataPrepperPluginConstructor
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final ExpressionEvaluator expressionEvaluator) {
this(aggregateProcessorConfig, pluginMetrics, pluginFactory, new AggregateGroupManager(aggregateProcessorConfig.getGroupDuration()),
new AggregateIdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator);
new IdentificationKeysHasher(aggregateProcessorConfig.getIdentificationKeys()), new AggregateActionSynchronizer.AggregateActionSynchronizerProvider(), expressionEvaluator);
}
public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, final AggregateGroupManager aggregateGroupManager,
final AggregateIdentificationKeysHasher aggregateIdentificationKeysHasher, final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider, final ExpressionEvaluator expressionEvaluator) {
final IdentificationKeysHasher identificationKeysHasher, final AggregateActionSynchronizer.AggregateActionSynchronizerProvider aggregateActionSynchronizerProvider, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.aggregateProcessorConfig = aggregateProcessorConfig;
this.aggregateGroupManager = aggregateGroupManager;
this.expressionEvaluator = expressionEvaluator;
this.aggregateIdentificationKeysHasher = aggregateIdentificationKeysHasher;
this.identificationKeysHasher = identificationKeysHasher;
this.aggregateAction = loadAggregateAction(pluginFactory);
this.aggregateActionSynchronizer = aggregateActionSynchronizerProvider.provide(aggregateAction, aggregateGroupManager, pluginMetrics);

Expand All @@ -82,8 +83,8 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final List<Record<Event>> recordsOut = new LinkedList<>();

final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude);
for (final Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : groupsToConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(forceConclude);
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : groupsToConclude) {
final AggregateActionOutput actionOutput = aggregateActionSynchronizer.concludeGroup(groupEntry.getKey(), groupEntry.getValue(), forceConclude);

final List<Event> concludeGroupEvents = actionOutput != null ? actionOutput.getEvents() : null;
Expand All @@ -105,7 +106,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
handleEventsDropped++;
continue;
}
final AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = aggregateIdentificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);

final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -47,7 +48,7 @@ public class AggregateActionSynchronizerTest {
private AggregateGroup aggregateGroup;

@Mock
private AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;
private IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;

@Mock
private AggregateActionResponse aggregateActionResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher;

import java.time.Duration;
import java.util.Collections;
Expand All @@ -28,7 +29,7 @@ public class AggregateGroupManagerTest {

private AggregateGroupManager aggregateGroupManager;

private AggregateIdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;
private IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap;

private static final Duration TEST_GROUP_DURATION = Duration.ofSeconds(new Random().nextInt(10) + 10);

Expand All @@ -37,7 +38,7 @@ void setup() {
final Map<Object, Object> identificationKeysHash = new HashMap<>();
identificationKeysHash.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());

identificationKeysMap = new AggregateIdentificationKeysHasher.IdentificationKeysMap(identificationKeysHash);
identificationKeysMap = new IdentificationKeysHasher.IdentificationKeysMap(identificationKeysHash);
}

private AggregateGroupManager createObjectUnderTest() {
Expand Down Expand Up @@ -92,16 +93,16 @@ void getGroupsToConclude_returns_correct_group() {

final AggregateGroup groupToConclude = mock(AggregateGroup.class);
when(groupToConclude.shouldConcludeGroup(TEST_GROUP_DURATION)).thenReturn(true);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

final AggregateGroup groupToNotConclude = mock(AggregateGroup.class);
when(groupToNotConclude.shouldConcludeGroup(TEST_GROUP_DURATION)).thenReturn(false);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToNotConclude = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToNotConclude = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

aggregateGroupManager.putGroupWithHash(hashForGroupToConclude, groupToConclude);
aggregateGroupManager.putGroupWithHash(hashForGroupToNotConclude, groupToNotConclude);

final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(false);
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(false);

assertThat(groupsToConclude.size(), equalTo(1));
assertThat(groupsToConclude.get(0), notNullValue());
Expand All @@ -114,15 +115,15 @@ void getGroupsToConclude_with_force_conclude_return_all() {
aggregateGroupManager = createObjectUnderTest();

final AggregateGroup groupToConclude1 = mock(AggregateGroup.class);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude1 = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude1 = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

final AggregateGroup groupToConclude2 = mock(AggregateGroup.class);
final AggregateIdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude2 = mock(AggregateIdentificationKeysHasher.IdentificationKeysMap.class);
final IdentificationKeysHasher.IdentificationKeysMap hashForGroupToConclude2 = mock(IdentificationKeysHasher.IdentificationKeysMap.class);

aggregateGroupManager.putGroupWithHash(hashForGroupToConclude1, groupToConclude1);
aggregateGroupManager.putGroupWithHash(hashForGroupToConclude2, groupToConclude2);

final List<Map.Entry<AggregateIdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(true);
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = aggregateGroupManager.getGroupsToConclude(true);

assertThat(groupsToConclude.size(), equalTo(2));
assertThat(groupsToConclude.get(0), notNullValue());
Expand Down
Loading

0 comments on commit e54c838

Please sign in to comment.