Skip to content

Commit

Permalink
beta476
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoshinonyaruko committed Aug 8, 2024
1 parent b54fdee commit a30a697
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 111 deletions.
25 changes: 25 additions & 0 deletions botgo/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,34 @@ package event

import (
"encoding/json"
"sync"
"time"

"github.com/tidwall/gjson" // 由于回包的 d 类型不确定,gjson 用于从回包json中提取 d 并进行针对性的解析

"github.com/tencent-connect/botgo/dto"
)

func init() {
// Start a goroutine for periodic cleaning
go cleanProcessedIDs()
}

func cleanProcessedIDs() {
ticker := time.NewTicker(5 * time.Minute) // Adjust the interval as needed
defer ticker.Stop()

for range ticker.C {
// Clean processedIDs, remove entries which are no longer needed
processedIDs.Range(func(key, value interface{}) bool {
processedIDs.Delete(key)
return true
})
}
}

var processedIDs sync.Map

var eventParseFuncMap = map[dto.OPCode]map[dto.EventType]eventParseFunc{
dto.WSDispatchEvent: {
dto.EventGuildCreate: guildHandler,
Expand Down Expand Up @@ -218,6 +240,9 @@ func groupAtMessageHandler(payload *dto.WSPayload, message []byte) error {
if err := ParseData(message, data); err != nil {
return err
}
if _, loaded := processedIDs.LoadOrStore(data.ID, struct{}{}); loaded {
return nil
}
if DefaultHandlers.GroupATMessage != nil {
return DefaultHandlers.GroupATMessage(payload, data)
}
Expand Down
112 changes: 1 addition & 111 deletions botgo/websocket/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
Expand All @@ -27,18 +26,6 @@ const DefaultQueueSize = 10000
// 定义全局变量
var global_s int64

// PayloadWithTimestamp 存储带时间戳的 WSPayload
type PayloadWithTimestamp struct {
Payload *dto.WSPayload
Timestamp time.Time
}

var dataMap sync.Map

func init() {
StartCleanupRoutine()
}

// Setup 依赖注册
func Setup() {
websocket.Register(&Client{})
Expand Down Expand Up @@ -200,33 +187,6 @@ func (c *Client) Session() *dto.Session {
return c.session
}

// func (c *Client) readMessageToQueue() {
// for {
// _, message, err := c.conn.ReadMessage()
// if err != nil {
// log.Errorf("%s read message failed, %v, message %s", c.session, err, string(message))
// close(c.messageQueue)
// c.closeChan <- err
// return
// }
// payload := &dto.WSPayload{}
// if err := json.Unmarshal(message, payload); err != nil {
// log.Errorf("%s json failed, %v", c.session, err)
// continue
// }
// // 更新 global_s 的值
// atomic.StoreInt64(&global_s, payload.S)

// payload.RawMessage = message
// log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))
// // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
// if c.isHandleBuildIn(payload) {
// continue
// }
// c.messageQueue <- payload
// }
// }

func (c *Client) readMessageToQueue() {
for {
_, message, err := c.conn.ReadMessage()
Expand All @@ -241,61 +201,19 @@ func (c *Client) readMessageToQueue() {
log.Errorf("%s json failed, %v", c.session, err)
continue
}
// 更新 global_s 的值
atomic.StoreInt64(&global_s, payload.S)

payload.RawMessage = message
log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))

// 不过滤心跳事件
if payload.OPCode != 11 {
// 计算数据的哈希值
dataHash := calculateDataHash(payload.Data)

// 检查是否已存在相同的 Data
if existingPayload, ok := getDataFromSyncMap(dataHash); ok {
// 如果已存在相同的 Data,则丢弃当前消息
log.Infof("%s discard duplicate message with DataHash: %v", c.session, existingPayload)
continue
}

// 将新的 payload 存入 sync.Map
storeDataToSyncMap(dataHash, payload)
}

// 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
if c.isHandleBuildIn(payload) {
continue
}

c.messageQueue <- payload
}
}

func getDataFromSyncMap(dataHash string) (*dto.WSPayload, bool) {
value, ok := dataMap.Load(dataHash)
if !ok {
return nil, false
}
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return nil, false
}
return payloadWithTimestamp.Payload, true
}

func storeDataToSyncMap(dataHash string, payload *dto.WSPayload) {
payloadWithTimestamp := &PayloadWithTimestamp{
Payload: payload,
Timestamp: time.Now(),
}
dataMap.Store(dataHash, payloadWithTimestamp)
}

func calculateDataHash(data interface{}) string {
dataBytes, _ := json.Marshal(data)
return string(dataBytes) // 这里直接转换为字符串,可以使用更复杂的算法
}

// 在全局范围通过atomic访问s值与message_id的映射
func GetGlobalS() int64 {
return atomic.LoadInt64(&global_s)
Expand Down Expand Up @@ -379,31 +297,3 @@ func (c *Client) readyHandler(payload *dto.WSPayload) {
event.DefaultHandlers.Ready(payload, readyData)
}
}

const cleanupInterval = 5 * time.Minute // 清理间隔时间

func StartCleanupRoutine() {
go func() {
for {
<-time.After(cleanupInterval)
cleanupDataMap()
}
}()
}

func cleanupDataMap() {
now := time.Now()
dataMap.Range(func(key, value interface{}) bool {
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return true
}

// 检查时间戳,清理超过一定时间的数据
if now.Sub(payloadWithTimestamp.Timestamp) > cleanupInterval {
dataMap.Delete(key)
}

return true
})
}

0 comments on commit a30a697

Please sign in to comment.