Skip to content

Commit

Permalink
Enable persisting cache to disk
Browse files Browse the repository at this point in the history
If a cache file `path` is provided, the cache can be persisted to disk by
calling `Persist`. An existing file at `path` will be loaded when
instantiating the cache by calling `New`.

Signed-off-by: Soule BA <[email protected]>
  • Loading branch information
souleb committed May 31, 2024
1 parent a108669 commit c0c8ee5
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 18 deletions.
226 changes: 217 additions & 9 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ limitations under the License.
package cache

import (
"bufio"
"cmp"
"encoding/binary"
"encoding/json"
"errors"
"io"
"os"
"slices"
"sort"
"sync"
Expand All @@ -40,34 +46,37 @@ type Cache[T any] struct {
// item is an item stored in the cache.
type item[T any] struct {
key string
// object is the item's object.
object T
// expiration is the item's expiration time.
expiration int64
// object is the item's object.
object T
}

type cache[T any] struct {
// index holds the cache index.
index map[string]*item[T]
// items is the store of elements in the cache.
items []*item[T]
// sorted indicates whether the items are sorted by expiration time.
// It is initially true, and set to false when the items are not sorted.
sorted bool

// capacity is the maximum number of index the cache can hold.
capacity int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
closed bool
path string
buf buffer
// sorted indicates whether the items are sorted by expiration time.
// It is initially true, and set to false when the items are not sorted.
sorted bool
closed bool

mu sync.RWMutex
}

var _ Expirable[any] = &Cache[any]{}

// New creates a new cache with the given configuration.
func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
func New[T any](capacity int, path string, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
opt := storeOptions[T]{}
for _, o := range opts {
err := o(&opt)
Expand All @@ -83,6 +92,7 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
path: path,
metrics: newCacheMetrics(opt.registerer, opt.extraLabels...),
labelsFunc: opt.labelsFunc,
janitor: &janitor[T]{
Expand All @@ -93,6 +103,16 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]

C := &Cache[T]{cache: c, keyFunc: keyFunc}

if c.path != "" {
// load the cache from the file if it exists
if _, err := os.Stat(c.path); err == nil {
err = c.load()
if err != nil {
return nil, err
}
}
}

if opt.interval > 0 {
go c.janitor.run(c)
}
Expand Down Expand Up @@ -341,8 +361,8 @@ func (c *cache[T]) Resize(size int) int {
}

// delete the overflow indexes
for _, v := range c.items[:overflow] {
delete(c.index, v.key)
for _, item := range c.items[:overflow] {
delete(c.index, item.key)
c.metrics.incCacheEvictions()
c.metrics.decCacheItems()
}
Expand Down Expand Up @@ -494,3 +514,191 @@ func (j *janitor[T]) run(c *cache[T]) {
}
}
}

// buffer is a growable byte slice used as scratch space when
// serializing the cache to disk.
type buffer []byte

// clear truncates the buffer to zero length while keeping the
// underlying capacity, so it can be reused across Persist calls
// without reallocating.
func (s *buffer) clear() {
	*s = (*s)[:0]
}

// writeByteSlice appends a raw byte slice to the buffer.
func (s *buffer) writeByteSlice(v []byte) {
	*s = append(*s, v...)
}

// writeUint64 appends a uint64 to the buffer
// in little endian format.
func (s *buffer) writeUint64(v uint64) {
	*s = binary.LittleEndian.AppendUint64(*s, v)
}

// writeBuf writes the serialized cache buffer to the given file and
// flushes it to stable storage, returning any write or sync error.
func (c *cache[T]) writeBuf(file *os.File) error {
	if _, err := file.Write(c.buf); err != nil {
		return err
	}
	// sync the file to disk straight away so a crash after this
	// point cannot lose the freshly written data; a sync failure
	// must not be ignored, or Persist would report success for
	// data that never reached disk
	return file.Sync()
}

// Persist writes the cache to disk
// The cache is written to a temporary file first
// and then renamed to the final file name to atomically
// update the cache file. This is done to avoid corrupting
// the cache file in case of a crash while writing to the file. If a file
// with the same name exists, it is overwritten.
// The cache file is written in the following format:
// key length, key, expiration, data length, data // repeat for each item
// The key length and data length are written as uint64 in little endian format
// The expiration is written as a unix timestamp in seconds as uint64 in little endian format
// The key is written as a byte slice
// The data is written as a json encoded byte slice
func (c *cache[T]) Persist() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if err := c.writeToBuf(); err != nil {
		return err
	}

	// write to a temporary file first, then rename it over the
	// final path so readers never observe a half-written cache file
	tmpPath := c.path + ".tmp"
	newFile, err := os.Create(tmpPath)
	if err != nil {
		// nothing was created, so there is nothing to clean up
		return err
	}

	if err := c.writeBuf(newFile); err != nil {
		errc := newFile.Close()
		errf := os.Remove(tmpPath)
		return errors.Join(err, errc, errf)
	}

	// close the file before renaming so all data is flushed
	if err := newFile.Close(); err != nil {
		errf := os.Remove(tmpPath)
		return errors.Join(err, errf)
	}

	if err := os.Rename(tmpPath, c.path); err != nil {
		// return the error rather than panicking: a failed rename
		// leaves the previous cache file intact, so the caller can
		// retry; remove the orphaned temp file
		errf := os.Remove(tmpPath)
		return errors.Join(err, errf)
	}

	return nil
}

// writeToBuf serializes every cached item into c.buf, replacing any
// previous contents.
func (c *cache[T]) writeToBuf() error {
	c.buf.clear()
	for _, it := range c.items {
		encoded, err := json.Marshal(it.object)
		if err != nil {
			return err
		}

		// Each record is laid out as:
		//   key length | key | expiration | data length | data
		// Length-prefixing every field makes it possible to scan the
		// file record by record without reading it all into memory,
		// which keeps the door open for future use cases such as very
		// large cache files or range queries.
		c.buf.writeUint64(uint64(len(it.key)))
		c.buf.writeByteSlice([]byte(it.key))
		c.buf.writeUint64(uint64(it.expiration))
		c.buf.writeUint64(uint64(len(encoded)))
		c.buf.writeByteSlice(encoded)
	}
	return nil
}

// load restores the cache from the file at c.path.
// The file consists of repeated records in the format:
// key length, key, expiration, data length, data
// load must not be called concurrently; it is meant to run once,
// before the cache is handed out for use.
func (c *cache[T]) load() error {
	file, err := os.Open(c.path)
	if err != nil {
		return err
	}
	defer file.Close()

	loaded, err := c.readFrom(bufio.NewReader(file))
	if err != nil {
		return err
	}

	// admit records only up to capacity; any surplus is dropped
	for _, it := range loaded {
		if len(c.items) >= c.capacity {
			break
		}
		c.items = append(c.items, it)
		c.index[it.key] = it
	}

	if len(c.items) > 0 {
		c.metrics.setCachedItems(float64(len(c.items)))
		// items arrive in file order, not expiration order
		c.sorted = false
	}
	return nil
}

// readFrom decodes items from rd one record at a time until the
// stream is exhausted, returning everything read.
func (c *cache[T]) readFrom(rd io.Reader) ([]*item[T], error) {
	out := make([]*item[T], 0)
	for {
		next, err := c.readItem(rd)
		if err == io.EOF {
			// clean end of stream
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, next)
	}
}

// readItem decodes a single serialized item from rd.
// Record layout: key length, key, expiration, data length, data,
// with all integers encoded as little endian uint64.
// A clean io.EOF before the first byte is returned unchanged so the
// caller can detect end of stream; a truncated record surfaces as
// io.ErrUnexpectedEOF from io.ReadFull.
func (c *cache[T]) readItem(rd io.Reader) (*item[T], error) {
	var (
		buf = make([]byte, 8)
		it  item[T]
	)
	// key length; io.ReadFull already distinguishes a clean EOF from
	// a truncated read, so the error can be returned as-is
	if _, err := io.ReadFull(rd, buf); err != nil {
		return nil, err
	}
	keyLen := binary.LittleEndian.Uint64(buf)
	key := make([]byte, keyLen)
	if _, err := io.ReadFull(rd, key); err != nil {
		return nil, err
	}
	it.key = string(key)

	// expiration as a unix timestamp in seconds
	if _, err := io.ReadFull(rd, buf); err != nil {
		return nil, err
	}
	it.expiration = int64(binary.LittleEndian.Uint64(buf))

	// data length followed by the json encoded object
	if _, err := io.ReadFull(rd, buf); err != nil {
		return nil, err
	}
	dataLen := binary.LittleEndian.Uint64(buf)
	data := make([]byte, dataLen)
	if _, err := io.ReadFull(rd, data); err != nil {
		return nil, err
	}

	if err := json.Unmarshal(data, &it.object); err != nil {
		return nil, err
	}

	return &it, nil
}
Loading

0 comments on commit c0c8ee5

Please sign in to comment.