From 08c3bd956200ec35997b5fdd30bbb494b50427ba Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Wed, 3 Apr 2024 23:27:23 +0200 Subject: [PATCH 1/9] feat: Support custom service tags and service selection for adapters --- .../streampipes/commons/constants/Envs.java | 4 +- .../environment/DefaultEnvironment.java | 5 + .../commons/environment/Environment.java | 2 + .../management/AdapterMasterManagement.java | 21 ++- .../management/GuessManagement.java | 19 ++- .../management/WorkerRestClient.java | 4 +- .../management/WorkerUrlProvider.java | 42 ------ .../connect/management/util/WorkerPaths.java | 23 ---- .../connect/adapter/AdapterDescription.java | 15 +++ .../deployment/DeploymentConfiguration.java | 36 +++-- .../extensions/svcdiscovery/SpServiceTag.java | 22 ++++ .../svcdiscovery/SpServiceTagPrefix.java | 1 + .../IExtensionsServiceEndpointGenerator.java | 45 +++++++ .../manager/assets/AssetFetcher.java | 2 +- .../ExtensionsServiceEndpointGenerator.java | 76 ++++++----- .../execution/task/DiscoverEndpointsTask.java | 9 +- .../manager/health/PipelineHealthCheck.java | 2 +- .../CustomTransformOutputSchemaGenerator.java | 6 +- .../manager/preview/PipelinePreview.java | 9 +- .../ContainerProvidedOptionsHandler.java | 2 +- .../rest/impl/ServiceTagResource.java | 43 ++++++ .../impl/connect/DescriptionResource.java | 19 ++- .../connect/RuntimeResolvableResource.java | 19 ++- .../svcdiscovery/api/ISpServiceDiscovery.java | 9 +- .../svcdiscovery/SpServiceDiscoveryCore.java | 27 ++-- .../extensions/CustomServiceTagResolver.java | 49 +++++++ .../StreamPipesExtensionsServiceBase.java | 2 + .../src/lib/apis/service-tags.service.ts | 46 +++++++ .../src/lib/model/gen/streampipes-model.ts | 49 +++---- .../platform-services/src/public-api.ts | 1 + ...adapter-deployment-settings.component.html | 78 +++++++++++ ...adapter-deployment-settings.component.scss | 0 .../adapter-deployment-settings.component.ts | 123 ++++++++++++++++++ .../adapter-settings.component.html | 11 ++ ui/src/app/connect/connect.module.ts | 2 + 35 files changed, 636 insertions(+), 187 deletions(-) delete mode 100644 streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java rename streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java => streampipes-model/src/main/java/org/apache/streampipes/model/deployment/DeploymentConfiguration.java (50%) create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java create mode 100644 streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java create mode 100644 streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java create mode 100644 ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts create mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html create mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.scss create mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 4c58485e27..489c54d5a3 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -97,7 +97,9 @@ public enum Envs { SP_NATS_HOST("SP_NATS_HOST", "nats"), SP_NATS_PORT("SP_NATS_PORT", "4222"), - SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"); + SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"), + + SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""); private final String envVariableName; private String defaultValue; diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index 5e613e0909..3fef8c55f7 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -274,4 +274,9 @@ public StringEnvironmentVariable getPulsarUrl() { return new StringEnvironmentVariable(Envs.SP_PULSAR_URL); } + @Override + public StringEnvironmentVariable getCustomServiceTags() { + return new StringEnvironmentVariable(Envs.SP_SERVICE_TAGS); + } + } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index 092e6992d6..ceb2d2d1f2 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -135,4 +135,6 @@ public interface Environment { StringEnvironmentVariable getPulsarUrl(); + StringEnvironmentVariable getCustomServiceTags(); + } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index ec87909021..56220fde37 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -23,7 +23,7 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics; import org.apache.streampipes.connect.management.util.GroundingUtils; -import org.apache.streampipes.connect.management.util.WorkerPaths; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsLogProvider; import org.apache.streampipes.manager.verification.DataStreamVerifier; import org.apache.streampipes.model.SpDataStream; @@ -32,12 +32,11 @@ import org.apache.streampipes.resource.management.AdapterResourceManager; import org.apache.streampipes.resource.management.DataStreamResourceManager; import org.apache.streampipes.storage.api.IAdapterStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URISyntaxException; import java.util.List; import java.util.NoSuchElementException; @@ -54,12 +53,12 @@ public class AdapterMasterManagement { private final DataStreamResourceManager dataStreamResourceManager; - public AdapterMasterManagement(IAdapterStorage adapterStorage, + public AdapterMasterManagement(IAdapterStorage adapterInstanceStorage, AdapterResourceManager adapterResourceManager, DataStreamResourceManager dataStreamResourceManager, AdapterMetrics adapterMetrics ) { - this.adapterInstanceStorage = adapterStorage; + this.adapterInstanceStorage = adapterInstanceStorage; this.adapterMetrics = adapterMetrics; this.adapterResourceManager = adapterResourceManager; this.dataStreamResourceManager = dataStreamResourceManager; @@ -164,7 +163,11 @@ public void startStreamAdapter(String elementId) throws AdapterException { try { // Find endpoint to start adapter on - var baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId()); + var baseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl( + ad.getAppId(), + SpServiceUrlProvider.ADAPTER, + ad.getDeploymentConfiguration().getDesiredServiceTags() + ); // Update selected endpoint URL of adapter ad.setSelectedEndpointUrl(baseUrl); @@ -177,7 +180,7 @@ public void startStreamAdapter(String elementId) throws AdapterException { adapterMetrics.register(ad.getElementId(), ad.getName()); LOG.info("Started adapter " + elementId + " on: " + baseUrl); - } catch (NoServiceEndpointsAvailableException | URISyntaxException e) { + } catch (NoServiceEndpointsAvailableException e) { throw new AdapterException("Could not start adapter due to unavailable service endpoint", e); } } @@ -192,8 +195,4 @@ private void installDataSource(SpDataStream stream, throw new AdapterException(); } } - - private IAdapterStorage getAdapterInstanceStorage() { - return StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(); - } } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java index 104bae82e0..472cb7eca8 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java @@ -24,11 +24,15 @@ import org.apache.streampipes.connect.management.AdapterEventPreviewPipeline; import org.apache.streampipes.connect.management.util.WorkerPaths; import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.AdapterEventPreview; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.serializers.json.JacksonSerializer; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -39,21 +43,25 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Set; public class GuessManagement { private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class); - private final WorkerUrlProvider workerUrlProvider; + private final IExtensionsServiceEndpointGenerator endpointGenerator; private final ObjectMapper objectMapper; public GuessManagement() { - this.workerUrlProvider = new WorkerUrlProvider(); + this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); this.objectMapper = JacksonSerializer.getObjectMapper(); } public GuessSchema guessSchema(AdapterDescription adapterDescription) throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException { - var workerUrl = getWorkerUrl(adapterDescription.getAppId()); + var workerUrl = getWorkerUrl( + adapterDescription.getAppId(), + adapterDescription.getDeploymentConfiguration().getDesiredServiceTags() + ); var description = objectMapper.writeValueAsString(adapterDescription); LOG.info("Guess schema at: " + workerUrl); @@ -72,8 +80,9 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription) } } - private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException { - var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId); + private String getWorkerUrl(String appId, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + var baseUrl = endpointGenerator.getEndpointBaseUrl(appId, SpServiceUrlProvider.ADAPTER, customServiceTags); return baseUrl + WorkerPaths.getGuessSchemaPath(); } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index c715459070..4104c82a4f 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -52,10 +52,10 @@ public class WorkerRestClient { private static final Logger LOG = LoggerFactory.getLogger(WorkerRestClient.class); - public static void invokeStreamAdapter(String endpointUrl, + public static void invokeStreamAdapter(String baseUrl, String elementId) throws AdapterException { var adapterStreamDescription = getAndDecryptAdapter(elementId); - var url = endpointUrl + WorkerPaths.getStreamInvokePath(); + var url = baseUrl + WorkerPaths.getStreamInvokePath(); startAdapter(url, adapterStreamDescription); updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true); diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java deleted file mode 100644 index b9fc970ab4..0000000000 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.connect.management.management; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -public class WorkerUrlProvider { - - - public WorkerUrlProvider() { - } - - public String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException { - return getEndpointGenerator(appId).getEndpointResourceUrl(); - } - - public String getWorkerBaseUrl(String appId) throws NoServiceEndpointsAvailableException { - return getEndpointGenerator(appId).getEndpointBaseUrl(); - } - - private ExtensionsServiceEndpointGenerator getEndpointGenerator(String appId) { - return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.ADAPTER); - } - -} diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java index 29d89ebd1b..7bd2b0c1ed 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java @@ -17,13 +17,6 @@ */ package org.apache.streampipes.connect.management.util; -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import java.net.URI; -import java.net.URISyntaxException; - public class WorkerPaths { private static final String WorkerMainPath = "/api/v1/worker"; @@ -36,14 +29,6 @@ public static String getStreamStopPath() { return WorkerMainPath + "/stream/stop"; } - public static String getSetInvokePath() { - return WorkerMainPath + "/set/invoke"; - } - - public static String getSetStopPath() { - return WorkerMainPath + "/set/stop"; - } - public static String getRunningAdaptersPath() { return WorkerMainPath + "/running"; } @@ -56,12 +41,4 @@ public static String getGuessSchemaPath() { return WorkerMainPath + "/guess/schema"; } - public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException { - SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER; - String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl(); - URI uri = new URI(endpointUrl); - return uri.getScheme() + "://" + uri.getAuthority(); - } - - } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java index 5fd6b21b3f..d8b549211d 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java @@ -25,6 +25,7 @@ import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription; import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription; import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription; +import org.apache.streampipes.model.deployment.DeploymentConfiguration; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.shared.annotation.TsModel; @@ -55,6 +56,8 @@ public class AdapterDescription extends VersionedNamedStreamPipesEntity { // Is used to store where the adapter is running to stop it private String selectedEndpointUrl; + private DeploymentConfiguration deploymentConfiguration; + /** * This is used to identify all the service within the service group the adapter can be invoked in */ @@ -72,6 +75,7 @@ public AdapterDescription() { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new DeploymentConfiguration(); } public AdapterDescription(int version) { @@ -81,6 +85,7 @@ public AdapterDescription(int version) { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new DeploymentConfiguration(); this.setVersion(version); } @@ -89,6 +94,7 @@ public AdapterDescription(String elementId, String name, String description) { this.rules = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new DeploymentConfiguration(); } @@ -109,6 +115,7 @@ public AdapterDescription(AdapterDescription other) { this.dataStream = new SpDataStream(other.getDataStream()); } this.running = other.isRunning(); + this.deploymentConfiguration = other.getDeploymentConfiguration(); } public String getRev() { @@ -259,4 +266,12 @@ public boolean isRunning() { public void setRunning(boolean running) { this.running = running; } + + public DeploymentConfiguration getDeploymentConfiguration() { + return deploymentConfiguration; + } + + public void setDeploymentConfiguration(DeploymentConfiguration deploymentConfiguration) { + this.deploymentConfiguration = deploymentConfiguration; + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/DeploymentConfiguration.java similarity index 50% rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/deployment/DeploymentConfiguration.java index 6e76fedfe7..3e44f8f5e6 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/DeploymentConfiguration.java @@ -16,17 +16,35 @@ * */ -package org.apache.streampipes.manager.execution.endpoint; +package org.apache.streampipes.model.deployment; -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; -public class ExtensionsServiceEndpointProvider { +import java.util.HashSet; +import java.util.Set; - public String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); +public class DeploymentConfiguration { + + private Set desiredServiceTags; + private String selectedEndpointUrl; + + public DeploymentConfiguration() { + this.desiredServiceTags = new HashSet<>(); + } + + public Set getDesiredServiceTags() { + return desiredServiceTags; + } + + public void setDesiredServiceTags(Set desiredServiceTags) { + this.desiredServiceTags = desiredServiceTags; + } + + public String getSelectedEndpointUrl() { + return selectedEndpointUrl; + } + + public void setSelectedEndpointUrl(String selectedEndpointUrl) { + this.selectedEndpointUrl = selectedEndpointUrl; } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java index 71b7fe428e..572fc4a7f6 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java @@ -17,6 +17,11 @@ */ package org.apache.streampipes.model.extensions.svcdiscovery; +import org.apache.streampipes.model.shared.annotation.TsModel; + +import java.util.Objects; + +@TsModel public class SpServiceTag { private static final String COLON = ":"; @@ -56,4 +61,21 @@ public String getValue() { public void setValue(String value) { this.value = value; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SpServiceTag that = (SpServiceTag) o; + return prefix == that.prefix && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(prefix, value); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java index f412bdbe6f..568a8295cb 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java @@ -19,6 +19,7 @@ public enum SpServiceTagPrefix { SP_GROUP, + CUSTOM, ADAPTER, DATA_STREAM, DATA_PROCESSOR, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java new file mode 100644 index 0000000000..7414c7bd26 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.api.extensions; + +import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + +import java.util.Collections; +import java.util.Set; + +public interface IExtensionsServiceEndpointGenerator { + + String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException; + + default String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider) + throws NoServiceEndpointsAvailableException { + return getEndpointResourceUrl(appId, spServiceUrlProvider, Collections.emptySet()); + } + + String getEndpointBaseUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException; +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java index 553762018e..25c3c9accf 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java @@ -40,7 +40,7 @@ public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider, } public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException { - String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, spServiceUrlProvider).getEndpointResourceUrl(); + String endpointUrl = new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, spServiceUrlProvider); return Request .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX) .execute() diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java index cb2d28408a..ae1b8227c7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java @@ -18,7 +18,8 @@ package org.apache.streampipes.manager.execution.endpoint; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; @@ -26,59 +27,66 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.stream.Stream; -public class ExtensionsServiceEndpointGenerator { +public class ExtensionsServiceEndpointGenerator implements IExtensionsServiceEndpointGenerator { private static final Logger LOG = LoggerFactory.getLogger(ExtensionsServiceEndpointGenerator.class); - private final String appId; - private final SpServiceUrlProvider spServiceUrlProvider; - - public ExtensionsServiceEndpointGenerator(String appId, - SpServiceUrlProvider spServiceUrlProvider) { - this.appId = appId; - this.spServiceUrlProvider = spServiceUrlProvider; + public ExtensionsServiceEndpointGenerator() { } - public ExtensionsServiceEndpointGenerator(NamedStreamPipesEntity entity) { - this.appId = entity.getAppId(); - this.spServiceUrlProvider = ExtensionsServiceEndpointUtils.getPipelineElementType(entity); + public String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException { + return spServiceUrlProvider.getInvocationUrl(selectService(appId, spServiceUrlProvider, customServiceTags), appId); } - public String getEndpointResourceUrl() throws NoServiceEndpointsAvailableException { - return spServiceUrlProvider.getInvocationUrl(selectService(), appId); + public String getEndpointBaseUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + return selectService(appId, spServiceUrlProvider, customServiceTags); } - public String getEndpointBaseUrl() throws NoServiceEndpointsAvailableException { - return selectService(); + private String selectService(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + List serviceEndpoints = getServiceEndpoints(appId, spServiceUrlProvider, customServiceTags); + if (!serviceEndpoints.isEmpty()) { + return serviceEndpoints.get(0); + } else { + LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, + spServiceUrlProvider.getServiceTag(appId).asString()); + throw new NoServiceEndpointsAvailableException( + "Could not find any matching service endpoints - are all software components running?"); + } } - private List getServiceEndpoints() { + private List getServiceEndpoints(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) { return SpServiceDiscovery .getServiceDiscovery() .getServiceEndpoints( DefaultSpServiceTypes.EXT, true, - Collections - .singletonList( - this.spServiceUrlProvider - .getServiceTag(appId) - .asString() - ) + getDesiredServiceTags(appId, spServiceUrlProvider, customServiceTags) ); } - private String selectService() throws NoServiceEndpointsAvailableException { - List serviceEndpoints = getServiceEndpoints(); - if (!serviceEndpoints.isEmpty()) { - return getServiceEndpoints().get(0); - } else { - LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, - this.spServiceUrlProvider.getServiceTag(appId).asString()); - throw new NoServiceEndpointsAvailableException( - "Could not find any matching service endpoints - are all software components running?"); - } + private List getDesiredServiceTags(String appId, + SpServiceUrlProvider serviceUrlProvider, + Set customServiceTags) { + return Stream.concat( + Stream.of( + serviceUrlProvider.getServiceTag(appId) + ), + customServiceTags.stream() + ) + .map(SpServiceTag::asString) + .toList(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java index 896c704f04..40da134aa1 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java @@ -20,7 +20,8 @@ import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.manager.execution.PipelineExecutionInfo; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointProvider; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.Pipeline; @@ -69,7 +70,11 @@ private void applyEndpointAndPipeline(String pipelineId, private String findSelectedEndpoint(InvocableStreamPipesEntity pipelineElement) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointProvider().findSelectedEndpoint(pipelineElement); + return new ExtensionsServiceEndpointGenerator() + .getEndpointResourceUrl( + pipelineElement.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement) + ); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java index 40216cc87b..84ab806c30 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java @@ -139,7 +139,7 @@ public void checkAndRestorePipelineElements() { private String findEndpointUrl(InvocableStreamPipesEntity graph) throws NoServiceEndpointsAvailableException { SpServiceUrlProvider serviceUrlProvider = ExtensionsServiceEndpointUtils.getPipelineElementType(graph); - return new ExtensionsServiceEndpointGenerator(graph.getAppId(), serviceUrlProvider).getEndpointResourceUrl(); + return new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(graph.getAppId(), serviceUrlProvider); } private boolean shouldRetry(String instanceId) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java index 936adabb49..77551acb4a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java @@ -64,8 +64,10 @@ public Tuple2 buildFromTwoStreams(Sp private EventSchema makeRequest() { try { String httpRequestBody = JacksonSerializer.getObjectMapper().writeValueAsString(dataProcessorInvocation); - String endpointUrl = new ExtensionsServiceEndpointGenerator(dataProcessorInvocation.getAppId(), - SpServiceUrlProvider.DATA_PROCESSOR).getEndpointResourceUrl(); + String endpointUrl = new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl( + dataProcessorInvocation.getAppId(), + SpServiceUrlProvider.DATA_PROCESSOR + ); Response httpResp = Request.Post(endpointUrl + "/output").bodyString(httpRequestBody, ContentType .APPLICATION_JSON).execute(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java index 1113c9a4c8..903ecbeb75 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java @@ -80,10 +80,11 @@ public String getPipelineElementPreview(String previewId, } private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); + return new ExtensionsServiceEndpointGenerator() + .getEndpointResourceUrl( + g.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(g) + ); } private void invokeGraphs(List graphs) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java index fd1957f5cc..16acbba678 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java @@ -55,6 +55,6 @@ private RuntimeOptionsResponse handleResponse(Response httpResp) throws JsonSynt private String getEndpointUrl(String appId) throws NoServiceEndpointsAvailableException { SpServiceUrlProvider provider = ExtensionsServiceEndpointUtils.getPipelineElementType(appId); - return new ExtensionsServiceEndpointGenerator(appId, provider).getEndpointResourceUrl() + "/configurations"; + return new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, provider) + "/configurations"; } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java new file mode 100644 index 0000000000..536933f950 --- /dev/null +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.impl; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; +import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; +import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Set; + +@RestController +@RequestMapping("/api/v2/service-tags") +public class ServiceTagResource extends AbstractAuthGuardedRestResource { + + private final ISpServiceDiscovery serviceDiscovery = SpServiceDiscovery.getServiceDiscovery(); + + @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE) + public Set getCustomServiceTags() { + return serviceDiscovery.getCustomServiceTags(true); + } +} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java index dd40dd8541..ab49f32a9c 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java @@ -22,11 +22,14 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.management.DescriptionManagement; -import org.apache.streampipes.connect.management.management.WorkerUrlProvider; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.DeleteMapping; @@ -43,11 +46,11 @@ public class DescriptionResource extends AbstractAdapterResource { private static final Logger LOG = LoggerFactory.getLogger(DescriptionResource.class); - private final WorkerUrlProvider workerUrlProvider; + private final IExtensionsServiceEndpointGenerator endpointGenerator; public DescriptionResource() { super(DescriptionManagement::new); - workerUrlProvider = new WorkerUrlProvider(); + endpointGenerator = new ExtensionsServiceEndpointGenerator(); } @GetMapping(path = "/adapters", produces = MediaType.APPLICATION_JSON_VALUE) @@ -65,7 +68,7 @@ public ResponseEntity getAdapterAssets(@PathVariable("id") String id) { Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getAssets(workerUrl); } @@ -93,7 +96,7 @@ public ResponseEntity getAdapterIconAsset(@PathVariable("id") String id) { Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getIconAsset(workerUrl); } @@ -120,7 +123,7 @@ public ResponseEntity getAdapterDocumentationAsset(@PathVariable("id") String Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getDocumentationAsset(workerUrl); } @@ -148,4 +151,8 @@ public ResponseEntity deleteAdapter(@PathVariable("adapterId") String adapter return badRequest(e); } } + + private String getServiceResourceUrl(String appId) throws NoServiceEndpointsAvailableException { + return endpointGenerator.getEndpointResourceUrl(appId, SpServiceUrlProvider.ADAPTER); + } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index 6266439bb8..dc7d8c816d 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -24,15 +24,18 @@ import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; import org.apache.streampipes.connect.management.management.WorkerRestClient; -import org.apache.streampipes.connect.management.management.WorkerUrlProvider; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; @@ -41,23 +44,25 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.Collections; + @RestController @RequestMapping("/api/v2/connect/master/resolvable") public class RuntimeResolvableResource extends AbstractAdapterResource { private static final Logger LOG = LoggerFactory.getLogger(RuntimeResolvableResource.class); - private final WorkerUrlProvider workerUrlProvider; + private final IExtensionsServiceEndpointGenerator endpointGenerator; public RuntimeResolvableResource() { super(() -> new WorkerAdministrationManagement( StorageDispatcher.INSTANCE.getNoSqlStore() - .getAdapterInstanceStorage(), + .getAdapterInstanceStorage(), AdapterMetricsManager.INSTANCE.getAdapterMetrics(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams() )); - this.workerUrlProvider = new WorkerUrlProvider(); + this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); } @PostMapping( @@ -68,7 +73,11 @@ public ResponseEntity fetchConfigurations(@PathVariable("id") String appId, @RequestBody RuntimeOptionsRequest runtimeOptionsRequest) { try { - String workerEndpoint = workerUrlProvider.getWorkerBaseUrl(appId); + String workerEndpoint = endpointGenerator.getEndpointBaseUrl( + appId, + SpServiceUrlProvider.ADAPTER, + Collections.emptySet() + ); RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, appId, runtimeOptionsRequest); return ok(result); diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java index 4f4350f7d8..1b648cef8a 100644 --- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java +++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java @@ -17,16 +17,19 @@ */ package org.apache.streampipes.svcdiscovery.api; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; + import java.util.List; +import java.util.Set; public interface ISpServiceDiscovery { /** - * Get active pipeline element service endpoints + * Get custom service tags * - * @return list of pipeline element endpoints + * @return set of service tags */ - List getActivePipelineElementEndpoints(); + Set getCustomServiceTags(boolean restrictToHealthy); /** * Get service endpoints diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java index b887dbd976..19f1479483 100644 --- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java +++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java @@ -21,10 +21,10 @@ import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery; -import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +47,21 @@ public SpServiceDiscoveryCore() { } @Override - public List getActivePipelineElementEndpoints() { - LOG.info("Discovering active pipeline element service endpoints"); - return getServiceEndpoints(DefaultSpServiceTypes.EXT, - true, - List.of()); + public Set getCustomServiceTags(boolean restrictToHealthy) { + var activeServices = findServices(0); + return activeServices + .stream() + .filter(service -> !restrictToHealthy || service.getStatus() != SpServiceStatus.UNHEALTHY) + .flatMap(service -> service.getTags().stream()) + .filter(serviceTag -> serviceTag.getPrefix() == SpServiceTagPrefix.CUSTOM) + .collect(Collectors.toSet()); } @Override public List getServiceEndpoints(String serviceGroup, boolean restrictToHealthy, List filterByTags) { - List activeServices = findService(0); + List activeServices = findServices(0); return activeServices .stream() @@ -73,9 +76,9 @@ private String makeServiceUrl(SpServiceRegistration service) { return service.getServiceUrl(); } -/** -* Checks if all the tags specified in the filter are supported by the service. -*/ + /** + * Checks if all the tags specified in the filter are supported by the service. + */ private boolean allFiltersSupported(SpServiceRegistration service, List filterByTags) { if (filterByTags.isEmpty()) { @@ -88,7 +91,7 @@ private boolean allFiltersSupported(SpServiceRegistration service, return serviceTags.containsAll(filterByTags); } - private List findService(int retryCount) { + private List findServices(int retryCount) { var services = serviceStorage.getAll(); if (services.isEmpty()) { if (retryCount < MAX_RETRIES) { @@ -96,7 +99,7 @@ private List findService(int retryCount) { retryCount++; LOG.info("Could not find any extensions services, retrying ({}/{})", retryCount, MAX_RETRIES); TimeUnit.MILLISECONDS.sleep(1000); - return findService(retryCount); + return findServices(retryCount); } catch (InterruptedException e) { LOG.warn("Could not find a service currently due to exception {}", e.getMessage()); return Collections.emptyList(); diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java new file mode 100644 index 0000000000..4033789c8c --- /dev/null +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.extensions; + +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +public class CustomServiceTagResolver { + + private final Environment env; + + public CustomServiceTagResolver(Environment env) { + this.env = env; + } + + public Set getCustomServiceTags() { + if (env.getCustomServiceTags().exists()) { + var serviceTags = env.getCustomServiceTags().getValue(); + return Arrays.stream( + serviceTags.split(",") + ).map(serviceTagString -> SpServiceTag.create(SpServiceTagPrefix.CUSTOM, serviceTagString)) + .collect(Collectors.toSet()); + } else { + return Collections.emptySet(); + } + } +} diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java index 81775f2271..0bdb9c63ed 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java @@ -19,6 +19,7 @@ package org.apache.streampipes.service.extensions; import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; @@ -130,6 +131,7 @@ protected Set getServiceTags(Set extensi DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup())); } tags.addAll(getExtensionsServiceTags(extensions)); + tags.addAll(new CustomServiceTagResolver(Environments.getEnvironment()).getCustomServiceTags()); return tags; } diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts new file mode 100644 index 0000000000..2dd4755377 --- /dev/null +++ b/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Injectable } from '@angular/core'; +import { HttpClient } from '@angular/common/http'; +import { PlatformServicesCommons } from './commons.service'; +import { Observable } from 'rxjs'; +import { SpServiceTag } from '../model/gen/streampipes-model'; +import { map } from 'rxjs/operators'; + +@Injectable({ + providedIn: 'root', +}) +export class ServiceTagService { + constructor( + private http: HttpClient, + private platformServicesCommons: PlatformServicesCommons, + ) {} + + getCustomServiceTags(): Observable { + return this.http + .get(`${this.platformServicesCommons.apiBasePath}/service-tags`) + .pipe( + map(response => { + return (response as any[]).map(p => + SpServiceTag.fromData(p), + ); + }), + ); + } +} diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index b3b8efeadf..0e1b55b5c8 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-03-28 15:50:50. +// Generated using typescript-generator version 3.2.1263 on 2024-04-02 22:53:18. export class NamedStreamPipesEntity { '@class': @@ -112,6 +112,7 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { 'correspondingServiceGroup': string; 'createdAt': number; 'dataStream': SpDataStream; + 'deploymentConfiguration': DeploymentConfiguration; 'eventGrounding': EventGrounding; 'eventSchema': EventSchema; 'icon': string; @@ -142,6 +143,9 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { instance.correspondingServiceGroup = data.correspondingServiceGroup; instance.createdAt = data.createdAt; instance.dataStream = SpDataStream.fromData(data.dataStream); + instance.deploymentConfiguration = DeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.eventGrounding = EventGrounding.fromData(data.eventGrounding); instance.eventSchema = EventSchema.fromData(data.eventSchema); instance.icon = data.icon; @@ -1364,6 +1368,26 @@ export class DeleteRuleDescription extends SchemaTransformationRuleDescription { } } +export class DeploymentConfiguration { + desiredServiceTags: SpServiceTag[]; + selectedEndpointUrl: string; + + static fromData( + data: DeploymentConfiguration, + target?: DeploymentConfiguration, + ): DeploymentConfiguration { + if (!data) { + return data; + } + const instance = target || new DeploymentConfiguration(); + instance.desiredServiceTags = __getCopyArrayFn(SpServiceTag.fromData)( + data.desiredServiceTags, + ); + instance.selectedEndpointUrl = data.selectedEndpointUrl; + return instance; + } +} + export class DomainStaticProperty extends StaticProperty { '@class': 'org.apache.streampipes.model.staticproperty.DomainStaticProperty'; 'requiredClass': string; @@ -2521,28 +2545,6 @@ export class PageResult extends DataSeries { } } -export class PersistedDataStream { - measureName: string; - pipelineId: string; - pipelineName: string; - schema: EventSchema; - - static fromData( - data: PersistedDataStream, - target?: PersistedDataStream, - ): PersistedDataStream { - if (!data) { - return data; - } - const instance = target || new PersistedDataStream(); - instance.measureName = data.measureName; - instance.pipelineId = data.pipelineId; - instance.pipelineName = data.pipelineName; - instance.schema = EventSchema.fromData(data.schema); - return instance; - } -} - export class Pipeline extends ElementComposition { _id: string; _rev: string; @@ -4198,6 +4200,7 @@ export type SpServiceStatus = export type SpServiceTagPrefix = | 'SP_GROUP' + | 'CUSTOM' | 'ADAPTER' | 'DATA_STREAM' | 'DATA_PROCESSOR' diff --git a/ui/projects/streampipes/platform-services/src/public-api.ts b/ui/projects/streampipes/platform-services/src/public-api.ts index 669bf4f238..dddd4ad0f8 100644 --- a/ui/projects/streampipes/platform-services/src/public-api.ts +++ b/ui/projects/streampipes/platform-services/src/public-api.ts @@ -45,6 +45,7 @@ export * from './lib/apis/pipeline-element-template.service'; export * from './lib/apis/pipeline-monitoring.service'; export * from './lib/apis/pipeline-template.service'; export * from './lib/apis/semantic-types-rest.service'; +export * from './lib/apis/service-tags.service'; export * from './lib/apis/user.service'; export * from './lib/apis/user-admin.service'; export * from './lib/apis/user-group.service'; diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html new file mode 100644 index 0000000000..64d30f5980 --- /dev/null +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html @@ -0,0 +1,78 @@ + + +
+ + Any available service + + Restrict to service tags + + + +
+ + Service Tags + + + {{ serviceTag.value }} + + + + + + + {{ serviceTag }} + + + +
+
diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.scss b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.scss new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts new file mode 100644 index 0000000000..2047b223d0 --- /dev/null +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, ElementRef, Input, OnInit, ViewChild } from '@angular/core'; +import { + DeploymentConfiguration, + ServiceTagService, + SpServiceTag, +} from '@streampipes/platform-services'; +import { Observable } from 'rxjs'; +import { FormControl } from '@angular/forms'; +import { COMMA, ENTER } from '@angular/cdk/keycodes'; +import { MatChipInputEvent } from '@angular/material/chips'; +import { map, startWith } from 'rxjs/operators'; +import { MatAutocompleteSelectedEvent } from '@angular/material/autocomplete'; +import { MatRadioChange } from '@angular/material/radio'; + +@Component({ + selector: 'sp-adapter-deployment-settings', + templateUrl: './adapter-deployment-settings.component.html', + styleUrls: ['./adapter-deployment-settings.component.scss'], +}) +export class SpAdapterDeploymentSettingsComponent implements OnInit { + @Input() + deploymentConfiguration: DeploymentConfiguration; + + availableServiceTags: SpServiceTag[] = []; + availableServiceTagValues: string[] = []; + + deploymentMode = 'all'; + + separatorKeysCodes: number[] = [ENTER, COMMA]; + serviceTagCtrl = new FormControl(''); + filteredServiceTags: Observable; + + @ViewChild('serviceTagInput') serviceTagInput: ElementRef; + + constructor(private serviceTagService: ServiceTagService) { + this.filteredServiceTags = this.serviceTagCtrl.valueChanges.pipe( + startWith(null), + map((serviceTagValue: string | null) => { + return serviceTagValue + ? this._filter(serviceTagValue) + : this.availableServiceTagValues.slice(); + }), + ); + } + + ngOnInit(): void { + if (this.deploymentConfiguration.desiredServiceTags.length > 0) { + this.deploymentMode = 'filter'; + } + this.serviceTagService.getCustomServiceTags().subscribe(res => { + this.availableServiceTags = res; + this.availableServiceTagValues = res.map(st => st.value); + }); + } + + add(event: MatChipInputEvent): void { + const value = (event.value || '').trim(); + + if (value) { + this.deploymentConfiguration.desiredServiceTags.push( + this.findTag(value), + ); + } + + event.chipInput!.clear(); + + this.serviceTagCtrl.setValue(null); + } + + findTag(value: string): SpServiceTag { + return this.availableServiceTags.find(st => st.value === value); + } + + remove(serviceTag: string): void { + const index = this.deploymentConfiguration.desiredServiceTags.findIndex( + st => st.value === serviceTag, + ); + + if (index >= 0) { + this.deploymentConfiguration.desiredServiceTags.splice(index, 1); + } + } + + selected(event: MatAutocompleteSelectedEvent): void { + this.deploymentConfiguration.desiredServiceTags.push( + this.findTag(event.option.viewValue), + ); + this.serviceTagInput.nativeElement.value = ''; + this.serviceTagCtrl.setValue(null); + } + + private _filter(value: string): string[] { + const filterValue = value.toLowerCase(); + + return this.availableServiceTagValues.filter(st => + st.toLowerCase().includes(filterValue), + ); + } + + handleSelectionChange(event: MatRadioChange): void { + if (event.value === 'all') { + this.deploymentConfiguration.desiredServiceTags = []; + } + } +} diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html index 4b3be6f469..82bb3806b3 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html @@ -18,6 +18,17 @@
+ + + + Date: Wed, 3 Apr 2024 23:40:30 +0200 Subject: [PATCH 2/9] Remove empty scss file --- .../adapter-deployment-settings.component.scss | 0 .../adapter-deployment-settings.component.ts | 1 - 2 files changed, 1 deletion(-) delete mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.scss diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.scss b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.scss deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts index 2047b223d0..b3d8143ccd 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts @@ -33,7 +33,6 @@ import { MatRadioChange } from '@angular/material/radio'; @Component({ selector: 'sp-adapter-deployment-settings', templateUrl: './adapter-deployment-settings.component.html', - styleUrls: ['./adapter-deployment-settings.component.scss'], }) export class SpAdapterDeploymentSettingsComponent implements OnInit { @Input() From 0684669d1e693ab4b9839f912bce049a2dc9995b Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Thu, 4 Apr 2024 08:22:31 +0200 Subject: [PATCH 3/9] Support service selection in runtime resolvable static properties --- .../management/management/WorkerRestClient.java | 4 ++-- .../model/runtime/RuntimeOptionsRequest.java | 11 +++++++++++ .../rest/impl/connect/DescriptionResource.java | 1 - .../rest/impl/connect/RuntimeResolvableResource.java | 8 +++----- .../src/lib/model/gen/streampipes-model.ts | 6 +++++- .../adapter-settings/adapter-settings.component.html | 3 +++ .../configuration-group.component.html | 1 + .../configuration-group.component.ts | 7 ++++++- .../static-alternatives.component.html | 1 + .../static-alternatives.component.ts | 5 +++++ .../static-collection.component.html | 1 + .../static-collection/static-collection.component.ts | 6 +++++- .../static-group/static-group.component.html | 2 ++ .../static-group/static-group.component.ts | 10 ++++++++-- .../static-properties/static-property.component.html | 7 +++++++ .../static-properties/static-property.component.ts | 4 ++++ .../base-runtime-resolvable-input.ts | 6 ++++++ 17 files changed, 70 insertions(+), 13 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index 4104c82a4f..91558045e2 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -127,11 +127,11 @@ private static HttpResponse triggerPost(String url, return request.execute().returnResponse(); } - public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, + public static RuntimeOptionsResponse getConfiguration(String baseUrl, String appId, RuntimeOptionsRequest runtimeOptionsRequest) throws AdapterException, SpConfigurationException { - String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId); + String url = baseUrl + WorkerPaths.getRuntimeResolvablePath(appId); try { String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest); diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java index 0f5efe2411..1aee67d79d 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java @@ -18,6 +18,7 @@ package org.apache.streampipes.model.runtime; import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.deployment.DeploymentConfiguration; import org.apache.streampipes.model.shared.annotation.TsModel; import org.apache.streampipes.model.staticproperty.StaticProperty; @@ -37,6 +38,8 @@ public class RuntimeOptionsRequest { protected List inputStreams; + protected DeploymentConfiguration deploymentConfiguration; + private String belongsTo; public RuntimeOptionsRequest() { @@ -95,4 +98,12 @@ public String getBelongsTo() { public void setBelongsTo(String belongsTo) { this.belongsTo = belongsTo; } + + public DeploymentConfiguration getDeploymentConfiguration() { + return deploymentConfiguration; + } + + public void setDeploymentConfiguration(DeploymentConfiguration deploymentConfiguration) { + this.deploymentConfiguration = deploymentConfiguration; + } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java index ab49f32a9c..7d8f562034 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java @@ -29,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.DeleteMapping; diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index dc7d8c816d..93b5efd811 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -44,8 +44,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.Collections; - @RestController @RequestMapping("/api/v2/connect/master/resolvable") public class RuntimeResolvableResource extends AbstractAdapterResource { @@ -73,12 +71,12 @@ public ResponseEntity fetchConfigurations(@PathVariable("id") String appId, @RequestBody RuntimeOptionsRequest runtimeOptionsRequest) { try { - String workerEndpoint = endpointGenerator.getEndpointBaseUrl( + String baseUrl = endpointGenerator.getEndpointBaseUrl( appId, SpServiceUrlProvider.ADAPTER, - Collections.emptySet() + runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags() ); - RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, appId, runtimeOptionsRequest); + RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest); return ok(result); } catch (AdapterException e) { diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index 0e1b55b5c8..1a82be45d3 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-04-02 22:53:18. +// Generated using typescript-generator version 3.2.1263 on 2024-04-04 07:44:56. export class NamedStreamPipesEntity { '@class': @@ -3269,6 +3269,7 @@ export class RuntimeOptionsRequest { | 'org.apache.streampipes.model.runtime.RuntimeOptionsResponse'; 'appId': string; 'belongsTo': string; + 'deploymentConfiguration': DeploymentConfiguration; 'inputStreams': SpDataStream[]; 'requestId': string; 'staticProperties': StaticPropertyUnion[]; @@ -3284,6 +3285,9 @@ export class RuntimeOptionsRequest { instance['@class'] = data['@class']; instance.appId = data.appId; instance.belongsTo = data.belongsTo; + instance.deploymentConfiguration = DeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.inputStreams = __getCopyArrayFn(SpDataStream.fromData)( data.inputStreams, ); diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html index 82bb3806b3..4e5c556cb2 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html @@ -70,6 +70,9 @@ [configurationGroup]="specificAdapterForm" [adapterId]="adapterDescription.appId" [configuration]="adapterDescription.config" + [deploymentConfiguration]=" + adapterDescription.deploymentConfiguration + " > diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.html b/ui/src/app/connect/components/configuration-group/configuration-group.component.html index a09b11b734..68569ade6d 100644 --- a/ui/src/app/connect/components/configuration-group/configuration-group.component.html +++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.html @@ -25,6 +25,7 @@
implements OnInit { + @Input() + deploymentConfiguration: DeploymentConfiguration; + @Output() inputEmitter: EventEmitter = new EventEmitter(); completedStaticProperty: ConfigurationInfo; diff --git a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html index 29b084725b..c2db12bedb 100644 --- a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html +++ b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html @@ -24,6 +24,7 @@ >
{ + @Input() + deploymentConfiguration: DeploymentConfiguration; + constructor() { super(); } diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html index bd8d67bde3..fb77d388b8 100644 --- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html +++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html @@ -20,6 +20,7 @@ implements OnInit { + @Input() + deploymentConfiguration: DeploymentConfiguration; + @Output() inputEmitter: EventEmitter = new EventEmitter(); dependentStaticProperties: Map = new Map< diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html index 34aaad2028..4ecbd7c49b 100644 --- a/ui/src/app/core-ui/static-properties/static-property.component.html +++ b/ui/src/app/core-ui/static-properties/static-property.component.html @@ -86,6 +86,7 @@ Date: Thu, 4 Apr 2024 08:29:12 +0200 Subject: [PATCH 4/9] Fix checkstyle --- .../streampipes/rest/impl/connect/RuntimeResolvableResource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index 93b5efd811..32cfb73044 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -35,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; From 8c8ed98319392f38bfbee1cc441139b1c7f3c874 Mon Sep 17 00:00:00 2001 From: bossenti Date: Tue, 30 Apr 2024 08:59:30 +0200 Subject: [PATCH 5/9] add short comment to new ENV variable --- .../main/java/org/apache/streampipes/commons/constants/Envs.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 489c54d5a3..5f8de15ef6 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -99,6 +99,7 @@ public enum Envs { SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"), + // expects a comma separated string of service names SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""); private final String envVariableName; From 5ae113576daa60079e7046875eceea3769299833 Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Tue, 30 Apr 2024 09:47:36 +0200 Subject: [PATCH 6/9] feat: Add unit tests for CustomServiceTagResolver --- streampipes-service-extensions/pom.xml | 12 ++++ .../CustomServiceTagResolverTest.java | 68 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java diff --git a/streampipes-service-extensions/pom.xml b/streampipes-service-extensions/pom.xml index 8370ad77c0..26dcda6b5b 100644 --- a/streampipes-service-extensions/pom.xml +++ b/streampipes-service-extensions/pom.xml @@ -57,6 +57,18 @@ + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.mockito + mockito-core + test + diff --git a/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java b/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java new file mode 100644 index 0000000000..72e95db8b8 --- /dev/null +++ b/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.extensions; + + +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CustomServiceTagResolverTest { + + private StringEnvironmentVariable customServiceTags; + private CustomServiceTagResolver resolver; + + @BeforeEach + void setUp() { + var env = mock(Environment.class); + customServiceTags = mock(StringEnvironmentVariable.class); + when(env.getCustomServiceTags()).thenReturn(customServiceTags); + resolver = new CustomServiceTagResolver(env); + } + + @Test + void returnsCustomServiceTagsWhenTheyExist() { + when(customServiceTags.exists()).thenReturn(true); + when(customServiceTags.getValue()).thenReturn("tag1,tag2"); + + var result = resolver.getCustomServiceTags(); + + assertEquals(2, result.size()); + assertTrue(result.contains(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, "tag1"))); + assertTrue(result.contains(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, "tag2"))); + } + + @Test + void returnsEmptySetWhenNoCustomServiceTagsExist() { + when(customServiceTags.exists()).thenReturn(false); + + var result = resolver.getCustomServiceTags(); + + assertTrue(result.isEmpty()); + } +} \ No newline at end of file From 989ef9a78fcc42f9809d5f147be3b58092874ee3 Mon Sep 17 00:00:00 2001 From: bossenti Date: Tue, 30 Apr 2024 09:52:16 +0200 Subject: [PATCH 7/9] add comment for clarification --- .../model/extensions/svcdiscovery/SpServiceTagPrefix.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java index 568a8295cb..bbec9a63ff 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java @@ -19,7 +19,7 @@ public enum SpServiceTagPrefix { SP_GROUP, - CUSTOM, + CUSTOM, // Is used for service tags provided by Envs.SP_SERVICE_TAGS ADAPTER, DATA_STREAM, DATA_PROCESSOR, From 0a23587aa0b1512f334abe2f33dd60caf3455690 Mon Sep 17 00:00:00 2001 From: bossenti Date: Tue, 30 Apr 2024 10:25:31 +0200 Subject: [PATCH 8/9] add comment for clarification --- .../model/extensions/svcdiscovery/SpServiceTagPrefix.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java index bbec9a63ff..cea9c39da1 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java @@ -19,7 +19,7 @@ public enum SpServiceTagPrefix { SP_GROUP, - CUSTOM, // Is used for service tags provided by Envs.SP_SERVICE_TAGS + CUSTOM, // Is used for user-defined service tags provided via Envs.SP_SERVICE_TAGS ADAPTER, DATA_STREAM, DATA_PROCESSOR, From 874bfe93ea47ebd742574e03f79baaaf758ed8f4 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Tue, 30 Apr 2024 22:05:34 +0200 Subject: [PATCH 9/9] Rename DeploymentConfiguration --- .../connect/adapter/AdapterDescription.java | 14 ++--- ... => ExtensionDeploymentConfiguration.java} | 4 +- .../model/runtime/RuntimeOptionsRequest.java | 8 +-- .../src/lib/model/gen/streampipes-model.ts | 60 ++++++++++--------- .../adapter-deployment-settings.component.ts | 4 +- .../configuration-group.component.ts | 4 +- .../static-alternatives.component.ts | 5 +- .../static-collection.component.ts | 4 +- .../static-group/static-group.component.ts | 4 +- .../static-property.component.ts | 4 +- .../base-runtime-resolvable-input.ts | 4 +- 11 files changed, 58 insertions(+), 57 deletions(-) rename streampipes-model/src/main/java/org/apache/streampipes/model/deployment/{DeploymentConfiguration.java => ExtensionDeploymentConfiguration.java} (94%) diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java index d8b549211d..3d96406618 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java @@ -25,7 +25,7 @@ import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription; import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription; import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription; -import org.apache.streampipes.model.deployment.DeploymentConfiguration; +import org.apache.streampipes.model.deployment.ExtensionDeploymentConfiguration; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.shared.annotation.TsModel; @@ -56,7 +56,7 @@ public class AdapterDescription extends VersionedNamedStreamPipesEntity { // Is used to store where the adapter is running to stop it private String selectedEndpointUrl; - private DeploymentConfiguration deploymentConfiguration; + private ExtensionDeploymentConfiguration deploymentConfiguration; /** * This is used to identify all the service within the service group the adapter can be invoked in @@ -75,7 +75,7 @@ public AdapterDescription() { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); - this.deploymentConfiguration = new DeploymentConfiguration(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); } public AdapterDescription(int version) { @@ -85,7 +85,7 @@ public AdapterDescription(int version) { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); - this.deploymentConfiguration = new DeploymentConfiguration(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); this.setVersion(version); } @@ -94,7 +94,7 @@ public AdapterDescription(String elementId, String name, String description) { this.rules = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); - this.deploymentConfiguration = new DeploymentConfiguration(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); } @@ -267,11 +267,11 @@ public void setRunning(boolean running) { this.running = running; } - public DeploymentConfiguration getDeploymentConfiguration() { + public ExtensionDeploymentConfiguration getDeploymentConfiguration() { return deploymentConfiguration; } - public void setDeploymentConfiguration(DeploymentConfiguration deploymentConfiguration) { + public void setDeploymentConfiguration(ExtensionDeploymentConfiguration deploymentConfiguration) { this.deploymentConfiguration = deploymentConfiguration; } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/DeploymentConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java similarity index 94% rename from streampipes-model/src/main/java/org/apache/streampipes/model/deployment/DeploymentConfiguration.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java index 3e44f8f5e6..16b7c0753e 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/DeploymentConfiguration.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java @@ -23,12 +23,12 @@ import java.util.HashSet; import java.util.Set; -public class DeploymentConfiguration { +public class ExtensionDeploymentConfiguration { private Set desiredServiceTags; private String selectedEndpointUrl; - public DeploymentConfiguration() { + public ExtensionDeploymentConfiguration() { this.desiredServiceTags = new HashSet<>(); } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java index 1aee67d79d..3e0417c8bb 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java @@ -18,7 +18,7 @@ package org.apache.streampipes.model.runtime; import org.apache.streampipes.model.SpDataStream; -import org.apache.streampipes.model.deployment.DeploymentConfiguration; +import org.apache.streampipes.model.deployment.ExtensionDeploymentConfiguration; import org.apache.streampipes.model.shared.annotation.TsModel; import org.apache.streampipes.model.staticproperty.StaticProperty; @@ -38,7 +38,7 @@ public class RuntimeOptionsRequest { protected List inputStreams; - protected DeploymentConfiguration deploymentConfiguration; + protected ExtensionDeploymentConfiguration deploymentConfiguration; private String belongsTo; @@ -99,11 +99,11 @@ public void setBelongsTo(String belongsTo) { this.belongsTo = belongsTo; } - public DeploymentConfiguration getDeploymentConfiguration() { + public ExtensionDeploymentConfiguration getDeploymentConfiguration() { return deploymentConfiguration; } - public void setDeploymentConfiguration(DeploymentConfiguration deploymentConfiguration) { + public void setDeploymentConfiguration(ExtensionDeploymentConfiguration deploymentConfiguration) { this.deploymentConfiguration = deploymentConfiguration; } } diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index a354dc22f0..b0de88590b 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-04-29 20:46:18. +// Generated using typescript-generator version 3.2.1263 on 2024-04-30 20:29:03. export class NamedStreamPipesEntity { '@class': @@ -112,7 +112,7 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { 'correspondingServiceGroup': string; 'createdAt': number; 'dataStream': SpDataStream; - 'deploymentConfiguration': DeploymentConfiguration; + 'deploymentConfiguration': ExtensionDeploymentConfiguration; 'eventGrounding': EventGrounding; 'eventSchema': EventSchema; 'icon': string; @@ -143,9 +143,10 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { instance.correspondingServiceGroup = data.correspondingServiceGroup; instance.createdAt = data.createdAt; instance.dataStream = SpDataStream.fromData(data.dataStream); - instance.deploymentConfiguration = DeploymentConfiguration.fromData( - data.deploymentConfiguration, - ); + instance.deploymentConfiguration = + ExtensionDeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.eventGrounding = EventGrounding.fromData(data.eventGrounding); instance.eventSchema = EventSchema.fromData(data.eventSchema); instance.icon = data.icon; @@ -1368,26 +1369,6 @@ export class DeleteRuleDescription extends SchemaTransformationRuleDescription { } } -export class DeploymentConfiguration { - desiredServiceTags: SpServiceTag[]; - selectedEndpointUrl: string; - - static fromData( - data: DeploymentConfiguration, - target?: DeploymentConfiguration, - ): DeploymentConfiguration { - if (!data) { - return data; - } - const instance = target || new DeploymentConfiguration(); - instance.desiredServiceTags = __getCopyArrayFn(SpServiceTag.fromData)( - data.desiredServiceTags, - ); - instance.selectedEndpointUrl = data.selectedEndpointUrl; - return instance; - } -} - export class DomainStaticProperty extends StaticProperty { '@class': 'org.apache.streampipes.model.staticproperty.DomainStaticProperty'; 'requiredClass': string; @@ -1818,6 +1799,26 @@ export class ExportItem { } } +export class ExtensionDeploymentConfiguration { + desiredServiceTags: SpServiceTag[]; + selectedEndpointUrl: string; + + static fromData( + data: ExtensionDeploymentConfiguration, + target?: ExtensionDeploymentConfiguration, + ): ExtensionDeploymentConfiguration { + if (!data) { + return data; + } + const instance = target || new ExtensionDeploymentConfiguration(); + instance.desiredServiceTags = __getCopyArrayFn(SpServiceTag.fromData)( + data.desiredServiceTags, + ); + instance.selectedEndpointUrl = data.selectedEndpointUrl; + return instance; + } +} + export class ExtensionItemDescription { appId: string; available: boolean; @@ -3269,7 +3270,7 @@ export class RuntimeOptionsRequest { | 'org.apache.streampipes.model.runtime.RuntimeOptionsResponse'; 'appId': string; 'belongsTo': string; - 'deploymentConfiguration': DeploymentConfiguration; + 'deploymentConfiguration': ExtensionDeploymentConfiguration; 'inputStreams': SpDataStream[]; 'requestId': string; 'staticProperties': StaticPropertyUnion[]; @@ -3285,9 +3286,10 @@ export class RuntimeOptionsRequest { instance['@class'] = data['@class']; instance.appId = data.appId; instance.belongsTo = data.belongsTo; - instance.deploymentConfiguration = DeploymentConfiguration.fromData( - data.deploymentConfiguration, - ); + instance.deploymentConfiguration = + ExtensionDeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.inputStreams = __getCopyArrayFn(SpDataStream.fromData)( data.inputStreams, ); diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts index b3d8143ccd..c019e9c369 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts @@ -18,7 +18,7 @@ import { Component, ElementRef, Input, OnInit, ViewChild } from '@angular/core'; import { - DeploymentConfiguration, + ExtensionDeploymentConfiguration, ServiceTagService, SpServiceTag, } from '@streampipes/platform-services'; @@ -36,7 +36,7 @@ import { MatRadioChange } from '@angular/material/radio'; }) export class SpAdapterDeploymentSettingsComponent implements OnInit { @Input() - deploymentConfiguration: DeploymentConfiguration; + deploymentConfiguration: ExtensionDeploymentConfiguration; availableServiceTags: SpServiceTag[] = []; availableServiceTagValues: string[] = []; diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.ts b/ui/src/app/connect/components/configuration-group/configuration-group.component.ts index 4a979fe310..1e109beab6 100644 --- a/ui/src/app/connect/components/configuration-group/configuration-group.component.ts +++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.ts @@ -19,7 +19,7 @@ import { Component, Input } from '@angular/core'; import { UntypedFormGroup } from '@angular/forms'; import { - DeploymentConfiguration, + ExtensionDeploymentConfiguration, StaticPropertyUnion, } from '@streampipes/platform-services'; import { ConfigurationInfo } from '../../model/ConfigurationInfo'; @@ -36,7 +36,7 @@ export class ConfigurationGroupComponent { @Input() configuration: StaticPropertyUnion[]; - @Input() deploymentConfiguration: DeploymentConfiguration; + @Input() deploymentConfiguration: ExtensionDeploymentConfiguration; completedStaticProperty: ConfigurationInfo; diff --git a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts index abb524e0f3..6e4c668984 100644 --- a/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts +++ b/ui/src/app/core-ui/static-properties/static-alternatives/static-alternatives.component.ts @@ -26,12 +26,11 @@ import { } from '@angular/core'; import { AbstractStaticPropertyRenderer } from '../base/abstract-static-property'; import { - DeploymentConfiguration, + ExtensionDeploymentConfiguration, StaticPropertyAlternative, StaticPropertyAlternatives, } from '@streampipes/platform-services'; import { ConfigurationInfo } from '../../../connect/model/ConfigurationInfo'; -import { MatRadioChange } from '@angular/material/radio'; @Component({ selector: 'sp-app-static-alternatives', @@ -43,7 +42,7 @@ export class StaticAlternativesComponent implements OnInit { @Input() - deploymentConfiguration: DeploymentConfiguration; + deploymentConfiguration: ExtensionDeploymentConfiguration; @Output() inputEmitter: EventEmitter = new EventEmitter(); diff --git a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts index 19baa55c0f..013893d932 100644 --- a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts +++ b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.ts @@ -19,7 +19,7 @@ import { Component, Input } from '@angular/core'; import { CollectionStaticProperty, - DeploymentConfiguration, + ExtensionDeploymentConfiguration, StaticPropertyUnion, } from '@streampipes/platform-services'; import { AbstractValidatedStaticPropertyRenderer } from '../base/abstract-validated-static-property'; @@ -31,7 +31,7 @@ import { AbstractValidatedStaticPropertyRenderer } from '../base/abstract-valida }) export class StaticCollectionComponent extends AbstractValidatedStaticPropertyRenderer { @Input() - deploymentConfiguration: DeploymentConfiguration; + deploymentConfiguration: ExtensionDeploymentConfiguration; constructor() { super(); diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts b/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts index bf597f785f..45da778749 100644 --- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts +++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.ts @@ -19,7 +19,7 @@ import { Component, EventEmitter, Input, OnInit, Output } from '@angular/core'; import { AbstractStaticPropertyRenderer } from '../base/abstract-static-property'; import { - DeploymentConfiguration, + ExtensionDeploymentConfiguration, StaticPropertyGroup, } from '@streampipes/platform-services'; import { ConfigurationInfo } from '../../../connect/model/ConfigurationInfo'; @@ -34,7 +34,7 @@ export class StaticGroupComponent implements OnInit { @Input() - deploymentConfiguration: DeploymentConfiguration; + deploymentConfiguration: ExtensionDeploymentConfiguration; @Output() inputEmitter: EventEmitter = new EventEmitter(); diff --git a/ui/src/app/core-ui/static-properties/static-property.component.ts b/ui/src/app/core-ui/static-properties/static-property.component.ts index 7a76b4509f..ef3ddc98eb 100644 --- a/ui/src/app/core-ui/static-properties/static-property.component.ts +++ b/ui/src/app/core-ui/static-properties/static-property.component.ts @@ -24,8 +24,8 @@ import { CodeInputStaticProperty, CollectionStaticProperty, ColorPickerStaticProperty, - DeploymentConfiguration, EventSchema, + ExtensionDeploymentConfiguration, FileStaticProperty, FreeTextStaticProperty, MappingPropertyNary, @@ -84,7 +84,7 @@ export class StaticPropertyComponent implements OnInit { pipelineElement: InvocablePipelineElementUnion; @Input() - deploymentConfiguration: DeploymentConfiguration; + deploymentConfiguration: ExtensionDeploymentConfiguration; showLabel = true; diff --git a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts index 8341eddb88..19e9a9cda6 100644 --- a/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts +++ b/ui/src/app/core-ui/static-properties/static-runtime-resolvable-input/base-runtime-resolvable-input.ts @@ -18,7 +18,7 @@ import { AbstractStaticPropertyRenderer } from '../base/abstract-static-property'; import { - DeploymentConfiguration, + ExtensionDeploymentConfiguration, RuntimeOptionsRequest, RuntimeOptionsResponse, RuntimeResolvableAnyStaticProperty, @@ -50,7 +50,7 @@ export abstract class BaseRuntimeResolvableInput< @Input() completedStaticProperty: ConfigurationInfo; - @Input() deploymentConfiguration: DeploymentConfiguration; + @Input() deploymentConfiguration: ExtensionDeploymentConfiguration; showOptions = false; loading = false;