Skip to content

Commit

Permalink
Merge pull request #13 from NBISweden/retries_handling
Browse files Browse the repository at this point in the history
Retries handling
  • Loading branch information
pontus authored Jan 14, 2025
2 parents 45425bf + b9263d3 commit dbd9c66
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 62 deletions.
13 changes: 12 additions & 1 deletion cmd/sdafs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@ import (

var credentialsFile, rootURL, logFile string
var foreground, open bool
var maxRetries int

var Version string = "development"

// usage prints usage and version for the benefit of the user
func usage() {
fmt.Fprintf(flag.CommandLine.Output(),
"Usage: %s [FLAGS...] mountpoint\n\nSupported flags are:\n\n",
os.Args[0])
flag.PrintDefaults()
fmt.Printf("\nsdafs version: %s\n\n", Version)
os.Exit(1)
}

// mainConfig holds the configuration
type mainConfig struct {
mountPoint string
foreground bool
Expand All @@ -32,6 +38,8 @@ type mainConfig struct {
open bool
}

// mainConfig makes the configuration structure from whatever sources applies
// (currently command line flags only)
func getConfigs() mainConfig {
home := os.Getenv("HOME")

Expand All @@ -41,7 +49,8 @@ func getConfigs() mainConfig {
flag.StringVar(&logFile, "log", "", "File to send logs to instead of stderr,"+
" defaults to sdafs.log if detached, empty string means stderr which is default for foreground")

flag.BoolVar(&foreground, "foreground", false, "Do not detach")
flag.IntVar(&maxRetries, "maxretries", 7, "Max number retries for failed transfers. Retries will be done with some form of backoff")
flag.BoolVar(&foreground, "foreground", false, "Do not detach, run in foreground and send log output to stdout")
flag.BoolVar(&open, "open", false, "Set permissions allowing access by others than the user")

flag.Parse()
Expand Down Expand Up @@ -82,6 +91,7 @@ func getConfigs() mainConfig {
return m
}

// repointLog switches where the log goes if needed
func repointLog(m mainConfig) {
if m.logFile != "" {
f, err := os.OpenFile(m.logFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
Expand All @@ -95,6 +105,7 @@ func repointLog(m mainConfig) {
}
}

// detachIfNeeded daemonizes if needed
func detachIfNeeded(c mainConfig) {
if !c.foreground {
context := new(daemon.Context)
Expand Down
75 changes: 57 additions & 18 deletions internal/httpreader/httpreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log"
"math"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -61,19 +62,30 @@ type HTTPReader struct {
lock sync.Mutex
token string
extraHeaders *http.Header

maxRetries int
}

func NewHTTPReader(fileURL, token string, objectSize uint64, client *http.Client, headers *http.Header) (*HTTPReader, error) {
func NewHTTPReader(fileURL, token string,
objectSize uint64,
client *http.Client,
headers *http.Header,
maxRetries int) (*HTTPReader, error) {

if maxRetries == 0 {
maxRetries = 6
}

log.Printf("Creating reader for %v, object size %v", fileURL, objectSize)
reader := &HTTPReader{
client,
0,
fileURL,
objectSize,
sync.Mutex{},
token,
headers,
client: client,
currentOffset: 0,
fileURL: fileURL,
objectSize: objectSize,
lock: sync.Mutex{},
token: token,
extraHeaders: headers,
maxRetries: maxRetries,
}

reader.initCache()
Expand Down Expand Up @@ -137,6 +149,10 @@ func (r *HTTPReader) doFetch(rangeSpec string) ([]byte, error) {

req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", r.token))

if rangeSpec != "" {
req.Header.Add("Range", rangeSpec)
}

if r.extraHeaders != nil {
for h := range *r.extraHeaders {
req.Header.Add(h, r.extraHeaders.Get(h))
Expand Down Expand Up @@ -187,7 +203,11 @@ func (r *HTTPReader) isInCache(offset uint64) bool {
return false
}

func (r *HTTPReader) prefetchAt(offset uint64) {
func (r *HTTPReader) prefetchAt(waitBefore time.Duration, offset uint64) {
if waitBefore.Seconds() > 0 {
time.Sleep(waitBefore)
}

r.pruneCache()

if offset >= r.objectSize {
Expand Down Expand Up @@ -217,6 +237,17 @@ func (r *HTTPReader) prefetchAt(offset uint64) {
r.removeFromOutstanding(offset)

if err != nil {

if waitBefore == 0*time.Second {
waitBefore = 1
} else {
waitBefore = 2 * waitBefore
}

if waitBefore < time.Duration(math.Pow(2, float64(r.maxRetries)))*time.Second {
r.prefetchAt(waitBefore, offset)
}

return
}

Expand All @@ -239,7 +270,7 @@ func (r *HTTPReader) Seek(offset int64, whence int) (int64, error) {
}

r.currentOffset = uint64(offset)
go r.prefetchAt(r.currentOffset)
go r.prefetchAt(0*time.Second, r.currentOffset)

return offset, nil

Expand All @@ -253,7 +284,7 @@ func (r *HTTPReader) Seek(offset int64, whence int) (int64, error) {
}

r.currentOffset = uint64(int64(r.currentOffset) + offset) // #nosec G115
go r.prefetchAt(r.currentOffset)
go r.prefetchAt(0*time.Second, r.currentOffset)

return int64(r.currentOffset), nil // #nosec G115

Expand All @@ -269,7 +300,7 @@ func (r *HTTPReader) Seek(offset int64, whence int) (int64, error) {
}

r.currentOffset = uint64(int64(r.objectSize) + offset) // #nosec G115
go r.prefetchAt(r.currentOffset)
go r.prefetchAt(0*time.Second, r.currentOffset)

return int64(r.currentOffset), nil // #nosec G115
}
Expand Down Expand Up @@ -403,7 +434,7 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) {
r.currentOffset = start + uint64(n) // #nosec G115

// Prefetch the next bit
go r.prefetchAt(r.currentOffset)
go r.prefetchAt(0*time.Second, r.currentOffset)

return n, nil
}
Expand All @@ -417,11 +448,19 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) {
// Not found in cache, need to fetch data
wantedRange := fmt.Sprintf("bytes=%d-%d", r.currentOffset, min(r.currentOffset+r.prefetchSize()-1, r.objectSize-1))

r.addToOutstanding(start)
var data []byte
var wait time.Duration = 1
for tries := 0; tries <= r.maxRetries; tries++ {
r.addToOutstanding(start)
data, err = r.doFetch(wantedRange)
r.removeFromOutstanding(start)

data, err := r.doFetch(wantedRange)

r.removeFromOutstanding(start)
if err != nil {
// Something went wrong, wait and retry
time.Sleep(wait * time.Second)
wait *= 2
}
}

if err != nil {
return 0, err
Expand All @@ -438,7 +477,7 @@ func (r *HTTPReader) Read(dst []byte) (n int, err error) {
r.currentOffset = start + uint64(n) // #nosec G115
r.lock.Unlock()

go r.prefetchAt(r.currentOffset)
go r.prefetchAt(0*time.Second, r.currentOffset)

return n, err
}
Loading

0 comments on commit dbd9c66

Please sign in to comment.