Commit 2293285: wip
hspedro committed Oct 9, 2024
1 parent dd58d08
Showing 8 changed files with 240 additions and 170 deletions.
163 changes: 146 additions & 17 deletions internal/core/operations/healthcontroller/executor.go
@@ -24,6 +24,7 @@ package healthcontroller

import (
"context"
"errors"
"fmt"
"math"
"strconv"
@@ -41,7 +42,10 @@ import (

"github.com/topfreegames/maestro/internal/core/entities/operation"
"github.com/topfreegames/maestro/internal/core/operations"
porterrors "github.com/topfreegames/maestro/internal/core/ports/errors"
"github.com/topfreegames/maestro/internal/core/utils"

Check failure on line 46 in internal/core/operations/healthcontroller/executor.go (GitHub Actions / Integration tests, GitHub Actions / Unit tests):
no required module provides package github.com/topfreegames/maestro/internal/core/utils; to add it:
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
@@ -127,7 +131,7 @@ func (ex *Executor) Execute(ctx context.Context, op *operation.Operation, defini
ex.setTookAction(def, true)
}

-desiredNumberOfRooms, isRollingUpdating, err := GetDesiredNumberOfRooms(ctx, ex.autoscaler, ex.roomStorage, ex.schedulerStorage, logger, scheduler, availableRooms)
+desiredNumberOfRooms, isRollingUpdating, err := GetDesiredNumberOfRooms(ctx, ex.autoscaler, ex.roomStorage, ex.schedulerStorage, logger, scheduler, ex.config, availableRooms)
if err != nil {
logger.Error("error getting the desired number of rooms", zap.Error(err))
return err
@@ -198,20 +202,20 @@ func (ex *Executor) ensureDesiredAmountOfInstances(ctx context.Context, op *oper
logger = logger.With(zap.Int("actual", actualAmount), zap.Int("desired", desiredAmount))
switch {
case actualAmount > desiredAmount: // Need to scale down
-removeAmount := actualAmount - desiredAmount
-reason := remove.ScaleDown
-if isRollingUpdating {
-reason = remove.RollingUpdateReplace
-}
-removeOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &remove.Definition{
-Amount: removeAmount,
-Reason: reason,
-})
-if err != nil {
-return err
-}
-tookAction = true
-msgToAppend = fmt.Sprintf("created operation (id: %s) to remove %v rooms.", removeOperation.ID, removeAmount)
+// removeAmount := actualAmount - desiredAmount
+// reason := remove.ScaleDown
+// if isRollingUpdating {
+// reason = remove.RollingUpdateReplace
+// }
+// removeOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &remove.Definition{
+// Amount: removeAmount,
+// Reason: reason,
+// })
+// if err != nil {
+// return err
+// }
+// tookAction = true
+// msgToAppend = fmt.Sprintf("created operation (id: %s) to remove %v rooms.", removeOperation.ID, removeAmount)
case actualAmount < desiredAmount: // Need to scale up
addAmount := desiredAmount - actualAmount
addOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &add.Definition{
@@ -345,12 +349,20 @@ func (ex *Executor) setTookAction(def *Definition, tookAction bool) {
def.TookAction = &tookAction
}

-func CanPerformDownscale(ctx context.Context, autoscaler ports.Autoscaler, scheduler *entities.Scheduler, logger *zap.Logger) (bool, string) {
+func CanPerformDownscale(ctx context.Context, autoscaler ports.Autoscaler, scheduler *entities.Scheduler, readyRoomsToBeDeleted, occupiedRoomsToBeDeleted int, logger *zap.Logger) (bool, string) {
can, err := autoscaler.CanDownscale(ctx, scheduler)
if err != nil {
logger.Error("error checking if scheduler can downscale", zap.Error(err))
return can, err.Error()
}
logger.Info("current CanDownscale", zap.Bool("can", can))

can, err = autoscaler.CanDownscaleToNextState(ctx, scheduler, occupiedRoomsToBeDeleted, readyRoomsToBeDeleted)
if err != nil {
logger.Error("error checking if scheduler can downscale to next state", zap.Error(err))
return can, err.Error()
}
logger.Info("CanDownscaleToNextState", zap.Bool("can", can))

if !can {
message := fmt.Sprintf("scheduler %s can't downscale, occupation is above the threshold", scheduler.Name)
@@ -390,6 +402,7 @@ func GetDesiredNumberOfRooms(
schedulerStorage ports.SchedulerStorage,
logger *zap.Logger,
scheduler *entities.Scheduler,
config Config,
availableRooms []string,
) (desiredNumber int, isRollingUpdating bool, err error) {
desiredNumber = scheduler.RoomsReplicas
@@ -424,9 +437,29 @@
}

if len(availableRooms) > desiredNumber {
// Compute the next state to check if we can perform downscale
// 1. Get the rooms with deletion priority
// 2. Count how many ready rooms will be deleted
// 3. Check if we can delete without offending readyTarget
// 4. If we can't, keep the current number of rooms
// 5. If we can, update the scheduler.LastDownscaleAt and proceed with the downscale
roomsToBeDeleted, err := ListRoomsWithDeletionPriority(ctx, roomStorage, scheduler, len(availableRooms)-desiredNumber, isRollingUpdating, config, logger)
if err != nil {
logger.Error("error listing rooms with deletion priority, cannot compute next state, skipping iteration", zap.Error(err))
return desiredNumber, isRollingUpdating, err
}
readyRoomsToBeDeleted := 0
occupiedRoomsToBeDeleted := 0
for _, room := range roomsToBeDeleted {
if room.Status == game_room.GameStatusReady {
readyRoomsToBeDeleted += 1
}
if room.Status == game_room.GameStatusOccupied {
occupiedRoomsToBeDeleted += 1
}
}
willDownscale := true
if scheduler.Autoscaling != nil && scheduler.Autoscaling.Enabled {
-canDownscale, msg := CanPerformDownscale(ctx, autoscaler, scheduler, logger)
+canDownscale, msg := CanPerformDownscale(ctx, autoscaler, scheduler, readyRoomsToBeDeleted, occupiedRoomsToBeDeleted, logger)
if !canDownscale {
willDownscale = false
desiredNumber = len(availableRooms)
@@ -513,3 +546,99 @@ func ComputeMaxSurge(scheduler *entities.Scheduler, desiredNumberOfRooms int) (m

return maxSurgeNum, nil
}

// TODO: refactor to SchedulerController execute
// ListRoomsWithDeletionPriority returns a specified number of rooms, ordered
// by their deletion priority
//
// The priority is:
//
// - On error rooms;
// - No ping received for x time rooms;
// - Pending rooms;
// - Ready rooms;
// - Occupied rooms;
//
// This function can return fewer rooms than `amount` if the scheduler does
// not have enough rooms.
func ListRoomsWithDeletionPriority(ctx context.Context, storage ports.RoomStorage, activeScheduler *entities.Scheduler, amount int, sortByVersion bool, config Config, logger *zap.Logger) ([]*game_room.GameRoom, error) {
var schedulerRoomsIDs []string
onErrorRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusError)
if err != nil {
return nil, fmt.Errorf("failed to list scheduler rooms on error: %w", err)
}

oldLastPingRoomIDs, err := storage.GetRoomIDsByLastPing(ctx, activeScheduler.Name, time.Now().Add(config.RoomPingTimeout*-1))
if err != nil {
return nil, fmt.Errorf("failed to list scheduler rooms with old last ping datetime: %w", err)
}

pendingRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusPending)
if err != nil {
return nil, fmt.Errorf("failed to list scheduler rooms on pending status: %w", err)
}

readyRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusReady)
if err != nil {
return nil, fmt.Errorf("failed to list scheduler rooms on ready status: %w", err)
}

occupiedRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusOccupied)
if err != nil {
return nil, fmt.Errorf("failed to list scheduler rooms on occupied status: %w", err)
}

schedulerRoomsIDs = append(schedulerRoomsIDs, onErrorRoomIDs...)
schedulerRoomsIDs = append(schedulerRoomsIDs, oldLastPingRoomIDs...)
schedulerRoomsIDs = append(schedulerRoomsIDs, pendingRoomIDs...)
schedulerRoomsIDs = append(schedulerRoomsIDs, readyRoomIDs...)
schedulerRoomsIDs = append(schedulerRoomsIDs, occupiedRoomIDs...)

schedulerRoomsIDs = utils.RemoveDuplicates(schedulerRoomsIDs)

var activeVersionRoomPool []*game_room.GameRoom
var toDeleteRooms []*game_room.GameRoom
var terminatingRooms []*game_room.GameRoom
for _, roomID := range schedulerRoomsIDs {
room, err := storage.GetRoom(ctx, activeScheduler.Name, roomID)
if err != nil {
if !errors.Is(err, porterrors.ErrNotFound) {
return nil, fmt.Errorf("failed to fetch room information: %w", err)
}

room = &game_room.GameRoom{ID: roomID, SchedulerID: activeScheduler.Name, Status: game_room.GameStatusError, Version: activeScheduler.Spec.Version}
}

// Select Terminating rooms to be re-deleted. This is useful for fixing any desync state.
if room.Status == game_room.GameStatusTerminating {
terminatingRooms = append(terminatingRooms, room)
continue
}

isRoomActive := room.Status == game_room.GameStatusOccupied || room.Status == game_room.GameStatusReady || room.Status == game_room.GameStatusPending
if sortByVersion && isRoomActive && activeScheduler.IsSameMajorVersion(room.Version) {
activeVersionRoomPool = append(activeVersionRoomPool, room)
} else {
toDeleteRooms = append(toDeleteRooms, room)
}
}
toDeleteRooms = append(toDeleteRooms, activeVersionRoomPool...)

logger.Debug("toDeleteRooms",
zap.Array("toDeleteRooms uncapped", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, room := range toDeleteRooms {
enc.AppendString(fmt.Sprintf("%s-%s-%s", room.ID, room.Version, room.Status.String()))
}
return nil
})),
zap.Int("amount", amount),
)

if len(toDeleteRooms) > amount {
toDeleteRooms = toDeleteRooms[:amount]
}

result := append(toDeleteRooms, terminatingRooms...)

return result, nil
}
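Both CI annotations above trace back to the new utils.RemoveDuplicates call in ListRoomsWithDeletionPriority: the internal/core/utils package is not in the module yet. A minimal sketch of the missing helper, assuming it deduplicates while preserving first-occurrence order (the real implementation may differ):

// internal/core/utils/utils.go (sketch, not part of this commit)
package utils

// RemoveDuplicates returns a copy of s with duplicate elements removed,
// keeping the first occurrence of each element and preserving order.
func RemoveDuplicates[T comparable](s []T) []T {
	seen := make(map[T]struct{}, len(s))
	out := make([]T, 0, len(s))
	for _, v := range s {
		if _, ok := seen[v]; ok {
			continue
		}
		seen[v] = struct{}{}
		out = append(out, v)
	}
	return out
}

The call site passes a []string of room IDs, so either a generic comparable constraint or a plain []string signature would fit.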
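The ready/occupied counts that now feed CanPerformDownscale exist so the autoscaler can evaluate the post-deletion state rather than the current one. As a back-of-the-envelope model only, assuming a room-occupancy policy with a readyTarget fraction (the actual policy is not shown in this diff), the gate amounts to:

// Illustrative only: would deleting the candidate rooms keep the ready-room
// ratio at or above the policy's readyTarget? E.g. 10 rooms, 5 occupied,
// deleting 2 ready rooms: next ratio = (8-5)/8 = 0.375, so a readyTarget of
// 0.4 would block the downscale.
func canDownscaleAfterDeletion(totalRooms, occupiedRooms, readyToDelete, occupiedToDelete int, readyTarget float64) bool {
	nextTotal := totalRooms - readyToDelete - occupiedToDelete
	nextOccupied := occupiedRooms - occupiedToDelete
	if nextTotal <= 0 {
		return false
	}
	nextReadyRatio := float64(nextTotal-nextOccupied) / float64(nextTotal)
	return nextReadyRatio >= readyTarget
}

The real decision is delegated to Autoscaler.CanDownscaleToNextState (see internal/core/ports/autoscaler.go below), so the policy math stays behind the port.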
2 changes: 1 addition & 1 deletion internal/core/operations/rooms/remove/definition.go
@@ -44,7 +44,7 @@ const (
const OperationName = "remove_rooms"

type Definition struct {
-Amount int `json:"amount"`
+// Amount int `json:"amount"`
RoomsIDs []string `json:"rooms_ids"`
Reason string `json:"reason"`
}
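Since the definition no longer carries an Amount, a remove_rooms operation is driven entirely by explicit room IDs chosen by the caller (the health controller, after ListRoomsWithDeletionPriority). An illustrative construction; the IDs are placeholders:

// Illustrative: callers now enumerate the exact rooms to delete.
def := &remove.Definition{
	RoomsIDs: []string{"room-1", "room-2"},
	Reason:   remove.ScaleDown,
}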
75 changes: 37 additions & 38 deletions internal/core/operations/rooms/remove/executor.go
@@ -34,7 +34,6 @@ import (
"github.com/topfreegames/maestro/internal/core/ports"
porterrors "github.com/topfreegames/maestro/internal/core/ports/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
)

@@ -80,17 +79,17 @@ func (e *Executor) Execute(ctx context.Context, op *operation.Operation, definit
e.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("removed rooms: %v", removeDefinition.RoomsIDs))
}

-if removeDefinition.Amount > 0 {
-logger.Info("start removing rooms", zap.Int("amount", removeDefinition.Amount))
-err := e.removeRoomsByAmount(ctx, logger, op.SchedulerName, removeDefinition.Amount, op, removeDefinition.Reason)
-if err != nil {
-reportDeletionFailedTotal(op.SchedulerName, op.ID)
-logger.Warn("error removing rooms", zap.Error(err))
+// if removeDefinition.Amount > 0 {
+// logger.Info("start removing rooms", zap.Int("amount", removeDefinition.Amount))
+// err := e.removeRoomsByAmount(ctx, logger, op.SchedulerName, removeDefinition.Amount, op, removeDefinition.Reason)
+// if err != nil {
+// reportDeletionFailedTotal(op.SchedulerName, op.ID)
+// logger.Warn("error removing rooms", zap.Error(err))

-return fmt.Errorf("error removing rooms by amount: %w", err)
-}
-e.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("removed %d rooms", removeDefinition.Amount))
-}
+// return fmt.Errorf("error removing rooms by amount: %w", err)
+// }
+// e.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("removed %d rooms", removeDefinition.Amount))
+// }

logger.Info("finished deleting rooms")
return nil
@@ -111,33 +110,33 @@ func (e *Executor) removeRoomsByIDs(ctx context.Context, schedulerName string, r
return nil
}

-func (e *Executor) removeRoomsByAmount(ctx context.Context, logger *zap.Logger, schedulerName string, amount int, op *operation.Operation, reason string) error {
-activeScheduler, err := e.schedulerManager.GetActiveScheduler(ctx, schedulerName)
-if err != nil {
-return err
-}
-
-rooms, err := e.roomManager.ListRoomsWithDeletionPriority(ctx, activeScheduler, amount)
-if err != nil {
-return err
-}
-
-logger.Info("removing rooms by amount sorting by version",
-zap.Array("rooms:", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
-for _, room := range rooms {
-enc.AppendString(fmt.Sprintf("%s-%s-%s", room.ID, room.Version, room.Status.String()))
-}
-return nil
-})),
-)
-
-err = e.deleteRooms(ctx, rooms, op, reason)
-if err != nil {
-return err
-}
-
-return nil
-}
+// func (e *Executor) removeRoomsByAmount(ctx context.Context, logger *zap.Logger, schedulerName string, amount int, op *operation.Operation, reason string) error {
+// activeScheduler, err := e.schedulerManager.GetActiveScheduler(ctx, schedulerName)
+// if err != nil {
+// return err
+// }
+
+// rooms, err := e.roomManager.ListRoomsWithDeletionPriority(ctx, activeScheduler, amount)
+// if err != nil {
+// return err
+// }
+
+// logger.Info("removing rooms by amount sorting by version",
+// zap.Array("rooms:", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
+// for _, room := range rooms {
+// enc.AppendString(fmt.Sprintf("%s-%s-%s", room.ID, room.Version, room.Status.String()))
+// }
+// return nil
+// })),
+// )
+
+// err = e.deleteRooms(ctx, rooms, op, reason)
+// if err != nil {
+// return err
+// }
+
+// return nil
+// }

func (e *Executor) deleteRooms(ctx context.Context, rooms []*game_room.GameRoom, op *operation.Operation, reason string) error {
errs, ctx := errgroup.WithContext(ctx)
3 changes: 3 additions & 0 deletions internal/core/ports/autoscaler.go
@@ -38,6 +38,7 @@ type Autoscaler interface {
CalculateDesiredNumberOfRooms(ctx context.Context, scheduler *entities.Scheduler) (int, error)
// CanDownscale returns true if the scheduler can downscale, false otherwise.
CanDownscale(ctx context.Context, scheduler *entities.Scheduler) (bool, error)
// CanDownscaleToNextState returns true if the scheduler would still satisfy its
// downscale policy after the given amounts of occupied and ready rooms are deleted.
CanDownscaleToNextState(ctx context.Context, scheduler *entities.Scheduler, occupiedToBeDeleted, readyToBeDeleted int) (bool, error)
}

// Secondary ports (output, driven ports)
@@ -52,4 +53,6 @@ type Policy interface {
CalculateDesiredNumberOfRooms(policyParameters autoscaling.PolicyParameters, currentState policies.CurrentState) (desiredNumberOfRooms int, err error)
// CanDownscale returns true if the scheduler can downscale, false otherwise.
CanDownscale(policyParameters autoscaling.PolicyParameters, currentState policies.CurrentState) (bool, error)
// NextStateBuilder builds and returns the next state of the scheduler, considering the rooms to be deleted.
NextStateBuilder(ctx context.Context, scheduler *entities.Scheduler, occupiedToBeDeleted, readyToBeDeleted int) (policies.CurrentState, error)
}
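For orientation, a plausible sketch of how an Autoscaler implementation could compose these two ports. The receiver type, the policyMap lookup, and the field accesses are assumptions, not part of this commit:

// Sketch only: build the hypothetical post-deletion state, then reuse the
// policy's existing downscale rule against it.
func (a *DefaultAutoscaler) CanDownscaleToNextState(ctx context.Context, scheduler *entities.Scheduler, occupiedToBeDeleted, readyToBeDeleted int) (bool, error) {
	policy, err := a.policyMap.Get(scheduler.Autoscaling.Policy.Type)
	if err != nil {
		return false, fmt.Errorf("failed to resolve autoscaling policy: %w", err)
	}
	nextState, err := policy.NextStateBuilder(ctx, scheduler, occupiedToBeDeleted, readyToBeDeleted)
	if err != nil {
		return false, fmt.Errorf("failed to build next state: %w", err)
	}
	return policy.CanDownscale(scheduler.Autoscaling.Policy.Parameters, nextState)
}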
14 changes: 0 additions & 14 deletions internal/core/ports/room_ports.go
@@ -40,20 +40,6 @@ type RoomManager interface {
// SchedulerMaxSurge calculates the current scheduler max surge based on
// the number of rooms the scheduler has.
SchedulerMaxSurge(ctx context.Context, scheduler *entities.Scheduler) (int, error)
-// ListRoomsWithDeletionPriority returns a specified number of rooms, following
-// the priority of it being deleted
-//
-// The priority is:
-//
-// - On error rooms;
-// - No ping received for x time rooms;
-// - Pending rooms;
-// - Ready rooms;
-// - Occupied rooms;
-//
-// This function can return less rooms than the `amount` since it might not have
-// enough rooms on the scheduler.
-ListRoomsWithDeletionPriority(ctx context.Context, activeScheduler *entities.Scheduler, amount int) ([]*game_room.GameRoom, error)
// CleanRoomState cleans the remaining state of a room. This function is
// intended to be used after a `DeleteRoom`, where the room instance is
// signaled to terminate.
(The remaining 3 changed files are not shown.)
