Skip to content

Commit

Permalink
Ocpp: cache and re-use initial status (4th attempt) (#18597)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfuchs1984 authored Feb 7, 2025
1 parent 3516e7b commit 39e7d6d
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 69 deletions.
4 changes: 4 additions & 0 deletions charger/ocpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func NewOCPP(id string, connector int, idTag string,
) (*OCPP, error) {
log := util.NewLogger(fmt.Sprintf("%s-%d", lo.CoalesceOrEmpty(id, "ocpp"), connector))

log.DEBUG.Printf("!! registering %s:%d", id, connector)

cp, err := ocpp.Instance().RegisterChargepoint(id,
func() *ocpp.CP {
return ocpp.NewChargePoint(log, id)
Expand All @@ -164,6 +166,8 @@ func NewOCPP(id string, connector int, idTag string,
return nil, err
}

log.DEBUG.Printf("!! connected %s:%d", id, connector)

if cp.NumberOfConnectors > 0 && connector > cp.NumberOfConnectors {
return nil, fmt.Errorf("invalid connector: %d", connector)
}
Expand Down
27 changes: 23 additions & 4 deletions charger/ocpp/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,28 @@ func NewConnector(log *util.Logger, id int, cp *CP, idTag string) (*Connector, e
remoteIdTag: idTag,
}

err := cp.registerConnector(id, conn)
if err := cp.registerConnector(id, conn); err != nil {
return nil, err
}

// trigger status for all connectors

var ok bool
// apply cached status if available
instance.WithConnectorStatus(cp.ID(), id, func(status *core.StatusNotificationRequest) {
if _, err := cp.OnStatusNotification(status); err == nil {
ok = true
}
})

// only trigger if we don't already have a status
if !ok && cp.HasRemoteTriggerFeature {
if err := cp.TriggerMessageRequest(0, core.StatusNotificationFeatureName); err != nil {
cp.log.WARN.Printf("failed triggering StatusNotification: %v", err)
}
}

return conn, err
return conn, nil
}

func (conn *Connector) TestClock(clock clock.Clock) {
Expand Down Expand Up @@ -88,7 +107,7 @@ func (conn *Connector) WatchDog(timeout time.Duration) {
update := conn.clock.Since(conn.meterUpdated) > timeout
conn.mu.Unlock()

if update {
if update && conn.cp.HasRemoteTriggerFeature {
conn.TriggerMessageRequest(core.MeterValuesFeatureName)
}
}
Expand All @@ -103,7 +122,7 @@ func (conn *Connector) Initialized() error {
case <-conn.statusC:
return nil

case <-trigger: // try to trigger StatusNotification again as last resort
case <-trigger: // try to trigger StatusNotification again as last resort even when the charger does not report RemoteTrigger support
conn.TriggerMessageRequest(core.StatusNotificationFeatureName)

case <-timeout:
Expand Down
2 changes: 2 additions & 0 deletions charger/ocpp/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type connTestSuite struct {
}

func (suite *connTestSuite) SetupTest() {
// setup instance
Instance()
suite.cp = NewChargePoint(util.NewLogger("foo"), "abc")
suite.conn, _ = NewConnector(util.NewLogger("foo"), 1, suite.cp, "")

Expand Down
7 changes: 0 additions & 7 deletions charger/ocpp/cp_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,6 @@ func (cp *CP) Setup(meterValues string, meterInterval time.Duration) error {
cp.log.DEBUG.Printf("failed configuring %s: %v", KeyWebSocketPingInterval, err)
}

// trigger status for all connectors
if cp.HasRemoteTriggerFeature {
if err := cp.TriggerMessageRequest(0, core.StatusNotificationFeatureName); err != nil {
cp.log.WARN.Printf("failed triggering StatusNotification: %v", err)
}
}

return nil
}

Expand Down
124 changes: 68 additions & 56 deletions charger/ocpp/cs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,26 @@ import (

"github.com/evcc-io/evcc/util"
ocpp16 "github.com/lorenzodonini/ocpp-go/ocpp1.6"
"github.com/lorenzodonini/ocpp-go/ocpp1.6/core"
)

type CS struct {
mu sync.Mutex
log *util.Logger
ocpp16.CentralSystem
cps map[string]*CP
init map[string]*sync.Mutex
txnId atomic.Int64
type registration struct {
mu sync.RWMutex
setup sync.RWMutex // serialises chargepoint setup
cp *CP // guarded by setup and CS mutexes
status map[int]*core.StatusNotificationRequest // guarded by mu mutex
}

// Register registers a charge point with the central system.
// The charge point identified by id may already be connected in which case initial connection is triggered.
func (cs *CS) register(id string, new *CP) error {
cs.mu.Lock()
defer cs.mu.Unlock()

cp, ok := cs.cps[id]

// case 1: charge point neither registered nor physically connected
if !ok {
cs.cps[id] = new
return nil
}

// case 2: duplicate registration of id empty
if id == "" {
return errors.New("cannot have >1 charge point with empty station id")
}

// case 3: charge point not registered but physically already connected
if cp == nil {
cs.cps[id] = new
new.connect(true)
}
func newRegistration() *registration {
return &registration{status: make(map[int]*core.StatusNotificationRequest)}
}

return nil
type CS struct {
ocpp16.CentralSystem
mu sync.Mutex
log *util.Logger
regs map[string]*registration // guarded by mu mutex
txnId atomic.Int64
}

// errorHandler logs error channel
Expand All @@ -58,38 +41,67 @@ func (cs *CS) ChargepointByID(id string) (*CP, error) {
cs.mu.Lock()
defer cs.mu.Unlock()

cp, ok := cs.cps[id]
reg, ok := cs.regs[id]
if !ok {
return nil, fmt.Errorf("unknown charge point: %s", id)
}
if cp == nil {
if reg.cp == nil {
return nil, fmt.Errorf("charge point not configured: %s", id)
}
return cp, nil
return reg.cp, nil
}

func (cs *CS) WithConnectorStatus(id string, connector int, fun func(status *core.StatusNotificationRequest)) {
cs.mu.Lock()
defer cs.mu.Unlock()

if reg, ok := cs.regs[id]; ok {
reg.mu.RLock()
if status, ok := reg.status[connector]; ok {
fun(status)
}
reg.mu.RUnlock()
}
}

// RegisterChargepoint registers a charge point with the central system of returns an already registered charge point
func (cs *CS) RegisterChargepoint(id string, newfun func() *CP, init func(*CP) error) (*CP, error) {
cs.mu.Lock()
cpmu, ok := cs.init[id]
if !ok {
cpmu = new(sync.Mutex)
cs.init[id] = cpmu

// prepare shadow state
reg, registered := cs.regs[id]
if !registered {
reg = newRegistration()
cs.regs[id] = reg
}
cs.mu.Unlock()

// serialise on chargepoint id
cpmu.Lock()
defer cpmu.Unlock()
reg.setup.Lock()
defer reg.setup.Unlock()

cp := reg.cp

cs.mu.Unlock()

// setup already completed?
if cp != nil {
// duplicate registration of id empty
if id == "" {
return nil, errors.New("cannot have >1 charge point with empty station id")
}

// already registered?
if cp, err := cs.ChargepointByID(id); err == nil {
return cp, nil
}

// first time- registration should not error
cp := newfun()
if err := cs.register(id, cp); err != nil {
return nil, err
// first time- create the charge point
cp = newfun()

cs.mu.Lock()
reg.cp = cp
cs.mu.Unlock()

if registered {
cp.connect(true)
}

return cp, init(cp)
Expand All @@ -101,28 +113,28 @@ func (cs *CS) NewChargePoint(chargePoint ocpp16.ChargePointConnection) {
defer cs.mu.Unlock()

// check for configured charge point
cp, ok := cs.cps[chargePoint.ID()]
reg, ok := cs.regs[chargePoint.ID()]
if ok {
cs.log.DEBUG.Printf("charge point connected: %s", chargePoint.ID())

// trigger initial connection if charge point is already setup
if cp != nil {
if cp := reg.cp; cp != nil {
cp.connect(true)
}

return
}

// check for configured anonymous charge point
cp, ok = cs.cps[""]
if ok && cp != nil {
reg, ok = cs.regs[""]
if ok && reg.cp != nil {
cp := reg.cp
cs.log.INFO.Printf("charge point connected, registering: %s", chargePoint.ID())

// update id
cp.RegisterID(chargePoint.ID())

cs.cps[chargePoint.ID()] = cp
delete(cs.cps, "")
cs.regs[chargePoint.ID()] = reg
delete(cs.regs, "")

cp.connect(true)

Expand All @@ -133,7 +145,7 @@ func (cs *CS) NewChargePoint(chargePoint ocpp16.ChargePointConnection) {

// register unknown charge point
// when charge point setup is complete, it will eventually be associated with the connected id
cs.cps[chargePoint.ID()] = nil
cs.regs[chargePoint.ID()] = newRegistration()
}

// ChargePointDisconnected implements ocpp16.ChargePointConnectionHandler
Expand Down
9 changes: 9 additions & 0 deletions charger/ocpp/cs_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (cs *CS) OnMeterValues(id string, request *core.MeterValuesRequest) (*core.
}

func (cs *CS) OnStatusNotification(id string, request *core.StatusNotificationRequest) (*core.StatusNotificationConfirmation, error) {
cs.mu.Lock()
// cache status for future cp connection
if reg, ok := cs.regs[id]; ok && request != nil {
reg.mu.Lock()
reg.status[request.ConnectorId] = request
reg.mu.Unlock()
}
cs.mu.Unlock()

if cp, err := cs.ChargepointByID(id); err == nil {
return cp.OnStatusNotification(request)
}
Expand Down
4 changes: 2 additions & 2 deletions charger/ocpp/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func Instance() *CS {

instance = &CS{
log: log,
cps: make(map[string]*CP),
init: make(map[string]*sync.Mutex),
regs: make(map[string]*registration),
CentralSystem: cs,
}

instance.txnId.Store(time.Now().UTC().Unix())

ocppj.SetLogger(instance)
Expand Down

0 comments on commit 39e7d6d

Please sign in to comment.