Skip to content

Commit

Permalink
Beta486 (#491)
Browse files Browse the repository at this point in the history
* beta447

* beta448

* beta449

* beta450

* beta451

* beta452

* beta453

* beta454

* beta455

* btea455

* beta456

* beta457

* beta458

* beta460

* beta460

* beta461

* beta462

* beta463

* beta464

* beta465

* beta467

* beta468

* beta469

* beta470

* beta471

* beta472

* beta473

* beta473

* beta475

* beta476

* beta478

* beta479

* beta479

* beta480

* beta481

* beta482

* beta483

* beta484

* beta485

* beta486

* beta486
  • Loading branch information
Hoshinonyaruko authored Aug 20, 2024
1 parent f3cd1b8 commit fc935b8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 25 deletions.
Empty file added go
Empty file.
18 changes: 14 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,29 +747,39 @@ func ThreadEventHandler() event.ThreadEventHandler {
// GroupATMessageEventHandler 实现处理 群at 消息的回调
func GroupATMessageEventHandler() event.GroupATMessageEventHandler {
return func(event *dto.WSPayload, data *dto.WSGroupATMessageData) error {
botstats.RecordMessageReceived()
go p.ProcessGroupMessage(data)

if !config.GetDisableErrorChan() {
botstats.RecordMessageReceived()
}

if config.GetEnableChangeWord() {
data.Content = acnode.CheckWordIN(data.Content)
if data.Author.Username != "" {
data.Author.Username = acnode.CheckWordIN(data.Author.Username)
}
}
go p.ProcessGroupMessage(data)

return nil
}
}

// C2CMessageEventHandler 实现处理 群私聊 消息的回调
func C2CMessageEventHandler() event.C2CMessageEventHandler {
return func(event *dto.WSPayload, data *dto.WSC2CMessageData) error {
botstats.RecordMessageReceived()
go p.ProcessC2CMessage(data)

if !config.GetDisableErrorChan() {
botstats.RecordMessageReceived()
}

if config.GetEnableChangeWord() {
data.Content = acnode.CheckWordIN(data.Content)
if data.Author.Username != "" {
data.Author.Username = acnode.CheckWordIN(data.Author.Username)
}
}
go p.ProcessC2CMessage(data)

return nil
}
}
Expand Down
66 changes: 45 additions & 21 deletions wsclient/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"sync"
"time"

"github.com/gorilla/websocket"
Expand All @@ -24,36 +24,63 @@ type WebSocketClient struct {
botID uint64
urlStr string
cancel context.CancelFunc
mutex sync.Mutex // 用于同步写入和重连操作的互斥锁
isReconnecting bool
sendFailures []map[string]interface{} // 存储失败的消息
writeCh chan writeRequest // 写请求通道
closeCh chan struct{} // 用于关闭的通道
}

type writeRequest struct {
messageType int
data []byte
}

// 发送json信息给onebot应用端
// SendMessage 发送消息,将写请求发送到写 Goroutine
func (client *WebSocketClient) SendMessage(message map[string]interface{}) error {
client.mutex.Lock() // 在写操作之前锁定
defer client.mutex.Unlock() // 确保在函数返回时解锁

// 序列化消息
msgBytes, err := json.Marshal(message)
if err != nil {
mylog.Println("Error marshalling message:", err)
log.Println("Error marshalling message:", err)
return err
}

err = client.conn.WriteMessage(websocket.TextMessage, msgBytes)
if err != nil {
mylog.Println("Error sending message:", err)
// 发送失败,将消息添加到切片
if !config.GetDisableErrorChan() {
client.sendFailures = append(client.sendFailures, message)
}
return err
// 创建错误通道,用于接收写操作的结果
client.writeCh <- writeRequest{
messageType: websocket.TextMessage,
data: msgBytes,
}

// 等待写操作完成,并返回结果
return nil
}

// Close 关闭 WebSocketClient,停止写 Goroutine
func (client *WebSocketClient) Close() error {
close(client.closeCh)
close(client.writeCh)
client.conn.Close()
return nil
}

// startWriter 专用的写 Goroutine
func (client *WebSocketClient) startWriter() {
for {
select {
case req := <-client.writeCh:
// 执行写操作
err := client.conn.WriteMessage(req.messageType, req.data)
if err != nil {
log.Println("Error sending message:", err)
if !config.GetDisableErrorChan() {
client.sendFailures = append(client.sendFailures, map[string]interface{}{"message": req.data}) // 记录失败的消息
}
}
case <-client.closeCh:
return
}
}
}

// 处理onebotv11应用端发来的信息
func (client *WebSocketClient) handleIncomingMessages(cancel context.CancelFunc) {
for {
Expand Down Expand Up @@ -314,7 +341,10 @@ func NewWebSocketClient(urlStr string, botID uint64, api openapi.OpenAPI, apiv2
botID: botID,
urlStr: urlStr,
sendFailures: []map[string]interface{}{},
writeCh: make(chan writeRequest, 5000), // 缓冲区大小可以根据需求调整
closeCh: make(chan struct{}),
}
go client.startWriter() // 启动写 Goroutine

// Sending initial message similar to your setupB function
message := map[string]interface{}{
Expand Down Expand Up @@ -344,12 +374,6 @@ func NewWebSocketClient(urlStr string, botID uint64, api openapi.OpenAPI, apiv2
return client, nil
}

func (ws *WebSocketClient) Close() error {
ws.mutex.Lock()
defer ws.mutex.Unlock()
return ws.conn.Close()
}

// getParamsFromURI 解析给定URI中的查询参数,并返回一个映射(map)
func getParamsFromURI(uriStr string) map[string]string {
params := make(map[string]string)
Expand Down

0 comments on commit fc935b8

Please sign in to comment.