Skip to content

Commit

Permalink
Merge pull request #122 from mattn/use-ctx
Browse files Browse the repository at this point in the history
Use WithTimeout
  • Loading branch information
mattn authored Jul 22, 2024
2 parents 02d23e8 + 01be279 commit 274bdf0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
20 changes: 8 additions & 12 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,6 @@ func (s *Server) handleMessage(ctx context.Context, ws *WebSocket, message []byt
}

func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
store := s.relay.Storage(ctx)

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.Log.Errorf("failed to upgrade websocket: %v", err)
Expand All @@ -355,7 +352,6 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
defer s.clientsMu.Unlock()
s.clients[conn] = struct{}{}
ticker := time.NewTicker(pingPeriod)
stop := make(chan struct{})

ip := conn.RemoteAddr().String()
if realIP := r.Header.Get("X-Forwarded-For"); realIP != "" {
Expand All @@ -374,12 +370,15 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
)
}

ctx, cancel := context.WithCancel(context.Background())

store := s.relay.Storage(ctx)

// reader
go func() {
defer func() {
cancel()
ticker.Stop()
stop <- struct{}{}
close(stop)
s.clientsMu.Lock()
if _, ok := s.clients[conn]; ok {
conn.Close()
Expand All @@ -388,8 +387,6 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
}
s.clientsMu.Unlock()
s.Log.Infof("disconnected from %s", ip)

ctx.Done()
}()

conn.SetReadLimit(maxMessageSize)
Expand Down Expand Up @@ -432,17 +429,16 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
continue
}

go s.handleMessage(context.TODO(), ws, message, store)
go s.handleMessage(ctx, ws, message, store)
}
}()

// writer
go func() {
defer func() {
cancel()
ticker.Stop()
conn.Close()
for range stop {
}
}()

for {
Expand All @@ -454,7 +450,7 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
return
}
s.Log.Infof("pinging for %s", ip)
case <-stop:
case <-ctx.Done():
return
}
}
Expand Down
1 change: 1 addition & 0 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func removeListenerId(ws *WebSocket, id string) {
func removeListener(ws *WebSocket) {
listenersMutex.Lock()
defer listenersMutex.Unlock()
clear(listeners[ws])
delete(listeners, ws)
}

Expand Down

0 comments on commit 274bdf0

Please sign in to comment.