diff --git a/streampipes-data-explorer-management/src/main/java/org/apache/streampipes/dataexplorer/management/DataExplorerDispatcher.java b/streampipes-data-explorer-management/src/main/java/org/apache/streampipes/dataexplorer/management/DataExplorerDispatcher.java
new file mode 100644
index 0000000000..96ba5b70ae
--- /dev/null
+++ b/streampipes-data-explorer-management/src/main/java/org/apache/streampipes/dataexplorer/management/DataExplorerDispatcher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.management;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerManager;
+import org.apache.streampipes.dataexplorer.influx.DataExplorerManagerInflux;
+
+public class DataExplorerDispatcher {
+
+ public IDataExplorerManager getDataExplorerManager() {
+
+    // This switch statement is currently unnecessary, but it illustrates how
+    // support for multiple time series storage backends could be added later.
+ return switch (Environments.getEnvironment()
+ .getTsStorage()
+ .getValueOrDefault()) {
+ case SupportedDataExplorerStorages.INFLUX_DB -> DataExplorerManagerInflux.INSTANCE;
+ default -> DataExplorerManagerInflux.INSTANCE;
+ };
+ }
+}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/package-info.java b/streampipes-data-explorer-management/src/main/java/org/apache/streampipes/dataexplorer/management/SupportedDataExplorerStorages.java
similarity index 65%
rename from streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/package-info.java
rename to streampipes-data-explorer-management/src/main/java/org/apache/streampipes/dataexplorer/management/SupportedDataExplorerStorages.java
index c96a83d257..70af654be8 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/package-info.java
+++ b/streampipes-data-explorer-management/src/main/java/org/apache/streampipes/dataexplorer/management/SupportedDataExplorerStorages.java
@@ -16,10 +16,16 @@
*
*/
+package org.apache.streampipes.dataexplorer.management;
+
/**
- * This package contains classes that have direct dependencies on InfluxDB-specific implementations.
- * To enhance code organization and promote agnostic design, these classes are slated for migration to other modules,
- * such as 'streampipes-data-explorer' or 'streampipes-data-explorer-api'.
- * Pending migration, they are temporarily housed within this package.
+ * Class containing constants for supported time series storage implementations.
+ *
+ * Supported time series storage implementations:
+ *
+ * - {@link #INFLUX_DB}: Represents the InfluxDB time series storage implementation.
+ *
*/
-package org.apache.streampipes.dataexplorer.influx.migrate;
\ No newline at end of file
+public class SupportedDataExplorerStorages {
+ public static final String INFLUX_DB = "influxdb";
+}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java
similarity index 90%
rename from streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/AutoAggregationHandler.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java
index 692e7412be..c0751c7fee 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/AutoAggregationHandler.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/AutoAggregationHandler.java
@@ -15,15 +15,14 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.influx.migrate;
+package org.apache.streampipes.dataexplorer;
-import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
-import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
-import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.param.model.SelectColumn;
import org.apache.streampipes.model.datalake.DataLakeQueryOrdering;
import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.apache.streampipes.storage.management.StorageDispatcher;
+import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
+import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,16 +49,10 @@ public class AutoAggregationHandler {
private final IDataExplorerQueryManagement dataLakeQueryManagement;
private final ProvidedRestQueryParams queryParams;
- public AutoAggregationHandler(ProvidedRestQueryParams params) {
+ public AutoAggregationHandler(ProvidedRestQueryParams params,
+ IDataExplorerQueryManagement dataExplorerQueryManagement) {
this.queryParams = params;
- this.dataLakeQueryManagement = getDataLakeQueryManagement();
- }
-
- private IDataExplorerQueryManagement getDataLakeQueryManagement() {
- var dataLakeStorage = StorageDispatcher.INSTANCE
- .getNoSqlStore()
- .getDataLakeStorage();
- return new DataExplorerQueryManagement(new DataExplorerSchemaManagement(dataLakeStorage));
+ this.dataLakeQueryManagement = dataExplorerQueryManagement;
}
public ProvidedRestQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
@@ -129,7 +122,7 @@ private SpQueryResult getSingleRecord(DataLakeQueryOrdering order) throws ParseE
singleEvent.update(SupportedRestQueryParams.QP_LIMIT, 1);
singleEvent.update(SupportedRestQueryParams.QP_ORDER, order.name());
singleEvent.update(SupportedRestQueryParams.QP_COLUMNS, transformColumns(singleEvent.getAsString(
- SupportedRestQueryParams.QP_COLUMNS)));
+ SupportedRestQueryParams.QP_COLUMNS)));
return fireQuery(singleEvent);
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeMeasurementSanitizer.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeMeasurementSanitizer.java
new file mode 100644
index 0000000000..9c8904ae8e
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeMeasurementSanitizer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer;
+
+import org.apache.streampipes.client.api.IStreamPipesClient;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.api.IDataLakeMeasurementSanitizer;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.schema.EventProperty;
+
+import java.util.List;
+
+/**
+ * Base class providing the sanitizing logic shared by all time series storage backends.
+ * Storage-specific cleaning is left to the concrete subclasses.
+ */
+public abstract class DataLakeMeasurementSanitizer implements IDataLakeMeasurementSanitizer {
+
+ protected final DataLakeMeasure measure;
+ protected final IStreamPipesClient client;
+
+ public DataLakeMeasurementSanitizer(IStreamPipesClient client, DataLakeMeasure measure){
+ this.client = client;
+ this.measure = measure;
+ }
+
+ /**
+ * Sanitizes the data lake measure and registers it with the data lake.
+ *
+ * This method first sanitizes the data lake measure,
+ * then registers it at the data lake.
+ *
+ * @return The sanitized and registered data lake measure.
+ */
+ @Override
+ public DataLakeMeasure sanitizeAndRegister(){
+ sanitizeDataLakeMeasure();
+ registerAtDataLake();
+
+ return measure;
+ }
+
+ /**
+ * Sanitizes the data lake measure and updates it in the data lake.
+ *
+ * This method first sanitizes the data lake measure,
+ * then updates it at the data lake.
+ *
+ * @return The sanitized and updated data lake measure.
+ */
+ @Override
+ public DataLakeMeasure sanitizeAndUpdate(){
+ sanitizeDataLakeMeasure();
+ updateAtDataLake();
+
+ return measure;
+ }
+
+
+
+ private void registerAtDataLake() throws SpRuntimeException {
+ client.dataLakeMeasureApi().create(measure);
+ }
+
+ private void updateAtDataLake() throws SpRuntimeException {
+ client.dataLakeMeasureApi().update(measure);
+ }
+
+ private void sanitizeDataLakeMeasure() throws SpRuntimeException {
+ removeTimestampsFromEventSchema();
+ cleanDataLakeMeasure();
+ }
+
+ /**
+ * Cleans the data lake measure to ensure compliance with the requirements of the respective time series storage.
+ *
+ * This method performs the following steps:
+ *
+ * - Sanitizes the name of the measure.
+ * - Sanitizes all runtime names associated with the measure.
+ *
+ * @throws SpRuntimeException if an error occurs during the cleaning process.
+ */
+ protected abstract void cleanDataLakeMeasure() throws SpRuntimeException;
+
+ protected void removeTimestampsFromEventSchema() throws SpRuntimeException{
+ var timestampField = measure.getTimestampField();
+
+ if (timestampField == null){
+ throw new SpRuntimeException("Data lake measurement does not have a timestamp field - timestamp field is null.");
+ }
+
+    List<EventProperty> eventPropertiesWithoutTimestamp = measure.getEventSchema()
+ .getEventProperties()
+ .stream()
+ .filter(eventProperty -> !timestampField.endsWith(
+ eventProperty.getRuntimeName()
+ ))
+ .toList();
+ measure.getEventSchema().setEventProperties(eventPropertiesWithoutTimestamp);
+ }
+}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/ImageStore.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/ImageStore.java
similarity index 91%
rename from streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/ImageStore.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/ImageStore.java
index 6cfd2087be..0b961b6b50 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/ImageStore.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/ImageStore.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.dataexplorer.influx.migrate;
+package org.apache.streampipes.dataexplorer;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -27,8 +27,6 @@
import org.apache.commons.codec.binary.Base64;
import org.lightcouch.CouchDbClient;
import org.lightcouch.CouchDbProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -36,12 +34,10 @@
import java.util.UUID;
public class ImageStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(ImageStore.class);
private static final String DB_NAME = "images";
-  private List<EventProperty> imageProperties;
-  private CouchDbClient couchDbClient;
+  private final List<EventProperty> imageProperties;
+  private final CouchDbClient couchDbClient;
public ImageStore(DataLakeMeasure measure,
Environment environment) {
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/ImageStoreUtils.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/ImageStoreUtils.java
similarity index 91%
rename from streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/ImageStoreUtils.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/ImageStoreUtils.java
index 6504303a5a..59d4719a7d 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/ImageStoreUtils.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/ImageStoreUtils.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.dataexplorer.influx.migrate;
+package org.apache.streampipes.dataexplorer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.schema.EventProperty;
@@ -30,7 +30,8 @@ public class ImageStoreUtils {
   public static List<EventProperty> getImageProperties(DataLakeMeasure measure) {
return measure.getEventSchema().getEventProperties().stream()
.filter(eventProperty -> eventProperty.getDomainProperties() != null
- && eventProperty.getDomainProperties().size() > 0
+ && !eventProperty.getDomainProperties()
+ .isEmpty()
&& eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
.collect(Collectors.toList());
}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java
similarity index 63%
rename from streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/QueryResultProvider.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java
index 85a23b188f..eeeadec24f 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/QueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/QueryResultProvider.java
@@ -16,44 +16,56 @@
*
*/
-package org.apache.streampipes.dataexplorer.influx.migrate;
+package org.apache.streampipes.dataexplorer;
-import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
+import org.apache.streampipes.dataexplorer.query.DataExplorerQueryExecutor;
+import org.apache.streampipes.model.datalake.SpQueryResult;
import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams;
-import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.util.Optional;
public class QueryResultProvider {
public static final String FOR_ID_KEY = "forId";
protected final boolean ignoreMissingData;
+ protected final IDataExplorerQueryManagement dataExplorerQueryManagement;
+  protected final DataExplorerQueryExecutor<?, ?> queryExecutor;
protected ProvidedRestQueryParams queryParams;
public QueryResultProvider(ProvidedRestQueryParams queryParams,
+ IDataExplorerQueryManagement dataExplorerQueryManagement,
+                             DataExplorerQueryExecutor<?, ?> queryExecutor,
boolean ignoreMissingData) {
this.queryParams = queryParams;
this.ignoreMissingData = ignoreMissingData;
+ this.dataExplorerQueryManagement = dataExplorerQueryManagement;
+ this.queryExecutor = queryExecutor;
}
public SpQueryResult getData() {
if (queryParams.has(SupportedRestQueryParams.QP_AUTO_AGGREGATE)) {
- queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
+ queryParams = new AutoAggregationHandler(queryParams,
+ dataExplorerQueryManagement).makeAutoAggregationQueryParams();
}
SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(queryParams);
if (queryParams.getProvidedParams().containsKey(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
- int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS));
- return new DataExplorerInfluxQueryExecutor(maximumAmountOfEvents).executeQuery(qp, ignoreMissingData);
+ int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams()
+ .get(SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS)
+ );
+ return queryExecutor.executeQuery(qp, maximumAmountOfEvents, Optional.empty(), ignoreMissingData);
}
if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
- return new DataExplorerInfluxQueryExecutor(forWidgetId).executeQuery(qp, ignoreMissingData);
+ return queryExecutor.executeQuery(qp, -1, Optional.of(forWidgetId), ignoreMissingData);
} else {
- return new DataExplorerInfluxQueryExecutor().executeQuery(qp, ignoreMissingData);
+ return queryExecutor.executeQuery(qp, -1, Optional.empty(), ignoreMissingData);
}
}
}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/StreamedQueryResultProvider.java
similarity index 90%
rename from streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/StreamedQueryResultProvider.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/StreamedQueryResultProvider.java
index 52ed5d7881..18e288ec98 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/StreamedQueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/StreamedQueryResultProvider.java
@@ -16,15 +16,17 @@
*
*/
-package org.apache.streampipes.dataexplorer.influx.migrate;
+package org.apache.streampipes.dataexplorer;
-import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
-import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.export.ConfiguredOutputWriter;
import org.apache.streampipes.dataexplorer.export.OutputFormat;
+import org.apache.streampipes.dataexplorer.query.DataExplorerQueryExecutor;
import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
+import org.apache.streampipes.model.datalake.param.SupportedRestQueryParams;
import java.io.IOException;
import java.io.OutputStream;
@@ -40,8 +42,10 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
public StreamedQueryResultProvider(ProvidedRestQueryParams params,
OutputFormat format,
+ IDataExplorerQueryManagement dataExplorerQueryManagement,
+                                     DataExplorerQueryExecutor<?, ?> queryExecutor,
boolean ignoreMissingValues) {
- super(params, ignoreMissingValues);
+ super(params, dataExplorerQueryManagement, queryExecutor, ignoreMissingValues);
this.format = format;
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/TimeSeriesStorage.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/TimeSeriesStorage.java
new file mode 100644
index 0000000000..ceaa6b9dc0
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/TimeSeriesStorage.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.api.ITimeSeriesStorage;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class TimeSeriesStorage implements ITimeSeriesStorage {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesStorage.class);
+
+ protected final DataLakeMeasure measure;
+  protected final List<EventProperty> allEventProperties;
+  protected final Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+
+ public TimeSeriesStorage(DataLakeMeasure measure) {
+ this.measure = measure;
+ storeSanitizedRuntimeNames();
+ allEventProperties = getAllEventPropertiesExceptTimestamp();
+ }
+
+ @Override
+ public void onEvent(Event event) throws SpRuntimeException {
+ validateInputEventAndLogMissingFields(event);
+ sanitizeRuntimeNamesInEvent(event);
+ writeToTimeSeriesStorage(event);
+ }
+
+ private void validateInputEventAndLogMissingFields(Event event) {
+ checkEventIsNotNull(event);
+
+ logMissingFields(event);
+
+ logNullFields(event);
+ }
+
+ private void checkEventIsNotNull(Event event) {
+ if (event == null) {
+ throw new SpRuntimeException("Input event is null");
+ }
+ }
+
+ /**
+ * Logs all fields which are present in the schema, but not in the provided event
+ */
+ private void logMissingFields(Event event) {
+ var missingFields = getMissingProperties(allEventProperties, event);
+ if (!missingFields.isEmpty()) {
+ LOG.debug(
+ "Ignored {} fields which were present in the schema, but not in the provided event: {}",
+ missingFields.size(),
+ String.join(", ", missingFields)
+ );
+ }
+ }
+
+ /**
+ * Returns a list of the runtime names that are missing within the event
+ */
+  private List<String> getMissingProperties(
+      List<EventProperty> allEventProperties,
+ Event event
+ ) {
+ return allEventProperties.stream()
+ .map(EventProperty::getRuntimeName)
+ .filter(runtimeName -> event.getOptionalFieldByRuntimeName(runtimeName)
+ .isEmpty())
+ .toList();
+ }
+
+ /**
+ * Logs all fields that contain null values
+ */
+ private void logNullFields(Event event) {
+    List<String> nullFields = allEventProperties
+ .stream()
+ .filter(EventPropertyPrimitive.class::isInstance)
+ .filter(ep -> {
+ var runtimeName = ep.getRuntimeName();
+ var field = event.getOptionalFieldByRuntimeName(runtimeName);
+
+ return field.isPresent() && field.get()
+ .getAsPrimitive()
+ .getRawValue() == null;
+ })
+ .map(EventProperty::getRuntimeName)
+ .collect(Collectors.toList());
+
+ if (!nullFields.isEmpty()) {
+ LOG.warn("Ignored {} fields which had a value 'null': {}", nullFields.size(), String.join(", ", nullFields));
+ }
+ }
+
+ /**
+ * Returns all measurements properties except the timestamp field
+ */
+ private List getAllEventPropertiesExceptTimestamp() {
+ return measure.getEventSchema()
+ .getEventProperties()
+ .stream()
+ .filter(ep -> !measure.getTimestampField()
+ .endsWith(ep.getRuntimeName()))
+ .toList();
+ }
+
+ /**
+   * Stores the sanitized target property runtime names in the {@code sanitizedRuntimeNames} map.
+ */
+ protected abstract void storeSanitizedRuntimeNames();
+
+ /**
+   * Iterates over all properties of the event and renames the key if it is a reserved keyword in InfluxDB
+ */
+ protected abstract void sanitizeRuntimeNamesInEvent(Event event);
+
+ protected abstract void writeToTimeSeriesStorage(Event event) throws SpRuntimeException;
+}
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/TimeSeriesStore.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/TimeSeriesStore.java
similarity index 72%
rename from streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/TimeSeriesStore.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/TimeSeriesStore.java
index ef8af9a24d..8a0a0fcc66 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/migrate/TimeSeriesStore.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/TimeSeriesStore.java
@@ -16,13 +16,11 @@
*
*/
-package org.apache.streampipes.dataexplorer.influx.migrate;
+package org.apache.streampipes.dataexplorer;
-import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
-import org.apache.streampipes.dataexplorer.influx.InfluxStore;
+import org.apache.streampipes.dataexplorer.api.ITimeSeriesStorage;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
@@ -34,23 +32,21 @@
public class TimeSeriesStore {
private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesStore.class);
- private final InfluxStore influxStore;
+ private final ITimeSeriesStorage timeSeriesStorage;
private ImageStore imageStore;
- public TimeSeriesStore(Environment environment,
- IStreamPipesClient client,
- DataLakeMeasure measure,
- boolean enableImageStore) {
-
- DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure);
+ public TimeSeriesStore(
+ ITimeSeriesStorage timeSeriesStorage,
+ DataLakeMeasure measure,
+ Environment environment,
+ boolean enableImageStore
+ ) {
if (enableImageStore) {
this.imageStore = new ImageStore(measure, environment);
}
-
- this.influxStore = new InfluxStore(measure, environment, new InfluxClientProvider());
-
+ this.timeSeriesStorage = timeSeriesStorage;
}
public boolean onEvent(Event event) throws SpRuntimeException {
@@ -60,7 +56,7 @@ public boolean onEvent(Event event) throws SpRuntimeException {
}
// Store event in time series database
- this.influxStore.onEvent(event);
+ this.timeSeriesStorage.onEvent(event);
return true;
}
@@ -75,6 +71,6 @@ public void close() throws SpRuntimeException {
}
}
- this.influxStore.close();
+ this.timeSeriesStorage.close();
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
index d56856257f..347070a804 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
@@ -29,54 +29,39 @@
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.Optional;
 public abstract class DataExplorerQueryExecutor<X, W> {
private static final Logger LOG = LoggerFactory.getLogger(DataExplorerQueryExecutor.class);
- protected int maximumAmountOfEvents;
-
- protected boolean appendId = false;
- protected String forId;
-
- public DataExplorerQueryExecutor() {
- this.maximumAmountOfEvents = -1;
- }
-
- public DataExplorerQueryExecutor(String forId) {
- this();
- this.appendId = true;
- this.forId = forId;
- }
-
- public DataExplorerQueryExecutor(int maximumAmountOfEvents) {
- this();
- this.maximumAmountOfEvents = maximumAmountOfEvents;
- }
/**
* Execute the data explorer query and return the result or a warning message
* in case the maximum amount of events to return is defined
*/
public SpQueryResult executeQuery(SelectQueryParams params,
+ int maximumAmountOfEvents,
+                                    Optional<String> forIdOpt,
boolean ignoreMissingValues) throws RuntimeException {
X query = makeSelectQuery(params);
- var result = executeQuery(query, ignoreMissingValues);
- if (this.maximumAmountOfEvents != -1) {
- return validateAndReturnQueryResult(result, params.getLimit());
+ var result = executeQuery(query, forIdOpt, ignoreMissingValues);
+ if (maximumAmountOfEvents != -1) {
+ return validateAndReturnQueryResult(result, params.getLimit(), maximumAmountOfEvents);
} else {
return result;
}
}
private SpQueryResult validateAndReturnQueryResult(SpQueryResult queryResult,
- int limit) {
+ int limit,
+ int maximumAmountOfEvents) {
var amountOfResults = queryResult.getAllDataSeries()
.stream()
.mapToInt(DataSeries::getTotal)
.sum();
var amountOfQueryResults = limit == Integer.MIN_VALUE ? amountOfResults : Math.min(amountOfResults, limit);
- if (amountOfQueryResults > this.maximumAmountOfEvents) {
+ if (amountOfQueryResults > maximumAmountOfEvents) {
return makeTooMuchDataResult(amountOfQueryResults);
} else {
return queryResult;
@@ -91,10 +76,11 @@ private SpQueryResult makeTooMuchDataResult(int amountOfQueryResults) {
}
public SpQueryResult executeQuery(DeleteQueryParams params) {
- return executeQuery(makeDeleteQuery(params), true);
+ return executeQuery(makeDeleteQuery(params), Optional.empty(), true);
}
public SpQueryResult executeQuery(X query,
+                                    Optional<String> forIdOpt,
boolean ignoreMissingValues) {
if (LOG.isDebugEnabled()) {
LOG.debug("Data Lake Query {}", asQueryString(query));
@@ -105,10 +91,11 @@ public SpQueryResult executeQuery(X query,
LOG.debug("Data Lake Query Result: {}", result.toString());
}
- return postQuery(result, ignoreMissingValues);
+ return postQuery(result, forIdOpt, ignoreMissingValues);
}
protected abstract SpQueryResult postQuery(W queryResult,
+                                             Optional<String> forIdOpt,
boolean ignoreMissingValues);
public abstract W executeQuery(X query);
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
index d9b9f7d216..239a09c3ac 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
@@ -41,19 +41,25 @@
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-extensions-management</artifactId>
+            <artifactId>streampipes-data-explorer-influx</artifactId>
             <version>0.95.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-wrapper-standalone</artifactId>
+            <artifactId>streampipes-data-explorer-management</artifactId>
             <version>0.95.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-data-explorer-influx</artifactId>
+            <artifactId>streampipes-extensions-management</artifactId>
             <version>0.95.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-wrapper-standalone</artifactId>
+            <version>0.95.0-SNAPSHOT</version>
+        </dependency>
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 2a8250cb82..ef88bbdcdc 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -20,11 +20,13 @@
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataexplorer.influx.migrate.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.TimeSeriesStore;
+import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataLakeMeasureSchemaUpdateStrategy;
+import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
@@ -34,7 +36,6 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
@@ -54,25 +55,25 @@ public class DataLakeSink extends StreamPipesDataSink {
@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder
- .create("org.apache.streampipes.sinks.internal.jvm.datalake", 1)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .category(DataSinkType.INTERNAL)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE
- )
- .build())
- .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
- .requiredSingleValueSelection(
- Labels.withId(SCHEMA_UPDATE_KEY),
- Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION)
- )
-
- .build();
+ .create("org.apache.streampipes.sinks.internal.jvm.datalake", 1)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+ .category(DataSinkType.INTERNAL)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+ .requiredSingleValueSelection(
+ Labels.withId(SCHEMA_UPDATE_KEY),
+ Options.from(SCHEMA_UPDATE_OPTION, EXTEND_EXISTING_SCHEMA_OPTION)
+ )
+
+ .build();
}
@Override
@@ -95,10 +96,14 @@ public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeC
measure.setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA);
}
+ measure = new DataExplorerDispatcher().getDataExplorerManager()
+ .getMeasurementSanitizer(runtimeContext.getStreamPipesClient(), measure)
+ .sanitizeAndRegister();
+
this.timeSeriesStore = new TimeSeriesStore(
- Environments.getEnvironment(),
- runtimeContext.getStreamPipesClient(),
+ new DataExplorerDispatcher().getDataExplorerManager().getTimeseriesStorage(measure),
measure,
+ Environments.getEnvironment(),
true
);
diff --git a/streampipes-platform-services/pom.xml b/streampipes-platform-services/pom.xml
index 13f92da128..ffe439094a 100644
--- a/streampipes-platform-services/pom.xml
+++ b/streampipes-platform-services/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>streampipes-data-explorer-influx</artifactId>
             <version>0.95.0-SNAPSHOT</version>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-data-explorer-management</artifactId>
+            <version>0.95.0-SNAPSHOT</version>
+        </dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-model</artifactId>
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index 64c009e7a8..266bca7b27 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -18,12 +18,10 @@
package org.apache.streampipes.ps;
-import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
-import org.apache.streampipes.dataexplorer.influx.migrate.DataLakeMeasurementCount;
+import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
-import org.apache.streampipes.storage.management.StorageDispatcher;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
@@ -48,10 +46,8 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
private final IDataExplorerSchemaManagement dataLakeMeasureManagement;
public DataLakeMeasureResourceV4() {
- var dataLakeStorage = StorageDispatcher.INSTANCE
- .getNoSqlStore()
- .getDataLakeStorage();
- this.dataLakeMeasureManagement = new DataExplorerSchemaManagement(dataLakeStorage);
+ this.dataLakeMeasureManagement = new DataExplorerDispatcher().getDataExplorerManager()
+ .getSchemaManagement();
}
@PostMapping(
@@ -67,7 +63,12 @@ public ResponseEntity addDataLake(@RequestBody DataLakeMeasure
public ResponseEntity
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-data-explorer-management</artifactId>
+            <version>0.95.0-SNAPSHOT</version>
+        </dependency>
             <groupId>org.apache.streampipes</groupId>
             <artifactId>streampipes-data-export</artifactId>
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index 3c68acf0b9..3d57aa8ce7 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -22,8 +22,7 @@
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
-import org.apache.streampipes.dataexplorer.influx.migrate.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
import org.apache.streampipes.manager.file.FileManager;
import org.apache.streampipes.manager.pipeline.PipelineCacheManager;
import org.apache.streampipes.manager.pipeline.PipelineCanvasMetadataCacheManager;
@@ -137,12 +136,12 @@ private static void deleteAllFiles() {
}
private static void removeAllDataInDataLake() {
- var dataLakeStorage = StorageDispatcher.INSTANCE
- .getNoSqlStore()
- .getDataLakeStorage();
- var dataLakeMeasureManagement = new DataExplorerSchemaManagement(dataLakeStorage);
- var dataExplorerQueryManagement =
- new DataExplorerQueryManagement(dataLakeMeasureManagement);
+ var dataLakeMeasureManagement = new DataExplorerDispatcher()
+ .getDataExplorerManager()
+ .getSchemaManagement();
+ var dataExplorerQueryManagement = new DataExplorerDispatcher()
+ .getDataExplorerManager()
+ .getQueryManagement(dataLakeMeasureManagement);
List<DataLakeMeasure> allMeasurements = dataLakeMeasureManagement.getAllMeasurements();
allMeasurements.forEach(measurement -> {
boolean isSuccessDataLake = dataExplorerQueryManagement.deleteData(measurement.getMeasureName());