diff --git a/cluster/cluster.go b/cluster/cluster.go
index f7198d5..67f93bc 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -143,7 +143,10 @@ func (cr *Cluster) Enable(ctx context.Context) error {
 	}()
 	oldStatus := cr.status.Swap(clusterEnabling)
 	defer cr.status.CompareAndSwap(clusterEnabling, oldStatus)
+	return cr.enable(ctx)
+}
 
+func (cr *Cluster) enable(ctx context.Context) error {
 	storageStr := cr.storageManager.GetFlavorString(cr.storages)
 
 	log.TrInfof("info.cluster.enable.sending")
@@ -189,15 +192,65 @@ func (cr *Cluster) Enable(ctx context.Context) error {
 	if v := data[1]; !v.(bool) {
 		return fmt.Errorf("FATAL: Enable ack non true value, got (%T) %#v", v, v)
 	}
-	cr.disableSignal = make(chan struct{}, 0)
+	disableSignal := make(chan struct{})
+	cr.disableSignal = disableSignal
 	log.TrInfof("info.cluster.enabled")
 	cr.status.Store(clusterEnabled)
+	cr.socket.OnceConnect(func(_ *socket.Socket, ns string) {
+		if ns != "" {
+			return
+		}
+		if cr.status.Load() != clusterEnabled {
+			return
+		}
+		select {
+		case <-disableSignal:
+			return
+		default:
+		}
+		cr.status.Store(clusterEnabling)
+		go cr.reEnable(disableSignal)
+	})
 	return nil
 }
 
+func (cr *Cluster) reEnable(disableSignal <-chan struct{}) {
+	tctx, cancel := context.WithTimeout(context.Background(), time.Minute*7)
+	go func() {
+		select {
+		case <-tctx.Done():
+		case <-disableSignal:
+			cancel()
+		}
+	}()
+	err := cr.enable(tctx)
+	cancel()
+	if err != nil {
+		log.TrErrorf("error.cluster.enable.failed", err)
+		if cr.status.Load() == clusterEnabled {
+			ctx, cancel := context.WithCancel(context.Background())
+			timer := time.AfterFunc(time.Minute, func() {
+				cancel()
+				if cr.status.CompareAndSwap(clusterEnabled, clusterEnabling) {
+					cr.reEnable(disableSignal)
+				}
+			})
+			go func() {
+				select {
+				case <-ctx.Done():
+				case <-disableSignal:
+					timer.Stop()
+					cancel()
+				}
+			}()
+		}
+	}
+}
+
 // Disable send disable packet to central server
 // The context passed in only affect the logical of Disable method
 // Disable method is thread-safe, and it will wait until the first invoke exited
+// The connection will not be closed after disabling
 func (cr *Cluster) Disable(ctx context.Context) error {
 	if cr.Enabled() {
 		cr.mux.Lock()
@@ -241,8 +294,8 @@ func (cr *Cluster) disable(ctx context.Context) error {
 	return nil
 }
 
-// markDisconnected marked the cluster as error or kicked
-func (cr *Cluster) markDisconnected(kicked bool) {
+// markKicked marks the cluster as kicked
+func (cr *Cluster) markKicked() {
 	if !cr.Enabled() {
 		return
 	}
@@ -252,12 +305,5 @@ func (cr *Cluster) markDisconnected(kicked bool) {
 		return
 	}
 	defer close(cr.disableSignal)
-
-	var nextStatus int32
-	if kicked {
-		nextStatus = clusterKicked
-	} else {
-		nextStatus = clusterError
-	}
-	cr.status.Store(nextStatus)
+	cr.status.Store(clusterKicked)
 }
diff --git a/cluster/keepalive.go b/cluster/keepalive.go
index 8d6362e..478c176 100644
--- a/cluster/keepalive.go
+++ b/cluster/keepalive.go
@@ -39,6 +39,7 @@ type keepAliveReq struct {
 }
 
 // KeepAlive will send the keep-alive packet and fresh hits & hit bytes data
+// If the cluster is kicked by the central server, the cluster status will be marked as kicked
 func (cr *Cluster) KeepAlive(ctx context.Context) KeepAliveRes {
 	hits, hbts := cr.hits.Load(), cr.hbts.Load()
 	resCh, err := cr.socket.EmitWithAck("keep-alive", keepAliveReq{
@@ -82,6 +83,7 @@ func (cr *Cluster) KeepAlive(ctx context.Context) KeepAliveRes {
 	cr.hits.Add(-hits2)
 	cr.hbts.Add(-hbts2)
 	if data[1] == false {
+		cr.markKicked()
 		return KeepAliveKicked
 	}
 	return KeepAliveSucceed
diff --git a/cluster/socket.go b/cluster/socket.go
index fcb3e2b..0eb1a8a 100644
--- a/cluster/socket.go
+++ b/cluster/socket.go
@@ -21,6 +21,8 @@
 package cluster
 
 import (
 	"context"
+	"errors"
+	"fmt"
 
 	"github.com/LiterMC/socket.io"
 	"github.com/LiterMC/socket.io/engine.io"
@@ -28,6 +30,102 @@
 // Connect connects to the central server
 // The context passed in only affect the logical of Connect method
+// The connection will not be closed after disabling
+//
+// See Disconnect
 func (cr *Cluster) Connect(ctx context.Context) error {
-	return
+	if !cr.Disconnected() {
+		return errors.New("attempt to connect while already connected or connecting")
+	}
+	_, err := cr.GetAuthToken(ctx)
+	if err != nil {
+		return fmt.Errorf("auth failed: %w", err)
+	}
+
+	engio, err := engine.NewSocket(engine.Options{
+		Host: cr.prefix,
+		Path: "/socket.io/",
+		ExtraHeaders: http.Header{
+			"Origin":     {cr.prefix},
+			"User-Agent": {build.ClusterUserAgent},
+		},
+		DialTimeout: time.Minute * 6,
+	})
+	if err != nil {
+		return fmt.Errorf("could not parse Engine.IO options: %w", err)
+	}
+	if ctx.Value("cluster.options.engine-io.debug") == true {
+		engio.OnRecv(func(s *engine.Socket, data []byte) {
+			log.Debugf("Engine.IO %s recv: %q", s.ID(), (string)(data))
+		})
+		engio.OnSend(func(s *engine.Socket, data []byte) {
+			log.Debugf("Engine.IO %s send: %q", s.ID(), (string)(data))
+		})
+	}
+	engio.OnConnect(func(s *engine.Socket) {
+		log.Infof("Engine.IO %s connected for cluster %s", s.ID(), cr.Id())
+	})
+	engio.OnDisconnect(cr.onDisconnected)
+	engio.OnDialError(func(s *engine.Socket, err *engine.DialErrorContext) {
+		if err.Count() < 0 {
+			return
+		}
+		log.TrErrorf("error.cluster.connect.failed", cr.Id(), err.Count(), config.MaxReconnectCount, err.Err())
+		if config.MaxReconnectCount >= 0 && err.Count() >= config.MaxReconnectCount {
+			log.TrErrorf("error.cluster.connect.failed.toomuch", cr.Id())
+			s.Close()
+		}
+	})
+
+	log.Infof("Dialing %s for cluster %s", engio.URL().String(), cr.Id())
+	if err := engio.Dial(ctx); err != nil {
+		return fmt.Errorf("dial error: %w", err)
+	}
+
+	cr.socket = socket.NewSocket(engio, socket.WithAuthTokenFn(func() (string, error) {
+		token, err := cr.GetAuthToken(ctx)
+		if err != nil {
+			log.TrErrorf("error.cluster.auth.failed", err)
+			return "", err
+		}
+		return token, nil
+	}))
+	cr.socket.OnError(func(_ *socket.Socket, err error) {
+		log.Errorf("Socket.IO error: %v", err)
+	})
+	cr.socket.OnMessage(func(event string, data []any) {
+		if event == "message" {
+			log.Infof("[remote]: %v", data[0])
+		}
+	})
+	log.Info("Connecting to socket.io namespace")
+	if err := cr.socket.Connect(""); err != nil {
+		return fmt.Errorf("namespace connect error: %w", err)
+	}
+	cr.socketStatus.Store(socketConnected)
+	return nil
 }
+
+// Disconnect closes the connection to the central server
+// Disconnect will not disable the cluster
+//
+// See Connect
+func (cr *Cluster) Disconnect() error {
+	if cr.Disconnected() {
+		return nil
+	}
+	cr.mux.Lock()
+	defer cr.mux.Unlock()
+	err := cr.socket.Close()
+	cr.socketStatus.Store(socketDisconnected)
+	cr.socket = nil
+	return err
+}
+
+func (cr *Cluster) onDisconnected(s *engine.Socket, err error) {
+	if err != nil {
+		log.Warnf("Engine.IO %s disconnected: %v", s.ID(), err)
+	}
+	cr.socketStatus.Store(socketDisconnected)
+	cr.socket = nil
+}
diff --git a/cluster/status.go b/cluster/status.go
index f08b958..96e94d3 100644
--- a/cluster/status.go
+++ b/cluster/status.go
@@ -19,14 +19,29 @@
 
 package cluster
 
+const (
+	socketDisconnected = 0
+	socketConnected    = 1
+	socketConnecting   = 2
+)
+
 const (
 	clusterDisabled = 0
 	clusterEnabled  = 1
 	clusterEnabling = 2
 	clusterKicked   = 4
-	clusterError    = 5
 )
 
+// Disconnected returns true if the cluster is disconnected from the central server
+func (cr *Cluster) Disconnected() bool {
+	return cr.socketStatus.Load() == socketDisconnected
+}
+
+// Connected returns true if the cluster is connected to the central server
+func (cr *Cluster) Connected() bool {
+	return cr.socketStatus.Load() == socketConnected
+}
+
 // Enabled returns true if the cluster is enabled or enabling
 func (cr *Cluster) Enabled() bool {
 	s := cr.status.Load()
@@ -48,11 +63,6 @@ func (cr *Cluster) IsKicked() bool {
 	return cr.status.Load() == clusterKicked
 }
 
-// IsError returns true if the cluster is disabled since connection error
-func (cr *Cluster) IsError() bool {
-	return cr.status.Load() == clusterError
-}
-
 // WaitForEnable returns a channel which receives true when cluster enabled succeed, or receives false when it failed to enable
 // If the cluster is already enable, the channel always returns true
 // The channel should not be used multiple times
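
Usage note (not part of the patch): a minimal sketch of the lifecycle this patch wires together: connect, enable, periodic keep-alive, then disable on shutdown. Only Connect, Enable, KeepAlive, Disable, Disconnect, and KeepAliveKicked appear in the diff above; the import path, the run helper, and the one-minute keep-alive interval are assumptions for illustration, and the patch shows no constructor for *cluster.Cluster.

package main

import (
	"context"
	"log"
	"time"

	"example.com/openbmclapi/cluster" // hypothetical module path; substitute the real one
)

// run drives a single cluster until ctx is cancelled. Obtaining the
// *cluster.Cluster value is out of scope, since the patch shows no constructor.
func run(ctx context.Context, cr *cluster.Cluster) error {
	if err := cr.Connect(ctx); err != nil { // dials Engine.IO and the Socket.IO namespace
		return err
	}
	defer cr.Disconnect() // per the new doc comments, Disable alone keeps the socket open

	if err := cr.Enable(ctx); err != nil {
		return err
	}
	ticker := time.NewTicker(time.Minute) // assumed interval; the patch does not fix one
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Disable is thread-safe and leaves the connection open (see cluster.go);
			// the deferred Disconnect above closes the socket afterwards.
			return cr.Disable(context.Background())
		case <-ticker.C:
			if cr.KeepAlive(ctx) == cluster.KeepAliveKicked {
				// KeepAlive has already called markKicked, so just stop the loop.
				log.Println("cluster was kicked by the central server")
				return nil
			}
		}
	}
}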