Skip to content

Commit

Permalink
[timeseries] Add Support for limit and numGroupsLimit
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana committed Jan 30, 2025
1 parent 1585490 commit a0c98b6
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
Long endTs = null;
String step = null;
String timeoutStr = null;
int limit = RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT;
int numGroupsLimit = RangeTimeSeriesRequest.DEFAULT_NUM_GROUPS_LIMIT;
for (NameValuePair nameValuePair : pairs) {
switch (nameValuePair.getName()) {
case "query":
Expand All @@ -167,6 +169,12 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
case "timeout":
timeoutStr = nameValuePair.getValue();
break;
case "limit":
limit = Integer.parseInt(nameValuePair.getValue());
break;
case "numGroupsLimit":
numGroupsLimit = Integer.parseInt(nameValuePair.getValue());
break;
default:
/* Okay to ignore unknown parameters since the language implementor may be using them. */
break;
Expand All @@ -182,7 +190,8 @@ private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, Stri
timeout = HumanReadableDuration.from(timeoutStr);
}
// TODO: Pass full raw query param string to the request
return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout, queryParamString);
return new RangeTimeSeriesRequest(language, query, startTs, endTs, stepSeconds, timeout, limit, numGroupsLimit,
queryParamString);
}

public static Long getStepSeconds(@Nullable String step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ public static TimeSeriesBlock buildTimeSeriesBlock(TimeBuckets timeBuckets,
if (groupByResultsBlock.getNumRows() == 0) {
return new TimeSeriesBlock(timeBuckets, new HashMap<>());
}
if (groupByResultsBlock.isNumGroupsLimitReached()) {
throw new IllegalStateException(String.format("Series limit reached. Number of series: %s",
groupByResultsBlock.getNumRows()));
}
// TODO: Check isNumGroupsLimitReached, and propagate it somehow to the caller.
Map<Long, List<TimeSeries>> timeSeriesMap = new HashMap<>(groupByResultsBlock.getNumRows());
List<String> tagNames = getTagNamesFromDataSchema(Objects.requireNonNull(groupByResultsBlock.getDataSchema(),
"DataSchema is null in leaf stage of time-series query"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -78,7 +80,8 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
switch (command) {
case "fetch":
List<String> tokens = commands.get(commandId).subList(1, commands.get(commandId).size());
currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns);
currentNode = handleFetchNode(planIdGenerator.generateId(), tokens, children, aggInfo, groupByColumns,
request);
break;
case "sum":
case "min":
Expand Down Expand Up @@ -118,7 +121,8 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
}

public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens,
List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> groupByColumns) {
List<BaseTimeSeriesPlanNode> children, AggInfo aggInfo, List<String> groupByColumns,
RangeTimeSeriesRequest request) {
Preconditions.checkState(tokens.size() % 2 == 0, "Mismatched args");
String tableName = null;
String timeColumn = null;
Expand Down Expand Up @@ -152,7 +156,11 @@ public BaseTimeSeriesPlanNode handleFetchNode(String planId, List<String> tokens
Preconditions.checkNotNull(timeColumn, "Time column not set. Set via time_col=");
Preconditions.checkNotNull(timeUnit, "Time unit not set. Set via time_unit=");
Preconditions.checkNotNull(valueExpr, "Value expression not set. Set via value=");
Map<String, String> queryOptions = new HashMap<>();
if (request.getNumGroupsLimit() > 0) {
queryOptions.put("numGroupsLimit", Integer.toString(request.getNumGroupsLimit()));
}
return new LeafTimeSeriesPlanNode(planId, children, tableName, timeColumn, timeUnit, 0L, filter, valueExpr, aggInfo,
groupByColumns);
groupByColumns, request.getLimit(), queryOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pinot.query.runtime.timeseries;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -111,14 +110,17 @@ QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExec
leafNode.getTimeUnit(), timeBuckets, leafNode.getOffsetSeconds() == null ? 0 : leafNode.getOffsetSeconds());
ExpressionContext aggregation = TimeSeriesAggregationFunction.create(context.getLanguage(),
leafNode.getValueExpression(), timeTransform, timeBuckets, leafNode.getAggInfo());
Map<String, String> queryOptions = new HashMap<>(leafNode.getQueryOptions());
queryOptions.put(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getRemainingTimeMs()));
return new QueryContext.Builder()
.setTableName(leafNode.getTableName())
.setFilter(filterContext)
.setGroupByExpressions(groupByExpressions)
.setSelectExpressions(List.of(aggregation))
.setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getRemainingTimeMs())))
.setQueryOptions(queryOptions)
.setAliasList(Collections.emptyList())
.setLimit(Integer.MAX_VALUE)
.setQueryOptions(leafNode.getQueryOptions())
.setLimit(leafNode.getLimit())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerMetrics;
Expand All @@ -43,6 +44,8 @@
public class PhysicalTimeSeriesServerPlanVisitorTest {
private static final String LANGUAGE = "m3ql";
private static final int DUMMY_DEADLINE_MS = 10_000;
private static final int SERIES_LIMIT = 1000;
private static final Map<String, String> QUERY_OPTIONS = Collections.emptyMap();

@BeforeClass
public void setUp() {
Expand All @@ -65,7 +68,8 @@ public void testCompileQueryContext() {
DUMMY_DEADLINE_MS, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L,
filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"));
filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"), SERIES_LIMIT,
QUERY_OPTIONS);
QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= '1990')");
Expand All @@ -78,7 +82,8 @@ public void testCompileQueryContext() {
DUMMY_DEADLINE_MS, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 10L,
filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"));
filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"),
SERIES_LIMIT, QUERY_OPTIONS);
QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
assertNotNull(queryContext);
assertNotNull(queryContext.getGroupByExpressions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
Expand All @@ -31,6 +32,9 @@


public class TimeSeriesPlanFragmenterTest {
private static final int SERIES_LIMIT = 1000;
private static final Map<String, String> QUERY_OPTIONS = Collections.emptyMap();

@Test
public void testGetFragmentsWithMultipleLeafNodes() {
/*
Expand Down Expand Up @@ -136,7 +140,8 @@ public void testGetFragmentsWithSinglePlanNode() {

private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
return new LeafTimeSeriesPlanNode(id, Collections.emptyList(), "someTableName", "someTimeColumn",
TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList(),
SERIES_LIMIT, QUERY_OPTIONS);
}

static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
* </ul>
*/
public class RangeTimeSeriesRequest {
// TODO: It's not ideal to have yet another default that interacts with numGroupsLimit, etc.
public static final int DEFAULT_SERIES_LIMIT = 100_000;
public static final int DEFAULT_NUM_GROUPS_LIMIT = -1;
/** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
private final String _language;
/** Query is the raw query sent by the caller. */
Expand All @@ -63,11 +66,15 @@ public class RangeTimeSeriesRequest {
private final long _stepSeconds;
/** E2E timeout for the query. */
private final Duration _timeout;
/** Series limit for the query. */
private final int _limit;
/** The numGroupsLimit value used in Pinot's Grouping Algorithm. */
private final int _numGroupsLimit;
/** Full query string to allow language implementations to pass custom parameters. */
private final String _fullQueryString;

public RangeTimeSeriesRequest(String language, String query, long startSeconds, long endSeconds, long stepSeconds,
Duration timeout, String fullQueryString) {
Duration timeout, int limit, int numGroupsLimit, String fullQueryString) {
Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. startSeconds "
+ "should be greater than or equal to endSeconds. Found startSeconds=%s and endSeconds=%s",
startSeconds, endSeconds);
Expand All @@ -77,6 +84,8 @@ public RangeTimeSeriesRequest(String language, String query, long startSeconds,
_endSeconds = endSeconds;
_stepSeconds = stepSeconds;
_timeout = timeout;
_limit = limit;
_numGroupsLimit = numGroupsLimit;
_fullQueryString = fullQueryString;
}

Expand Down Expand Up @@ -104,6 +113,14 @@ public Duration getTimeout() {
return _timeout;
}

/**
 * Returns the series limit for the query, exactly as it was passed to the constructor.
 */
public int getLimit() {
return _limit;
}

/**
 * Returns the numGroupsLimit value used in Pinot's grouping algorithm, exactly as it was passed to the
 * constructor. The default declared by this class is {@link #DEFAULT_NUM_GROUPS_LIMIT} (-1).
 */
public int getNumGroupsLimit() {
return _numGroupsLimit;
}

/**
 * Returns the full query string, which allows language implementations to pass custom parameters.
 */
public String getFullQueryString() {
return _fullQueryString;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
Expand All @@ -44,6 +46,8 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
private final String _valueExpression;
private final AggInfo _aggInfo;
private final List<String> _groupByExpressions;
private final Map<String, String> _queryOptions;
private final int _limit;

@JsonCreator
public LeafTimeSeriesPlanNode(
Expand All @@ -52,7 +56,8 @@ public LeafTimeSeriesPlanNode(
@JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offsetSeconds") Long offsetSeconds,
@JsonProperty("filterExpression") String filterExpression,
@JsonProperty("valueExpression") String valueExpression, @JsonProperty("aggInfo") AggInfo aggInfo,
@JsonProperty("groupByExpressions") List<String> groupByExpressions) {
@JsonProperty("groupByExpressions") List<String> groupByExpressions,
@JsonProperty("limit") int limit, @JsonProperty("queryOptions") Map<String, String> queryOptions) {
super(id, inputs);
_tableName = tableName;
_timeColumn = timeColumn;
Expand All @@ -62,17 +67,19 @@ public LeafTimeSeriesPlanNode(
_valueExpression = valueExpression;
_aggInfo = aggInfo;
_groupByExpressions = groupByExpressions;
_limit = limit <= 0 ? RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT : limit;
_queryOptions = queryOptions;
}

public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) {
return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds,
_filterExpression, _valueExpression, newAggInfo, _groupByExpressions);
_filterExpression, _valueExpression, newAggInfo, _groupByExpressions, _limit, _queryOptions);
}

@Override
public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) {
return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds,
_filterExpression, _valueExpression, _aggInfo, _groupByExpressions);
_filterExpression, _valueExpression, _aggInfo, _groupByExpressions, _limit, _queryOptions);
}

@Override
Expand All @@ -83,8 +90,8 @@ public String getKlass() {
@Override
public String getExplainName() {
return String.format("LEAF_TIME_SERIES_PLAN_NODE(%s, table=%s, timeExpr=%s, valueExpr=%s, aggInfo=%s, "
+ "groupBy=%s, filter=%s, offsetSeconds=%s)", _id, _tableName, _timeColumn, _valueExpression,
_aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, _offsetSeconds);
+ "groupBy=%s, filter=%s, offsetSeconds=%s, limit=%s)", _id, _tableName, _timeColumn, _valueExpression,
_aggInfo.getAggFunction(), _groupByExpressions, _filterExpression, _offsetSeconds, _limit);
}

@Override
Expand Down Expand Up @@ -124,6 +131,14 @@ public List<String> getGroupByExpressions() {
return _groupByExpressions;
}

/**
 * Returns the limit configured for this leaf node. The value is always positive: the constructor
 * substitutes {@code RangeTimeSeriesRequest.DEFAULT_SERIES_LIMIT} when it is given a limit that is
 * zero or negative.
 */
public int getLimit() {
return _limit;
}

/**
 * Returns the query options attached to this leaf node.
 *
 * <p>The constructor stores its {@code queryOptions} argument without validation, so the field is
 * {@code null} whenever a caller passes {@code null} (presumably also when a serialized plan has no
 * {@code "queryOptions"} property -- confirm against the serde in use). Callers copy the result into
 * a new map, which would throw a {@link NullPointerException} on {@code null}; an empty map is
 * returned instead so that a missing value simply means "no options".
 *
 * @return the map supplied at construction (not a copy), or an immutable empty map if none was
 *     supplied; never {@code null}
 */
public Map<String, String> getQueryOptions() {
  // Fully-qualified name keeps this method independent of the file's import list.
  return _queryOptions == null ? java.util.Collections.emptyMap() : _queryOptions;
}

public String getEffectiveFilter(TimeBuckets timeBuckets) {
String filter = _filterExpression == null ? "" : _filterExpression;
long startTime = _timeUnit.convert(Duration.ofSeconds(timeBuckets.getTimeRangeStartExclusive() - _offsetSeconds));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pinot.tsdb.spi.plan;

import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
Expand All @@ -33,6 +35,8 @@ public class LeafTimeSeriesPlanNodeTest {
private static final String TABLE = "myTable";
private static final String TIME_COLUMN = "orderTime";
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final int SERIES_LIMIT = 10;
private static final Map<String, String> QUERY_OPTIONS = ImmutableMap.of("numGroupsLimit", "100000");

@Test
public void testGetEffectiveFilter() {
Expand All @@ -44,23 +48,24 @@ public void testGetEffectiveFilter() {
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 0L, "", "value_col",
new AggInfo("SUM", false, null), Collections.singletonList("cityName"));
new AggInfo("SUM", false, null), Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter);
}
// Case-2: Offset, but empty filter
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, "", "value_col",
new AggInfo("SUM", false, null), Collections.singletonList("cityName"));
new AggInfo("SUM", false, null), Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
"orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123));
}
// Case-3: Offset and non-empty filter
{
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter,
"value_col", new AggInfo("SUM", false, Collections.emptyMap()), Collections.singletonList("cityName"));
"value_col", new AggInfo("SUM", false, Collections.emptyMap()), Collections.singletonList("cityName"),
SERIES_LIMIT, QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter,
(expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123)));
Expand All @@ -70,7 +75,7 @@ public void testGetEffectiveFilter() {
LeafTimeSeriesPlanNode planNode =
new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TimeUnit.MILLISECONDS, 123L,
nonEmptyFilter, "value_col", new AggInfo("SUM", false, Collections.emptyMap()),
Collections.singletonList("cityName"));
Collections.singletonList("cityName"), SERIES_LIMIT, QUERY_OPTIONS);
assertEquals(planNode.getEffectiveFilter(timeBuckets),
String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter,
(expectedStartTimeInFilter * 1000 - 123 * 1000), (expectedEndTimeInFilter * 1000 - 123 * 1000)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.tsdb.spi.plan.serde;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,14 +35,18 @@


public class TimeSeriesPlanSerdeTest {
private static final int SERIES_LIMIT = 1000;
private static final Map<String, String> QUERY_OPTIONS = Collections.emptyMap();

@Test
public void testSerdeForScanFilterProjectNode() {
Map<String, String> aggParams = new HashMap<>();
aggParams.put("window", "5m");

LeafTimeSeriesPlanNode leafTimeSeriesPlanNode =
new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, 0L,
"myFilterExpression", "myValueExpression", new AggInfo("SUM", false, aggParams), new ArrayList<>());
"myFilterExpression", "myValueExpression", new AggInfo("SUM", false, aggParams), new ArrayList<>(),
SERIES_LIMIT, QUERY_OPTIONS);
BaseTimeSeriesPlanNode planNode =
TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode));
assertTrue(planNode instanceof LeafTimeSeriesPlanNode);
Expand Down

0 comments on commit a0c98b6

Please sign in to comment.