Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beta473 #477

Merged
merged 28 commits into from
Aug 1, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
beta471
Hoshinonyaruko committed Aug 1, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit cd21ad9d1be5b32b7ea2408d5d99636a270a287e
7 changes: 5 additions & 2 deletions Processor/ProcessInlineSearch.go
Original file line number Diff line number Diff line change
@@ -360,8 +360,11 @@ func (p *Processors) ProcessInlineSearch(data *dto.WSInteractionData) error {

// Convert OnebotGroupMessage to map and send
privateMsgMap := structToMap(privateMsg)
//上报信息到onebotv11应用端(正反ws)
go p.BroadcastMessageToAll(privateMsgMap, p.Apiv2, data)

if privateMsg.RawMessage != "" {
//上报信息到onebotv11应用端(正反ws)
go p.BroadcastMessageToAll(privateMsgMap, p.Apiv2, data)
}

// 转换appid
AppIDString := strconv.FormatUint(p.Settings.AppID, 10)
109 changes: 108 additions & 1 deletion botgo/websocket/client/client.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
@@ -26,6 +27,18 @@ const DefaultQueueSize = 10000
// 定义全局变量
var global_s int64

// PayloadWithTimestamp 存储带时间戳的 WSPayload
type PayloadWithTimestamp struct {
Payload *dto.WSPayload
Timestamp time.Time
}

var dataMap sync.Map

func init() {
StartCleanupRoutine()
}

// Setup 依赖注册
func Setup() {
websocket.Register(&Client{})
@@ -187,6 +200,33 @@ func (c *Client) Session() *dto.Session {
return c.session
}

// func (c *Client) readMessageToQueue() {
// for {
// _, message, err := c.conn.ReadMessage()
// if err != nil {
// log.Errorf("%s read message failed, %v, message %s", c.session, err, string(message))
// close(c.messageQueue)
// c.closeChan <- err
// return
// }
// payload := &dto.WSPayload{}
// if err := json.Unmarshal(message, payload); err != nil {
// log.Errorf("%s json failed, %v", c.session, err)
// continue
// }
// // 更新 global_s 的值
// atomic.StoreInt64(&global_s, payload.S)

// payload.RawMessage = message
// log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))
// // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
// if c.isHandleBuildIn(payload) {
// continue
// }
// c.messageQueue <- payload
// }
// }

func (c *Client) readMessageToQueue() {
for {
_, message, err := c.conn.ReadMessage()
@@ -201,19 +241,58 @@ func (c *Client) readMessageToQueue() {
log.Errorf("%s json failed, %v", c.session, err)
continue
}
// 更新 global_s 的值
atomic.StoreInt64(&global_s, payload.S)

payload.RawMessage = message
log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))

// 计算数据的哈希值
dataHash := calculateDataHash(payload.Data)

// 检查是否已存在相同的 Data
if existingPayload, ok := getDataFromSyncMap(dataHash); ok {
// 如果已存在相同的 Data,则丢弃当前消息
log.Infof("%s discard duplicate message with DataHash: %v", c.session, existingPayload)
continue
}

// 将新的 payload 存入 sync.Map
storeDataToSyncMap(dataHash, payload)

// 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
if c.isHandleBuildIn(payload) {
continue
}

c.messageQueue <- payload
}
}

func getDataFromSyncMap(dataHash string) (*dto.WSPayload, bool) {
value, ok := dataMap.Load(dataHash)
if !ok {
return nil, false
}
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return nil, false
}
return payloadWithTimestamp.Payload, true
}

func storeDataToSyncMap(dataHash string, payload *dto.WSPayload) {
payloadWithTimestamp := &PayloadWithTimestamp{
Payload: payload,
Timestamp: time.Now(),
}
dataMap.Store(dataHash, payloadWithTimestamp)
}

func calculateDataHash(data interface{}) string {
dataBytes, _ := json.Marshal(data)
return string(dataBytes) // 这里直接转换为字符串,可以使用更复杂的算法
}

// 在全局范围通过atomic访问s值与message_id的映射
func GetGlobalS() int64 {
return atomic.LoadInt64(&global_s)
@@ -297,3 +376,31 @@ func (c *Client) readyHandler(payload *dto.WSPayload) {
event.DefaultHandlers.Ready(payload, readyData)
}
}

const cleanupInterval = 5 * time.Minute // 清理间隔时间

func StartCleanupRoutine() {
go func() {
for {
<-time.After(cleanupInterval)
cleanupDataMap()
}
}()
}

func cleanupDataMap() {
now := time.Now()
dataMap.Range(func(key, value interface{}) bool {
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return true
}

// 检查时间戳,清理超过一定时间的数据
if now.Sub(payloadWithTimestamp.Timestamp) > cleanupInterval {
dataMap.Delete(key)
}

return true
})
}
64 changes: 64 additions & 0 deletions echo/echo.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,70 @@ import (
"github.com/tencent-connect/botgo/dto"
)

func init() {
// 在 init 函数中运行清理逻辑
startCleanupRoutine()
}

func startCleanupRoutine() {
cleanupTicker = time.NewTicker(30 * time.Minute)
go func() {
for {
<-cleanupTicker.C
cleanupGlobalMaps()
}
}()
}

func cleanupGlobalMaps() {
cleanupSyncMap(&globalSyncMapMsgid)
cleanupSyncMap(&globalReverseMapMsgid)
cleanupMessageGroupStack(globalMessageGroupStack)
cleanupEchoMapping(globalEchoMapping)
cleanupInt64ToIntMapping(globalInt64ToIntMapping)
cleanupStringToIntMappingSeq(globalStringToIntMappingSeq)
}

func cleanupSyncMap(m *sync.Map) {
m.Range(func(key, value interface{}) bool {
m.Delete(key)
return true
})
}

func cleanupMessageGroupStack(stack *globalMessageGroup) {
stack.stack = make([]MessageGroupPair, 0)
}

func cleanupEchoMapping(mapping *EchoMapping) {
mapping.msgTypeMapping.Range(func(key, value interface{}) bool {
mapping.msgTypeMapping.Delete(key)
return true
})
mapping.msgIDMapping.Range(func(key, value interface{}) bool {
mapping.msgIDMapping.Delete(key)
return true
})
mapping.eventIDMapping.Range(func(key, value interface{}) bool {
mapping.eventIDMapping.Delete(key)
return true
})
}

func cleanupInt64ToIntMapping(mapping *Int64ToIntMapping) {
mapping.mapping.Range(func(key, value interface{}) bool {
mapping.mapping.Delete(key)
return true
})
}

func cleanupStringToIntMappingSeq(mapping *StringToIntMappingSeq) {
mapping.mapping.Range(func(key, value interface{}) bool {
mapping.mapping.Delete(key)
return true
})
}

type EchoMapping struct {
msgTypeMapping sync.Map
msgIDMapping sync.Map
68 changes: 35 additions & 33 deletions handlers/send_group_msg_raw.go
Original file line number Diff line number Diff line change
@@ -123,47 +123,49 @@ func HandleSendGroupMsgRaw(client callapi.Client, api openapi.OpenAPI, apiv2 ope
var SSM bool

var originalGroupID, originalUserID string
// 检查UserID是否为nil
if message.Params.UserID != nil && config.GetIdmapPro() && message.Params.UserID.(string) != "" && message.Params.UserID.(string) != "0" {
// 如果UserID不是nil且配置为使用Pro版本,则调用RetrieveRowByIDv2Pro
originalGroupID, originalUserID, err = idmap.RetrieveRowByIDv2Pro(message.Params.GroupID.(string), message.Params.UserID.(string))
if err != nil {
mylog.Printf("Error1 retrieving original GroupID: %v", err)
}
mylog.Printf("测试,通过idmaps-pro获取的originalGroupID:%v", originalGroupID)
if originalGroupID == "" {
originalGroupID, err = idmap.RetrieveRowByIDv2(message.Params.GroupID.(string))
if len(message.Params.GroupID.(string)) != 32 {
// 检查UserID是否为nil
if message.Params.UserID != nil && config.GetIdmapPro() && message.Params.UserID.(string) != "" && message.Params.UserID.(string) != "0" {
// 如果UserID不是nil且配置为使用Pro版本,则调用RetrieveRowByIDv2Pro
originalGroupID, originalUserID, err = idmap.RetrieveRowByIDv2Pro(message.Params.GroupID.(string), message.Params.UserID.(string))
if err != nil {
mylog.Printf("Error2 retrieving original GroupID: %v", err)
return "", nil
mylog.Printf("Error1 retrieving original GroupID: %v", err)
}
mylog.Printf("测试,通过idmaps-pro获取的originalGroupID:%v", originalGroupID)
if originalGroupID == "" {
originalGroupID, err = idmap.RetrieveRowByIDv2(message.Params.GroupID.(string))
if err != nil {
mylog.Printf("Error2 retrieving original GroupID: %v", err)
return "", nil
}
mylog.Printf("测试,通过idmaps获取的originalGroupID:%v", originalGroupID)
}
mylog.Printf("测试,通过idmaps获取的originalGroupID:%v", originalGroupID)
}
} else {
// 如果UserID是nil或配置不使用Pro版本,则调用RetrieveRowByIDv2
originalGroupID, err = idmap.RetrieveRowByIDv2(message.Params.GroupID.(string))
if err != nil {
mylog.Printf("Error retrieving original GroupID: %v", err)
}
// 检查 message.Params.UserID 是否为 nil
if message.Params.UserID == nil {
//mylog.Println("UserID is nil")
} else {
// 进行类型断言,确认 UserID 不是 nil
userID, ok := message.Params.UserID.(string)
if !ok {
mylog.Println("UserID is not a string")
// 处理类型断言失败的情况
// 如果UserID是nil或配置不使用Pro版本,则调用RetrieveRowByIDv2
originalGroupID, err = idmap.RetrieveRowByIDv2(message.Params.GroupID.(string))
if err != nil {
mylog.Printf("Error retrieving original GroupID: %v", err)
}
// 检查 message.Params.UserID 是否为 nil
if message.Params.UserID == nil {
//mylog.Println("UserID is nil")
} else {
originalUserID, err = idmap.RetrieveRowByIDv2(userID)
if err != nil {
mylog.Printf("Error retrieving original UserID: %v", err)
// 进行类型断言,确认 UserID 不是 nil
userID, ok := message.Params.UserID.(string)
if !ok {
mylog.Println("UserID is not a string")
// 处理类型断言失败的情况
} else {
originalUserID, err = idmap.RetrieveRowByIDv2(userID)
if err != nil {
mylog.Printf("Error retrieving original UserID: %v", err)
}
}
}
}
message.Params.GroupID = originalGroupID
message.Params.UserID = originalUserID
}
message.Params.GroupID = originalGroupID
message.Params.UserID = originalUserID

// 检查字符串是否仅包含数字
isNumeric := func(s string) bool {