From 64d2b94548a805b0e3fb20dceec70094a8dd2e96 Mon Sep 17 00:00:00 2001 From: Isaak Krut Date: Tue, 16 Apr 2024 08:08:06 -0400 Subject: [PATCH] test(#2374): provide a framework to easily unit test processing elements (#2730) * #2374 refactored unit tests to use ProcessingElementTestExecutor * #2374 cleanup * #2374 cleanup * #2374 updated test executor to non-static methods * #2374 updated test executor to non-static methods --- .../TestBooleanFilterProcessor.java | 122 ++++---- .../compose/TestComposeProcessor.java | 197 +++++-------- .../merge/TestMergeByTimeProcessor.java | 256 ++++++++-------- .../NumericalFilterProcessorTest.java | 15 +- .../ProcessingElementTestExecutor.java | 88 ++++-- ...stSwingingDoorTrendingFilterProcessor.java | 277 ++++++++++-------- .../textfilter/TestTextFilterProcessor.java | 190 ++++++------ 7 files changed, 559 insertions(+), 586 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java index 3f473f6f61..b195a3e241 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java @@ -18,106 +18,82 @@ package org.apache.streampipes.processors.filters.jvm.processor.booleanfilter; -import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; -import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.model.runtime.EventFactory; -import org.apache.streampipes.model.runtime.SchemaInfo; -import org.apache.streampipes.model.runtime.SourceInfo; -import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.test.extensions.api.StoreEventCollector; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; +import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor; + + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Stream; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class TestBooleanFilterProcessor { - private static final String STREAM_PREFIX = "s0::"; - - private static final Logger LOG = LoggerFactory.getLogger(TestBooleanFilterProcessor.class); - static Stream data() { - return Stream.of( - Arguments.of("True", Arrays.asList(true, true, false, true, false, true, false, true), 5), - Arguments.of("True", Arrays.asList(true, true, true), 3), - Arguments.of("True", Arrays.asList(false, false, false), 0), - Arguments.of("True", Collections.emptyList(), 0), - Arguments.of("False", Arrays.asList(true, false, true, false, true, false, true, false, true), 4), - Arguments.of("False", Arrays.asList(true, true, true), 0), - Arguments.of("False", Arrays.asList(false, false, false), 3), - Arguments.of("False", Collections.emptyList(), 0) - ); + BooleanFilterProcessor processor; + private static final String FIELD_NAME = "Test"; + private static final String FIELD_NAME_WITH_PREFIX = "s0::" + FIELD_NAME; + @BeforeEach + public void setup(){ + processor = new BooleanFilterProcessor(); } - @ParameterizedTest @MethodSource("data") - public void testBoolenFilter( + public void test( String boolToKeep, List eventBooleans, - int expectedFilteredBooleanCount + List outputEventBooleans ) { - var fieldName = "Test"; - var processorParams = mock(ProcessorParams.class); - var eventProcessorRuntimeContext = mock(EventProcessorRuntimeContext.class); + Map userConfiguration = + Map.of( + BooleanFilterProcessor.BOOLEAN_MAPPING, FIELD_NAME_WITH_PREFIX, + BooleanFilterProcessor.VALUE, boolToKeep + ); - var processor = new BooleanFilterProcessor(); - var extractor = mock(ProcessingElementParameterExtractor.class); - when(processorParams.extractor()).thenReturn(extractor); - when(extractor.mappingPropertyValue(BooleanFilterProcessor.BOOLEAN_MAPPING)).thenReturn(STREAM_PREFIX + fieldName); - when(extractor.selectedSingleValue(BooleanFilterProcessor.VALUE, String.class)).thenReturn(boolToKeep); + List> events = new ArrayList<>(); + eventBooleans.forEach(bool->events.add(Map.of(FIELD_NAME_WITH_PREFIX, bool))); - var collector = new StoreEventCollector(); - processor.onInvocation(processorParams, collector, eventProcessorRuntimeContext); + List> outputEvents = new ArrayList<>(); + outputEventBooleans.forEach(bool->outputEvents.add(Map.of(FIELD_NAME, bool))); - sendEvents(processor, collector, eventBooleans, fieldName); + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); - assertEquals(expectedFilteredBooleanCount, collector.getEvents().size()); + testExecutor.run(events, outputEvents); } - private void sendEvents( - BooleanFilterProcessor processor, - StoreEventCollector collector, - List eventBooleans, - String fieldName) { - List events = makeEvents(eventBooleans, fieldName); - for (Event event : events) { - LOG.info("Sending event with value " - + event.getFieldBySelector(STREAM_PREFIX + fieldName) - .getAsPrimitive() - .getAsBoolean()); - processor.onEvent(event, collector); - - } - } - - private List makeEvents(List eventBooleans, String fieldName) { - List events = new ArrayList<>(); - for (Boolean eventSetting : eventBooleans) { - events.add(makeEvent(eventSetting, fieldName)); - } - return events; - } - - private Event makeEvent(Boolean value, String fieldName) { - Map map = new HashMap<>(); - map.put(fieldName, value); - return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "s0"), - new SchemaInfo(null, new ArrayList<>()) + static Stream data() { + return Stream.of( + Arguments.of("True", + Arrays.asList(true, true, false, true, false, true, false, true), + Arrays.asList(true, true, true, true, true)), + Arguments.of("True", + Arrays.asList(true, true, true), + Arrays.asList(true, true, true)), + Arguments.of("True", + Arrays.asList(false, false, false), + Collections.emptyList()), + Arguments.of("True", + Collections.emptyList(), + Collections.emptyList()), + Arguments.of("False", + Arrays.asList(true, false, true, false, true, false, true, false, true), + Arrays.asList(false, false, false, false)), + Arguments.of("False", + Arrays.asList(true, true, true), + Collections.emptyList()), + Arguments.of("False", + Arrays.asList(false, false, false), + Arrays.asList(false, false, false)), + Arguments.of("False", + Collections.emptyList(), + Collections.emptyList()) ); } } diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java index fd56bb23ee..7332cd7911 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java @@ -18,124 +18,87 @@ package org.apache.streampipes.processors.filters.jvm.processor.compose; -//@RunWith(Parameterized.class) +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.model.output.OutputStrategy; +import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; + public class TestComposeProcessor { -// -// private static final Logger LOG = LoggerFactory.getLogger(TestComposeProcessor.class); -// -// @org.junit.runners.Parameterized.Parameter -// public String testName; -// -// @org.junit.runners.Parameterized.Parameter(1) -// public List> eventMaps; -// -// @org.junit.runners.Parameterized.Parameter(2) -// public List selectorPrefixes; -// -// @org.junit.runners.Parameterized.Parameter(3) -// public int expectedNumOfEvents; -// -// @org.junit.runners.Parameterized.Parameter(4) -// public int expectedEventSize; -// -// private static final String outputKeySelector1 = "key-selector1"; -// private static final String outputKeySelector2 = "key-selector2"; -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// Map mapWithFirstOutputSelector = new HashMap<>(); -// mapWithFirstOutputSelector.put(outputKeySelector1, new Object()); -// -// Map mapWithSecondOutputSelector = new HashMap<>(); -// mapWithSecondOutputSelector.put(outputKeySelector2, new Object()); -// -// Map mapWithInvalidOutputSelector = new HashMap<>(); -// mapWithInvalidOutputSelector.put("invalid-selector", new Object()); -// -// List> singleMap = new ArrayList<>(); -// singleMap.add(mapWithFirstOutputSelector); -// -// List> twoMapsMatching = new ArrayList<>(); -// twoMapsMatching.add(mapWithFirstOutputSelector); -// twoMapsMatching.add(mapWithSecondOutputSelector); -// -// List> twoMapsOneMatching = new ArrayList<>(); -// twoMapsOneMatching.add(mapWithFirstOutputSelector); -// twoMapsOneMatching.add(mapWithInvalidOutputSelector); -// -// List> twoMapsNoneMatching = new ArrayList<>(); -// twoMapsNoneMatching.add(mapWithInvalidOutputSelector); -// twoMapsNoneMatching.add(new HashMap<>(mapWithInvalidOutputSelector)); -// -// return Arrays.asList(new Object[][]{ -// {"testWithOneEvent", singleMap, List.of("s0"), 0, 0}, -// {"testWithTwoEventsSamePrefix", twoMapsMatching, List.of("s0", "s0"), 0, 0}, -// {"testWithTwoEvents", twoMapsMatching, List.of("s0", "s1"), 1, 2}, -// {"testWithTwoEventsAnd1InvalidSelector", twoMapsOneMatching, List.of("s0", "s1"), 1, 1}, -// {"testWithTwoEventsWithInvalidSelectors", twoMapsNoneMatching, List.of("s0", "s1"), 1, 0} -// }); -// } -// -// -// -// @Test -// public void testComposeProcessor() { -// LOG.info("Executing test: {}", testName); -// var processor = new ComposeProcessor(); -// var originalGraph = processor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// var graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// List outputStrategies = new ArrayList<>(); -// outputStrategies.add(new CustomOutputStrategy(List.of("s0::" + outputKeySelector1, "s1::" + outputKeySelector2))); -// graph.setOutputStrategies(outputStrategies); -// var params = new ProcessorParams(graph); -// -// var eventCollector = new StoreEventCollector(); -// processor.onInvocation(params, eventCollector, null); -// -// List collectedEvents = sendEvents(processor, eventCollector); -// -// LOG.info("Expected collected event count is: {}", expectedNumOfEvents); -// LOG.info("Actual collected event count is: {}", collectedEvents.size()); -// assertEquals(expectedNumOfEvents, collectedEvents.size()); -// -// if (!collectedEvents.isEmpty()){ -// int eventSize = collectedEvents.get(0).getFields().size(); -// -// LOG.info("Expected event size is: {}", expectedEventSize); -// LOG.info("Actual event size is: {}", eventSize); -// assertEquals(expectedEventSize, eventSize); -// } -// } -// -// private List sendEvents(ComposeProcessor processor, StoreEventCollector collector) { -// List events = makeEvents(); -// for (Event event : events) { -// LOG.info("Sending event with map: " + event.getFields() -// + ", and prefix selector: " + event.getSourceInfo().getSelectorPrefix()); -// processor.onEvent(event, collector); -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } -// return collector.getEvents(); -// } -// private List makeEvents() { -// List events = new ArrayList<>(); -// for (int i = 0; i < eventMaps.size(); i++) { -// events.add(makeEvent(eventMaps.get(i), selectorPrefixes.get(i))); -// } -// return events; -// } -// -// private Event makeEvent(Map eventMap, String selectorPrefix) { -// return EventFactory.fromMap(eventMap, new SourceInfo("test" + "-topic", selectorPrefix), -// new SchemaInfo(null, new ArrayList<>())); -// } -} + ComposeProcessor processor; + private static final String SELECTOR_1 = "key-selector1"; + private static final String SELECTOR_2 = "key-selector2"; + private static final String INVALID_SELECTOR = "invalid-selector"; + private static final String S0_PREFIX = "s0::"; + private static final String S1_PREFIX = "s1::"; + + @BeforeEach + public void setup(){ + processor = new ComposeProcessor(); + } + @ParameterizedTest + @MethodSource("data") + public void test(List> events, + List> outputEvents) { + Consumer invocationConfig = (invocation->{ + List outputStrategies = new ArrayList<>(); + outputStrategies.add(new CustomOutputStrategy(List.of(S0_PREFIX + SELECTOR_1, S1_PREFIX + SELECTOR_2))); + invocation.setOutputStrategies(outputStrategies); + }); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, invocationConfig); + + testExecutor.run(events, outputEvents); + } + static Stream data() { + + var object1 = new Object(); + var object2 = new Object(); + + return Stream.of( + Arguments.of( + List.of( + Map.of(S0_PREFIX + SELECTOR_1, object1)), + List.of()), + Arguments.of( + List.of( + Map.of(S0_PREFIX + SELECTOR_1, object1), + Map.of(S0_PREFIX + SELECTOR_2, object2)), + List.of()), + Arguments.of( + List.of( + Map.of(S0_PREFIX + SELECTOR_1, object1), + Map.of(S1_PREFIX + SELECTOR_2, object2)), + List.of( + Map.of(SELECTOR_1, object1, SELECTOR_2, object2) + )), + Arguments.of( + List.of( + Map.of(S0_PREFIX + SELECTOR_1, object1), + Map.of(S1_PREFIX + INVALID_SELECTOR, object2)), + List.of( + Map.of(SELECTOR_1, object1) + )), + Arguments.of( + List.of( + Map.of(S0_PREFIX + INVALID_SELECTOR, object1), + Map.of(S1_PREFIX + INVALID_SELECTOR, object2)), + List.of( + Map.of() + )) + ); + } +} diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java index 2c4334fb00..b8a952a2ac 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java @@ -17,145 +17,121 @@ */ package org.apache.streampipes.processors.filters.jvm.processor.merge; -//@RunWith(Parameterized.class) +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; + + + public class TestMergeByTimeProcessor { -// -// private static final Logger LOG = LoggerFactory.getLogger(TestMergeByTimeProcessor.class); -// -// private static final Integer timeInterval = 100; -// @org.junit.runners.Parameterized.Parameter -// public String testName; -// @org.junit.runners.Parameterized.Parameter(1) -// public List eventStrings; -// @org.junit.runners.Parameterized.Parameter(2) -// public List expectedValue; -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// return Arrays.asList(new Object[][]{ -// {"testWithInInterval", Arrays.asList("s0:0", "s1:90"), List.of("(90,0)")}, -// {"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), List.of()}, -// {"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), -// List.of("(80,0)")}, -// {"testFigGvnInDocs", -// Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", -// "s1:510"), -// Arrays.asList("(0,10)", "(115,110)", "(510,500)")} -// }); -// } -// -// @Test -// public void testMergeByTimeProcessor() { -// MergeByTimeProcessor mergeByTimeProcessor = new MergeByTimeProcessor(); -// DataProcessorDescription originalGraph = mergeByTimeProcessor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// graph.setInputStreams(Arrays.asList( -// EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")), -// EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1")) -// )); -// -// graph.setOutputStream( -// EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")) -// ); -// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() -// .setActualTopicName("output-topic"); -// -// List outputKeySelectors = graph.getOutputStrategies() -// .stream() -// .filter(CustomOutputStrategy.class::isInstance) -// .map(o -> (CustomOutputStrategy) o) -// .findFirst() -// .map(CustomOutputStrategy::getSelectedPropertyKeys) -// .orElse(new ArrayList<>()); -// outputKeySelectors.add("s0::timestamp_mapping_stream_1"); -// outputKeySelectors.add("s1::timestamp_mapping_stream_2"); -// -// List mappingPropertyUnaries = graph.getStaticProperties() -// .stream() -// .filter(p -> p instanceof MappingPropertyUnary) -// .map((p -> (MappingPropertyUnary) p)) -// .filter(p -> Arrays.asList( -// MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, -// MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY) -// .contains(p.getInternalName())) -// .collect(Collectors.toList()); -// -// assert mappingPropertyUnaries.size() == 2; -// mappingPropertyUnaries.get(0) -// .setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); -// mappingPropertyUnaries.get(1) -// .setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); -// -// FreeTextStaticProperty fsp = graph.getStaticProperties().stream() -// .filter(p -> p instanceof FreeTextStaticProperty) -// .map((p -> (FreeTextStaticProperty) p)) -// .filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL)) -// .findFirst().orElse(null); -// assert fsp != null; -// fsp.setValue(String.valueOf(timeInterval)); -// -// ProcessorParams params = new ProcessorParams(graph); -// -// StoreEventCollector collector = new StoreEventCollector(); -// -// mergeByTimeProcessor.onInvocation(params, collector, null); -// sendEvents(mergeByTimeProcessor, collector); -// -// List actualCollectedEvents = collector.getEvents().stream() -// .map(e -> formatMergedEvent(e)) -// .collect(Collectors.toList()); -// -// LOG.info("Expected merged event is {}", expectedValue); -// LOG.info("Actual merged event is {}", actualCollectedEvents); -// assertTrue(eventsEquals(expectedValue, actualCollectedEvents)); -// } -// -// private boolean eventsEquals(List expectedValue, List actualCollectedEvents) { -// if (expectedValue.size() != actualCollectedEvents.size()) { -// return false; -// } -// for (int i = 0; i < expectedValue.size(); i++) { -// if (!expectedValue.get(i).equalsIgnoreCase(actualCollectedEvents.get(i))) { -// return false; -// } -// } -// return true; -// } -// -// private String formatMergedEvent(Event mergedEvent) { -// return String.format("(%s)", mergedEvent.getFields().values().stream() -// .map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(","))); -// } -// -// private void sendEvents(MergeByTimeProcessor mergeByTimeProcessor, StoreEventCollector spOut) { -// List events = makeEvents(); -// for (Event event : events) { -// mergeByTimeProcessor.onEvent(event, spOut); -// } -// -// } -// -// private List makeEvents() { -// List events = Lists.newArrayList(); -// for (String eventString : eventStrings) { -// events.add(makeEvent(eventString)); -// } -// return events; -// } -// -// private Event makeEvent(String eventString) { -// Map map = Maps.newHashMap(); -// String streamId = eventString.split(":")[0]; -// String timestamp = eventString.split(":")[1]; -// if (streamId.equals("s0")) { -// map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, timestamp); -// } else { -// map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, timestamp); -// } -// return EventFactory.fromMap(map, -// new SourceInfo("test", streamId), -// new SchemaInfo(null, Lists.newArrayList())); -// } + + private static final String S0_PREFIX = "s0::"; + private static final String S1_PREFIX = "s1::"; + private static final Integer timeInterval = 100; + + MergeByTimeProcessor processor; + + @BeforeEach + public void setup(){ + processor = new MergeByTimeProcessor(); + } + @ParameterizedTest + @MethodSource("data") + public void test(List> events, + List> outputEvents){ + + + Map userConfiguration = + Map.of( + MergeByTimeProcessor.TIME_INTERVAL, timeInterval, + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, + S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, + S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY + ); + + Consumer invocationConfig = (invocation->{ + List outputKeySelectors = invocation.getOutputStrategies() + .stream() + .filter(CustomOutputStrategy.class::isInstance) + .map(o -> (CustomOutputStrategy) o) + .findFirst() + .map(CustomOutputStrategy::getSelectedPropertyKeys) + .orElse(new ArrayList<>()); + outputKeySelectors.add(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); + outputKeySelectors.add(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); + }); + + ProcessingElementTestExecutor testExecutor = + new ProcessingElementTestExecutor(processor, userConfiguration, invocationConfig); + + testExecutor.run(events, outputEvents); + } + + static Stream data() { + return Stream.of( + Arguments.of(List.of( + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"), + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "90") + ), + List.of( + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "90", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0" + ) + )), + Arguments.of(List.of( + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"), + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "110") + ), List.of()), + Arguments.of(List.of( + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"), + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "80"), + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"), + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "500") + ), + List.of( + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "80", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0" + ) + )), + Arguments.of(List.of( + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "0"), + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "10"), + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"), + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "115"), + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "120"), + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "230"), + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "340"), + Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "500"), + Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "510") + ), + List.of( + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "0", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "10" + ), + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "115", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110" + ), + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "510", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "500" + ) + )) + ); + } } \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java index 6846f00745..d18e2f1d76 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java @@ -18,19 +18,19 @@ package org.apache.streampipes.processors.filters.jvm.processor.numericalfilter; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; public class NumericalFilterProcessorTest { - private static final String PROPERTY_NAME = "proptertyName"; + private static final String PROPERTY_NAME = "propertyName"; private NumericalFilterProcessor processor; - @Before + @BeforeEach public void setup() { processor = new NumericalFilterProcessor(); } @@ -53,8 +53,9 @@ public void testLowerThenOperatorFilterNotApplied() { Map.of(PROPERTY_NAME, 1.0f) ); + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); - ProcessingElementTestExecutor.run(processor, userConfiguration, inputEvents, outputEvents); + testExecutor.run(inputEvents, outputEvents); } @Test @@ -73,9 +74,9 @@ public void testLowerThenOperatorFilterApplied() { List> outputEvents = List.of(); + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); - ProcessingElementTestExecutor.run(processor, userConfiguration, inputEvents, outputEvents); + testExecutor.run(inputEvents, outputEvents); } - } \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/ProcessingElementTestExecutor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/ProcessingElementTestExecutor.java index 3426b674d0..a6ea97ed6c 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/ProcessingElementTestExecutor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/ProcessingElementTestExecutor.java @@ -30,6 +30,7 @@ import org.apache.streampipes.model.template.PipelineElementTemplate; import org.apache.streampipes.model.template.PipelineElementTemplateConfig; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; +import org.apache.streampipes.test.generator.EventStreamGenerator; import org.mockito.ArgumentCaptor; @@ -37,6 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; @@ -47,27 +49,53 @@ public class ProcessingElementTestExecutor { + private final IStreamPipesDataProcessor processor; + private final Map userConfiguration; + private Consumer invocationConfig; + + public ProcessingElementTestExecutor(IStreamPipesDataProcessor processor, Map userConfiguration, + Consumer invocationConfig) { + this.processor = processor; + this.userConfiguration = userConfiguration; + this.invocationConfig = invocationConfig; + } + + public ProcessingElementTestExecutor(IStreamPipesDataProcessor processor, Map userConfiguration) { + this.processor = processor; + this.userConfiguration = userConfiguration; + } + + public ProcessingElementTestExecutor(IStreamPipesDataProcessor processor, + Consumer invocationConfig) { + this.processor = processor; + this.userConfiguration = new HashMap<>(); + this.invocationConfig = invocationConfig; + } + /** * This method is used to run a data processor with a given configuration and a list of input events. * It then verifies the output events against the expected output events. * - * @param processor The data processor under test. - * @param userConfiguration The user input configuration for the processor. * @param inputEvents The list of input events to be processed. * @param expectedOutputEvents The list of expected output events. */ - public static void run( - IStreamPipesDataProcessor processor, - Map userConfiguration, + public void run( List> inputEvents, List> expectedOutputEvents ) { + // initialize the extractor with the provided configuration of the user input - var e = getProcessingElementParameterExtractor(processor, userConfiguration); + var dataProcessorInvocation = getProcessorInvocation(); + if (invocationConfig != null){ + invocationConfig.accept(dataProcessorInvocation); + } + + var e = getProcessingElementParameterExtractor(dataProcessorInvocation); var mockParams = mock(IDataProcessorParameters.class); - when(mockParams.extractor()).thenReturn(e); + when(mockParams.getModel()).thenReturn(dataProcessorInvocation); + when(mockParams.extractor()).thenReturn(e); // calls the onPipelineStarted method of the processor to initialize it processor.onPipelineStarted(mockParams, null, null); @@ -92,25 +120,18 @@ public static void run( .getRaw() )); - // vlaidate that the processor is stopped correctly + // validate that the processor is stopped correctly processor.onPipelineStopped(); } private static ProcessingElementParameterExtractor getProcessingElementParameterExtractor( - IStreamPipesDataProcessor processor, - Map userConfiguration + DataProcessorInvocation dataProcessorInvocation ) { - - var dataProcessorInvocation = getProcessorInvocation(processor, userConfiguration); - return ProcessingElementParameterExtractor.from(dataProcessorInvocation); } - private static DataProcessorInvocation getProcessorInvocation( - IStreamPipesDataProcessor processor, - Map userConfiguration - ) { - var pipelineElementTemplate = getPipelineElementTemplate(processor, userConfiguration); + private DataProcessorInvocation getProcessorInvocation() { + var pipelineElementTemplate = getPipelineElementTemplate(); var invocation = new DataProcessorInvocation( processor @@ -118,6 +139,9 @@ private static DataProcessorInvocation getProcessorInvocation( .getDescription() ); + invocation.setOutputStream(EventStreamGenerator.makeEmptyStream()); + + return new DataProcessorTemplateHandler( pipelineElementTemplate, invocation, @@ -126,10 +150,7 @@ private static DataProcessorInvocation getProcessorInvocation( .applyTemplateOnPipelineElement(); } - private static PipelineElementTemplate getPipelineElementTemplate( - IStreamPipesDataProcessor processor, - Map userConfiguration - ) { + private PipelineElementTemplate getPipelineElementTemplate() { var staticProperties = processor .declareConfig() .getDescription() @@ -149,11 +170,26 @@ private static PipelineElementTemplate getPipelineElementTemplate( return new PipelineElementTemplate("name", "description", configs); } - private static Event getEvent(Map rawEvent) { - // TODO here we need a better solution to deal with the event schema and the selector prefix - var sourceInfo = new SourceInfo("", ""); + private Event getEvent(Map rawEvent) { + + // separate the prefix and remove it from the map + Map eventMap = new HashMap<>(rawEvent); + String selectorPrefix = eventMap.keySet().stream() + .filter(s->s.contains("::")) + .map(s->s.split("::")[0]) + .findFirst().orElse(""); + + for (var key : eventMap.keySet()){ + if (key.contains("::")){ + var value = rawEvent.get(key); + var newKey = key.split("::")[1]; + eventMap.remove(key); + eventMap.put(newKey, value); + } + } + var sourceInfo = new SourceInfo("", selectorPrefix); var schemaInfo = new SchemaInfo(null, new ArrayList<>()); - return EventFactory.fromMap(rawEvent, sourceInfo, schemaInfo); + return EventFactory.fromMap(eventMap, sourceInfo, schemaInfo); } } diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java index 3a5dbf2288..6730dca9af 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java @@ -18,133 +18,154 @@ package org.apache.streampipes.processors.filters.jvm.processor.sdt; -//@RunWith(Parameterized.class) +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + public class TestSwingingDoorTrendingFilterProcessor { -// -// private static final Logger LOG = LoggerFactory.getLogger(TestSwingingDoorTrendingFilterProcessor.class); -// -// @org.junit.runners.Parameterized.Parameter -// public String testName; -// @org.junit.runners.Parameterized.Parameter(1) -// public String sdtCompressionDeviation; -// @org.junit.runners.Parameterized.Parameter(2) -// public String sdtCompressionMinTimeInterval; -// @org.junit.runners.Parameterized.Parameter(3) -// public String sdtCompressionMaxTimeInterval; -// @org.junit.runners.Parameterized.Parameter(4) -// public int expectedFilteredCount; -// @org.junit.runners.Parameterized.Parameter(5) -// public List> eventSettings; -// @org.junit.runners.Parameterized.Parameter(6) -// public boolean expectException; -// @org.junit.runners.Parameterized.Parameter(7) -// public String expectedErrorMessage; -// -// private final String sdtTimestampField = "sdtTimestampField"; -// private final String sdtValueField = "sdtValueField"; -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// -// return Arrays.asList(new Object[][]{ -// {"testWithOneEvent", "10.0", "100", "500", 1, List.of(Pair.of(9, 50.0)), false, ""}, -// {"testFullFilter", "10.0", "100", "500", 4, List.of( -// Pair.of(0, 50.0), //true -// Pair.of(50, 50.0), //false -// Pair.of(200, 100.0), //false -// Pair.of(270, 140.0), //false -// Pair.of(300, 250.0), //true -// Pair.of(900, 500.0), //true -// Pair.of(1100, 800.0), //false -// Pair.of(1250, 1600.0) //true -// ) -// , false, ""}, -// {"testWithNegativeCompressionDeviation", "-10.0", "100", "500", 1, new ArrayList<>(), true -// , "Compression Deviation should be positive!"}, -// {"testWithNegativeMinInterval", "10.0", "-100", "500", 1, new ArrayList<>(), true -// , "Compression Minimum Time Interval should be >= 0!"}, -// {"testWithMinInterval>MaxInterval", "10.0", "1000", "500", 1, new ArrayList<>(), true -// , "Compression Minimum Time Interval should be < Compression Maximum Time Interval!"} -// }); -// } -// -// @Rule -// public ExpectedException exceptionRule = ExpectedException.none(); -// -// @Test -// public void testSdtFilter() { -// LOG.info("Executing test: {}", testName); -// SwingingDoorTrendingFilterProcessor processor = new SwingingDoorTrendingFilterProcessor(); -// DataProcessorDescription originalGraph = processor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = -// InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// -// ProcessorParams params = new ProcessorParams(graph); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY -// , MappingPropertyUnary.class) -// .setSelectedProperty("test::" + sdtTimestampField); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY -// , MappingPropertyUnary.class) -// .setSelectedProperty("test::" + sdtValueField); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY -// , FreeTextStaticProperty.class) -// .setValue(sdtCompressionDeviation); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY -// , FreeTextStaticProperty.class) -// .setValue(sdtCompressionMinTimeInterval); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY -// , FreeTextStaticProperty.class) -// .setValue(sdtCompressionMaxTimeInterval); -// -// if (expectException){ -// LOG.info("Expecting Error Message: {}", expectedErrorMessage); -// exceptionRule.expect(SpRuntimeException.class); -// exceptionRule.expectMessage(expectedErrorMessage); -// } -// -// StoreEventCollector eventCollector = new StoreEventCollector(); -// processor.onInvocation(params, eventCollector, null); -// -// int result = sendEvents(processor, eventCollector); -// -// LOG.info("Expected SDT filtered count is: {}", expectedFilteredCount); -// LOG.info("Actual SDT filtered count is: {}", result); -// assertEquals(expectedFilteredCount, result); -// } -// -// -// private int sendEvents(SwingingDoorTrendingFilterProcessor processor, StoreEventCollector collector) { -// List events = makeEvents(); -// for (Event event : events) { -// LOG.info("Sending event with timestamp: " -// + event.getFieldBySelector("test::" + sdtTimestampField).getAsPrimitive().getAsLong() -// + ", and value: " -// + event.getFieldBySelector("test::" + sdtValueField).getAsPrimitive().getAsFloat()); -// processor.onEvent(event, collector); -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } -// return collector.getEvents().size(); -// } -// -// private List makeEvents() { -// List events = new ArrayList<>(); -// for (Pair eventSetting: eventSettings) { -// events.add(makeEvent(eventSetting)); -// } -// return events; -// } -// -// private Event makeEvent(Pair eventSetting) { -// Map map = new HashMap<>(); -// map.put(sdtTimestampField, eventSetting.getKey()); -// map.put(sdtValueField, eventSetting.getValue()); -// return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "test"), -// new SchemaInfo(null, new ArrayList<>())); -// } + + private final String sdtTimestampField = "sdtTimestampField"; + private final String sdtValueField = "sdtValueField"; + + private SwingingDoorTrendingFilterProcessor processor; + + @BeforeEach + public void setup() { + processor = new SwingingDoorTrendingFilterProcessor(); + } + + @Test + public void test(){ + Map userConfiguration = Map.of( + SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, "::" + sdtTimestampField, + SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, "::" + sdtValueField, + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, "10.0", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, "100", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, "500" + ); + + List> inputEvents = List.of( + Map.of(sdtTimestampField, 0, + sdtValueField, 50.0), + Map.of(sdtTimestampField, 50, + sdtValueField, 50.0), + Map.of(sdtTimestampField, 200, + sdtValueField, 100.0), + Map.of(sdtTimestampField, 270, + sdtValueField, 140.0), + Map.of(sdtTimestampField, 300, + sdtValueField, 250.0), + Map.of(sdtTimestampField, 900, + sdtValueField, 500.0), + Map.of(sdtTimestampField, 1100, + sdtValueField, 800.0), + Map.of(sdtTimestampField, 1250, + sdtValueField, 1600.0) + ); + + List> outputEvents = List.of( + Map.of(sdtTimestampField, 0, + sdtValueField, 50.0), + Map.of(sdtTimestampField, 270, + sdtValueField, 140.0), + Map.of(sdtTimestampField, 900, + sdtValueField, 500.0), + Map.of(sdtTimestampField, 1100, + sdtValueField, 800.0) + ); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); + + testExecutor.run(inputEvents, outputEvents); + } + + @Test + public void testInvalidDeviationKey(){ + Map userConfiguration = Map.of( + SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, "::" + sdtTimestampField, + SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, "::" + sdtValueField, + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, "-10.0", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, "100", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, "500" + ); + + List> inputEvents = new ArrayList<>(); + + List> outputEvents = new ArrayList<>(); + + Exception expectedException = new SpRuntimeException("Compression Deviation should be positive!"); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); + + Exception exception = assertThrows(expectedException.getClass(), () -> { + testExecutor.run(inputEvents, outputEvents); + }); + String expectedMessage = expectedException.getMessage(); + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + + } + + @Test + public void testNegativeMinimumTime(){ + Map userConfiguration = Map.of( + SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, "::" + sdtTimestampField, + SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, "::" + sdtValueField, + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, "10.0", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, "-100", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, "500" + ); + + List> inputEvents = new ArrayList<>(); + + List> outputEvents = new ArrayList<>(); + + Exception expectedException = new SpRuntimeException("Compression Minimum Time Interval should be >= 0!"); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); + + Exception exception = assertThrows(expectedException.getClass(), () -> { + testExecutor.run(inputEvents, outputEvents); + }); + String expectedMessage = expectedException.getMessage(); + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + } + + @Test + public void testInvalidTimeInterval(){ + Map userConfiguration = Map.of( + SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, "::" + sdtTimestampField, + SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, "::" + sdtValueField, + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, "10.0", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, "1000", + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, "500" + ); + + List> inputEvents = new ArrayList<>(); + + List> outputEvents = new ArrayList<>(); + + Exception expectedException = new + SpRuntimeException("Compression Minimum Time Interval should be < Compression Maximum Time Interval!"); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); + + Exception exception = assertThrows(expectedException.getClass(), () -> { + testExecutor.run(inputEvents, outputEvents); + }); + String expectedMessage = expectedException.getMessage(); + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + } } \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java index 54dc91129a..5278eb28ae 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java @@ -18,101 +18,101 @@ package org.apache.streampipes.processors.filters.jvm.processor.textfilter; -//@RunWith(Parameterized.class) +import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + + public class TestTextFilterProcessor { -// private static final Logger LOG = LoggerFactory.getLogger(TestTextFilterProcessor.class); -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// return Arrays.asList(new Object[][] { -// {"TestLowerCaseMatch", "keyword", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 1}, -// {"TestUpperCaseMatch", "KEYWORD", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 1}, -// {"TestMixMatch", "KeyWord", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 1}, -// {"TestEmptyMatch", "keYWord", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 0}, -// {"TestMultipleMatch", "KeyWord", StringOperator.MATCHES, -// Arrays.asList("keyword", "KeyWord", "KEYWORD", "KeyWord"), 2}, -// -// {"TestContainsWord", "keyword", StringOperator.CONTAINS, -// Arrays.asList("text contains keyword", "text doesn't have word"), 1}, -// {"TestNotContainsWord", "keyword", StringOperator.CONTAINS, -// Arrays.asList("text is empty", "text doesn't have word"), 0}, -// }); -// } -// -// @org.junit.runners.Parameterized.Parameter -// public String selectedFieldName; -// -// @org.junit.runners.Parameterized.Parameter(1) -// public String keyword; -// -// @org.junit.runners.Parameterized.Parameter(2) -// public StringOperator stringOperator; -// -// @org.junit.runners.Parameterized.Parameter(3) -// public List eventStrings; -// -// @org.junit.runners.Parameterized.Parameter(4) -// public int expectedFilteredTextCount; -// -// @Test -// public void testTextFilter() { -// TextFilterProcessor textFilterProcessor = new TextFilterProcessor(); -// DataProcessorDescription originalGraph = textFilterProcessor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// graph.setInputStreams(Collections -// .singletonList(EventStreamGenerator -// .makeStreamWithProperties(Collections.singletonList(selectedFieldName)))); -// graph.setOutputStream(EventStreamGenerator -// .makeStreamWithProperties(Collections.singletonList(selectedFieldName))); -// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() -// .setActualTopicName("output-topic"); -// -// graph.getStaticProperties().stream() -// .filter(p -> p instanceof MappingPropertyUnary) -// .map(p -> (MappingPropertyUnary) p) -// .filter(p -> p.getInternalName().equals(TextFilterProcessor.MAPPING_PROPERTY_ID)) -// .findFirst().get().setSelectedProperty("s0::" + selectedFieldName); -// ProcessorParams params = new ProcessorParams(graph); -// params.extractor().getStaticPropertyByName(TextFilterProcessor.OPERATION_ID, OneOfStaticProperty.class) -// .getOptions() -// .stream().filter(o -> o.getName().equals(stringOperator.name())).findFirst().get().setSelected(true); -// params.extractor().getStaticPropertyByName(TextFilterProcessor.KEYWORD_ID, FreeTextStaticProperty.class) -// .setValue(keyword); -// StoreEventCollector collector = new StoreEventCollector(); -// -// textFilterProcessor.onInvocation(params, collector, null); -// int result = sendEvents(textFilterProcessor, collector); -// -// LOG.info("Expected filtered text count is {}", expectedFilteredTextCount); -// LOG.info("Actual filtered text count is {}", result); -// assertEquals(expectedFilteredTextCount, result); -// -// } -// -// private int sendEvents(TextFilterProcessor textFilterProcessor, StoreEventCollector collector) { -// List events = makeEvents(); -// for (Event e : events) { -// textFilterProcessor.onEvent(e, collector); -// } -// return collector.getEvents().size(); -// } -// -// private List makeEvents() { -// List events = new ArrayList<>(); -// for (String eventString : eventStrings) { -// events.add(makeEvent(eventString)); -// } -// return events; -// } -// -// private Event makeEvent(String eventString) { -// Map map = Maps.newHashMap(); -// map.put(selectedFieldName, eventString); -// return EventFactory.fromMap(map, -// new SourceInfo("test", "s0"), -// new SchemaInfo(null, Lists.newArrayList())); -// } + TextFilterProcessor processor; + + public static final String FIELD_NAME = "selectedField"; + public static final String FIELD_NAME_WITH_PREFIX = "s0::" + FIELD_NAME; + + @BeforeEach + public void setup(){ + processor = new TextFilterProcessor(); + } + + @ParameterizedTest + @MethodSource("data") + public void test(String keyword, + StringOperator stringOperator, + List eventValues, + List outputEventValues){ + + Map userConfiguration = + Map.of( + TextFilterProcessor.MAPPING_PROPERTY_ID, FIELD_NAME_WITH_PREFIX, + TextFilterProcessor.OPERATION_ID, stringOperator, + TextFilterProcessor.KEYWORD_ID, keyword + ); + + List> events = new ArrayList<>(); + eventValues.forEach(value->events.add(Map.of(FIELD_NAME_WITH_PREFIX, value))); + + List> outputEvents = new ArrayList<>(); + outputEventValues.forEach(value->outputEvents.add(Map.of(FIELD_NAME, value))); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration); + + testExecutor.run(events, outputEvents); + } + + + static Stream data() { + return Stream.of( + Arguments.of( + "keyword", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of("keyword") + ), + Arguments.of( + "KEYWORD", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of("KEYWORD") + ), + Arguments.of( + "KeyWord", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of("KeyWord") + ), + Arguments.of( + "keYWord", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of() + ), + Arguments.of( + "KeyWord", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD", "KeyWord"), + List.of("KeyWord", "KeyWord") + ), + Arguments.of( + "keyword", + StringOperator.CONTAINS, + List.of("text contains keyword", "text doesn't have word"), + List.of("text contains keyword") + ), + Arguments.of( + "keyword", + StringOperator.CONTAINS, + List.of("text is empty", "text doesn't have word"), + List.of() + ) + ); + } }