From 0fdd15d74af9ad7e13fb4908cc3c56e714abcd42 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Tue, 30 Apr 2024 22:32:04 +0200 Subject: [PATCH] feat: Support custom service tags and service selection for adapters (#2691) * feat: Support custom service tags and service selection for adapters * Remove empty scss file * Support service selection in runtime resolvable static properties * Fix checkstyle * add short comment to new ENV variable * feat: Add unit tests for CustomServiceTagResolver * add comment for clarification * add comment for clarification * Rename DeploymentConfiguration --------- Co-authored-by: bossenti Co-authored-by: Philipp Zehnder --- .../streampipes/commons/constants/Envs.java | 5 +- .../environment/DefaultEnvironment.java | 5 + .../commons/environment/Environment.java | 2 + .../management/AdapterMasterManagement.java | 21 ++- .../management/GuessManagement.java | 19 ++- .../management/WorkerRestClient.java | 8 +- .../management/WorkerUrlProvider.java | 42 ------ .../connect/management/util/WorkerPaths.java | 23 ---- .../connect/adapter/AdapterDescription.java | 15 +++ .../ExtensionDeploymentConfiguration.java | 50 +++++++ .../extensions/svcdiscovery/SpServiceTag.java | 22 ++++ .../svcdiscovery/SpServiceTagPrefix.java | 1 + .../model/runtime/RuntimeOptionsRequest.java | 11 ++ .../IExtensionsServiceEndpointGenerator.java | 45 +++++++ .../manager/assets/AssetFetcher.java | 2 +- .../ExtensionsServiceEndpointGenerator.java | 76 ++++++----- .../ExtensionsServiceEndpointProvider.java | 32 ----- .../execution/task/DiscoverEndpointsTask.java | 9 +- .../manager/health/PipelineHealthCheck.java | 2 +- .../CustomTransformOutputSchemaGenerator.java | 6 +- .../manager/preview/PipelinePreview.java | 9 +- .../ContainerProvidedOptionsHandler.java | 2 +- .../rest/impl/ServiceTagResource.java | 43 ++++++ .../impl/connect/DescriptionResource.java | 18 ++- .../connect/RuntimeResolvableResource.java | 18 ++- .../svcdiscovery/api/ISpServiceDiscovery.java | 9 +- .../svcdiscovery/SpServiceDiscoveryCore.java | 27 ++-- streampipes-service-extensions/pom.xml | 12 ++ .../extensions/CustomServiceTagResolver.java | 49 +++++++ .../StreamPipesExtensionsServiceBase.java | 2 + .../CustomServiceTagResolverTest.java | 68 ++++++++++ .../src/lib/apis/service-tags.service.ts | 46 +++++++ .../src/lib/model/gen/streampipes-model.ts | 33 ++++- .../platform-services/src/public-api.ts | 1 + ...adapter-deployment-settings.component.html | 78 +++++++++++ .../adapter-deployment-settings.component.ts | 122 ++++++++++++++++++ .../adapter-settings.component.html | 14 ++ .../configuration-group.component.html | 1 + .../configuration-group.component.ts | 7 +- ui/src/app/connect/connect.module.ts | 2 + .../static-alternatives.component.html | 1 + .../static-alternatives.component.ts | 6 +- .../static-collection.component.html | 1 + .../static-collection.component.ts | 6 +- .../static-group/static-group.component.html | 2 + .../static-group/static-group.component.ts | 10 +- .../static-property.component.html | 7 + .../static-property.component.ts | 4 + .../base-runtime-resolvable-input.ts | 6 + 49 files changed, 804 insertions(+), 196 deletions(-) delete mode 100644 streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java create mode 100644 streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java create mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java create mode 100644 streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java create mode 100644 streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java create mode 100644 streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java create mode 100644 ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts create mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html create mode 100644 ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 4c58485e27..5f8de15ef6 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -97,7 +97,10 @@ public enum Envs { SP_NATS_HOST("SP_NATS_HOST", "nats"), SP_NATS_PORT("SP_NATS_PORT", "4222"), - SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"); + SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"), + + // expects a comma separated string of service names + SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""); private final String envVariableName; private String defaultValue; diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index 5e613e0909..3fef8c55f7 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -274,4 +274,9 @@ public StringEnvironmentVariable getPulsarUrl() { return new StringEnvironmentVariable(Envs.SP_PULSAR_URL); } + @Override + public StringEnvironmentVariable getCustomServiceTags() { + return new StringEnvironmentVariable(Envs.SP_SERVICE_TAGS); + } + } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index 092e6992d6..ceb2d2d1f2 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -135,4 +135,6 @@ public interface Environment { StringEnvironmentVariable getPulsarUrl(); + StringEnvironmentVariable getCustomServiceTags(); + } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index ec87909021..56220fde37 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -23,7 +23,7 @@ import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics; import org.apache.streampipes.connect.management.util.GroundingUtils; -import org.apache.streampipes.connect.management.util.WorkerPaths; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsLogProvider; import org.apache.streampipes.manager.verification.DataStreamVerifier; import org.apache.streampipes.model.SpDataStream; @@ -32,12 +32,11 @@ import org.apache.streampipes.resource.management.AdapterResourceManager; import org.apache.streampipes.resource.management.DataStreamResourceManager; import org.apache.streampipes.storage.api.IAdapterStorage; -import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URISyntaxException; import java.util.List; import java.util.NoSuchElementException; @@ -54,12 +53,12 @@ public class AdapterMasterManagement { private final DataStreamResourceManager dataStreamResourceManager; - public AdapterMasterManagement(IAdapterStorage adapterStorage, + public AdapterMasterManagement(IAdapterStorage adapterInstanceStorage, AdapterResourceManager adapterResourceManager, DataStreamResourceManager dataStreamResourceManager, AdapterMetrics adapterMetrics ) { - this.adapterInstanceStorage = adapterStorage; + this.adapterInstanceStorage = adapterInstanceStorage; this.adapterMetrics = adapterMetrics; this.adapterResourceManager = adapterResourceManager; this.dataStreamResourceManager = dataStreamResourceManager; @@ -164,7 +163,11 @@ public void startStreamAdapter(String elementId) throws AdapterException { try { // Find endpoint to start adapter on - var baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId()); + var baseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl( + ad.getAppId(), + SpServiceUrlProvider.ADAPTER, + ad.getDeploymentConfiguration().getDesiredServiceTags() + ); // Update selected endpoint URL of adapter ad.setSelectedEndpointUrl(baseUrl); @@ -177,7 +180,7 @@ public void startStreamAdapter(String elementId) throws AdapterException { adapterMetrics.register(ad.getElementId(), ad.getName()); LOG.info("Started adapter " + elementId + " on: " + baseUrl); - } catch (NoServiceEndpointsAvailableException | URISyntaxException e) { + } catch (NoServiceEndpointsAvailableException e) { throw new AdapterException("Could not start adapter due to unavailable service endpoint", e); } } @@ -192,8 +195,4 @@ private void installDataSource(SpDataStream stream, throw new AdapterException(); } } - - private IAdapterStorage getAdapterInstanceStorage() { - return StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage(); - } } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java index 104bae82e0..472cb7eca8 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java @@ -24,11 +24,15 @@ import org.apache.streampipes.connect.management.AdapterEventPreviewPipeline; import org.apache.streampipes.connect.management.util.WorkerPaths; import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; import org.apache.streampipes.manager.execution.ExtensionServiceExecutions; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; import org.apache.streampipes.model.connect.guess.AdapterEventPreview; import org.apache.streampipes.model.connect.guess.GuessSchema; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.serializers.json.JacksonSerializer; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -39,21 +43,25 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Set; public class GuessManagement { private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class); - private final WorkerUrlProvider workerUrlProvider; + private final IExtensionsServiceEndpointGenerator endpointGenerator; private final ObjectMapper objectMapper; public GuessManagement() { - this.workerUrlProvider = new WorkerUrlProvider(); + this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); this.objectMapper = JacksonSerializer.getObjectMapper(); } public GuessSchema guessSchema(AdapterDescription adapterDescription) throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException { - var workerUrl = getWorkerUrl(adapterDescription.getAppId()); + var workerUrl = getWorkerUrl( + adapterDescription.getAppId(), + adapterDescription.getDeploymentConfiguration().getDesiredServiceTags() + ); var description = objectMapper.writeValueAsString(adapterDescription); LOG.info("Guess schema at: " + workerUrl); @@ -72,8 +80,9 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription) } } - private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException { - var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId); + private String getWorkerUrl(String appId, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + var baseUrl = endpointGenerator.getEndpointBaseUrl(appId, SpServiceUrlProvider.ADAPTER, customServiceTags); return baseUrl + WorkerPaths.getGuessSchemaPath(); } diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java index c715459070..91558045e2 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerRestClient.java @@ -52,10 +52,10 @@ public class WorkerRestClient { private static final Logger LOG = LoggerFactory.getLogger(WorkerRestClient.class); - public static void invokeStreamAdapter(String endpointUrl, + public static void invokeStreamAdapter(String baseUrl, String elementId) throws AdapterException { var adapterStreamDescription = getAndDecryptAdapter(elementId); - var url = endpointUrl + WorkerPaths.getStreamInvokePath(); + var url = baseUrl + WorkerPaths.getStreamInvokePath(); startAdapter(url, adapterStreamDescription); updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true); @@ -127,11 +127,11 @@ private static HttpResponse triggerPost(String url, return request.execute().returnResponse(); } - public static RuntimeOptionsResponse getConfiguration(String workerEndpoint, + public static RuntimeOptionsResponse getConfiguration(String baseUrl, String appId, RuntimeOptionsRequest runtimeOptionsRequest) throws AdapterException, SpConfigurationException { - String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId); + String url = baseUrl + WorkerPaths.getRuntimeResolvablePath(appId); try { String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest); diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java deleted file mode 100644 index b9fc970ab4..0000000000 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/WorkerUrlProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.streampipes.connect.management.management; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -public class WorkerUrlProvider { - - - public WorkerUrlProvider() { - } - - public String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException { - return getEndpointGenerator(appId).getEndpointResourceUrl(); - } - - public String getWorkerBaseUrl(String appId) throws NoServiceEndpointsAvailableException { - return getEndpointGenerator(appId).getEndpointBaseUrl(); - } - - private ExtensionsServiceEndpointGenerator getEndpointGenerator(String appId) { - return new ExtensionsServiceEndpointGenerator(appId, SpServiceUrlProvider.ADAPTER); - } - -} diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java index 29d89ebd1b..7bd2b0c1ed 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/util/WorkerPaths.java @@ -17,13 +17,6 @@ */ package org.apache.streampipes.connect.management.util; -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; -import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; - -import java.net.URI; -import java.net.URISyntaxException; - public class WorkerPaths { private static final String WorkerMainPath = "/api/v1/worker"; @@ -36,14 +29,6 @@ public static String getStreamStopPath() { return WorkerMainPath + "/stream/stop"; } - public static String getSetInvokePath() { - return WorkerMainPath + "/set/invoke"; - } - - public static String getSetStopPath() { - return WorkerMainPath + "/set/stop"; - } - public static String getRunningAdaptersPath() { return WorkerMainPath + "/running"; } @@ -56,12 +41,4 @@ public static String getGuessSchemaPath() { return WorkerMainPath + "/guess/schema"; } - public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException { - SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER; - String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl(); - URI uri = new URI(endpointUrl); - return uri.getScheme() + "://" + uri.getAuthority(); - } - - } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java index 5fd6b21b3f..3d96406618 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/AdapterDescription.java @@ -25,6 +25,7 @@ import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription; import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription; import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription; +import org.apache.streampipes.model.deployment.ExtensionDeploymentConfiguration; import org.apache.streampipes.model.grounding.EventGrounding; import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.model.shared.annotation.TsModel; @@ -55,6 +56,8 @@ public class AdapterDescription extends VersionedNamedStreamPipesEntity { // Is used to store where the adapter is running to stop it private String selectedEndpointUrl; + private ExtensionDeploymentConfiguration deploymentConfiguration; + /** * This is used to identify all the service within the service group the adapter can be invoked in */ @@ -72,6 +75,7 @@ public AdapterDescription() { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); } public AdapterDescription(int version) { @@ -81,6 +85,7 @@ public AdapterDescription(int version) { this.config = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); this.setVersion(version); } @@ -89,6 +94,7 @@ public AdapterDescription(String elementId, String name, String description) { this.rules = new ArrayList<>(); this.category = new ArrayList<>(); this.dataStream = new SpDataStream(); + this.deploymentConfiguration = new ExtensionDeploymentConfiguration(); } @@ -109,6 +115,7 @@ public AdapterDescription(AdapterDescription other) { this.dataStream = new SpDataStream(other.getDataStream()); } this.running = other.isRunning(); + this.deploymentConfiguration = other.getDeploymentConfiguration(); } public String getRev() { @@ -259,4 +266,12 @@ public boolean isRunning() { public void setRunning(boolean running) { this.running = running; } + + public ExtensionDeploymentConfiguration getDeploymentConfiguration() { + return deploymentConfiguration; + } + + public void setDeploymentConfiguration(ExtensionDeploymentConfiguration deploymentConfiguration) { + this.deploymentConfiguration = deploymentConfiguration; + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java new file mode 100644 index 0000000000..16b7c0753e --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/deployment/ExtensionDeploymentConfiguration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.deployment; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; + +import java.util.HashSet; +import java.util.Set; + +public class ExtensionDeploymentConfiguration { + + private Set desiredServiceTags; + private String selectedEndpointUrl; + + public ExtensionDeploymentConfiguration() { + this.desiredServiceTags = new HashSet<>(); + } + + public Set getDesiredServiceTags() { + return desiredServiceTags; + } + + public void setDesiredServiceTags(Set desiredServiceTags) { + this.desiredServiceTags = desiredServiceTags; + } + + public String getSelectedEndpointUrl() { + return selectedEndpointUrl; + } + + public void setSelectedEndpointUrl(String selectedEndpointUrl) { + this.selectedEndpointUrl = selectedEndpointUrl; + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java index 71b7fe428e..572fc4a7f6 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTag.java @@ -17,6 +17,11 @@ */ package org.apache.streampipes.model.extensions.svcdiscovery; +import org.apache.streampipes.model.shared.annotation.TsModel; + +import java.util.Objects; + +@TsModel public class SpServiceTag { private static final String COLON = ":"; @@ -56,4 +61,21 @@ public String getValue() { public void setValue(String value) { this.value = value; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SpServiceTag that = (SpServiceTag) o; + return prefix == that.prefix && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(prefix, value); + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java index f412bdbe6f..cea9c39da1 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/svcdiscovery/SpServiceTagPrefix.java @@ -19,6 +19,7 @@ public enum SpServiceTagPrefix { SP_GROUP, + CUSTOM, // Is used for user-defined service tags provided via Envs.SP_SERVICE_TAGS ADAPTER, DATA_STREAM, DATA_PROCESSOR, diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java index 0f5efe2411..3e0417c8bb 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/RuntimeOptionsRequest.java @@ -18,6 +18,7 @@ package org.apache.streampipes.model.runtime; import org.apache.streampipes.model.SpDataStream; +import org.apache.streampipes.model.deployment.ExtensionDeploymentConfiguration; import org.apache.streampipes.model.shared.annotation.TsModel; import org.apache.streampipes.model.staticproperty.StaticProperty; @@ -37,6 +38,8 @@ public class RuntimeOptionsRequest { protected List inputStreams; + protected ExtensionDeploymentConfiguration deploymentConfiguration; + private String belongsTo; public RuntimeOptionsRequest() { @@ -95,4 +98,12 @@ public String getBelongsTo() { public void setBelongsTo(String belongsTo) { this.belongsTo = belongsTo; } + + public ExtensionDeploymentConfiguration getDeploymentConfiguration() { + return deploymentConfiguration; + } + + public void setDeploymentConfiguration(ExtensionDeploymentConfiguration deploymentConfiguration) { + this.deploymentConfiguration = deploymentConfiguration; + } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java new file mode 100644 index 0000000000..7414c7bd26 --- /dev/null +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/IExtensionsServiceEndpointGenerator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.manager.api.extensions; + +import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; + +import java.util.Collections; +import java.util.Set; + +public interface IExtensionsServiceEndpointGenerator { + + String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException; + + default String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider) + throws NoServiceEndpointsAvailableException { + return getEndpointResourceUrl(appId, spServiceUrlProvider, Collections.emptySet()); + } + + String getEndpointBaseUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException; +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java index 553762018e..25c3c9accf 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/assets/AssetFetcher.java @@ -40,7 +40,7 @@ public AssetFetcher(SpServiceUrlProvider spServiceUrlProvider, } public InputStream fetchPipelineElementAssets() throws IOException, NoServiceEndpointsAvailableException { - String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, spServiceUrlProvider).getEndpointResourceUrl(); + String endpointUrl = new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, spServiceUrlProvider); return Request .Get(endpointUrl + ASSET_ENDPOINT_APPENDIX) .execute() diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java index cb2d28408a..ae1b8227c7 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointGenerator.java @@ -18,7 +18,8 @@ package org.apache.streampipes.manager.execution.endpoint; import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.model.base.NamedStreamPipesEntity; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes; import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; @@ -26,59 +27,66 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.stream.Stream; -public class ExtensionsServiceEndpointGenerator { +public class ExtensionsServiceEndpointGenerator implements IExtensionsServiceEndpointGenerator { private static final Logger LOG = LoggerFactory.getLogger(ExtensionsServiceEndpointGenerator.class); - private final String appId; - private final SpServiceUrlProvider spServiceUrlProvider; - - public ExtensionsServiceEndpointGenerator(String appId, - SpServiceUrlProvider spServiceUrlProvider) { - this.appId = appId; - this.spServiceUrlProvider = spServiceUrlProvider; + public ExtensionsServiceEndpointGenerator() { } - public ExtensionsServiceEndpointGenerator(NamedStreamPipesEntity entity) { - this.appId = entity.getAppId(); - this.spServiceUrlProvider = ExtensionsServiceEndpointUtils.getPipelineElementType(entity); + public String getEndpointResourceUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) + throws NoServiceEndpointsAvailableException { + return spServiceUrlProvider.getInvocationUrl(selectService(appId, spServiceUrlProvider, customServiceTags), appId); } - public String getEndpointResourceUrl() throws NoServiceEndpointsAvailableException { - return spServiceUrlProvider.getInvocationUrl(selectService(), appId); + public String getEndpointBaseUrl(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + return selectService(appId, spServiceUrlProvider, customServiceTags); } - public String getEndpointBaseUrl() throws NoServiceEndpointsAvailableException { - return selectService(); + private String selectService(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) throws NoServiceEndpointsAvailableException { + List serviceEndpoints = getServiceEndpoints(appId, spServiceUrlProvider, customServiceTags); + if (!serviceEndpoints.isEmpty()) { + return serviceEndpoints.get(0); + } else { + LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, + spServiceUrlProvider.getServiceTag(appId).asString()); + throw new NoServiceEndpointsAvailableException( + "Could not find any matching service endpoints - are all software components running?"); + } } - private List getServiceEndpoints() { + private List getServiceEndpoints(String appId, + SpServiceUrlProvider spServiceUrlProvider, + Set customServiceTags) { return SpServiceDiscovery .getServiceDiscovery() .getServiceEndpoints( DefaultSpServiceTypes.EXT, true, - Collections - .singletonList( - this.spServiceUrlProvider - .getServiceTag(appId) - .asString() - ) + getDesiredServiceTags(appId, spServiceUrlProvider, customServiceTags) ); } - private String selectService() throws NoServiceEndpointsAvailableException { - List serviceEndpoints = getServiceEndpoints(); - if (!serviceEndpoints.isEmpty()) { - return getServiceEndpoints().get(0); - } else { - LOG.error("Could not find any service endpoints for appId {}, serviceTag {}", appId, - this.spServiceUrlProvider.getServiceTag(appId).asString()); - throw new NoServiceEndpointsAvailableException( - "Could not find any matching service endpoints - are all software components running?"); - } + private List getDesiredServiceTags(String appId, + SpServiceUrlProvider serviceUrlProvider, + Set customServiceTags) { + return Stream.concat( + Stream.of( + serviceUrlProvider.getServiceTag(appId) + ), + customServiceTags.stream() + ) + .map(SpServiceTag::asString) + .toList(); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java deleted file mode 100644 index 6e76fedfe7..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/endpoint/ExtensionsServiceEndpointProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.execution.endpoint; - -import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; -import org.apache.streampipes.model.base.InvocableStreamPipesEntity; - -public class ExtensionsServiceEndpointProvider { - - public String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); - } -} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java index 896c704f04..40da134aa1 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/task/DiscoverEndpointsTask.java @@ -20,7 +20,8 @@ import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException; import org.apache.streampipes.manager.execution.PipelineExecutionInfo; -import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointProvider; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointUtils; import org.apache.streampipes.model.api.EndpointSelectable; import org.apache.streampipes.model.base.InvocableStreamPipesEntity; import org.apache.streampipes.model.pipeline.Pipeline; @@ -69,7 +70,11 @@ private void applyEndpointAndPipeline(String pipelineId, private String findSelectedEndpoint(InvocableStreamPipesEntity pipelineElement) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointProvider().findSelectedEndpoint(pipelineElement); + return new ExtensionsServiceEndpointGenerator() + .getEndpointResourceUrl( + pipelineElement.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(pipelineElement) + ); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java index 40216cc87b..84ab806c30 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java @@ -139,7 +139,7 @@ public void checkAndRestorePipelineElements() { private String findEndpointUrl(InvocableStreamPipesEntity graph) throws NoServiceEndpointsAvailableException { SpServiceUrlProvider serviceUrlProvider = ExtensionsServiceEndpointUtils.getPipelineElementType(graph); - return new ExtensionsServiceEndpointGenerator(graph.getAppId(), serviceUrlProvider).getEndpointResourceUrl(); + return new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(graph.getAppId(), serviceUrlProvider); } private boolean shouldRetry(String instanceId) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java index 936adabb49..77551acb4a 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/output/CustomTransformOutputSchemaGenerator.java @@ -64,8 +64,10 @@ public Tuple2 buildFromTwoStreams(Sp private EventSchema makeRequest() { try { String httpRequestBody = JacksonSerializer.getObjectMapper().writeValueAsString(dataProcessorInvocation); - String endpointUrl = new ExtensionsServiceEndpointGenerator(dataProcessorInvocation.getAppId(), - SpServiceUrlProvider.DATA_PROCESSOR).getEndpointResourceUrl(); + String endpointUrl = new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl( + dataProcessorInvocation.getAppId(), + SpServiceUrlProvider.DATA_PROCESSOR + ); Response httpResp = Request.Post(endpointUrl + "/output").bodyString(httpRequestBody, ContentType .APPLICATION_JSON).execute(); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java index 1113c9a4c8..903ecbeb75 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java @@ -80,10 +80,11 @@ public String getPipelineElementPreview(String previewId, } private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException { - return new ExtensionsServiceEndpointGenerator( - g.getAppId(), - ExtensionsServiceEndpointUtils.getPipelineElementType(g)) - .getEndpointResourceUrl(); + return new ExtensionsServiceEndpointGenerator() + .getEndpointResourceUrl( + g.getAppId(), + ExtensionsServiceEndpointUtils.getPipelineElementType(g) + ); } private void invokeGraphs(List graphs) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java index fd1957f5cc..16acbba678 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/remote/ContainerProvidedOptionsHandler.java @@ -55,6 +55,6 @@ private RuntimeOptionsResponse handleResponse(Response httpResp) throws JsonSynt private String getEndpointUrl(String appId) throws NoServiceEndpointsAvailableException { SpServiceUrlProvider provider = ExtensionsServiceEndpointUtils.getPipelineElementType(appId); - return new ExtensionsServiceEndpointGenerator(appId, provider).getEndpointResourceUrl() + "/configurations"; + return new ExtensionsServiceEndpointGenerator().getEndpointResourceUrl(appId, provider) + "/configurations"; } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java new file mode 100644 index 0000000000..536933f950 --- /dev/null +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/ServiceTagResource.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.impl; + +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; +import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; +import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery; + +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Set; + +@RestController +@RequestMapping("/api/v2/service-tags") +public class ServiceTagResource extends AbstractAuthGuardedRestResource { + + private final ISpServiceDiscovery serviceDiscovery = SpServiceDiscovery.getServiceDiscovery(); + + @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE) + public Set getCustomServiceTags() { + return serviceDiscovery.getCustomServiceTags(true); + } +} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java index dd40dd8541..7d8f562034 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/DescriptionResource.java @@ -22,8 +22,10 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.connect.management.management.DescriptionManagement; -import org.apache.streampipes.connect.management.management.WorkerUrlProvider; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +45,11 @@ public class DescriptionResource extends AbstractAdapterResource { private static final Logger LOG = LoggerFactory.getLogger(DescriptionResource.class); - private final WorkerUrlProvider workerUrlProvider; + private final IExtensionsServiceEndpointGenerator endpointGenerator; public DescriptionResource() { super(DescriptionManagement::new); - workerUrlProvider = new WorkerUrlProvider(); + endpointGenerator = new ExtensionsServiceEndpointGenerator(); } @GetMapping(path = "/adapters", produces = MediaType.APPLICATION_JSON_VALUE) @@ -65,7 +67,7 @@ public ResponseEntity getAdapterAssets(@PathVariable("id") String id) { Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getAssets(workerUrl); } @@ -93,7 +95,7 @@ public ResponseEntity getAdapterIconAsset(@PathVariable("id") String id) { Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getIconAsset(workerUrl); } @@ -120,7 +122,7 @@ public ResponseEntity getAdapterDocumentationAsset(@PathVariable("id") String Optional adapterDescriptionOptional = managementService.getAdapter(id); if (adapterDescriptionOptional.isPresent()) { AdapterDescription adapterDescription = adapterDescriptionOptional.get(); - String workerUrl = workerUrlProvider.getWorkerUrl(adapterDescription.getAppId()); + String workerUrl = getServiceResourceUrl(adapterDescription.getAppId()); result = managementService.getDocumentationAsset(workerUrl); } @@ -148,4 +150,8 @@ public ResponseEntity deleteAdapter(@PathVariable("adapterId") String adapter return badRequest(e); } } + + private String getServiceResourceUrl(String appId) throws NoServiceEndpointsAvailableException { + return endpointGenerator.getEndpointResourceUrl(appId, SpServiceUrlProvider.ADAPTER); + } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java index 6266439bb8..32cfb73044 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/RuntimeResolvableResource.java @@ -24,12 +24,14 @@ import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; import org.apache.streampipes.connect.management.management.WorkerRestClient; -import org.apache.streampipes.connect.management.management.WorkerUrlProvider; +import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator; +import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator; import org.apache.streampipes.model.monitoring.SpLogMessage; import org.apache.streampipes.model.runtime.RuntimeOptionsRequest; import org.apache.streampipes.model.runtime.RuntimeOptionsResponse; import org.apache.streampipes.resource.management.SpResourceManager; import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,17 +49,17 @@ public class RuntimeResolvableResource extends AbstractAdapterResource new WorkerAdministrationManagement( StorageDispatcher.INSTANCE.getNoSqlStore() - .getAdapterInstanceStorage(), + .getAdapterInstanceStorage(), AdapterMetricsManager.INSTANCE.getAdapterMetrics(), new SpResourceManager().manageAdapters(), new SpResourceManager().manageDataStreams() )); - this.workerUrlProvider = new WorkerUrlProvider(); + this.endpointGenerator = new ExtensionsServiceEndpointGenerator(); } @PostMapping( @@ -68,8 +70,12 @@ public ResponseEntity fetchConfigurations(@PathVariable("id") String appId, @RequestBody RuntimeOptionsRequest runtimeOptionsRequest) { try { - String workerEndpoint = workerUrlProvider.getWorkerBaseUrl(appId); - RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(workerEndpoint, appId, runtimeOptionsRequest); + String baseUrl = endpointGenerator.getEndpointBaseUrl( + appId, + SpServiceUrlProvider.ADAPTER, + runtimeOptionsRequest.getDeploymentConfiguration().getDesiredServiceTags() + ); + RuntimeOptionsResponse result = WorkerRestClient.getConfiguration(baseUrl, appId, runtimeOptionsRequest); return ok(result); } catch (AdapterException e) { diff --git a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java index 4f4350f7d8..1b648cef8a 100644 --- a/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java +++ b/streampipes-service-discovery-api/src/main/java/org/apache/streampipes/svcdiscovery/api/ISpServiceDiscovery.java @@ -17,16 +17,19 @@ */ package org.apache.streampipes.svcdiscovery.api; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; + import java.util.List; +import java.util.Set; public interface ISpServiceDiscovery { /** - * Get active pipeline element service endpoints + * Get custom service tags * - * @return list of pipeline element endpoints + * @return set of service tags */ - List getActivePipelineElementEndpoints(); + Set getCustomServiceTags(boolean restrictToHealthy); /** * Get service endpoints diff --git a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java index b887dbd976..19f1479483 100644 --- a/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java +++ b/streampipes-service-discovery/src/main/java/org/apache/streampipes/svcdiscovery/SpServiceDiscoveryCore.java @@ -21,10 +21,10 @@ import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.storage.api.CRUDStorage; import org.apache.streampipes.storage.management.StorageDispatcher; import org.apache.streampipes.svcdiscovery.api.ISpServiceDiscovery; -import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +47,21 @@ public SpServiceDiscoveryCore() { } @Override - public List getActivePipelineElementEndpoints() { - LOG.info("Discovering active pipeline element service endpoints"); - return getServiceEndpoints(DefaultSpServiceTypes.EXT, - true, - List.of()); + public Set getCustomServiceTags(boolean restrictToHealthy) { + var activeServices = findServices(0); + return activeServices + .stream() + .filter(service -> !restrictToHealthy || service.getStatus() != SpServiceStatus.UNHEALTHY) + .flatMap(service -> service.getTags().stream()) + .filter(serviceTag -> serviceTag.getPrefix() == SpServiceTagPrefix.CUSTOM) + .collect(Collectors.toSet()); } @Override public List getServiceEndpoints(String serviceGroup, boolean restrictToHealthy, List filterByTags) { - List activeServices = findService(0); + List activeServices = findServices(0); return activeServices .stream() @@ -73,9 +76,9 @@ private String makeServiceUrl(SpServiceRegistration service) { return service.getServiceUrl(); } -/** -* Checks if all the tags specified in the filter are supported by the service. -*/ + /** + * Checks if all the tags specified in the filter are supported by the service. + */ private boolean allFiltersSupported(SpServiceRegistration service, List filterByTags) { if (filterByTags.isEmpty()) { @@ -88,7 +91,7 @@ private boolean allFiltersSupported(SpServiceRegistration service, return serviceTags.containsAll(filterByTags); } - private List findService(int retryCount) { + private List findServices(int retryCount) { var services = serviceStorage.getAll(); if (services.isEmpty()) { if (retryCount < MAX_RETRIES) { @@ -96,7 +99,7 @@ private List findService(int retryCount) { retryCount++; LOG.info("Could not find any extensions services, retrying ({}/{})", retryCount, MAX_RETRIES); TimeUnit.MILLISECONDS.sleep(1000); - return findService(retryCount); + return findServices(retryCount); } catch (InterruptedException e) { LOG.warn("Could not find a service currently due to exception {}", e.getMessage()); return Collections.emptyList(); diff --git a/streampipes-service-extensions/pom.xml b/streampipes-service-extensions/pom.xml index 8370ad77c0..26dcda6b5b 100644 --- a/streampipes-service-extensions/pom.xml +++ b/streampipes-service-extensions/pom.xml @@ -57,6 +57,18 @@ + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.mockito + mockito-core + test + diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java new file mode 100644 index 0000000000..4033789c8c --- /dev/null +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/CustomServiceTagResolver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.extensions; + +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +public class CustomServiceTagResolver { + + private final Environment env; + + public CustomServiceTagResolver(Environment env) { + this.env = env; + } + + public Set getCustomServiceTags() { + if (env.getCustomServiceTags().exists()) { + var serviceTags = env.getCustomServiceTags().getValue(); + return Arrays.stream( + serviceTags.split(",") + ).map(serviceTagString -> SpServiceTag.create(SpServiceTagPrefix.CUSTOM, serviceTagString)) + .collect(Collectors.toSet()); + } else { + return Collections.emptySet(); + } + } +} diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java index 81775f2271..0bdb9c63ed 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java @@ -19,6 +19,7 @@ package org.apache.streampipes.service.extensions; import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; @@ -130,6 +131,7 @@ protected Set getServiceTags(Set extensi DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup())); } tags.addAll(getExtensionsServiceTags(extensions)); + tags.addAll(new CustomServiceTagResolver(Environments.getEnvironment()).getCustomServiceTags()); return tags; } diff --git a/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java b/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java new file mode 100644 index 0000000000..72e95db8b8 --- /dev/null +++ b/streampipes-service-extensions/src/test/java/org/apache/streampipes/service/extensions/CustomServiceTagResolverTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.extensions; + + +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CustomServiceTagResolverTest { + + private StringEnvironmentVariable customServiceTags; + private CustomServiceTagResolver resolver; + + @BeforeEach + void setUp() { + var env = mock(Environment.class); + customServiceTags = mock(StringEnvironmentVariable.class); + when(env.getCustomServiceTags()).thenReturn(customServiceTags); + resolver = new CustomServiceTagResolver(env); + } + + @Test + void returnsCustomServiceTagsWhenTheyExist() { + when(customServiceTags.exists()).thenReturn(true); + when(customServiceTags.getValue()).thenReturn("tag1,tag2"); + + var result = resolver.getCustomServiceTags(); + + assertEquals(2, result.size()); + assertTrue(result.contains(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, "tag1"))); + assertTrue(result.contains(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, "tag2"))); + } + + @Test + void returnsEmptySetWhenNoCustomServiceTagsExist() { + when(customServiceTags.exists()).thenReturn(false); + + var result = resolver.getCustomServiceTags(); + + assertTrue(result.isEmpty()); + } +} \ No newline at end of file diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts new file mode 100644 index 0000000000..2dd4755377 --- /dev/null +++ b/ui/projects/streampipes/platform-services/src/lib/apis/service-tags.service.ts @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Injectable } from '@angular/core'; +import { HttpClient } from '@angular/common/http'; +import { PlatformServicesCommons } from './commons.service'; +import { Observable } from 'rxjs'; +import { SpServiceTag } from '../model/gen/streampipes-model'; +import { map } from 'rxjs/operators'; + +@Injectable({ + providedIn: 'root', +}) +export class ServiceTagService { + constructor( + private http: HttpClient, + private platformServicesCommons: PlatformServicesCommons, + ) {} + + getCustomServiceTags(): Observable { + return this.http + .get(`${this.platformServicesCommons.apiBasePath}/service-tags`) + .pipe( + map(response => { + return (response as any[]).map(p => + SpServiceTag.fromData(p), + ); + }), + ); + } +} diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts index a8c7743976..b0de88590b 100644 --- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts +++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts @@ -20,7 +20,7 @@ /* tslint:disable */ /* eslint-disable */ // @ts-nocheck -// Generated using typescript-generator version 3.2.1263 on 2024-04-22 14:32:58. +// Generated using typescript-generator version 3.2.1263 on 2024-04-30 20:29:03. export class NamedStreamPipesEntity { '@class': @@ -112,6 +112,7 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { 'correspondingServiceGroup': string; 'createdAt': number; 'dataStream': SpDataStream; + 'deploymentConfiguration': ExtensionDeploymentConfiguration; 'eventGrounding': EventGrounding; 'eventSchema': EventSchema; 'icon': string; @@ -142,6 +143,10 @@ export class AdapterDescription extends VersionedNamedStreamPipesEntity { instance.correspondingServiceGroup = data.correspondingServiceGroup; instance.createdAt = data.createdAt; instance.dataStream = SpDataStream.fromData(data.dataStream); + instance.deploymentConfiguration = + ExtensionDeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.eventGrounding = EventGrounding.fromData(data.eventGrounding); instance.eventSchema = EventSchema.fromData(data.eventSchema); instance.icon = data.icon; @@ -1794,6 +1799,26 @@ export class ExportItem { } } +export class ExtensionDeploymentConfiguration { + desiredServiceTags: SpServiceTag[]; + selectedEndpointUrl: string; + + static fromData( + data: ExtensionDeploymentConfiguration, + target?: ExtensionDeploymentConfiguration, + ): ExtensionDeploymentConfiguration { + if (!data) { + return data; + } + const instance = target || new ExtensionDeploymentConfiguration(); + instance.desiredServiceTags = __getCopyArrayFn(SpServiceTag.fromData)( + data.desiredServiceTags, + ); + instance.selectedEndpointUrl = data.selectedEndpointUrl; + return instance; + } +} + export class ExtensionItemDescription { appId: string; available: boolean; @@ -3245,6 +3270,7 @@ export class RuntimeOptionsRequest { | 'org.apache.streampipes.model.runtime.RuntimeOptionsResponse'; 'appId': string; 'belongsTo': string; + 'deploymentConfiguration': ExtensionDeploymentConfiguration; 'inputStreams': SpDataStream[]; 'requestId': string; 'staticProperties': StaticPropertyUnion[]; @@ -3260,6 +3286,10 @@ export class RuntimeOptionsRequest { instance['@class'] = data['@class']; instance.appId = data.appId; instance.belongsTo = data.belongsTo; + instance.deploymentConfiguration = + ExtensionDeploymentConfiguration.fromData( + data.deploymentConfiguration, + ); instance.inputStreams = __getCopyArrayFn(SpDataStream.fromData)( data.inputStreams, ); @@ -4176,6 +4206,7 @@ export type SpServiceStatus = export type SpServiceTagPrefix = | 'SP_GROUP' + | 'CUSTOM' | 'ADAPTER' | 'DATA_STREAM' | 'DATA_PROCESSOR' diff --git a/ui/projects/streampipes/platform-services/src/public-api.ts b/ui/projects/streampipes/platform-services/src/public-api.ts index e8aae7f708..f1ae0c6015 100644 --- a/ui/projects/streampipes/platform-services/src/public-api.ts +++ b/ui/projects/streampipes/platform-services/src/public-api.ts @@ -45,6 +45,7 @@ export * from './lib/apis/pipeline-element-template.service'; export * from './lib/apis/pipeline-monitoring.service'; export * from './lib/apis/pipeline-template.service'; export * from './lib/apis/semantic-types-rest.service'; +export * from './lib/apis/service-tags.service'; export * from './lib/apis/user.service'; export * from './lib/apis/user-admin.service'; export * from './lib/apis/user-group.service'; diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html new file mode 100644 index 0000000000..64d30f5980 --- /dev/null +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.html @@ -0,0 +1,78 @@ + + +
+ + Any available service + + Restrict to service tags + + + +
+ + Service Tags + + + {{ serviceTag.value }} + + + + + + + {{ serviceTag }} + + + +
+
diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts new file mode 100644 index 0000000000..c019e9c369 --- /dev/null +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-deployment-settings/adapter-deployment-settings.component.ts @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, ElementRef, Input, OnInit, ViewChild } from '@angular/core'; +import { + ExtensionDeploymentConfiguration, + ServiceTagService, + SpServiceTag, +} from '@streampipes/platform-services'; +import { Observable } from 'rxjs'; +import { FormControl } from '@angular/forms'; +import { COMMA, ENTER } from '@angular/cdk/keycodes'; +import { MatChipInputEvent } from '@angular/material/chips'; +import { map, startWith } from 'rxjs/operators'; +import { MatAutocompleteSelectedEvent } from '@angular/material/autocomplete'; +import { MatRadioChange } from '@angular/material/radio'; + +@Component({ + selector: 'sp-adapter-deployment-settings', + templateUrl: './adapter-deployment-settings.component.html', +}) +export class SpAdapterDeploymentSettingsComponent implements OnInit { + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + + availableServiceTags: SpServiceTag[] = []; + availableServiceTagValues: string[] = []; + + deploymentMode = 'all'; + + separatorKeysCodes: number[] = [ENTER, COMMA]; + serviceTagCtrl = new FormControl(''); + filteredServiceTags: Observable; + + @ViewChild('serviceTagInput') serviceTagInput: ElementRef; + + constructor(private serviceTagService: ServiceTagService) { + this.filteredServiceTags = this.serviceTagCtrl.valueChanges.pipe( + startWith(null), + map((serviceTagValue: string | null) => { + return serviceTagValue + ? this._filter(serviceTagValue) + : this.availableServiceTagValues.slice(); + }), + ); + } + + ngOnInit(): void { + if (this.deploymentConfiguration.desiredServiceTags.length > 0) { + this.deploymentMode = 'filter'; + } + this.serviceTagService.getCustomServiceTags().subscribe(res => { + this.availableServiceTags = res; + this.availableServiceTagValues = res.map(st => st.value); + }); + } + + add(event: MatChipInputEvent): void { + const value = (event.value || '').trim(); + + if (value) { + this.deploymentConfiguration.desiredServiceTags.push( + this.findTag(value), + ); + } + + event.chipInput!.clear(); + + this.serviceTagCtrl.setValue(null); + } + + findTag(value: string): SpServiceTag { + return this.availableServiceTags.find(st => st.value === value); + } + + remove(serviceTag: string): void { + const index = this.deploymentConfiguration.desiredServiceTags.findIndex( + st => st.value === serviceTag, + ); + + if (index >= 0) { + this.deploymentConfiguration.desiredServiceTags.splice(index, 1); + } + } + + selected(event: MatAutocompleteSelectedEvent): void { + this.deploymentConfiguration.desiredServiceTags.push( + this.findTag(event.option.viewValue), + ); + this.serviceTagInput.nativeElement.value = ''; + this.serviceTagCtrl.setValue(null); + } + + private _filter(value: string): string[] { + const filterValue = value.toLowerCase(); + + return this.availableServiceTagValues.filter(st => + st.toLowerCase().includes(filterValue), + ); + } + + handleSelectionChange(event: MatRadioChange): void { + if (event.value === 'all') { + this.deploymentConfiguration.desiredServiceTags = []; + } + } +} diff --git a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html index 4b3be6f469..4e5c556cb2 100644 --- a/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html +++ b/ui/src/app/connect/components/adapter-configuration/adapter-settings/adapter-settings.component.html @@ -18,6 +18,17 @@
+ + + + diff --git a/ui/src/app/connect/components/configuration-group/configuration-group.component.html b/ui/src/app/connect/components/configuration-group/configuration-group.component.html index a09b11b734..68569ade6d 100644 --- a/ui/src/app/connect/components/configuration-group/configuration-group.component.html +++ b/ui/src/app/connect/components/configuration-group/configuration-group.component.html @@ -25,6 +25,7 @@
implements OnInit { + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + @Output() inputEmitter: EventEmitter = new EventEmitter(); completedStaticProperty: ConfigurationInfo; diff --git a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html index 29b084725b..c2db12bedb 100644 --- a/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html +++ b/ui/src/app/core-ui/static-properties/static-collection/static-collection.component.html @@ -24,6 +24,7 @@ >
{ + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + constructor() { super(); } diff --git a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html index bd8d67bde3..fb77d388b8 100644 --- a/ui/src/app/core-ui/static-properties/static-group/static-group.component.html +++ b/ui/src/app/core-ui/static-properties/static-group/static-group.component.html @@ -20,6 +20,7 @@ implements OnInit { + @Input() + deploymentConfiguration: ExtensionDeploymentConfiguration; + @Output() inputEmitter: EventEmitter = new EventEmitter(); dependentStaticProperties: Map = new Map< diff --git a/ui/src/app/core-ui/static-properties/static-property.component.html b/ui/src/app/core-ui/static-properties/static-property.component.html index 34aaad2028..4ecbd7c49b 100644 --- a/ui/src/app/core-ui/static-properties/static-property.component.html +++ b/ui/src/app/core-ui/static-properties/static-property.component.html @@ -86,6 +86,7 @@