Skip to content

Commit

Permalink
refactor: abstract data explorer modules from Influx-related code (#2803
Browse files Browse the repository at this point in the history
)

* refactor: introduce new env variable to determine time series storage

* refactor: abstract sanitation

* refactor: abstract query management

* refactor: abstract counter

* refactor: abstract query execution from Influx

* refactor: abstract time series store from Influx

* refactor: several adaptions

* refactor: adapt usages to new APIs

* feat: data storage management

* refactor: small improvements

* style: fix indentation & import order

* style: fix import order

* style: fix import order

* adapt spelling of influx tag

* change data explorer storage dispatcher to regular class

* adapt to new syntax

* refactor: simplify implementation
  • Loading branch information
bossenti authored May 3, 2024
1 parent a1f2555 commit b75cc2b
Show file tree
Hide file tree
Showing 39 changed files with 1,020 additions and 524 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public enum Envs {


// Time Series Storage
SP_TS_STORAGE("SP_TS_STORAGE", "influxdb"),
SP_TS_STORAGE_PROTOCOL("SP_TS_STORAGE_PROTOCOL", "http"),
SP_TS_STORAGE_HOST("SP_TS_STORAGE_HOST", "influxdb", DefaultEnvValues.LOCALHOST),
SP_TS_STORAGE_PORT("SP_TS_STORAGE_PORT", "8086"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public IntEnvironmentVariable getSpCorePort() {
return new IntEnvironmentVariable(Envs.SP_CORE_PORT);
}

@Override
public StringEnvironmentVariable getTsStorage() {
return new StringEnvironmentVariable(Envs.SP_TS_STORAGE);
}

@Override
public StringEnvironmentVariable getTsStorageProtocol() {
return new StringEnvironmentVariable(Envs.SP_TS_STORAGE_PROTOCOL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface Environment {

// Time series storage env variables

StringEnvironmentVariable getTsStorage();

StringEnvironmentVariable getTsStorageProtocol();

StringEnvironmentVariable getTsStorageHost();
Expand Down
5 changes: 5 additions & 0 deletions streampipes-data-explorer-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

<dependencies>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-client-api</artifactId>
<version>0.95.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-data-explorer-export</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.model.datalake.DataLakeMeasure;

import java.util.List;

public interface IDataExplorerManager {

/**
* Provide an instance of {@link IDataLakeMeasurementCounter} for counting the sizes of measurements within a data
* lake.
*
* @param allMeasurements A list of {@link DataLakeMeasure} objects representing all measurements in the data lake.
* @param measurementsToCount A list of measurement names for which the sizes should be counted.
* @return An instance of {@link IDataLakeMeasurementCounter} configured to count the sizes of the specified measurements.
*/
IDataLakeMeasurementCounter getMeasurementCounter(
List<DataLakeMeasure> allMeasurements,
List<String> measurementsToCount
);

IDataExplorerQueryManagement getQueryManagement(IDataExplorerSchemaManagement dataExplorerSchemaManagement);

IDataExplorerSchemaManagement getSchemaManagement();

ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure);

IDataLakeMeasurementSanitizer getMeasurementSanitizer(IStreamPipesClient client, DataLakeMeasure measure);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.dataexplorer.influx.migrate;
package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.dataexplorer.export.OutputFormat;
import org.apache.streampipes.model.datalake.SpQueryResult;
Expand All @@ -28,8 +28,9 @@

public interface IDataExplorerQueryManagement {

SpQueryResult getData(ProvidedRestQueryParams queryParams,
boolean ignoreMissingData) throws IllegalArgumentException;
SpQueryResult getData(
ProvidedRestQueryParams queryParams,
boolean ignoreMissingData) throws IllegalArgumentException;

void getDataAsStream(ProvidedRestQueryParams params,
OutputFormat format,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.api;

import java.util.Map;

/**
* Interface for counting the number of events per measurement within the StreamPipes data storage.
*/
public interface IDataLakeMeasurementCounter {

/**
* Counts the sizes of measurements within the StreamPipes data storage.
*
* @return A map where each key represents a measurement name and its corresponding value represents
* the number of events contained by that measurement.
*/
Map<String, Integer> countMeasurementSizes();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.model.datalake.DataLakeMeasure;

/**
* The IDataLakeMeasurementSanitizer interface defines methods for sanitizing and registering or
* updating data lake measures.
* Implementations of this interface provide functionality to ensure that the measurement complies to
* the requirements of the underlying time series storage, e.g., to not contain any reserved symbols.
*/
public interface IDataLakeMeasurementSanitizer {

/**
* Sanitizes and registers a data lake measure.
* This method should perform any necessary data validation and cleanup operations
* before registering the measure in the data lake.
*
* @return The sanitized and registered data lake measure.
*/
DataLakeMeasure sanitizeAndRegister();

/**
* Sanitizes and updates a data lake measure.
* This method should perform any necessary data validation and cleanup operations
* before updating the measure in the data lake.
*
* @return The sanitized and updated data lake measure.
*/
DataLakeMeasure sanitizeAndUpdate();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.runtime.Event;

public interface ITimeSeriesStorage {

void onEvent(Event event) throws SpRuntimeException;

void close() throws SpRuntimeException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.streampipes.dataexplorer.influx;

import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.influx.migrate.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
import org.apache.streampipes.dataexplorer.api.IDataLakeQueryBuilder;
Expand All @@ -37,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand All @@ -45,18 +45,6 @@

public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {

public DataExplorerInfluxQueryExecutor() {
super();
}

public DataExplorerInfluxQueryExecutor(String forId) {
super(forId);
}

public DataExplorerInfluxQueryExecutor(int maximumAmountOfEvents) {
super(maximumAmountOfEvents);
}

protected DataSeries convertResult(QueryResult.Series series,
boolean ignoreMissingValues) {
List<String> columns = series.getColumns();
Expand All @@ -79,6 +67,7 @@ protected DataSeries convertResult(QueryResult.Series series,
}

protected SpQueryResult postQuery(QueryResult queryResult,
Optional<String> forIdOpt,
boolean ignoreMissingValues) throws RuntimeException {
SpQueryResult result = new SpQueryResult();
AtomicLong lastTimestamp = new AtomicLong();
Expand All @@ -96,9 +85,7 @@ protected SpQueryResult postQuery(QueryResult queryResult,
result.setLastTimestamp(lastTimestamp.get());
}

if (this.appendId) {
result.setForId(this.forId);
}
forIdOpt.ifPresent(result::setForId);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.influx;

import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerManager;
import org.apache.streampipes.dataexplorer.api.IDataLakeMeasurementCounter;
import org.apache.streampipes.dataexplorer.api.IDataLakeMeasurementSanitizer;
import org.apache.streampipes.dataexplorer.api.ITimeSeriesStorage;
import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.influx.sanitize.DataLakeMeasurementSanitizerInflux;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.storage.management.StorageDispatcher;

import java.util.List;

public enum DataExplorerManagerInflux implements IDataExplorerManager {

INSTANCE;

@Override
public IDataLakeMeasurementCounter getMeasurementCounter(
List<DataLakeMeasure> allMeasurements,
List<String> measurementsToCount) {
return new DataLakeMeasurementCounterInflux(allMeasurements, measurementsToCount);
}

@Override
public IDataExplorerQueryManagement getQueryManagement(
IDataExplorerSchemaManagement dataExplorerSchemaManagement
) {
return new DataExplorerQueryManagementInflux(dataExplorerSchemaManagement);
}

@Override
public IDataExplorerSchemaManagement getSchemaManagement() {
return new DataExplorerSchemaManagement(StorageDispatcher.INSTANCE
.getNoSqlStore()
.getDataLakeStorage());
}

@Override
public ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure) {
return new TimeSeriesStorageInflux(measure, Environments.getEnvironment(), new InfluxClientProvider());
}

@Override
public IDataLakeMeasurementSanitizer getMeasurementSanitizer(IStreamPipesClient client, DataLakeMeasure measure) {
return new DataLakeMeasurementSanitizerInflux(client, measure);
}
}
Loading

0 comments on commit b75cc2b

Please sign in to comment.