Skip to content

Commit

Permalink
ML-2572: Fix caching bug (#27)
Browse files Browse the repository at this point in the history
* ML-2572: Fix caching bug

* Minor refactoring

* Fix GC bug + optimizations + locking + tests

* Add puts to same key

* Improve tests
  • Loading branch information
Gal Topper authored Sep 7, 2022
1 parent 19a00c4 commit 8b5d747
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 19 deletions.
25 changes: 25 additions & 0 deletions tests/test_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from v3iofs.fs import _Cache


def test_put_and_get():
    """Check that stored values are readable and that a re-put key returns its latest value."""
    cache = _Cache(10, 100)

    # "k3" is written twice on purpose: the second write must win.
    writes = [("k1", "v1"), ("k2", "v2"), ("k3", "v3.1"), ("k3", "v3.2")]
    for key, value in writes:
        cache.put(key, value)

    expected = {"k1": "v1", "k2": "v2", "k3": "v3.2"}
    for key, value in expected.items():
        assert cache.get(key) == value


def test_invalidation_and_gc():
    """Check that with a validity of 0 seconds nothing is served and GC empties the cache."""
    cache = _Cache(10, 0)

    # "k3" is written twice so the expiry list holds a superseded record too.
    for key, value in (("k1", "v1"), ("k2", "v2"), ("k3", "v3.1"), ("k3", "v3.2")):
        cache.put(key, value)

    for key in ("k1", "k2", "k3"):
        assert cache.get(key) is None

    # The lookups above must have garbage-collected both internal structures.
    assert cache._cache == {}
    assert cache._expiry_to_key == []
44 changes: 25 additions & 19 deletions v3iofs/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datetime import datetime, timezone
from os import environ
import time
from threading import Lock
from urllib.parse import urlparse

from fsspec.spec import AbstractFileSystem
Expand All @@ -32,52 +33,52 @@
class _Cache:
def __init__(self, capacity, cache_validity_seconds):
self._cache = {}
self._time_to_key = []
self._expiry_to_key = []
self._capacity = capacity
self._cache_validity_seconds = cache_validity_seconds

def put(self, key, value):
start_time = time.monotonic()
now = time.monotonic()
expiry = now + self._cache_validity_seconds

self._cache[key] = (start_time, value)
self._time_to_key.append((start_time, key))
self._cache[key] = (expiry, value)
self._expiry_to_key.append((expiry, key))

# GC
if len(self._cache) > self._capacity:
self._gc(start_time)
self._gc(now)

def get(self, key):
start_time = time.monotonic()

lookup_result = self._cache.get(key)
if lookup_result is None:
return None
key_time, value = lookup_result
if key_time <= start_time + self._cache_validity_seconds:
expiry, value = lookup_result
now = time.monotonic()
if now <= expiry:
return value

self._gc(start_time)
self._gc(now)

return None

def delete_if_exists(self, key):
self._cache.pop(key, None)

def _gc(self, until):
for key_time, key in self._time_to_key:
num_removed = 0
if key_time > until + self._cache_validity_seconds or len(self._cache) > self._capacity:
num_removed = 0
for expiry, key in self._expiry_to_key:
if expiry <= until or len(self._cache) > self._capacity:
result = self._cache.get(key, None)
if result:
cache_time, value = result
# cache entry may have been deleted and re-added
if cache_time == key_time:
if cache_time == expiry:
del self._cache[key]

num_removed += 1
else:
break
self._time_to_key = self._time_to_key[num_removed:]
self._expiry_to_key = self._expiry_to_key[num_removed:]


class V3ioFS(AbstractFileSystem):
Expand Down Expand Up @@ -110,6 +111,7 @@ def __init__(self, v3io_api=None, v3io_access_key=None, cache_validity_seconds=N
cache_capacity = 128
if cache_validity_seconds > 0:
self._cache = _Cache(int(cache_capacity), int(cache_validity_seconds))
self._cache_lock = Lock()

def ls(self, path, detail=True, marker=None, **kwargs):
"""Lists files & directories under path"""
Expand Down Expand Up @@ -210,7 +212,8 @@ def _rm(self, path):
f'{resp.status_code} received while accessing {path!r}')

if self._cache:
self._cache.delete_if_exists(path)
with self._cache_lock:
self._cache.delete_if_exists(path)

def touch(self, path, truncate=True, **kwargs):
if not truncate: # TODO
Expand Down Expand Up @@ -247,7 +250,8 @@ def info(self, path, **kw):
path_with_container = strip_schema(path)

if self._cache:
lookup_result = self._cache.get(path_with_container)
with self._cache_lock:
lookup_result = self._cache.get(path_with_container)
if lookup_result:
return lookup_result

Expand All @@ -271,7 +275,8 @@ def info(self, path, **kw):
'uid': resp.output.item['__uid'],
}
if self._cache:
self._cache.put(path_with_container, entry)
with self._cache_lock:
self._cache.put(path_with_container, entry)
return entry
elif resp.status_code == 404:
pass # The file may still be a directory.
Expand All @@ -290,7 +295,8 @@ def info(self, path, **kw):
if resp.status_code == 200:
entry = {'name': path_with_container, 'size': 0, 'type': 'directory'}
if self._cache:
self._cache.put(path_with_container, entry)
with self._cache_lock:
self._cache.put(path_with_container, entry)
return entry
elif resp.status_code == 404:
raise FileNotFoundError(path_with_container)
Expand Down

0 comments on commit 8b5d747

Please sign in to comment.