Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
kkondaka authored Jun 26, 2024
2 parents ec0f3b1 + c5fe1b4 commit d29fac8
Show file tree
Hide file tree
Showing 51 changed files with 717 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void put(EventKey key, Object value) {
*/
@Override
public void put(final String key, final Object value) {
final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.PUT);
final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.PUT);
put(jacksonEventKey, value);
}

Expand Down Expand Up @@ -229,7 +229,7 @@ private static JacksonEventKey asJacksonEventKey(EventKey key) {
*/
@Override
public <T> T get(final String key, final Class<T> clazz) {
final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET);
final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET);
return get(jacksonEventKey, clazz);
}

Expand Down Expand Up @@ -274,7 +274,7 @@ public <T> List<T> getList(EventKey key, Class<T> clazz) {
*/
@Override
public <T> List<T> getList(final String key, final Class<T> clazz) {
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET);
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET);
return getList(jacksonEventKey, clazz);
}

Expand Down Expand Up @@ -330,7 +330,7 @@ public void delete(final EventKey key) {
*/
@Override
public void delete(final String key) {
final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.DELETE);
final JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.DELETE);
delete(jacksonEventKey);
}

Expand Down Expand Up @@ -362,7 +362,7 @@ public String getAsJsonString(EventKey key) {

@Override
public String getAsJsonString(final String key) {
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET);
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET);
return getAsJsonString(jacksonEventKey);
}

Expand Down Expand Up @@ -459,7 +459,7 @@ public boolean containsKey(EventKey key) {

@Override
public boolean containsKey(final String key) {
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET);
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET);
return containsKey(jacksonEventKey);
}

Expand All @@ -474,7 +474,7 @@ public boolean isValueAList(EventKey key) {

@Override
public boolean isValueAList(final String key) {
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, EventKeyFactory.EventAction.GET);
JacksonEventKey jacksonEventKey = new JacksonEventKey(key, true, EventKeyFactory.EventAction.GET);
return isValueAList(jacksonEventKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,45 @@ class JacksonEventKey implements EventKey {
private final String key;
private final EventKeyFactory.EventAction[] eventActions;
private final String trimmedKey;
private final List<String> keyPathList;
private final JsonPointer jsonPointer;
private List<String> keyPathList;
private JsonPointer jsonPointer;
private final Set<EventKeyFactory.EventAction> supportedActions;

/**
* Constructor for the JacksonEventKey which should only be used by implementation
* of {@link EventKeyFactory} in Data Prepper core.
*
* @param key The key
* @param eventActions Event actions to support
*/
JacksonEventKey(final String key, final EventKeyFactory.EventAction... eventActions) {
this(key, false, eventActions);
}

/**
* Constructs a new JacksonEventKey.
* <p>
* This overload should only be used by {@link JacksonEvent} directly. It allows for skipping creating
* some resources knowing they will not be needed. The {@link JacksonEvent} only needs a JSON pointer
* when performing GET event actions. So we can optimize PUT/DELETE actions when called with a string
* key instead of an EventKey by not creating the JSON Pointer at all.
* <p>
* For EventKey's constructed through the factory, we should not perform lazy initialization since
* we may lose some possible validations.
*
* @param key the key
* @param lazy Use true to lazily initialize. This will not be thread-safe, however.
* @param eventActions Event actions to support
*/
JacksonEventKey(final String key, final boolean lazy, final EventKeyFactory.EventAction... eventActions) {
this.key = Objects.requireNonNull(key, "Parameter key cannot be null for EventKey.");
this.eventActions = eventActions.length == 0 ? new EventKeyFactory.EventAction[] { EventKeyFactory.EventAction.ALL } : eventActions;

supportedActions = EnumSet.noneOf(EventKeyFactory.EventAction.class);
for (final EventKeyFactory.EventAction eventAction : this.eventActions) {
supportedActions.addAll(eventAction.getSupportedActions());
}

if(key.isEmpty()) {
for (final EventKeyFactory.EventAction action : this.eventActions) {
if (action.isMutableAction()) {
Expand All @@ -40,14 +71,10 @@ class JacksonEventKey implements EventKey {

trimmedKey = checkAndTrimKey(key);

keyPathList = Collections.unmodifiableList(Arrays.asList(trimmedKey.split(SEPARATOR, -1)));
jsonPointer = toJsonPointer(trimmedKey);

supportedActions = EnumSet.noneOf(EventKeyFactory.EventAction.class);
for (final EventKeyFactory.EventAction eventAction : this.eventActions) {
supportedActions.addAll(eventAction.getSupportedActions());
if(!lazy) {
keyPathList = toKeyPathList();
jsonPointer = toJsonPointer(trimmedKey);
}

}

@Override
Expand All @@ -60,10 +87,16 @@ String getTrimmedKey() {
}

List<String> getKeyPathList() {
if(keyPathList == null) {
keyPathList = toKeyPathList();
}
return keyPathList;
}

JsonPointer getJsonPointer() {
if(jsonPointer == null) {
jsonPointer = toJsonPointer(trimmedKey);
}
return jsonPointer;
}

Expand Down Expand Up @@ -136,7 +169,11 @@ private static boolean isValidKey(final String key) {
return true;
}

private JsonPointer toJsonPointer(final String key) {
private List<String> toKeyPathList() {
return Collections.unmodifiableList(Arrays.asList(trimmedKey.split(SEPARATOR, -1)));
}

private static JsonPointer toJsonPointer(final String key) {
final String jsonPointerExpression;
if (key.isEmpty() || key.startsWith("/")) {
jsonPointerExpression = key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,23 @@ void getJsonPointer_returns_the_same_instance_for_multiple_calls() {
assertThat(objectUnderTest.getJsonPointer(), sameInstance(jsonPointer));
}

@ParameterizedTest
@EnumSource(value = EventKeyFactory.EventAction.class)
void getJsonPointer_returns_valid_JsonPointer_when_constructed_with_fromJacksonEvent(final EventKeyFactory.EventAction eventAction) {
final String testKey = UUID.randomUUID().toString();
final JacksonEventKey objectUnderTest = new JacksonEventKey(testKey, true, eventAction);

final JsonPointer jsonPointer = objectUnderTest.getJsonPointer();
assertThat(jsonPointer, notNullValue());
assertThat(jsonPointer.toString(), equalTo("/" + testKey));
}

@ParameterizedTest
@ArgumentsSource(KeyPathListArgumentsProvider.class)
void getKeyPathList_returns_expected_value_when_constructed_with_fromJacksonEvent(final String key, final List<String> expectedKeyPathList) {
assertThat(new JacksonEventKey(key, true).getKeyPathList(), equalTo(expectedKeyPathList));
}

@ParameterizedTest
@ArgumentsSource(SupportsArgumentsProvider.class)
void supports_returns_true_if_any_supports(final List<EventKeyFactory.EventAction> eventActionsList, final EventKeyFactory.EventAction otherAction, final boolean expectedSupports) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.integration;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;

class Router_ThreeRoutesDefaultIT {
private static final String TESTING_KEY = "ConditionalRoutingIT";
private static final String ALL_SOURCE_KEY = TESTING_KEY + "_all";
private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha";
private static final String BETA_SOURCE_KEY = TESTING_KEY + "_beta";
private static final String ALPHA_DEFAULT_SOURCE_KEY = TESTING_KEY + "_alpha_default";
private static final String ALPHA_BETA_GAMMA_SOURCE_KEY = TESTING_KEY + "_alpha_beta_gamma";
private static final String DEFAULT_SOURCE_KEY = TESTING_KEY + "_default";
private static final String KNOWN_CONDITIONAL_KEY = "value";
private static final String ALPHA_VALUE = "a";
private static final String BETA_VALUE = "b";
private static final String GAMMA_VALUE = "g";
private static final String DEFAULT_VALUE = "z";
private DataPrepperTestRunner dataPrepperTestRunner;
private InMemorySourceAccessor inMemorySourceAccessor;
private InMemorySinkAccessor inMemorySinkAccessor;

@BeforeEach
void setUp() {
dataPrepperTestRunner = DataPrepperTestRunner.builder()
.withPipelinesDirectoryOrFile("route/three-route-with-default-route.yaml")
.build();

dataPrepperTestRunner.start();
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
}

@AfterEach
void tearDown() {
dataPrepperTestRunner.stop();
}

@Test
void test_default_route() {
final List<Record<Event>> alphaEvents = createEvents(ALPHA_VALUE, 10);
final List<Record<Event>> betaEvents = createEvents(BETA_VALUE, 20);
final List<Record<Event>> gammaEvents = createEvents(GAMMA_VALUE, 20);
final List<Record<Event>> defaultEvents = createEvents(DEFAULT_VALUE, 20);

final List<Record<Event>> allEvents = new ArrayList<>(alphaEvents);
allEvents.addAll(betaEvents);
allEvents.addAll(gammaEvents);
allEvents.addAll(defaultEvents);
Collections.shuffle(allEvents);

inMemorySourceAccessor.submit(TESTING_KEY, allEvents);

await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(inMemorySinkAccessor.get(ALPHA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(BETA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALL_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALPHA_DEFAULT_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(ALPHA_BETA_GAMMA_SOURCE_KEY), not(empty()));
assertThat(inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY), not(empty()));
});

final List<Record<Event>> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY);

assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size()));

assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream()
.filter(alphaEvents::contains).toArray()));

final List<Record<Event>> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY);

assertThat(actualBetaRecords.size(), equalTo(betaEvents.size()));

assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream()
.filter(betaEvents::contains).toArray()));

final List<Record<Event>> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY);

assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size()));
assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(defaultEvents::contains).toArray()));

final List<Record<Event>> actualAlphaDefaultRecords = new ArrayList<>();
actualAlphaDefaultRecords.addAll(actualAlphaRecords);
actualAlphaDefaultRecords.addAll(actualDefaultRecords);
assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size()));
assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream()
.filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray()));

}

private List<Record<Event>> createEvents(final String value, final int numberToCreate) {
return IntStream.range(0, numberToCreate)
.mapToObj(i -> Map.of(KNOWN_CONDITIONAL_KEY, value, "arbitrary_field", UUID.randomUUID().toString()))
.map(map -> JacksonEvent.builder().withData(map).withEventType("TEST").build())
.map(jacksonEvent -> (Event) jacksonEvent)
.map(Record::new)
.collect(Collectors.toList());
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
routing-pipeline:
workers: 4
delay: 10
source:
in_memory:
testing_key: ConditionalRoutingIT
buffer:
bounded_blocking:
# Use a small batch size to help ensure that multiple threads
# are picking up the different routes.
batch_size: 10
route:
- alpha: '/value == "a"'
- beta: '/value == "b"'
- gamma: '/value == "g"'
sink:
- in_memory:
testing_key: ConditionalRoutingIT_alpha
routes:
- alpha
- in_memory:
testing_key: ConditionalRoutingIT_beta
routes:
- beta
- in_memory:
testing_key: ConditionalRoutingIT_alpha_default
routes:
- alpha
- _default
- in_memory:
testing_key: ConditionalRoutingIT_alpha_beta_gamma
routes:
- alpha
- beta
- gamma
- in_memory:
testing_key: ConditionalRoutingIT_default
routes:
- _default
- in_memory:
testing_key: ConditionalRoutingIT_all
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@ public static List<Processor> decorateProcessors(
"Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId);
}

final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, firstInnerProcessor, pluginId, identificationKeys, pipelineWorkerThreads);

return processors.stream()
.map(processor -> {
PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads);
return new PeerForwardingProcessorDecorator(peerForwarder, processor);
})
.collect(Collectors.toList());
return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor))
.collect(Collectors.toList());
}

private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* intended to help break apart {@link Router} for better testing.
*/
class DataFlowComponentRouter {
static final String DEFAULT_ROUTE = "_default";
<C> void route(final Collection<Record> allRecords,
final DataFlowComponent<C> dataFlowComponent,
final Map<Record, Set<String>> recordsToRoutes,
Expand All @@ -37,7 +38,9 @@ <C> void route(final Collection<Record> allRecords,
final Set<String> routesForEvent = recordsToRoutes
.getOrDefault(record, Collections.emptySet());

if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) {
if (routesForEvent.size() == 0 && dataFlowComponentRoutes.contains(DEFAULT_ROUTE)) {
recordsForComponent.add(getRecordStrategy.getRecord(record));
} else if (routesForEvent.stream().anyMatch(dataFlowComponentRoutes::contains)) {
recordsForComponent.add(getRecordStrategy.getRecord(record));
}
}
Expand Down
Loading

0 comments on commit d29fac8

Please sign in to comment.