Skip to content

Commit

Permalink
add timeout for reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Jan 24, 2024
1 parent 1bf475c commit 13f6a6a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
8 changes: 6 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func (cr *Cluster) Connect(ctx context.Context) bool {
}()
}
logInfof("Dialing %s", strings.ReplaceAll(wsurl, cr.password, "<******>"))
err := cr.socket.IO().DialContext(ctx, wsurl, WithHeader(header))
tctx, cancel := context.WithTimeout(ctx, time.Second*15)
err := cr.socket.IO().DialContext(tctx, wsurl, WithHeader(header))
cancel()
if err != nil {
logError("Websocket connect error:", err)
return false
Expand Down Expand Up @@ -210,12 +212,14 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {
return
}
logInfo("Sending enable packet")
data, err := cr.socket.EmitAckContext(ctx, "enable", Map{
tctx, cancel := context.WithTimeout(ctx, time.Second*10)
data, err := cr.socket.EmitAckContext(tctx, "enable", Map{
"host": cr.host,
"port": cr.publicPort,
"version": ClusterVersion,
"byoc": cr.byoc,
})
cancel()
if err != nil {
return
}
Expand Down
28 changes: 25 additions & 3 deletions cluster_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
logInfof("Starting sync files, count: %d, total: %s", fl, bytesToUnit(stats.totalsize))
start := time.Now()

done := make(chan struct{}, 1)

for _, f := range missing {
logDebugf("File %s is for %v", f.Hash, f.targets)
pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo)
Expand All @@ -87,15 +89,35 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
return err
}
go func(f *fileInfoWithTargets) {
defer func() {
done <- struct{}{}
}()
select {
case path := <-pathRes:
if path != "" {
defer os.Remove(path)
var srcFd *os.File
if srcFd, err = os.Open(path); err != nil {
return
}
defer srcFd.Close()
relpath := hashToFilename(f.Hash)
for _, target := range f.targets {
target = filepath.Join(target, relpath)
if err := copyFile(path, target, 0644); err != nil {
logErrorf("Could not copy file %q to %q:\n\t%v", path, target, err)
dstFd, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
logErrorf("Could not create %q: %v", target, err)
continue
}
if _, err = srcFd.Seek(0, io.SeekStart); err != nil {
logErrorf("Could not seek file %q: %v", path, err)
continue
}
_, err = io.Copy(dstFd, srcFd)
dstFd.Close()
if err != nil {
logErrorf("Could not copy from %q to %q:\n\t%v", path, target, err)
continue
}
}
}
Expand All @@ -106,7 +128,7 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
}
for i := cap(stats.slots); i > 0; i-- {
select {
case <-stats.slots:
case <-done:
case <-ctx.Done():
logWarn("File sync interrupted")
return ctx.Err()
Expand Down

0 comments on commit 13f6a6a

Please sign in to comment.