Skip to content

Commit

Permalink
fix progress bar
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Feb 20, 2024
1 parent 8b4c220 commit 202f98b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 21 deletions.
65 changes: 45 additions & 20 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -588,6 +589,7 @@ func (cr *Cluster) SyncFiles(ctx context.Context, files []FileInfo, heavyCheck b
return
}

sort.Slice(files, func(i, j int) bool { return files[i].Hash < files[j].Hash })
cr.syncFiles(ctx, files, heavyCheck)

fileset := make(map[string]int64, len(files))
Expand Down Expand Up @@ -629,24 +631,31 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
defer setLogOutput(nil)

var (
checkingHashMux sync.RWMutex
checkingHash string
checkingHashMux sync.Mutex
checkingHash string
lastCheckingHash string
)

bar := pg.AddBar((int64)(len(files)),
mpb.BarRemoveOnComplete(),
mpb.PrependDecorators(
decor.Name("Checking "+storage.String()),
decor.Any(func(decor.Statistics) string {
checkingHashMux.RLock()
defer checkingHashMux.RUnlock()
return "/" + checkingHash
}),
),
mpb.AppendDecorators(
decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR),
decor.NewPercentage("%d"),
decor.EwmaETA(decor.ET_STYLE_GO, 30),
),
mpb.BarExtender((mpb.BarFillerFunc)(func(w io.Writer, _ decor.Statistics) (err error) {
if checkingHashMux.TryLock() {
lastCheckingHash = checkingHash
checkingHashMux.Unlock()
}
if lastCheckingHash != "" {
_, err = fmt.Fprintln(w, " ", lastCheckingHash)
}
return
}), true),
)

sizeMap := make(map[string]int64, len(files))
Expand All @@ -657,25 +666,26 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m

var buf [1024 * 32]byte
for _, f := range files {
start := time.Now()
hash := f.Hash
checkingHashMux.Lock()
checkingHash = hash
checkingHashMux.Unlock()
bar.Increment()
if checkingHashMux.TryLock() {
checkingHash = hash
checkingHashMux.Unlock()
}
if f.Size == 0 {
logDebugf("Skipped empty file %s", hash)
continue
goto CONTINUE
}
if size, ok := sizeMap[hash]; ok {
if size != f.Size {
logInfof("Found modified file: size of %q is %d, expect %d", hash, size, f.Size)
logWarnf("Found modified file: size of %q is %d, expect %d", hash, size, f.Size)
goto MISSING
}
if heavy {
hashMethod, err := getHashMethod(len(hash))
if err != nil {
logErrorf("Unknown hash method for %q", hash)
continue
goto CONTINUE
}
hw := hashMethod.New()

Expand All @@ -688,19 +698,26 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
r.Close()
if err != nil {
logErrorf("Could not calculate hash for %q: %v", hash, err)
continue
goto CONTINUE
}
if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash {
logInfof("Found modified file: hash of %q is %s, expect %s", hash, hs, f.Hash)
logWarnf("Found modified file: hash of %q is %s, expect %s", hash, hs, f.Hash)
goto MISSING
}
}
continue
goto CONTINUE
}
logDebugf("Could not found file %q", hash)
MISSING:
addMissing(f)
CONTINUE:
bar.EwmaIncrement(time.Since(start))
}

checkingHashMux.Lock()
checkingHash = ""
checkingHashMux.Unlock()

pg.Wait()
logInfof("File check finished for %s", storage.String())
return
Expand Down Expand Up @@ -862,7 +879,10 @@ func (cr *Cluster) gcFor(s Storage) {
}

func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) (<-chan string, error) {
const maxRetryCount = 5
const (
maxRetryCount = 5
maxTryWithOpen = 3
)

slotId, buf, free := stats.slots.Alloc(ctx)
if buf == nil {
Expand Down Expand Up @@ -902,13 +922,14 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo)
)
defer bar.Abort(true)

noOpen := stats.noOpen
interval := time.Second
for {
bar.SetCurrent(0)
hashMethod, err := getHashMethod(len(f.Hash))
if err == nil {
var path string
if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, stats.noOpen, func(r io.Reader) io.Reader {
if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, func(r io.Reader) io.Reader {
return ProxyReader(r, bar, stats.totalBar, &stats.lastInc)
}); err == nil {
pathRes <- path
Expand All @@ -922,9 +943,13 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo)
bar.SetRefill(bar.Current())

logErrorf("Download error %s:\n\t%s", f.Path, err)
if trycount.Add(1) > maxRetryCount {
c := trycount.Add(1)
if c > maxRetryCount {
break
}
if c > maxTryWithOpen {
noOpen = true
}
select {
case <-time.After(interval):
interval *= 2
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/LiterMC/go-openbmclapi
go 1.21.6

require (
github.com/LiterMC/socket.io v0.1.0
github.com/LiterMC/socket.io v0.1.1
github.com/hamba/avro/v2 v2.18.0
github.com/klauspost/compress v1.17.4
github.com/patrickmn/go-cache v2.1.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/LiterMC/socket.io v0.1.0 h1:p3SGNJRKaTldk5Weye1EvKG92l02fLyRgRDmkcLzC7U=
github.com/LiterMC/socket.io v0.1.0/go.mod h1:60lM7+qdBnP64Fk2fB6WmAS6HxI6WCdhlcvaSnutx50=
github.com/LiterMC/socket.io v0.1.1 h1:3DDMHFIG73HlUfjrH8bm1WPGHR+bEWbxQEofUr8pQeg=
github.com/LiterMC/socket.io v0.1.1/go.mod h1:60lM7+qdBnP64Fk2fB6WmAS6HxI6WCdhlcvaSnutx50=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
Expand Down

0 comments on commit 202f98b

Please sign in to comment.