Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: abstract data explorer modules from Influx-related code #2803

Merged
merged 19 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public enum Envs {


// Time Series Storage
SP_TS_STORAGE("SP_TS_STORAGE", "influxDB"),
bossenti marked this conversation as resolved.
Show resolved Hide resolved
SP_TS_STORAGE_PROTOCOL("SP_TS_STORAGE_PROTOCOL", "http"),
SP_TS_STORAGE_HOST("SP_TS_STORAGE_HOST", "influxdb", DefaultEnvValues.LOCALHOST),
SP_TS_STORAGE_PORT("SP_TS_STORAGE_PORT", "8086"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public IntEnvironmentVariable getSpCorePort() {
return new IntEnvironmentVariable(Envs.SP_CORE_PORT);
}

@Override
public StringEnvironmentVariable getTsStorage() {
return new StringEnvironmentVariable(Envs.SP_TS_STORAGE);
}

@Override
public StringEnvironmentVariable getTsStorageProtocol() {
return new StringEnvironmentVariable(Envs.SP_TS_STORAGE_PROTOCOL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface Environment {

// Time series storage env variables

StringEnvironmentVariable getTsStorage();

StringEnvironmentVariable getTsStorageProtocol();

StringEnvironmentVariable getTsStorageHost();
Expand Down
5 changes: 5 additions & 0 deletions streampipes-data-explorer-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

<dependencies>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-client-api</artifactId>
<version>0.95.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-data-explorer-export</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.model.datalake.DataLakeMeasure;

import java.util.List;

public interface IDataExplorerManager {

/**
* Provide an instance of {@link IDataLakeMeasurementCounter} for counting the sizes of measurements within a data
* lake.
*
* @param allMeasurements A list of {@link DataLakeMeasure} objects representing all measurements in the data lake.
* @param measurementsToCount A list of measurement names for which the sizes should be counted.
* @return An instance of {@link IDataLakeMeasurementCounter} configured to count the sizes of the specified measurements.
*/
IDataLakeMeasurementCounter getMeasurementCounter(
List<DataLakeMeasure> allMeasurements,
List<String> measurementsToCount
);

IDataExplorerQueryManagement getQueryManagement(IDataExplorerSchemaManagement dataExplorerSchemaManagement);

IDataExplorerSchemaManagement getSchemaManagement();

ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure);

IDataLakeMeasurementSanitizer getMeasurementSanitizer(IStreamPipesClient client, DataLakeMeasure measure);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.dataexplorer.influx.migrate;
package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.dataexplorer.export.OutputFormat;
import org.apache.streampipes.model.datalake.SpQueryResult;
Expand All @@ -28,8 +28,9 @@

public interface IDataExplorerQueryManagement {

SpQueryResult getData(ProvidedRestQueryParams queryParams,
boolean ignoreMissingData) throws IllegalArgumentException;
SpQueryResult getData(
ProvidedRestQueryParams queryParams,
boolean ignoreMissingData) throws IllegalArgumentException;

void getDataAsStream(ProvidedRestQueryParams params,
OutputFormat format,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.api;

import java.util.Map;

/**
* Interface for counting the number of events per measurement within the StreamPipes data storage.
*/
public interface IDataLakeMeasurementCounter {

/**
* Counts the sizes of measurements within the StreamPipes data storage.
*
* @return A map where each key represents a measurement name and its corresponding value represents
* the number of events contained by that measurement.
*/
Map<String, Integer> countMeasurementSizes();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.model.datalake.DataLakeMeasure;

/**
* The IDataLakeMeasurementSanitizer interface defines methods for sanitizing and registering or
* updating data lake measures.
* Implementations of this interface provide functionality to ensure that the measurement complies to
* the requirements of the underlying time series storage, e.g., to not contain any reserved symbols.
*/
public interface IDataLakeMeasurementSanitizer {

/**
* Sanitizes and registers a data lake measure.
* This method should perform any necessary data validation and cleanup operations
* before registering the measure in the data lake.
*
* @return The sanitized and registered data lake measure.
*/
DataLakeMeasure sanitizeAndRegister();

/**
* Sanitizes and updates a data lake measure.
* This method should perform any necessary data validation and cleanup operations
* before updating the measure in the data lake.
*
* @return The sanitized and updated data lake measure.
*/
DataLakeMeasure sanitizeAndUpdate();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.api;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.runtime.Event;

public interface ITimeSeriesStorage {

void onEvent(Event event) throws SpRuntimeException;

void close() throws SpRuntimeException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.streampipes.dataexplorer.influx;

import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.influx.migrate.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
import org.apache.streampipes.dataexplorer.api.IDataLakeQueryBuilder;
Expand All @@ -37,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand All @@ -45,18 +45,6 @@

public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {

public DataExplorerInfluxQueryExecutor() {
super();
}

public DataExplorerInfluxQueryExecutor(String forId) {
super(forId);
}

public DataExplorerInfluxQueryExecutor(int maximumAmountOfEvents) {
super(maximumAmountOfEvents);
}

protected DataSeries convertResult(QueryResult.Series series,
boolean ignoreMissingValues) {
List<String> columns = series.getColumns();
Expand All @@ -79,6 +67,7 @@ protected DataSeries convertResult(QueryResult.Series series,
}

protected SpQueryResult postQuery(QueryResult queryResult,
Optional<String> forIdOpt,
boolean ignoreMissingValues) throws RuntimeException {
SpQueryResult result = new SpQueryResult();
AtomicLong lastTimestamp = new AtomicLong();
Expand All @@ -96,9 +85,7 @@ protected SpQueryResult postQuery(QueryResult queryResult,
result.setLastTimestamp(lastTimestamp.get());
}

if (this.appendId) {
result.setForId(this.forId);
}
forIdOpt.ifPresent(result::setForId);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.dataexplorer.influx;

import org.apache.streampipes.client.api.IStreamPipesClient;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerManager;
import org.apache.streampipes.dataexplorer.api.IDataLakeMeasurementCounter;
import org.apache.streampipes.dataexplorer.api.IDataLakeMeasurementSanitizer;
import org.apache.streampipes.dataexplorer.api.ITimeSeriesStorage;
import org.apache.streampipes.dataexplorer.influx.client.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.influx.sanitize.DataLakeMeasurementSanitizerInflux;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.storage.management.StorageDispatcher;

import java.util.List;

public enum DataExplorerManagerInflux implements IDataExplorerManager {

INSTANCE;

@Override
public IDataLakeMeasurementCounter getMeasurementCounter(
List<DataLakeMeasure> allMeasurements,
List<String> measurementsToCount) {
return new DataLakeMeasurementCounterInflux(allMeasurements, measurementsToCount);
}

@Override
public IDataExplorerQueryManagement getQueryManagement(
IDataExplorerSchemaManagement dataExplorerSchemaManagement
) {
return new DataExplorerQueryManagementInflux(dataExplorerSchemaManagement);
}

@Override
public IDataExplorerSchemaManagement getSchemaManagement() {
return new DataExplorerSchemaManagement(StorageDispatcher.INSTANCE
.getNoSqlStore()
.getDataLakeStorage());
}

@Override
public ITimeSeriesStorage getTimeseriesStorage(DataLakeMeasure measure) {
return new TimeSeriesStorageInflux(measure, Environments.getEnvironment(), new InfluxClientProvider());
}

@Override
public IDataLakeMeasurementSanitizer getMeasurementSanitizer(IStreamPipesClient client, DataLakeMeasure measure) {
return new DataLakeMeasurementSanitizerInflux(client, measure);
}
}
Loading
Loading