Skip to content

Commit

Permalink
make syncInterval configurable
Browse files Browse the repository at this point in the history
Fix keepalive context issue
Fix connection limit for OSS
  • Loading branch information
zyxkad committed Jan 24, 2024
1 parent 13f6a6a commit e976342
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 9 deletions.
2 changes: 2 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ port: 4000
cluster_id: ${CLUSTER_ID}
# CLUSTER_SECRET
cluster_secret: ${CLUSTER_SECRET}
# 文件同步间隔 (分钟)
sync_interval: 10
# 同步文件时最多打开的连接数量
download_max_conn: 64
# 服务器上行限制
Expand Down
13 changes: 7 additions & 6 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,19 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {
var keepaliveCtx context.Context
keepaliveCtx, cr.cancelKeepalive = context.WithCancel(ctx)
createInterval(keepaliveCtx, func() {
ctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2)
defer cancel()
if !cr.KeepAlive(ctx) {
tctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2)
ok := cr.KeepAlive(tctx)
cancel()
if !ok {
if keepaliveCtx.Err() == nil {
logInfo("Reconnecting due to keepalive failed")
cr.Disable(keepaliveCtx)
cr.Disable(ctx)
logInfo("Reconnecting ...")
if !cr.Connect(keepaliveCtx) {
if !cr.Connect(ctx) {
logError("Cannot reconnect to server, exit.")
os.Exit(1)
}
if err := cr.Enable(keepaliveCtx); err != nil {
if err := cr.Enable(ctx); err != nil {
logError("Cannot enable cluster:", err, "; exit.")
os.Exit(1)
}
Expand Down
7 changes: 6 additions & 1 deletion cluster_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
case path := <-pathRes:
if path != "" {
defer os.Remove(path)
// acquire slot here
buf := <-stats.slots
defer func(){
stats.slots <- buf
}()
var srcFd *os.File
if srcFd, err = os.Open(path); err != nil {
return
Expand All @@ -113,7 +118,7 @@ func (cr *Cluster) ossSyncFiles(ctx context.Context, files []FileInfo) error {
logErrorf("Could not seek file %q: %v", path, err)
continue
}
_, err = io.Copy(dstFd, srcFd)
_, err = io.CopyBuffer(dstFd, srcFd, buf)
dstFd.Close()
if err != nil {
logErrorf("Could not copy from %q to %q:\n\t%v", path, target, err)
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Config struct {
Port uint16 `yaml:"port"`
ClusterId string `yaml:"cluster_id"`
ClusterSecret string `yaml:"cluster_secret"`
SyncInterval int `yaml:"sync_interval"`
DownloadMaxConn int `yaml:"download_max_conn"`
ServeLimit ServeLimitConfig `yaml:"serve_limit"`
Oss OSSConfig `yaml:"oss"`
Expand All @@ -84,6 +85,7 @@ func readConfig() (config Config) {
Port: 4000,
ClusterId: "${CLUSTER_ID}",
ClusterSecret: "${CLUSTER_SECRET}",
SyncInterval: 10,
DownloadMaxConn: 64,
ServeLimit: ServeLimitConfig{
Enable: false,
Expand Down
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public_port: 8080
port: 4000
cluster_id: ${CLUSTER_ID}
cluster_secret: ${CLUSTER_SECRET}
sync_interval: 10
download_max_conn: 64
serve_limit:
enable: false
Expand Down
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
)

var (
SyncFileInterval = time.Minute * 10
KeepAliveInterval = time.Second * 59
)

Expand Down Expand Up @@ -252,7 +251,7 @@ START:
return
}
cluster.SyncFiles(ctx, fl)
}, SyncFileInterval)
}, (time.Duration)(config.SyncInterval)*time.Minute)

if err := cluster.Enable(ctx); err != nil {
logError("Cannot enable cluster:", err)
Expand Down

0 comments on commit e976342

Please sign in to comment.