diff --git a/cluster.go b/cluster.go index 1e453b2f..d66bf00a 100644 --- a/cluster.go +++ b/cluster.go @@ -175,7 +175,9 @@ func (cr *Cluster) Connect(ctx context.Context) bool { }() } logInfof("Dialing %s", strings.ReplaceAll(wsurl, cr.password, "<******>")) - err := cr.socket.IO().DialContext(ctx, wsurl, WithHeader(header)) + tctx, cancel := context.WithTimeout(ctx, time.Second*15) + err := cr.socket.IO().DialContext(tctx, wsurl, WithHeader(header)) + cancel() if err != nil { logError("Websocket connect error:", err) return false @@ -210,12 +212,14 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) { return } logInfo("Sending enable packet") - data, err := cr.socket.EmitAckContext(ctx, "enable", Map{ + tctx, cancel := context.WithTimeout(ctx, time.Second*10) + data, err := cr.socket.EmitAckContext(tctx, "enable", Map{ "host": cr.host, "port": cr.publicPort, "version": ClusterVersion, "byoc": cr.byoc, }) + cancel() if err != nil { return } diff --git a/cluster_oss.go b/cluster_oss.go index 0e462338..99e6bc0c 100644 --- a/cluster_oss.go +++ b/cluster_oss.go @@ -79,6 +79,8 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error { logInfof("Starting sync files, count: %d, total: %s", fl, bytesToUnit(stats.totalsize)) start := time.Now() + done := make(chan struct{}, 1) + for _, f := range missing { logDebugf("File %s is for %v", f.Hash, f.targets) pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo) @@ -87,15 +89,35 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error { return err } go func(f *fileInfoWithTargets) { + defer func() { + done <- struct{}{} + }() select { case path := <-pathRes: if path != "" { defer os.Remove(path) + var srcFd *os.File + if srcFd, err = os.Open(path); err != nil { + return + } + defer srcFd.Close() relpath := hashToFilename(f.Hash) for _, target := range f.targets { target = filepath.Join(target, relpath) - if err := copyFile(path, target, 0644); err != nil { - logErrorf("Could not copy file %q to %q:\n\t%v", path, target, err) + dstFd, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + logErrorf("Could not create %q: %v", target, err) + continue + } + if _, err = srcFd.Seek(0, io.SeekStart); err != nil { + logErrorf("Could not seek file %q: %v", path, err) + continue + } + _, err = io.Copy(dstFd, srcFd) + dstFd.Close() + if err != nil { + logErrorf("Could not copy from %q to %q:\n\t%v", path, target, err) + continue } } } @@ -106,7 +128,7 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error { } for i := cap(stats.slots); i > 0; i-- { select { - case <-stats.slots: + case <-done: case <-ctx.Done(): logWarn("File sync interrupted") return ctx.Err()