Skip to content

Commit

Permalink
CNDB-12739: Fix row filter ignoring distinct index and query analyzers (
Browse files Browse the repository at this point in the history
#1548)

Make RowFilter.Expression consider the two different index analyzers that an indexed column can have, 
one for write time (index_analyzer) and the other for read time (query_analyzer).
  • Loading branch information
adelapena authored Feb 6, 2025
1 parent 46716b0 commit 4544627
Show file tree
Hide file tree
Showing 13 changed files with 580 additions and 247 deletions.
206 changes: 162 additions & 44 deletions src/java/org/apache/cassandra/cql3/Operator.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ else if (otherValue == null)
// the condition value is not null, so only NEQ can return true
return operator == Operator.NEQ;
}
return operator.isSatisfiedBy(type, otherValue, value, null); // We don't use any analyzers in LWT, see CNDB-11658
return operator.isSatisfiedBy(type, otherValue, value, null, null); // We don't use any analyzers in LWT, see CNDB-11658
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public Set<ColumnMetadata> getAnalyzedColumns(IndexRegistry indexRegistry)

for (ColumnCondition condition : this)
{
if (indexRegistry.getAnalyzerFor(condition.column, condition.operator).isPresent())
if (indexRegistry.getIndexAnalyzerFor(condition.column, condition.operator).isPresent())
{
analyzedColumns.add(condition.column);
}
Expand Down
80 changes: 55 additions & 25 deletions src/java/org/apache/cassandra/db/filter/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, Abstract
ByteBuffer value = keyValidator instanceof CompositeType
? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()]
: key.getKey();
if (!e.operator().isSatisfiedBy(e.column.type, value, e.value, e.analyzer()))
if (!e.operator().isSatisfiedBy(e.column.type, value, e.value, e.indexAnalyzer(), e.queryAnalyzer()))
return false;
}
return true;
Expand All @@ -278,7 +278,7 @@ public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering<?> clustering)
if (!e.column.isClusteringColumn())
continue;

if (!e.operator().isSatisfiedBy(e.column.type, clustering.bufferAt(e.column.position()), e.value, e.analyzer()))
if (!e.operator().isSatisfiedBy(e.column.type, clustering.bufferAt(e.column.position()), e.value, e.indexAnalyzer(), e.queryAnalyzer()))
return false;
}
return true;
Expand Down Expand Up @@ -465,7 +465,7 @@ else if (builder.current.children.size() == 1 && builder.current.expressions.isE
public SimpleExpression add(ColumnMetadata def, Operator op, ByteBuffer value)
{
assert op != Operator.ANN : "ANN expressions should be added with the addANNExpression method";
SimpleExpression expression = new SimpleExpression(def, op, value, analyzer(def, op), null);
SimpleExpression expression = new SimpleExpression(def, op, value, indexAnalyzer(def, op), queryAnalyzer(def, op), null);
add(expression);
return expression;
}
Expand All @@ -479,18 +479,24 @@ public SimpleExpression add(ColumnMetadata def, Operator op, ByteBuffer value)
*/
public void addANNExpression(ColumnMetadata def, ByteBuffer value, ANNOptions annOptions)
{
add(new SimpleExpression(def, Operator.ANN, value, null, annOptions));
add(new SimpleExpression(def, Operator.ANN, value, null, null, annOptions));
}

public void addMapComparison(ColumnMetadata def, ByteBuffer key, Operator op, ByteBuffer value)
{
add(new MapComparisonExpression(def, key, op, value, analyzer(def, op)));
add(new MapComparisonExpression(def, key, op, value, indexAnalyzer(def, op), queryAnalyzer(def, op)));
}

@Nullable
private Index.Analyzer analyzer(ColumnMetadata def, Operator op)
private Index.Analyzer indexAnalyzer(ColumnMetadata def, Operator op)
{
return indexRegistry == null ? null : indexRegistry.getAnalyzerFor(def, op).orElse(null);
return indexRegistry == null ? null : indexRegistry.getIndexAnalyzerFor(def, op).orElse(null);
}

/**
 * Looks up the query-time analyzer that applies to the given column and operator.
 *
 * @param def the restricted column
 * @param op the operator of the restriction
 * @return the analyzer reported by the index registry, or {@code null} if there is no registry or the
 * registry has no query analyzer for that column and operator
 */
@Nullable
private Index.Analyzer queryAnalyzer(ColumnMetadata def, Operator op)
{
    // Without an index registry there is nothing to look the analyzer up in.
    if (indexRegistry == null)
        return null;

    return indexRegistry.getQueryAnalyzerFor(def, op).orElse(null);
}

public void addGeoDistanceExpression(ColumnMetadata def, ByteBuffer point, Operator op, ByteBuffer distance)
Expand Down Expand Up @@ -870,7 +876,13 @@ public Operator operator()
}

@Nullable
public Index.Analyzer analyzer()
public Index.Analyzer indexAnalyzer()
{
return null;
}

/**
* Returns the query-time analyzer of this expression, if any.
* <p>
* This implementation has no analyzer and always returns {@code null}.
*
* @return the query-time analyzer, always {@code null} here
*/
@Nullable
public Index.Analyzer queryAnalyzer()
{
return null;
}
Expand Down Expand Up @@ -1046,7 +1058,9 @@ public Expression deserialize(DataInputPlus in, int version, TableMetadata metad
ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
Operator operator = Operator.readFrom(in);
ColumnMetadata column = metadata.getColumn(name);
Index.Analyzer analyzer = IndexRegistry.obtain(metadata).getAnalyzerFor(column, operator).orElse(null);
IndexRegistry indexRegistry = IndexRegistry.obtain(metadata);
Index.Analyzer indexAnalyzer = indexRegistry.getIndexAnalyzerFor(column, operator).orElse(null);
Index.Analyzer queryAnalyzer = indexRegistry.getQueryAnalyzerFor(column, operator).orElse(null);

// Compact storage tables, when used with thrift, used to allow falling through this without throwing an
// exception. However, since thrift was removed in 4.0, this behaviour was not restored in CASSANDRA-16217
Expand All @@ -1058,11 +1072,11 @@ public Expression deserialize(DataInputPlus in, int version, TableMetadata metad
case SIMPLE:
ByteBuffer value = ByteBufferUtil.readWithShortLength(in);
ANNOptions annOptions = operator == Operator.ANN ? ANNOptions.serializer.deserialize(in, version) : null;
return new SimpleExpression(column, operator, value, analyzer, annOptions);
return new SimpleExpression(column, operator, value, indexAnalyzer, queryAnalyzer, annOptions);
case MAP_COMPARISON:
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
ByteBuffer val = ByteBufferUtil.readWithShortLength(in);
return new MapComparisonExpression(column, key, operator, val, analyzer);
return new MapComparisonExpression(column, key, operator, val, indexAnalyzer, queryAnalyzer);
case VECTOR_RADIUS:
Operator boundaryOperator = Operator.readFrom(in);
ByteBuffer distance = ByteBufferUtil.readWithShortLength(in);
Expand Down Expand Up @@ -1119,26 +1133,40 @@ public long serializedSize(Expression expression, int version)
public abstract static class AnalyzableExpression extends Expression
{
@Nullable
protected final Index.Analyzer analyzer;
protected final Index.Analyzer indexAnalyzer;

@Nullable
protected final Index.Analyzer queryAnalyzer;

public AnalyzableExpression(ColumnMetadata column, Operator operator, ByteBuffer value, @Nullable Index.Analyzer analyzer)
public AnalyzableExpression(ColumnMetadata column,
Operator operator,
ByteBuffer value,
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer)
{
super(column, operator, value);
this.analyzer = analyzer;
this.indexAnalyzer = indexAnalyzer;
this.queryAnalyzer = queryAnalyzer;
}

/**
* Returns the index (write-time) analyzer this expression was created with.
*
* @return the index analyzer passed to the constructor, or {@code null} if none was provided
*/
@Nullable
public final Index.Analyzer indexAnalyzer()
{
return indexAnalyzer;
}

@Nullable
public final Index.Analyzer analyzer()
public final Index.Analyzer queryAnalyzer()
{
return analyzer;
return queryAnalyzer;
}

@Override
public int numFilteredValues()
{
return analyzer == null
return indexAnalyzer == null
? super.numFilteredValues()
: analyzer().analyze(value).size();
: indexAnalyzer().analyze(value).size();
}
}

Expand All @@ -1153,10 +1181,11 @@ public static class SimpleExpression extends AnalyzableExpression
public SimpleExpression(ColumnMetadata column,
Operator operator,
ByteBuffer value,
@Nullable Index.Analyzer analyzer,
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer,
@Nullable ANNOptions annOptions)
{
super(column, operator, value, analyzer);
super(column, operator, value, indexAnalyzer, queryAnalyzer);
this.annOptions = annOptions;
}

Expand Down Expand Up @@ -1194,13 +1223,13 @@ public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey,
return false;

ByteBuffer counterValue = LongType.instance.decompose(CounterContext.instance().total(foundValue, ByteBufferAccessor.instance));
return operator.isSatisfiedBy(LongType.instance, counterValue, value, analyzer);
return operator.isSatisfiedBy(LongType.instance, counterValue, value, indexAnalyzer, queryAnalyzer);
}
else
{
// Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left.
ByteBuffer foundValue = getValue(metadata, partitionKey, row);
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, analyzer);
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, indexAnalyzer, queryAnalyzer);
}
}
case NEQ:
Expand All @@ -1214,7 +1243,7 @@ public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey,
assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
ByteBuffer foundValue = getValue(metadata, partitionKey, row);
// Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left.
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, analyzer);
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, indexAnalyzer, queryAnalyzer);
}
case CONTAINS:
return contains(metadata, partitionKey, row);
Expand Down Expand Up @@ -1345,9 +1374,10 @@ public MapComparisonExpression(ColumnMetadata column,
ByteBuffer key,
Operator operator,
ByteBuffer value,
@Nullable Index.Analyzer analyzer)
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer)
{
super(column, operator, value, analyzer);
super(column, operator, value, indexAnalyzer, queryAnalyzer);
assert column.type instanceof MapType && (operator == Operator.EQ || operator == Operator.NEQ || operator.isSlice());
this.key = key;
}
Expand Down
19 changes: 15 additions & 4 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,23 @@ default RowFilter.CustomExpression customExpressionFor(TableMetadata metadata, B
}

/**
* Returns the {@link Analyzer} for this index, if any. If the index doesn't transform the column values, this
* method will return an empty optional.
* Returns the write-time {@link Analyzer} for this index, if any. If the index doesn't transform the column values,
* this method will return an empty optional.
*
* @return the transforming column value analyzer for the index, if any
* @return the write-time transforming column value analyzer for the index, if any
*/
default Optional<Analyzer> getAnalyzer()
default Optional<Analyzer> getIndexAnalyzer()
{
return Optional.empty();
}

/**
* Returns the query-time {@link Analyzer} for this index, if any. If the index doesn't transform the column values,
* this method will return an empty optional.
* <p>
* This default implementation always returns an empty optional, so only indexes that analyze queried values need
* to override it.
*
* @return the query-time transforming column value analyzer for the index, if any
*/
default Optional<Analyzer> getQueryAnalyzer()
{
return Optional.empty();
}
Expand Down
17 changes: 15 additions & 2 deletions src/java/org/apache/cassandra/index/IndexRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand Down Expand Up @@ -314,13 +315,25 @@ default void registerIndex(Index index)
Index getIndex(IndexMetadata indexMetadata);
Collection<Index> listIndexes();

default Optional<Index.Analyzer> getAnalyzerFor(ColumnMetadata column, Operator operator)
default Optional<Index.Analyzer> getIndexAnalyzerFor(ColumnMetadata column, Operator operator)
{
return getAnalyzerFor(column, operator, Index::getIndexAnalyzer);
}

/**
* Returns the query-time {@link Index.Analyzer} to use for an expression on the given column and operator.
* <p>
* Delegates to {@code getAnalyzerFor(column, operator, analyzerGetter)}, using {@link Index#getQueryAnalyzer()}
* to extract the analyzer from each candidate index.
*
* @param column the column of the expression
* @param operator the operator of the expression
* @return the query-time analyzer found for the expression, if any
*/
default Optional<Index.Analyzer> getQueryAnalyzerFor(ColumnMetadata column, Operator operator)
{
return getAnalyzerFor(column, operator, Index::getQueryAnalyzer);
}

default Optional<Index.Analyzer> getAnalyzerFor(ColumnMetadata column,
Operator operator,
Function<Index, Optional<Index.Analyzer>> analyzerGetter)
{
for (Index index : listIndexes())
{
if (index.supportsExpression(column, operator))
{
Optional<Index.Analyzer> analyzer = index.getAnalyzer();
Optional<Index.Analyzer> analyzer = analyzerGetter.apply(index);
if (analyzer.isPresent())
return analyzer;
}
Expand Down
46 changes: 28 additions & 18 deletions src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,26 +660,36 @@ public AbstractType<?> customExpressionValueType()
}

@Override
public Optional<Analyzer> getAnalyzer()
public Optional<Analyzer> getIndexAnalyzer()
{
if (!indexContext.isAnalyzed())
return Optional.empty();
return indexContext.isAnalyzed()
? Optional.of(value -> analyze(indexContext.getAnalyzerFactory(), value))
: Optional.empty();
}

return Optional.of(value -> {
List<ByteBuffer> tokens = new ArrayList<>();
AbstractAnalyzer analyzer = indexContext.getQueryAnalyzerFactory().create();
try
{
analyzer.reset(value);
while (analyzer.hasNext())
tokens.add(analyzer.next());
}
finally
{
analyzer.end();
}
return tokens;
});
/**
 * Returns the query-time analyzer of this index, built from the index context's query analyzer factory.
 *
 * @return an analyzer tokenizing values with the query analyzer factory, or an empty optional if the index
 * context is not analyzed
 */
@Override
public Optional<Analyzer> getQueryAnalyzer()
{
    // Non-analyzed indexes don't transform values, so they expose no analyzer.
    if (!indexContext.isAnalyzed())
        return Optional.empty();

    return Optional.of(value -> analyze(indexContext.getQueryAnalyzerFactory(), value));
}

/**
 * Tokenizes the given value with a new analyzer created by the given factory.
 *
 * @param factory the factory used to create the analyzer
 * @param value the value to analyze; the analyzer receives a duplicate of this buffer, so it works with its
 * own position and limit
 * @return the tokens produced by the analyzer, in the order in which they were emitted
 */
private static List<ByteBuffer> analyze(AbstractAnalyzer.AnalyzerFactory factory, ByteBuffer value)
{
    AbstractAnalyzer tokenizer = factory.create();
    List<ByteBuffer> terms = new ArrayList<>();
    try
    {
        tokenizer.reset(value.duplicate());
        while (tokenizer.hasNext())
        {
            terms.add(tokenizer.next());
        }
    }
    finally
    {
        // Always signal the end of the analysis, even if tokenization failed midway.
        tokenizer.end();
    }
    return terms;
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/index/sai/plan/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ static Node buildExpression(QueryController controller, RowFilter.Expression exp
ByteBufferAccessor.instance,
offset,
ProtocolVersion.V3),
expression.analyzer(),
expression.indexAnalyzer(),
expression.queryAnalyzer(),
expression.annOptions())));
offset += TypeSizes.INT_SIZE + ByteBufferAccessor.instance.getInt(expression.getIndexValue(), offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.Assert;
import org.junit.BeforeClass;

import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
Expand Down Expand Up @@ -180,4 +181,38 @@ void assertCannotStartDueToConfigurationException(Cluster cluster)
Assert.assertEquals(ConfigurationException.class.getName(), tr.getClass().getName());
}
}

/**
 * Runs the given check once before any flush, and then once more after flushing each node of the cluster in
 * turn. This is useful for verifying that behavior is the same whether data is in memtables or sstables.
 * <p>
 * Any failure of the check is rethrown as an {@link AssertionError} that says at which stage it happened and
 * keeps the original failure as its cause.
 *
 * @param cluster the tested cluster
 * @param keyspace the keyspace to flush
 * @param runnable the test to run
 */
public static void beforeAndAfterFlush(Cluster cluster, String keyspace, CQLTester.CheckedFunction runnable) throws Throwable
{
    // First pass: nothing has been flushed by this method yet.
    try
    {
        runnable.apply();
    }
    catch (Throwable failure)
    {
        throw new AssertionError("Test failed before flush:\n" + failure, failure);
    }

    // Following passes: flush one more node (numbered from 1) and check again after each flush.
    int node = 1;
    while (node <= cluster.size())
    {
        cluster.get(node).flush(keyspace);

        try
        {
            runnable.apply();
        }
        catch (Throwable failure)
        {
            throw new AssertionError("Test failed after flushing node " + node + ":\n" + failure, failure);
        }

        node++;
    }
}
}
Loading

0 comments on commit 4544627

Please sign in to comment.