diff --git a/balancer/balancer.go b/balancer/balancer.go
index 540e7738..fb7994d2 100644
--- a/balancer/balancer.go
+++ b/balancer/balancer.go
@@ -29,10 +29,11 @@ import (
 // streams or consumers, and then determine an even distribution. If any of the
 // servers is the leader for more than the even distribution, the balancer will
 // step down a number of streams/consumers until the even distribution is met.
-// Which streams/consumers are stepped down is determined randomly.
-// If stepping down fails, or if the same server is elected the leader again,
-// we will move on the next randomly selected server. If we get a second, similar
-// failure the Balancer will return an error.
+// Which streams/consumers are stepped down is determined randomly. We use
+// preferred placement to move the leadership to a server with fewer leaders
+// than the even distribution. If stepping down fails, we will move on to the
+// next randomly selected server. If we get a second, similar failure, the
+// Balancer will return an error.
 type Balancer struct {
 	nc  *nats.Conn
 	log api.Logger
@@ -45,54 +46,43 @@ type balanceEntity interface {
 }
 
 type peer struct {
-	hostname    string
-	entities    []balanceEntity
-	leaderCount int
-	rebalance   int
+	name     string
+	entities []balanceEntity
+	offset   int
 }
 
 // New returns a new instance of the Balancer
 func New(nc *nats.Conn, log api.Logger) (*Balancer, error) {
-	return &Balancer{
-		nc:  nc,
-		log: log,
-	}, nil
-}
+	mgr, err := jsm.New(nc)
+	if err != nil {
+		return nil, err
+	}
 
-func (b *Balancer) updateServersWithExclude(servers map[string]*peer, exclude string) (map[string]*peer, error) {
-	updated := map[string]*peer{}
-	var err error
+	apiLvl, err := mgr.MetaApiLevel(true)
+	if err != nil {
+		return nil, err
+	}
 
-	for _, s := range servers {
-		if s.hostname == exclude {
-			continue
-		}
-		for _, e := range s.entities {
-			updated, err = b.mapEntityToServers(e, updated)
-			if err != nil {
-				return updated, err
-			}
-		}
+	if apiLvl < 1 {
+		return nil, fmt.Errorf("invalid server version. Balancer requires server version 2.11.0 or higher")
 	}
 
-	return updated, nil
+	return &Balancer{
+		nc:  nc,
+		log: log,
+	}, nil
 }
 
-func (b *Balancer) getOvers(server map[string]*peer, evenDistribution int) {
-	for _, s := range server {
-		if s.leaderCount == 0 {
-			continue
-		}
-
-		if over := s.leaderCount - evenDistribution; over > 0 {
-			s.rebalance = over
-		}
+func (b *Balancer) calcOffset(server *map[string]*peer, evenDistribution int) {
+	for _, s := range *server {
+		s.offset = len(s.entities) - evenDistribution
 	}
 }
 
+// We consider a server to be balanced when its offset is 0 or less
 func (b *Balancer) isBalanced(servers map[string]*peer) bool {
 	for _, s := range servers {
-		if s.rebalance > 0 {
+		if s.offset > 0 {
 			return false
 		}
 	}
@@ -110,22 +100,19 @@ func (b *Balancer) mapEntityToServers(entity balanceEntity, serverMap map[string
 	_, ok := serverMap[leaderName]
 	if !ok {
 		tmp := peer{
-			hostname:    leaderName,
-			entities:    []balanceEntity{},
-			leaderCount: 0,
+			name:     leaderName,
+			entities: []balanceEntity{},
 		}
 		serverMap[leaderName] = &tmp
 	}
 	serverMap[leaderName].entities = append(serverMap[leaderName].entities, entity)
-	serverMap[leaderName].leaderCount += 1
 
 	for _, replica := range info.Replicas {
 		_, ok = serverMap[replica.Name]
 		if !ok {
 			tmp := peer{
-				hostname:    replica.Name,
-				entities:    []balanceEntity{},
-				leaderCount: 0,
+				name:     replica.Name,
+				entities: []balanceEntity{},
 			}
 			serverMap[replica.Name] = &tmp
 		}
@@ -139,58 +126,51 @@ func (b *Balancer) calcDistribution(entities, servers int) int {
 	return int(math.Floor(evenDistributionf + 0.5))
 }
 
-func (b *Balancer) balance(servers map[string]*peer, evenDistribution int) (int, error) {
-	var err error
+func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeHint string) (int, error) {
+	//var err error
 	steppedDown := 0
 
 	for !b.isBalanced(servers) {
 		for _, s := range servers {
-			// skip servers that aren't leaders
-			if s.leaderCount == 0 {
-				continue
-			}
-
-			if s.rebalance > 0 {
-				b.log.Infof("Found server '%s' with %d entities over the even distribution\n", s.hostname, s.rebalance)
-				// Now we have to kick a random selection of streams where number = rebalance
+			if s.offset > 0 {
+				b.log.Infof("Found server '%s' with offset of %d. Rebalancing", s.name, s.offset)
 				retries := 0
-				for i := 0; i < s.rebalance; i++ {
+				for i := 0; i <= s.offset; i++ {
+					// find a random stream (or consumer) to move to another server
 					randomIndex := rand.Intn(len(s.entities))
 					entity := s.entities[randomIndex]
-					if entity == nil {
-						return steppedDown, fmt.Errorf("no more valid entities to balance")
-					}
-					b.log.Infof("Requesting leader (%s) step down for %s", s.hostname, entity.Name())
-
-					err = entity.LeaderStepDown()
-					if err != nil {
-						b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
-						// If we failed to step down the stream, decrement the iterator so that we don't kick one too few
-						// Limit this to one retry, if we can't step down multiple leaders something is wrong
-						if retries == 0 {
-							i--
-							retries++
+
+					for _, ns := range servers {
+						if ns.offset < 0 {
+							b.log.Infof("Requesting leader '%s' step down for %s '%s'. New preferred leader is %s.", s.name, typeHint, entity.Name(), ns.name)
+							placement := api.Placement{Preferred: ns.name}
+							err := entity.LeaderStepDown(&placement)
+							if err != nil {
+								b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
+								// If we failed to step down the stream, decrement the iterator so that we don't kick one too few
+								// Limit this to one retry, if we can't step down multiple leaders something is wrong
+								if retries == 0 {
+									i--
+									retries++
+									s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
+									break
+								}
+								return 0, err
+							}
+							b.log.Infof("Successful step down for %s '%s'", typeHint, entity.Name())
+							retries = 0
+							steppedDown += 1
+							ns.offset += 1
+							s.offset -= 1
 							s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
-							continue
+							break
 						}
-						return 0, err
 					}
-
-					b.log.Infof("Successful step down '%s'", entity.Name())
-					retries = 0
-					s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
-					steppedDown += 1
-				}
-
-				// finally, if we rebalanced a server we update the servers list and start again, excluding the one we just rebalanced
-				servers, err = b.updateServersWithExclude(servers, s.hostname)
-				if err != nil {
-					return steppedDown, err
 				}
-				b.getOvers(servers, evenDistribution)
-				break
 			}
 		}
+		// We recalculate the offset count since we can't be 100% sure entities moved to their preferred server
+		b.calcOffset(&servers, evenDistribution)
 	}
 
 	return steppedDown, nil
@@ -212,9 +192,10 @@ func (b *Balancer) BalanceStreams(streams []*jsm.Stream) (int, error) {
 	b.log.Debugf("found %d streams on %d servers\n", len(streams), len(servers))
 	evenDistribution := b.calcDistribution(len(streams), len(servers))
 	b.log.Debugf("even distribution is %d\n", evenDistribution)
-	b.getOvers(servers, evenDistribution)
+	b.log.Debugf("calculating offset for each server")
+	b.calcOffset(&servers, evenDistribution)
 
-	return b.balance(servers, evenDistribution)
+	return b.balance(servers, evenDistribution, "stream")
 }
 
 // BalanceConsumers finds the expected distribution of consumer leaders over servers
@@ -233,7 +214,8 @@ func (b *Balancer) BalanceConsumers(consumers []*jsm.Consumer) (int, error) {
 	b.log.Debugf("found %d consumers on %d servers\n", len(consumers), len(servers))
 	evenDistribution := b.calcDistribution(len(consumers), len(servers))
 	b.log.Debugf("even distribution is %d\n", evenDistribution)
-	b.getOvers(servers, evenDistribution)
+	b.log.Debugf("calculating offset for each server")
+	b.calcOffset(&servers, evenDistribution)
 
-	return b.balance(servers, evenDistribution)
+	return b.balance(servers, evenDistribution, "consumer")
 }
diff --git a/balancer/balancer_test.go b/balancer/balancer_test.go
index 410062f2..c56b3c93 100644
--- a/balancer/balancer_test.go
+++ b/balancer/balancer_test.go
@@ -18,7 +18,7 @@ import (
 func TestBalanceStream(t *testing.T) {
 	withJSCluster(t, 3, func(t *testing.T, servers []*server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
 		streams := []*jsm.Stream{}
-		for i := 1; i <= 10; i++ {
+		for i := 1; i < 10; i++ {
 			streamName := fmt.Sprintf("tests%d", i)
 			subjects := fmt.Sprintf("tests%d.*", i)
 			s, err := mgr.NewStream(streamName, jsm.Subjects(subjects), jsm.MemoryStorage(), jsm.Replicas(3))
@@ -62,7 +62,7 @@ func TestBalanceConsumer(t *testing.T) {
 		defer s.Delete()
 
 		consumers := []*jsm.Consumer{}
-		for i := 1; i <= 10; i++ {
+		for i := 1; i < 10; i++ {
 			consumerName := fmt.Sprintf("testc%d", i)
 			c, err := mgr.NewConsumer("TEST_CONSUMER_BALANCE", jsm.ConsumerName(consumerName))
 			if err != nil {
@@ -154,7 +154,7 @@ func withJSCluster(t *testing.T, retries int, cb func(*testing.T, []*server.Serv
 	}
 	defer nc.Close()
 
-	mgr, err := jsm.New(nc, jsm.WithTimeout(time.Second))
+	mgr, err := jsm.New(nc, jsm.WithTimeout(5*time.Second))
 	if err != nil {
 		t.Fatalf("manager creation failed: %s", err)
 	}
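
Reviewer note: to make the "even distribution" and "offset" terms from the package comment concrete, here is a small self-contained sketch of the same arithmetic that calcDistribution and calcOffset perform. The server names and leader counts are invented for illustration.

package main

import (
	"fmt"
	"math"
)

func main() {
	// Ten stream leaders spread over three servers.
	entities, servers := 10, 3

	// Same rounding calcDistribution uses: floor(x + 0.5), i.e. round to nearest.
	even := int(math.Floor(float64(entities)/float64(servers) + 0.5)) // 3

	// Hypothetical leader counts per server; offset = leaders - even,
	// which is what calcOffset derives from each peer's entity list.
	leaders := map[string]int{"n1": 6, "n2": 3, "n3": 1}
	for name, count := range leaders {
		fmt.Printf("%s: %d leaders, offset %+d\n", name, count, count-even)
	}

	// n1 is over (+3) and n3 is under (-2), so the balancer steps down
	// leaders on n1 with a preferred placement on n3 until no offset is
	// greater than zero.
}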
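
Usage sketch: the snippet below shows one way a caller might drive the updated entry points. It is a sketch, not part of this change; it assumes the package lives at github.com/nats-io/natscli/balancer and that the caller already has an api.Logger implementation and a slice of streams (collected, for example, via the jsm.Manager listing APIs). Only functions touched in this diff are used: balancer.New, which now refuses to run below meta API level 1 (server 2.11.0+), and Balancer.BalanceStreams.

package balancing

import (
	"fmt"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/jsm.go/api"
	"github.com/nats-io/nats.go"

	// Assumed import path for the package changed in this diff.
	"github.com/nats-io/natscli/balancer"
)

// RebalanceStreams creates a Balancer and evens out stream leadership.
// The logger and the stream list are supplied by the caller.
func RebalanceStreams(nc *nats.Conn, logger api.Logger, streams []*jsm.Stream) (int, error) {
	// New checks the reported meta API level and refuses to run
	// against servers older than 2.11.0.
	b, err := balancer.New(nc, logger)
	if err != nil {
		return 0, fmt.Errorf("could not create balancer: %w", err)
	}

	// BalanceStreams returns how many leaders were asked to step down,
	// each with a preferred placement on an under-loaded server.
	moved, err := b.BalanceStreams(streams)
	if err != nil {
		return moved, fmt.Errorf("balancing streams failed: %w", err)
	}

	return moved, nil
}

BalanceConsumers follows the same pattern for consumer leaders.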