(#596) Update balancer to use Placement preferred #612

Merged (1 commit) on Jan 16, 2025
156 changes: 69 additions & 87 deletions balancer/balancer.go
@@ -29,10 +29,11 @@ import (
// streams or consumers, and then determine an even distribution. If any of the
// servers is the leader for more than the even distribution, the balancer will
// step down a number of streams/consumers until the even distribution is met.
// Which streams/consumers are stepped down is determined randomly.
// If stepping down fails, or if the same server is elected the leader again,
// we will move on the next randomly selected server. If we get a second, similar
// failure the Balancer will return an error.
// Which streams/consumers are stepped down is determined randomly. We use
// preferred placement to move the leadership to a server with fewer leaders
// than the even distribution. If stepping down fails, we will move on to the
// next randomly selected server. If we get a second, similar failure, the
// Balancer will return an error.
type Balancer struct {
nc *nats.Conn
log api.Logger
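For readers unfamiliar with preferred placement, the mechanism the updated comment describes boils down to a single step-down request that names the desired new leader. Below is a minimal standalone sketch; the stream name, server name, and URL are made up, and the call form assumes a jsm.go version that accepts a placement argument, matching the call made in balance() further down in this diff.

```go
// Minimal sketch: ask a stream's current leader to step down and
// prefer a named server as the new leader. "ORDERS" and "n2" are
// made-up names; error handling is kept short.
package main

import (
	"log"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/jsm.go/api"
	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	mgr, err := jsm.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	stream, err := mgr.LoadStream("ORDERS")
	if err != nil {
		log.Fatal(err)
	}

	// Preferred placement: the cluster should elect "n2" as the new leader.
	if err := stream.LeaderStepDown(&api.Placement{Preferred: "n2"}); err != nil {
		log.Fatal(err)
	}
}
```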
@@ -45,54 +46,43 @@ type balanceEntity interface {
}

type peer struct {
hostname string
entities []balanceEntity
leaderCount int
rebalance int
name string
entities []balanceEntity
offset int
}

// New returns a new instance of the Balancer
func New(nc *nats.Conn, log api.Logger) (*Balancer, error) {
return &Balancer{
nc: nc,
log: log,
}, nil
}
mgr, err := jsm.New(nc)
if err != nil {
return nil, err
}

func (b *Balancer) updateServersWithExclude(servers map[string]*peer, exclude string) (map[string]*peer, error) {
updated := map[string]*peer{}
var err error
apiLvl, err := mgr.MetaApiLevel(true)
if err != nil {
return nil, err
}

for _, s := range servers {
if s.hostname == exclude {
continue
}
for _, e := range s.entities {
updated, err = b.mapEntityToServers(e, updated)
if err != nil {
return updated, err
}
}
if apiLvl < 1 {
return nil, fmt.Errorf("invalid server version. Balancer requires server version 2.11.0 or higher")
}

return updated, nil
return &Balancer{
nc: nc,
log: log,
}, nil
}

func (b *Balancer) getOvers(server map[string]*peer, evenDistribution int) {
for _, s := range server {
if s.leaderCount == 0 {
continue
}

if over := s.leaderCount - evenDistribution; over > 0 {
s.rebalance = over
}
func (b *Balancer) calcOffset(server *map[string]*peer, evenDistribution int) {
for _, s := range *server {
s.offset = len(s.entities) - evenDistribution
}
}

// We consider a server balanced when its offset is 0 or less
func (b *Balancer) isBalanced(servers map[string]*peer) bool {
for _, s := range servers {
if s.rebalance > 0 {
if s.offset > 0 {
return false
}
}
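To make the offset bookkeeping concrete, here is a toy calculation with invented server names and leader counts; it reuses the same rounding that calcDistribution applies in the next hunk.

```go
// Toy illustration of the even-distribution and offset arithmetic.
// Server names and leader counts are invented for the example.
package main

import (
	"fmt"
	"math"
)

func main() {
	leaders := map[string]int{"n1": 5, "n2": 3, "n3": 2} // 10 leaders on 3 servers

	// Same rounding as calcDistribution: floor(entities/servers + 0.5).
	even := int(math.Floor(10.0/3.0 + 0.5)) // 3

	for name, count := range leaders {
		// n1 ends up at +2 (sheds leaders), n2 at +0, n3 at -1
		// (a candidate target for preferred placement).
		fmt.Printf("%s: offset %+d\n", name, count-even)
	}
}
```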
@@ -110,22 +100,19 @@ func (b *Balancer) mapEntityToServers(entity balanceEntity, serverMap map[string
_, ok := serverMap[leaderName]
if !ok {
tmp := peer{
hostname: leaderName,
entities: []balanceEntity{},
leaderCount: 0,
name: leaderName,
entities: []balanceEntity{},
}
serverMap[leaderName] = &tmp
}
serverMap[leaderName].entities = append(serverMap[leaderName].entities, entity)
serverMap[leaderName].leaderCount += 1

for _, replica := range info.Replicas {
_, ok = serverMap[replica.Name]
if !ok {
tmp := peer{
hostname: replica.Name,
entities: []balanceEntity{},
leaderCount: 0,
name: replica.Name,
entities: []balanceEntity{},
}
serverMap[replica.Name] = &tmp
}
@@ -139,58 +126,51 @@ func (b *Balancer) calcDistribution(entities, servers int) int {
return int(math.Floor(evenDistributionf + 0.5))
}

func (b *Balancer) balance(servers map[string]*peer, evenDistribution int) (int, error) {
var err error
func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeHint string) (int, error) {
//var err error
steppedDown := 0

for !b.isBalanced(servers) {
for _, s := range servers {
// skip servers that aren't leaders
if s.leaderCount == 0 {
continue
}

if s.rebalance > 0 {
b.log.Infof("Found server '%s' with %d entities over the even distribution\n", s.hostname, s.rebalance)
// Now we have to kick a random selection of streams where number = rebalance
if s.offset > 0 {
b.log.Infof("Found server '%s' with offset of %d. Rebalancing", s.name, s.offset)
retries := 0
for i := 0; i < s.rebalance; i++ {
for i := 0; i <= s.offset; i++ {
// find a random stream (or consumer) to move to another server
randomIndex := rand.Intn(len(s.entities))
entity := s.entities[randomIndex]
if entity == nil {
return steppedDown, fmt.Errorf("no more valid entities to balance")
}
b.log.Infof("Requesting leader (%s) step down for %s", s.hostname, entity.Name())

err = entity.LeaderStepDown()
if err != nil {
b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
// If we failed to step down the stream, decrement the iterator so that we don't kick one too few
// Limit this to one retry, if we can't step down multiple leaders something is wrong
if retries == 0 {
i--
retries++

for _, ns := range servers {
if ns.offset < 0 {
b.log.Infof("Requesting leader '%s' step down for %s '%s'. New preferred leader is %s.", s.name, typeHint, entity.Name(), ns.name)
placement := api.Placement{Preferred: ns.name}
err := entity.LeaderStepDown(&placement)
if err != nil {
b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
// If we failed to step down the stream, decrement the iterator so that we don't kick one too few
// Limit this to one retry, if we can't step down multiple leaders something is wrong
if retries == 0 {
i--
retries++
s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
break
}
return 0, err
}
b.log.Infof("Successful step down for %s '%s'", typeHint, entity.Name())
retries = 0
steppedDown += 1
ns.offset += 1
s.offset -= 1
s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
continue
break
}
return 0, err
}

b.log.Infof("Successful step down '%s'", entity.Name())
retries = 0
s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
steppedDown += 1
}

// finally, if we rebalanced a server we update the servers list and start again, excluding the one we just rebalanced
servers, err = b.updateServersWithExclude(servers, s.hostname)
if err != nil {
return steppedDown, err
}
b.getOvers(servers, evenDistribution)
break
}
}
// We recalculate the offset count since we can't be 100% sure entities moved to their preferred server
b.calcOffset(&servers, evenDistribution)
}

return steppedDown, nil
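The random pick-and-remove inside the loop above is easy to lose among the retry handling; stripped to its essentials (with a plain string slice standing in for a server's entities, and the step-down reduced to a print) it looks like the sketch below.

```go
// Standalone sketch of the random pick-and-remove used in balance().
// A string slice stands in for an overloaded server's []balanceEntity.
package main

import (
	"fmt"
	"math/rand"
	"slices"
)

func main() {
	entities := []string{"s1", "s2", "s3", "s4", "s5"}
	offset := 2 // this server leads two entities more than the even distribution

	for i := 0; i < offset; i++ {
		idx := rand.Intn(len(entities))
		picked := entities[idx]
		// In balance() this is where LeaderStepDown with a preferred
		// placement is requested for the picked entity.
		fmt.Println("would step down:", picked)
		entities = slices.Delete(entities, idx, idx+1)
	}
}
```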
@@ -212,9 +192,10 @@ func (b *Balancer) BalanceStreams(streams []*jsm.Stream) (int, error) {
b.log.Debugf("found %d streams on %d servers\n", len(streams), len(servers))
evenDistribution := b.calcDistribution(len(streams), len(servers))
b.log.Debugf("even distribution is %d\n", evenDistribution)
b.getOvers(servers, evenDistribution)
b.log.Debugf("calculating offset for each server")
b.calcOffset(&servers, evenDistribution)

return b.balance(servers, evenDistribution)
return b.balance(servers, evenDistribution, "stream")
}

// BalanceConsumers finds the expected distribution of consumer leaders over servers
@@ -233,7 +214,8 @@ func (b *Balancer) BalanceConsumers(consumers []*jsm.Consumer) (int, error) {
b.log.Debugf("found %d consumers on %d servers\n", len(consumers), len(servers))
evenDistribution := b.calcDistribution(len(consumers), len(servers))
b.log.Debugf("even distribution is %d\n", evenDistribution)
b.getOvers(servers, evenDistribution)
b.log.Debugf("calculating offset for each server")
b.calcOffset(&servers, evenDistribution)

return b.balance(servers, evenDistribution)
return b.balance(servers, evenDistribution, "consumer")
}
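Since New() now rejects servers below meta API level 1, a caller that wants a friendlier error up front can run the same check itself before constructing the balancer. A sketch is below; the URL is a placeholder and the boolean argument simply mirrors the MetaApiLevel call in the diff above.

```go
// Sketch: pre-check the JetStream meta API level that the updated
// constructor requires (level >= 1, i.e. nats-server 2.11.0 or newer).
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	mgr, err := jsm.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// The same call New() makes; its boolean argument mirrors the diff above.
	lvl, err := mgr.MetaApiLevel(true)
	if err != nil {
		log.Fatal(err)
	}
	if lvl < 1 {
		log.Fatal("this balancer requires nats-server 2.11.0 or newer")
	}
	fmt.Println("meta API level:", lvl)
}
```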
6 changes: 3 additions & 3 deletions balancer/balancer_test.go
@@ -18,7 +18,7 @@ import (
func TestBalanceStream(t *testing.T) {
withJSCluster(t, 3, func(t *testing.T, servers []*server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
streams := []*jsm.Stream{}
for i := 1; i <= 10; i++ {
for i := 1; i < 10; i++ {
streamName := fmt.Sprintf("tests%d", i)
subjects := fmt.Sprintf("tests%d.*", i)
s, err := mgr.NewStream(streamName, jsm.Subjects(subjects), jsm.MemoryStorage(), jsm.Replicas(3))
@@ -62,7 +62,7 @@ func TestBalanceConsumer(t *testing.T) {
defer s.Delete()

consumers := []*jsm.Consumer{}
for i := 1; i <= 10; i++ {
for i := 1; i < 10; i++ {
consumerName := fmt.Sprintf("testc%d", i)
c, err := mgr.NewConsumer("TEST_CONSUMER_BALANCE", jsm.ConsumerName(consumerName))
if err != nil {
@@ -154,7 +154,7 @@ func withJSCluster(t *testing.T, retries int, cb func(*testing.T, []*server.Serv
}
defer nc.Close()

mgr, err := jsm.New(nc, jsm.WithTimeout(time.Second))
mgr, err := jsm.New(nc, jsm.WithTimeout(5*time.Second))
if err != nil {
t.Fatalf("manager creation failed: %s", err)
}