From 0fdd15d74af9ad7e13fb4908cc3c56e714abcd42 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Tue, 30 Apr 2024 22:32:04 +0200 Subject: [PATCH 1/7] feat: Support custom service tags and service selection for adapters (#2691) * feat: Support custom service tags and service selection for adapters * Remove empty scss file * Support service selection in runtime resolvable static properties * Fix checkstyle * add short comment to new ENV variable * feat: Add unit tests for CustomServiceTagResolver * add comment for clarification * add comment for clarification * Rename DeploymentConfiguration --------- Co-authored-by: bossenti Co-authored-by: Philipp Zehnder --- .../streampipes/commons/constants/Envs.java | 5 +- .../environment/DefaultEnvironment.java | 5 + .../commons/environment/Environment.java | 2 + .../management/AdapterMasterManagement.java | 21 ++- .../management/GuessManagement.java | 19 ++- .../management/WorkerRestClient.java | 8 +- .../management/WorkerUrlProvider.java | 42 ------ .../connect/management/util/WorkerPaths.java | 23 ---- .../connect/adapter/AdapterDescription.java | 15 +++ .../ExtensionDeploymentConfiguration.java | 50 +++++++ .../extensions/svcdiscovery/SpServiceTag.java | 22 ++++ .../svcdiscovery/SpServiceTagPrefix.java | 1 + .../model/runtime/RuntimeOptionsRequest.java | 11 ++ .../IExtensionsServiceEndpointGenerator.java | 45 +++++++ .../manager/assets/AssetFetcher.java | 2 +- .../ExtensionsServiceEndpointGenerator.java | 76 ++++++----- .../ExtensionsServiceEndpointProvider.java | 32 ----- .../execution/task/DiscoverEndpointsTask.java | 9 +- .../manager/health/PipelineHealthCheck.java | 2 +- .../CustomTransformOutputSchemaGenerator.java | 6 +- .../manager/preview/PipelinePreview.java | 9 +- .../ContainerProvidedOptionsHandler.java | 2 +- .../rest/impl/ServiceTagResource.java | 43 ++++++ .../impl/connect/DescriptionResource.java | 18 ++- .../connect/RuntimeResolvableResource.java | 18 ++- .../svcdiscovery/api/ISpServiceDiscovery.java | 9 +- .../svcdiscovery/SpServiceDiscoveryCore.java | 27 ++-- streampipes-service-extensions/pom.xml | 12 ++ .../extensions/CustomServiceTagResolver.java | 49 +++++++ .../StreamPipesExtensionsServiceBase.java | 2 + .../CustomServiceTagResolverTest.java | 68 ++++++++++ .../src/lib/apis/service-tags.service.ts | 46 +++++++ .../src/lib/model/gen/streampipes-model.ts | 33 ++++- .../platform-services/src/public-api.ts | 1 + ...adapter-deployment-settings.component.html | 78 +++++++++++ .../adapter-deployment-settings.component.ts | 122 ++++++++++++++++++ .../adapter-settings.component.html | 14 ++ .../configuration-group.component.html | 1 + .../configuration-group.component.ts | 7 +- ui/src/app/connect/connect.module.ts | 2 + .../static-alternatives.component.html | 1 + .../static-alternatives.component.ts | 6 +- .../static-collection.component.html | 1 + .../static-collection.component.ts | 6 +- .../static-group/static-group.component.html | 2 + .../static-group/static-group.component.ts | 10 +- .../static-property.component.html | 7 + .../static-property.component.ts | 4 + .../base-runtime-resolvable-input.ts | 6 + 49 files changed, 804 insertions(+), 196 deletions(-) delete mode 100644 streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java create mode 100644 streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java create mode 100644 streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java create mode 100644 streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java create mode 100644 ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts create mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html create mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 4c58485e27..5f8de15ef6 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -97,7 +97,10 @@ public enum Envs { SP_NATS_HOST("SP_NATS_HOST", "nats"), SP_NATS_PORT("SP_NATS_PORT", "4222"), - SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"); + SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"), + + // expects a comma separated string of service names + SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""); private final String envVariableName; private String defaultValue; diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index 5e613e0909..3fef8c55f7 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -274,4 +274,9 @@ public StringEnvironmentVariable getPulsarUrl() { return new StringEnvironmentVariable(Envs.SP_PULSAR_URL); } + @Override + public StringEnvironmentVariable getCustomServiceTags() { + return new StringEnvironmentVariable(Envs.SP_SERVICE_TAGS); + } + } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index 092e6992d6..ceb2d2d1f2 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -135,4 +135,6 @@ public interface Environment { StringEnvironmentVariable getPulsarUrl(); + StringEnvironmentVariable getCustomServiceTags(); + } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index ec87909021..56220fde37 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -23,7 +23,7 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics; import org.apache.streampipes.connect.management.util.GroundingUtils; -import org.apache.streampipes.connect.management.util.WorkerPaths; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsLogProvider; import org.apache.streampipes.manager.verification.DataStreamVerifier; import org.apache.streampipes.model.SpDataStream; @@ -32,12 +32,11 @@ import org.apache.streampipes.resource.management.AdapterResourceManager; import org.apache.streampipes.resource.management.DataStreamResourceManager; import org.apache.streampipes.storage.api.IAdapterStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URISyntaxException; import java.util.List; import java.util.NoSuchElementException; @@ -54,12 +53,12 @@ public class AdapterMasterManagement { private final DataStreamResourceManager dataStreamResourceManager; - public AdapterMasterManagement(IAdapterStorage adapterStorage, + public AdapterMasterManagement(IAdapterStorage adapterInstanceStorage, AdapterResourceManager adapterResourceManager, DataStreamResourceManager dataStreamResourceManager, AdapterMetrics adapterMetrics ) { - this.adapterInstanceStorage = adapterStorage; + this.adapterInstanceStorage = adapterInstanceStorage; this.adapterMetrics = adapterMetrics; this.adapterResourceManager = adapterResourceManager; this.dataStreamResourceManager = dataStreamResourceManager; @@ -164,7 +163,11 @@ public void startStreamAdapter(String elementId) throws AdapterException { try { // Find endpoint to start adapter on - var baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId()); + var baseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl( + ad.getAppId(), + SpServiceUrlProvider.ADAPTER, + ad.getDeploymentConfiguration().getDesiredServiceTags() + ); // Update selected endpoint URL of adapter ad.setSelectedEndpointUrl(baseUrl); @@ -177,7 +180,7 @@ public void startStreamAdapter(String elementId) throws AdapterException { adapterMetrics.register(ad.getElementId(), ad.getName()); LOG.info("Started adapter " + elementId + " on: " + baseUrl); - } catch (NoServiceEndpointsAvailableException | URISyntaxException e) { + } catch (NoServiceEndpointsAvailableException e) { throw new AdapterException("Could not start adapter due to unavailable service endpoint", e); } } @@ -192,8 +195,4 @@ private void installDataSource(SpDataStream stream, throw new AdapterException(); } } - - private IAdapterStorage getAdapterInstanceStorage() { - return StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(); - } } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java index 104bae82e0..472cb7eca8 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java @@ -24,11 +24,15 @@ import org.apache.streampipes.connect.management.AdapterEventPreviewPipeline; import org.apache.streampipes.connect.management.util.WorkerPaths; import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.AdapterEventPreview; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.serializers.json.JacksonSerializer; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -39,21 +43,25 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Set; public class GuessManagement { private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class); - private final WorkerUrlProvider workerUrlProvider; + private final IExtensionsServiceEndpointGenerator endpointGenerator; private final ObjectMapper objectMapper; public GuessManagement() { - this.workerUrlProvider = new WorkerUrlProvider(); + this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); this.objectMapper = JacksonSerializer.getObjectMapper(); } public GuessSchema guessSchema(AdapterDescription adapterDescription) throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException { - var workerUrl = getWorkerUrl(adapterDescription.getAppId()); + var workerUrl = getWorkerUrl( + adapterDescription.getAppId(), + adapterDescription.getDeploymentConfiguration().getDesiredServiceTags() + ); var description = objectMapper.writeValueAsString(adapterDescription); LOG.info("Guess schema at: " + workerUrl); @@ -72,8 +80,9 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription) } } - private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException { - var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId); + private String getWorkerUrl(String appId, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + var baseUrl = endpointGenerator.getEndpointBaseUrl(appId, SpServiceUrlProvider.ADAPTER, customServiceTags); return baseUrl + WorkerPaths.getGuessSchemaPath(); } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index c715459070..91558045e2 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -52,10 +52,10 @@ public class WorkerRestClient { private static final Logger LOG = LoggerFactory.getLogger(WorkerRestClient.class); - public static void invokeStreamAdapter(String endpointUrl, + public static void invokeStreamAdapter(String baseUrl, String elementId) throws AdapterException { var adapterStreamDescription = getAndDecryptAdapter(elementId); - var url = endpointUrl + WorkerPaths.getStreamInvokePath(); + var url = baseUrl + WorkerPaths.getStreamInvokePath(); startAdapter(url, adapterStreamDescription); updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true); @@ -127,11 +127,11 @@ private static HttpResponse triggerPost(String url, return request.execute().returnResponse(); } - public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, + public static RuntimeOptionsResponse getConfiguration(String baseUrl, String appId, RuntimeOptionsRequest runtimeOptionsRequest) throws AdapterException, SpConfigurationException { - String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId); + String url = baseUrl + WorkerPaths.getRuntimeResolvablePath(appId); try { String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest); diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java deleted file mode 100644 index b9fc970ab4..0000000000 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.connect.management.management; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -public class WorkerUrlProvider { - - - public WorkerUrlProvider() { - } - - public String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException { - return getEndpointGenerator(appId).getEndpointResourceUrl(); - } - - public String getWorkerBaseUrl(String appId) throws NoServiceEndpointsAvailableException { - return getEndpointGenerator(appId).getEndpointBaseUrl(); - } - - private ExtensionsServiceEndpointGenerator getEndpointGenerator(String appId) { - return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.ADAPTER); - } - -} diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java index 29d89ebd1b..7bd2b0c1ed 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java @@ -17,13 +17,6 @@ */ package org.apache.streampipes.connect.management.util; -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import java.net.URI; -import java.net.URISyntaxException; - public class WorkerPaths { private static final String WorkerMainPath = "/api/v1/worker"; @@ -36,14 +29,6 @@ public static String getStreamStopPath() { return WorkerMainPath + "/stream/stop"; } - public static String getSetInvokePath() { - return WorkerMainPath + "/set/invoke"; - } - - public static String getSetStopPath() { - return WorkerMainPath + "/set/stop"; - } - public static String getRunningAdaptersPath() { return WorkerMainPath + "/running"; } @@ -56,12 +41,4 @@ public static String getGuessSchemaPath() { return WorkerMainPath + "/guess/schema"; } - public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException { - SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER; - String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl(); - URI uri = new URI(endpointUrl); - return uri.getScheme() + "://" + uri.getAuthority(); - } - - } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java index 5fd6b21b3f..3d96406618 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java @@ -25,6 +25,7 @@ import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription; import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription; import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription; +import org.apache.streampipes.model.deployment.ExtensionDeploymentConfiguration; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.shared.annotation.TsModel; @@ -55,6 +56,8 @@ public class AdapterDescription extends VersionedNamedStreamPipesEntity { // Is used to store where the adapter is running to stop it private String selectedEndpointUrl; + private ExtensionDeploymentConfiguration deploymentConfiguration; + /** * This is used to identify all the service within the service group the adapter can be invoked in */ @@ -72,6 +75,7 @@ public AdapterDescription() { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); } public AdapterDescription(int version) { @@ -81,6 +85,7 @@ public AdapterDescription(int version) { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); this.setVersion(version); } @@ -89,6 +94,7 @@ public AdapterDescription(String elementId, String name, String description) { this.rules = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); } @@ -109,6 +115,7 @@ public AdapterDescription(AdapterDescription other) { this.dataStream = new SpDataStream(other.getDataStream()); } this.running = other.isRunning(); + this.deploymentConfiguration = other.getDeploymentConfiguration(); } public String getRev() { @@ -259,4 +266,12 @@ public boolean isRunning() { public void setRunning(boolean running) { this.running = running; } + + public ExtensionDeploymentConfiguration getDeploymentConfiguration() { + return deploymentConfiguration; + } + + public void setDeploymentConfiguration(ExtensionDeploymentConfiguration deploymentConfiguration) { + this.deploymentConfiguration = deploymentConfiguration; + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java new file mode 100644 index 0000000000..16b7c0753e --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.deployment; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; + +import java.util.HashSet; +import java.util.Set; + +public class ExtensionDeploymentConfiguration { + + private Set desiredServiceTags; + private String selectedEndpointUrl; + + public ExtensionDeploymentConfiguration() { + this.desiredServiceTags = new HashSet<>(); + } + + public Set getDesiredServiceTags() { + return desiredServiceTags; + } + + public void setDesiredServiceTags(Set desiredServiceTags) { + this.desiredServiceTags = desiredServiceTags; + } + + public String getSelectedEndpointUrl() { + return selectedEndpointUrl; + } + + public void setSelectedEndpointUrl(String selectedEndpointUrl) { + this.selectedEndpointUrl = selectedEndpointUrl; + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java index 71b7fe428e..572fc4a7f6 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java @@ -17,6 +17,11 @@ */ package org.apache.streampipes.model.extensions.svcdiscovery; +import org.apache.streampipes.model.shared.annotation.TsModel; + +import java.util.Objects; + +@TsModel public class SpServiceTag { private static final String COLON = ":"; @@ -56,4 +61,21 @@ public String getValue() { public void setValue(String value) { this.value = value; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SpServiceTag that = (SpServiceTag) o; + return prefix == that.prefix && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(prefix, value); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java index f412bdbe6f..cea9c39da1 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java @@ -19,6 +19,7 @@ public enum SpServiceTagPrefix { SP_GROUP, + CUSTOM, // Is used for user-defined service tags provided via Envs.SP_SERVICE_TAGS ADAPTER, DATA_STREAM, DATA_PROCESSOR, diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java index 0f5efe2411..3e0417c8bb 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java @@ -18,6 +18,7 @@ package org.apache.streampipes.model.runtime; import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.deployment.ExtensionDeploymentConfiguration; import org.apache.streampipes.model.shared.annotation.TsModel; import org.apache.streampipes.model.staticproperty.StaticProperty; @@ -37,6 +38,8 @@ public class RuntimeOptionsRequest { protected List inputStreams; + protected ExtensionDeploymentConfiguration deploymentConfiguration; + private String belongsTo; public RuntimeOptionsRequest() { @@ -95,4 +98,12 @@ public String getBelongsTo() { public void setBelongsTo(String belongsTo) { this.belongsTo = belongsTo; } + + public ExtensionDeploymentConfiguration getDeploymentConfiguration() { + return deploymentConfiguration; + } + + public void setDeploymentConfiguration(ExtensionDeploymentConfiguration deploymentConfiguration) { + this.deploymentConfiguration = deploymentConfiguration; + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java new file mode 100644 index 0000000000..7414c7bd26 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.api.extensions; + +import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + +import java.util.Collections; +import java.util.Set; + +public interface IExtensionsServiceEndpointGenerator { + + String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException; + + default String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider) + throws NoServiceEndpointsAvailableException { + return getEndpointResourceUrl(appId, spServiceUrlProvider, Collections.emptySet()); + } + + String getEndpointBaseUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException; +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java index 553762018e..25c3c9accf 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java @@ -40,7 +40,7 @@ public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider, } public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException { - String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, spServiceUrlProvider).getEndpointResourceUrl(); + String endpointUrl = new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, spServiceUrlProvider); return Request .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX) .execute() diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java index cb2d28408a..ae1b8227c7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java @@ -18,7 +18,8 @@ package org.apache.streampipes.manager.execution.endpoint; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; @@ -26,59 +27,66 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.stream.Stream; -public class ExtensionsServiceEndpointGenerator { +public class ExtensionsServiceEndpointGenerator implements IExtensionsServiceEndpointGenerator { private static final Logger LOG = LoggerFactory.getLogger(ExtensionsServiceEndpointGenerator.class); - private final String appId; - private final SpServiceUrlProvider spServiceUrlProvider; - - public ExtensionsServiceEndpointGenerator(String appId, - SpServiceUrlProvider spServiceUrlProvider) { - this.appId = appId; - this.spServiceUrlProvider = spServiceUrlProvider; + public ExtensionsServiceEndpointGenerator() { } - public ExtensionsServiceEndpointGenerator(NamedStreamPipesEntity entity) { - this.appId = entity.getAppId(); - this.spServiceUrlProvider = ExtensionsServiceEndpointUtils.getPipelineElementType(entity); + public String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException { + return spServiceUrlProvider.getInvocationUrl(selectService(appId, spServiceUrlProvider, customServiceTags), appId); } - public String getEndpointResourceUrl() throws NoServiceEndpointsAvailableException { - return spServiceUrlProvider.getInvocationUrl(selectService(), appId); + public String getEndpointBaseUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + return selectService(appId, spServiceUrlProvider, customServiceTags); } - public String getEndpointBaseUrl() throws NoServiceEndpointsAvailableException { - return selectService(); + private String selectService(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + List serviceEndpoints = getServiceEndpoints(appId, spServiceUrlProvider, customServiceTags); + if (!serviceEndpoints.isEmpty()) { + return serviceEndpoints.get(0); + } else { + LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, + spServiceUrlProvider.getServiceTag(appId).asString()); + throw new NoServiceEndpointsAvailableException( + "Could not find any matching service endpoints - are all software components running?"); + } } - private List getServiceEndpoints() { + private List getServiceEndpoints(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) { return SpServiceDiscovery .getServiceDiscovery() .getServiceEndpoints( DefaultSpServiceTypes.EXT, true, - Collections - .singletonList( - this.spServiceUrlProvider - .getServiceTag(appId) - .asString() - ) + getDesiredServiceTags(appId, spServiceUrlProvider, customServiceTags) ); } - private String selectService() throws NoServiceEndpointsAvailableException { - List serviceEndpoints = getServiceEndpoints(); - if (!serviceEndpoints.isEmpty()) { - return getServiceEndpoints().get(0); - } else { - LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, - this.spServiceUrlProvider.getServiceTag(appId).asString()); - throw new NoServiceEndpointsAvailableException( - "Could not find any matching service endpoints - are all software components running?"); - } + private List getDesiredServiceTags(String appId, + SpServiceUrlProvider serviceUrlProvider, + Set customServiceTags) { + return Stream.concat( + Stream.of( + serviceUrlProvider.getServiceTag(appId) + ), + customServiceTags.stream() + ) + .map(SpServiceTag::asString) + .toList(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java deleted file mode 100644 index 6e76fedfe7..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.execution.endpoint; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; - -public class ExtensionsServiceEndpointProvider { - - public String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java index 896c704f04..40da134aa1 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java @@ -20,7 +20,8 @@ import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.manager.execution.PipelineExecutionInfo; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointProvider; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.Pipeline; @@ -69,7 +70,11 @@ private void applyEndpointAndPipeline(String pipelineId, private String findSelectedEndpoint(InvocableStreamPipesEntity pipelineElement) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointProvider().findSelectedEndpoint(pipelineElement); + return new ExtensionsServiceEndpointGenerator() + .getEndpointResourceUrl( + pipelineElement.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement) + ); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java index 40216cc87b..84ab806c30 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java @@ -139,7 +139,7 @@ public void checkAndRestorePipelineElements() { private String findEndpointUrl(InvocableStreamPipesEntity graph) throws NoServiceEndpointsAvailableException { SpServiceUrlProvider serviceUrlProvider = ExtensionsServiceEndpointUtils.getPipelineElementType(graph); - return new ExtensionsServiceEndpointGenerator(graph.getAppId(), serviceUrlProvider).getEndpointResourceUrl(); + return new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(graph.getAppId(), serviceUrlProvider); } private boolean shouldRetry(String instanceId) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java index 936adabb49..77551acb4a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java @@ -64,8 +64,10 @@ public Tuple2 buildFromTwoStreams(Sp private EventSchema makeRequest() { try { String httpRequestBody = JacksonSerializer.getObjectMapper().writeValueAsString(dataProcessorInvocation); - String endpointUrl = new ExtensionsServiceEndpointGenerator(dataProcessorInvocation.getAppId(), - SpServiceUrlProvider.DATA_PROCESSOR).getEndpointResourceUrl(); + String endpointUrl = new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl( + dataProcessorInvocation.getAppId(), + SpServiceUrlProvider.DATA_PROCESSOR + ); Response httpResp = Request.Post(endpointUrl + "/output").bodyString(httpRequestBody, ContentType .APPLICATION_JSON).execute(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java index 1113c9a4c8..903ecbeb75 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java @@ -80,10 +80,11 @@ public String getPipelineElementPreview(String previewId, } private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); + return new ExtensionsServiceEndpointGenerator() + .getEndpointResourceUrl( + g.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(g) + ); } private void invokeGraphs(List graphs) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java index fd1957f5cc..16acbba678 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java @@ -55,6 +55,6 @@ private RuntimeOptionsResponse handleResponse(Response httpResp) throws JsonSynt private String getEndpointUrl(String appId) throws NoServiceEndpointsAvailableException { SpServiceUrlProvider provider = ExtensionsServiceEndpointUtils.getPipelineElementType(appId); - return new ExtensionsServiceEndpointGenerator(appId, provider).getEndpointResourceUrl() + "/configurations"; + return new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, provider) + "/configurations"; } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java new file mode 100644 index 0000000000..536933f950 --- /dev/null +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.impl; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; +import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; +import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Set; + +@RestController +@RequestMapping("/api/v2/service-tags") +public class ServiceTagResource extends AbstractAuthGuardedRestResource { + + private final ISpServiceDiscovery serviceDiscovery = SpServiceDiscovery.getServiceDiscovery(); + + @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE) + public Set getCustomServiceTags() { + return serviceDiscovery.getCustomServiceTags(true); + } +} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java index dd40dd8541..7d8f562034 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java @@ -22,8 +22,10 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.management.DescriptionManagement; -import org.apache.streampipes.connect.management.management.WorkerUrlProvider; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +45,11 @@ public class DescriptionResource extends AbstractAdapterResource { private static final Logger LOG = LoggerFactory.getLogger(DescriptionResource.class); - private final WorkerUrlProvider workerUrlProvider; + private final IExtensionsServiceEndpointGenerator endpointGenerator; public DescriptionResource() { super(DescriptionManagement::new); - workerUrlProvider = new WorkerUrlProvider(); + endpointGenerator = new ExtensionsServiceEndpointGenerator(); } @GetMapping(path = "/adapters", produces = MediaType.APPLICATION_JSON_VALUE) @@ -65,7 +67,7 @@ public ResponseEntity getAdapterAssets(@PathVariable("id") String id) { Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getAssets(workerUrl); } @@ -93,7 +95,7 @@ public ResponseEntity getAdapterIconAsset(@PathVariable("id") String id) { Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getIconAsset(workerUrl); } @@ -120,7 +122,7 @@ public ResponseEntity getAdapterDocumentationAsset(@PathVariable("id") String Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getDocumentationAsset(workerUrl); } @@ -148,4 +150,8 @@ public ResponseEntity deleteAdapter(@PathVariable("adapterId") String adapter return badRequest(e); } } + + private String getServiceResourceUrl(String appId) throws NoServiceEndpointsAvailableException { + return endpointGenerator.getEndpointResourceUrl(appId, SpServiceUrlProvider.ADAPTER); + } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index 6266439bb8..32cfb73044 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -24,12 +24,14 @@ import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; import org.apache.streampipes.connect.management.management.WorkerRestClient; -import org.apache.streampipes.connect.management.management.WorkerUrlProvider; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,17 +49,17 @@ public class RuntimeResolvableResource extends AbstractAdapterResource new WorkerAdministrationManagement( StorageDispatcher.INSTANCE.getNoSqlStore() - .getAdapterInstanceStorage(), + .getAdapterInstanceStorage(), AdapterMetricsManager.INSTANCE.getAdapterMetrics(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams() )); - this.workerUrlProvider = new WorkerUrlProvider(); + this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); } @PostMapping( @@ -68,8 +70,12 @@ public ResponseEntity fetchConfigurations(@PathVariable("id") String appId, @RequestBody RuntimeOptionsRequest runtimeOptionsRequest) { try { - String workerEndpoint = workerUrlProvider.getWorkerBaseUrl(appId); - RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, appId, runtimeOptionsRequest); + String baseUrl = endpointGenerator.getEndpointBaseUrl( + appId, + SpServiceUrlProvider.ADAPTER, + runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags() + ); + RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest); return ok(result); } catch (AdapterException e) { diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java index 4f4350f7d8..1b648cef8a 100644 --- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java +++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java @@ -17,16 +17,19 @@ */ package org.apache.streampipes.svcdiscovery.api; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; + import java.util.List; +import java.util.Set; public interface ISpServiceDiscovery { /** - * Get active pipeline element service endpoints + * Get custom service tags * - * @return list of pipeline element endpoints + * @return set of service tags */ - List getActivePipelineElementEndpoints(); + Set getCustomServiceTags(boolean restrictToHealthy); /** * Get service endpoints diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java index b887dbd976..19f1479483 100644 --- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java +++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java @@ -21,10 +21,10 @@ import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery; -import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +47,21 @@ public SpServiceDiscoveryCore() { } @Override - public List getActivePipelineElementEndpoints() { - LOG.info("Discovering active pipeline element service endpoints"); - return getServiceEndpoints(DefaultSpServiceTypes.EXT, - true, - List.of()); + public Set getCustomServiceTags(boolean restrictToHealthy) { + var activeServices = findServices(0); + return activeServices + .stream() + .filter(service -> !restrictToHealthy || service.getStatus() != SpServiceStatus.UNHEALTHY) + .flatMap(service -> service.getTags().stream()) + .filter(serviceTag -> serviceTag.getPrefix() == SpServiceTagPrefix.CUSTOM) + .collect(Collectors.toSet()); } @Override public List getServiceEndpoints(String serviceGroup, boolean restrictToHealthy, List filterByTags) { - List activeServices = findService(0); + List activeServices = findServices(0); return activeServices .stream() @@ -73,9 +76,9 @@ private String makeServiceUrl(SpServiceRegistration service) { return service.getServiceUrl(); } -/** -* Checks if all the tags specified in the filter are supported by the service. -*/ + /** + * Checks if all the tags specified in the filter are supported by the service. + */ private boolean allFiltersSupported(SpServiceRegistration service, List filterByTags) { if (filterByTags.isEmpty()) { @@ -88,7 +91,7 @@ private boolean allFiltersSupported(SpServiceRegistration service, return serviceTags.containsAll(filterByTags); } - private List findService(int retryCount) { + private List findServices(int retryCount) { var services = serviceStorage.getAll(); if (services.isEmpty()) { if (retryCount < MAX_RETRIES) { @@ -96,7 +99,7 @@ private List findService(int retryCount) { retryCount++; LOG.info("Could not find any extensions services, retrying ({}/{})", retryCount, MAX_RETRIES); TimeUnit.MILLISECONDS.sleep(1000); - return findService(retryCount); + return findServices(retryCount); } catch (InterruptedException e) { LOG.warn("Could not find a service currently due to exception {}", e.getMessage()); return Collections.emptyList(); diff --git a/streampipes-service-extensions/pom.xml b/streampipes-service-extensions/pom.xml index 8370ad77c0..26dcda6b5b 100644 --- a/streampipes-service-extensions/pom.xml +++ b/streampipes-service-extensions/pom.xml @@ -57,6 +57,18 @@ + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.mockito + mockito-core + test + diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java new file mode 100644 index 0000000000..4033789c8c --- /dev/null +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.extensions; + +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +public class CustomServiceTagResolver { + + private final Environment env; + + public CustomServiceTagResolver(Environment env) { + this.env = env; + } + + public Set getCustomServiceTags() { + if (env.getCustomServiceTags().exists()) { + var serviceTags = env.getCustomServiceTags().getValue(); + return Arrays.stream( + serviceTags.split(",") + ).map(serviceTagString -> SpServiceTag.create(SpServiceTagPrefix.CUSTOM, serviceTagString)) + .collect(Collectors.toSet()); + } else { + return Collections.emptySet(); + } + } +} diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java index 81775f2271..0bdb9c63ed 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java @@ -19,6 +19,7 @@ package org.apache.streampipes.service.extensions; import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; @@ -130,6 +131,7 @@ protected Set getServiceTags(Set extensi DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup())); } tags.addAll(getExtensionsServiceTags(extensions)); + tags.addAll(new CustomServiceTagResolver(Environments.getEnvironment()).getCustomServiceTags()); return tags; } diff --git a/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java b/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java new file mode 100644 index 0000000000..72e95db8b8 --- /dev/null +++ b/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.extensions; + + +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CustomServiceTagResolverTest { + + private StringEnvironmentVariable customServiceTags; + private CustomServiceTagResolver resolver; + + @BeforeEach + void setUp() { + var env = mock(Environment.class); + customServiceTags = mock(StringEnvironmentVariable.class); + when(env.getCustomServiceTags()).thenReturn(customServiceTags); + resolver = new CustomServiceTagResolver(env); + } + + @Test + void returnsCustomServiceTagsWhenTheyExist() { + when(customServiceTags.exists()).thenReturn(true); + when(customServiceTags.getValue()).thenReturn("tag1,tag2"); + + var result = resolver.getCustomServiceTags(); + + assertEquals(2, result.size()); + assertTrue(result.contains(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, "tag1"))); + assertTrue(result.contains(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, "tag2"))); + } + + @Test + void returnsEmptySetWhenNoCustomServiceTagsExist() { + when(customServiceTags.exists()).thenReturn(false); + + var result = resolver.getCustomServiceTags(); + + assertTrue(result.isEmpty()); + } +} \ No newline at end of file diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts new file mode 100644 index 0000000000..2dd4755377 --- /dev/null +++ b/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Injectable } from '@angular/core'; +import { HttpClient } from '@angular/common/http'; +import { PlatformServicesCommons } from './commons.service'; +import { Observable } from 'rxjs'; +import { SpServiceTag } from '../model/gen/streampipes-model'; +import { map } from 'rxjs/operators'; + +@Injectable({ + providedIn: 'root', +}) +export class ServiceTagService { + constructor( + private http: HttpClient, + private platformServicesCommons: PlatformServicesCommons, + ) {} + + getCustomServiceTags(): Observable { + return this.http + .get(`${this.platformServicesCommons.apiBasePath}/service-tags`) + .pipe( + map(response => { + return (response as any[]).map(p => + SpServiceTag.fromData(p), + ); + }), + ); + } +} diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index a8c7743976..b0de88590b 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-04-22 14:32:58. +// Generated using typescript-generator version 3.2.1263 on 2024-04-30 20:29:03. export class NamedStreamPipesEntity { '@class': @@ -112,6 +112,7 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { 'correspondingServiceGroup': string; 'createdAt': number; 'dataStream': SpDataStream; + 'deploymentConfiguration': ExtensionDeploymentConfiguration; 'eventGrounding': EventGrounding; 'eventSchema': EventSchema; 'icon': string; @@ -142,6 +143,10 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { instance.correspondingServiceGroup = data.correspondingServiceGroup; instance.createdAt = data.createdAt; instance.dataStream = SpDataStream.fromData(data.dataStream); + instance.deploymentConfiguration = + ExtensionDeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.eventGrounding = EventGrounding.fromData(data.eventGrounding); instance.eventSchema = EventSchema.fromData(data.eventSchema); instance.icon = data.icon; @@ -1794,6 +1799,26 @@ export class ExportItem { } } +export class ExtensionDeploymentConfiguration { + desiredServiceTags: SpServiceTag[]; + selectedEndpointUrl: string; + + static fromData( + data: ExtensionDeploymentConfiguration, + target?: ExtensionDeploymentConfiguration, + ): ExtensionDeploymentConfiguration { + if (!data) { + return data; + } + const instance = target || new ExtensionDeploymentConfiguration(); + instance.desiredServiceTags = __getCopyArrayFn(SpServiceTag.fromData)( + data.desiredServiceTags, + ); + instance.selectedEndpointUrl = data.selectedEndpointUrl; + return instance; + } +} + export class ExtensionItemDescription { appId: string; available: boolean; @@ -3245,6 +3270,7 @@ export class RuntimeOptionsRequest { | 'org.apache.streampipes.model.runtime.RuntimeOptionsResponse'; 'appId': string; 'belongsTo': string; + 'deploymentConfiguration': ExtensionDeploymentConfiguration; 'inputStreams': SpDataStream[]; 'requestId': string; 'staticProperties': StaticPropertyUnion[]; @@ -3260,6 +3286,10 @@ export class RuntimeOptionsRequest { instance['@class'] = data['@class']; instance.appId = data.appId; instance.belongsTo = data.belongsTo; + instance.deploymentConfiguration = + ExtensionDeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.inputStreams = __getCopyArrayFn(SpDataStream.fromData)( data.inputStreams, ); @@ -4176,6 +4206,7 @@ export type SpServiceStatus = export type SpServiceTagPrefix = | 'SP_GROUP' + | 'CUSTOM' | 'ADAPTER' | 'DATA_STREAM' | 'DATA_PROCESSOR' diff --git a/ui/projects/streampipes/platform-services/src/public-api.ts b/ui/projects/streampipes/platform-services/src/public-api.ts index e8aae7f708..f1ae0c6015 100644 --- a/ui/projects/streampipes/platform-services/src/public-api.ts +++ b/ui/projects/streampipes/platform-services/src/public-api.ts @@ -45,6 +45,7 @@ export * from './lib/apis/pipeline-element-template.service'; export * from './lib/apis/pipeline-monitoring.service'; export * from './lib/apis/pipeline-template.service'; export * from './lib/apis/semantic-types-rest.service'; +export * from './lib/apis/service-tags.service'; export * from './lib/apis/user.service'; export * from './lib/apis/user-admin.service'; export * from './lib/apis/user-group.service'; diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html new file mode 100644 index 0000000000..64d30f5980 --- /dev/null +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html @@ -0,0 +1,78 @@ + + +
+ + Any available service + + Restrict to service tags + + + +
+ + Service Tags + + + {{ serviceTag.value }} + + + + + + + {{ serviceTag }} + + + +
+
diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts new file mode 100644 index 0000000000..c019e9c369 --- /dev/null +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, ElementRef, Input, OnInit, ViewChild } from '@angular/core'; +import { + ExtensionDeploymentConfiguration, + ServiceTagService, + SpServiceTag, +} from '@streampipes/platform-services'; +import { Observable } from 'rxjs'; +import { FormControl } from '@angular/forms'; +import { COMMA, ENTER } from '@angular/cdk/keycodes'; +import { MatChipInputEvent } from '@angular/material/chips'; +import { map, startWith } from 'rxjs/operators'; +import { MatAutocompleteSelectedEvent } from '@angular/material/autocomplete'; +import { MatRadioChange } from '@angular/material/radio'; + +@Component({ + selector: 'sp-adapter-deployment-settings', + templateUrl: './adapter-deployment-settings.component.html', +}) +export class SpAdapterDeploymentSettingsComponent implements OnInit { + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + + availableServiceTags: SpServiceTag[] = []; + availableServiceTagValues: string[] = []; + + deploymentMode = 'all'; + + separatorKeysCodes: number[] = [ENTER, COMMA]; + serviceTagCtrl = new FormControl(''); + filteredServiceTags: Observable; + + @ViewChild('serviceTagInput') serviceTagInput: ElementRef; + + constructor(private serviceTagService: ServiceTagService) { + this.filteredServiceTags = this.serviceTagCtrl.valueChanges.pipe( + startWith(null), + map((serviceTagValue: string | null) => { + return serviceTagValue + ? this._filter(serviceTagValue) + : this.availableServiceTagValues.slice(); + }), + ); + } + + ngOnInit(): void { + if (this.deploymentConfiguration.desiredServiceTags.length > 0) { + this.deploymentMode = 'filter'; + } + this.serviceTagService.getCustomServiceTags().subscribe(res => { + this.availableServiceTags = res; + this.availableServiceTagValues = res.map(st => st.value); + }); + } + + add(event: MatChipInputEvent): void { + const value = (event.value || '').trim(); + + if (value) { + this.deploymentConfiguration.desiredServiceTags.push( + this.findTag(value), + ); + } + + event.chipInput!.clear(); + + this.serviceTagCtrl.setValue(null); + } + + findTag(value: string): SpServiceTag { + return this.availableServiceTags.find(st => st.value === value); + } + + remove(serviceTag: string): void { + const index = this.deploymentConfiguration.desiredServiceTags.findIndex( + st => st.value === serviceTag, + ); + + if (index >= 0) { + this.deploymentConfiguration.desiredServiceTags.splice(index, 1); + } + } + + selected(event: MatAutocompleteSelectedEvent): void { + this.deploymentConfiguration.desiredServiceTags.push( + this.findTag(event.option.viewValue), + ); + this.serviceTagInput.nativeElement.value = ''; + this.serviceTagCtrl.setValue(null); + } + + private _filter(value: string): string[] { + const filterValue = value.toLowerCase(); + + return this.availableServiceTagValues.filter(st => + st.toLowerCase().includes(filterValue), + ); + } + + handleSelectionChange(event: MatRadioChange): void { + if (event.value === 'all') { + this.deploymentConfiguration.desiredServiceTags = []; + } + } +} diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html index 4b3be6f469..4e5c556cb2 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html @@ -18,6 +18,17 @@
+ + + + diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.html b/ui/src/app/connect/components/configuration-group/configuration-group.component.html index a09b11b734..68569ade6d 100644 --- a/ui/src/app/connect/components/configuration-group/configuration-group.component.html +++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.html @@ -25,6 +25,7 @@
implements OnInit { + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + @Output() inputEmitter: EventEmitter = new EventEmitter(); completedStaticProperty: ConfigurationInfo; diff --git a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html index 29b084725b..c2db12bedb 100644 --- a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html +++ b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html @@ -24,6 +24,7 @@ >
{ + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + constructor() { super(); } diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html index bd8d67bde3..fb77d388b8 100644 --- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html +++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html @@ -20,6 +20,7 @@ implements OnInit { + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + @Output() inputEmitter: EventEmitter = new EventEmitter(); dependentStaticProperties: Map = new Map< diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html index 34aaad2028..4ecbd7c49b 100644 --- a/ui/src/app/core-ui/static-properties/static-property.component.html +++ b/ui/src/app/core-ui/static-properties/static-property.component.html @@ -86,6 +86,7 @@ Date: Wed, 1 May 2024 07:04:53 +0200 Subject: [PATCH 2/7] monthly update of vulnerability report (#2804) Signed-off-by: bossenti Co-authored-by: bossenti --- VULNERABILITY.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/VULNERABILITY.md b/VULNERABILITY.md index 253128c4a0..60e925573f 100644 --- a/VULNERABILITY.md +++ b/VULNERABILITY.md @@ -4,5 +4,11 @@ | https://osv.dev/GHSA-6mjq-h674-j845 | 6.5 | Maven | io.netty:netty-handler | 4.1.72.Final | pom.xml | | https://osv.dev/GHSA-w596-4wvx-j9j6
https://osv.dev/PYSEC-2022-42969 | 7.5 | PyPI | py | 1.11.0 | streampipes-client-python/poetry.lock | | https://osv.dev/GHSA-269g-pwp5-87pp | 4.4 | Maven | junit:junit (dev) | 4.8.2 | streampipes-maven-plugin/pom.xml | +| https://osv.dev/GHSA-wf5p-g6vw-rhxx | 6.5 | npm | axios | 0.21.4 | ui/package-lock.json | +| https://osv.dev/GHSA-rv95-896h-c2vc | 6.1 | npm | express (dev) | 4.18.3 | ui/package-lock.json | +| https://osv.dev/GHSA-cxjh-pqwp-8mfp | 6.5 | npm | follow-redirects | 1.15.5 | ui/package-lock.json | | https://osv.dev/GHSA-4943-9vgg-gr5r | 6.1 | npm | quill | 1.3.7 | ui/package-lock.json | -| https://osv.dev/GHSA-wr3j-pwj9-hqq6 | 7.4 | npm | webpack-dev-middleware (dev) | 6.1.1 | ui/package-lock.json | +| https://osv.dev/GHSA-f5x3-32g6-xq36 | 6.5 | npm | tar (dev) | 6.2.0 | ui/package-lock.json | +| https://osv.dev/GHSA-9qxr-qj54-h672 | 2.6 | npm | undici (dev) | 6.7.1 | ui/package-lock.json | +| https://osv.dev/GHSA-m4v8-wqvr-p9f7 | 3.9 | npm | undici (dev) | 6.7.1 | ui/package-lock.json | +| https://osv.dev/GHSA-8jhw-289h-jh2g | 5.9 | npm | vite (dev) | 5.1.5 | ui/package-lock.json | From c1dd08ab2754963ed2ab658e7cdb0d2c6708aaf4 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 1 May 2024 09:13:40 +0200 Subject: [PATCH 3/7] chore(deps): bump org.simplejavamail:simple-java-mail (#2805) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d8b09d4ba6..d14528af8f 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,7 @@ 1.9.0 5.0.2 5.1.27 - 8.9.0 + 8.10.0 1.4.0 2.0.6 2.2 From 28266e703117fe3d9c49d844206d06bc3aa243ae Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 2 May 2024 07:39:47 +0200 Subject: [PATCH 4/7] chore(deps): bump org.checkerframework:checker-qual (#2807) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d14528af8f..4077a186ff 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ 1.2 1.1.0 3.9 - 3.42.0 + 3.43.0 1.17.0 3.2.2 1.26.0 From ea776b5096c8ed914722a572ac1164c7ea20581d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 2 May 2024 07:39:58 +0200 Subject: [PATCH 5/7] chore(deps): bump org.apache.inlong:tubemq-client from 1.11.0 to 1.12.0 (#2806) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4077a186ff..f1ceafa3e6 100644 --- a/pom.xml +++ b/pom.xml @@ -67,7 +67,7 @@ 4.5.13 4.4.9 2.24 - 1.11.0 + 1.12.0 2.17.0 2.17.0 3.0.0 From 1d37ce4ed7d3f031989654081d3db8fdf49b3418 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 2 May 2024 08:25:12 +0200 Subject: [PATCH 6/7] chore(deps-dev): bump ejs from 3.1.9 to 3.1.10 in /ui (#2808) Bumps [ejs](https://github.com/mde/ejs) from 3.1.9 to 3.1.10. - [Release notes](https://github.com/mde/ejs/releases) - [Commits](https://github.com/mde/ejs/compare/v3.1.9...v3.1.10) --- updated-dependencies: - dependency-name: ejs dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- ui/package-lock.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ui/package-lock.json b/ui/package-lock.json index 404082b4f5..208887d8ff 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -10112,9 +10112,9 @@ "integrity": "sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==" }, "node_modules/ejs": { - "version": "3.1.9", - "resolved": "https://registry.npmjs.org/ejs/-/ejs-3.1.9.tgz", - "integrity": "sha512-rC+QVNMJWv+MtPgkt0y+0rVEIdbtxVADApW9JXrUVlzHetgcyczP/E7DJmWJ4fJCZF2cPcBk0laWO9ZHMG3DmQ==", + "version": "3.1.10", + "resolved": "https://registry.npmjs.org/ejs/-/ejs-3.1.10.tgz", + "integrity": "sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA==", "dev": true, "dependencies": { "jake": "^10.8.5" From d69504c91540b4f93be267910d41d1dae44abfd1 Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Thu, 2 May 2024 08:38:14 +0200 Subject: [PATCH 7/7] feat(#2374): Add suggestion to test processing elements (#2375) * feat(#2374): Add suggestion to test processing elements * #2374 refactored unit tests to use ProcessingElementTestExecutor * #2374 cleanup * #2374 cleanup * #2374 updated test executor to non-static methods * #2374 updated test executor to non-static methods * test(#2374): provide a framework to easily unit test processing elements (#2730) * #2374 refactored unit tests to use ProcessingElementTestExecutor * #2374 cleanup * #2374 cleanup * #2374 updated test executor to non-static methods * #2374 updated test executor to non-static methods * #2374 Added test configuration builder and resolved selector prefixes * #2374 Moved test executor to a new package * feat(#2374): Fix cyclic dependencies and checkstyle * feat(#2374): Fix checkstyle * #2374 extracted TestConfigurationBuilder into a separate file --------- Co-authored-by: Isaak --- pom.xml | 1 + .../pom.xml | 2 +- .../NumericalFilterProcessor.java | 6 +- .../TestBooleanFilterProcessor.java | 118 +++--- .../compose/TestComposeProcessor.java | 208 +++++------ .../merge/TestMergeByTimeProcessor.java | 276 +++++++------- .../NumericalFilterProcessorTest.java | 83 +++++ ...stSwingingDoorTrendingFilterProcessor.java | 339 +++++++++++------- .../textfilter/TestTextFilterProcessor.java | 188 +++++----- .../PipelineElementTemplateVisitor.java | 5 +- streampipes-test-utils-executors/pom.xml | 75 ++++ .../test/executors/PrefixStrategy.java | 23 ++ .../ProcessingElementTestExecutor.java | 190 ++++++++++ .../test/executors/TestConfiguration.java | 47 +++ .../executors/TestConfigurationBuilder.java | 62 ++++ 15 files changed, 1066 insertions(+), 557 deletions(-) create mode 100644 streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java create mode 100644 streampipes-test-utils-executors/pom.xml create mode 100644 streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/PrefixStrategy.java create mode 100644 streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java create mode 100644 streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfiguration.java create mode 100644 streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java diff --git a/pom.xml b/pom.xml index f1ceafa3e6..967ff16e7b 100644 --- a/pom.xml +++ b/pom.xml @@ -868,6 +868,7 @@ streampipes-storage-couchdb streampipes-storage-management streampipes-test-utils + streampipes-test-utils-executors streampipes-user-management streampipes-vocabulary streampipes-wrapper diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml b/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml index ed05e4cbc5..536e4c9929 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml +++ b/streampipes-extensions/streampipes-processors-filters-jvm/pom.xml @@ -62,7 +62,7 @@ org.apache.streampipes - streampipes-test-utils + streampipes-test-utils-executors 0.95.0-SNAPSHOT test diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java index 14062127cb..5f9a030d5a 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java @@ -39,9 +39,9 @@ public class NumericalFilterProcessor implements IStreamPipesDataProcessor { - private static final String NUMBER_MAPPING = "number-mapping"; - private static final String VALUE = "value"; - private static final String OPERATION = "operation"; + protected static final String NUMBER_MAPPING = "number-mapping"; + protected static final String VALUE = "value"; + protected static final String OPERATION = "operation"; private double threshold; private NumericalOperator numericalOperator; diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java index 3f473f6f61..a6d1a73727 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/booleanfilter/TestBooleanFilterProcessor.java @@ -18,106 +18,80 @@ package org.apache.streampipes.processors.filters.jvm.processor.booleanfilter; -import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; -import org.apache.streampipes.model.runtime.Event; -import org.apache.streampipes.model.runtime.EventFactory; -import org.apache.streampipes.model.runtime.SchemaInfo; -import org.apache.streampipes.model.runtime.SourceInfo; -import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; -import org.apache.streampipes.test.extensions.api.StoreEventCollector; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Stream; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class TestBooleanFilterProcessor { - private static final String STREAM_PREFIX = "s0::"; - private static final Logger LOG = LoggerFactory.getLogger(TestBooleanFilterProcessor.class); + BooleanFilterProcessor processor; + private static final String FIELD_NAME = "Test"; - static Stream data() { - return Stream.of( - Arguments.of("True", Arrays.asList(true, true, false, true, false, true, false, true), 5), - Arguments.of("True", Arrays.asList(true, true, true), 3), - Arguments.of("True", Arrays.asList(false, false, false), 0), - Arguments.of("True", Collections.emptyList(), 0), - Arguments.of("False", Arrays.asList(true, false, true, false, true, false, true, false, true), 4), - Arguments.of("False", Arrays.asList(true, true, true), 0), - Arguments.of("False", Arrays.asList(false, false, false), 3), - Arguments.of("False", Collections.emptyList(), 0) - ); + @BeforeEach + public void setup(){ + processor = new BooleanFilterProcessor(); } - @ParameterizedTest @MethodSource("data") - public void testBoolenFilter( + public void test( String boolToKeep, List eventBooleans, - int expectedFilteredBooleanCount + List outputEventBooleans ) { - var fieldName = "Test"; - var processorParams = mock(ProcessorParams.class); - var eventProcessorRuntimeContext = mock(EventProcessorRuntimeContext.class); + TestConfiguration configuration = TestConfiguration.builder() + .configWithDefaultPrefix(BooleanFilterProcessor.BOOLEAN_MAPPING, FIELD_NAME) + .config(BooleanFilterProcessor.VALUE, boolToKeep) + .build(); - var processor = new BooleanFilterProcessor(); - var extractor = mock(ProcessingElementParameterExtractor.class); - when(processorParams.extractor()).thenReturn(extractor); - when(extractor.mappingPropertyValue(BooleanFilterProcessor.BOOLEAN_MAPPING)).thenReturn(STREAM_PREFIX + fieldName); - when(extractor.selectedSingleValue(BooleanFilterProcessor.VALUE, String.class)).thenReturn(boolToKeep); + List> events = new ArrayList<>(); + eventBooleans.forEach(bool->events.add(Map.of(FIELD_NAME, bool))); - var collector = new StoreEventCollector(); - processor.onInvocation(processorParams, collector, eventProcessorRuntimeContext); + List> outputEvents = new ArrayList<>(); + outputEventBooleans.forEach(bool->outputEvents.add(Map.of(FIELD_NAME, bool))); - sendEvents(processor, collector, eventBooleans, fieldName); + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); - assertEquals(expectedFilteredBooleanCount, collector.getEvents().size()); + testExecutor.run(events, outputEvents); } - private void sendEvents( - BooleanFilterProcessor processor, - StoreEventCollector collector, - List eventBooleans, - String fieldName) { - List events = makeEvents(eventBooleans, fieldName); - for (Event event : events) { - LOG.info("Sending event with value " - + event.getFieldBySelector(STREAM_PREFIX + fieldName) - .getAsPrimitive() - .getAsBoolean()); - processor.onEvent(event, collector); - - } - } - - private List makeEvents(List eventBooleans, String fieldName) { - List events = new ArrayList<>(); - for (Boolean eventSetting : eventBooleans) { - events.add(makeEvent(eventSetting, fieldName)); - } - return events; - } - - private Event makeEvent(Boolean value, String fieldName) { - Map map = new HashMap<>(); - map.put(fieldName, value); - return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "s0"), - new SchemaInfo(null, new ArrayList<>()) + static Stream data() { + return Stream.of( + Arguments.of("True", + Arrays.asList(true, true, false, true, false, true, false, true), + Arrays.asList(true, true, true, true, true)), + Arguments.of("True", + Arrays.asList(true, true, true), + Arrays.asList(true, true, true)), + Arguments.of("True", + Arrays.asList(false, false, false), + Collections.emptyList()), + Arguments.of("True", + Collections.emptyList(), + Collections.emptyList()), + Arguments.of("False", + Arrays.asList(true, false, true, false, true, false, true, false, true), + Arrays.asList(false, false, false, false)), + Arguments.of("False", + Arrays.asList(true, true, true), + Collections.emptyList()), + Arguments.of("False", + Arrays.asList(false, false, false), + Arrays.asList(false, false, false)), + Arguments.of("False", + Collections.emptyList(), + Collections.emptyList()) ); } } diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java index fd56bb23ee..05bf18cccd 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/compose/TestComposeProcessor.java @@ -18,124 +18,98 @@ package org.apache.streampipes.processors.filters.jvm.processor.compose; -//@RunWith(Parameterized.class) +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.model.output.OutputStrategy; +import org.apache.streampipes.test.executors.PrefixStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; + public class TestComposeProcessor { -// -// private static final Logger LOG = LoggerFactory.getLogger(TestComposeProcessor.class); -// -// @org.junit.runners.Parameterized.Parameter -// public String testName; -// -// @org.junit.runners.Parameterized.Parameter(1) -// public List> eventMaps; -// -// @org.junit.runners.Parameterized.Parameter(2) -// public List selectorPrefixes; -// -// @org.junit.runners.Parameterized.Parameter(3) -// public int expectedNumOfEvents; -// -// @org.junit.runners.Parameterized.Parameter(4) -// public int expectedEventSize; -// -// private static final String outputKeySelector1 = "key-selector1"; -// private static final String outputKeySelector2 = "key-selector2"; -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// Map mapWithFirstOutputSelector = new HashMap<>(); -// mapWithFirstOutputSelector.put(outputKeySelector1, new Object()); -// -// Map mapWithSecondOutputSelector = new HashMap<>(); -// mapWithSecondOutputSelector.put(outputKeySelector2, new Object()); -// -// Map mapWithInvalidOutputSelector = new HashMap<>(); -// mapWithInvalidOutputSelector.put("invalid-selector", new Object()); -// -// List> singleMap = new ArrayList<>(); -// singleMap.add(mapWithFirstOutputSelector); -// -// List> twoMapsMatching = new ArrayList<>(); -// twoMapsMatching.add(mapWithFirstOutputSelector); -// twoMapsMatching.add(mapWithSecondOutputSelector); -// -// List> twoMapsOneMatching = new ArrayList<>(); -// twoMapsOneMatching.add(mapWithFirstOutputSelector); -// twoMapsOneMatching.add(mapWithInvalidOutputSelector); -// -// List> twoMapsNoneMatching = new ArrayList<>(); -// twoMapsNoneMatching.add(mapWithInvalidOutputSelector); -// twoMapsNoneMatching.add(new HashMap<>(mapWithInvalidOutputSelector)); -// -// return Arrays.asList(new Object[][]{ -// {"testWithOneEvent", singleMap, List.of("s0"), 0, 0}, -// {"testWithTwoEventsSamePrefix", twoMapsMatching, List.of("s0", "s0"), 0, 0}, -// {"testWithTwoEvents", twoMapsMatching, List.of("s0", "s1"), 1, 2}, -// {"testWithTwoEventsAnd1InvalidSelector", twoMapsOneMatching, List.of("s0", "s1"), 1, 1}, -// {"testWithTwoEventsWithInvalidSelectors", twoMapsNoneMatching, List.of("s0", "s1"), 1, 0} -// }); -// } -// -// -// -// @Test -// public void testComposeProcessor() { -// LOG.info("Executing test: {}", testName); -// var processor = new ComposeProcessor(); -// var originalGraph = processor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// var graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// List outputStrategies = new ArrayList<>(); -// outputStrategies.add(new CustomOutputStrategy(List.of("s0::" + outputKeySelector1, "s1::" + outputKeySelector2))); -// graph.setOutputStrategies(outputStrategies); -// var params = new ProcessorParams(graph); -// -// var eventCollector = new StoreEventCollector(); -// processor.onInvocation(params, eventCollector, null); -// -// List collectedEvents = sendEvents(processor, eventCollector); -// -// LOG.info("Expected collected event count is: {}", expectedNumOfEvents); -// LOG.info("Actual collected event count is: {}", collectedEvents.size()); -// assertEquals(expectedNumOfEvents, collectedEvents.size()); -// -// if (!collectedEvents.isEmpty()){ -// int eventSize = collectedEvents.get(0).getFields().size(); -// -// LOG.info("Expected event size is: {}", expectedEventSize); -// LOG.info("Actual event size is: {}", eventSize); -// assertEquals(expectedEventSize, eventSize); -// } -// } -// -// private List sendEvents(ComposeProcessor processor, StoreEventCollector collector) { -// List events = makeEvents(); -// for (Event event : events) { -// LOG.info("Sending event with map: " + event.getFields() -// + ", and prefix selector: " + event.getSourceInfo().getSelectorPrefix()); -// processor.onEvent(event, collector); -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } -// return collector.getEvents(); -// } -// private List makeEvents() { -// List events = new ArrayList<>(); -// for (int i = 0; i < eventMaps.size(); i++) { -// events.add(makeEvent(eventMaps.get(i), selectorPrefixes.get(i))); -// } -// return events; -// } -// -// private Event makeEvent(Map eventMap, String selectorPrefix) { -// return EventFactory.fromMap(eventMap, new SourceInfo("test" + "-topic", selectorPrefix), -// new SchemaInfo(null, new ArrayList<>())); -// } -} + ComposeProcessor processor; + private static final String SELECTOR_1 = "key-selector1"; + private static final String SELECTOR_2 = "key-selector2"; + private static final String INVALID_SELECTOR = "invalid-selector"; + private static final String S0_PREFIX = "s0::"; + private static final String S1_PREFIX = "s1::"; + + @BeforeEach + public void setup(){ + processor = new ComposeProcessor(); + } + + @ParameterizedTest + @MethodSource("data") + public void test(List> events, + List> outputEvents, + PrefixStrategy prefixStrategy) { + Consumer invocationConfig = (invocation->{ + List outputStrategies = new ArrayList<>(); + outputStrategies.add(new CustomOutputStrategy(List.of(S0_PREFIX + SELECTOR_1, S1_PREFIX + SELECTOR_2))); + invocation.setOutputStrategies(outputStrategies); + }); + TestConfiguration configuration = TestConfiguration.builder().prefixStrategy(prefixStrategy).build(); + + ProcessingElementTestExecutor testExecutor = + new ProcessingElementTestExecutor(processor, configuration, invocationConfig); + + testExecutor.run(events, outputEvents); + } + static Stream data() { + + var object1 = new Object(); + var object2 = new Object(); + + return Stream.of( + Arguments.of( + List.of( + Map.of(SELECTOR_1, object1)), + List.of(), + PrefixStrategy.SAME_PREFIX), + Arguments.of( + List.of( + Map.of(SELECTOR_1, object1), + Map.of(SELECTOR_2, object2)), + List.of(), + PrefixStrategy.SAME_PREFIX), + Arguments.of( + List.of( + Map.of(SELECTOR_1, object1), + Map.of(SELECTOR_2, object2)), + List.of( + Map.of(SELECTOR_1, object1, SELECTOR_2, object2) + ), + PrefixStrategy.ALTERNATE), + Arguments.of( + List.of( + Map.of(SELECTOR_1, object1), + Map.of(INVALID_SELECTOR, object2)), + List.of( + Map.of(SELECTOR_1, object1) + ), + PrefixStrategy.ALTERNATE), + Arguments.of( + List.of( + Map.of(INVALID_SELECTOR, object1), + Map.of(INVALID_SELECTOR, object2)), + List.of( + Map.of() + ), + PrefixStrategy.ALTERNATE) + ); + } +} diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java index 2c4334fb00..953080fb68 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/merge/TestMergeByTimeProcessor.java @@ -17,145 +17,141 @@ */ package org.apache.streampipes.processors.filters.jvm.processor.merge; -//@RunWith(Parameterized.class) +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.output.CustomOutputStrategy; +import org.apache.streampipes.test.executors.PrefixStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; +import org.apache.streampipes.test.executors.TestConfigurationBuilder; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; + + public class TestMergeByTimeProcessor { -// -// private static final Logger LOG = LoggerFactory.getLogger(TestMergeByTimeProcessor.class); -// -// private static final Integer timeInterval = 100; -// @org.junit.runners.Parameterized.Parameter -// public String testName; -// @org.junit.runners.Parameterized.Parameter(1) -// public List eventStrings; -// @org.junit.runners.Parameterized.Parameter(2) -// public List expectedValue; -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// return Arrays.asList(new Object[][]{ -// {"testWithInInterval", Arrays.asList("s0:0", "s1:90"), List.of("(90,0)")}, -// {"testNotWithInInterval", Arrays.asList("s0:0", "s1:110"), List.of()}, -// {"testWithInAndNotWithInInterval", Arrays.asList("s0:0", "s1:80", "s0:110", "s1:500"), -// List.of("(80,0)")}, -// {"testFigGvnInDocs", -// Arrays.asList("s1:0", "s0:10", "s0:110", "s1:115", "s0:120", "s1:230", "s0:340", "s0:500", -// "s1:510"), -// Arrays.asList("(0,10)", "(115,110)", "(510,500)")} -// }); -// } -// -// @Test -// public void testMergeByTimeProcessor() { -// MergeByTimeProcessor mergeByTimeProcessor = new MergeByTimeProcessor(); -// DataProcessorDescription originalGraph = mergeByTimeProcessor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// graph.setInputStreams(Arrays.asList( -// EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stram0")), -// EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream1")) -// )); -// -// graph.setOutputStream( -// EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")) -// ); -// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() -// .setActualTopicName("output-topic"); -// -// List outputKeySelectors = graph.getOutputStrategies() -// .stream() -// .filter(CustomOutputStrategy.class::isInstance) -// .map(o -> (CustomOutputStrategy) o) -// .findFirst() -// .map(CustomOutputStrategy::getSelectedPropertyKeys) -// .orElse(new ArrayList<>()); -// outputKeySelectors.add("s0::timestamp_mapping_stream_1"); -// outputKeySelectors.add("s1::timestamp_mapping_stream_2"); -// -// List mappingPropertyUnaries = graph.getStaticProperties() -// .stream() -// .filter(p -> p instanceof MappingPropertyUnary) -// .map((p -> (MappingPropertyUnary) p)) -// .filter(p -> Arrays.asList( -// MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, -// MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY) -// .contains(p.getInternalName())) -// .collect(Collectors.toList()); -// -// assert mappingPropertyUnaries.size() == 2; -// mappingPropertyUnaries.get(0) -// .setSelectedProperty("s0::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); -// mappingPropertyUnaries.get(1) -// .setSelectedProperty("s1::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); -// -// FreeTextStaticProperty fsp = graph.getStaticProperties().stream() -// .filter(p -> p instanceof FreeTextStaticProperty) -// .map((p -> (FreeTextStaticProperty) p)) -// .filter(p -> p.getInternalName().equals(MergeByTimeProcessor.TIME_INTERVAL)) -// .findFirst().orElse(null); -// assert fsp != null; -// fsp.setValue(String.valueOf(timeInterval)); -// -// ProcessorParams params = new ProcessorParams(graph); -// -// StoreEventCollector collector = new StoreEventCollector(); -// -// mergeByTimeProcessor.onInvocation(params, collector, null); -// sendEvents(mergeByTimeProcessor, collector); -// -// List actualCollectedEvents = collector.getEvents().stream() -// .map(e -> formatMergedEvent(e)) -// .collect(Collectors.toList()); -// -// LOG.info("Expected merged event is {}", expectedValue); -// LOG.info("Actual merged event is {}", actualCollectedEvents); -// assertTrue(eventsEquals(expectedValue, actualCollectedEvents)); -// } -// -// private boolean eventsEquals(List expectedValue, List actualCollectedEvents) { -// if (expectedValue.size() != actualCollectedEvents.size()) { -// return false; -// } -// for (int i = 0; i < expectedValue.size(); i++) { -// if (!expectedValue.get(i).equalsIgnoreCase(actualCollectedEvents.get(i))) { -// return false; -// } -// } -// return true; -// } -// -// private String formatMergedEvent(Event mergedEvent) { -// return String.format("(%s)", mergedEvent.getFields().values().stream() -// .map(m -> m.getAsPrimitive().getAsString()).collect(Collectors.joining(","))); -// } -// -// private void sendEvents(MergeByTimeProcessor mergeByTimeProcessor, StoreEventCollector spOut) { -// List events = makeEvents(); -// for (Event event : events) { -// mergeByTimeProcessor.onEvent(event, spOut); -// } -// -// } -// -// private List makeEvents() { -// List events = Lists.newArrayList(); -// for (String eventString : eventStrings) { -// events.add(makeEvent(eventString)); -// } -// return events; -// } -// -// private Event makeEvent(String eventString) { -// Map map = Maps.newHashMap(); -// String streamId = eventString.split(":")[0]; -// String timestamp = eventString.split(":")[1]; -// if (streamId.equals("s0")) { -// map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, timestamp); -// } else { -// map.put(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, timestamp); -// } -// return EventFactory.fromMap(map, -// new SourceInfo("test", streamId), -// new SchemaInfo(null, Lists.newArrayList())); -// } + + private static final String S0_PREFIX = "s0"; + private static final String S1_PREFIX = "s1"; + private static final Integer timeInterval = 100; + + MergeByTimeProcessor processor; + + @BeforeEach + public void setup(){ + processor = new MergeByTimeProcessor(); + } + @ParameterizedTest + @MethodSource("data") + public void test(List> events, + List> outputEvents, + PrefixStrategy prefixStrategy, + List customPrefixes){ + + + TestConfigurationBuilder configurationBuilder = TestConfiguration.builder() + .config(MergeByTimeProcessor.TIME_INTERVAL, timeInterval) + .configWithPrefix(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, S0_PREFIX) + .configWithPrefix(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, S1_PREFIX); + + if (customPrefixes != null){ + configurationBuilder.customPrefixStrategy(customPrefixes); + } else { + configurationBuilder.prefixStrategy(prefixStrategy); + } + + + Consumer invocationConfig = (invocation->{ + List outputKeySelectors = invocation.getOutputStrategies() + .stream() + .filter(CustomOutputStrategy.class::isInstance) + .map(o -> (CustomOutputStrategy) o) + .findFirst() + .map(CustomOutputStrategy::getSelectedPropertyKeys) + .orElse(new ArrayList<>()); + outputKeySelectors.add(S0_PREFIX + "::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY); + outputKeySelectors.add(S1_PREFIX + "::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY); + }); + + ProcessingElementTestExecutor testExecutor = + new ProcessingElementTestExecutor(processor, configurationBuilder.build(), invocationConfig); + + testExecutor.run(events, outputEvents); + } + + static Stream data() { + return Stream.of( + Arguments.of( + List.of( + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "90") + ), + List.of( + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "90", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0" + )), + PrefixStrategy.ALTERNATE, + null), + Arguments.of( + List.of( + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "110") + ), + List.of(), + PrefixStrategy.ALTERNATE, + null), + Arguments.of( + List.of( + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "80"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "500") + ), + List.of( + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "80", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0" + )), + PrefixStrategy.ALTERNATE, + null), + Arguments.of( + List.of( + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "0"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "10"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "115"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "120"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "230"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "340"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "500"), + Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "510") + ), + List.of( + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "0", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "10" + ), + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "115", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110" + ), + Map.of( + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "510", + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "500" + ) + ), + null, + List.of(S1_PREFIX, S0_PREFIX, S0_PREFIX, S1_PREFIX, S0_PREFIX, S1_PREFIX, S0_PREFIX, S0_PREFIX, S1_PREFIX)) + ); + } } \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java new file mode 100644 index 0000000000..ed08eecd9f --- /dev/null +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessorTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.filters.jvm.processor.numericalfilter; + +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +public class NumericalFilterProcessorTest { + + private static final String PROPERTY_NAME = "propertyName"; + private NumericalFilterProcessor processor; + + + @BeforeEach + public void setup() { + processor = new NumericalFilterProcessor(); + } + + @Test + public void testLowerThenOperatorFilterNotApplied() { + + TestConfiguration configuration = TestConfiguration.builder() + .configWithDefaultPrefix(NumericalFilterProcessor.NUMBER_MAPPING, PROPERTY_NAME) + .config(NumericalFilterProcessor.VALUE, 10.0) + .config(NumericalFilterProcessor.OPERATION, "<") + .build(); + + List> inputEvents = List.of( + Map.of(PROPERTY_NAME, 1.0f) + ); + + List> outputEvents = List.of( + Map.of(PROPERTY_NAME, 1.0f) + ); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(inputEvents, outputEvents); + } + + @Test + public void testLowerThenOperatorFilterApplied() { + + TestConfiguration configuration = TestConfiguration.builder() + .configWithDefaultPrefix(NumericalFilterProcessor.NUMBER_MAPPING, PROPERTY_NAME) + .config(NumericalFilterProcessor.VALUE, 10.0) + .config(NumericalFilterProcessor.OPERATION, "<") + .build(); + + List> inputEvents = List.of( + Map.of(PROPERTY_NAME, 11.0f) + ); + + List> outputEvents = List.of(); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(inputEvents, outputEvents); + } + +} \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java index 3a5dbf2288..07f14e2ad5 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/sdt/TestSwingingDoorTrendingFilterProcessor.java @@ -18,133 +18,216 @@ package org.apache.streampipes.processors.filters.jvm.processor.sdt; -//@RunWith(Parameterized.class) +import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + public class TestSwingingDoorTrendingFilterProcessor { -// -// private static final Logger LOG = LoggerFactory.getLogger(TestSwingingDoorTrendingFilterProcessor.class); -// -// @org.junit.runners.Parameterized.Parameter -// public String testName; -// @org.junit.runners.Parameterized.Parameter(1) -// public String sdtCompressionDeviation; -// @org.junit.runners.Parameterized.Parameter(2) -// public String sdtCompressionMinTimeInterval; -// @org.junit.runners.Parameterized.Parameter(3) -// public String sdtCompressionMaxTimeInterval; -// @org.junit.runners.Parameterized.Parameter(4) -// public int expectedFilteredCount; -// @org.junit.runners.Parameterized.Parameter(5) -// public List> eventSettings; -// @org.junit.runners.Parameterized.Parameter(6) -// public boolean expectException; -// @org.junit.runners.Parameterized.Parameter(7) -// public String expectedErrorMessage; -// -// private final String sdtTimestampField = "sdtTimestampField"; -// private final String sdtValueField = "sdtValueField"; -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// -// return Arrays.asList(new Object[][]{ -// {"testWithOneEvent", "10.0", "100", "500", 1, List.of(Pair.of(9, 50.0)), false, ""}, -// {"testFullFilter", "10.0", "100", "500", 4, List.of( -// Pair.of(0, 50.0), //true -// Pair.of(50, 50.0), //false -// Pair.of(200, 100.0), //false -// Pair.of(270, 140.0), //false -// Pair.of(300, 250.0), //true -// Pair.of(900, 500.0), //true -// Pair.of(1100, 800.0), //false -// Pair.of(1250, 1600.0) //true -// ) -// , false, ""}, -// {"testWithNegativeCompressionDeviation", "-10.0", "100", "500", 1, new ArrayList<>(), true -// , "Compression Deviation should be positive!"}, -// {"testWithNegativeMinInterval", "10.0", "-100", "500", 1, new ArrayList<>(), true -// , "Compression Minimum Time Interval should be >= 0!"}, -// {"testWithMinInterval>MaxInterval", "10.0", "1000", "500", 1, new ArrayList<>(), true -// , "Compression Minimum Time Interval should be < Compression Maximum Time Interval!"} -// }); -// } -// -// @Rule -// public ExpectedException exceptionRule = ExpectedException.none(); -// -// @Test -// public void testSdtFilter() { -// LOG.info("Executing test: {}", testName); -// SwingingDoorTrendingFilterProcessor processor = new SwingingDoorTrendingFilterProcessor(); -// DataProcessorDescription originalGraph = processor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = -// InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// -// ProcessorParams params = new ProcessorParams(graph); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY -// , MappingPropertyUnary.class) -// .setSelectedProperty("test::" + sdtTimestampField); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY -// , MappingPropertyUnary.class) -// .setSelectedProperty("test::" + sdtValueField); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY -// , FreeTextStaticProperty.class) -// .setValue(sdtCompressionDeviation); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY -// , FreeTextStaticProperty.class) -// .setValue(sdtCompressionMinTimeInterval); -// params.extractor().getStaticPropertyByName(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY -// , FreeTextStaticProperty.class) -// .setValue(sdtCompressionMaxTimeInterval); -// -// if (expectException){ -// LOG.info("Expecting Error Message: {}", expectedErrorMessage); -// exceptionRule.expect(SpRuntimeException.class); -// exceptionRule.expectMessage(expectedErrorMessage); -// } -// -// StoreEventCollector eventCollector = new StoreEventCollector(); -// processor.onInvocation(params, eventCollector, null); -// -// int result = sendEvents(processor, eventCollector); -// -// LOG.info("Expected SDT filtered count is: {}", expectedFilteredCount); -// LOG.info("Actual SDT filtered count is: {}", result); -// assertEquals(expectedFilteredCount, result); -// } -// -// -// private int sendEvents(SwingingDoorTrendingFilterProcessor processor, StoreEventCollector collector) { -// List events = makeEvents(); -// for (Event event : events) { -// LOG.info("Sending event with timestamp: " -// + event.getFieldBySelector("test::" + sdtTimestampField).getAsPrimitive().getAsLong() -// + ", and value: " -// + event.getFieldBySelector("test::" + sdtValueField).getAsPrimitive().getAsFloat()); -// processor.onEvent(event, collector); -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } -// return collector.getEvents().size(); -// } -// -// private List makeEvents() { -// List events = new ArrayList<>(); -// for (Pair eventSetting: eventSettings) { -// events.add(makeEvent(eventSetting)); -// } -// return events; -// } -// -// private Event makeEvent(Pair eventSetting) { -// Map map = new HashMap<>(); -// map.put(sdtTimestampField, eventSetting.getKey()); -// map.put(sdtValueField, eventSetting.getValue()); -// return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "test"), -// new SchemaInfo(null, new ArrayList<>())); -// } + + private final String sdtTimestampField = "sdtTimestampField"; + private final String sdtValueField = "sdtValueField"; + + private SwingingDoorTrendingFilterProcessor processor; + + @BeforeEach + public void setup() { + processor = new SwingingDoorTrendingFilterProcessor(); + } + + @Test + public void test() { + TestConfiguration configuration = TestConfiguration + .builder() + .configWithDefaultPrefix(SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, sdtTimestampField) + .configWithDefaultPrefix(SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, sdtValueField) + .config(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, "10.0") + .config(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, "100") + .config(SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, "500") + .build(); + + List> inputEvents = List.of( + Map.of(sdtTimestampField, 0, + sdtValueField, 50.0 + ), + Map.of(sdtTimestampField, 50, + sdtValueField, 50.0 + ), + Map.of(sdtTimestampField, 200, + sdtValueField, 100.0 + ), + Map.of(sdtTimestampField, 270, + sdtValueField, 140.0 + ), + Map.of(sdtTimestampField, 300, + sdtValueField, 250.0 + ), + Map.of(sdtTimestampField, 900, + sdtValueField, 500.0 + ), + Map.of(sdtTimestampField, 1100, + sdtValueField, 800.0 + ), + Map.of(sdtTimestampField, 1250, + sdtValueField, 1600.0 + ) + ); + + List> outputEvents = List.of( + Map.of(sdtTimestampField, 0, + sdtValueField, 50.0 + ), + Map.of(sdtTimestampField, 270, + sdtValueField, 140.0 + ), + Map.of(sdtTimestampField, 900, + sdtValueField, 500.0 + ), + Map.of(sdtTimestampField, 1100, + sdtValueField, 800.0 + ) + ); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(inputEvents, outputEvents); + } + + @Test + public void testInvalidDeviationKey() { + TestConfiguration configuration = TestConfiguration + .builder() + .configWithDefaultPrefix( + SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, + sdtTimestampField + ) + .configWithDefaultPrefix( + SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, + sdtValueField + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, + "-10.0" + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, + "100" + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, + "500" + ) + .build(); + + List> inputEvents = new ArrayList<>(); + + List> outputEvents = new ArrayList<>(); + + Exception expectedException = new SpRuntimeException("Compression Deviation should be positive!"); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + Exception exception = assertThrows(expectedException.getClass(), () -> { + testExecutor.run(inputEvents, outputEvents); + }); + String expectedMessage = expectedException.getMessage(); + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + + } + + @Test + public void testNegativeMinimumTime() { + TestConfiguration configuration = TestConfiguration + .builder() + .configWithDefaultPrefix( + SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, + sdtTimestampField + ) + .configWithDefaultPrefix( + SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, + sdtValueField + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, + "10.0" + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, + "-100" + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, + "500" + ) + .build(); + + List> inputEvents = new ArrayList<>(); + + List> outputEvents = new ArrayList<>(); + + Exception expectedException = new SpRuntimeException("Compression Minimum Time Interval should be >= 0!"); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + Exception exception = assertThrows(expectedException.getClass(), () -> { + testExecutor.run(inputEvents, outputEvents); + }); + String expectedMessage = expectedException.getMessage(); + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + } + + @Test + public void testInvalidTimeInterval() { + TestConfiguration configuration = TestConfiguration + .builder() + .configWithDefaultPrefix( + SwingingDoorTrendingFilterProcessor.SDT_TIMESTAMP_FIELD_KEY, + sdtTimestampField + ) + .configWithDefaultPrefix( + SwingingDoorTrendingFilterProcessor.SDT_VALUE_FIELD_KEY, + sdtValueField + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_DEVIATION_KEY, + "10.0" + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MIN_INTERVAL_KEY, + "1000" + ) + .config( + SwingingDoorTrendingFilterProcessor.SDT_COMPRESSION_MAX_INTERVAL_KEY, + "500" + ) + .build(); + + List> inputEvents = new ArrayList<>(); + + List> outputEvents = new ArrayList<>(); + + Exception expectedException = new + SpRuntimeException("Compression Minimum Time Interval should be < Compression Maximum Time Interval!"); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + Exception exception = assertThrows(expectedException.getClass(), () -> { + testExecutor.run(inputEvents, outputEvents); + }); + String expectedMessage = expectedException.getMessage(); + String actualMessage = exception.getMessage(); + assertTrue(actualMessage.contains(expectedMessage)); + } } \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java index 54dc91129a..d283eb04ce 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/textfilter/TestTextFilterProcessor.java @@ -18,101 +18,99 @@ package org.apache.streampipes.processors.filters.jvm.processor.textfilter; -//@RunWith(Parameterized.class) +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + public class TestTextFilterProcessor { -// private static final Logger LOG = LoggerFactory.getLogger(TestTextFilterProcessor.class); -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// return Arrays.asList(new Object[][] { -// {"TestLowerCaseMatch", "keyword", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 1}, -// {"TestUpperCaseMatch", "KEYWORD", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 1}, -// {"TestMixMatch", "KeyWord", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 1}, -// {"TestEmptyMatch", "keYWord", StringOperator.MATCHES, Arrays.asList("keyword", "KeyWord", "KEYWORD"), 0}, -// {"TestMultipleMatch", "KeyWord", StringOperator.MATCHES, -// Arrays.asList("keyword", "KeyWord", "KEYWORD", "KeyWord"), 2}, -// -// {"TestContainsWord", "keyword", StringOperator.CONTAINS, -// Arrays.asList("text contains keyword", "text doesn't have word"), 1}, -// {"TestNotContainsWord", "keyword", StringOperator.CONTAINS, -// Arrays.asList("text is empty", "text doesn't have word"), 0}, -// }); -// } -// -// @org.junit.runners.Parameterized.Parameter -// public String selectedFieldName; -// -// @org.junit.runners.Parameterized.Parameter(1) -// public String keyword; -// -// @org.junit.runners.Parameterized.Parameter(2) -// public StringOperator stringOperator; -// -// @org.junit.runners.Parameterized.Parameter(3) -// public List eventStrings; -// -// @org.junit.runners.Parameterized.Parameter(4) -// public int expectedFilteredTextCount; -// -// @Test -// public void testTextFilter() { -// TextFilterProcessor textFilterProcessor = new TextFilterProcessor(); -// DataProcessorDescription originalGraph = textFilterProcessor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// graph.setInputStreams(Collections -// .singletonList(EventStreamGenerator -// .makeStreamWithProperties(Collections.singletonList(selectedFieldName)))); -// graph.setOutputStream(EventStreamGenerator -// .makeStreamWithProperties(Collections.singletonList(selectedFieldName))); -// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() -// .setActualTopicName("output-topic"); -// -// graph.getStaticProperties().stream() -// .filter(p -> p instanceof MappingPropertyUnary) -// .map(p -> (MappingPropertyUnary) p) -// .filter(p -> p.getInternalName().equals(TextFilterProcessor.MAPPING_PROPERTY_ID)) -// .findFirst().get().setSelectedProperty("s0::" + selectedFieldName); -// ProcessorParams params = new ProcessorParams(graph); -// params.extractor().getStaticPropertyByName(TextFilterProcessor.OPERATION_ID, OneOfStaticProperty.class) -// .getOptions() -// .stream().filter(o -> o.getName().equals(stringOperator.name())).findFirst().get().setSelected(true); -// params.extractor().getStaticPropertyByName(TextFilterProcessor.KEYWORD_ID, FreeTextStaticProperty.class) -// .setValue(keyword); -// StoreEventCollector collector = new StoreEventCollector(); -// -// textFilterProcessor.onInvocation(params, collector, null); -// int result = sendEvents(textFilterProcessor, collector); -// -// LOG.info("Expected filtered text count is {}", expectedFilteredTextCount); -// LOG.info("Actual filtered text count is {}", result); -// assertEquals(expectedFilteredTextCount, result); -// -// } -// -// private int sendEvents(TextFilterProcessor textFilterProcessor, StoreEventCollector collector) { -// List events = makeEvents(); -// for (Event e : events) { -// textFilterProcessor.onEvent(e, collector); -// } -// return collector.getEvents().size(); -// } -// -// private List makeEvents() { -// List events = new ArrayList<>(); -// for (String eventString : eventStrings) { -// events.add(makeEvent(eventString)); -// } -// return events; -// } -// -// private Event makeEvent(String eventString) { -// Map map = Maps.newHashMap(); -// map.put(selectedFieldName, eventString); -// return EventFactory.fromMap(map, -// new SourceInfo("test", "s0"), -// new SchemaInfo(null, Lists.newArrayList())); -// } + TextFilterProcessor processor; + + public static final String FIELD_NAME = "selectedField"; + + @BeforeEach + public void setup(){ + processor = new TextFilterProcessor(); + } + + @ParameterizedTest + @MethodSource("data") + public void test(String keyword, + StringOperator stringOperator, + List eventValues, + List outputEventValues){ + + TestConfiguration configuration = TestConfiguration.builder() + .configWithDefaultPrefix(TextFilterProcessor.MAPPING_PROPERTY_ID, FIELD_NAME) + .config(TextFilterProcessor.OPERATION_ID, stringOperator) + .config(TextFilterProcessor.KEYWORD_ID, keyword) + .build(); + + List> events = new ArrayList<>(); + eventValues.forEach(value->events.add(Map.of(FIELD_NAME, value))); + + List> outputEvents = new ArrayList<>(); + outputEventValues.forEach(value->outputEvents.add(Map.of(FIELD_NAME, value))); + + ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(events, outputEvents); + } + + + static Stream data() { + return Stream.of( + Arguments.of( + "keyword", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of("keyword") + ), + Arguments.of( + "KEYWORD", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of("KEYWORD") + ), + Arguments.of( + "KeyWord", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of("KeyWord") + ), + Arguments.of( + "keYWord", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD"), + List.of() + ), + Arguments.of( + "KeyWord", + StringOperator.MATCHES, + List.of("keyword", "KeyWord", "KEYWORD", "KeyWord"), + List.of("KeyWord", "KeyWord") + ), + Arguments.of( + "keyword", + StringOperator.CONTAINS, + List.of("text contains keyword", "text doesn't have word"), + List.of("text contains keyword") + ), + Arguments.of( + "keyword", + StringOperator.CONTAINS, + List.of("text is empty", "text doesn't have word"), + List.of() + ) + ); + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java index 627eedb250..742417ddd8 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/PipelineElementTemplateVisitor.java @@ -116,7 +116,10 @@ public void visit(MappingPropertyNary mappingPropertyNary) { @Override public void visit(MappingPropertyUnary mappingPropertyUnary) { - // Do nothing, not supported by pipeline element templates + if (hasKey(mappingPropertyUnary)) { + var selectedProperty = getAsString(mappingPropertyUnary); + mappingPropertyUnary.setSelectedProperty(selectedProperty); + } } @Override diff --git a/streampipes-test-utils-executors/pom.xml b/streampipes-test-utils-executors/pom.xml new file mode 100644 index 0000000000..8004169299 --- /dev/null +++ b/streampipes-test-utils-executors/pom.xml @@ -0,0 +1,75 @@ + + + + + + streampipes-parent + org.apache.streampipes + 0.95.0-SNAPSHOT + + 4.0.0 + + streampipes-test-utils-executors + + + + + org.junit.jupiter + junit-jupiter-api + compile + + + org.mockito + mockito-core + compile + + + org.apache.streampipes + streampipes-pipeline-management + 0.95.0-SNAPSHOT + + + org.objenesis + objenesis + + + net.bytebuddy + byte-buddy + + + + + org.apache.streampipes + streampipes-test-utils + 0.95.0-SNAPSHOT + + + org.junit.jupiter + junit-jupiter-params + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + + diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/PrefixStrategy.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/PrefixStrategy.java new file mode 100644 index 0000000000..3cc2a1ee60 --- /dev/null +++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/PrefixStrategy.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.test.executors; + +public enum PrefixStrategy { + SAME_PREFIX, ALTERNATE +} diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java new file mode 100644 index 0000000000..f2d87197ad --- /dev/null +++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.test.executors; + +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; +import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; +import org.apache.streampipes.manager.template.DataProcessorTemplateHandler; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.model.runtime.EventFactory; +import org.apache.streampipes.model.runtime.SchemaInfo; +import org.apache.streampipes.model.runtime.SourceInfo; +import org.apache.streampipes.model.template.PipelineElementTemplate; +import org.apache.streampipes.model.template.PipelineElementTemplateConfig; +import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; +import org.apache.streampipes.test.generator.EventStreamGenerator; + +import org.junit.jupiter.api.Assertions; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.IntStream; + + +public class ProcessingElementTestExecutor { + + private final IStreamPipesDataProcessor processor; + private TestConfiguration testConfiguration; + private Iterator selectorPrefixes; + private Consumer invocationConfig; + + public ProcessingElementTestExecutor(IStreamPipesDataProcessor processor, TestConfiguration testConfiguration, + Consumer invocationConfig) { + this.processor = processor; + this.testConfiguration = testConfiguration; + this.selectorPrefixes = testConfiguration.getPrefixes().iterator(); + this.invocationConfig = invocationConfig; + } + + public ProcessingElementTestExecutor(IStreamPipesDataProcessor processor, TestConfiguration testConfiguration) { + this.processor = processor; + this.testConfiguration = testConfiguration; + this.selectorPrefixes = testConfiguration.getPrefixes().iterator(); + } + + public ProcessingElementTestExecutor(IStreamPipesDataProcessor processor, + Consumer invocationConfig) { + this.processor = processor; + this.invocationConfig = invocationConfig; + this.selectorPrefixes = List.of("").iterator(); + } + + + /** + * This method is used to run a data processor with a given configuration and a list of input events. + * It then verifies the output events against the expected output events. + * + * @param inputEvents The list of input events to be processed. + * @param expectedOutputEvents The list of expected output events. + */ + public void run( + List> inputEvents, + List> expectedOutputEvents + ) { + + + // initialize the extractor with the provided configuration of the user input + var dataProcessorInvocation = getProcessorInvocation(); + if (invocationConfig != null){ + invocationConfig.accept(dataProcessorInvocation); + } + + var e = getProcessingElementParameterExtractor(dataProcessorInvocation); + var mockParams = Mockito.mock(IDataProcessorParameters.class); + + Mockito.when(mockParams.getModel()).thenReturn(dataProcessorInvocation); + Mockito.when(mockParams.extractor()).thenReturn(e); + + // calls the onPipelineStarted method of the processor to initialize it + processor.onPipelineStarted(mockParams, null, null); + + // mock the output collector to capture the output events and validate the results later + var mockCollector = Mockito.mock(SpOutputCollector.class); + var spOutputCollectorCaptor = ArgumentCaptor.forClass(Event.class); + + + // Iterate over all input events and call the onEvent method of the processor + for (Map inputRawEvent : inputEvents) { + processor.onEvent(getEvent(inputRawEvent), mockCollector); + } + + // Validate the output of the processor + Mockito.verify(mockCollector, + Mockito.times(expectedOutputEvents.size())).collect(spOutputCollectorCaptor.capture()); + var resultingEvents = spOutputCollectorCaptor.getAllValues(); + IntStream.range(0, expectedOutputEvents.size()) + .forEach(i -> Assertions.assertEquals( + expectedOutputEvents.get(i), + resultingEvents.get(i) + .getRaw() + )); + + // validate that the processor is stopped correctly + processor.onPipelineStopped(); + } + + private static ProcessingElementParameterExtractor getProcessingElementParameterExtractor( + DataProcessorInvocation dataProcessorInvocation + ) { + return ProcessingElementParameterExtractor.from(dataProcessorInvocation); + } + + private DataProcessorInvocation getProcessorInvocation() { + var pipelineElementTemplate = getPipelineElementTemplate(); + + var invocation = new DataProcessorInvocation( + processor + .declareConfig() + .getDescription() + ); + + invocation.setOutputStream(EventStreamGenerator.makeEmptyStream()); + + + return new DataProcessorTemplateHandler( + pipelineElementTemplate, + invocation, + true + ) + .applyTemplateOnPipelineElement(); + } + + private PipelineElementTemplate getPipelineElementTemplate() { + var staticProperties = processor + .declareConfig() + .getDescription() + .getStaticProperties(); + + + var configs = new HashMap(); + + staticProperties.forEach(staticProperty -> { + var value = testConfiguration.getFieldConfiguration().get(staticProperty.getInternalName()); + configs.put( + staticProperty.getInternalName(), + new PipelineElementTemplateConfig(true, true, value) + ); + }); + + return new PipelineElementTemplate("name", "description", configs); + } + + private Event getEvent(Map rawEvent) { + + if (!selectorPrefixes.hasNext()){ + selectorPrefixes = testConfiguration.getPrefixes().iterator(); + } + + String selectorPrefix = selectorPrefixes.next(); + + + var sourceInfo = new SourceInfo("", selectorPrefix); + var schemaInfo = new SchemaInfo(null, new ArrayList<>()); + + return EventFactory.fromMap(rawEvent, sourceInfo, schemaInfo); + } +} diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfiguration.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfiguration.java new file mode 100644 index 0000000000..fb2270f3a4 --- /dev/null +++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfiguration.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.test.executors; + +import java.util.List; +import java.util.Map; + +public class TestConfiguration { + + private final Map fieldConfiguration; + + private final List prefixes; + + public TestConfiguration(Map fieldConfiguration, + List eventPrefixes) { + this.fieldConfiguration = fieldConfiguration; + this.prefixes = eventPrefixes; + } + + public Map getFieldConfiguration() { + return fieldConfiguration; + } + + public List getPrefixes() { + return prefixes; + } + + public static TestConfigurationBuilder builder() { + return new TestConfigurationBuilder(); + } +} diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java new file mode 100644 index 0000000000..e2260c5366 --- /dev/null +++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.streampipes.test.executors; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestConfigurationBuilder{ + private Map fieldConfiguration = new HashMap<>(); + private List eventPrefixes = List.of(""); + + public TestConfigurationBuilder config(String key, Object value){ + this.fieldConfiguration.put(key, value); + return this; + } + public TestConfigurationBuilder configWithPrefix(String key, Object value, String prefix){ + this.fieldConfiguration.put(key, prefix + "::" + value); + return this; + } + + public TestConfigurationBuilder configWithDefaultPrefix(String key, Object value){ + return this.configWithPrefix(key, value, ""); + } + + public TestConfigurationBuilder config(Map config){ + this.fieldConfiguration = config; + return this; + } + + public TestConfigurationBuilder prefixStrategy(PrefixStrategy strategy){ + this.eventPrefixes = switch (strategy){ + case SAME_PREFIX -> List.of("s0"); + case ALTERNATE -> List.of("s0", "s1"); + }; + return this; + } + + public TestConfigurationBuilder customPrefixStrategy(List eventPrefixes) { + this.eventPrefixes = eventPrefixes; + return this; + } + + public TestConfiguration build(){ + return new TestConfiguration(this.fieldConfiguration, this.eventPrefixes); + } +} \ No newline at end of file