From 2d200fcedd8326b45276a8e0a9f5f0eaeea48dba Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 17 Oct 2023 12:33:53 -0700 Subject: [PATCH] Fixed multiple disk tier instance issue, made spillover test pass --- .../common/metrics/CounterMetric.java | 1 - .../indices/EhcacheDiskCachingTier.java | 34 +++-- .../indices/EhcacheEventListener.java | 2 +- .../org/opensearch/indices/EhcacheKey.java | 1 - .../TieredCacheSpilloverStrategyHandler.java | 11 +- .../indices/IndicesRequestCacheTests.java | 139 +++--------------- 6 files changed, 51 insertions(+), 137 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java index 33fc5e32e9c60..2e5eae5ceebe0 100644 --- a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java @@ -63,5 +63,4 @@ public void dec(long n) { public long count() { return counter.sum(); } - } diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java index 5676a4278a45f..190cfa1b9f7cc 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java @@ -9,40 +9,30 @@ package org.opensearch.indices; import org.ehcache.PersistentCacheManager; +import org.ehcache.config.CacheRuntimeConfiguration; import org.ehcache.config.builders.CacheConfigurationBuilder; import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; import org.ehcache.config.builders.CacheManagerBuilder; import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; import org.ehcache.config.builders.ResourcePoolsBuilder; import org.ehcache.config.units.MemoryUnit; -import org.ehcache.core.internal.statistics.DefaultStatisticsService; -import org.ehcache.core.spi.service.StatisticsService; -import org.ehcache.core.statistics.TierStatistics; -import org.ehcache.event.CacheEvent; -import org.ehcache.event.CacheEventListener; +import org.ehcache.event.EventFiring; +import org.ehcache.event.EventOrdering; import org.ehcache.event.EventType; import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration; -import org.opensearch.action.admin.indices.exists.indices.IndicesExistsAction; import org.opensearch.common.ExponentiallyWeightedMovingAverage; import org.opensearch.common.cache.RemovalListener; import org.ehcache.Cache; import org.opensearch.common.cache.RemovalNotification; -import org.opensearch.common.cache.RemovalReason; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Collections; +import java.util.EnumSet; public class EhcacheDiskCachingTier implements DiskCachingTier, RemovalListener { - // & Writeable.Reader ? public static PersistentCacheManager cacheManager; private Cache cache; @@ -124,7 +114,22 @@ private void getOrCreateCache(boolean isPersistent, long maxWeightInBytes) { .withService(listenerConfig)); } catch (IllegalArgumentException e) { // Thrown when the cache already exists, which may happen in test cases + // In this case the listener is configured to send messages to some other disk tier instance, which we don't want + // (it was set up unnecessarily by the test case) + + // change config of existing cache to use this listener rather than the one instantiated by the test case cache = cacheManager.getCache(cacheAlias, EhcacheKey.class, BytesReference.class); + // cache.getRuntimeConfiguration().cacheConfigurationListenerList contains the old listener, but it's private + // and theres no method to clear it unless you have the actual listener object, so it has to stay i think + + cache.getRuntimeConfiguration().registerCacheEventListener(listener, EventOrdering.ORDERED, EventFiring.ASYNCHRONOUS, + EnumSet.of( + EventType.EVICTED, + EventType.EXPIRED, + EventType.REMOVED, + EventType.UPDATED, + EventType.CREATED)); + int k = 1; } } @@ -204,6 +209,7 @@ public Iterable keys() { @Override public int count() { + int j = 0; return (int) count.count(); } diff --git a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java index 567daa4a266ce..e14102fd80ed5 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java @@ -17,7 +17,7 @@ import org.opensearch.core.common.bytes.BytesReference; public class EhcacheEventListener implements CacheEventListener { - // Receives key-value pairs (BytesReference, BytesReference), but must transform into (Key, BytesReference) + // Receives key-value pairs (EhcacheKey, BytesReference), but must transform into (Key, BytesReference) // to send removal notifications private final RemovalListener removalListener; private final EhcacheDiskCachingTier tier; diff --git a/server/src/main/java/org/opensearch/indices/EhcacheKey.java b/server/src/main/java/org/opensearch/indices/EhcacheKey.java index 43695f4bcc26f..f8fa87932d66f 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheKey.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheKey.java @@ -10,7 +10,6 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.common.io.stream.Writeable; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java index 203f72c8ff979..6a4aa812cf010 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -91,6 +91,15 @@ public long count() { return totalCount; } + public long count(TierType tierType) { + for (CachingTier cachingTier : cachingTierList) { + if (cachingTier.getTierType() == tierType) { + return cachingTier.count(); + } + } + return -1L; + } + @Override public void onRemoval(RemovalNotification notification) { if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) { @@ -173,7 +182,7 @@ public Builder setTieredCacheEventListener(TieredCacheEventListener public TieredCacheSpilloverStrategyHandler build() { return new TieredCacheSpilloverStrategyHandler( this.onHeapCachingTier, - this.diskCachingTier, // not sure why it was yelling about this, it already is an EhcacheDiskCachingTier + this.diskCachingTier, this.tieredCacheEventListener ); } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 6ef23c7bb0790..35da7bac938e6 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -59,11 +59,8 @@ import org.ehcache.event.EventType; import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration; import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.cache.RemovalListener; -import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; -import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; @@ -164,128 +161,29 @@ public void testAddDirectToEhcache() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); AtomicBoolean indexShard = new AtomicBoolean(true); TestEntity entity = new TestEntity(requestCacheStats, indexShard); - Loader loader = new Loader(reader, 0); - IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9]; TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); IndicesRequestCache.Key key = cache.new Key(entity, termBytes, rKey); - //TestBytesReference value = new TestBytesReference(124); BytesReference value = new BytesArray(new byte[]{0}); cache.tieredCacheHandler.getDiskCachingTier().put(key, value); - System.out.println("Size: " + cache.tieredCacheHandler.getDiskCachingTier().count()); + BytesReference res = cache.tieredCacheHandler.getDiskCachingTier().get(key); + assertEquals(value, res); + assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK)); IOUtils.close(reader, writer, dir, cache); cache.closeDiskTier(); } - /*public void testSimpleEhcache() throws Exception { - // for debug only, delete - CounterMetric count = new CounterMetric(); - String cacheAlias = "dummy"; - - class DummyRemovalListener implements RemovalListener { - public DummyRemovalListener() { } - @Override - public void onRemoval(RemovalNotification notification) { - System.out.println(":)"); - } - } - - CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder - .newEventListenerConfiguration(new EhcacheEventListener(new DummyRemovalListener(), count), - EventType.EVICTED, - EventType.EXPIRED, - EventType.REMOVED, - EventType.UPDATED, - EventType.CREATED) - .ordered().asynchronous(); // ordered() has some performance penalty as compared to unordered(), we can also use synchronous() - - StatisticsService statsService = new DefaultStatisticsService(); - - PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() - .defaultPool("default", 0, 4) - .build(); - - PersistentCacheManager cacheManager; - - boolean doIntCache = false; - - if (doIntCache) { - cacheManager = CacheManagerBuilder.newCacheManagerBuilder() - .using(statsService) // https://stackoverflow.com/questions/40453859/how-to-get-ehcache-3-1-statistics - .using(threadConfig) - .with(CacheManagerBuilder.persistence(EhcacheDiskCachingTier.DISK_CACHE_FP)) - .withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( - Integer.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(10, MemoryUnit.MB, false)) - .withService(listenerConfig) - ).build(true); - Cache integerCache = cacheManager.getCache(cacheAlias, Integer.class, String.class); - - integerCache.put(0, "blorp"); - System.out.println("Counter value = " + count.count()); - String res = integerCache.get(0); - System.out.println("Got result " + res); - System.out.println("Counter value = " + count.count()); - } else { - cacheManager = CacheManagerBuilder.newCacheManagerBuilder() - .using(statsService) // https://stackoverflow.com/questions/40453859/how-to-get-ehcache-3-1-statistics - .using(threadConfig) - .with(CacheManagerBuilder.persistence(EhcacheDiskCachingTier.DISK_CACHE_FP)) - .withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( - DummySerializableKey.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(10, MemoryUnit.MB, false)) - .withService(listenerConfig) - ).build(true); - Cache cache = cacheManager.getCache(cacheAlias, DummySerializableKey.class, String.class); - - DummySerializableKey key = new DummySerializableKey(Integer.valueOf(0), "blah"); - cache.put(key, "blorp"); - System.out.println("Counter value = " + count.count()); - String res = cache.get(key); - System.out.println("Got result " + res); - System.out.println("Counter value = " + count.count()); - TierStatistics ts = statsService.getCacheStatistics(cacheAlias).getTierStatistics().get("Disk"); - System.out.println("self-reported count = " + ts.getMappings()); - System.out.println("self-reported misses = " + ts.getMisses()); - System.out.println("self-reported hits = " + ts.getHits()); - - List> foos = new ArrayList<>(); - for(Cache.Entry entry : cache) { - foos.add(entry); - } - int j = 0; - j++; - System.out.println(j); - } - - /*Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); - Loader loader = new Loader(reader, 0); - System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes()); - IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9]; - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndicesRequestCache.Key key = new IndicesRequestCache.Key(entity, reader.getReaderCacheHelper().getKey(), termBytes);*/ - - - /*cacheManager.removeCache(cacheAlias); - cacheManager.close(); - //IOUtils.close(reader, writer, dir); - - }*/ - public void testSpillover() throws Exception { // fill the on-heap cache until we spill over ShardRequestCache requestCacheStats = new ShardRequestCache(); Settings.Builder settingsBuilder = Settings.builder(); - long heapSizeBytes = 1000; // each of these queries is 115 bytes, so we can fit 8 in the heap cache + long heapSizeBytes = 1000; // each of these queries is 131 bytes, so we can fit 7 in the heap cache + int heapKeySize = 131; + int maxNumInHeap = 1000 / heapKeySize; settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes)); IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class)); @@ -298,8 +196,8 @@ public void testSpillover() throws Exception { TestEntity entity = new TestEntity(requestCacheStats, indexShard); Loader loader = new Loader(reader, 0); System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes()); - IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9]; - for (int i = 0; i < 9; i++) { + IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[maxNumInHeap + 1]; + for (int i = 0; i < maxNumInHeap + 1; i++) { TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i)); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); @@ -310,15 +208,18 @@ public void testSpillover() throws Exception { } // attempt to get value from disk cache, the first key should have been evicted BytesReference firstValue = cache.tieredCacheHandler.get(keys[0]); - System.out.println("Final on-heap cache size = " + requestCacheStats.stats().getMemorySizeInBytes()); // is correctly 920 - //System.out.println("Final self-reported disk size = " + cache.tieredCacheHandler.getDiskWeightBytes()); // is 0, should be 115 - System.out.println("On-heap tier evictions = " + requestCacheStats.stats().getEvictions()); // is correctly 1 - System.out.println("Disk tier hits = " + requestCacheStats.stats(TierType.DISK).getHitCount()); // should be 1, is 0 bc keys not serializable - System.out.println("Disk tier misses = " + requestCacheStats.stats(TierType.DISK).getMissCount()); // should be 9, is 10 bc keys not serializable - //System.out.println("Disk tier self-reported misses = " + cache.tieredCacheHandler.getDiskCachingTier().getMisses()); // should be same as other one - System.out.println("On-heap tier hits = " + requestCacheStats.stats().getHitCount()); // is correctly 0 - System.out.println("On-heap tier misses = " + requestCacheStats.stats().getMissCount()); // is correctly 10 - System.out.println("Disk count = " + cache.tieredCacheHandler.getDiskCachingTier().count()); // should be 1, is 0 + + assertEquals(maxNumInHeap * heapKeySize, requestCacheStats.stats().getMemorySizeInBytes()); + // TODO: disk weight bytes + assertEquals(1, requestCacheStats.stats().getEvictions()); + assertEquals(1, requestCacheStats.stats(TierType.DISK).getHitCount()); + assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount()); + assertEquals(0, requestCacheStats.stats().getHitCount()); + assertEquals(maxNumInHeap + 2, requestCacheStats.stats().getMissCount()); + assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP)); + assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK)); + + // more? IOUtils.close(reader, writer, dir, cache); cache.closeDiskTier(); }