Skip to content

Commit

Permalink
improve file checking
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Feb 19, 2024
1 parent c54ea87 commit 2479f7d
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 34 deletions.
12 changes: 9 additions & 3 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,12 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
}
}

sizeMap := make(map[string]int64, len(files))
storages.WalkDir(func(hash string, size int64) error {

Check failure on line 601 in cluster.go

View workflow job for this annotation

GitHub Actions / build

undefined: storages
sizeMap[hash] = size
return nil
})

logInfof("Start checking files for %s, heavy = %v", storage.String(), heavy)
var buf [1024 * 32]byte
for _, f := range files {
Expand All @@ -606,7 +612,7 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
logDebugf("Skipped empty file %s", hash)
continue
}
if size, err := storage.Size(hash); err == nil {
if size, ok := sizeMap[hash]; ok {
if size != f.Size {
logInfof("Found modified file: size of %q is %s, expect %s",
hash, bytesToUnit((float64)(size)), bytesToUnit((float64)(f.Size)))
Expand Down Expand Up @@ -747,7 +753,7 @@ func (cr *Cluster) gc() {

func (cr *Cluster) gcFor(s Storage) {
logInfo("Starting garbage collector for", s.String())
err := s.WalkDir(func(hash string) error {
err := s.WalkDir(func(hash string, _ int64) error {
if cr.issync.Load() {
return context.Canceled
}
Expand Down Expand Up @@ -848,7 +854,7 @@ func (cr *Cluster) fetchFileWithBuf(ctx context.Context, f FileInfo, hashMethod
return
}
if res.StatusCode != http.StatusOK {
err = fmt.Errorf("Unexpected status code: %d", res.StatusCode)
err = &HTTPStatusError{Code: res.StatusCode}
return
}
switch ce := strings.ToLower(res.Header.Get("Content-Encoding")); ce {
Expand Down
7 changes: 4 additions & 3 deletions cmd_compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func cmdZipCache(args []string) {
}
cacheDir := filepath.Join(baseDir, "cache")
fmt.Printf("Cache directory = %q\n", cacheDir)
err := walkCacheDir(cacheDir, func(path string) (_ error) {
err := walkCacheDir(cacheDir, func(hash string, _ int64) (_ error) {
path := filepath.Join(cacheDir, hash[0:2], hash)
if strings.HasSuffix(path, ".gz") {
return
}
Expand Down Expand Up @@ -175,13 +176,13 @@ func cmdUnzipCache(args []string) {
cacheDir := filepath.Join(baseDir, "cache")
fmt.Printf("Cache directory = %q\n", cacheDir)
var hashBuf [64]byte
err := walkCacheDir(cacheDir, func(path string) (_ error) {
err := walkCacheDir(cacheDir, func(hash string, _ int64) (_ error) {
path := filepath.Join(cacheDir, hash[0:2], hash)
target, ok := strings.CutSuffix(path, ".gz")
if !ok {
return
}

hash := filepath.Base(target)
hashMethod, err := getHashMethod(len(hash))
if err != nil {
return
Expand Down
7 changes: 1 addition & 6 deletions cmd_webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,7 @@ func cmdUploadWebdav(args []string) {
}

for _, s := range webdavs {
local.WalkDir(func(hash string) error {
size, err := local.Size(hash)
if err != nil {
logErrorf("Cannot get stat of %s: %v", hash, err)
return nil
}
local.WalkDir(func(hash string, size int64) error {
if sz, err := s.Size(hash); err == nil && sz == size {
return nil
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Storage interface {
Open(hash string) (io.ReadCloser, error)
Create(hash string, r io.Reader) error
Remove(hash string) error
WalkDir(func(hash string) error) error
WalkDir(func(hash string, size int64) error) error

ServeDownload(rw http.ResponseWriter, req *http.Request, hash string, size int64) (int64, error)
ServeMeasure(rw http.ResponseWriter, req *http.Request, size int) error
Expand Down
10 changes: 6 additions & 4 deletions storage_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *LocalStorage) Remove(hash string) error {
return os.Remove(s.hashToPath(hash))
}

func (s *LocalStorage) WalkDir(walker func(hash string) error) error {
func (s *LocalStorage) WalkDir(walker func(hash string, size int64) error) error {
return walkCacheDir(s.opt.CachePath, walker)
}

Expand Down Expand Up @@ -228,7 +228,7 @@ var hex256 = func() (hex256 []string) {
return
}()

func walkCacheDir(cacheDir string, walker func(hash string) (err error)) (err error) {
func walkCacheDir(cacheDir string, walker func(hash string, size int64) (err error)) (err error) {
for _, dir := range hex256 {
files, err := os.ReadDir(filepath.Join(cacheDir, dir))
if err != nil {
Expand All @@ -240,8 +240,10 @@ func walkCacheDir(cacheDir string, walker func(hash string) (err error)) (err er
for _, f := range files {
if !f.IsDir() {
if hash := f.Name(); len(hash) >= 2 && hash[:2] == dir {
if err := walker(hash); err != nil {
return err
if info, err := f.Info(); err == nil {
if err := walker(hash, info.Size()); err != nil {
return err
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion storage_mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (s *MountStorage) Remove(hash string) error {
return os.Remove(s.hashToPath(hash))
}

func (s *MountStorage) WalkDir(walker func(hash string) error) error {
func (s *MountStorage) WalkDir(walker func(hash string, size int64) error) error {
return walkCacheDir(s.opt.CachePath(), walker)
}

Expand Down
52 changes: 36 additions & 16 deletions storage_webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *WebDavStorage) Init(ctx context.Context) (err error) {
s.cli = gowebdav.NewAuthClient(s.opt.GetEndPoint(), gowebdav.NewEmptyAuth())
s.cli.SetHeader("Authorization", "Basic "+
base64.StdEncoding.EncodeToString(([]byte)(s.opt.GetUsername()+":"+s.opt.GetPassword())))
s.cli.SetHeader("User-Agent", ClusterUserAgentFull)
// s.cli.SetHeader("User-Agent", ClusterUserAgentFull)

if err := s.cli.Mkdir("measure", 0755); err != nil {
if !webdavIsHTTPError(err, http.StatusConflict) {
Expand All @@ -172,6 +172,33 @@ func (s *WebDavStorage) Init(ctx context.Context) (err error) {
return
}

func (s *WebDavStorage) putFile(path string, r io.Reader) error {
target, err := url.JoinPath(s.opt.GetEndPoint(), path)
if err != nil {
return err
}
logDebugf("Putting %q", target)
cr := &countReader{r.(io.ReadSeeker), 0}
req, err := http.NewRequestWithContext(context.TODO(), http.MethodPut, target, cr)
if err != nil {
return err
}
req.SetBasicAuth(s.opt.GetUsername(), s.opt.GetPassword())

res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent:
logDebugf("read %d bytes", cr.n)
return nil
default:
return &HTTPStatusError{Code: res.StatusCode}
}
}

func (s *WebDavStorage) hashToPath(hash string) string {
return path.Join("download", hash[0:2], hash)
}
Expand All @@ -189,14 +216,14 @@ func (s *WebDavStorage) Open(hash string) (io.ReadCloser, error) {
}

func (s *WebDavStorage) Create(hash string, r io.Reader) error {
return s.cli.WriteStream(s.hashToPath(hash), r, 0644)
return s.putFile(s.hashToPath(hash), r)
}

func (s *WebDavStorage) Remove(hash string) error {
return s.cli.Remove(s.hashToPath(hash))
}

func (s *WebDavStorage) WalkDir(walker func(hash string) error) error {
func (s *WebDavStorage) WalkDir(walker func(hash string, size int64) error) error {
for _, dir := range hex256 {
files, err := s.cli.ReadDir(dir)
if err != nil {
Expand All @@ -205,7 +232,7 @@ func (s *WebDavStorage) WalkDir(walker func(hash string) error) error {
for _, f := range files {
if !f.IsDir() {
if hash := f.Name(); len(hash) >= 2 && hash[:2] == dir {
if err := walker(hash); err != nil {
if err := walker(hash, f.Size()); err != nil {
return err
}
}
Expand All @@ -229,7 +256,7 @@ func copyHeader(key string, dst, src http.Header) {
}

func (s *WebDavStorage) ServeDownload(rw http.ResponseWriter, req *http.Request, hash string, size int64) (int64, error) {
target, err := url.JoinPath(s.opt.GetEndPoint(), "download", hash[0:2], hash)
target, err := url.JoinPath(s.opt.GetEndPoint(), s.hashToPath(hash))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -281,7 +308,7 @@ func (s *WebDavStorage) ServeDownload(rw http.ResponseWriter, req *http.Request,
n, _ := io.Copy(rw, resp.Body)
return n, nil
default:
return 0, fmt.Errorf("Unexpected status %d", resp.StatusCode)
return 0, &HTTPStatusError{Code: resp.StatusCode}
}
}

Expand Down Expand Up @@ -354,16 +381,9 @@ func (s *WebDavStorage) createMeasureFile(ctx context.Context, size int) (err er
logErrorf("Cannot get stat of %s: %v", t, err)
}
logInfof("Creating measure file at %q", t)
if size == 0 {
if err = s.cli.Write(t, mbChunk[:2], 0644); err != nil {
logErrorf("Cannot create measure file %q: %v", t, err)
return
}
} else {
if err = s.cli.WriteStream(t, &io.LimitedReader{R: NoChangeReader, N: tsz}, 0644); err != nil {
logErrorf("Cannot create measure file %q: %v", t, err)
return
}
if err = s.putFile(t, &io.LimitedReader{R: NoChangeReader, N: tsz}); err != nil {
logErrorf("Cannot create measure file %q: %v", t, err)
return
}
return nil
}
9 changes: 9 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -388,3 +389,11 @@ func (m *SyncMap[K, V]) GetOrSet(k K, setter func() V) (v V, has bool) {
}
return
}

type HTTPStatusError struct {
Code int
}

func (e *HTTPStatusError) Error() string {
return fmt.Sprintf("Unexpected http status %d %s", e.Code, http.StatusText(e.Code))
}

0 comments on commit 2479f7d

Please sign in to comment.