From 38585b433ffbf4307338d6f48a92660598c9020c Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Wed, 21 Feb 2024 13:41:39 -0500 Subject: [PATCH] manager/allocator: lift portAllocator out of CNM The port allocation logic does not depend on the network allocator implementation in any meaningful way. It has no knowledge of the CNM network allocator's state, and it does not need to change if the network allocator changes. Allocating node ports is fundamentally a seaparate concern from allocating network resources for services and tasks. Therefore the low-level network allocator should not be responsible for allocating both. Lift the port allocator into the Allocator's network context as a sibling of the low-level network allocator. Signed-off-by: Cory Snider --- .../cnmallocator/networkallocator.go | 24 ------- .../cnmallocator/networkallocator_test.go | 9 +-- manager/allocator/network.go | 63 +++++++++++++++---- .../networkallocator/networkallocator.go | 4 -- .../{cnmallocator => }/portallocator.go | 2 +- .../{cnmallocator => }/portallocator_test.go | 2 +- 6 files changed, 59 insertions(+), 45 deletions(-) rename manager/allocator/{cnmallocator => }/portallocator.go (99%) rename manager/allocator/{cnmallocator => }/portallocator_test.go (99%) diff --git a/manager/allocator/cnmallocator/networkallocator.go b/manager/allocator/cnmallocator/networkallocator.go index 3c9d0c4617..0606b88575 100644 --- a/manager/allocator/cnmallocator/networkallocator.go +++ b/manager/allocator/cnmallocator/networkallocator.go @@ -40,9 +40,6 @@ type cnmNetworkAllocator struct { // The driver registry for all internal and external network drivers. networkRegistry drvregistry.Networks - // The port allocator instance for allocating node ports - portAllocator *portAllocator - // Local network state used by cnmNetworkAllocator to do network management. networks map[string]*network @@ -108,8 +105,6 @@ func New(pg plugingetter.PluginGetter, netConfig *NetworkConfig) (networkallocat tasks: make(map[string]struct{}), nodes: make(map[string]map[string]struct{}), pg: pg, - - portAllocator: newPortAllocator(), } for ntype, i := range initializers { @@ -207,9 +202,6 @@ func (na *cnmNetworkAllocator) Deallocate(n *api.Network) error { // AllocateService allocates all the network resources such as virtual // IP and ports needed by the service. func (na *cnmNetworkAllocator) AllocateService(s *api.Service) (err error) { - if err = na.portAllocator.serviceAllocatePorts(s); err != nil { - return err - } defer func() { if err != nil { na.DeallocateService(s) @@ -312,7 +304,6 @@ func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error { } s.Endpoint.VirtualIPs = nil - na.portAllocator.serviceDeallocatePorts(s) delete(na.services, s.ID) return nil @@ -369,19 +360,8 @@ func (na *cnmNetworkAllocator) IsTaskAllocated(t *api.Task) bool { return true } -// HostPublishPortsNeedUpdate returns true if the passed service needs -// allocations for its published ports in host (non ingress) mode -func (na *cnmNetworkAllocator) HostPublishPortsNeedUpdate(s *api.Service) bool { - return na.portAllocator.hostPublishPortsNeedUpdate(s) -} - // IsServiceAllocated returns false if the passed service needs to have network resources allocated/updated. func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool { - var options networkallocator.ServiceAllocationOpts - for _, flag := range flags { - flag(&options) - } - specNetworks := serviceNetworks(s) // If endpoint mode is VIP and allocator does not have the @@ -443,10 +423,6 @@ func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func( } } - if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) || - (s.Endpoint != nil && len(s.Endpoint.Ports) != 0) { - return na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) - } return true } diff --git a/manager/allocator/cnmallocator/networkallocator_test.go b/manager/allocator/cnmallocator/networkallocator_test.go index 8598fc6b1a..1f92b101bd 100644 --- a/manager/allocator/cnmallocator/networkallocator_test.go +++ b/manager/allocator/cnmallocator/networkallocator_test.go @@ -564,10 +564,11 @@ func TestAllocateService(t *testing.T) { err = na.AllocateService(s) assert.NoError(t, err) assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.True(t, s.Endpoint.Ports[0].PublishedPort >= dynamicPortStart && - s.Endpoint.Ports[0].PublishedPort <= dynamicPortEnd) - assert.True(t, s.Endpoint.Ports[1].PublishedPort >= dynamicPortStart && - s.Endpoint.Ports[1].PublishedPort <= dynamicPortEnd) + assert.True(t, s.Endpoint.Ports[0].PublishedPort >= 1 && + s.Endpoint.Ports[0].PublishedPort <= 65535) + assert.True(t, s.Endpoint.Ports[1].PublishedPort >= 1 && + s.Endpoint.Ports[1].PublishedPort <= 65535) + assert.NotEqual(t, s.Endpoint.Ports[0].PublishedPort, s.Endpoint.Ports[1].PublishedPort) assert.Equal(t, 1, len(s.Endpoint.VirtualIPs)) diff --git a/manager/allocator/network.go b/manager/allocator/network.go index 673da84996..721bf3f080 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -37,6 +37,9 @@ type networkContext struct { // the actual network allocation. nwkAllocator networkallocator.NetworkAllocator + // The port allocator instance for allocating node ports + portAllocator *portAllocator + // A set of tasks which are ready to be allocated as a batch. This is // distinct from "unallocatedTasks" which are tasks that failed to // allocate on the first try, being held for a future retry. @@ -95,6 +98,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { nc := &networkContext{ nwkAllocator: na, + portAllocator: newPortAllocator(), pendingTasks: make(map[string]*api.Task), unallocatedTasks: make(map[string]*api.Task), unallocatedServices: make(map[string]*api.Service), @@ -233,7 +237,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { + if nc.isServiceAllocated(s) { break } @@ -261,8 +265,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { - if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) { + if nc.isServiceAllocated(s) { + if !nc.portAllocator.hostPublishPortsNeedUpdate(s) { break } updatePortsInHostPublishMode(s) @@ -284,7 +288,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { case api.EventDeleteService: s := v.Service.Copy() - if err := nc.nwkAllocator.DeallocateService(s); err != nil { + if err := nc.deallocateService(s); err != nil { log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID) } else { nc.somethingWasDeallocated = true @@ -681,7 +685,7 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly var allocatedServices []*api.Service for _, s := range services { - if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) { + if nc.isServiceAllocated(s, networkallocator.OnInit) { continue } if existingAddressesOnly && @@ -713,6 +717,23 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly return nil } +// isServiceAllocated returns false if the passed service needs to have network resources allocated/updated. +func (nc *networkContext) isServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool { + if !nc.nwkAllocator.IsServiceAllocated(s, flags...) { + return false + } + + var options networkallocator.ServiceAllocationOpts + for _, flag := range flags { + flag(&options) + } + if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) || + (s.Endpoint != nil && len(s.Endpoint.Ports) != 0) { + return nc.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) + } + return true +} + // allocateTasks allocates tasks in the store so far before we started watching. func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly bool) error { var ( @@ -815,7 +836,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo // network configured or service endpoints have been // allocated. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) && - (s == nil || nc.nwkAllocator.IsServiceAllocated(s)) + (s == nil || nc.isServiceAllocated(s)) } func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) { @@ -1200,13 +1221,13 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin // is not there // service has no user-defined endpoints while has already allocated network resources, // need deallocated. - if err := nc.nwkAllocator.DeallocateService(s); err != nil { + if err := nc.deallocateService(s); err != nil { return err } nc.somethingWasDeallocated = true } - if err := nc.nwkAllocator.AllocateService(s); err != nil { + if err := nc.allocateService(s); err != nil { nc.unallocatedServices[s.ID] = s return err } @@ -1229,6 +1250,26 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin return nil } +func (nc *networkContext) allocateService(s *api.Service) error { + if err := nc.portAllocator.serviceAllocatePorts(s); err != nil { + return err + } + if err := nc.nwkAllocator.AllocateService(s); err != nil { + nc.portAllocator.serviceDeallocatePorts(s) + return err + } + + return nil +} + +func (nc *networkContext) deallocateService(s *api.Service) error { + if err := nc.nwkAllocator.DeallocateService(s); err != nil { + return err + } + nc.portAllocator.serviceDeallocatePorts(s) + return nil +} + func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error { if err := batch.Update(func(tx store.Tx) error { err := store.UpdateService(tx, s) @@ -1241,7 +1282,7 @@ func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Bat return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID) }); err != nil { - if err := a.netCtx.nwkAllocator.DeallocateService(s); err != nil { + if err := a.netCtx.deallocateService(s); err != nil { log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID) } @@ -1298,7 +1339,7 @@ func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) { return } - if !nc.nwkAllocator.IsServiceAllocated(s) { + if !nc.isServiceAllocated(s) { err = fmt.Errorf("service %s to which task %s belongs has pending allocations", s.ID, t.ID) return } @@ -1423,7 +1464,7 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) { nc := a.netCtx var allocatedServices []*api.Service for _, s := range nc.unallocatedServices { - if !nc.nwkAllocator.IsServiceAllocated(s) { + if !nc.isServiceAllocated(s) { if err := a.allocateService(ctx, s, false); err != nil { log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID) continue diff --git a/manager/allocator/networkallocator/networkallocator.go b/manager/allocator/networkallocator/networkallocator.go index bb7085ce00..293c6e875d 100644 --- a/manager/allocator/networkallocator/networkallocator.go +++ b/manager/allocator/networkallocator/networkallocator.go @@ -61,10 +61,6 @@ type NetworkAllocator interface { // virtual IP and ports associated with the service. DeallocateService(s *api.Service) error - // HostPublishPortsNeedUpdate returns true if the passed service needs - // allocations for its published ports in host (non ingress) mode - HostPublishPortsNeedUpdate(s *api.Service) bool - // // Task Allocation // diff --git a/manager/allocator/cnmallocator/portallocator.go b/manager/allocator/portallocator.go similarity index 99% rename from manager/allocator/cnmallocator/portallocator.go rename to manager/allocator/portallocator.go index 2ebb35b33e..4fa16d7a9b 100644 --- a/manager/allocator/cnmallocator/portallocator.go +++ b/manager/allocator/portallocator.go @@ -1,4 +1,4 @@ -package cnmallocator +package allocator import ( "github.com/moby/swarmkit/v2/api" diff --git a/manager/allocator/cnmallocator/portallocator_test.go b/manager/allocator/portallocator_test.go similarity index 99% rename from manager/allocator/cnmallocator/portallocator_test.go rename to manager/allocator/portallocator_test.go index 0970b37931..e85ffbe79d 100644 --- a/manager/allocator/cnmallocator/portallocator_test.go +++ b/manager/allocator/portallocator_test.go @@ -1,4 +1,4 @@ -package cnmallocator +package allocator import ( "testing"