Skip to content

Commit

Permalink
reforged socket.io
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Feb 3, 2024
1 parent 9fcf191 commit ac36bfb
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 852 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@
/build
/build-all.sh
/vendor

# workspace
go.work
189 changes: 106 additions & 83 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (
"sync/atomic"
"time"

"github.com/LiterMC/socket.io"
"github.com/LiterMC/socket.io/engine.io"
"github.com/hamba/avro/v2"
"github.com/klauspost/compress/zstd"
)
Expand Down Expand Up @@ -68,8 +70,8 @@ type Cluster struct {
enabled atomic.Bool
disabled chan struct{}
waitEnable []chan struct{}
reconnectOnce *sync.Once
socket *Socket
shouldEnable bool
socket *socket.Socket
cancelKeepalive context.CancelFunc
downloadMux sync.Mutex
downloading map[string]chan error
Expand All @@ -83,7 +85,9 @@ type Cluster struct {
}

func NewCluster(
ctx context.Context, baseDir string,
ctx context.Context,
prefix string,
baseDir string,
host string, publicPort uint16,
username string, password string,
byoc bool, dialer *net.Dialer,
Expand All @@ -99,7 +103,7 @@ func NewCluster(
username: username,
password: password,
useragent: "openbmclapi-cluster/" + ClusterVersion,
prefix: "https://openbmclapi.bangbang93.com",
prefix: prefix,
byoc: byoc,

cacheDir: filepath.Join(baseDir, "cache"),
Expand Down Expand Up @@ -153,60 +157,55 @@ func (cr *Cluster) Connect(ctx context.Context) bool {
logDebug("Extra connect")
return true
}
wsurl := httpToWs(cr.prefix) +
fmt.Sprintf("/socket.io/?clusterId=%s&clusterSecret=%s&EIO=4&transport=websocket", cr.username, cr.password)
header := http.Header{}
header.Set("Origin", cr.prefix)
header.Set("User-Agent", cr.useragent)

connectCh := make(chan struct{}, 0)
connected := sync.OnceFunc(func() {
close(connectCh)
engio, err := engine.NewSocket(engine.Options{
Host: cr.prefix,
Path: "/socket.io/",
ExtraQuery: url.Values{
"clusterId": {cr.username},
"clusterSecret": {cr.password},
},
ExtraHeaders: http.Header{
"Origin": {cr.prefix},
"User-Agent": {cr.useragent},
},
DialTimeout: time.Minute * 6,
})

reconnectOnce := new(sync.Once)
cr.reconnectOnce = reconnectOnce

cr.socket = NewSocket(NewESocket())
cr.socket.ConnectHandle = func(*Socket) {
connected()
}
cr.socket.DisconnectHandle = func(*Socket) {
connected()
reconnectOnce.Do(func() {
go cr.disconnected()
})
}
cr.socket.ErrorHandle = func(*Socket) {
connected()
reconnectOnce.Do(func() {
go func() {
if cr.disconnected() {
logWarn("Reconnecting due to SIO error")
if !cr.Connect(ctx) {
logError("Cannot reconnect to server, exit.")
os.Exit(0x08)
}
if err := cr.Enable(ctx); err != nil {
logError("Cannot enable cluster:", err, "; exit.")
os.Exit(0x08)
}
}
}()
})
}
logInfof("Dialing %s", strings.ReplaceAll(wsurl, cr.password, "<******>"))
tctx, cancel := context.WithTimeout(ctx, time.Second*15)
err := cr.socket.IO().DialContext(tctx, wsurl, WithHeader(header))
cancel()
if err != nil {
logError("Websocket connect error:", err)
logErrorf("Could not parse Engine.IO options: %v; exit.", err)
os.Exit(1)
}
engio.OnDisconnect(func(_ *engine.Socket, err error) {
go cr.disconnected()
logErrorf("Disconnected: %v", err)
})
engio.OnMessage(func(_ *engine.Socket, data []byte){
logDebugf("Engine.IO recv: %q", (string)(data))
})

cr.socket = socket.NewSocket(engio)
cr.socket.OnConnect(func(*socket.Socket, string) {
if cr.shouldEnable {
if err := cr.Enable(ctx); err != nil {
logErrorf("Cannot enable cluster: %v; exit.", err)
os.Exit(0x08)
}
}
})
cr.socket.OnDisconnect(func(*socket.Socket, string) {
go cr.disconnected()
})
cr.socket.OnError(func(_ *socket.Socket, err error){
logErrorf("Socket.IO error: %v", err)
})
logInfof("Dialing %s", strings.ReplaceAll(engio.URL().String(), cr.password, "<******>"))
if err := engio.Dial(ctx); err != nil {
logErrorf("Dial error: %v", err)
return false
}
select {
case <-ctx.Done():
if err := cr.socket.Connect(""); err != nil {
logErrorf("Open namespace error: %v", err)
return false
case <-connectCh:
}
return true
}
Expand All @@ -232,19 +231,28 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {
logDebug("Extra enable")
return
}

cr.shouldEnable = true

logInfo("Sending enable packet")
tctx, cancel := context.WithTimeout(ctx, time.Minute*6)
data, err := cr.socket.EmitAckContext(tctx, "enable", Map{
resCh, err := cr.socket.EmitWithAck("enable", Map{
"host": cr.host,
"port": cr.publicPort,
"version": ClusterVersion,
"byoc": cr.byoc,
})
cancel()
if err != nil {
return
}
logInfo("get enable ack:", data)
var data []any
select {
case <-tctx.Done():
return tctx.Err()
case data = <-resCh:
cancel()
}
logDebug("got enable ack:", data)
if ero := data[0]; ero != nil {
return fmt.Errorf("Enable failed: %v", ero)
}
Expand All @@ -260,26 +268,23 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {

var keepaliveCtx context.Context
keepaliveCtx, cr.cancelKeepalive = context.WithCancel(ctx)
reconnectOnce := cr.reconnectOnce
createInterval(keepaliveCtx, func() {
tctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2)
ok := cr.KeepAlive(tctx)
cancel()
if !ok {
if keepaliveCtx.Err() == nil {
reconnectOnce.Do(func() {
logInfo("Reconnecting due to keepalive failed")
cr.Disable(ctx)
logInfo("Reconnecting ...")
if !cr.Connect(ctx) {
logError("Cannot reconnect to server, exit.")
os.Exit(1)
}
if err := cr.Enable(ctx); err != nil {
logError("Cannot enable cluster:", err, "; exit.")
os.Exit(1)
}
})
logInfo("Reconnecting due to keepalive failed")
cr.Disable(ctx)
logInfo("Reconnecting ...")
if !cr.Connect(ctx) {
logError("Cannot reconnect to server, exit.")
os.Exit(1)
}
if err := cr.Enable(ctx); err != nil {
logError("Cannot enable cluster:", err, "; exit.")
os.Exit(1)
}
}
}
}, KeepAliveInterval)
Expand All @@ -290,7 +295,7 @@ func (cr *Cluster) Enable(ctx context.Context) (err error) {
func (cr *Cluster) KeepAlive(ctx context.Context) (ok bool) {
hits, hbts := cr.hits.Swap(0), cr.hbts.Swap(0)
cr.stats.AddHits(hits, hbts)
data, err := cr.socket.EmitAckContext(ctx, "keep-alive", Map{
resCh, err := cr.socket.EmitWithAck("keep-alive", Map{
"time": time.Now().UTC().Format("2006-01-02T15:04:05Z"),
"hits": hits,
"bytes": hbts,
Expand All @@ -302,6 +307,12 @@ func (cr *Cluster) KeepAlive(ctx context.Context) (ok bool) {
logError("Error when keep-alive:", err)
return false
}
var data []any
select {
case <-ctx.Done():
return false
case data = <-resCh:
}
if ero := data[0]; len(data) <= 1 || ero != nil {
logError("Keep-alive failed:", ero)
return false
Expand Down Expand Up @@ -330,6 +341,8 @@ func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
cr.mux.Lock()
defer cr.mux.Unlock()

cr.shouldEnable = false

if !cr.enabled.Load() {
logDebug("Extra disable")
return false
Expand All @@ -343,20 +356,24 @@ func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
}
logInfo("Disabling cluster")
tctx, cancel := context.WithTimeout(ctx, time.Second*(time.Duration)(config.KeepaliveTimeout))
data, err := cr.socket.EmitAckContext(tctx, "disable")
cancel()
if err != nil {
logErrorf("Disable failed: %v", err)
ok = false
} else {
logDebug("disable ack:", data)
if ero := data[0]; ero != nil {
logErrorf("Disable failed: %v", ero)
ok = false
} else if !data[1].(bool) {
logError("Disable failed: acked non true value")
if resCh, err := cr.socket.EmitWithAck("disable"); err == nil {
select {
case <-tctx.Done():
ok = false
case data := <-resCh:
cancel()
logDebug("disable ack:", data)
if ero := data[0]; ero != nil {
logErrorf("Disable failed: %v", ero)
ok = false
} else if !data[1].(bool) {
logError("Disable failed: acked non true value")
ok = false
}
}
} else {
logErrorf("Disable failed: %v", err)
ok = false
}

cr.enabled.Store(false)
Expand Down Expand Up @@ -408,10 +425,16 @@ func (pair *CertKeyPair) SaveAsFile() (cert, key string, err error) {

func (cr *Cluster) RequestCert(ctx context.Context) (ckp *CertKeyPair, err error) {
logInfo("Requesting certificates, please wait ...")
data, err := cr.socket.EmitAckContext(ctx, "request-cert")
resCh, err := cr.socket.EmitWithAck("request-cert")
if err != nil {
return
}
var data []any
select {
case <-ctx.Done():
return nil, ctx.Err()
case data = <-resCh:
}
if ero := data[0]; ero != nil {
err = fmt.Errorf("socket.io remote error: %v", ero)
return
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type HijackConfig struct {
type Config struct {
Debug bool `yaml:"debug"`
RecordServeInfo bool `yaml:"record_serve_info"`
SkipFirstSync bool `yaml:"skip-first-sync"`
LogSlots int `yaml:"log-slots"`
Byoc bool `yaml:"byoc"`
NoOpen bool `yaml:"noopen"`
Expand Down Expand Up @@ -97,6 +98,7 @@ func (cfg *Config) applyWebManifest(manifest map[string]any) {
var defaultConfig = Config{
Debug: false,
RecordServeInfo: false,
SkipFirstSync: false,
LogSlots: 7,
Byoc: false,
NoOpen: false,
Expand Down
4 changes: 2 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
debug: false
record_serve_info: false
log-slots: 7
nohttps: false
byoc: false
noopen: false
no_heavy_check: false
trusted-x-forwarded-for: false
Expand Down Expand Up @@ -29,7 +29,7 @@ oss:
- folder_path: oss_mirror
redirect_base: https://oss.example.com/base/paths
skip_measure_gen: false
hijack_port:
hijack:
enable: false
server_host: ""
server_port: 8090
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
module github.com/LiterMC/go-openbmclapi

go 1.21
go 1.21.6

require (
github.com/gorilla/websocket v1.5.1
github.com/LiterMC/socket.io v0.0.0-20240203043357-349b916dd92e
github.com/hamba/avro/v2 v2.18.0
github.com/klauspost/compress v1.17.4
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/gorilla/websocket v1.5.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/LiterMC/socket.io v0.0.0-20240203043357-349b916dd92e h1:tLftcik7JiGMqBAhUm5COZBA3b3UeSTgQsvcujkWFJM=
github.com/LiterMC/socket.io v0.0.0-20240203043357-349b916dd92e/go.mod h1:60lM7+qdBnP64Fk2fB6WmAS6HxI6WCdhlcvaSnutx50=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -25,5 +27,7 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit ac36bfb

Please sign in to comment.