Skip to content

Commit

Permalink
Additional fixes and tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Jan 18, 2024
1 parent 7fe6c56 commit 963d88f
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 26 deletions.
1 change: 1 addition & 0 deletions router/link/link_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (self *updateLinkStatusForLink) Handle(registry *linkRegistryImpl) {
state.connectedCount++
state.retryDelay = time.Duration(0)
state.ctrlsNotified = false
registry.triggerNotify()
}

if state.status == StatusLinkFailed {
Expand Down
11 changes: 11 additions & 0 deletions router/link/link_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func NewLinkRegistry(routerEnv Env) xlink.Registry {
env: routerEnv,
destinations: map[string]*linkDest{},
linkStateQueue: &linkStateHeap{},
triggerNotifyC: make(chan struct{}, 1),
}

go result.run()
Expand All @@ -71,6 +72,7 @@ type linkRegistryImpl struct {
destinations map[string]*linkDest
linkStateQueue *linkStateHeap
events chan event
triggerNotifyC chan struct{}
notifyInProgress atomic.Bool
}

Expand Down Expand Up @@ -365,6 +367,8 @@ func (self *linkRegistryImpl) run() {
select {
case evt := <-self.events:
evt.Handle(self)
case <-self.triggerNotifyC:
self.notifyControllersOfLinks()
case <-queueCheckTicker.C:
self.evaluateLinkStateQueue()
self.notifyControllersOfLinks()
Expand All @@ -376,6 +380,13 @@ func (self *linkRegistryImpl) run() {
}
}

func (self *linkRegistryImpl) triggerNotify() {
select {
case self.triggerNotifyC <- struct{}{}:
default:
}
}

func (self *linkRegistryImpl) evaluateLinkStateQueue() {
now := time.Now()
for len(*self.linkStateQueue) > 0 {
Expand Down
10 changes: 9 additions & 1 deletion tests/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"github.com/openziti/channel/v2"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/transport/v2"
"github.com/openziti/ziti/common/capabilities"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/controller"
"math/big"
)

func (ctx *FabricTestContext) NewControlChannelListener() channel.UnderlayListener {
Expand All @@ -14,8 +17,13 @@ func (ctx *FabricTestContext) NewControlChannelListener() channel.UnderlayListen

versionHeader, err := versions.StdVersionEncDec.Encode(versions.NewDefaultVersionProvider().AsVersionInfo())
ctx.Req.NoError(err)

capabilityMask := &big.Int{}
capabilityMask.SetBit(capabilityMask, capabilities.ControllerCreateTerminatorV2, 1)
capabilityMask.SetBit(capabilityMask, capabilities.ControllerSingleRouterLinkSource, 1)
headers := map[int32][]byte{
channel.HelloVersionHeader: versionHeader,
channel.HelloVersionHeader: versionHeader,
int32(ctrl_pb.ControlHeaders_CapabilitiesHeader): capabilityMask.Bytes(),
}

ctrlChannelListenerConfig := channel.ListenerConfig{
Expand Down
30 changes: 15 additions & 15 deletions tests/link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,16 @@ func Test_DuplicateLinkWithLinkCloseDialer(t *testing.T) {
ctrlListener := ctx.NewControlChannelListener()
router1 := ctx.startRouter(1)

router1cc, linkCheck1 := testutil.StartLinkTest("router-1", ctrlListener, ctx.Req)
linkChecker := testutil.NewLinkChecker(ctx.Req)
router1cc := testutil.StartLinkTest(linkChecker, "router-1", ctrlListener, ctx.Req)

router1Listeners := &ctrl_pb.Listeners{}
if val, found := router1cc.Underlay().Headers()[int32(ctrl_pb.ControlHeaders_ListenersHeader)]; found {
ctx.Req.NoError(proto.Unmarshal(val, router1Listeners))
}

router2 := ctx.startRouter(2)
router2cc, linkCheck2 := testutil.StartLinkTest("router-2", ctrlListener, ctx.Req)
router2cc := testutil.StartLinkTest(linkChecker, "router-2", ctrlListener, ctx.Req)

router2Listeners := &ctrl_pb.Listeners{}
if val, found := router1cc.Underlay().Headers()[int32(ctrl_pb.ControlHeaders_ListenersHeader)]; found {
Expand Down Expand Up @@ -266,32 +267,31 @@ func Test_DuplicateLinkWithLinkCloseDialer(t *testing.T) {

time.Sleep(time.Second)

linkCheck1.RequireNoErrors()
link1 := linkCheck1.RequireOneActiveLink()
linkChecker.RequireNoErrors()
link1 := linkChecker.RequireOneActiveLink()

linkCheck2.RequireNoErrors()
link2 := linkCheck1.RequireOneActiveLink()
linkChecker.RequireNoErrors()
link2 := linkChecker.RequireOneActiveLink()

ctx.Req.Equal(link1.Id, link2.Id)

// Test closing control ch to router 1. On reconnect the existing link should get reported
ctx.Req.NoError(router1cc.Close())
_, linkCheck1 = testutil.StartLinkTest("router-1", ctrlListener, ctx.Req)
_ = testutil.StartLinkTest(linkChecker, "router-1", ctrlListener, ctx.Req)

time.Sleep(time.Second)

linkCheck1.RequireNoErrors()
link1 = linkCheck1.RequireOneActiveLink()
linkChecker.RequireNoErrors()
link1 = linkChecker.RequireOneActiveLink()
ctx.Req.Equal(link1.Id, link2.Id)

// Test closing control ch to router 2. On reconnect the existing link should get reported
ctx.Req.NoError(router2cc.Close())
_, linkCheck2 = testutil.StartLinkTest("router-2", ctrlListener, ctx.Req)
_ = testutil.StartLinkTest(linkChecker, "router-2", ctrlListener, ctx.Req)

time.Sleep(time.Second)

linkCheck2.RequireNoErrors()
link2 = linkCheck2.RequireOneActiveLink()
linkChecker.RequireNoErrors()
link2 = linkChecker.RequireOneActiveLink()
ctx.Req.Equal(link1.Id, link2.Id)

// restart router 1
Expand All @@ -302,10 +302,10 @@ func Test_DuplicateLinkWithLinkCloseDialer(t *testing.T) {
ctx.Req.NoError(router1.Shutdown())
}()

router1cc, linkCheck1 = testutil.StartLinkTest("router-1", ctrlListener, ctx.Req)
router1cc = testutil.StartLinkTest(linkChecker, "router-1", ctrlListener, ctx.Req)
ctx.Req.NoError(protobufs.MarshalTyped(peerUpdates2).WithTimeout(time.Second).SendAndWaitForWire(router1cc))

linkCheck1.RequireNoErrors()
linkChecker.RequireNoErrors()

//time.Sleep(time.Minute)
//
Expand Down
41 changes: 31 additions & 10 deletions tests/testutil/linkschecker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package testutil

import (
"fmt"
"sync"
"time"

Expand All @@ -15,9 +16,11 @@ import (
)

type TestLink struct {
Id string
Dest string
Failed bool
Id string
Src string
Dest string
FaultCount int
Valid bool
}

type LinkStateChecker struct {
Expand All @@ -34,7 +37,7 @@ func (self *LinkStateChecker) reportError(err error) {
}
}

func (self *LinkStateChecker) HandleLink(msg *channel.Message, _ channel.Channel) {
func (self *LinkStateChecker) HandleLink(msg *channel.Message, ch channel.Channel) {
self.Lock()
defer self.Unlock()

Expand All @@ -44,8 +47,22 @@ func (self *LinkStateChecker) HandleLink(msg *channel.Message, _ channel.Channel
}

for _, link := range routerLinks.Links {
self.links[link.Id] = &TestLink{
Id: link.Id,
testLink, ok := self.links[link.Id]
if !ok {
self.links[link.Id] = &TestLink{
Id: link.Id,
Src: ch.Id(),
Dest: link.DestRouterId,
Valid: true,
}
} else {
if testLink.Src != ch.Id() {
self.reportError(fmt.Errorf("source router change for link %v => %v", testLink.Src, ch.Id()))
}
if testLink.Dest != link.DestRouterId {
self.reportError(fmt.Errorf("dest router change for link %v => %v", testLink.Dest, link.DestRouterId))
}
testLink.Valid = true
}
}
}
Expand All @@ -64,7 +81,8 @@ func (self *LinkStateChecker) HandleFault(msg *channel.Message, _ channel.Channe

if fault.Subject == ctrl_pb.FaultSubject_LinkFault || fault.Subject == ctrl_pb.FaultSubject_LinkDuplicate {
if link, found := self.links[fault.Id]; found {
link.Failed = true
link.FaultCount++
link.Valid = false
} else {
self.reportError(errors.Errorf("no link with Id %s found", fault.Id))
}
Expand Down Expand Up @@ -108,7 +126,7 @@ func (self *LinkStateChecker) RequireOneActiveLink() *TestLink {
var activeLink *TestLink

for _, link := range self.links {
if !link.Failed {
if link.Valid {
self.req.Nil(activeLink, "more than one active link found")
activeLink = link
}
Expand All @@ -117,13 +135,16 @@ func (self *LinkStateChecker) RequireOneActiveLink() *TestLink {
return activeLink
}

func StartLinkTest(id string, uf channel.UnderlayFactory, assertions *require.Assertions) (channel.Channel, *LinkStateChecker) {
func NewLinkChecker(assertions *require.Assertions) *LinkStateChecker {
checker := &LinkStateChecker{
errorC: make(chan error, 4),
links: map[string]*TestLink{},
req: assertions,
}
return checker
}

func StartLinkTest(checker *LinkStateChecker, id string, uf channel.UnderlayFactory, assertions *require.Assertions) channel.Channel {
bindHandler := func(binding channel.Binding) error {
binding.AddReceiveHandlerF(channel.AnyContentType, checker.HandleOther)
binding.AddReceiveHandlerF(int32(ctrl_pb.ContentType_VerifyRouterType), func(msg *channel.Message, ch channel.Channel) {
Expand All @@ -137,5 +158,5 @@ func StartLinkTest(id string, uf channel.UnderlayFactory, assertions *require.As
timeoutUF := NewTimeoutUnderlayFactory(uf, 2*time.Second)
ch, err := channel.NewChannel(id, timeoutUF, channel.BindHandlerF(bindHandler), channel.DefaultOptions())
assertions.NoError(err)
return ch, checker
return ch
}

0 comments on commit 963d88f

Please sign in to comment.