-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: add TTL support #25
base: master
Are you sure you want to change the base?
Changes from all commits
aeb97c6
3a32041
ab6f170
7265abb
1b81014
6a89f51
e89e7e2
9b92dfa
323e89e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
name: golangci-lint | ||
on: | ||
push: | ||
tags: | ||
- v* | ||
branches: | ||
- master | ||
- main | ||
pull_request: | ||
permissions: | ||
contents: read | ||
# Optional: allow read access to pull request. Use with `only-new-issues` option. | ||
pull-requests: read | ||
jobs: | ||
golangci: | ||
name: lint | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/setup-go@v3 | ||
with: | ||
go-version: 1.17 | ||
- uses: actions/checkout@v3 | ||
- name: golangci-lint | ||
uses: golangci/golangci-lint-action@v3 | ||
with: | ||
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version | ||
version: v1.29 | ||
|
||
# Optional: working directory, useful for monorepos | ||
# working-directory: somedir | ||
|
||
# Optional: golangci-lint command line arguments. | ||
# args: --issues-exit-code=0 | ||
|
||
# Optional: show only new issues if it's a pull request. The default value is `false`. | ||
only-new-issues: true | ||
|
||
# Optional: if set to true then the all caching functionality will be complete disabled, | ||
# takes precedence over all other caching options. | ||
# skip-cache: true | ||
|
||
# Optional: if set to true then the action don't cache or restore ~/go/pkg. | ||
# skip-pkg-cache: true | ||
|
||
# Optional: if set to true then the action don't cache or restore ~/.cache/go-build. | ||
# skip-build-cache: true |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
issues: | ||
exclude-rules: | ||
- linters: | ||
- staticcheck | ||
text: "SA1019:" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,11 +16,13 @@ limitations under the License. | |
|
||
package galaxycache | ||
|
||
import "time" | ||
|
||
// Codec includes both the BinaryMarshaler and BinaryUnmarshaler | ||
// interfaces | ||
type Codec interface { | ||
MarshalBinary() ([]byte, error) | ||
UnmarshalBinary(data []byte) error | ||
MarshalBinary() ([]byte, time.Time, error) | ||
UnmarshalBinary(data []byte, expire time.Time) error | ||
} | ||
|
||
// Note: to ensure that unmarshaling is a read-only operation, bytes | ||
|
@@ -32,48 +34,60 @@ func cloneBytes(b []byte) []byte { | |
} | ||
|
||
// ByteCodec is a byte slice type that implements Codec | ||
type ByteCodec []byte | ||
type ByteCodec struct { | ||
bytes []byte | ||
expire time.Time | ||
} | ||
Comment on lines
+37
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change breaks the common use-case of This change makes it impossible to extract the value out for anyone outside this package. |
||
|
||
// MarshalBinary on a ByteCodec returns the bytes | ||
func (c *ByteCodec) MarshalBinary() ([]byte, error) { | ||
return *c, nil | ||
func (c *ByteCodec) MarshalBinary() ([]byte, time.Time, error) { | ||
return c.bytes, c.expire, nil | ||
} | ||
|
||
// UnmarshalBinary on a ByteCodec sets the ByteCodec to | ||
// a copy of the provided data | ||
func (c *ByteCodec) UnmarshalBinary(data []byte) error { | ||
*c = cloneBytes(data) | ||
func (c *ByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { | ||
c.bytes = cloneBytes(data) | ||
c.expire = expire | ||
return nil | ||
} | ||
|
||
// CopyingByteCodec is a byte slice type that implements Codec | ||
// and returns a copy of the bytes when marshaled | ||
type CopyingByteCodec []byte | ||
type CopyingByteCodec struct { | ||
bytes []byte | ||
expire time.Time | ||
} | ||
|
||
// MarshalBinary on a CopyingByteCodec returns a copy of the bytes | ||
func (c *CopyingByteCodec) MarshalBinary() ([]byte, error) { | ||
return cloneBytes(*c), nil | ||
func (c *CopyingByteCodec) MarshalBinary() ([]byte, time.Time, error) { | ||
return cloneBytes(c.bytes), c.expire, nil | ||
} | ||
|
||
// UnmarshalBinary on a CopyingByteCodec sets the ByteCodec to | ||
// a copy of the provided data | ||
func (c *CopyingByteCodec) UnmarshalBinary(data []byte) error { | ||
*c = cloneBytes(data) | ||
func (c *CopyingByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { | ||
c.bytes = cloneBytes(data) | ||
c.expire = expire | ||
return nil | ||
} | ||
|
||
// StringCodec is a string type that implements Codec | ||
type StringCodec string | ||
type StringCodec struct { | ||
str string | ||
expire time.Time | ||
} | ||
|
||
// MarshalBinary on a StringCodec returns the bytes underlying | ||
// the string | ||
func (c *StringCodec) MarshalBinary() ([]byte, error) { | ||
return []byte(*c), nil | ||
func (c *StringCodec) MarshalBinary() ([]byte, time.Time, error) { | ||
return []byte(c.str), c.expire, nil | ||
} | ||
|
||
// UnmarshalBinary on a StringCodec sets the StringCodec to | ||
// a stringified copy of the provided data | ||
func (c *StringCodec) UnmarshalBinary(data []byte) error { | ||
*c = StringCodec(data) | ||
func (c *StringCodec) UnmarshalBinary(data []byte, expire time.Time) error { | ||
c.str = string(data) | ||
c.expire = expire | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -400,6 +400,35 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritati | |
} | ||
} | ||
|
||
// GetMultiple is like Get but fetches multiple keys at once into the respective | ||
// destinations (codecs). | ||
func (g *Galaxy) GetMultiple(ctx context.Context, keys []string, destinations []Codec) error { | ||
if len(keys) != len(destinations) { | ||
return fmt.Errorf("number of keys vs. codecs doesn't match (%d vs. %d)", len(keys), len(destinations)) | ||
} | ||
ctx, tagErr := tag.New(ctx, tag.Upsert(GalaxyKey, g.name)) | ||
if tagErr != nil { | ||
return fmt.Errorf("Error tagging context: %s", tagErr) | ||
} | ||
|
||
ctx, span := trace.StartSpan(ctx, "galaxycache.(*Galaxy).GetMultiple on "+g.name) | ||
startTime := time.Now() | ||
defer func() { | ||
g.recordStats(ctx, nil, MRoundtripLatencyMilliseconds.M(sinceInMilliseconds(startTime))) | ||
span.End() | ||
}() | ||
|
||
g.Stats.Gets.Add(1) | ||
g.recordStats(ctx, nil, MGets.M(1)) | ||
|
||
// TODO: | ||
// Group each key by peer. | ||
// Request all of those keys from that one peer concurrently. | ||
// Try to load what is missing. | ||
|
||
return nil | ||
} | ||
|
||
// Get as defined here is the primary "get" called on a galaxy to | ||
// find the value for the given key, using the following logic: | ||
// - First, try the local cache; if its a cache hit, we're done | ||
|
@@ -436,7 +465,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { | |
value.stats.touch() | ||
g.recordRequest(ctx, hlvl, false) | ||
g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) | ||
return dest.UnmarshalBinary(value.data) | ||
return dest.UnmarshalBinary(value.data, value.expire) | ||
} | ||
|
||
span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss") | ||
|
@@ -456,7 +485,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { | |
if destPopulated { | ||
return nil | ||
} | ||
return dest.UnmarshalBinary(value.data) | ||
return dest.UnmarshalBinary(value.data, value.expire) | ||
} | ||
|
||
type valWithLevel struct { | ||
|
@@ -525,7 +554,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi | |
// probably boring (normal task movement), so not | ||
// worth logging I imagine. | ||
} | ||
data, err := g.getLocally(ctx, key, dest) | ||
data, expTm, err := g.getLocally(ctx, key, dest) | ||
if err != nil { | ||
g.Stats.BackendLoadErrors.Add(1) | ||
g.recordStats(ctx, nil, MBackendLoadErrors.M(1)) | ||
|
@@ -535,7 +564,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi | |
g.Stats.CoalescedBackendLoads.Add(1) | ||
g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1)) | ||
destPopulated = true // only one caller of load gets this return value | ||
value = newValWithStat(data, nil) | ||
value = newValWithStat(data, nil, expTm) | ||
g.populateCache(ctx, key, value, &g.mainCache) | ||
return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil | ||
}) | ||
|
@@ -548,22 +577,23 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi | |
return | ||
} | ||
|
||
func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, error) { | ||
func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, time.Time, error) { | ||
err := g.getter.Get(ctx, key, dest) | ||
if err != nil { | ||
return nil, err | ||
return nil, time.Time{}, err | ||
} | ||
return dest.MarshalBinary() | ||
} | ||
|
||
func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (*valWithStat, error) { | ||
data, err := peer.Fetch(ctx, g.name, key) | ||
data, err := peer.Fetch(ctx, g.name, []string{key}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
expire := data[0].TTL | ||
vi, ok := g.candidateCache.get(key) | ||
if !ok { | ||
vi = g.addNewToCandidateCache(key) | ||
vi = g.addNewToCandidateCache(key, expire) | ||
} | ||
|
||
g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update | ||
|
@@ -573,7 +603,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string | |
KeyQPS: kStats.val(), | ||
HCStats: g.hcStatsWithTime.hcs, | ||
} | ||
value := newValWithStat(data, kStats) | ||
value := newValWithStat(data[0].Data, kStats, expire) | ||
if g.opts.promoter.ShouldPromote(key, value.data, stats) { | ||
g.populateCache(ctx, key, value, &g.hotCache) | ||
} | ||
|
@@ -627,7 +657,7 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt | |
} | ||
|
||
func (g *Galaxy) recordStats(ctx context.Context, mutators []tag.Mutator, measurements ...stats.Measurement) { | ||
stats.RecordWithOptions( | ||
_ = stats.RecordWithOptions( | ||
ctx, | ||
stats.WithMeasurements(measurements...), | ||
stats.WithTags(mutators...), | ||
|
@@ -692,8 +722,9 @@ func (c *cache) stats() CacheStats { | |
} | ||
|
||
type valWithStat struct { | ||
data []byte | ||
stats *keyStats | ||
data []byte | ||
stats *keyStats | ||
expire time.Time | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bit memory-inefficient to add the expiration time to both to the LRU's value struct and the value we're storing inside the LRU. |
||
} | ||
|
||
// sizeOfValWithStats returns the total size of the value in the hot/main | ||
|
@@ -704,21 +735,21 @@ func (v *valWithStat) size() int64 { | |
return int64(unsafe.Sizeof(*v.stats)) + int64(len(v.data)) + int64(unsafe.Sizeof(v)) + int64(unsafe.Sizeof(*v)) | ||
} | ||
|
||
func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) { | ||
func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats, ttl time.Time)) { | ||
c.lru.OnEvicted = func(key lru.Key, value interface{}) { | ||
val := value.(*valWithStat) | ||
c.nbytes -= int64(len(key.(string))) + val.size() | ||
c.nevict++ | ||
if f != nil { | ||
f(key.(string), val.stats) | ||
f(key.(string), val.stats, val.expire) | ||
} | ||
} | ||
} | ||
|
||
func (c *cache) add(key string, value *valWithStat) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
c.lru.Add(key, value) | ||
c.lru.Add(key, value, value.expire) | ||
c.nbytes += int64(len(key)) + value.size() | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very disruptive way to side-channel the expiration time around that breaks all existing
Codec
implementations as well as allBackendGetter
implementations.