Skip to content

Commit

Permalink
Remove FileWatcher from KNN (opensearch-project#2182)
Browse files Browse the repository at this point in the history
Signed-off-by: Dooyong Kim <[email protected]>

(cherry picked from commit e5599aa)
  • Loading branch information
0ctopus13prime authored and Dooyong Kim committed Oct 21, 2024
1 parent 4a224e3 commit 77e2e61
Show file tree
Hide file tree
Showing 21 changed files with 333 additions and 325 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Add support to build vector data structures greedily and perform exact search when there are no engine files [#1942](https://github.com/opensearch-project/k-NN/issues/1942)
* Add CompressionLevel Calculation for PQ [#2200](https://github.com/opensearch-project/k-NN/pull/2200)
* Introduce a loading layer in native engine [#2185](https://github.com/opensearch-project/k-NN/pull/2185)
* Remove FSDirectory dependency from native engine constructing side and deprecated FileWatcher [#2182](https://github.com/opensearch-project/k-NN/pull/2182)
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
* KNN80DocValues should only be considered for BinaryDocValues fields [#2147](https://github.com/opensearch-project/k-NN/pull/2147)
Expand Down
51 changes: 28 additions & 23 deletions src/main/java/org/opensearch/knn/index/KNNIndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.knn.index.memory.NativeMemoryAllocation;
Expand All @@ -29,9 +30,7 @@
import org.opensearch.knn.index.engine.KNNEngine;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -94,14 +93,18 @@ public void warmup() throws IOException {
try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-warmup")) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
try {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.get(
new NativeMemoryEntryContext.IndexEntryContext(
directory,
engineFileContext.getIndexPath(),
cacheKey,
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(),
getParametersAtLoading(
engineFileContext.getSpaceType(),
KNNEngine.getEngineNameFromPath(engineFileContext.getIndexPath()),
KNNEngine.getEngineNameFromPath(engineFileContext.getVectorFileName()),
getIndexName(),
engineFileContext.getVectorDataType()
),
Expand Down Expand Up @@ -133,9 +136,13 @@ public void clearCache() {
indexAllocation.writeLock();
log.info("[KNN] Evicting index from cache: [{}]", indexName);
try (Engine.Searcher searcher = indexShard.acquireSearcher(INDEX_SHARD_CLEAR_CACHE_SEARCHER)) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach(
(engineFileContext) -> nativeMemoryCacheManager.invalidate(engineFileContext.getIndexPath())
);
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.invalidate(cacheKey);
});
} catch (IOException ex) {
log.error("[KNN] Failed to evict index from cache: [{}]", indexName, ex);
throw new RuntimeException(ex);
Expand Down Expand Up @@ -166,7 +173,6 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine

for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader());
Path shardPath = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory();
String fileExtension = reader.getSegmentInfo().info.getUseCompoundFile()
? knnEngine.getCompoundExtension()
: knnEngine.getExtension();
Expand All @@ -180,11 +186,9 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine
String modelId = fieldInfo.attributes().getOrDefault(MODEL_ID, null);
engineFiles.addAll(
getEngineFileContexts(
reader.getSegmentInfo().files(),
reader.getSegmentInfo().info.name,
reader.getSegmentInfo(),
fieldInfo.name,
fileExtension,
shardPath,
spaceType,
modelId,
FieldInfoExtractor.extractQuantizationConfig(fieldInfo) == QuantizationConfig.EMPTY
Expand All @@ -202,22 +206,22 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine

@VisibleForTesting
List<EngineFileContext> getEngineFileContexts(
Collection<String> files,
String segmentName,
SegmentCommitInfo segmentCommitInfo,
String fieldName,
String fileExtension,
Path shardPath,
SpaceType spaceType,
String modelId,
VectorDataType vectorDataType
) {
String prefix = buildEngineFilePrefix(segmentName);
String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return files.stream()
) throws IOException {
// Ex: 0_
final String prefix = buildEngineFilePrefix(segmentCommitInfo.info.name);
// Ex: _my_field.faiss
final String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return segmentCommitInfo.files()
.stream()
.filter(fileName -> fileName.startsWith(prefix))
.filter(fileName -> fileName.endsWith(suffix))
.map(fileName -> shardPath.resolve(fileName).toString())
.map(fileName -> new EngineFileContext(spaceType, modelId, fileName, vectorDataType))
.map(vectorFileName -> new EngineFileContext(spaceType, modelId, vectorFileName, vectorDataType, segmentCommitInfo.info))
.collect(Collectors.toList());
}

Expand All @@ -227,7 +231,8 @@ List<EngineFileContext> getEngineFileContexts(
static class EngineFileContext {
private final SpaceType spaceType;
private final String modelId;
private final String indexPath;
private final String vectorFileName;
private final VectorDataType vectorDataType;
private final SegmentInfo segmentInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,82 +11,34 @@

package org.opensearch.knn.index.codec.KNN80Codec;

import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;

import java.io.IOException;

import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.io.PathUtils;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

@Log4j2
public class KNN80DocValuesProducer extends DocValuesProducer {

private final SegmentReadState state;
private final DocValuesProducer delegate;
private final NativeMemoryCacheManager nativeMemoryCacheManager;
private final Map<String, String> indexPathMap = new HashMap();
private List<String> cacheKeys;

public KNN80DocValuesProducer(DocValuesProducer delegate, SegmentReadState state) {
this.delegate = delegate;
this.state = state;
this.nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();

Directory directory = state.directory;
// directory would be CompoundDirectory, we need get directory firstly and then unwrap
if (state.directory instanceof KNN80CompoundDirectory) {
directory = ((KNN80CompoundDirectory) state.directory).getDir();
}

Directory dir = FilterDirectory.unwrap(directory);
if (!(dir instanceof FSDirectory)) {
log.warn("{} can not casting to FSDirectory", directory);
return;
}
String directoryPath = ((FSDirectory) dir).getDirectory().toString();
for (FieldInfo field : state.fieldInfos) {
if (!field.attributes().containsKey(KNN_FIELD)) {
continue;
}
// Only segments that contains BinaryDocValues and doesn't have vector values should be considered.
// By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true
// to create binary doc values explicitly like any other field. Hence, we only want to include fields
// where approximate search is possible only by BinaryDocValues.
if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues() == true) {
continue;
}
// Only Native Engine put into indexPathMap
KNNEngine knnEngine = getNativeKNNEngine(field);
if (knnEngine == null) {
continue;
}
List<String> engineFiles = KNNCodecUtil.getEngineFiles(knnEngine.getExtension(), field.name, state.segmentInfo);
Path indexPath = PathUtils.get(directoryPath, engineFiles.get(0));
indexPathMap.putIfAbsent(field.getName(), indexPath.toString());

}
this.cacheKeys = getVectorCacheKeysFromSegmentReaderState(state);
}

@Override
Expand Down Expand Up @@ -121,32 +73,35 @@ public void checkIntegrity() throws IOException {

@Override
public void close() throws IOException {
for (String path : indexPathMap.values()) {
nativeMemoryCacheManager.invalidate(path);
}
final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
cacheKeys.forEach(nativeMemoryCacheManager::invalidate);
delegate.close();
}

public final List<String> getOpenedIndexPath() {
return new ArrayList<>(indexPathMap.values());
public final List<String> getCacheKeys() {
return new ArrayList<>(cacheKeys);
}

/**
* Get KNNEngine From FieldInfo
*
* @param field which field we need produce from engine
* @return if and only if Native Engine we return specific engine, else return null
*/
private KNNEngine getNativeKNNEngine(@NonNull FieldInfo field) {

final String modelId = field.attributes().get(MODEL_ID);
if (modelId != null) {
return null;
}
KNNEngine engine = FieldInfoExtractor.extractKNNEngine(field);
if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().contains(engine)) {
return engine;
private static List<String> getVectorCacheKeysFromSegmentReaderState(SegmentReadState segmentReadState) {
final List<String> cacheKeys = new ArrayList<>();

for (FieldInfo field : segmentReadState.fieldInfos) {
// Only segments that contains BinaryDocValues and doesn't have vector values should be considered.
// By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true
// to create binary doc values explicitly like any other field. Hence, we only want to include fields
// where approximate search is possible only by BinaryDocValues.
if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues()) {
continue;
}

final String vectorIndexFileName = KNNCodecUtil.getNativeEngineFileFromFieldInfo(field, segmentReadState.segmentInfo);
if (vectorIndexFileName == null) {
continue;
}
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, segmentReadState.segmentInfo);
cacheKeys.add(cacheKey);
}
return null;

return cacheKeys;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.opensearch.common.UUIDs;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateReadConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -40,12 +45,14 @@
public class NativeEngines990KnnVectorsReader extends KnnVectorsReader {

private final FlatVectorsReader flatVectorsReader;
private final SegmentReadState segmentReadState;
private Map<String, String> quantizationStateCacheKeyPerField;
private SegmentReadState segmentReadState;
private final List<String> cacheKeys;

public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) throws IOException {
this.segmentReadState = state;
public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) {
this.flatVectorsReader = flatVectorsReader;
this.segmentReadState = state;
this.cacheKeys = getVectorCacheKeysFromSegmentReaderState(state);
loadCacheKeyMap();
}

Expand Down Expand Up @@ -176,10 +183,18 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits
*/
@Override
public void close() throws IOException {
// Clean up allocated vector indices resources from cache.
final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
cacheKeys.forEach(nativeMemoryCacheManager::invalidate);

// Close a reader.
IOUtils.close(flatVectorsReader);

// Clean up quantized state cache.
if (quantizationStateCacheKeyPerField != null) {
final QuantizationStateCacheManager quantizationStateCacheManager = QuantizationStateCacheManager.getInstance();
for (String cacheKey : quantizationStateCacheKeyPerField.values()) {
QuantizationStateCacheManager.getInstance().evict(cacheKey);
quantizationStateCacheManager.evict(cacheKey);
}
}
}
Expand All @@ -192,11 +207,26 @@ public long ramBytesUsed() {
return flatVectorsReader.ramBytesUsed();
}

private void loadCacheKeyMap() throws IOException {
private void loadCacheKeyMap() {
quantizationStateCacheKeyPerField = new HashMap<>();
for (FieldInfo fieldInfo : segmentReadState.fieldInfos) {
String cacheKey = UUIDs.base64UUID();
quantizationStateCacheKeyPerField.put(fieldInfo.getName(), cacheKey);
}
}

private static List<String> getVectorCacheKeysFromSegmentReaderState(SegmentReadState segmentReadState) {
final List<String> cacheKeys = new ArrayList<>();

for (FieldInfo field : segmentReadState.fieldInfos) {
final String vectorIndexFileName = KNNCodecUtil.getNativeEngineFileFromFieldInfo(field, segmentReadState.segmentInfo);
if (vectorIndexFileName == null) {
continue;
}
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, segmentReadState.segmentInfo);
cacheKeys.add(cacheKey);
}

return cacheKeys;
}
}
Loading

0 comments on commit 77e2e61

Please sign in to comment.