Skip to content

Commit

Permalink
SED-1456 wrong list of attributes used for ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
david-stephan committed Oct 11, 2022
1 parent 47dedcf commit 6c9c098
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import step.plugins.measurements.Measurement;
import step.plugins.measurements.MeasurementHandler;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TimeSeriesBucketingHandler implements MeasurementHandler {

Expand All @@ -24,8 +24,11 @@ public class TimeSeriesBucketingHandler implements MeasurementHandler {

private final TimeSeriesIngestionPipeline ingestionPipeline;

// Whitelist of measurement keys that are propagated to bucket attributes during ingestion.
private final List<String> attributes;

/**
 * Creates a bucketing handler that forwards measurements to the given ingestion pipeline,
 * keeping only the configured attribute keys as bucket attributes.
 *
 * @param ingestionPipeline the pipeline measurements are ingested into
 * @param attributes        the measurement keys to retain as bucket attributes
 */
public TimeSeriesBucketingHandler(TimeSeriesIngestionPipeline ingestionPipeline, List<String> attributes) {
    this.ingestionPipeline = ingestionPipeline;
    // Defensive immutable copy: the handler only ever reads this list
    this.attributes = List.copyOf(attributes);
}

@Override
Expand All @@ -50,11 +53,13 @@ public void processMeasurement(Measurement measurement) {
}

/**
 * Builds the bucket attributes for a measurement by copying only the configured
 * attribute keys (the ingestion whitelist) from the measurement; all other
 * measurement entries are ignored.
 *
 * @param measurement the measurement to extract attributes from
 * @return the bucket attributes restricted to the configured keys
 */
private BucketAttributes measurementToBucketAttributes(Measurement measurement) {
    Map<String, String> bucketAttributesMap = new HashMap<>();
    attributes.forEach(attribute -> {
        Object value = measurement.get(attribute);
        // Null guard: containsKey does not imply a non-null value, and calling
        // toString() on a null mapping would throw an NPE during ingestion
        if (value != null) {
            bucketAttributesMap.put(attribute, value.toString());
        }
    });
    return new BucketAttributes(bucketAttributesMap);
}

private void removeKeys(Map<String, String> map, String... attributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import step.plugins.measurements.GaugeCollectorRegistry;
import step.plugins.measurements.MeasurementPlugin;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -23,6 +25,9 @@ public class TimeSeriesControllerPlugin extends AbstractControllerPlugin {
public static String RESOLUTION_PERIOD_PROPERTY = "plugins.timeseries.resolution.period";
public static String TIME_SERIES_COLLECTION_PROPERTY = "timeseries";

public static String TIME_SERIES_ATTRIBUTES_PROPERTY = "plugins.timeseries.attributes";
public static String TIME_SERIES_ATTRIBUTES_DEFAULT = "eId,taskId,metricType,name,rnStatus,project,type";

private static final Logger logger = LoggerFactory.getLogger(TimeSeriesControllerPlugin.class);
private TimeSeriesIngestionPipeline mainIngestionPipeline;

Expand All @@ -31,6 +36,7 @@ public void serverStart(GlobalContext context) {
Configuration configuration = context.getConfiguration();
Integer resolutionPeriod = configuration.getPropertyAsInteger(RESOLUTION_PERIOD_PROPERTY, 1000);
Long flushPeriod = configuration.getPropertyAsLong("plugins.timeseries.flush.period", 1000L);
List<String> attributes = Arrays.asList(configuration.getProperty(TIME_SERIES_ATTRIBUTES_PROPERTY, TIME_SERIES_ATTRIBUTES_DEFAULT).split(","));
CollectionFactory collectionFactory = context.getCollectionFactory();

TimeSeries timeSeries = new TimeSeries(collectionFactory, TIME_SERIES_COLLECTION_PROPERTY, Set.of(), resolutionPeriod);
Expand All @@ -42,7 +48,7 @@ public void serverStart(GlobalContext context) {
context.put(TimeSeriesAggregationPipeline.class, aggregationPipeline);

context.getServiceRegistrationCallback().registerService(TimeSeriesService.class);
TimeSeriesBucketingHandler handler = new TimeSeriesBucketingHandler(mainIngestionPipeline);
TimeSeriesBucketingHandler handler = new TimeSeriesBucketingHandler(mainIngestionPipeline, attributes);
MeasurementPlugin.registerMeasurementHandlers(handler);
GaugeCollectorRegistry.getInstance().registerHandler(handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

import static step.plugins.timeseries.TimeSeriesControllerPlugin.TIME_SERIES_ATTRIBUTES_DEFAULT;
import static step.plugins.timeseries.TimeSeriesControllerPlugin.TIME_SERIES_ATTRIBUTES_PROPERTY;
import static step.plugins.timeseries.TimeSeriesExecutionPlugin.TIMESERIES_FLAG;

@Singleton
Expand Down Expand Up @@ -142,7 +144,8 @@ public AsyncTaskStatus<Object> rebuildTimeSeries(TimeSeriesRebuildRequest reques
// the flushing period can be a big value, because we will force flush every time.
// we create a new pipeline for every migration
try (TimeSeriesIngestionPipeline ingestionPipeline = timeSeries.newIngestionPipeline(3000)) {
TimeSeriesBucketingHandler timeSeriesBucketingHandler = new TimeSeriesBucketingHandler(ingestionPipeline);
List<String> attributes = Arrays.asList(configuration.getProperty(TIME_SERIES_ATTRIBUTES_PROPERTY, TIME_SERIES_ATTRIBUTES_DEFAULT).split(","));
TimeSeriesBucketingHandler timeSeriesBucketingHandler = new TimeSeriesBucketingHandler(ingestionPipeline, attributes);
LongAdder count = new LongAdder();
SearchOrder searchOrder = new SearchOrder("begin", 1);
// Iterate over each measurement and ingest it again
Expand Down
Original file line number Diff line number Diff line change
@@ -1,65 +1,140 @@
package step.plugins.timeseries;

import jakarta.servlet.DispatcherType;
import jakarta.servlet.Filter;
import junit.framework.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Before;
import org.junit.Test;
import step.artefacts.BaseArtefactPlugin;
import step.artefacts.ThreadGroup;
import step.core.GlobalContext;
import step.core.artefacts.reports.ReportNodeStatus;
import step.core.collections.inmemory.InMemoryCollectionFactory;
import step.core.deployment.WebApplicationConfigurationManager;
import step.core.dynamicbeans.DynamicValue;
import step.core.execution.ExecutionEngine;
import step.core.execution.model.Execution;
import step.core.plans.Plan;
import step.core.plans.builder.PlanBuilder;
import step.core.plans.runner.PlanRunnerResult;
import step.core.timeseries.TimeSeriesAggregationPipeline;
import step.core.timeseries.TimeSeriesAggregationQuery;
import step.core.timeseries.TimeSeriesAggregationResponse;
import step.engine.plugins.FunctionPlugin;
import step.engine.plugins.LocalFunctionPlugin;
import step.framework.server.ServiceRegistrationCallback;
import step.handlers.javahandler.AbstractKeyword;
import step.handlers.javahandler.Keyword;
import step.planbuilder.BaseArtefacts;
import step.planbuilder.FunctionArtefacts;
import step.plugins.measurements.GaugeCollectorRegistry;
import step.plugins.measurements.MeasurementControllerPlugin;
import step.plugins.measurements.MeasurementPlugin;
import step.threadpool.ThreadPoolPlugin;

import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static step.plugins.timeseries.TimeSeriesExecutionPlugin.TIMESERIES_FLAG;

class TimeSeriesExecutionPluginTest {
public class TimeSeriesExecutionPluginTest extends AbstractKeyword {

ExecutionEngine engine;
@BeforeEach
void setUp() {

TimeSeriesAggregationPipeline timeSeriesAggregationPipeline;

TimeSeriesControllerPlugin tsPlugin;
private GlobalContext globalContext;

@Before
public void setUp() throws Exception {
globalContext = new GlobalContext();
globalContext.setCollectionFactory(new InMemoryCollectionFactory(null));
globalContext.setServiceRegistrationCallback(new ServiceRegistrationCallback() {
@Override
public void register(Object o) { }
@Override
public void registerService(Class<?> aClass) { }
@Override
public void registerHandler(Handler handler) { }
@Override
public void registerServlet(ServletHolder servletHolder, String s) { }
@Override
public FilterHolder registerServletFilter(Class<? extends Filter> aClass, String s, EnumSet<DispatcherType> enumSet) {
return null;
}
@Override
public void stop() { }
@Override
public void registerPackage(Package aPackage) { }
});
globalContext.put(WebApplicationConfigurationManager.class, new WebApplicationConfigurationManager());
MeasurementControllerPlugin mc = new MeasurementControllerPlugin();
mc.serverStart(globalContext);
tsPlugin = new TimeSeriesControllerPlugin();
tsPlugin.serverStart(globalContext);
timeSeriesAggregationPipeline = globalContext.get(TimeSeriesAggregationPipeline.class);
engine = ExecutionEngine.builder().withPlugin(new MeasurementPlugin(GaugeCollectorRegistry.getInstance()))
.withPlugin(new FunctionPlugin()).withPlugin(new ThreadPoolPlugin())
.withPlugin(new LocalFunctionPlugin()).withPlugin(new BaseArtefactPlugin()).withPlugin(new TimeSeriesExecutionPlugin())
.build();

}

@Test
void initializeExecutionContext() {
public void testTimeSeriesPlugins() throws InterruptedException {

ThreadGroup threadGroup = new ThreadGroup();
threadGroup.setIterations(new DynamicValue<Integer>(10));
threadGroup.setPacing(new DynamicValue<Integer>(10));
threadGroup.setPacing(new DynamicValue<Integer>(0));
threadGroup.setUsers(new DynamicValue<Integer>(5));

AtomicInteger iterations = new AtomicInteger(0);

Plan plan = PlanBuilder.create()
.startBlock(BaseArtefacts.sequence())
.startBlock(threadGroup)
.startBlock(FunctionArtefacts.keyword("TestKeywordWithMeasurements"))
.endBlock()
.endBlock()
.endBlock()
.build();

long t1 = System.currentTimeMillis();
AtomicReference<String> execId = new AtomicReference<>();
PlanRunnerResult planRunnerResult = engine.execute(plan).visitReportNodes(node -> {
PlanRunnerResult planRunnerResult = engine.execute(plan).visitReportNodes(node->{
Assert.assertEquals(ReportNodeStatus.PASSED, node.getStatus());
execId.set(node.getExecutionID());
});
long t2 = System.currentTimeMillis();
String executionId = planRunnerResult.getExecutionId();
Execution execution = engine.getExecutionEngineContext().getExecutionAccessor().get(executionId);
boolean tsFlag = (boolean) execution.getCustomField(TIMESERIES_FLAG);
Assert.assertTrue(tsFlag);
long t2 = System.currentTimeMillis();

tsPlugin.serverStop(globalContext);

//Thread.sleep(20000);

TimeSeriesAggregationResponse keywordsBuckets = timeSeriesAggregationPipeline.newQuery().range(t1, t2).filter(Map.of("type", "keyword")).groupBy(Set.of("name")).run();
Assert.assertEquals(1,keywordsBuckets.getSeries().size());
//assertEquals(50,keywordsBuckets.getSeries().values().stream().findFirst().get().values().stream().findFirst().get().getCount());
TimeSeriesAggregationResponse customBuckets = timeSeriesAggregationPipeline.newQuery().range(t1, t2).filter(Map.of("type", "custom")).groupBy(Set.of("name","customAttr")).run();
Assert.assertEquals(2,customBuckets.getSeries().size());
}

@Keyword
public void TestKeywordWithMeasurements() {
output.addMeasure("myMeasure1", 1000, Map.of("customAttr","val1"));
output.addMeasure("myMeasure2", 1000,Map.of("customAttr","val2"));
output.addMeasure("myMeasure2", 100,Map.of("customAttr","val3"));
output.addMeasure("myMeasure2", 10,Map.of("customAttr","val4"));
}
}

0 comments on commit 6c9c098

Please sign in to comment.