Skip to content

Commit

Permalink
Deprecate FSDirectory dependency and FileWatcher.
Browse files Browse the repository at this point in the history
Signed-off-by: Dooyong Kim <[email protected]>
  • Loading branch information
Dooyong Kim committed Oct 17, 2024
1 parent f383f82 commit 94058d3
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 121 deletions.
44 changes: 28 additions & 16 deletions src/main/java/org/opensearch/knn/index/KNNIndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.knn.index.memory.NativeMemoryAllocation;
Expand All @@ -28,7 +31,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -91,14 +93,18 @@ public void warmup() throws IOException {
try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-warmup")) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
try {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.get(
new NativeMemoryEntryContext.IndexEntryContext(
directory,
engineFileContext.getIndexPath(),
cacheKey,
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(),
getParametersAtLoading(
engineFileContext.getSpaceType(),
KNNEngine.getEngineNameFromPath(engineFileContext.getIndexPath()),
KNNEngine.getEngineNameFromPath(engineFileContext.getVectorFileName()),
getIndexName(),
engineFileContext.getVectorDataType()
),
Expand Down Expand Up @@ -130,9 +136,13 @@ public void clearCache() {
indexAllocation.writeLock();
log.info("[KNN] Evicting index from cache: [{}]", indexName);
try (Engine.Searcher searcher = indexShard.acquireSearcher(INDEX_SHARD_CLEAR_CACHE_SEARCHER)) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach(
(engineFileContext) -> nativeMemoryCacheManager.invalidate(engineFileContext.getIndexPath())
);
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.invalidate(cacheKey);
});
} catch (IOException ex) {
log.error("[KNN] Failed to evict index from cache: [{}]", indexName, ex);
throw new RuntimeException(ex);
Expand Down Expand Up @@ -176,8 +186,7 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine
String modelId = fieldInfo.attributes().getOrDefault(MODEL_ID, null);
engineFiles.addAll(
getEngineFileContexts(
reader.getSegmentInfo().files(),
reader.getSegmentInfo().info.name,
reader.getSegmentInfo(),
fieldInfo.name,
fileExtension,
spaceType,
Expand All @@ -197,20 +206,22 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine

@VisibleForTesting
List<EngineFileContext> getEngineFileContexts(
Collection<String> files,
String segmentName,
SegmentCommitInfo segmentCommitInfo,
String fieldName,
String fileExtension,
SpaceType spaceType,
String modelId,
VectorDataType vectorDataType
) {
String prefix = buildEngineFilePrefix(segmentName);
String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return files.stream()
) throws IOException {
// Ex: _0
final String prefix = buildEngineFilePrefix(segmentCommitInfo.info.name);
// Ex: my_field.faiss
final String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return segmentCommitInfo.files()
.stream()
.filter(fileName -> fileName.startsWith(prefix))
.filter(fileName -> fileName.endsWith(suffix))
.map(fileName -> new EngineFileContext(spaceType, modelId, fileName, vectorDataType))
.map(vectorFileName -> new EngineFileContext(spaceType, modelId, vectorFileName, vectorDataType, segmentCommitInfo.info))
.collect(Collectors.toList());
}

Expand All @@ -220,7 +231,8 @@ List<EngineFileContext> getEngineFileContexts(
static class EngineFileContext {
private final SpaceType spaceType;
private final String modelId;
private final String indexPath;
private final String vectorFileName;
private final VectorDataType vectorDataType;
private final SegmentInfo segmentInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import java.io.IOException;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

@Log4j2
public class KNN80DocValuesProducer extends DocValuesProducer {
private final DocValuesProducer delegate;
Expand All @@ -39,17 +42,23 @@ public KNN80DocValuesProducer(DocValuesProducer delegate, SegmentReadState state
this.delegate = delegate;

for (FieldInfo field : state.fieldInfos) {
if (!field.attributes().containsKey(KNN_FIELD)) {
continue;
}
// Only segments that contain BinaryDocValues and don't have vector values should be considered.
// By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true
// to create binary doc values explicitly like any other field. Hence, we only want to include fields
// where approximate search is possible only by BinaryDocValues.
if (field.hasVectorValues() || field.getDocValuesType() != DocValuesType.BINARY) {
if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues()) {
continue;
}

final String vectorIndexFileName = KNNCodecUtil.getEngineFileFromFieldInfo(field, state.segmentInfo);
if (vectorIndexFileName != null) {
fieldNameToVectorFileName.putIfAbsent(field.getName(), vectorIndexFileName);
if (vectorIndexFileName == null) {
continue;
}
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, state.segmentInfo);
fieldNameToVectorFileName.putIfAbsent(field.getName(), cacheKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.util.IOUtils;
import org.opensearch.common.UUIDs;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
Expand All @@ -35,6 +36,8 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

/**
* Vectors reader class for reading the flat vectors for native engines. The class provides methods for iterating
* over the vectors and retrieving their values.
Expand All @@ -55,10 +58,18 @@ public NativeEngines990KnnVectorsReader(final SegmentReadState state, final Flat

private void fillIndexToVectorFileName(SegmentReadState state) {
for (FieldInfo field : state.fieldInfos) {
if (!field.attributes().containsKey(KNN_FIELD)) {
continue;
}
if (!field.hasVectorValues()) {
continue;
}
final String vectorIndexFileName = KNNCodecUtil.getEngineFileFromFieldInfo(field, state.segmentInfo);
if (vectorIndexFileName != null) {
fieldNameToVectorFileName.putIfAbsent(field.getName(), vectorIndexFileName);
if (vectorIndexFileName == null) {
continue;
}
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, state.segmentInfo);
fieldNameToVectorFileName.putIfAbsent(field.getName(), cacheKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

public class KNNCodecUtil {
Expand Down Expand Up @@ -116,9 +115,6 @@ public static String getEngineFileFromFieldInfo(FieldInfo field, SegmentInfo seg
* @return if and only if Native Engine we return specific engine, else return null
*/
private static KNNEngine getNativeKNNEngine(@NonNull FieldInfo field) {
if (field.attributes().containsKey(MODEL_ID)) {
return null;
}
final KNNEngine engine = FieldInfoExtractor.extractKNNEngine(field);
if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().contains(engine)) {
return engine;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.util;

import org.apache.lucene.index.SegmentInfo;

import java.util.Base64;

/**
 * Builds and parses the keys used for look-ups in
 * {@link org.opensearch.knn.index.memory.NativeMemoryCacheManager}.
 *
 * A cache key has the form {@code <vector index file name>@<Base64-encoded segment id>},
 * e.g. {@code _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg==}.
 */
public final class NativeMemoryCacheKeyHelper {
    /** Separates the vector index file name from the encoded segment id. */
    private static final char DELIMITER = '@';

    private NativeMemoryCacheKeyHelper() {}

    /**
     * Construct a unique cache key for look-up operation in {@link org.opensearch.knn.index.memory.NativeMemoryCacheManager}
     *
     * @param vectorIndexFileName Vector index file name. Ex: _0_165_test_field.faiss.
     * @param segmentInfo Segment info object representing a logical segment unit containing a vector index.
     * @return Unique cache key that can be used for look-up and invalidating in
     * {@link org.opensearch.knn.index.memory.NativeMemoryCacheManager}
     */
    public static String constructCacheKey(final String vectorIndexFileName, final SegmentInfo segmentInfo) {
        final String segmentId = Base64.getEncoder().encodeToString(segmentInfo.getId());
        return vectorIndexFileName + DELIMITER + segmentId;
    }

    /**
     * Extract the vector index file name from a cache key produced by
     * {@link #constructCacheKey(String, SegmentInfo)}.
     *
     * @param cacheKey Cache key of the form {@code <vector index file name>@<encoded segment id>}. May be null.
     * @return The vector index file name, or null if the key is null or contains no delimiter.
     */
    public static String extractVectorIndexFileName(final String cacheKey) {
        if (cacheKey == null) {
            return null;
        }
        // Split on the LAST delimiter: the standard Base64 alphabet never contains '@', so the
        // encoded segment id cannot hold one, whereas the file name part may (it embeds the field name).
        final int indexOfDelimiter = cacheKey.lastIndexOf(DELIMITER);
        if (indexOfDelimiter == -1) {
            return null;
        }
        return cacheKey.substring(0, indexOfDelimiter);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,40 +80,40 @@ public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMem
* Constructor
*
* @param directory Lucene directory to create required IndexInput/IndexOutput to access files.
* @param indexPath Path to index file. Also used as key in cache.
* @param vectorIndexCacheKey Cache key for {@link NativeMemoryCacheManager}. It must contain a vector file name.
* @param indexLoadStrategy Strategy to load index into memory
* @param parameters Load time parameters
* @param openSearchIndexName Opensearch index associated with index
*/
public IndexEntryContext(
Directory directory,
String indexPath,
String vectorIndexCacheKey,
NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy,
Map<String, Object> parameters,
String openSearchIndexName
) {
this(directory, indexPath, indexLoadStrategy, parameters, openSearchIndexName, null);
this(directory, vectorIndexCacheKey, indexLoadStrategy, parameters, openSearchIndexName, null);
}

/**
* Constructor
*
* @param directory Lucene directory to create required IndexInput/IndexOutput to access files.
* @param indexPath path to index file. Also used as key in cache.
* @param vectorIndexCacheKey Cache key for {@link NativeMemoryCacheManager}. It must contain a vector file name.
* @param indexLoadStrategy strategy to load index into memory
* @param parameters load time parameters
* @param openSearchIndexName opensearch index associated with index
* @param modelId model to be loaded. If none available, pass null
*/
public IndexEntryContext(
Directory directory,
String indexPath,
String vectorIndexCacheKey,
NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy,
Map<String, Object> parameters,
String openSearchIndexName,
String modelId
) {
super(indexPath);
super(vectorIndexCacheKey);
this.directory = directory;
this.indexLoadStrategy = indexLoadStrategy;
this.openSearchIndexName = openSearchIndexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.store.IndexInputWithBuffer;
import org.opensearch.knn.index.util.IndexUtil;
Expand Down Expand Up @@ -71,16 +72,25 @@ private IndexLoadStrategy() {
@Override
public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.IndexEntryContext indexEntryContext)
throws IOException {
// Ex: _0_NativeEngines990KnnVectorsFormat_0.vec
final String vectorFileName = indexEntryContext.getKey();
final KNNEngine knnEngine = KNNEngine.getEngineNameFromPath(vectorFileName);
// Extract vector file name from the given cache key.
// Ex: _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg==
final String cacheKey = indexEntryContext.getKey();
final String vectorFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(cacheKey);
if (vectorFileName == null) {
throw new IllegalStateException(
"Invalid cache key was given. The key [" + cacheKey + "] does not contain corresponding vector file name."
);
}

// Prepare for opening index input from directory.
final KNNEngine knnEngine = KNNEngine.getEngineNameFromPath(vectorFileName);
final Directory directory = indexEntryContext.getDirectory();
final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);

// Try to open an index input then pass it down to native engine for loading an index.
try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) {
IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream);
long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine);
final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream);
final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine);

return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName);
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/opensearch/knn/index/query/KNNWeight.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryAllocation;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.memory.NativeMemoryEntryContext;
Expand Down Expand Up @@ -274,6 +275,7 @@ private Map<Integer, Float> doANNSearch(
}

final String vectorIndexFileName = engineFiles.get(0);
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, reader.getSegmentInfo().info);

final KNNQueryResult[] results;
KNNCounter.GRAPH_QUERY_REQUESTS.increment();
Expand All @@ -284,7 +286,7 @@ private Map<Integer, Float> doANNSearch(
indexAllocation = nativeMemoryCacheManager.get(
new NativeMemoryEntryContext.IndexEntryContext(
reader.directory(),
vectorIndexFileName,
cacheKey,
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(),
getParametersAtLoading(
spaceType,
Expand Down
Loading

0 comments on commit 94058d3

Please sign in to comment.