Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into abstract-data-explorer…
Browse files Browse the repository at this point in the history
…-from-influx
  • Loading branch information
bossenti committed May 2, 2024
2 parents 79a490c + d69504c commit b3796ae
Show file tree
Hide file tree
Showing 65 changed files with 1,863 additions and 740 deletions.
8 changes: 7 additions & 1 deletion VULNERABILITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,11 @@
| https://osv.dev/GHSA-6mjq-h674-j845 | 6.5 | Maven | io.netty:netty-handler | 4.1.72.Final | pom.xml |
| https://osv.dev/GHSA-w596-4wvx-j9j6<br/>https://osv.dev/PYSEC-2022-42969 | 7.5 | PyPI | py | 1.11.0 | streampipes-client-python/poetry.lock |
| https://osv.dev/GHSA-269g-pwp5-87pp | 4.4 | Maven | junit:junit (dev) | 4.8.2 | streampipes-maven-plugin/pom.xml |
| https://osv.dev/GHSA-wf5p-g6vw-rhxx | 6.5 | npm | axios | 0.21.4 | ui/package-lock.json |
| https://osv.dev/GHSA-rv95-896h-c2vc | 6.1 | npm | express (dev) | 4.18.3 | ui/package-lock.json |
| https://osv.dev/GHSA-cxjh-pqwp-8mfp | 6.5 | npm | follow-redirects | 1.15.5 | ui/package-lock.json |
| https://osv.dev/GHSA-4943-9vgg-gr5r | 6.1 | npm | quill | 1.3.7 | ui/package-lock.json |
| https://osv.dev/GHSA-wr3j-pwj9-hqq6 | 7.4 | npm | webpack-dev-middleware (dev) | 6.1.1 | ui/package-lock.json |
| https://osv.dev/GHSA-f5x3-32g6-xq36 | 6.5 | npm | tar (dev) | 6.2.0 | ui/package-lock.json |
| https://osv.dev/GHSA-9qxr-qj54-h672 | 2.6 | npm | undici (dev) | 6.7.1 | ui/package-lock.json |
| https://osv.dev/GHSA-m4v8-wqvr-p9f7 | 3.9 | npm | undici (dev) | 6.7.1 | ui/package-lock.json |
| https://osv.dev/GHSA-8jhw-289h-jh2g | 5.9 | npm | vite (dev) | 5.1.5 | ui/package-lock.json |
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<apache-sis-referencing.version>1.2</apache-sis-referencing.version>
<boofcv.version>1.1.0</boofcv.version>
<classindex.version>3.9</classindex.version>
<checker-qual.version>3.42.0</checker-qual.version>
<checker-qual.version>3.43.0</checker-qual.version>
<commons-codec.version>1.17.0</commons-codec.version>
<commons-collections.version>3.2.2</commons-collections.version>
<commons-compress.version>1.26.0</commons-compress.version>
Expand All @@ -67,7 +67,7 @@
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.9</httpcore.version>
<influxdb.version>2.24</influxdb.version>
<inlong.version>1.11.0</inlong.version>
<inlong.version>1.12.0</inlong.version>
<jackson.version>2.17.0</jackson.version>
<jackson.databind.version>2.17.0</jackson.databind.version>
<jakarta-annotation.version>3.0.0</jakarta-annotation.version>
Expand Down Expand Up @@ -114,7 +114,7 @@
<rendersnake.version>1.9.0</rendersnake.version>
<rocketmq.version>5.0.2</rocketmq.version>
<siddhi.version>5.1.27</siddhi.version>
<simple-java-mail.version>8.9.0</simple-java-mail.version>
<simple-java-mail.version>8.10.0</simple-java-mail.version>
<slack-api.version>1.4.0</slack-api.version>
<slf4j.version>2.0.6</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
Expand Down Expand Up @@ -868,6 +868,7 @@
<module>streampipes-storage-couchdb</module>
<module>streampipes-storage-management</module>
<module>streampipes-test-utils</module>
<module>streampipes-test-utils-executors</module>
<module>streampipes-user-management</module>
<module>streampipes-vocabulary</module>
<module>streampipes-wrapper</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ public enum Envs {
SP_NATS_HOST("SP_NATS_HOST", "nats"),
SP_NATS_PORT("SP_NATS_PORT", "4222"),

SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650");
SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"),

// expects a comma separated string of service names
SP_SERVICE_TAGS("SP_SERVICE_TAGS", "");

private final String envVariableName;
private String defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,9 @@ public StringEnvironmentVariable getPulsarUrl() {
return new StringEnvironmentVariable(Envs.SP_PULSAR_URL);
}

@Override
public StringEnvironmentVariable getCustomServiceTags() {
return new StringEnvironmentVariable(Envs.SP_SERVICE_TAGS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,6 @@ public interface Environment {

StringEnvironmentVariable getPulsarUrl();

StringEnvironmentVariable getCustomServiceTags();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
import org.apache.streampipes.connect.management.util.GroundingUtils;
import org.apache.streampipes.connect.management.util.WorkerPaths;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsLogProvider;
import org.apache.streampipes.manager.verification.DataStreamVerifier;
import org.apache.streampipes.model.SpDataStream;
Expand All @@ -32,12 +32,11 @@
import org.apache.streampipes.resource.management.AdapterResourceManager;
import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.List;
import java.util.NoSuchElementException;

Expand All @@ -54,12 +53,12 @@ public class AdapterMasterManagement {

private final DataStreamResourceManager dataStreamResourceManager;

public AdapterMasterManagement(IAdapterStorage adapterStorage,
public AdapterMasterManagement(IAdapterStorage adapterInstanceStorage,
AdapterResourceManager adapterResourceManager,
DataStreamResourceManager dataStreamResourceManager,
AdapterMetrics adapterMetrics
) {
this.adapterInstanceStorage = adapterStorage;
this.adapterInstanceStorage = adapterInstanceStorage;
this.adapterMetrics = adapterMetrics;
this.adapterResourceManager = adapterResourceManager;
this.dataStreamResourceManager = dataStreamResourceManager;
Expand Down Expand Up @@ -164,7 +163,11 @@ public void startStreamAdapter(String elementId) throws AdapterException {

try {
// Find endpoint to start adapter on
var baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
var baseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
ad.getAppId(),
SpServiceUrlProvider.ADAPTER,
ad.getDeploymentConfiguration().getDesiredServiceTags()
);

// Update selected endpoint URL of adapter
ad.setSelectedEndpointUrl(baseUrl);
Expand All @@ -177,7 +180,7 @@ public void startStreamAdapter(String elementId) throws AdapterException {
adapterMetrics.register(ad.getElementId(), ad.getName());

LOG.info("Started adapter " + elementId + " on: " + baseUrl);
} catch (NoServiceEndpointsAvailableException | URISyntaxException e) {
} catch (NoServiceEndpointsAvailableException e) {
throw new AdapterException("Could not start adapter due to unavailable service endpoint", e);
}
}
Expand All @@ -192,8 +195,4 @@ private void installDataSource(SpDataStream stream,
throw new AdapterException();
}
}

private IAdapterStorage getAdapterInstanceStorage() {
return StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
import org.apache.streampipes.connect.management.AdapterEventPreviewPipeline;
import org.apache.streampipes.connect.management.util.WorkerPaths;
import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException;
import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -39,21 +43,25 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Set;

public class GuessManagement {

private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class);
private final WorkerUrlProvider workerUrlProvider;
private final IExtensionsServiceEndpointGenerator endpointGenerator;
private final ObjectMapper objectMapper;

public GuessManagement() {
this.workerUrlProvider = new WorkerUrlProvider();
this.endpointGenerator = new ExtensionsServiceEndpointGenerator();
this.objectMapper = JacksonSerializer.getObjectMapper();
}

public GuessSchema guessSchema(AdapterDescription adapterDescription)
throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException {
var workerUrl = getWorkerUrl(adapterDescription.getAppId());
var workerUrl = getWorkerUrl(
adapterDescription.getAppId(),
adapterDescription.getDeploymentConfiguration().getDesiredServiceTags()
);
var description = objectMapper.writeValueAsString(adapterDescription);

LOG.info("Guess schema at: " + workerUrl);
Expand All @@ -72,8 +80,9 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription)
}
}

private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException {
var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId);
private String getWorkerUrl(String appId,
Set<SpServiceTag> customServiceTags) throws NoServiceEndpointsAvailableException {
var baseUrl = endpointGenerator.getEndpointBaseUrl(appId, SpServiceUrlProvider.ADAPTER, customServiceTags);
return baseUrl + WorkerPaths.getGuessSchemaPath();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public class WorkerRestClient {

private static final Logger LOG = LoggerFactory.getLogger(WorkerRestClient.class);

public static void invokeStreamAdapter(String endpointUrl,
public static void invokeStreamAdapter(String baseUrl,
String elementId) throws AdapterException {
var adapterStreamDescription = getAndDecryptAdapter(elementId);
var url = endpointUrl + WorkerPaths.getStreamInvokePath();
var url = baseUrl + WorkerPaths.getStreamInvokePath();

startAdapter(url, adapterStreamDescription);
updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true);
Expand Down Expand Up @@ -127,11 +127,11 @@ private static HttpResponse triggerPost(String url,
return request.execute().returnResponse();
}

public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
public static RuntimeOptionsResponse getConfiguration(String baseUrl,
String appId,
RuntimeOptionsRequest runtimeOptionsRequest)
throws AdapterException, SpConfigurationException {
String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId);
String url = baseUrl + WorkerPaths.getRuntimeResolvablePath(appId);

try {
String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@
*/
package org.apache.streampipes.connect.management.util;

import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import java.net.URI;
import java.net.URISyntaxException;

public class WorkerPaths {

private static final String WorkerMainPath = "/api/v1/worker";
Expand All @@ -36,14 +29,6 @@ public static String getStreamStopPath() {
return WorkerMainPath + "/stream/stop";
}

public static String getSetInvokePath() {
return WorkerMainPath + "/set/invoke";
}

public static String getSetStopPath() {
return WorkerMainPath + "/set/stop";
}

public static String getRunningAdaptersPath() {
return WorkerMainPath + "/running";
}
Expand All @@ -56,12 +41,4 @@ public static String getGuessSchemaPath() {
return WorkerMainPath + "/guess/schema";
}

public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
URI uri = new URI(endpointUrl);
return uri.getScheme() + "://" + uri.getAuthority();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
<artifactId>streampipes-test-utils-executors</artifactId>
<version>0.95.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@

public class NumericalFilterProcessor implements IStreamPipesDataProcessor {

private static final String NUMBER_MAPPING = "number-mapping";
private static final String VALUE = "value";
private static final String OPERATION = "operation";
protected static final String NUMBER_MAPPING = "number-mapping";
protected static final String VALUE = "value";
protected static final String OPERATION = "operation";

private double threshold;
private NumericalOperator numericalOperator;
Expand Down
Loading

0 comments on commit b3796ae

Please sign in to comment.