make file checking parallel again
zyxkad committed Feb 20, 2024
1 parent 202f98b commit e754c93
Showing 1 changed file with 38 additions and 16 deletions.
54 changes: 38 additions & 16 deletions cluster.go
@@ -610,7 +610,13 @@ type fileInfoWithTargets struct {
targets []Storage
}

func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, missing *SyncMap[string, *fileInfoWithTargets]) {
func (cr *Cluster) checkFileFor(
ctx context.Context,
storage Storage, files []FileInfo,
heavy bool,
missing *SyncMap[string, *fileInfoWithTargets],
pg *mpb.Progress,
) {
addMissing := func(f FileInfo) {
if info, has := missing.GetOrSet(f.Hash, func() *fileInfoWithTargets {
return &fileInfoWithTargets{
@@ -626,10 +632,6 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m

logInfof("Start checking files for %s, heavy = %v", storage.String(), heavy)

pg := mpb.New(mpb.WithAutoRefresh(), mpb.WithWidth(140))
setLogOutput(pg)
defer setLogOutput(nil)

var (
checkingHashMux sync.Mutex
checkingHash string
@@ -639,7 +641,7 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
bar := pg.AddBar((int64)(len(files)),
mpb.BarRemoveOnComplete(),
mpb.PrependDecorators(
decor.Name("Checking "+storage.String()),
decor.Name("> Checking "+storage.String()),
),
mpb.AppendDecorators(
decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR),
@@ -652,11 +654,13 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
checkingHashMux.Unlock()
}
if lastCheckingHash != "" {
_, err = fmt.Fprintln(w, " ", lastCheckingHash)
_, err = fmt.Fprintln(w, "\t", lastCheckingHash)
}
return
}), true),
)
defer bar.Wait()
defer bar.Abort(true)

sizeMap := make(map[string]int64, len(files))
storage.WalkDir(func(hash string, size int64) error {
@@ -666,6 +670,9 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m

var buf [1024 * 32]byte
for _, f := range files {
if ctx.Err() != nil {
return
}
start := time.Now()
hash := f.Hash
if checkingHashMux.TryLock() {
@@ -718,15 +725,36 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
checkingHash = ""
checkingHashMux.Unlock()

pg.Wait()
logInfof("File check finished for %s", storage.String())
return
}

func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) error {
pg := mpb.New(mpb.WithAutoRefresh(), mpb.WithWidth(140))
setLogOutput(pg)
defer setLogOutput(nil)

missingMap := NewSyncMap[string, *fileInfoWithTargets]()
done := make(chan struct{}, 0)

for _, s := range cr.storages {
cr.checkFileFor(s, files, heavyCheck, missingMap)
go func(s Storage) {
defer func() {
select {
case done <- struct{}{}:
case <-ctx.Done():
}
}()
cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg)
}(s)
}
for i := len(cr.storages); i > 0; i-- {
select {
case <-done:
case <-ctx.Done():
logWarn("File sync interrupted")
return ctx.Err()
}
}

totalFiles := len(missingMap.m)
@@ -748,18 +776,14 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b
}

var stats syncStats
stats.pg = pg
stats.noOpen = config.NoOpen || syncCfg.Source == "center"
stats.slots = NewBufSlots(syncCfg.Concurrency)
stats.totalFiles = totalFiles
for _, f := range missing {
stats.totalSize += f.Size
}

pg := mpb.New(mpb.WithAutoRefresh(), mpb.WithWidth(140))
stats.pg = pg
setLogOutput(pg)
defer setLogOutput(nil)

var barUnit decor.SizeB1024
stats.lastInc.Store(time.Now().UnixNano())
stats.totalBar = pg.AddBar(stats.totalSize,
@@ -784,8 +808,6 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b
logInfof("Starting sync files, count: %d, total: %s", totalFiles, bytesToUnit((float64)(stats.totalSize)))
start := time.Now()

done := make(chan struct{}, 0)

for _, f := range missing {
logDebugf("File %s is for %v", f.Hash, f.targets)
pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo)
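The shape of the change: the mpb.Progress and setLogOutput calls move out of checkFileFor into syncFiles, checkFileFor gains a ctx and the shared *mpb.Progress as parameters, and syncFiles fans the per-storage checks out to one goroutine each, collecting completions over a done channel so the wait can be abandoned as soon as ctx is cancelled. Below is a minimal standalone sketch of that fan-out/fan-in pattern; Storage and checkOne here are illustrative placeholders, not the repository's real types.

package main

import (
	"context"
	"fmt"
	"time"
)

type Storage string

// checkOne stands in for Cluster.checkFileFor: it checks one storage and
// gives up early if the context is cancelled.
func checkOne(ctx context.Context, s Storage) {
	select {
	case <-time.After(100 * time.Millisecond): // pretend to scan the storage
		fmt.Println("checked", s)
	case <-ctx.Done():
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	storages := []Storage{"local", "mirror-a", "mirror-b"}
	done := make(chan struct{})

	// Fan out: one goroutine per storage.
	for _, s := range storages {
		go func(s Storage) {
			defer func() {
				// Signal completion, but never block past cancellation.
				select {
				case done <- struct{}{}:
				case <-ctx.Done():
				}
			}()
			checkOne(ctx, s)
		}(s)
	}

	// Fan in: wait for every worker, or bail out when the context ends.
	for i := len(storages); i > 0; i-- {
		select {
		case <-done:
		case <-ctx.Done():
			fmt.Println("interrupted:", ctx.Err())
			return
		}
	}
	fmt.Println("all storages checked")
}

Pairing each done send with a ctx.Done() case, rather than using a plain sync.WaitGroup, is what lets the collector loop in syncFiles return immediately on cancellation, which is the "File sync interrupted" branch in the diff above.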
