From c1064cf14ff93a3943fd558058c84ed8e2395cd1 Mon Sep 17 00:00:00 2001 From: Pontus Freyhult Date: Fri, 17 Jan 2025 10:38:17 +0100 Subject: [PATCH 1/2] Fix weird dot in released archive file name which was really whitespace --- .goreleaser.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 670edc0..0f4a172 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -32,7 +32,7 @@ archives: - sdafs name_template: >- sdafs_ - {{ .Tag }}_ + {{- .Version }}_ {{- title .Os }}_ {{- if eq .Arch "amd64" }}x86_64 {{- else if eq .Arch "386" }}i386 @@ -46,7 +46,7 @@ archives: - csi-driver name_template: >- csi-driver_ - {{ .Tag }}_ + {{- .Version }}_ {{- title .Os }}_ {{- if eq .Arch "amd64" }}x86_64 {{- else if eq .Arch "386" }}i386 From ef938885eeb42d5bd7c8792c027840cf2a4c933d Mon Sep 17 00:00:00 2001 From: Pontus Freyhult Date: Fri, 17 Jan 2025 10:44:11 +0100 Subject: [PATCH 2/2] Changed cache, now uses ristretto to provide proper cache limiting and other better management. Added credentials file reread. 
--- cmd/sdafs/main.go | 15 +- cmd/sdafs/main_test.go | 27 +++ go.mod | 11 +- go.sum | 24 ++- internal/httpreader/httpreader.go | 247 +++++++++++++------------ internal/httpreader/httpreader_test.go | 53 +++--- internal/sdafs/sdafs.go | 112 ++++++++--- 7 files changed, 312 insertions(+), 177 deletions(-) diff --git a/cmd/sdafs/main.go b/cmd/sdafs/main.go index 92d3f31..c497164 100644 --- a/cmd/sdafs/main.go +++ b/cmd/sdafs/main.go @@ -10,6 +10,8 @@ import ( "os" "os/signal" + "github.com/pbnjay/memory" + "github.com/NBISweden/sdafs/internal/sdafs" "github.com/jacobsa/fuse" "github.com/sevlyar/go-daemon" @@ -19,6 +21,8 @@ var credentialsFile, rootURL, logFile string var foreground, open bool var maxRetries uint var chunkSize uint +var cacheSize uint +var cacheMemPerCent uint var logLevel int var Version string = "development" @@ -61,6 +65,8 @@ func getConfigs() mainConfig { flag.UintVar(&chunkSize, "chunksize", 5120, "Chunk size (in kb) used when fetching data. "+ "Higher values likely to give better throughput but higher latency. 
Min 64 Max 65536.") flag.IntVar(&logLevel, "loglevel", 0, "Loglevel, specified as per https://pkg.go.dev/log/slog#Level") + flag.UintVar(&cacheSize, "cachesize", 0, "Cache size (in mb), overrides percent if set") + flag.UintVar(&cacheMemPerCent, "cachemempercent", 8, "Cache size (in % of process visible RAM)") flag.Parse() @@ -93,7 +99,7 @@ func getConfigs() mainConfig { RootURL: rootURL, CredentialsFile: credentialsFile, SkipLevels: 0, - ChunkSize: int(chunkSize), + ChunkSize: uint64(chunkSize), MaxRetries: int(maxRetries), } @@ -105,6 +111,13 @@ func getConfigs() mainConfig { conf.FilePerms = 0444 } + if cacheSize == 0 { + total := memory.TotalMemory() + conf.CacheSize = total * uint64(cacheMemPerCent) / 100 + } else { + conf.CacheSize = uint64(cacheSize * 1024 * 1024) + } + m := mainConfig{mountPoint: mountPoint, sdafsconf: &conf, foreground: foreground, diff --git a/cmd/sdafs/main_test.go b/cmd/sdafs/main_test.go index 41dd353..2fee594 100644 --- a/cmd/sdafs/main_test.go +++ b/cmd/sdafs/main_test.go @@ -128,6 +128,33 @@ func TestConfOptions(t *testing.T) { assert.Equal(t, os.FileMode(0444), c.sdafsconf.FilePerms, "Not default value for fileperms as expected") + defaultCacheWithMem := c.sdafsconf.CacheSize + + os.Args = []string{"binary", "-cachesize", "10", "mount6"} + flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError) + flag.Parse() + c = getConfigs() + + assert.Equal(t, "mount6", c.mountPoint, + "Didn't pick up expected mountpoint") + + assert.Equal(t, uint64(10*1024*1024), c.sdafsconf.CacheSize, + "Did not see expected cache size") + + os.Args = []string{"binary", "-cachemempercent", "90", "mount7"} + flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError) + flag.Parse() + c = getConfigs() + + assert.Equal(t, "mount7", c.mountPoint, + "Didn't pick up expected mountpoint") + + assert.Less(t, uint64(0), c.sdafsconf.CacheSize, + "Cache size not picked up from memory as expected ") + + assert.Greater(t, c.sdafsconf.CacheSize, 
defaultCacheWithMem, + "Cache with 90% of RAM not larger than with default (8%)") + os.Args = safeArgs } diff --git a/go.mod b/go.mod index c4c39dd..4c4c3f9 100644 --- a/go.mod +++ b/go.mod @@ -4,34 +4,39 @@ go 1.23.2 require ( github.com/container-storage-interface/spec v1.11.0 + github.com/dgraph-io/ristretto/v2 v2.1.0 github.com/jacobsa/fuse v0.0.0-20241025064006-8ccd61173b05 github.com/jarcoal/httpmock v1.3.1 github.com/neicnordic/crypt4gh v1.13.0 + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/sevlyar/go-daemon v0.1.6 github.com/stretchr/testify v1.10.0 github.com/tj/assert v0.0.3 google.golang.org/grpc v1.69.4 - google.golang.org/protobuf v1.36.2 + google.golang.org/protobuf v1.36.3 gopkg.in/ini.v1 v1.67.0 k8s.io/klog/v2 v2.130.1 - k8s.io/kubelet v0.32.0 + k8s.io/kubelet v0.32.1 ) require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dchest/bcrypt_pbkdf v0.0.0-20150205184540-83f37f9c154a // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect github.com/kr/pretty v0.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4fca721..9263c69 
100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/container-storage-interface/spec v1.11.0 h1:H/YKTOeUZwHtyPOr9raR+HgFmGluGCklulxDYxSdVNM= github.com/container-storage-interface/spec v1.11.0/go.mod h1:DtUvaQszPml1YJfIK7c00mlv6/g4wNMLanLgiUbKFRI= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -9,6 +11,12 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dchest/bcrypt_pbkdf v0.0.0-20150205184540-83f37f9c154a h1:saTgr5tMLFnmy/yg3qDTft4rE5DY2uJ/cCxCe3q0XTU= github.com/dchest/bcrypt_pbkdf v0.0.0-20150205184540-83f37f9c154a/go.mod h1:Bw9BbhOJVNR+t0jCqx2GC6zv0TGBsShs56Y3gfSCvl0= +github.com/dgraph-io/ristretto/v2 v2.1.0 h1:59LjpOJLNDULHh8MC4UaegN52lC4JnO2dITsie/Pa8I= +github.com/dgraph-io/ristretto/v2 v2.1.0/go.mod h1:uejeqfYXpUomfse0+lO+13ATz4TypQYLJZzBSAemuB4= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -42,7 +50,11 @@ 
github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04 github.com/maxatome/go-testdeep v1.12.0/go.mod h1:lPZc/HAcJMP92l7yI6TRz1aZN5URwUBUAfUNvrclaNM= github.com/neicnordic/crypt4gh v1.13.0 h1:NbSAPx1+zFpG6a8GCVwW80y/TGHfGdXJF/zqQKqlHZ8= github.com/neicnordic/crypt4gh v1.13.0/go.mod h1:lfNIrhlcQrSf5awgCaW+poCsRBlvKOrNjR3CBvXU5Ek= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= +github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -104,12 +116,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f 
h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= -google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -121,5 +133,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kubelet v0.32.0 h1:uLyiKlz195Wo4an/K2tyge8o3QHx0ZkhVN3pevvp59A= -k8s.io/kubelet v0.32.0/go.mod h1:lAwuVZT/Hm7EdLn0jW2D+WdrJoorjJL2rVSdhOFnegw= +k8s.io/kubelet v0.32.1 h1:bB91GvMsZb+LfzBxnjPEr1Fal/sdxZtYphlfwAaRJGw= +k8s.io/kubelet v0.32.1/go.mod h1:4sAEZ6PlewD0GroV3zscY7llym6kmNNTVmUI/Qshm6w= diff --git a/internal/httpreader/httpreader.go b/internal/httpreader/httpreader.go index d439217..1255032 100644 --- a/internal/httpreader/httpreader.go +++ b/internal/httpreader/httpreader.go @@ -12,14 +12,19 @@ import ( "strings" "sync" "time" + + "github.com/dgraph-io/ristretto/v2" ) +const cacheLifeTime = 4 * time.Hour + type Conf struct { Token string 
Client *http.Client Headers *http.Header MaxRetries int - ChunkSize int + ChunkSize uint64 + CacheSize uint64 } type Request struct { @@ -31,8 +36,13 @@ const traceLevel = -12 // TODO: Make something better that does TTLing -var cache map[string][]CacheBlock -var cacheLock sync.Mutex +// type cacheKey struct { +// fileURL string +// offset uint64 +// } + +var cache *ristretto.Cache[string, *CacheBlock] + var prefetches map[string][]uint64 var prefetchLock sync.Mutex @@ -47,23 +57,27 @@ type CacheBlock struct { data []byte } -func (r *HTTPReader) initCache() { +func (r *HTTPReader) init() { once.Do(func() { + var err error // Make the map if not done yet - cache = make(map[string][]CacheBlock) - prefetches = make(map[string][]uint64) + config := ristretto.Config[string, *CacheBlock]{ + NumCounters: int64(r.conf.CacheSize / r.prefetchSize() * 10), + MaxCost: int64(r.conf.CacheSize), + BufferItems: 64, + } + cache, err = ristretto.NewCache(&config) + if err != nil { + slog.Error("Couldn't set up cache, running without, this will "+ + "impact performance impact", + "error", err) + } + prefetches = make(map[string][]uint64) }) - cacheLock.Lock() - _, found := cache[r.fileURL] - if !found { - cache[r.fileURL] = make([]CacheBlock, 0, 32) - } - cacheLock.Unlock() - prefetchLock.Lock() - _, found = prefetches[r.fileURL] + _, found := prefetches[r.fileURL] if !found { prefetches[r.fileURL] = make([]uint64, 0, 32) } @@ -80,6 +94,8 @@ type HTTPReader struct { id uint64 } +// NewHTTPReader allocates a unique id and returns a HTTPReader from the +// configuration struct (expected to be common) and the request func NewHTTPReader(conf *Conf, request *Request, ) (*HTTPReader, error) { idLock.Lock() @@ -93,40 +109,23 @@ func NewHTTPReader(conf *Conf, request *Request, nextID += 1 idLock.Unlock() - reader.initCache() + reader.init() return reader, nil } +// Close is a no-op func (r *HTTPReader) Close() (err error) { slog.Log(context.Background(), traceLevel, - "Pruning cache", + 
"Close", "url", r.fileURL, "id", r.id) return nil } -func (r *HTTPReader) pruneCache() { - slog.Log(context.Background(), - traceLevel, - "Pruning cache", - "url", r.fileURL, - "id", r.id) - - cacheLock.Lock() - defer cacheLock.Unlock() - - if len(cache[r.fileURL]) < 16 { - return - } - - // Prune the cache - keepfrom := len(cache[r.fileURL]) - 8 - cache[r.fileURL] = cache[r.fileURL][keepfrom:] -} - +// prefetchSize returns the prefetch size to use, sanity checked func (r *HTTPReader) prefetchSize() uint64 { if r.conf.ChunkSize < 64 { return 64 * 1024 @@ -134,6 +133,7 @@ func (r *HTTPReader) prefetchSize() uint64 { return uint64(r.conf.ChunkSize * 1024) } +// doFetch fetches the specified range of the url from the HTTPReader func (r *HTTPReader) doFetch(rangeSpec string) ([]byte, error) { slog.Log(context.Background(), traceLevel, @@ -146,7 +146,7 @@ func (r *HTTPReader) doFetch(rangeSpec string) ([]byte, error) { if rangeSpec != "" { // Archive being broken regarding ranges for now, - 
+ // use query parameters as well to trigger partial file delivery byteRange := strings.TrimPrefix(rangeSpec, "bytes=") byteRanges := strings.Split(byteRange, "-") @@ -227,34 +227,45 @@ func (r *HTTPReader) doFetch(rangeSpec string) ([]byte, error) { return dat, err } +func (r *HTTPReader) getCacheKey(offset uint64) string { + return fmt.Sprintf("%s%c%d", + r.fileURL, + '\x00', + offset/r.prefetchSize()) +} + func (r *HTTPReader) isInCache(offset uint64) bool { + key := r.getCacheKey(offset) + slog.Log(context.Background(), traceLevel, "Checking if offset is in cache", "url", r.fileURL, "id", r.id, - "offset", offset) - cacheLock.Lock() - defer cacheLock.Unlock() - // Check if we have the data in cache - for _, p := range cache[r.fileURL] { - if offset >= p.start && offset < p.start+p.length { - // At least part of the data is here - slog.Log(context.Background(), - traceLevel, - "Offset found in cache", - "url", r.fileURL, - "id", r.id, - "offset", offset) - return true - } + "offset", offset, + "key", key) + + cache.Wait() + entry, found := cache.Get(r.getCacheKey(offset)) + + if found && offset >= entry.start && offset < (entry.start+entry.length) { + + slog.Log(context.Background(), + traceLevel, + "Offset found in cache", + "url", r.fileURL, + "id", r.id, + "offset", offset, + "key", key) + return true } slog.Log(context.Background(), traceLevel, "Offset not found in cache", "url", r.fileURL, - "offset", offset) + "offset", offset, + "key", key) return false } @@ -265,12 +276,11 @@ func (r *HTTPReader) prefetchAt(waitBefore time.Duration, offset uint64) { "Doing prefetch", "url", r.fileURL, "offset", offset) - if waitBefore.Seconds() > 0 { + + if waitBefore > 0 { time.Sleep(waitBefore) } - r.pruneCache() - if offset >= r.objectSize { // Don't try to prefetch what doesn't exist return @@ -299,8 +309,8 @@ func (r *HTTPReader) prefetchAt(waitBefore time.Duration, offset uint64) { if err != nil { - if waitBefore == 0*time.Second { - waitBefore = 1 + if waitBefore 
== 0 { + waitBefore = 1 * time.Second } else { waitBefore = 2 * waitBefore } @@ -312,7 +322,7 @@ func (r *HTTPReader) prefetchAt(waitBefore time.Duration, offset uint64) { return } - r.addToCache(CacheBlock{offset, uint64(len(prefetchedData)), prefetchedData}) + r.addToCache(&CacheBlock{offset, uint64(len(prefetchedData)), prefetchedData}) } func (r *HTTPReader) Seek(offset int64, whence int) (int64, error) { @@ -370,23 +380,27 @@ func (r *HTTPReader) Seek(offset int64, whence int) (int64, error) { } // addToCache adds a prefetch to the list of outstanding prefetches once it's no longer active -func (r *HTTPReader) addToCache(cacheBlock CacheBlock) { - cacheLock.Lock() - defer cacheLock.Unlock() - - if len(cache[r.fileURL]) > 16 { - // Don't cache anything more right now - return - } - - // Store in cache +func (r *HTTPReader) addToCache(cacheBlock *CacheBlock) { + slog.Log(context.Background(), + traceLevel, + "Adding to cache", + "url", r.fileURL, + "offset", cacheBlock.start, + "length", cacheBlock.length, + "key", r.getCacheKey(cacheBlock.start)) - cache[r.fileURL] = append(cache[r.fileURL], cacheBlock) + key := r.getCacheKey(cacheBlock.start) + cache.SetWithTTL(key, cacheBlock, int64(cacheBlock.length), cacheLifeTime) } // addToOutstanding adds a prefetch to the list of outstanding prefetches once it's no longer active func (r *HTTPReader) addToOutstanding(toAdd uint64) { + slog.Log(context.Background(), + traceLevel, + "Adding to outstanding", + "url", r.fileURL, + "offset", toAdd) prefetchLock.Lock() prefetches[r.fileURL] = append(prefetches[r.fileURL], toAdd) prefetchLock.Unlock() @@ -394,6 +408,12 @@ func (r *HTTPReader) addToOutstanding(toAdd uint64) { // removeFromOutstanding removes a prefetch from the list of outstanding prefetches once it's no longer active func (r *HTTPReader) removeFromOutstanding(toRemove uint64) { + slog.Log(context.Background(), + traceLevel, + "Removing from outstanding", + "url", r.fileURL, + "offset", toRemove) + 
prefetchLock.Lock() defer prefetchLock.Unlock() @@ -439,25 +459,6 @@ func (r *HTTPReader) isPrefetching(offset uint64) bool { return false } -func (r *HTTPReader) doRequest() (*http.Response, error) { - - req, err := http.NewRequest("GET", r.fileURL, nil) - if err != nil { - return nil, fmt.Errorf( - "Couldn't make request for %s: %v", - r.fileURL, err) - } - - if r.conf.Headers != nil { - for h := range *r.conf.Headers { - req.Header.Add(h, r.conf.Headers.Get(h)) - } - } - - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.conf.Token)) - return r.conf.Client.Do(req) -} - func (r *HTTPReader) Read(dst []byte) (n int, err error) { r.lock.Lock() @@ -485,36 +486,54 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) { time.Sleep(1 * time.Millisecond) } - if r.isInCache(start) { + key := r.getCacheKey(start) + slog.Log(context.Background(), + traceLevel, + "Checking cache", + "url", r.fileURL, + "id", r.id, + "offset", start, + "key", key) + + // Worth waiting for quiescence if we can skip a fetch + cache.Wait() + p, found := cache.Get(key) + if found && + (start >= p.start && start < p.start+p.length) { + // At least part of the data is here + + slog.Log(context.Background(), + traceLevel, + "Hit in cache", + "url", r.fileURL, + "id", r.id, + "offset", start, + "key", key, + "cachestart", p.start, + "cacheend", p.start+p.length) + + offsetInBlock := start - p.start + + // Pull out wanted data (as much as we have) + n = copy(dst, p.data[offsetInBlock:]) r.lock.Lock() - cacheLock.Lock() - - defer r.lock.Unlock() - defer cacheLock.Unlock() - - // Walk through the cache - for _, p := range cache[r.fileURL] { - if start >= p.start && start < p.start+p.length { - // At least part of the data is here - - offsetInBlock := start - p.start - - // Pull out wanted data (as much as we have) - n = copy(dst, p.data[offsetInBlock:]) - r.currentOffset = start + uint64(n) // #nosec G115 + r.currentOffset = start + uint64(n) // #nosec G115 + r.lock.Unlock() - // 
Prefetch the next bit - go r.prefetchAt(0*time.Second, r.currentOffset) - - return n, nil - } - } + // Prefetch the next bit + go r.prefetchAt(0*time.Second, r.currentOffset) - // Expected in cache but not found, bail out and hope for better luck - // on retry - return 0, nil + return n, nil } + slog.Log(context.Background(), + traceLevel, + "Not found in cache, need to read", + "url", r.fileURL, + "id", r.id, + "offset", start, + "key", key) + // Not found in cache, need to fetch data wantedRange := fmt.Sprintf("bytes=%d-%d", r.currentOffset, min(r.currentOffset+r.prefetchSize()-1, r.objectSize-1)) @@ -541,7 +560,7 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) { // Add to cache cacheBytes := bytes.Clone(data) - r.addToCache(CacheBlock{start, uint64(len(cacheBytes)), cacheBytes}) + r.addToCache(&CacheBlock{start, uint64(len(cacheBytes)), cacheBytes}) b := bytes.NewBuffer(data) n, err = b.Read(dst) diff --git a/internal/httpreader/httpreader_test.go b/internal/httpreader/httpreader_test.go index 2f70c50..0ba0042 100644 --- a/internal/httpreader/httpreader_test.go +++ b/internal/httpreader/httpreader_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io" + "log/slog" "net/http" "slices" "testing" @@ -91,6 +92,7 @@ func useConf() *Conf { Client: http.DefaultClient, Headers: clientPublicKeyHeader(), MaxRetries: 7, + CacheSize: 24 * 1024 * 1024, Token: "token"} return &c @@ -199,6 +201,7 @@ func TestHTTPReader(t *testing.T) { func TestHTTPReaderPrefetches(t *testing.T) { // Some special tests here, messing with internals to expose behaviour + slog.SetLogLoggerLevel(-100) httpmock.Activate() defer httpmock.DeactivateAndReset() @@ -232,22 +235,18 @@ func TestHTTPReaderPrefetches(t *testing.T) { assert.NotNil(t, reader, "unexpected error when creating reader") s := reader - + cache.Clear() s.prefetchAt(0*time.Second, 0) - assert.Equal(t, 1, len(cache[url]), "nothing cached after prefetch") - // Clear cache - cache[url] = cache[url][:0] + cache.Wait() + _, found 
:= cache.Get(s.getCacheKey(0)) + assert.Equal(t, true, found, "value not cached after prefetch") - prefetches[url] = []uint64{} - t.Logf("Cache %v, outstanding %v", cache[url], prefetches[url]) + // Clear cache + cache.Clear() - for i := 0; i < 30; i++ { - cache[url] = append(cache[url], CacheBlock{90000000, uint64(0), nil}) - } + prefetches[url] = []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} s.prefetchAt(0*time.Second, 0) - assert.Equal(t, 9, len(cache[url]), "unexpected length of cache after prefetch") - prefetches[url] = []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} s.removeFromOutstanding(9) assert.Equal(t, prefetches[url], []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8}, "unexpected outstanding prefetches after remove") s.removeFromOutstanding(19) @@ -259,6 +258,10 @@ func TestHTTPReaderPrefetches(t *testing.T) { } func TestHTTPReaderFailures(t *testing.T) { + if testing.Short() { + return + } + httpmock.Activate() defer httpmock.DeactivateAndReset() @@ -356,7 +359,7 @@ func testDoRequestResponder(r *http.Request) (*http.Response, error) { return &resp, nil } -func TestDoRequest(t *testing.T) { +func TestDoFetch(t *testing.T) { httpmock.Activate() defer httpmock.DeactivateAndReset() @@ -370,37 +373,25 @@ func TestDoRequest(t *testing.T) { conf: c, fileURL: checkurl, } - resp, err := r.doRequest() - - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected status code for doRequest") + resp, err := r.doFetch("bytes=0-10") assert.Nil(t, err, "Unexpected error from doRequest") - msg, err := io.ReadAll(resp.Body) - assert.Equal(t, []byte("Auth:Bearer token A: B: "), msg, "Unexpected headers from doRequest") + assert.Equal(t, []byte("Auth:Bearer token A: B: "), resp, "Unexpected headers from doRequest") assert.Nil(t, err, "Unexpected error from doRequest") h := http.Header{} r.conf.Headers = &h h.Add("HeaderA", "SomeGoose") - resp, err = r.doRequest() - - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected status code for doRequest") - assert.Nil(t, err, "Unexpected error from 
doRequest") + resp, err = r.doFetch("bytes=0-10") - msg, err = io.ReadAll(resp.Body) - assert.Equal(t, []byte("Auth:Bearer token A:SomeGoose B: "), msg, "Unexpected headers from doRequest with A header") - assert.Nil(t, err, "Unexpected error from doRequest") + assert.Nil(t, err, "Unexpected error from doFetch") + assert.Equal(t, []byte("Auth:Bearer token A:SomeGoose B: "), resp, "Unexpected headers from doRequest with A header") h.Add("HeaderB", "SomeELSE") - resp, err = r.doRequest() - - assert.Equal(t, http.StatusOK, resp.StatusCode, "Unexpected status code for doRequest with A and B headers") - assert.Nil(t, err, "Unexpected error from doRequest") - - msg, err = io.ReadAll(resp.Body) - assert.Equal(t, []byte("Auth:Bearer token A:SomeGoose B:SomeELSE "), msg, "Unexpected headers from doRequest") + resp, err = r.doFetch("bytes=0-10") assert.Nil(t, err, "Unexpected error from doRequest") + assert.Equal(t, []byte("Auth:Bearer token A:SomeGoose B:SomeELSE "), resp, "Unexpected headers from doRequest") } diff --git a/internal/sdafs/sdafs.go b/internal/sdafs/sdafs.go index 025842e..4808c2a 100644 --- a/internal/sdafs/sdafs.go +++ b/internal/sdafs/sdafs.go @@ -30,6 +30,8 @@ import ( "gopkg.in/ini.v1" ) +const connectionCheckDelay = 1 * time.Second + // SDAfs is the main structure to keep track of our SDA connection type SDAfs struct { fuseutil.NotImplementedFileSystem @@ -46,12 +48,15 @@ type SDAfs struct { FilePerms os.FileMode inodes map[fuseops.InodeID]*inode - startTime time.Time - client *http.Client - maplock sync.Mutex - nextInode fuseops.InodeID - handles map[fuseops.HandleID]io.ReadSeekCloser - keyHeader http.Header + startTime time.Time + client *http.Client + maplock sync.Mutex + nextInode fuseops.InodeID + handles map[fuseops.HandleID]io.ReadSeekCloser + extraHeader *http.Header + + httpReaderConf *httpreader.Conf + tokenLoadTime time.Time } // Conf holds the configuration @@ -70,7 +75,8 @@ type Conf struct { DirPerms os.FileMode FilePerms os.FileMode 
HTTPClient *http.Client - ChunkSize int + ChunkSize uint64 + CacheSize uint64 SessionCookieName string } @@ -133,6 +139,8 @@ func (s *SDAfs) readToken() error { return fmt.Errorf("No configuration provided") } + s.tokenLoadTime = time.Now() + f, err := ini.Load(s.conf.CredentialsFile) if err != nil { @@ -144,6 +152,7 @@ func (s *SDAfs) readToken() error { for _, section := range f.Sections() { if section.HasKey("access_token") { s.token = section.Key("access_token").String() + s.httpReaderConf.Token = s.token return nil } } @@ -170,9 +179,9 @@ func (s *SDAfs) doRequest(relPath, method string) (*http.Response, error) { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token)) // Add extra headers picked up, e.g. cookies - if s.keyHeader != nil { - for h := range s.keyHeader { - for _, v := range s.keyHeader.Values(h) { + if s.extraHeader != nil { + for h := range *s.extraHeader { + for _, v := range s.extraHeader.Values(h) { req.Header.Add(h, v) } } @@ -188,7 +197,6 @@ func (s *SDAfs) extractCookies(r *http.Response) { slog.Log(context.Background(), traceLevel, "extracting cookies for reuse", - "header", r.Header, ) setCookies := r.Header.Values("set-cookie") @@ -209,7 +217,11 @@ func (s *SDAfs) extractCookies(r *http.Response) { newCookies = newCookies + cookie } - s.keyHeader.Set("cookie", newCookies) + // We can't update directly since a Header is a map, but we can create + // a copy, update that and point to the new one. 
+ newHeader := s.extraHeader.Clone() + newHeader.Set("cookie", newCookies) + s.extraHeader = &newHeader } func (s *SDAfs) getDatasets() error { @@ -276,9 +288,57 @@ func NewSDAfs(conf *Conf) (*SDAfs, error) { return nil, fmt.Errorf("error while verifying credentials: %v", err) } + go n.checkConnectionLoop() return n, nil } +func (s *SDAfs) checkConnectionLoop() { + for { + time.Sleep(connectionCheckDelay) + + stat, err := os.Stat(s.conf.CredentialsFile) + if err != nil { + slog.Error("Failed to stat credentials file", + "file", s.conf.CredentialsFile, + "error", err) + } + + // Credentials file hasn't been updated, even if the token no longer + // works, there's nothing we can do, so continue waiting. + if !stat.ModTime().After(s.tokenLoadTime) { + continue + } + + slog.Error("Credentials file changed, rereading", + "file", s.conf.CredentialsFile) + + safeToken := s.token + + err = s.readToken() + if err != nil { + // Failed, try to restore + s.token = safeToken + s.httpReaderConf.Token = safeToken + slog.Error("Failed to read token from credentials file", + "file", s.conf.CredentialsFile, + "error", err) + continue + } + + err = s.getDatasets() + if err != nil { + // The updated token was no better it seems, go back + s.token = safeToken + s.httpReaderConf.Token = safeToken + + slog.Error("Getting datasets failed after token reread, "+ + "reverting to previous token", + "error", err) + continue + } + } +} + func (s *SDAfs) getDatasetContents(datasetName string) ([]datasetFile, error) { rel := fmt.Sprintf("/metadata/datasets/%s/files", datasetName) r, err := s.doRequest(rel, "GET") @@ -326,6 +386,11 @@ func (s *SDAfs) getDatasetContents(datasetName string) ([]datasetFile, error) { func (s *SDAfs) createRoot() { + if len(s.inodes) > 0 { + // Already done + return + } + entries := make([]fuseutil.Dirent, 0) dirAttrs := fuseops.InodeAttributes{ @@ -630,8 +695,18 @@ func (s *SDAfs) setup() error { publicKeyEncoded := base64.StdEncoding.EncodeToString(w.Bytes()) 
s.publicC4GHkey = publicKeyEncoded - s.keyHeader = make(http.Header) - s.keyHeader.Add("Client-Public-Key", publicKeyEncoded) + header := make(http.Header) + s.extraHeader = &header + s.extraHeader.Add("Client-Public-Key", publicKeyEncoded) + + s.httpReaderConf = &httpreader.Conf{ + Token: s.token, + Client: s.client, + Headers: s.extraHeader, + MaxRetries: s.conf.MaxRetries, + ChunkSize: s.conf.ChunkSize, + CacheSize: s.conf.CacheSize, + } return nil } @@ -799,14 +874,7 @@ func (s *SDAfs) OpenFile( return fuse.EINVAL } - conf := httpreader.Conf{ - Token: s.token, - Client: s.client, - Headers: &s.keyHeader, - MaxRetries: s.conf.MaxRetries, - ChunkSize: s.conf.ChunkSize, - } - r, err := httpreader.NewHTTPReader(&conf, + r, err := httpreader.NewHTTPReader(s.httpReaderConf, &httpreader.Request{FileURL: s.getFileURL(in), ObjectSize: s.getTotalSize(in)}) if err != nil {