diff --git a/README.MD b/README.MD index 4a340cb5..1a21cc54 100644 --- a/README.MD +++ b/README.MD @@ -84,6 +84,8 @@ port: 4000 cluster_id: ${CLUSTER_ID} # CLUSTER_SECRET cluster_secret: ${CLUSTER_SECRET} +# 文件同步间隔 (分钟) +sync_interval: 10 # 同步文件时最多打开的连接数量 download_max_conn: 64 # 服务器上行限制 diff --git a/cluster.go b/cluster.go index d66bf00a..4d1d32c2 100644 --- a/cluster.go +++ b/cluster.go @@ -239,18 +239,19 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) { var keepaliveCtx context.Context keepaliveCtx, cr.cancelKeepalive = context.WithCancel(ctx) createInterval(keepaliveCtx, func() { - ctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2) - defer cancel() - if !cr.KeepAlive(ctx) { + tctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2) + ok := cr.KeepAlive(tctx) + cancel() + if !ok { if keepaliveCtx.Err() == nil { logInfo("Reconnecting due to keepalive failed") - cr.Disable(keepaliveCtx) + cr.Disable(ctx) logInfo("Reconnecting ...") - if !cr.Connect(keepaliveCtx) { + if !cr.Connect(ctx) { logError("Cannot reconnect to server, exit.") os.Exit(1) } - if err := cr.Enable(keepaliveCtx); err != nil { + if err := cr.Enable(ctx); err != nil { logError("Cannot enable cluster:", err, "; exit.") os.Exit(1) } diff --git a/cluster_oss.go b/cluster_oss.go index 99e6bc0c..723feca1 100644 --- a/cluster_oss.go +++ b/cluster_oss.go @@ -96,6 +96,11 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error { case path := <-pathRes: if path != "" { defer os.Remove(path) + // acquire slot here + buf := <-stats.slots + defer func(){ + stats.slots <- buf + }() var srcFd *os.File if srcFd, err = os.Open(path); err != nil { return @@ -113,7 +118,7 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error { logErrorf("Could not seek file %q: %v", path, err) continue } - _, err = io.Copy(dstFd, srcFd) + _, err = io.CopyBuffer(dstFd, srcFd, buf) dstFd.Close() if err != nil { logErrorf("Could not copy from %q to %q:\n\t%v", path, target, err) diff --git a/config.go b/config.go index c27e7372..ba15f8f2 100644 --- a/config.go +++ b/config.go @@ -66,6 +66,7 @@ type Config struct { Port uint16 `yaml:"port"` ClusterId string `yaml:"cluster_id"` ClusterSecret string `yaml:"cluster_secret"` + SyncInterval int `yaml:"sync_interval"` DownloadMaxConn int `yaml:"download_max_conn"` ServeLimit ServeLimitConfig `yaml:"serve_limit"` Oss OSSConfig `yaml:"oss"` @@ -84,6 +85,7 @@ func readConfig() (config Config) { Port: 4000, ClusterId: "${CLUSTER_ID}", ClusterSecret: "${CLUSTER_SECRET}", + SyncInterval: 10, DownloadMaxConn: 64, ServeLimit: ServeLimitConfig{ Enable: false, diff --git a/config.yaml b/config.yaml index ab12a8c2..659f979c 100644 --- a/config.yaml +++ b/config.yaml @@ -7,6 +7,7 @@ public_port: 8080 port: 4000 cluster_id: ${CLUSTER_ID} cluster_secret: ${CLUSTER_SECRET} +sync_interval: 10 download_max_conn: 64 serve_limit: enable: false diff --git a/main.go b/main.go index ccd48262..43e21b16 100644 --- a/main.go +++ b/main.go @@ -34,7 +34,6 @@ import ( ) var ( - SyncFileInterval = time.Minute * 10 KeepAliveInterval = time.Second * 59 ) @@ -252,7 +251,7 @@ START: return } cluster.SyncFiles(ctx, fl) - }, SyncFileInterval) + }, (time.Duration)(config.SyncInterval)*time.Minute) if err := cluster.Enable(ctx); err != nil { logError("Cannot enable cluster:", err)