From 1dd71b1ebf4c6f514e71283a46f57bd7e0c9ad2b Mon Sep 17 00:00:00 2001 From: pnguyen215 Date: Fri, 17 Nov 2023 23:15:03 +0700 Subject: [PATCH] :recycle: refactor: updated new function register topic on endpoint websocket #3 --- wsconn/wsconn.go | 6 ++++++ wsconn/wsconn_model.go | 13 +++++++------ wsconn/wsconn_service.go | 29 ++++++++++++++++++++++++++++- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/wsconn/wsconn.go b/wsconn/wsconn.go index cf947c9..1ab7b93 100644 --- a/wsconn/wsconn.go +++ b/wsconn/wsconn.go @@ -28,6 +28,7 @@ func NewWebsocket() *Websocket { ws.SetConfig(*conf) ws.SetUpgrader(wsUpgrader) ws.SetAllowCloseConn(false) + ws.SetRegisteredTopics(make(map[string]bool)) return ws } @@ -56,6 +57,11 @@ func (ws *Websocket) SetAllowCloseConn(value bool) *Websocket { return ws } +func (ws *Websocket) SetRegisteredTopics(value map[string]bool) *Websocket { + ws.RegisteredTopics = value + return ws +} + func (ws *Websocket) Json() string { return utils.ToJson(ws) } diff --git a/wsconn/wsconn_model.go b/wsconn/wsconn_model.go index 6b3694a..7398466 100644 --- a/wsconn/wsconn_model.go +++ b/wsconn/wsconn_model.go @@ -8,10 +8,11 @@ import ( ) type Websocket struct { - Config wsconnx.WsConnOptionConfig `json:"conf"` - AllowCloseConn bool `json:"allow_close_conn"` - Upgrader websocket.Upgrader `json:"-"` - Broadcast map[string]chan wsconnx.WsConnMessagePayload `json:"-"` - Subscribers map[*websocket.Conn]wsconnx.WsConnSubscription `json:"-"` - Mutex sync.Mutex `json:"-"` + Config wsconnx.WsConnOptionConfig `json:"conf"` + AllowCloseConn bool `json:"allow_close_conn"` + RegisteredTopics map[string]bool `json:"registered_topics"` + Upgrader websocket.Upgrader `json:"-"` + Broadcast map[string]chan wsconnx.WsConnMessagePayload `json:"-"` + Subscribers map[*websocket.Conn]wsconnx.WsConnSubscription `json:"-"` + Mutex sync.Mutex `json:"-"` } diff --git a/wsconn/wsconn_service.go b/wsconn/wsconn_service.go index 5633dc5..9bf46ff 100644 --- a/wsconn/wsconn_service.go +++ b/wsconn/wsconn_service.go @@ -1,10 +1,13 @@ package wsconn import ( + "fmt" + "net/http" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + "github.com/sivaosorg/govm/entity" "github.com/sivaosorg/govm/wsconnx" ) @@ -15,10 +18,11 @@ type WebsocketService interface { AddSubscriber(conn *websocket.Conn, subscription wsconnx.WsConnSubscription) SubscribeMessage(c *gin.Context) BroadcastMessage(message wsconnx.WsConnMessagePayload) + RegisterTopic(c *gin.Context) } type websocketServiceImpl struct { - wsConf *Websocket `json:"-"` + wsConf *Websocket } func NewWebsocketService(wsConf *Websocket) WebsocketService { @@ -114,3 +118,26 @@ func (ws *websocketServiceImpl) BroadcastMessage(message wsconnx.WsConnMessagePa channel <- message } } + +func (ws *websocketServiceImpl) RegisterTopic(c *gin.Context) { + ws.wsConf.Mutex.Lock() + defer ws.wsConf.Mutex.Unlock() + response := entity.NewResponseEntity() + var subscription wsconnx.WsConnSubscription + if err := c.ShouldBindJSON(&subscription); err != nil { + response.SetStatusCode(http.StatusBadRequest).SetError(err).SetMessage(err.Error()) + c.JSON(response.StatusCode, response) + return + } + if _, ok := ws.wsConf.RegisteredTopics[subscription.Topic]; ok { + response.SetStatusCode(http.StatusOK).SetMessage(fmt.Sprintf("Topic %s already registered", subscription.Topic)).SetData(subscription) + c.JSON(response.StatusCode, response) + return + } + ws.wsConf.RegisteredTopics[subscription.Topic] = true + ws.wsConf.Broadcast[subscription.Topic] = make(chan wsconnx.WsConnMessagePayload) + go ws.Run(subscription.Topic) + response.SetStatusCode(http.StatusOK).SetMessage(fmt.Sprintf("Topic %s registered successfully", subscription.Topic)).SetData(subscription) + c.JSON(response.StatusCode, response) + return +}