diff --git a/botgo/websocket/client/client.go b/botgo/websocket/client/client.go index f671aa0e..e8fd77d7 100644 --- a/botgo/websocket/client/client.go +++ b/botgo/websocket/client/client.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "os/signal" + "sync" "sync/atomic" "syscall" "time" @@ -26,6 +27,18 @@ const DefaultQueueSize = 10000 // 定义全局变量 var global_s int64 +// PayloadWithTimestamp 存储带时间戳的 WSPayload +type PayloadWithTimestamp struct { + Payload *dto.WSPayload + Timestamp time.Time +} + +var dataMap sync.Map + +func init() { + StartCleanupRoutine() +} + // Setup 依赖注册 func Setup() { websocket.Register(&Client{}) @@ -187,6 +200,33 @@ func (c *Client) Session() *dto.Session { return c.session } +// func (c *Client) readMessageToQueue() { +// for { +// _, message, err := c.conn.ReadMessage() +// if err != nil { +// log.Errorf("%s read message failed, %v, message %s", c.session, err, string(message)) +// close(c.messageQueue) +// c.closeChan <- err +// return +// } +// payload := &dto.WSPayload{} +// if err := json.Unmarshal(message, payload); err != nil { +// log.Errorf("%s json failed, %v", c.session, err) +// continue +// } +// // 更新 global_s 的值 +// atomic.StoreInt64(&global_s, payload.S) + +// payload.RawMessage = message +// log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message)) +// // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务 +// if c.isHandleBuildIn(payload) { +// continue +// } +// c.messageQueue <- payload +// } +// } + func (c *Client) readMessageToQueue() { for { _, message, err := c.conn.ReadMessage() @@ -201,19 +241,61 @@ func (c *Client) readMessageToQueue() { log.Errorf("%s json failed, %v", c.session, err) continue } - // 更新 global_s 的值 atomic.StoreInt64(&global_s, payload.S) payload.RawMessage = message log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message)) + + // 不过滤心跳事件 + if payload.OPCode != 11 { + // 计算数据的哈希值 + dataHash := calculateDataHash(payload.Data) + + // 检查是否已存在相同的 Data + if existingPayload, ok := getDataFromSyncMap(dataHash); ok { + // 如果已存在相同的 Data,则丢弃当前消息 + log.Infof("%s discard duplicate message with DataHash: %v", c.session, existingPayload) + continue + } + + // 将新的 payload 存入 sync.Map + storeDataToSyncMap(dataHash, payload) + } + // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务 if c.isHandleBuildIn(payload) { continue } + c.messageQueue <- payload } } +func getDataFromSyncMap(dataHash string) (*dto.WSPayload, bool) { + value, ok := dataMap.Load(dataHash) + if !ok { + return nil, false + } + payloadWithTimestamp, ok := value.(*PayloadWithTimestamp) + if !ok { + return nil, false + } + return payloadWithTimestamp.Payload, true +} + +func storeDataToSyncMap(dataHash string, payload *dto.WSPayload) { + payloadWithTimestamp := &PayloadWithTimestamp{ + Payload: payload, + Timestamp: time.Now(), + } + dataMap.Store(dataHash, payloadWithTimestamp) +} + +func calculateDataHash(data interface{}) string { + dataBytes, _ := json.Marshal(data) + return string(dataBytes) // 这里直接转换为字符串,可以使用更复杂的算法 +} + // 在全局范围通过atomic访问s值与message_id的映射 func GetGlobalS() int64 { return atomic.LoadInt64(&global_s) @@ -301,3 +383,31 @@ func (c *Client) readyHandler(payload *dto.WSPayload) { event.DefaultHandlers.Ready(payload, readyData) } } + +const cleanupInterval = 5 * time.Minute // 清理间隔时间 + +func StartCleanupRoutine() { + go func() { + for { + <-time.After(cleanupInterval) + cleanupDataMap() + } + }() +} + +func cleanupDataMap() { + now := time.Now() + dataMap.Range(func(key, value interface{}) bool { + payloadWithTimestamp, ok := value.(*PayloadWithTimestamp) + if !ok { + return true + } + + // 检查时间戳,清理超过一定时间的数据 + if now.Sub(payloadWithTimestamp.Timestamp) > cleanupInterval { + dataMap.Delete(key) + } + + return true + }) +} diff --git a/go b/go new file mode 100644 index 00000000..e69de29b