Skip to content

Commit

Permalink
feat: add vertical scaling and SoftReference for snapshot repository …
Browse files Browse the repository at this point in the history
…data cache

- Applies `SoftReference` to cached repository data for efficient memory management under heap pressure.
- Enables cache size configuration in `opensearch.yml`, adjustable within a range of 500KB to 1% of heap memory.
- Sets the default cache size to `Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2)` so it’s generally proportional to heap size. In cases where 1% of the heap is less than 1000KB, indicating a low-memory environment, the default reverts to 500KB as before.
- Since `BytesReference` internally uses `byte[]`, the compressed array size is capped at `Integer.MAX_VALUE - 8` to ensure compatibility with JDK limitations on array sizes. Therefore, the maximum cache size cannot exceed this limit.

Signed-off-by: inpink <[email protected]>
  • Loading branch information
inpink committed Nov 8, 2024
1 parent e688388 commit a607693
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files))
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Snapshot related Settings
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,

SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.IndexMetaDataGenerations;
Expand All @@ -167,6 +168,7 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -196,6 +198,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
Expand Down Expand Up @@ -253,6 +256,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

public static final String SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME = "snapshot.repository_data.cache.threshold";

public static final double SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE = 0.01;

public static final long CACHE_MIN_THRESHOLD = ByteSizeUnit.KB.toBytes(500);

public static final long CACHE_MAX_THRESHOLD = calculateMaxSnapshotRepositoryDataCacheThreshold();

public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold();

/**
* Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions. This ensures compatibility across various JDK versions. For a practical usage example, see this link: https://github.com/openjdk/jdk11u/blob/cee8535a9d3de8558b4b5028d68e397e508bef71/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ByteArrayChannel.java#L226
*/
private static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
* contents will not result in the repository being marked as corrupted.
Expand All @@ -275,6 +293,58 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.Deprecated
);

/**
* Sets the cache size for snapshot repository data: the valid range is within 500Kb ... 1% of the node heap memory.
*/
public static final Setting<ByteSizeValue> SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD = new Setting<>(
SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME,
CACHE_DEFAULT_THRESHOLD + "b",
(s) -> {
ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio(s, SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME);
long userDefinedLimitBytes = userDefinedLimit.getBytes();

if (userDefinedLimitBytes > CACHE_MAX_THRESHOLD) {
throw new IllegalArgumentException(
"["
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
+ "] cannot be larger than ["
+ CACHE_MAX_THRESHOLD
+ "] bytes."
);
}

if (userDefinedLimitBytes < CACHE_MIN_THRESHOLD) {
throw new IllegalArgumentException(
"["
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
+ "] cannot be smaller than ["
+ CACHE_MIN_THRESHOLD
+ "] bytes."
);
}

return userDefinedLimit;
},
Setting.Property.NodeScope
);

public static long calculateDefaultSnapshotRepositoryDataCacheThreshold() {
return Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2);
}

public static long calculateMaxSnapshotRepositoryDataCacheThreshold() {
long jvmHeapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
long defaultThresholdOfHeap = (long) (jvmHeapSize * SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);
long maxThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

return maxThreshold;
}

protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, long defaultAbsoluteThreshold) {
return Math.min(Math.max(defaultThresholdOfHeap, defaultAbsoluteThreshold), MAX_SAFE_ARRAY_SIZE);
}

/**
* Size hint for the IO buffer size to use when reading from and writing to the repository.
*/
Expand Down Expand Up @@ -461,6 +531,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private volatile boolean enableAsyncDeletion;

protected final long repositoryDataCacheThreshold;

/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
Expand Down Expand Up @@ -515,6 +587,7 @@ protected BlobStoreRepository(
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
this.repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(clusterService.getSettings()).getBytes();
}

@Override
Expand Down Expand Up @@ -1132,7 +1205,8 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
cached = null;
} else {
genToLoad = latestKnownRepoGen.get();
cached = latestKnownRepositoryData.get();
SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
cached = (softRef != null) ? softRef.get() : null;
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
Expand Down Expand Up @@ -2926,15 +3000,19 @@ public void endVerification(String seed) {
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
private final AtomicReference<SoftReference<Tuple<Long, BytesReference>>> latestKnownRepositoryData = new AtomicReference<>(
new SoftReference<>(null)
);

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null));
return;
}
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;

// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
Expand Down Expand Up @@ -2983,7 +3061,8 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
genToLoad = latestKnownRepoGen.get();
}
try {
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
Expand Down Expand Up @@ -3050,19 +3129,22 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
try {
serialized = CompressorRegistry.defaultCompressor().compress(updated);
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
long cacheWarningThreshold = Math.min(repositoryDataCacheThreshold * 10, MAX_SAFE_ARRAY_SIZE);
if (len > repositoryDataCacheThreshold) {
logger.debug(
"Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
"Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
+ " serialized size",
len,
metadata.name()
metadata.name(),
repositoryDataCacheThreshold
);
if (len > ByteSizeUnit.MB.toBytes(5)) {
if (len > cacheWarningThreshold) {
logger.warn(
"Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh"
"Your repository metadata blob for repository [{}] is larger than [{}] bytes. Consider moving to a fresh"
+ " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable"
+ " repository behavior going forward.",
metadata.name()
metadata.name(),
cacheWarningThreshold
);
}
// Set empty repository data to not waste heap for an outdated cached value
Expand All @@ -3074,11 +3156,12 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
logger.warn("Failed to serialize repository data", e);
return;
}
latestKnownRepositoryData.updateAndGet(known -> {
latestKnownRepositoryData.updateAndGet(knownRef -> {
Tuple<Long, BytesReference> known = (knownRef != null) ? knownRef.get() : null;
if (known != null && known.v1() > generation) {
return known;
return knownRef;
}
return new Tuple<>(generation, serialized);
return new SoftReference<>(new Tuple<>(generation, serialized));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@

import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -127,22 +129,75 @@ public void testIndicesFieldDataCacheSetting() {
);
}

public void testSnapshotRepositoryDataCacheSizeSetting() {
assertMemorySizeSettingInRange(
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,
"snapshot.repository_data.cache.threshold",
new ByteSizeValue(BlobStoreRepository.calculateDefaultSnapshotRepositoryDataCacheThreshold()),
ByteSizeUnit.KB.toBytes(500),
1.0
);
}

private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue) {
assertMemorySizeSetting(setting, settingKey, defaultValue, Settings.EMPTY);
}

private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue, Settings settings) {
assertMemorySizeSetting(setting, settingKey, defaultValue, 25.0, 1024, settings);
}

private void assertMemorySizeSetting(
Setting<ByteSizeValue> setting,
String settingKey,
ByteSizeValue defaultValue,
double availablePercentage,
long availableBytes,
Settings settings
) {
assertThat(setting, notNullValue());
assertThat(setting.getKey(), equalTo(settingKey));
assertThat(setting.getProperties(), hasItem(Property.NodeScope));
assertThat(setting.getDefault(settings), equalTo(defaultValue));
Settings settingWithPercentage = Settings.builder().put(settingKey, "25%").build();
Settings settingWithPercentage = Settings.builder().put(settingKey, percentageAsString(availablePercentage)).build();
assertThat(
setting.get(settingWithPercentage),
equalTo(new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.25)))
equalTo(
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * percentageAsFraction(availablePercentage)))
)
);
Settings settingWithBytesValue = Settings.builder().put(settingKey, "1024b").build();
assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(1024)));
Settings settingWithBytesValue = Settings.builder().put(settingKey, availableBytes + "b").build();
assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(availableBytes)));
}

private void assertMemorySizeSettingInRange(
Setting<ByteSizeValue> setting,
String settingKey,
ByteSizeValue defaultValue,
long minBytes,
double maxPercentage
) {
assertMemorySizeSetting(setting, settingKey, defaultValue, maxPercentage, minBytes, Settings.EMPTY);

assertThrows(IllegalArgumentException.class, () -> {
Settings settingWithTooSmallValue = Settings.builder().put(settingKey, minBytes - 1).build();
setting.get(settingWithTooSmallValue);
});

assertThrows(IllegalArgumentException.class, () -> {
double unavailablePercentage = maxPercentage + 0.1;
Settings settingWithPercentageExceedingLimit = Settings.builder()
.put(settingKey, percentageAsString(unavailablePercentage))
.build();
setting.get(settingWithPercentageExceedingLimit);
});
}

private double percentageAsFraction(double availablePercentage) {
return availablePercentage / 100.0;
}

private String percentageAsString(double availablePercentage) {
return availablePercentage + "%";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.stream.Collectors;

import static org.opensearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.calculateMaxWithinIntLimit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -653,4 +654,53 @@ public void testGetRestrictedSystemRepositorySettings() {
assertTrue(settings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY));
repository.close();
}

public void testSnapshotRepositoryDataCacheDefaultSetting() {
// given
BlobStoreRepository repository = setupRepo();
long maxThreshold = repository.calculateMaxSnapshotRepositoryDataCacheThreshold();

// when
long expectedThreshold = Math.max(ByteSizeUnit.KB.toBytes(500), maxThreshold / 2);

// then
assertEquals(repository.repositoryDataCacheThreshold, expectedThreshold);
}

public void testHeapThresholdUsed() {
// given
long defaultThresholdOfHeap = ByteSizeUnit.GB.toBytes(1);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(defaultThresholdOfHeap, expectedThreshold);
}

public void testAbsoluteThresholdUsed() {
// given
long defaultThresholdOfHeap = ByteSizeUnit.KB.toBytes(499);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long result = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(defaultAbsoluteThreshold, result);
}

public void testThresholdCappedAtIntMax() {
// given
int maxSafeArraySize = Integer.MAX_VALUE - 8;
long defaultThresholdOfHeap = (long) maxSafeArraySize + 1;
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(maxSafeArraySize, expectedThreshold);
}
}

0 comments on commit a607693

Please sign in to comment.