diff --git a/go b/go new file mode 100644 index 00000000..e69de29b diff --git a/main.go b/main.go index fc2f12d4..54b7ed4c 100644 --- a/main.go +++ b/main.go @@ -747,14 +747,19 @@ func ThreadEventHandler() event.ThreadEventHandler { // GroupATMessageEventHandler 实现处理 群at 消息的回调 func GroupATMessageEventHandler() event.GroupATMessageEventHandler { return func(event *dto.WSPayload, data *dto.WSGroupATMessageData) error { - botstats.RecordMessageReceived() + go p.ProcessGroupMessage(data) + + if !config.GetDisableErrorChan() { + botstats.RecordMessageReceived() + } + if config.GetEnableChangeWord() { data.Content = acnode.CheckWordIN(data.Content) if data.Author.Username != "" { data.Author.Username = acnode.CheckWordIN(data.Author.Username) } } - go p.ProcessGroupMessage(data) + return nil } } @@ -762,14 +767,19 @@ func GroupATMessageEventHandler() event.GroupATMessageEventHandler { // C2CMessageEventHandler 实现处理 群私聊 消息的回调 func C2CMessageEventHandler() event.C2CMessageEventHandler { return func(event *dto.WSPayload, data *dto.WSC2CMessageData) error { - botstats.RecordMessageReceived() + go p.ProcessC2CMessage(data) + + if !config.GetDisableErrorChan() { + botstats.RecordMessageReceived() + } + if config.GetEnableChangeWord() { data.Content = acnode.CheckWordIN(data.Content) if data.Author.Username != "" { data.Author.Username = acnode.CheckWordIN(data.Author.Username) } } - go p.ProcessC2CMessage(data) + return nil } } diff --git a/wsclient/ws.go b/wsclient/ws.go index e0753463..5e1de336 100644 --- a/wsclient/ws.go +++ b/wsclient/ws.go @@ -4,9 +4,9 @@ import ( "context" "encoding/json" "fmt" + "log" "net/http" "net/url" - "sync" "time" "github.com/gorilla/websocket" @@ -24,36 +24,63 @@ type WebSocketClient struct { botID uint64 urlStr string cancel context.CancelFunc - mutex sync.Mutex // 用于同步写入和重连操作的互斥锁 isReconnecting bool sendFailures []map[string]interface{} // 存储失败的消息 + writeCh chan writeRequest // 写请求通道 + closeCh chan struct{} // 用于关闭的通道 +} +type writeRequest struct { + messageType int + data []byte } -// 发送json信息给onebot应用端 +// SendMessage 发送消息,将写请求发送到写 Goroutine func (client *WebSocketClient) SendMessage(message map[string]interface{}) error { - client.mutex.Lock() // 在写操作之前锁定 - defer client.mutex.Unlock() // 确保在函数返回时解锁 - + // 序列化消息 msgBytes, err := json.Marshal(message) if err != nil { - mylog.Println("Error marshalling message:", err) + log.Println("Error marshalling message:", err) return err } - err = client.conn.WriteMessage(websocket.TextMessage, msgBytes) - if err != nil { - mylog.Println("Error sending message:", err) - // 发送失败,将消息添加到切片 - if !config.GetDisableErrorChan() { - client.sendFailures = append(client.sendFailures, message) - } - return err + // 创建错误通道,用于接收写操作的结果 + client.writeCh <- writeRequest{ + messageType: websocket.TextMessage, + data: msgBytes, } + // 等待写操作完成,并返回结果 return nil } +// Close 关闭 WebSocketClient,停止写 Goroutine +func (client *WebSocketClient) Close() error { + close(client.closeCh) + close(client.writeCh) + client.conn.Close() + return nil +} + +// startWriter 专用的写 Goroutine +func (client *WebSocketClient) startWriter() { + for { + select { + case req := <-client.writeCh: + // 执行写操作 + err := client.conn.WriteMessage(req.messageType, req.data) + if err != nil { + log.Println("Error sending message:", err) + if !config.GetDisableErrorChan() { + client.sendFailures = append(client.sendFailures, map[string]interface{}{"message": req.data}) // 记录失败的消息 + } + } + case <-client.closeCh: + return + } + } +} + // 处理onebotv11应用端发来的信息 func (client *WebSocketClient) handleIncomingMessages(cancel context.CancelFunc) { for { @@ -314,7 +341,10 @@ func NewWebSocketClient(urlStr string, botID uint64, api openapi.OpenAPI, apiv2 botID: botID, urlStr: urlStr, sendFailures: []map[string]interface{}{}, + writeCh: make(chan writeRequest, 5000), // 缓冲区大小可以根据需求调整 + closeCh: make(chan struct{}), } + go client.startWriter() // 启动写 Goroutine // Sending initial message similar to your setupB function message := map[string]interface{}{ @@ -344,12 +374,6 @@ func NewWebSocketClient(urlStr string, botID uint64, api openapi.OpenAPI, apiv2 return client, nil } -func (ws *WebSocketClient) Close() error { - ws.mutex.Lock() - defer ws.mutex.Unlock() - return ws.conn.Close() -} - // getParamsFromURI 解析给定URI中的查询参数,并返回一个映射(map) func getParamsFromURI(uriStr string) map[string]string { params := make(map[string]string)