diff --git a/ocpp1.6/central_system.go b/ocpp1.6/central_system.go index 2b8bec43..a246e9cc 100644 --- a/ocpp1.6/central_system.go +++ b/ocpp1.6/central_system.go @@ -2,6 +2,7 @@ package ocpp16 import ( "fmt" + "sync" "github.com/lorenzodonini/ocpp-go/internal/callbackqueue" "github.com/lorenzodonini/ocpp-go/ocpp" @@ -26,6 +27,7 @@ type centralSystem struct { smartChargingHandler smartcharging.CentralSystemHandler callbackQueue callbackqueue.CallbackQueue errC chan error + errCLock sync.Mutex } func newCentralSystem(server *ocppj.Server) centralSystem { @@ -40,7 +42,18 @@ func newCentralSystem(server *ocppj.Server) centralSystem { func (cs *centralSystem) error(err error) { if cs.errC != nil { - cs.errC <- err + // It can happen that the error channel is getting closed + // when the central system is shutting down. + // If this happens right before this call here, we have a closed channel. + // To prevent a panic, we check if it is closed before sending the error. + cs.errCLock.Lock() + defer cs.errCLock.Unlock() + select { + case <-cs.errC: + // channel is closed, don't send the error + default: + cs.errC <- err + } } } @@ -400,6 +413,14 @@ func (cs *centralSystem) SendRequestAsync(clientId string, request ocpp.Request, func (cs *centralSystem) Start(listenPort int, listenPath string) { cs.server.Start(listenPort, listenPath) + + // Make sure to lock the access to the error channel to prevent races/panics + // When an error happens right after the server is shutting down. + if cs.errC != nil { + cs.errCLock.Lock() + defer cs.errCLock.Unlock() + close(cs.errC) + } } func (cs *centralSystem) sendResponse(chargePointId string, confirmation ocpp.Response, err error, requestId string) { diff --git a/ocppj/dispatcher.go b/ocppj/dispatcher.go index 16891156..ba57c16a 100644 --- a/ocppj/dispatcher.go +++ b/ocppj/dispatcher.go @@ -341,6 +341,11 @@ type ServerDispatcher interface { DeleteClient(clientID string) } +type disconnectInfo struct { + clientID string + queue RequestQueue +} + // DefaultServerDispatcher is a default implementation of the ServerDispatcher interface. // // The dispatcher implements the ClientState as well for simplicity. @@ -354,6 +359,7 @@ type DefaultServerDispatcher struct { timerC chan string running bool stoppedC chan struct{} + onDisconnect chan disconnectInfo onRequestCancel CanceledRequestHandler network ws.WsServer mutex sync.RWMutex @@ -379,6 +385,7 @@ func NewDefaultServerDispatcher(queueMap ServerQueueMap) *DefaultServerDispatche requestChannel: nil, readyForDispatch: make(chan string, 1), timeout: defaultMessageTimeout, + onDisconnect: make(chan disconnectInfo, 1), } d.pendingRequestState = NewServerState(&d.mutex) return d @@ -416,9 +423,15 @@ func (d *DefaultServerDispatcher) CreateClient(clientID string) { } func (d *DefaultServerDispatcher) DeleteClient(clientID string) { + q, _ := d.queueMap.Get(clientID) + + // Remove from queue and signal to messagePump that this client does not exist anymore d.queueMap.Remove(clientID) - if d.IsRunning() { - d.requestChannel <- clientID + if d.IsRunning() && q != nil { + d.onDisconnect <- disconnectInfo{ + clientID: clientID, + queue: q, + } } } @@ -466,6 +479,43 @@ func (d *DefaultServerDispatcher) messagePump() { d.queueMap.Init() log.Info("stopped processing requests") return + case disconnectInfo := <-d.onDisconnect: + q := disconnectInfo.queue + clientID := disconnectInfo.clientID + + log.Infof("client %s disconnected.", clientID) + + if !q.IsEmpty() { + // Clear the queue + for { + if q.IsEmpty() { + break + } + // Removing request and triggering cancel callback + bundle, _ := q.Pop().(RequestBundle) + callID := bundle.Call.GetUniqueId() + + d.pendingRequestState.DeletePendingRequest(clientID, callID) + + if d.onRequestCancel != nil { + d.onRequestCancel(clientID, bundle.Call.UniqueId, bundle.Call.Payload, + ocpp.NewError(GenericError, "clear pending request due to a disconnect", bundle.Call.UniqueId)) + } + } + } + + // Deleting and canceling the context + clientCtx = clientContextMap[clientID] + delete(clientContextMap, clientID) + if clientCtx.ctx != nil { + clientCtx.cancel() + } + + // Be ready for the next request again + clientQueue = nil + rdy = true + + continue case clientID = <-d.requestChannel: // Check whether there is a request queue for the specified client clientQueue, ok = d.queueMap.Get(clientID) @@ -548,6 +598,10 @@ func (d *DefaultServerDispatcher) dispatchNextRequest(clientID string) (clientCt return } el := q.Peek() + if el == nil { + log.Errorf("queue for %s is empty, won't dispatch.", clientID) + return + } bundle, _ := el.(RequestBundle) jsonMessage := bundle.Data callID := bundle.Call.GetUniqueId() diff --git a/ws/websocket.go b/ws/websocket.go index 3dc9f8a7..4ac370d7 100644 --- a/ws/websocket.go +++ b/ws/websocket.go @@ -331,7 +331,12 @@ func (server *Server) SetCheckOriginHandler(handler func(r *http.Request) bool) func (server *Server) error(err error) { log.Error(err) if server.errC != nil { - server.errC <- err + select { + case <-server.errC: + log.Debug("servers errC is closed, so we can't send the error!") + default: + server.errC <- err + } } }