Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Several Race Conditions #16

Open
wants to merge 4 commits into
base: gridx_extensions
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion ocpp1.6/central_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ocpp16

import (
"fmt"
"sync"

"github.com/lorenzodonini/ocpp-go/internal/callbackqueue"
"github.com/lorenzodonini/ocpp-go/ocpp"
Expand All @@ -26,6 +27,7 @@ type centralSystem struct {
smartChargingHandler smartcharging.CentralSystemHandler
callbackQueue callbackqueue.CallbackQueue
errC chan error
errCLock sync.Mutex
}

func newCentralSystem(server *ocppj.Server) centralSystem {
Expand All @@ -40,7 +42,18 @@ func newCentralSystem(server *ocppj.Server) centralSystem {

func (cs *centralSystem) error(err error) {
if cs.errC != nil {
cs.errC <- err
// It can happen that the error channel is getting closed
// when the central system is shutting down.
// If this happens right before this call here, we have a closed channel.
// To prevent a panic, we check if it is closed before sending the error.
cs.errCLock.Lock()
defer cs.errCLock.Unlock()
select {
case <-cs.errC:
// channel is closed, don't send the error
default:
cs.errC <- err
Copy link
Member

@guelfey guelfey Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally it's problematic if you have a Goroutine closing a channel that another Goroutine can write to. Not too familiar with the rest of the code anymore, but is there any reason we need to close errC at all? It's mostly used for logging, right? Then I don't see a reason why we can't just keep it open. Writers could just do a select and throw the error away if the reader Goroutine already stopped.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea 🤔 I'm also not familiar with it, I just fixed the issues :D
how would the writer know if the readers goroutine has stopped?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The write to the channel would block, but you can just wrap it in a select with a default case to prevent it from causing an issue

}
}
}

Expand Down Expand Up @@ -400,6 +413,14 @@ func (cs *centralSystem) SendRequestAsync(clientId string, request ocpp.Request,

func (cs *centralSystem) Start(listenPort int, listenPath string) {
cs.server.Start(listenPort, listenPath)

// Make sure to lock the access to the error channel to prevent races/panics
// When an error happens right after the server is shutting down.
if cs.errC != nil {
cs.errCLock.Lock()
defer cs.errCLock.Unlock()
close(cs.errC)
}
}

func (cs *centralSystem) sendResponse(chargePointId string, confirmation ocpp.Response, err error, requestId string) {
Expand Down
58 changes: 56 additions & 2 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ type ServerDispatcher interface {
DeleteClient(clientID string)
}

type disconnectInfo struct {
clientID string
queue RequestQueue
}

// DefaultServerDispatcher is a default implementation of the ServerDispatcher interface.
//
// The dispatcher implements the ClientState as well for simplicity.
Expand All @@ -354,6 +359,7 @@ type DefaultServerDispatcher struct {
timerC chan string
running bool
stoppedC chan struct{}
onDisconnect chan disconnectInfo
onRequestCancel CanceledRequestHandler
network ws.WsServer
mutex sync.RWMutex
Expand All @@ -379,6 +385,7 @@ func NewDefaultServerDispatcher(queueMap ServerQueueMap) *DefaultServerDispatche
requestChannel: nil,
readyForDispatch: make(chan string, 1),
timeout: defaultMessageTimeout,
onDisconnect: make(chan disconnectInfo, 1),
}
d.pendingRequestState = NewServerState(&d.mutex)
return d
Expand Down Expand Up @@ -416,9 +423,15 @@ func (d *DefaultServerDispatcher) CreateClient(clientID string) {
}

func (d *DefaultServerDispatcher) DeleteClient(clientID string) {
q, _ := d.queueMap.Get(clientID)

// Remove from queue and signal to messagePump that this client does not exist anymore
d.queueMap.Remove(clientID)
if d.IsRunning() {
d.requestChannel <- clientID
if d.IsRunning() && q != nil {
d.onDisconnect <- disconnectInfo{
clientID: clientID,
queue: q,
}
}
}

Expand Down Expand Up @@ -466,6 +479,43 @@ func (d *DefaultServerDispatcher) messagePump() {
d.queueMap.Init()
log.Info("stopped processing requests")
return
case disconnectInfo := <-d.onDisconnect:
q := disconnectInfo.queue
clientID := disconnectInfo.clientID

log.Infof("client %s disconnected.", clientID)

if !q.IsEmpty() {
// Clear the queue
for {
if q.IsEmpty() {
break
}
// Removing request and triggering cancel callback
bundle, _ := q.Pop().(RequestBundle)
callID := bundle.Call.GetUniqueId()

d.pendingRequestState.DeletePendingRequest(clientID, callID)

if d.onRequestCancel != nil {
d.onRequestCancel(clientID, bundle.Call.UniqueId, bundle.Call.Payload,
ocpp.NewError(GenericError, "clear pending request due to a disconnect", bundle.Call.UniqueId))
}
}
}

// Deleting and canceling the context
clientCtx = clientContextMap[clientID]
delete(clientContextMap, clientID)
if clientCtx.ctx != nil {
clientCtx.cancel()
}

// Be ready for the next request again
clientQueue = nil
rdy = true

continue
case clientID = <-d.requestChannel:
// Check whether there is a request queue for the specified client
clientQueue, ok = d.queueMap.Get(clientID)
Expand Down Expand Up @@ -548,6 +598,10 @@ func (d *DefaultServerDispatcher) dispatchNextRequest(clientID string) (clientCt
return
}
el := q.Peek()
if el == nil {
log.Errorf("queue for %s is empty, won't dispatch.", clientID)
return
}
bundle, _ := el.(RequestBundle)
jsonMessage := bundle.Data
callID := bundle.Call.GetUniqueId()
Expand Down
7 changes: 6 additions & 1 deletion ws/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ func (server *Server) SetCheckOriginHandler(handler func(r *http.Request) bool)
func (server *Server) error(err error) {
log.Error(err)
if server.errC != nil {
server.errC <- err
select {
case <-server.errC:
log.Debug("servers errC is closed, so we can't send the error!")
default:
server.errC <- err
}
}
}

Expand Down