Skip to content

Commit

Permalink
snapshot column capabilities for realtime cursors (#17386)
Browse files Browse the repository at this point in the history
* snapshot column capabilities for realtime cursors

changes:
* adds `CursorBuildSpec.getPhysicalColumns()` to allow specifying the set of required physical columns from a segment. if null, all columns are assumed to be required (e.g. full scan)
* `IncrementalIndexCursorFactory`/`IncrementalIndexCursorHolder` uses the physical columns from the cursor build spec to know which set of dimensions to 'snapshot' the capabilities for, allowing expression selectors on realtime queries to no longer be required to treat selectors from `StringDimensionIndexer` as multi-valued unless they truly are multi-valued. this fixes several bugs with expressions on realtime queries that change a value from `StringDimensionIndexer` to some type other than string, which would often result in a single element array from the column being handled as multi-valued
* `StringDimensionIndexer.setSparseIndexed()` now adds the default value to the dictionary when set
* `StringDimensionIndexer` column value selectors now always report that they are dictionary encoded, and that name lookup is possible in advance on their selectors (since set sparse adds the null value so the cardinality is correct)
* fixed a mistake that expression selectors for realtime queries with no null values could not use dictionary encoded selectors

* hmm

* test changes

* cleanup

* add test coverage

* fix test

* fixes

* cleanup
  • Loading branch information
clintropolis authored Dec 9, 2024
1 parent ae4ea51 commit 80d2cd3
Show file tree
Hide file tree
Showing 52 changed files with 3,041 additions and 424 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static IncrementalIndex generateIndex() throws IOException
.withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
.build();

return TestIndex.loadIncrementalIndex(
return TestIndex.loadIncrementalIndexFromCharSource(
() -> new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,7 @@ public static CursorBuildSpec makeCursorBuildSpec(GroupByQuery query, @Nullable
.setInterval(query.getSingleInterval())
.setFilter(Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())))
.setVirtualColumns(query.getVirtualColumns())
.setPhysicalColumns(query.getRequiredColumns())
.setGroupingColumns(query.getGroupingColumns())
.setAggregators(query.getAggregatorSpecs())
.setQueryContext(query.context())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public static CursorBuildSpec makeCursorBuildSpec(ScanQuery query, @Nullable Que
.setInterval(query.getSingleInterval())
.setFilter(Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())))
.setVirtualColumns(query.getVirtualColumns())
.setPhysicalColumns(query.getRequiredColumns())
.setPreferredOrdering(query.getOrderBys())
.setQueryContext(query.context())
.setQueryMetrics(queryMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public Object2IntRBTreeMap<SearchHit> execute(final int limit)
)
)
.setVirtualColumns(query.getVirtualColumns())
.setPhysicalColumns(query.getRequiredColumns())
.setQueryContext(query.context())
.build();
try (final CursorHolder cursorHolder = adapter.makeCursorHolder(buildSpec)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public static CursorBuildSpec makeCursorBuildSpec(TimeBoundaryQuery query)
.setInterval(query.getSingleInterval())
.setFilter(Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())))
.setVirtualColumns(query.getVirtualColumns())
.setPhysicalColumns(query.getRequiredColumns())
.setQueryContext(query.context())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ public static CursorBuildSpec makeCursorBuildSpec(TimeseriesQuery query, @Nullab
.setInterval(query.getSingleInterval())
.setFilter(Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())))
.setVirtualColumns(query.getVirtualColumns())
.setPhysicalColumns(query.getRequiredColumns())
.setAggregators(query.getAggregatorSpecs())
.setQueryContext(query.context())
.setPreferredOrdering(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ public static CursorBuildSpec makeCursorBuildSpec(TopNQuery query, @Nullable Que
.setFilter(Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())))
.setGroupingColumns(Collections.singletonList(query.getDimensionSpec().getDimension()))
.setVirtualColumns(query.getVirtualColumns())
.setPhysicalColumns(query.getRequiredColumns())
.setAggregators(query.getAggregatorSpecs())
.setQueryContext(query.context())
.setQueryMetrics(queryMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,13 @@ public Projections.ProjectionMatch matches(
if (!queryCursorBuildSpec.isCompatibleOrdering(orderingWithTimeSubstitution)) {
return null;
}
final List<String> queryGrouping = queryCursorBuildSpec.getGroupingColumns();
Projections.ProjectionMatchBuilder matchBuilder = new Projections.ProjectionMatchBuilder();

if (timeColumnName != null) {
matchBuilder.remapColumn(timeColumnName, ColumnHolder.TIME_COLUMN_NAME);
matchBuilder.remapColumn(timeColumnName, ColumnHolder.TIME_COLUMN_NAME)
.addReferencedPhysicalColumn(ColumnHolder.TIME_COLUMN_NAME);
}
final List<String> queryGrouping = queryCursorBuildSpec.getGroupingColumns();
if (queryGrouping != null) {
for (String queryColumn : queryGrouping) {
matchBuilder = matchRequiredColumn(
Expand Down Expand Up @@ -331,7 +332,9 @@ public Projections.ProjectionMatch matches(
for (AggregatorFactory projectionAgg : aggregators) {
final AggregatorFactory combining = queryAgg.substituteCombiningFactory(projectionAgg);
if (combining != null) {
matchBuilder.remapColumn(queryAgg.getName(), projectionAgg.getName()).addPreAggregatedAggregator(combining);
matchBuilder.remapColumn(queryAgg.getName(), projectionAgg.getName())
.addReferencedPhysicalColumn(projectionAgg.getName())
.addPreAggregatedAggregator(combining);
foundMatch = true;
break;
}
Expand Down Expand Up @@ -388,7 +391,7 @@ private Projections.ProjectionMatchBuilder matchRequiredColumn(
projectionEquivalent.getOutputName()
);
}
return matchBuilder;
return matchBuilder.addReferencedPhysicalColumn(projectionEquivalent.getOutputName());
}

matchBuilder.addReferenceedVirtualColumn(buildSpecVirtualColumn);
Expand All @@ -401,7 +404,8 @@ private Projections.ProjectionMatchBuilder matchRequiredColumn(
if (virtualGranularity.isFinerThan(granularity)) {
return null;
}
return matchBuilder.remapColumn(column, timeColumnName);
return matchBuilder.remapColumn(column, ColumnHolder.TIME_COLUMN_NAME)
.addReferencedPhysicalColumn(ColumnHolder.TIME_COLUMN_NAME);
} else {
// anything else with __time requires none granularity
if (Granularities.NONE.equals(granularity)) {
Expand All @@ -425,7 +429,7 @@ private Projections.ProjectionMatchBuilder matchRequiredColumn(
}
} else {
if (physicalColumnChecker.check(name, column)) {
return matchBuilder;
return matchBuilder.addReferencedPhysicalColumn(column);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ public ColumnCapabilities getColumnCapabilities(String column)

private boolean isVirtualColumn(final String columnName)
{
return virtualColumns.getVirtualColumn(columnName) != null;
return virtualColumns.exists(columnName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class CursorBuildSpec
{
Expand Down Expand Up @@ -61,14 +62,18 @@ public static CursorBuildSpecBuilder builder(CursorBuildSpec spec)

private final boolean isAggregate;

@Nullable
private final Set<String> physicalColumns;

@Nullable
private final QueryMetrics<?> queryMetrics;

public CursorBuildSpec(
@Nullable Filter filter,
Interval interval,
@Nullable List<String> groupingColumns,
@Nullable Set<String> physicalColumns,
VirtualColumns virtualColumns,
@Nullable List<String> groupingColumns,
@Nullable List<AggregatorFactory> aggregators,
List<OrderBy> preferredOrdering,
QueryContext queryContext,
Expand All @@ -77,8 +82,9 @@ public CursorBuildSpec(
{
this.filter = filter;
this.interval = Preconditions.checkNotNull(interval, "interval");
this.groupingColumns = groupingColumns;
this.virtualColumns = Preconditions.checkNotNull(virtualColumns, "virtualColumns");
this.physicalColumns = physicalColumns;
this.groupingColumns = groupingColumns;
this.aggregators = aggregators;
this.preferredOrdering = Preconditions.checkNotNull(preferredOrdering, "preferredOrdering");
this.queryContext = Preconditions.checkNotNull(queryContext, "queryContext");
Expand Down Expand Up @@ -107,14 +113,13 @@ public Interval getInterval()
}

/**
* Any columns which will be used for grouping by a query engine for the {@link CursorHolder}, useful for
* specializing the {@link Cursor} or {@link org.apache.druid.segment.vector.VectorCursor} if any pre-aggregated
* data is available.
* Set of physical columns required from a cursor. If null, and {@link #groupingColumns} is null or empty and
* {@link #aggregators} is null or empty, then a {@link CursorHolder} must assume that ALL columns are required
*/
@Nullable
public List<String> getGroupingColumns()
public Set<String> getPhysicalColumns()
{
return groupingColumns;
return physicalColumns;
}

/**
Expand All @@ -126,6 +131,17 @@ public VirtualColumns getVirtualColumns()
return virtualColumns;
}

/**
* Any columns which will be used for grouping by a query engine for the {@link CursorHolder}, useful for
* specializing the {@link Cursor} or {@link org.apache.druid.segment.vector.VectorCursor} if any pre-aggregated
* data is available.
*/
@Nullable
public List<String> getGroupingColumns()
{
return groupingColumns;
}

/**
* Any {@link AggregatorFactory} which will be used by a query engine for the {@link CursorHolder}, useful
* to assist in determining if {@link CursorHolder#canVectorize()}, as well as specializing the {@link Cursor} or
Expand Down Expand Up @@ -208,10 +224,12 @@ public static class CursorBuildSpecBuilder
@Nullable
private Filter filter;
private Interval interval = Intervals.ETERNITY;
private VirtualColumns virtualColumns = VirtualColumns.EMPTY;
@Nullable
private Set<String> physicalColumns;

@Nullable
private List<String> groupingColumns;
private VirtualColumns virtualColumns = VirtualColumns.EMPTY;
@Nullable
private List<AggregatorFactory> aggregators;
private List<OrderBy> preferredOrdering = Collections.emptyList();
Expand All @@ -229,8 +247,9 @@ private CursorBuildSpecBuilder(CursorBuildSpec buildSpec)
{
this.filter = buildSpec.filter;
this.interval = buildSpec.interval;
this.groupingColumns = buildSpec.groupingColumns;
this.physicalColumns = buildSpec.physicalColumns;
this.virtualColumns = buildSpec.virtualColumns;
this.groupingColumns = buildSpec.groupingColumns;
this.aggregators = buildSpec.aggregators;
this.preferredOrdering = buildSpec.preferredOrdering;
this.queryContext = buildSpec.queryContext;
Expand All @@ -256,13 +275,11 @@ public CursorBuildSpecBuilder setInterval(Interval interval)
}

/**
* @see CursorBuildSpec#getGroupingColumns()
* @see CursorBuildSpec#getPhysicalColumns()
*/
public CursorBuildSpecBuilder setGroupingColumns(
@Nullable List<String> groupingColumns
)
public CursorBuildSpecBuilder setPhysicalColumns(@Nullable Set<String> physicalColumns)
{
this.groupingColumns = groupingColumns;
this.physicalColumns = physicalColumns;
return this;
}

Expand All @@ -275,6 +292,15 @@ public CursorBuildSpecBuilder setVirtualColumns(VirtualColumns virtualColumns)
return this;
}

/**
* @see CursorBuildSpec#getGroupingColumns()
*/
public CursorBuildSpecBuilder setGroupingColumns(@Nullable List<String> groupingColumns)
{
this.groupingColumns = groupingColumns;
return this;
}

/**
* @see CursorBuildSpec#getAggregators()
*/
Expand Down Expand Up @@ -316,8 +342,9 @@ public CursorBuildSpec build()
return new CursorBuildSpec(
filter,
interval,
groupingColumns,
physicalColumns,
virtualColumns,
groupingColumns,
aggregators,
preferredOrdering,
queryContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ public DictionaryEncodedColumnIndexer(@NotNull DimensionDictionary<ActualType> d
@Override
public void setSparseIndexed()
{
isSparse = true;
if (!isSparse) {
dimLookup.add(null);
isSparse = true;
}
}

public int getSortedEncodedValueFromUnsorted(Integer unsortedIntermediateValue)
Expand Down Expand Up @@ -227,17 +230,6 @@ public Class classOfObject()
return new SortedDimensionSelector();
}

/**
* returns true if all values are encoded in {@link #dimLookup}
*/
protected boolean dictionaryEncodesAllValues()
{
// name lookup is possible in advance if we explicitly process a value for every row, or if we've encountered an
// actual null value and it is present in our dictionary. otherwise the dictionary will be missing ids for implicit
// null values
return !isSparse || dimLookup.getIdForNull() != DimensionDictionary.ABSENT_VALUE_ID;
}

protected SortedDimensionDictionary<ActualType> sortedLookup()
{
return sortedLookup == null ? sortedLookup = dimLookup.sort() : sortedLookup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

package org.apache.druid.segment;

import com.google.common.collect.ImmutableList;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.Filters;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FilteredCursorFactory implements CursorFactory
{
Expand All @@ -45,20 +47,29 @@ public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
final CursorBuildSpec.CursorBuildSpecBuilder buildSpecBuilder = CursorBuildSpec.builder(spec);
final Filter newFilter;
if (spec.getFilter() == null) {
if (filter != null) {
final Set<String> physicalColumns;
if (filter != null) {
if (spec.getFilter() == null) {
newFilter = filter.toFilter();
} else {
newFilter = null;
newFilter = Filters.and(Arrays.asList(spec.getFilter(), filter.toFilter()));
}
} else {
if (filter != null) {
newFilter = new AndFilter(ImmutableList.of(spec.getFilter(), filter.toFilter()));
if (spec.getPhysicalColumns() != null) {
physicalColumns = new HashSet<>(spec.getPhysicalColumns());
for (String column : filter.getRequiredColumns()) {
if (!spec.getVirtualColumns().exists(column)) {
physicalColumns.add(column);
}
}
} else {
newFilter = spec.getFilter();
physicalColumns = null;
}
} else {
newFilter = spec.getFilter();
physicalColumns = spec.getPhysicalColumns();
}
buildSpecBuilder.setFilter(newFilter);
buildSpecBuilder.setFilter(newFilter)
.setPhysicalColumns(physicalColumns);
return delegate.makeCursorHolder(buildSpecBuilder.build());
}

Expand Down
Loading

0 comments on commit 80d2cd3

Please sign in to comment.