diff --git a/botgo/sessions/multi/multi.go b/botgo/sessions/multi/multi.go new file mode 100644 index 00000000..5dfeadbc --- /dev/null +++ b/botgo/sessions/multi/multi.go @@ -0,0 +1,97 @@ +package multi + +import ( + "sync" + "time" + + "github.com/tencent-connect/botgo/dto" + "github.com/tencent-connect/botgo/log" + "github.com/tencent-connect/botgo/sessions/manager" + "github.com/tencent-connect/botgo/token" + "github.com/tencent-connect/botgo/websocket" +) + +type ShardManager struct { + Sessions []dto.Session + SessionChans []chan dto.Session + Clients []websocket.WebSocket + APInfo *dto.WebsocketAP + Token *token.Token + Intents *dto.Intent + StartInterval time.Duration + wg sync.WaitGroup +} + +func NewShardManager(apInfo *dto.WebsocketAP, token *token.Token, intents *dto.Intent) *ShardManager { + m := &ShardManager{ + APInfo: apInfo, + Token: token, + Intents: intents, + Sessions: make([]dto.Session, apInfo.Shards), + Clients: make([]websocket.WebSocket, apInfo.Shards), + SessionChans: make([]chan dto.Session, apInfo.Shards), + } + for i := range m.Sessions { + m.SessionChans[i] = make(chan dto.Session, 1) + } + m.StartInterval = manager.CalcInterval(apInfo.SessionStartLimit.MaxConcurrency) + return m +} + +func (sm *ShardManager) StartAllShards() { + for i := uint32(0); i < sm.APInfo.Shards; i++ { + sm.StartShard(i) + } + sm.wg.Wait() +} + +func (sm *ShardManager) StartShard(shardID uint32) { + sm.wg.Add(1) + go func() { + defer sm.wg.Done() + session := dto.Session{ + URL: sm.APInfo.URL, + Token: *sm.Token, + Intent: *sm.Intents, + LastSeq: 0, + Shards: dto.ShardConfig{ + ShardID: shardID, + ShardCount: sm.APInfo.Shards, + }, + } + sm.Sessions[shardID] = session + sm.SessionChans[shardID] <- session + + for session := range sm.SessionChans[shardID] { + time.Sleep(sm.StartInterval) + sm.newConnect(session, shardID) + } + }() +} + +func (sm *ShardManager) newConnect(session dto.Session, shardID uint32) { + wsClient := websocket.ClientImpl.New(session) + sm.Clients[shardID] = wsClient + if err := wsClient.Connect(); err != nil { + log.Error(err) + sm.SessionChans[shardID] <- session // Reconnect + return + } + if session.ID != "" { + err := wsClient.Resume() + if err != nil { + log.Errorf("[ws/session] Resume error: %+v", err) + return + } + } else { + err := wsClient.Identify() + if err != nil { + log.Errorf("[ws/session] Identify error: %+v", err) + return + } + } + if err := wsClient.Listening(); err != nil { + log.Errorf("[ws/session] Listening error: %+v", err) + sm.SessionChans[shardID] <- session // Reconnect + } +} diff --git a/main.go b/main.go index 43b1d859..dfa86835 100644 --- a/main.go +++ b/main.go @@ -34,6 +34,7 @@ import ( "github.com/hoshinonyaruko/gensokyo/url" "github.com/hoshinonyaruko/gensokyo/webui" "github.com/hoshinonyaruko/gensokyo/wsclient" + "github.com/tencent-connect/botgo/sessions/multi" "google.golang.org/grpc" "github.com/gin-gonic/gin" @@ -283,8 +284,12 @@ func main() { if conf.Settings.ShardCount == 1 { go func() { wsInfo.Shards = uint32(conf.Settings.ShardNum) - if err = botgo.NewSessionManager().Start(wsInfo, token, &intent); err != nil { - log.Fatalln(err) + if wsInfo.Shards == 1 { + if err = botgo.NewSessionManager().Start(wsInfo, token, &intent); err != nil { + log.Fatalln(err) + } + } else { + multi.NewShardManager(wsInfo, token, &intent).StartAllShards() } }() log.Printf("不使用分片,所有信息都由当前gensokyo处理...\n")