Skip to content

Commit

Permalink
final attempts at doing it generally before learning that wasnt even …
Browse files Browse the repository at this point in the history
…needed
  • Loading branch information
Peter Alfonsi committed Oct 17, 2023
1 parent 990ba9c commit d21af15
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment {
public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment, Serializable {

/**
* Convert an {@link XContentBuilder} into a BytesReference. This method closes the builder,
Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/indices/CachingTier.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
*/
public interface CachingTier<K, V> {

V get(K key) throws IOException;
V get(K key);

void put(K key, V value) throws IOException;
void put(K key, V value);

V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception;

void invalidate(K key) throws IOException;
void invalidate(K key);

V compute(K key, TieredCacheLoader<K, V> loader) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

Expand All @@ -40,6 +41,7 @@
import java.util.Collections;

public class EhcacheDiskCachingTier<K extends Writeable, V> implements DiskCachingTier<K, V>, RemovalListener<K, V> {
// & Writeable.Reader<K> ?

private final PersistentCacheManager cacheManager;
private final Cache<EhcacheKey, V> cache;
Expand Down Expand Up @@ -108,12 +110,16 @@ public EhcacheDiskCachingTier(
}

@Override
public V get(K key) throws IOException {
public V get(K key) {
// I don't think we need to do the future stuff as the cache is threadsafe

// if (keystore.contains(key.hashCode()) {
long now = System.nanoTime();
V value = cache.get(new EhcacheKey(key));
V value = null;
try {
value = cache.get(new EhcacheKey(key));
} catch (IOException ignored) { // do smth with this later
}
double tookTimeMillis = ((double) (System.nanoTime() - now)) / 1000000;
getTimeMillisEWMA.addValue(tookTimeMillis);
return value;
Expand All @@ -122,12 +128,15 @@ public V get(K key) throws IOException {
}

@Override
public void put(K key, V value) throws IOException {
public void put(K key, V value) {
// No need to get old value, this is handled by EhcacheEventListener.

// CheckDataResult policyResult = policy.checkData(value)
// if (policyResult.isAccepted()) {
cache.put(new EhcacheKey(key), value);
try {
cache.put(new EhcacheKey(key), value);
} catch (IOException ignored) { // do smth with this later
}
// keystore.add(key.hashCode());
// else { do something with policyResult.deniedReason()? }
// }
Expand All @@ -139,12 +148,15 @@ public V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception
}

@Override
public void invalidate(K key) throws IOException {
public void invalidate(K key) {
// keep keystore check to avoid unneeded disk seek
// RemovalNotification is handled by EhcacheEventListener

// if (keystore.contains(key.hashCode()) {
cache.remove(new EhcacheKey(key));
try {
cache.remove(new EhcacheKey(key));
} catch (IOException ignored) { // do smth with this later
}
// keystore.remove(key.hashCode());
// }
}
Expand Down Expand Up @@ -207,6 +219,16 @@ public K convertEhcacheKeyToOriginal(EhcacheKey eKey) throws IOException {
is.readBytes(bytes, 0, bytes.length);
// we somehow have to use the Reader thing in the Writeable interface
// otherwise its not generic
try {
K key = keyType.getDeclaredConstructor(new Class[]{StreamInput.class}).newInstance();
// This cannot be the proper way to do it
// but if it is, make K extend some interface guaranteeing it has such a constructor (which im sure already exists somewhere in OS)
return key;
} catch (Exception e) {
System.out.println("Was unable to reconstruct EhcacheKey into K");
e.printStackTrace();
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class EhcacheEventListener<K extends Writeable, V> implements CacheEventL
// Receives key-value pairs (BytesReference, BytesReference), but must transform into (Key, BytesReference)
// to send removal notifications
private RemovalListener<K, V> removalListener;
private EhcacheDiskCachingTier tier;
private EhcacheDiskCachingTier<K, V> tier;
EhcacheEventListener(RemovalListener<K, V> removalListener, EhcacheDiskCachingTier tier) {
this.removalListener = removalListener;
this.tier = tier; // needed to handle count changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ interface CacheEntity extends Accountable, Writeable {
*
* @opensearch.internal
*/
class Key implements Accountable, Writeable {
class Key implements Accountable, Writeable, Writeable.Reader<Key> {
private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);

public final CacheEntity entity; // use as identity equality
Expand Down Expand Up @@ -349,6 +349,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(readerCacheKeyUniqueId);
out.writeBytesReference(value);
}

@Override
public Key read(StreamInput in) throws IOException {
return new Key(in);
}
}

private class CleanupKey implements IndexReader.ClosedListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ public void testAddDirectToEhcache() throws Exception {
cache.closeDiskTier();
}

public void testSimpleEhcache() throws Exception {
/*public void testSimpleEhcache() throws Exception {
// for debug only, delete
CounterMetric count = new CounterMetric();
String cacheAlias = "dummy";
CounterMetric count = new CounterMetric();
String cacheAlias = "dummy";
class DummyRemovalListener implements RemovalListener<Integer, String> {
public DummyRemovalListener() { }
Expand Down Expand Up @@ -275,10 +275,11 @@ public void onRemoval(RemovalNotification<Integer, String> notification) {
IndicesRequestCache.Key key = new IndicesRequestCache.Key(entity, reader.getReaderCacheHelper().getKey(), termBytes);*/


cacheManager.removeCache(cacheAlias);
/*cacheManager.removeCache(cacheAlias);
cacheManager.close();
//IOUtils.close(reader, writer, dir);
}
}*/

public void testSpillover() throws Exception {
// fill the on-heap cache until we spill over
Expand Down

0 comments on commit d21af15

Please sign in to comment.