From 2293285f079ecacb263cd370813d49ec1b0107f2 Mon Sep 17 00:00:00 2001 From: Pedro Soares Date: Wed, 9 Oct 2024 11:50:42 -0300 Subject: [PATCH] wip --- .../operations/healthcontroller/executor.go | 163 ++++++++++++++++-- .../operations/rooms/remove/definition.go | 2 +- .../core/operations/rooms/remove/executor.go | 75 ++++---- internal/core/ports/autoscaler.go | 3 + internal/core/ports/room_ports.go | 14 -- .../core/services/autoscaler/autoscaler.go | 32 ++++ .../policies/roomoccupancy/policy.go | 21 +++ internal/core/services/rooms/room_manager.go | 100 ----------- 8 files changed, 240 insertions(+), 170 deletions(-) diff --git a/internal/core/operations/healthcontroller/executor.go b/internal/core/operations/healthcontroller/executor.go index 9106d7cdb..1f4169e56 100644 --- a/internal/core/operations/healthcontroller/executor.go +++ b/internal/core/operations/healthcontroller/executor.go @@ -24,6 +24,7 @@ package healthcontroller import ( "context" + "errors" "fmt" "math" "strconv" @@ -41,7 +42,10 @@ import ( "github.com/topfreegames/maestro/internal/core/entities/operation" "github.com/topfreegames/maestro/internal/core/operations" + porterrors "github.com/topfreegames/maestro/internal/core/ports/errors" + "github.com/topfreegames/maestro/internal/core/utils" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const ( @@ -127,7 +131,7 @@ func (ex *Executor) Execute(ctx context.Context, op *operation.Operation, defini ex.setTookAction(def, true) } - desiredNumberOfRooms, isRollingUpdating, err := GetDesiredNumberOfRooms(ctx, ex.autoscaler, ex.roomStorage, ex.schedulerStorage, logger, scheduler, availableRooms) + desiredNumberOfRooms, isRollingUpdating, err := GetDesiredNumberOfRooms(ctx, ex.autoscaler, ex.roomStorage, ex.schedulerStorage, logger, scheduler, ex.config, availableRooms) if err != nil { logger.Error("error getting the desired number of rooms", zap.Error(err)) return err @@ -198,20 +202,20 @@ func (ex *Executor) ensureDesiredAmountOfInstances(ctx 
context.Context, op *oper logger = logger.With(zap.Int("actual", actualAmount), zap.Int("desired", desiredAmount)) switch { case actualAmount > desiredAmount: // Need to scale down - removeAmount := actualAmount - desiredAmount - reason := remove.ScaleDown - if isRollingUpdating { - reason = remove.RollingUpdateReplace - } - removeOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &remove.Definition{ - Amount: removeAmount, - Reason: reason, - }) - if err != nil { - return err - } - tookAction = true - msgToAppend = fmt.Sprintf("created operation (id: %s) to remove %v rooms.", removeOperation.ID, removeAmount) + // removeAmount := actualAmount - desiredAmount + // reason := remove.ScaleDown + // if isRollingUpdating { + // reason = remove.RollingUpdateReplace + // } + // removeOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &remove.Definition{ + // Amount: removeAmount, + // Reason: reason, + // }) + // if err != nil { + // return err + // } + // tookAction = true + // msgToAppend = fmt.Sprintf("created operation (id: %s) to remove %v rooms.", removeOperation.ID, removeAmount) case actualAmount < desiredAmount: // Need to scale up addAmount := desiredAmount - actualAmount addOperation, err := ex.operationManager.CreatePriorityOperation(ctx, op.SchedulerName, &add.Definition{ @@ -345,12 +349,20 @@ func (ex *Executor) setTookAction(def *Definition, tookAction bool) { def.TookAction = &tookAction } -func CanPerformDownscale(ctx context.Context, autoscaler ports.Autoscaler, scheduler *entities.Scheduler, logger *zap.Logger) (bool, string) { +func CanPerformDownscale(ctx context.Context, autoscaler ports.Autoscaler, scheduler *entities.Scheduler, readyRoomsToBeDeleted, occupiedRoomsToBeDeleted int, logger *zap.Logger) (bool, string) { can, err := autoscaler.CanDownscale(ctx, scheduler) if err != nil { logger.Error("error checking if scheduler can downscale", zap.Error(err)) return can, err.Error() } 
+ logger.Info("using current CanDownscale: ", zap.Bool("can", can)) + + can, err = autoscaler.CanDownscaleToNextState(ctx, scheduler, occupiedRoomsToBeDeleted, readyRoomsToBeDeleted) + if err != nil { + logger.Error("error checking if scheduler can downscale", zap.Error(err)) + return can, err.Error() + } + logger.Info("using CanDownscaleToNextState: ", zap.Bool("can", can)) if !can { message := fmt.Sprintf("scheduler %s can't downscale, occupation is above the threshold", scheduler.Name) @@ -390,6 +402,7 @@ func GetDesiredNumberOfRooms( schedulerStorage ports.SchedulerStorage, logger *zap.Logger, scheduler *entities.Scheduler, + config Config, availableRooms []string, ) (desiredNumber int, isRollingUpdating bool, err error) { desiredNumber = scheduler.RoomsReplicas @@ -424,9 +437,29 @@ func GetDesiredNumberOfRooms( } if len(availableRooms) > desiredNumber { + // Compute the next state to check if we can perform downscale + // 1. Get the rooms with deletion priority + // 2. Count how many ready rooms will be deleted + // 3. Check if we can delete without offending readyTarget + // 4. If we can't, keep the current number of rooms + // 5. 
If we can, update the scheduler.LastDownscaleAt and proceed with the downscale + roomsToBeDeleted, err := ListRoomsWithDeletionPriority(ctx, roomStorage, scheduler, len(availableRooms)-desiredNumber, isRollingUpdating, config, logger) + if err != nil { + logger.Error("error listing rooms with deletion priority, can not compute next state, skipping iteration", zap.Error(err)) + } + readyRoomsToBeDeleted := 0 + occupiedRoomsToBeDeleted := 0 + for _, room := range roomsToBeDeleted { + if room.Status == game_room.GameStatusReady { + readyRoomsToBeDeleted += 1 + } + if room.Status == game_room.GameStatusOccupied { + occupiedRoomsToBeDeleted += 1 + } + } willDownscale := true if scheduler.Autoscaling != nil && scheduler.Autoscaling.Enabled { - canDownscale, msg := CanPerformDownscale(ctx, autoscaler, scheduler, logger) + canDownscale, msg := CanPerformDownscale(ctx, autoscaler, scheduler, readyRoomsToBeDeleted, occupiedRoomsToBeDeleted, logger) if !canDownscale { willDownscale = false desiredNumber = len(availableRooms) @@ -513,3 +546,99 @@ func ComputeMaxSurge(scheduler *entities.Scheduler, desiredNumberOfRooms int) (m return maxSurgeNum, nil } + +// TODO: refactor to SchedulerController execute +// ListRoomsWithDeletionPriority returns a specified number of rooms, following +// the priority of it being deleted +// +// The priority is: +// +// - On error rooms; +// - No ping received for x time rooms; +// - Pending rooms; +// - Ready rooms; +// - Occupied rooms; +// +// This function can return less rooms than the `amount` since it might not have +// enough rooms on the scheduler. 
+func ListRoomsWithDeletionPriority(ctx context.Context, storage ports.RoomStorage, activeScheduler *entities.Scheduler, amount int, sortByVersion bool, config Config, logger *zap.Logger) ([]*game_room.GameRoom, error) { + var schedulerRoomsIDs []string + onErrorRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusError) + if err != nil { + return nil, fmt.Errorf("failed to list scheduler rooms on error: %w", err) + } + + oldLastPingRoomIDs, err := storage.GetRoomIDsByLastPing(ctx, activeScheduler.Name, time.Now().Add(config.RoomPingTimeout*-1)) + if err != nil { + return nil, fmt.Errorf("failed to list scheduler rooms with old last ping datetime: %w", err) + } + + pendingRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusPending) + if err != nil { + return nil, fmt.Errorf("failed to list scheduler rooms on pending status: %w", err) + } + + readyRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusReady) + if err != nil { + return nil, fmt.Errorf("failed to list scheduler rooms on ready status: %w", err) + } + + occupiedRoomIDs, err := storage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusOccupied) + if err != nil { + return nil, fmt.Errorf("failed to list scheduler rooms on occupied status: %w", err) + } + + schedulerRoomsIDs = append(schedulerRoomsIDs, onErrorRoomIDs...) + schedulerRoomsIDs = append(schedulerRoomsIDs, oldLastPingRoomIDs...) + schedulerRoomsIDs = append(schedulerRoomsIDs, pendingRoomIDs...) + schedulerRoomsIDs = append(schedulerRoomsIDs, readyRoomIDs...) + schedulerRoomsIDs = append(schedulerRoomsIDs, occupiedRoomIDs...) 
+ + schedulerRoomsIDs = utils.RemoveDuplicates(schedulerRoomsIDs) + + var activeVersionRoomPool []*game_room.GameRoom + var toDeleteRooms []*game_room.GameRoom + var terminatingRooms []*game_room.GameRoom + for _, roomID := range schedulerRoomsIDs { + room, err := storage.GetRoom(ctx, activeScheduler.Name, roomID) + if err != nil { + if !errors.Is(err, porterrors.ErrNotFound) { + return nil, fmt.Errorf("failed to fetch room information: %w", err) + } + + room = &game_room.GameRoom{ID: roomID, SchedulerID: activeScheduler.Name, Status: game_room.GameStatusError, Version: activeScheduler.Spec.Version} + } + + // Select Terminating rooms to be re-deleted. This is useful for fixing any desync state. + if room.Status == game_room.GameStatusTerminating { + terminatingRooms = append(terminatingRooms, room) + continue + } + + isRoomActive := room.Status == game_room.GameStatusOccupied || room.Status == game_room.GameStatusReady || room.Status == game_room.GameStatusPending + if sortByVersion && isRoomActive && activeScheduler.IsSameMajorVersion(room.Version) { + activeVersionRoomPool = append(activeVersionRoomPool, room) + } else { + toDeleteRooms = append(toDeleteRooms, room) + } + } + toDeleteRooms = append(toDeleteRooms, activeVersionRoomPool...) + + logger.Debug("toDeleteRooms", + zap.Array("toDeleteRooms uncapped", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, room := range toDeleteRooms { + enc.AppendString(fmt.Sprintf("%s-%s-%s", room.ID, room.Version, room.Status.String())) + } + return nil + })), + zap.Int("amount", amount), + ) + + if len(toDeleteRooms) > amount { + toDeleteRooms = toDeleteRooms[:amount] + } + + result := append(toDeleteRooms, terminatingRooms...) 
+ + return result, nil +} diff --git a/internal/core/operations/rooms/remove/definition.go b/internal/core/operations/rooms/remove/definition.go index 66555c1fe..272c0c468 100644 --- a/internal/core/operations/rooms/remove/definition.go +++ b/internal/core/operations/rooms/remove/definition.go @@ -44,7 +44,7 @@ const ( const OperationName = "remove_rooms" type Definition struct { - Amount int `json:"amount"` + // Amount int `json:"amount"` RoomsIDs []string `json:"rooms_ids"` Reason string `json:"reason"` } diff --git a/internal/core/operations/rooms/remove/executor.go b/internal/core/operations/rooms/remove/executor.go index 8f2e048b6..bf077180d 100644 --- a/internal/core/operations/rooms/remove/executor.go +++ b/internal/core/operations/rooms/remove/executor.go @@ -34,7 +34,6 @@ import ( "github.com/topfreegames/maestro/internal/core/ports" porterrors "github.com/topfreegames/maestro/internal/core/ports/errors" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" ) @@ -80,17 +79,17 @@ func (e *Executor) Execute(ctx context.Context, op *operation.Operation, definit e.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("removed rooms: %v", removeDefinition.RoomsIDs)) } - if removeDefinition.Amount > 0 { - logger.Info("start removing rooms", zap.Int("amount", removeDefinition.Amount)) - err := e.removeRoomsByAmount(ctx, logger, op.SchedulerName, removeDefinition.Amount, op, removeDefinition.Reason) - if err != nil { - reportDeletionFailedTotal(op.SchedulerName, op.ID) - logger.Warn("error removing rooms", zap.Error(err)) + // if removeDefinition.Amount > 0 { + // logger.Info("start removing rooms", zap.Int("amount", removeDefinition.Amount)) + // err := e.removeRoomsByAmount(ctx, logger, op.SchedulerName, removeDefinition.Amount, op, removeDefinition.Reason) + // if err != nil { + // reportDeletionFailedTotal(op.SchedulerName, op.ID) + // logger.Warn("error removing rooms", zap.Error(err)) - return fmt.Errorf("error 
removing rooms by amount: %w", err) - } - e.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("removed %d rooms", removeDefinition.Amount)) - } + // return fmt.Errorf("error removing rooms by amount: %w", err) + // } + // e.operationManager.AppendOperationEventToExecutionHistory(ctx, op, fmt.Sprintf("removed %d rooms", removeDefinition.Amount)) + // } logger.Info("finished deleting rooms") return nil @@ -111,33 +110,33 @@ func (e *Executor) removeRoomsByIDs(ctx context.Context, schedulerName string, r return nil } -func (e *Executor) removeRoomsByAmount(ctx context.Context, logger *zap.Logger, schedulerName string, amount int, op *operation.Operation, reason string) error { - activeScheduler, err := e.schedulerManager.GetActiveScheduler(ctx, schedulerName) - if err != nil { - return err - } - - rooms, err := e.roomManager.ListRoomsWithDeletionPriority(ctx, activeScheduler, amount) - if err != nil { - return err - } - - logger.Info("removing rooms by amount sorting by version", - zap.Array("rooms:", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { - for _, room := range rooms { - enc.AppendString(fmt.Sprintf("%s-%s-%s", room.ID, room.Version, room.Status.String())) - } - return nil - })), - ) - - err = e.deleteRooms(ctx, rooms, op, reason) - if err != nil { - return err - } - - return nil -} +// func (e *Executor) removeRoomsByAmount(ctx context.Context, logger *zap.Logger, schedulerName string, amount int, op *operation.Operation, reason string) error { +// activeScheduler, err := e.schedulerManager.GetActiveScheduler(ctx, schedulerName) +// if err != nil { +// return err +// } + +// rooms, err := e.roomManager.ListRoomsWithDeletionPriority(ctx, activeScheduler, amount) +// if err != nil { +// return err +// } + +// logger.Info("removing rooms by amount sorting by version", +// zap.Array("rooms:", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { +// for _, room := range rooms { +// 
enc.AppendString(fmt.Sprintf("%s-%s-%s", room.ID, room.Version, room.Status.String())) +// } +// return nil +// })), +// ) + +// err = e.deleteRooms(ctx, rooms, op, reason) +// if err != nil { +// return err +// } + +// return nil +// } func (e *Executor) deleteRooms(ctx context.Context, rooms []*game_room.GameRoom, op *operation.Operation, reason string) error { errs, ctx := errgroup.WithContext(ctx) diff --git a/internal/core/ports/autoscaler.go b/internal/core/ports/autoscaler.go index 2d610fb50..08b63c1d5 100644 --- a/internal/core/ports/autoscaler.go +++ b/internal/core/ports/autoscaler.go @@ -38,6 +38,8 @@ type Autoscaler interface { CalculateDesiredNumberOfRooms(ctx context.Context, scheduler *entities.Scheduler) (int, error) // CanDownscale returns true if the scheduler can downscale, false otherwise. CanDownscale(ctx context.Context, scheduler *entities.Scheduler) (bool, error) + // CanDownscaleToNextState returns true if the scheduler can still downscale once the given amounts of occupied and ready rooms are deleted, false otherwise. + CanDownscaleToNextState(ctx context.Context, scheduler *entities.Scheduler, occupiedToBeDeleted, readyToBeDeleted int) (bool, error) } // Secondary ports (output, driven ports) @@ -52,4 +54,6 @@ type Policy interface { CalculateDesiredNumberOfRooms(policyParameters autoscaling.PolicyParameters, currentState policies.CurrentState) (desiredNumberOfRooms int, err error) // CanDownscale returns true if the scheduler can downscale, false otherwise. CanDownscale(policyParameters autoscaling.PolicyParameters, currentState policies.CurrentState) (bool, error) + // NextStateBuilder builds and returns the next state of the scheduler considering rooms to be deleted. 
+ NextStateBuilder(ctx context.Context, scheduler *entities.Scheduler, occupiedToBeDeleted, readyToBeDeleted int) (policies.CurrentState, error) } diff --git a/internal/core/ports/room_ports.go b/internal/core/ports/room_ports.go index fd19dc873..2084f2062 100644 --- a/internal/core/ports/room_ports.go +++ b/internal/core/ports/room_ports.go @@ -40,20 +40,6 @@ type RoomManager interface { // SchedulerMaxSurge calculates the current scheduler max surge based on // the number of rooms the scheduler has. SchedulerMaxSurge(ctx context.Context, scheduler *entities.Scheduler) (int, error) - // ListRoomsWithDeletionPriority returns a specified number of rooms, following - // the priority of it being deleted - // - // The priority is: - // - // - On error rooms; - // - No ping received for x time rooms; - // - Pending rooms; - // - Ready rooms; - // - Occupied rooms; - // - // This function can return less rooms than the `amount` since it might not have - // enough rooms on the scheduler. - ListRoomsWithDeletionPriority(ctx context.Context, activeScheduler *entities.Scheduler, amount int) ([]*game_room.GameRoom, error) // CleanRoomState cleans the remaining state of a room. This function is // intended to be used after a `DeleteRoom`, where the room instance is // signaled to terminate. 
diff --git a/internal/core/services/autoscaler/autoscaler.go b/internal/core/services/autoscaler/autoscaler.go index 1b9cb9af3..bf6a6ff8a 100644 --- a/internal/core/services/autoscaler/autoscaler.go +++ b/internal/core/services/autoscaler/autoscaler.go @@ -98,11 +98,41 @@ func (a *Autoscaler) CanDownscale(ctx context.Context, scheduler *entities.Sched if err != nil { return false, fmt.Errorf("error fetching current state to scheduler %s: %w", scheduler.Name, err) } canDownscale, err := policy.CanDownscale(scheduler.Autoscaling.Policy.Parameters, currentState) if err != nil { return false, fmt.Errorf("error checking if scheduler %s can downscale: %w", scheduler.Name, err) } + + return canDownscale, nil +} + +// CanDownscaleToNextState returns true if the scheduler's policy still allows a downscale when evaluated +// against the state built by the policy's NextStateBuilder for the given amounts of rooms to be deleted. +func (a *Autoscaler) CanDownscaleToNextState(ctx context.Context, scheduler *entities.Scheduler, occupiedToBeDeleted, readyToBeDeleted int) (bool, error) { + if scheduler.Autoscaling == nil { + return false, errors.New("scheduler does not have autoscaling struct") + } + + if !scheduler.Autoscaling.Enabled { + return false, errors.New("scheduler does not have autoscaling enabled") + } + + if _, ok := a.policyMap[scheduler.Autoscaling.Policy.Type]; !ok { + return false, fmt.Errorf("error finding policy to scheduler %s", scheduler.Name) + } + + policy := a.policyMap[scheduler.Autoscaling.Policy.Type] + + nextState, err := policy.NextStateBuilder(ctx, scheduler, occupiedToBeDeleted, readyToBeDeleted) + if err != nil { + return false, fmt.Errorf("error building next state to scheduler %s: %w", scheduler.Name, err) + } + + canDownscale, err := policy.CanDownscale(scheduler.Autoscaling.Policy.Parameters, nextState) + if err != nil { + return false, fmt.Errorf("error checking if scheduler %s can downscale: %w", scheduler.Name, err) + } return canDownscale, nil } diff --git 
a/internal/core/services/autoscaler/policies/roomoccupancy/policy.go b/internal/core/services/autoscaler/policies/roomoccupancy/policy.go index 700ae873e..dd1bb5fb2 100644 --- a/internal/core/services/autoscaler/policies/roomoccupancy/policy.go +++ b/internal/core/services/autoscaler/policies/roomoccupancy/policy.go @@ -77,6 +77,36 @@ func (p *Policy) CurrentStateBuilder(ctx context.Context, scheduler *entities.Sc return currentState, nil } +// NextStateBuilder builds the state the scheduler would be in after occupiedToBeDeleted occupied rooms and +// readyToBeDeleted ready rooms are removed from the amounts currently in storage. Amounts are clamped at zero, +// since the stored counts are read here and may be lower than the amounts the caller computed earlier. +func (p *Policy) NextStateBuilder(ctx context.Context, scheduler *entities.Scheduler, occupiedToBeDeleted, readyToBeDeleted int) (policies.CurrentState, error) { + occupiedRoomsAmount, err := p.roomStorage.GetRoomCountByStatus(ctx, scheduler.Name, game_room.GameStatusOccupied) + if err != nil { + return nil, fmt.Errorf("error fetching occupied game rooms amount: %w", err) + } + occupiedRoomsAmount -= occupiedToBeDeleted + if occupiedRoomsAmount < 0 { + occupiedRoomsAmount = 0 + } + + readyRoomsAmount, err := p.roomStorage.GetRoomCountByStatus(ctx, scheduler.Name, game_room.GameStatusReady) + if err != nil { + return nil, fmt.Errorf("error fetching ready game rooms amount: %w", err) + } + readyRoomsAmount -= readyToBeDeleted + if readyRoomsAmount < 0 { + readyRoomsAmount = 0 + } + + nextState := policies.CurrentState{ + OccupiedRoomsKey: occupiedRoomsAmount, + ReadyRoomsKey: readyRoomsAmount, + } + + return nextState, nil +} + // CalculateDesiredNumberOfRooms executes to knows how many rooms should a scheduler have based on your current state. 
func (p *Policy) CalculateDesiredNumberOfRooms(policyParameters autoscaling.PolicyParameters, currentState policies.CurrentState) (int, error) { if policyParameters.RoomOccupancy == nil { diff --git a/internal/core/services/rooms/room_manager.go b/internal/core/services/rooms/room_manager.go index 6eeb4966b..3f5ba8baa 100644 --- a/internal/core/services/rooms/room_manager.go +++ b/internal/core/services/rooms/room_manager.go @@ -29,7 +29,6 @@ import ( "math" "strconv" "strings" - "time" serviceerrors "github.com/topfreegames/maestro/internal/core/services/errors" @@ -37,7 +36,6 @@ import ( "github.com/topfreegames/maestro/internal/core/logs" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "github.com/topfreegames/maestro/internal/core/entities" "github.com/topfreegames/maestro/internal/core/entities/game_room" @@ -201,89 +199,6 @@ func (m *RoomManager) CleanRoomState(ctx context.Context, schedulerName, roomId return nil } -func (m *RoomManager) ListRoomsWithDeletionPriority(ctx context.Context, activeScheduler *entities.Scheduler, amount int) ([]*game_room.GameRoom, error) { - - var schedulerRoomsIDs []string - onErrorRoomIDs, err := m.RoomStorage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusError) - if err != nil { - return nil, fmt.Errorf("failed to list scheduler rooms on error: %w", err) - } - - oldLastPingRoomIDs, err := m.RoomStorage.GetRoomIDsByLastPing(ctx, activeScheduler.Name, time.Now().Add(m.Config.RoomPingTimeout*-1)) - if err != nil { - return nil, fmt.Errorf("failed to list scheduler rooms with old last ping datetime: %w", err) - } - - pendingRoomIDs, err := m.RoomStorage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusPending) - if err != nil { - return nil, fmt.Errorf("failed to list scheduler rooms on pending status: %w", err) - } - - readyRoomIDs, err := m.RoomStorage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusReady) - if err != nil { - return nil, fmt.Errorf("failed to list scheduler 
rooms on ready status: %w", err) - } - - occupiedRoomIDs, err := m.RoomStorage.GetRoomIDsByStatus(ctx, activeScheduler.Name, game_room.GameStatusOccupied) - if err != nil { - return nil, fmt.Errorf("failed to list scheduler rooms on occupied status: %w", err) - } - - schedulerRoomsIDs = append(schedulerRoomsIDs, onErrorRoomIDs...) - schedulerRoomsIDs = append(schedulerRoomsIDs, oldLastPingRoomIDs...) - schedulerRoomsIDs = append(schedulerRoomsIDs, pendingRoomIDs...) - schedulerRoomsIDs = append(schedulerRoomsIDs, readyRoomIDs...) - schedulerRoomsIDs = append(schedulerRoomsIDs, occupiedRoomIDs...) - - schedulerRoomsIDs = removeDuplicateValues(schedulerRoomsIDs) - - var activeVersionRoomPool []*game_room.GameRoom - var toDeleteRooms []*game_room.GameRoom - var terminatingRooms []*game_room.GameRoom - for _, roomID := range schedulerRoomsIDs { - room, err := m.RoomStorage.GetRoom(ctx, activeScheduler.Name, roomID) - if err != nil { - if !errors.Is(err, porterrors.ErrNotFound) { - return nil, fmt.Errorf("failed to fetch room information: %w", err) - } - - room = &game_room.GameRoom{ID: roomID, SchedulerID: activeScheduler.Name, Status: game_room.GameStatusError, Version: activeScheduler.Spec.Version} - } - - // Select Terminating rooms to be re-deleted. This is useful for fixing any desync state. - if room.Status == game_room.GameStatusTerminating { - terminatingRooms = append(terminatingRooms, room) - continue - } - - isRoomActive := room.Status == game_room.GameStatusOccupied || room.Status == game_room.GameStatusReady || room.Status == game_room.GameStatusPending - if isRoomActive && activeScheduler.IsSameMajorVersion(room.Version) { - activeVersionRoomPool = append(activeVersionRoomPool, room) - } else { - toDeleteRooms = append(toDeleteRooms, room) - } - } - toDeleteRooms = append(toDeleteRooms, activeVersionRoomPool...) 
- - m.Logger.Debug("toDeleteRooms", - zap.Array("toDeleteRooms uncapped", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { - for _, room := range toDeleteRooms { - enc.AppendString(fmt.Sprintf("%s-%s-%s", room.ID, room.Version, room.Status.String())) - } - return nil - })), - zap.Int("amount", amount), - ) - - if len(toDeleteRooms) > amount { - toDeleteRooms = toDeleteRooms[:amount] - } - - result := append(toDeleteRooms, terminatingRooms...) - - return result, nil -} - func (m *RoomManager) SchedulerMaxSurge(ctx context.Context, scheduler *entities.Scheduler) (int, error) { if scheduler.MaxSurge == "" { return minSchedulerMaxSurge, nil @@ -500,21 +415,6 @@ func (m *RoomManager) forwardStatusTerminatingEvent(ctx context.Context, room *g } } -func removeDuplicateValues(slice []string) []string { - check := make(map[string]int) - res := make([]string, 0) - for _, val := range slice { - if check[val] == 1 { - continue - } - - check[val] = 1 - res = append(res, val) - } - - return res -} - func contains[T comparable](s []T, e T) bool { for _, v := range s { if v == e {