Skip to content

Commit

Permalink
Fix cache size setting
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Nov 13, 2024
1 parent 5068fad commit 646231b
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ static class TieredSpilloverCacheSegment<K, V> implements ICache<K, V> {
.build(),
builder.cacheType,
builder.cacheFactories

);

this.diskCache = builder.diskCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(onDiskRemovalListener)
.setKeyType(builder.cacheConfig.getKeyType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the onHeap cache size to be used within tiered cache.
* This setting overrides size settings from the heap tier implementation.
* For example, if OpenSearchOnHeapCache is the heap tier in the request cache, and
* indices.requests.cache.opensearch_onheap.size is set, that value will be ignored in favor of this setting.
*
* Pattern: {cache_type}.tiered_spillover.onheap.store.size
* Example: indices.request.cache.tiered_spillover.onheap.store.size
Expand All @@ -96,6 +99,7 @@ public class TieredSpilloverCacheSettings {

/**
* Setting which defines the disk cache size to be used within tiered cache.
* Similarly, this setting overrides the size setting from the disk tier implementation.
*/
public static final Setting.AffixSetting<Long> TIERED_SPILLOVER_DISK_STORE_SIZE = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.size",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public void close() {

}

long getMaxSize() {
return maxSize;
}

public static class MockDiskCacheFactory implements Factory {

public static final String NAME = "mockDiskCache";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

import static org.opensearch.cache.common.tier.TieredSpilloverCache.ZERO_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.MIN_DISK_CACHE_SIZE_IN_BYTES;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
Expand Down Expand Up @@ -2112,6 +2113,139 @@ public void testTieredCacheDefaultSegmentCount() {
assertTrue(VALID_SEGMENT_COUNT_VALUES.contains(tieredSpilloverCache.getNumberOfSegments()));
}

public void testSegmentSizesWhenUsingFactory() {
// The TSC's tier size settings, TIERED_SPILLOVER_ONHEAP_STORE_SIZE and TIERED_SPILLOVER_DISK_STORE_SIZE,
// should always be respected, overriding the individual implementation's size settings if present
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
long heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;
int numSegments = getNumberOfSegments();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
// These two size settings should be honored
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedHeapSize + "b"
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
expectedDiskSize
)
// The size setting from the OpenSearchOnHeap implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()).getKey(),
numSegments
)
.build();
String storagePath = getStoragePath(settings);

TieredSpilloverCache<String, String> tieredSpilloverCache = (TieredSpilloverCache<
String,
String>) new TieredSpilloverCache.TieredSpilloverCacheFactory().create(
new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setKeyType(String.class)
.setWeigher((k, v) -> keyValueSize)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(settings)
.setDimensionNames(dimensionNames)
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
// 20_000_000 ns = 20 ms to compute
.setClusterSettings(clusterSettings)
.setStoragePath(storagePath)
.build(),
CacheType.INDICES_REQUEST_CACHE,
Map.of(
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME,
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
MockDiskCache.MockDiskCacheFactory.NAME,
// The size value passed in here acts as the "implementation setting" for the disk tier, and should also be ignored
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, false, keyValueSize)
)
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

public void testSegmentSizesWhenNotUsingFactory() {
long expectedHeapSize = 256L * between(10, 20);
long expectedDiskSize = MIN_DISK_CACHE_SIZE_IN_BYTES + 256L * between(30, 40);
int heapSizeFromImplSetting = 50;
int diskSizeFromImplSetting = 50;

Settings settings = Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
// The size setting from the OpenSearchOnHeapCache implementation should not be honored
.put(
OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
heapSizeFromImplSetting + "b"
)
.build();

int keyValueSize = 1;
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
int numSegments = getNumberOfSegments();
CacheConfig<String, String> cacheConfig = getCacheConfig(1, settings, removalListener, numSegments);
TieredSpilloverCache<String, String> tieredSpilloverCache = getTieredSpilloverCache(
new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(),
new MockDiskCache.MockDiskCacheFactory(0, diskSizeFromImplSetting, true, keyValueSize),
cacheConfig,
null,
removalListener,
numSegments,
expectedHeapSize,
expectedDiskSize
);
checkSegmentSizes(tieredSpilloverCache, expectedHeapSize, expectedDiskSize);
}

private void checkSegmentSizes(TieredSpilloverCache<String, String> cache, long expectedHeapSize, long expectedDiskSize) {
OpenSearchOnHeapCache<String, String> segmentHeapCache = (OpenSearchOnHeapCache<
String,
String>) cache.tieredSpilloverCacheSegments[0].getOnHeapCache();
assertEquals(expectedHeapSize / cache.getNumberOfSegments(), segmentHeapCache.getMaximumWeight());

MockDiskCache<String, String> segmentDiskCache = (MockDiskCache<String, String>) cache.tieredSpilloverCacheSegments[0]
.getDiskCache();
assertEquals(expectedDiskSize / cache.getNumberOfSegments(), segmentDiskCache.getMaxSize());
}

private List<String> getMockDimensions() {
List<String> dims = new ArrayList<>();
for (String dimensionName : dimensionNames) {
Expand Down Expand Up @@ -2401,9 +2535,9 @@ private void verifyComputeIfAbsentThrowsException(
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
Settings settings = Settings.builder()
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class EhcacheDiskCacheSettings {

/**
* Disk cache max size setting.
* If this cache is used as a tier in a TieredSpilloverCache, this setting is ignored.
*/
public static final Setting.AffixSetting<Long> DISK_CACHE_MAX_SIZE_IN_BYTES_SETTING = Setting.suffixKeySetting(
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME + ".max_size_in_bytes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,11 @@ private V deserializeValue(ByteArrayWrapper binary) {
return valueSerializer.deserialize(binary.value);
}

// Pkg-private for testing.
long getMaxWeightInBytes() {
return maxWeightInBytes;
}

/**
* Factory to create an ehcache disk cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
Expand All @@ -51,6 +53,7 @@
import java.util.function.ToLongBiFunction;

import org.ehcache.PersistentCacheManager;
import org.ehcache.impl.serialization.StringSerializer;

import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_LISTENER_MODE_SYNC_KEY;
import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_MAX_SIZE_IN_BYTES_KEY;
Expand Down Expand Up @@ -1201,6 +1204,65 @@ public void testEhcacheCloseWithDestroyCacheMethodThrowingException() throws Exc
ehcacheDiskCache.close();
}

public void testWithCacheConfigSizeSettings() throws Exception {
// The cache should get its size from the config if present, and otherwise should get it from the setting.
long maxSizeFromSetting = between(MINIMUM_MAX_SIZE_IN_BYTES + 1000, MINIMUM_MAX_SIZE_IN_BYTES + 2000);
long maxSizeFromConfig = between(MINIMUM_MAX_SIZE_IN_BYTES + 3000, MINIMUM_MAX_SIZE_IN_BYTES + 4000);

EhcacheDiskCache<String, String> cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, false);
assertEquals(maxSizeFromSetting, cache.getMaxWeightInBytes());

cache = setupMaxSizeTest(maxSizeFromSetting, maxSizeFromConfig, true);
assertEquals(maxSizeFromConfig, cache.getMaxWeightInBytes());
}

// Modified from OpenSearchOnHeapCacheTests. Can't reuse, as we can't add a dependency on the server.test module.
private EhcacheDiskCache<String, String> setupMaxSizeTest(long maxSizeFromSetting, long maxSizeFromConfig, boolean putSizeInConfig)
throws Exception {
MockRemovalListener<String, String> listener = new MockRemovalListener<>();
try (NodeEnvironment env = newNodeEnvironment(Settings.builder().build())) {
Settings settings = Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, true)
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_MAX_SIZE_IN_BYTES_KEY)
.getKey(),
maxSizeFromSetting
)
.put(
EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(DISK_STORAGE_PATH_KEY)
.getKey(),
env.nodePaths()[0].indicesPath.toString() + "/request_cache/" + 0
)
.build();

CacheConfig.Builder<String, String> cacheConfigBuilder = new CacheConfig.Builder<String, String>().setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setWeigher(getWeigher())
.setRemovalListener(listener)
.setSettings(settings)
.setDimensionNames(List.of(dimensionName))
.setStatsTrackingEnabled(true);
if (putSizeInConfig) {
cacheConfigBuilder.setMaxSizeInBytes(maxSizeFromConfig);
}

ICache.Factory cacheFactory = new EhcacheDiskCache.EhcacheDiskCacheFactory();
return (EhcacheDiskCache<String, String>) cacheFactory.create(
cacheConfigBuilder.build(),
CacheType.INDICES_REQUEST_CACHE,
null
);
}
}

static class MockEhcahceDiskCache extends EhcacheDiskCache<String, String> {

public MockEhcahceDiskCache(Builder<String, String> builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public <K, V> ICache<K, V> createCache(CacheConfig<K, V> config, CacheType cache
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
if (!pluggableCachingEnabled(cacheType, settings)) {
// Condition 1: In case feature flag is off, we default to onHeap.
// Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older
// settings, so we again fallback to onHeap to maintain backward compatibility.
Expand All @@ -74,4 +74,15 @@ public NodeCacheStats stats(CommonStatsFlags flags) {
}
return new NodeCacheStats(statsMap, flags);
}

/**
* Check if pluggable caching is on, and if a store type is present for this cache type.
*/
public static boolean pluggableCachingEnabled(CacheType cacheType, Settings settings) {
Setting<String> cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String storeName = cacheSettingForCacheType.get(settings);
return FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) && storeName != null && !storeName.isBlank();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;
Expand Down Expand Up @@ -80,8 +81,8 @@ public OpenSearchOnHeapCache(Builder<K, V> builder) {
this.weigher = builder.getWeigher();
}

// package private for testing
long getMaximumWeight() {
// public for testing
public long getMaximumWeight() {
return this.maximumWeight;
}

Expand Down Expand Up @@ -192,8 +193,12 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
);
long maxSizeInBytes = ((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes();

if (config.getMaxSizeInBytes() > 0) { // If this is passed from upstream(like tieredCache), then use this
// instead.
if (config.getMaxSizeInBytes() > 0) {
/*
Use the cache config value if present.
This can be passed down from the TieredSpilloverCache when creating individual segments,
but is not passed in from the IRC if pluggable caching is on.
*/
builder.setMaximumWeightInBytes(config.getMaxSizeInBytes());
} else {
builder.setMaximumWeightInBytes(maxSizeInBytes);
Expand All @@ -204,8 +209,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
builder.setNumberOfSegments(-1); // By default it will use 256 segments.
}

String storeName = cacheSettingForCacheType.get(settings);
if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) {
if (!CacheService.pluggableCachingEnabled(cacheType, settings)) {
// For backward compatibility as the user intent is to use older settings.
builder.setMaximumWeightInBytes(config.getMaxSizeInBytes());
builder.setExpireAfterAccess(config.getExpireAfterAccess());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class OpenSearchOnHeapCacheSettings {

/**
* Setting to define maximum size for the cache as a percentage of heap memory available.
* If this cache is used as a tier in a TieredSpilloverCache, this setting is ignored.
*
* Setting pattern: {cache_type}.opensearch_onheap.size
*/
Expand Down
Loading

0 comments on commit 646231b

Please sign in to comment.