Skip to content

Commit

Permalink
Fix terminator id race condition. Fixes #1685
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Jan 16, 2024
1 parent ce0bcf4 commit be5d421
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 10 deletions.
20 changes: 14 additions & 6 deletions controller/handler_edge_ctrl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (

"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/controller/xt"
"github.com/openziti/ziti/common/logcontext"
"github.com/openziti/foundation/v2/stringz"
"github.com/openziti/identity"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common/logcontext"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/controller/xt"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -367,6 +367,14 @@ func (self *baseSessionRequestContext) verifyTerminator(terminatorId string, bin
return nil
}

func (self *baseSessionRequestContext) verifyTerminatorId(id string) {
if self.err == nil {
if id == "" {
self.err = invalidTerminator("provided terminator id is blank")
}
}
}

func (self *baseSessionRequestContext) updateTerminator(terminator *network.Terminator, request UpdateTerminatorRequest, ctx *change.Context) {
if self.err == nil {
checker := fields.UpdatedFieldsMap{}
Expand Down
1 change: 1 addition & 0 deletions controller/handler_edge_ctrl/create_terminator_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (self *createTerminatorV2Handler) CreateTerminatorV2(ctx *CreateTerminatorV
if !ctx.loadRouter() {
return
}
ctx.verifyTerminatorId(ctx.req.Address)
ctx.loadSession(ctx.req.SessionToken)
ctx.checkSessionType(db.SessionTypeBind)
ctx.checkSessionFingerprints(ctx.req.Fingerprints)
Expand Down
3 changes: 2 additions & 1 deletion router/xgress_edge/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ func (self *edgeTerminator) close(notify bool, reason string) {
if terminatorId := self.terminatorId.Load(); terminatorId != "" {
if self.terminatorId.CompareAndSwap(terminatorId, "") {
logger.Debug("removing terminator on router")
self.edgeClientConn.listener.factory.hostedServices.Delete(terminatorId)

self.state.Store(TerminatorStateDeleting)
self.edgeClientConn.listener.factory.hostedServices.Delete(terminatorId)

logger.Info("removing terminator on controller")
ctrlCh := self.edgeClientConn.listener.factory.ctrls.AnyCtrlChannel()
if ctrlCh == nil {
Expand Down
11 changes: 8 additions & 3 deletions router/xgress_edge/hosted.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,13 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato
WithField("routerId", factory.env.GetRouterId().Token).
WithField("terminatorId", terminator.terminatorId.Load())

terminatorId := terminator.terminatorId.Load()
if terminatorId == "" {
return fmt.Errorf("edge link is closed, stopping terminator creation for terminator %s", terminatorId)
}

request := &edge_ctrl_pb.CreateTerminatorV2Request{
Address: terminator.terminatorId.Load(),
Address: terminatorId,
SessionToken: terminator.token,
Fingerprints: terminator.edgeClientConn.fingerprints.Prints(),
PeerData: terminator.hostData,
Expand All @@ -286,7 +291,7 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato
return err
}

if self.waitForTerminatorCreated(terminator.terminatorId.Load(), 10*time.Second) {
if self.waitForTerminatorCreated(terminatorId, 10*time.Second) {
return nil
}

Expand All @@ -295,7 +300,7 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato
return errors.Errorf("timeout waiting for response to create terminator request for terminator %v", terminator.terminatorId.Load())
}

func (self *hostedServiceRegistry) HandleCreateTerminatorResponse(msg *channel.Message, ctrlCh channel.Channel) {
func (self *hostedServiceRegistry) HandleCreateTerminatorResponse(msg *channel.Message, _ channel.Channel) {
log := pfxlog.Logger().WithField("routerId", self.env.GetRouterId().Token)

response := &edge_ctrl_pb.CreateTerminatorV2Response{}
Expand Down

0 comments on commit be5d421

Please sign in to comment.