Skip to content

Commit

Permalink
make Contains also return the size (or -1 if unknown)
Browse files Browse the repository at this point in the history
In order to implement the remote asset API, we will need to check for
blobs and return a Digest. At the moment, our Contains function only
returns a boolean, but not the size of the item which is also required.

In most cases, we have the size information when Contains returns true,
but the value wasn't being plumbed through.

In case a proxy backend does not provide size information, return -1.
  • Loading branch information
mostynb committed Feb 14, 2020
1 parent 3159fdb commit 40bd979
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 31 deletions.
5 changes: 3 additions & 2 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type CacheProxy interface {
Get(kind EntryKind, hash string) (io.ReadCloser, int64, error)

// Contains returns whether or not the cache item exists on the
// remote end.
Contains(kind EntryKind, hash string) bool
// remote end, and the size if it exists (and -1 if the size is
// unknown).
Contains(kind EntryKind, hash string) (bool, int64)
}
36 changes: 25 additions & 11 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,32 +521,41 @@ func (c *DiskCache) Get(kind cache.EntryKind, hash string) (io.ReadCloser, int64
return nil, -1, err
}

// Contains returns true if the `hash` key exists in the cache.
// Contains returns true if the `hash` key exists in the cache, and
// the size if known (or -1 if unknown).
//
// If there is a local cache miss, the proxy backend (if there is
// one) will be checked.
func (c *DiskCache) Contains(kind cache.EntryKind, hash string) bool {
func (c *DiskCache) Contains(kind cache.EntryKind, hash string) (bool, int64) {

// The hash format is checked properly in the http/grpc code.
// Just perform a simple/fast check here, to catch bad tests.
if len(hash) != sha256HashStrSize {
return false
return false, int64(-1)
}

var foundLocally bool
size := int64(-1)

c.mu.Lock()
val, found := c.lru.Get(cacheKey(kind, hash))
// Uncommitted (i.e. uploading items) should be reported as not ok
foundLocally := found && val.(*lruItem).committed
if found {
item := val.(*lruItem)
foundLocally = item.committed
size = item.size
}
c.mu.Unlock()

if foundLocally {
return true
return true, size
}

if c.proxy != nil {
return c.proxy.Contains(kind, hash)
}

return false
return false, int64(-1)
}

// MaxSize returns the maximum cache size in bytes.
Expand Down Expand Up @@ -608,7 +617,8 @@ func (c *DiskCache) GetValidatedActionResult(hash string) (*pb.ActionResult, []b

for _, f := range result.OutputFiles {
if len(f.Contents) == 0 && f.Digest.SizeBytes > 0 {
if !c.Contains(cache.CAS, f.Digest.Hash) {
found, _ := c.Contains(cache.CAS, f.Digest.Hash)
if !found {
return nil, nil, nil // aka "not found"
}
}
Expand Down Expand Up @@ -645,7 +655,8 @@ func (c *DiskCache) GetValidatedActionResult(hash string) (*pb.ActionResult, []b
if f.Digest == nil {
continue
}
if !c.Contains(cache.CAS, f.Digest.Hash) {
found, _ := c.Contains(cache.CAS, f.Digest.Hash)
if !found {
return nil, nil, nil // aka "not found"
}
}
Expand All @@ -655,21 +666,24 @@ func (c *DiskCache) GetValidatedActionResult(hash string) (*pb.ActionResult, []b
if f.Digest == nil {
continue
}
if !c.Contains(cache.CAS, f.Digest.Hash) {
found, _ := c.Contains(cache.CAS, f.Digest.Hash)
if !found {
return nil, nil, nil // aka "not found"
}
}
}
}

if result.StdoutDigest != nil && result.StdoutDigest.SizeBytes > 0 {
if !c.Contains(cache.CAS, result.StdoutDigest.Hash) {
found, _ := c.Contains(cache.CAS, result.StdoutDigest.Hash)
if !found {
return nil, nil, nil // aka "not found"
}
}

if result.StderrDigest != nil && result.StderrDigest.SizeBytes > 0 {
if !c.Contains(cache.CAS, result.StderrDigest.Hash) {
found, _ := c.Contains(cache.CAS, result.StderrDigest.Hash)
if !found {
return nil, nil, nil // aka "not found"
}
}
Expand Down
33 changes: 24 additions & 9 deletions cache/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestCacheExistingFiles(t *testing.T) {
if err != nil {
t.Fatal(err)
}
found := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd")
found, _ := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd")
if found {
t.Fatalf("%s should have been evicted", items[0])
}
Expand Down Expand Up @@ -361,13 +361,20 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) {
if numItems != 3 {
t.Fatalf("Expected test cache size 3 but was %d", numItems)
}
if !testCache.Contains(cache.AC, acHash) {

var found bool
found, _ = testCache.Contains(cache.AC, acHash)
if !found {
t.Fatalf("Expected cache to contain AC entry '%s'", acHash)
}
if !testCache.Contains(cache.CAS, casHash1) {

found, _ = testCache.Contains(cache.CAS, casHash1)
if !found {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash1)
}
if !testCache.Contains(cache.CAS, casHash2) {

found, _ = testCache.Contains(cache.CAS, casHash2)
if !found {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash2)
}
}
Expand Down Expand Up @@ -399,13 +406,21 @@ func TestLoadExistingEntries(t *testing.T) {
t.Fatalf("Expected test cache size %d but was %d",
numBlobs, numItems)
}
if !testCache.Contains(cache.AC, acHash) {

var found bool

found, _ = testCache.Contains(cache.AC, acHash)
if !found {
t.Fatalf("Expected cache to contain AC entry '%s'", acHash)
}
if !testCache.Contains(cache.CAS, casHash) {

found, _ = testCache.Contains(cache.CAS, casHash)
if !found {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash)
}
if !testCache.Contains(cache.RAW, rawHash) {

found, _ = testCache.Contains(cache.RAW, rawHash)
if !found {
t.Fatalf("Expected cache to contain RAW entry '%s'", rawHash)
}
}
Expand Down Expand Up @@ -572,7 +587,7 @@ func TestHttpProxyBackend(t *testing.T) {
// Confirm that it does not contain the item we added to the
// first testCache and the proxy backend.

found := testCache.Contains(cache.CAS, casHash)
found, _ := testCache.Contains(cache.CAS, casHash)
if found {
t.Fatalf("Expected the cache not to contain %s", casHash)
}
Expand All @@ -588,7 +603,7 @@ func TestHttpProxyBackend(t *testing.T) {
// Add the proxy backend and check that we can Get the item.
testCache.proxy = proxy

found = testCache.Contains(cache.CAS, casHash)
found, _ = testCache.Contains(cache.CAS, casHash)
if !found {
t.Fatalf("Expected the cache to contain %s (via the proxy)",
casHash)
Expand Down
6 changes: 3 additions & 3 deletions cache/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,16 @@ func (r *remoteHTTPProxyCache) Get(kind cache.EntryKind, hash string) (io.ReadCl
return rsp.Body, sizeBytes, err
}

func (r *remoteHTTPProxyCache) Contains(kind cache.EntryKind, hash string) bool {
func (r *remoteHTTPProxyCache) Contains(kind cache.EntryKind, hash string) (bool, int64) {

url := requestURL(r.baseURL, hash, kind)

rsp, err := r.remote.Head(url)
if err == nil && rsp.StatusCode == http.StatusOK {
return true
return true, rsp.ContentLength
}

return false
return false, int64(-1)
}

func requestURL(baseURL *url.URL, hash string, kind cache.EntryKind) string {
Expand Down
10 changes: 7 additions & 3 deletions cache/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ func (c *s3Cache) Get(kind cache.EntryKind, hash string) (io.ReadCloser, int64,
return object, info.Size, nil
}

func (c *s3Cache) Contains(kind cache.EntryKind, hash string) bool {
func (c *s3Cache) Contains(kind cache.EntryKind, hash string) (bool, int64) {
size := int64(-1)

_, err := c.mcore.StatObject(
s, err := c.mcore.StatObject(
c.bucket, // bucketName
c.objectKey(hash, kind), // objectName
minio.StatObjectOptions{}, // opts
Expand All @@ -168,8 +169,11 @@ func (c *s3Cache) Contains(kind cache.EntryKind, hash string) bool {
exists := (err == nil)
if err != nil {
err = errNotFound
} else {
size = s.Size
}

logResponse(c.accessLogger, "CONTAINS", c.bucket, c.objectKey(hash, kind), err)

return exists
return exists, size
}
3 changes: 2 additions & 1 deletion server/grpc_ac.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest,
}
}

if !s.cache.Contains(cache.CAS, (*digest).Hash) {
found, _ := s.cache.Contains(cache.CAS, (*digest).Hash)
if !found {
err := s.cache.Put(cache.CAS, (*digest).Hash, (*digest).SizeBytes,
bytes.NewReader(*slice))
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion server/grpc_cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (s *grpcServer) FindMissingBlobs(ctx context.Context,
continue
}

if !s.cache.Contains(cache.CAS, hash) {
found, _ := s.cache.Contains(cache.CAS, hash)
if !found {
s.accessLogger.Printf("GRPC CAS HEAD %s NOT FOUND", hash)
resp.MissingBlobDigests = append(resp.MissingBlobDigests, digest)
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) {

// Unvalidated path:

ok := h.cache.Contains(kind, hash)
ok, _ := h.cache.Contains(kind, hash)
if !ok {
http.Error(w, "Not found", http.StatusNotFound)
h.logResponse(http.StatusNotFound, r)
Expand Down

0 comments on commit 40bd979

Please sign in to comment.