Skip to content

Commit

Permalink
[Bugfix] Fixes IRC NPE bug for timed-out cacheable queries (opensearc…
Browse files Browse the repository at this point in the history
…h-project#15327)

* Fix IRC timeout bug

Signed-off-by: Peter Alfonsi <[email protected]>

* addressed Sagar's comments

Signed-off-by: Peter Alfonsi <[email protected]>

* addressed Ankit's comments

Signed-off-by: Peter Alfonsi <[email protected]>

* Add UT for test coverage

Signed-off-by: Peter Alfonsi <[email protected]>

* rerun gradle

Signed-off-by: Peter Alfonsi <[email protected]>

* tweak imports in new UT

Signed-off-by: Peter Alfonsi <[email protected]>

* rerun gradle

Signed-off-by: Peter Alfonsi <[email protected]>

* rerun gradle

Signed-off-by: Peter Alfonsi <[email protected]>

* rerun gradle

Signed-off-by: Peter Alfonsi <[email protected]>

---------

Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
  • Loading branch information
2 people authored and akolarkunnu committed Sep 10, 2024
1 parent fa0e657 commit b971ab8
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand All @@ -56,7 +62,10 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
Expand All @@ -65,6 +74,7 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
Expand Down Expand Up @@ -768,6 +778,59 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
assertTrue(stats.getMemorySizeInBytes() == 0);
}

public void testTimedOutQuery() throws Exception {
// A timed out query should be cached and then invalidated
Client client = client();
String index = "index";
assertAcked(
client.admin()
.indices()
.prepareCreate(index)
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
)
.get()
);
indexRandom(true, client.prepareIndex(index).setSource("k", "hello"));
ensureSearchable(index);
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
forceMerge(client, index);

QueryBuilder timeoutQueryBuilder = new TermQueryBuilder("k", "hello") {
@Override
protected Query doToQuery(QueryShardContext context) {
return new TermQuery(new Term("k", "hello")) {
@Override
public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
// Create the weight before sleeping. Otherwise, TermStates.build() (in the call to super.createWeight()) will
// sometimes throw an exception on timeout, rather than timing out gracefully.
Weight result = super.createWeight(searcher, scoreMode, boost);
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {}
return result;
}
};
}
};

SearchResponse resp = client.prepareSearch(index)
.setRequestCache(true)
.setQuery(timeoutQueryBuilder)
.setTimeout(TimeValue.ZERO)
.get();
assertTrue(resp.isTimedOut());
RequestCacheStats requestCacheStats = getRequestCacheStats(client, index);
// The cache should be empty as the timed-out query was invalidated
assertEquals(0, requestCacheStats.getMemorySizeInBytes());
}

private Path[] shardDirectory(String server, Index index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,11 @@ BytesReference getOrCompute(
* @param cacheKey the cache key to invalidate
*/
void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
assert reader.getReaderCacheHelper() != null;
String readerCacheKeyId = null;
if (reader instanceof OpenSearchDirectoryReader) {
IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper();
readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId();
}
assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper;
OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader
.getReaderCacheHelper();
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();

IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity();
cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -1754,8 +1755,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
if (context.getQueryShardContext().isCacheable() == false) {
return false;
}
return true;

return context.searcher().getDirectoryReader().getReaderCacheHelper() instanceof DelegatingCacheHelper;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,23 @@

package org.opensearch.indices;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexGraveyard;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -76,8 +80,11 @@
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.test.TestSearchContext;
import org.opensearch.test.hamcrest.RegexMatcher;

import java.io.IOException;
Expand Down Expand Up @@ -627,4 +634,32 @@ public void testClusterRemoteTranslogBufferIntervalDefault() {
indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval()
);
}

public void testDirectoryReaderWithoutDelegatingCacheHelperNotCacheable() throws IOException {
IndicesService indicesService = getIndicesService();
final IndexService indexService = createIndex("test");
ShardSearchRequest request = mock(ShardSearchRequest.class);
when(request.requestCache()).thenReturn(true);

TestSearchContext context = new TestSearchContext(indexService.getBigArrays(), indexService) {
@Override
public SearchType searchType() {
return SearchType.QUERY_THEN_FETCH;
}
};

ContextIndexSearcher searcher = mock(ContextIndexSearcher.class);
context.setSearcher(searcher);
DirectoryReader reader = mock(DirectoryReader.class);
when(searcher.getDirectoryReader()).thenReturn(reader);
when(searcher.getIndexReader()).thenReturn(reader);
IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class);
DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class);

for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) {
IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper;
when(reader.getReaderCacheHelper()).thenReturn(cacheHelper);
assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context));
}
}
}

0 comments on commit b971ab8

Please sign in to comment.