diff --git a/config/config.yaml b/config/config.yaml
index bdcc71bfb..a40ea9cc6 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -49,6 +49,10 @@ adapters:
       inCluster: false
       masterUrl: "https://127.0.0.1:6443"
       kubeconfig: "./kubeconfig/kubeconfig.yaml"
+      topologySpreadConstraint:
+        maxSkew: 5
+        topologyKey: "topology.kubernetes.io/zone"
+        whenUnsatisfiableScheduleAnyway: false
     grpc:
       keepAlive:
         time: 30s
diff --git a/internal/adapters/runtime/kubernetes/game_room.go b/internal/adapters/runtime/kubernetes/game_room.go
index e30338952..858b4320c 100644
--- a/internal/adapters/runtime/kubernetes/game_room.go
+++ b/internal/adapters/runtime/kubernetes/game_room.go
@@ -40,7 +40,7 @@ const (
 )
 
 func (k *kubernetes) CreateGameRoomInstance(ctx context.Context, scheduler *entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*game_room.Instance, error) {
-	pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec)
+	pod, err := convertGameRoomSpec(*scheduler, gameRoomName, gameRoomSpec, k.config)
 	if err != nil {
 		return nil, errors.NewErrInvalidArgument("invalid game room spec: %s", err)
 	}
diff --git a/internal/adapters/runtime/kubernetes/game_room_convert.go b/internal/adapters/runtime/kubernetes/game_room_convert.go
index 9c981226a..47bd95829 100644
--- a/internal/adapters/runtime/kubernetes/game_room_convert.go
+++ b/internal/adapters/runtime/kubernetes/game_room_convert.go
@@ -67,7 +67,7 @@ var invalidPodWaitingStates = []string{
 	"RunContainerError",
 }
 
-func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec) (*v1.Pod, error) {
+func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, gameRoomSpec game_room.Spec, config KubernetesConfig) (*v1.Pod, error) {
 	defaultAnnotations := map[string]string{safeToEvictAnnotation: safeToEvictValue}
 	defaultLabels := map[string]string{
 		maestroLabelKey: maestroLabelValue,
@@ -75,6 +75,10 @@ func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, game
 		versionLabelKey: gameRoomSpec.Version,
 	}
 
+	whenUnsatisfiable := v1.DoNotSchedule
+	if config.WhenUnsatisfiableScheduleAnyway {
+		whenUnsatisfiable = v1.ScheduleAnyway
+	}
 	pod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: gameRoomName,
@@ -95,9 +99,9 @@ func convertGameRoomSpec(scheduler entities.Scheduler, gameRoomName string, game
 			// 5. Add a conversion function
 			TopologySpreadConstraints: []v1.TopologySpreadConstraint{
 				{
-					MaxSkew:           1,
-					TopologyKey:       "topology.kubernetes.io/zone",
-					WhenUnsatisfiable: v1.ScheduleAnyway,
+					MaxSkew:           int32(config.MaxSkew),
+					TopologyKey:       config.TopologyKey,
+					WhenUnsatisfiable: whenUnsatisfiable,
 					LabelSelector: &metav1.LabelSelector{
 						MatchLabels: map[string]string{
 							schedulerLabelKey: scheduler.Name,
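Reviewer note: the constraint was previously hardcoded to `WhenUnsatisfiable: v1.ScheduleAnyway`, while the new config default (`whenUnsatisfiableScheduleAnyway: false`) maps to `v1.DoNotSchedule`, so out of the box this change turns best-effort spreading into a hard scheduling constraint. A minimal sketch of how a `KubernetesConfig` is expected to surface on a generated pod — the fixtures and the `entities`/`game_room` import paths are assumptions, not taken from the repository's test suite:

```go
package kubernetes

import (
	"testing"

	"github.com/stretchr/testify/require"
	v1 "k8s.io/api/core/v1"

	"github.com/topfreegames/maestro/internal/core/entities"
	"github.com/topfreegames/maestro/internal/core/entities/game_room"
)

// Sketch: config-to-constraint mapping. The scheduler and spec fixtures
// below are hypothetical; only convertGameRoomSpec and KubernetesConfig
// come from the diff above.
func TestTopologySpreadConstraintFromConfig(t *testing.T) {
	scheduler := entities.Scheduler{Name: "example-scheduler"} // hypothetical fixture
	spec := game_room.Spec{Version: "v1.0.0"}                  // hypothetical fixture

	pod, err := convertGameRoomSpec(scheduler, "example-room-1", spec, KubernetesConfig{
		MaxSkew:                         3,
		TopologyKey:                     "kubernetes.io/hostname",
		WhenUnsatisfiableScheduleAnyway: true, // mapped to v1.ScheduleAnyway
	})
	require.NoError(t, err)

	constraint := pod.Spec.TopologySpreadConstraints[0]
	require.Equal(t, int32(3), constraint.MaxSkew)
	require.Equal(t, "kubernetes.io/hostname", constraint.TopologyKey)
	require.Equal(t, v1.ScheduleAnyway, constraint.WhenUnsatisfiable)
}
```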
diff --git a/internal/adapters/runtime/kubernetes/game_room_convert_test.go b/internal/adapters/runtime/kubernetes/game_room_convert_test.go
index 79e4bf688..e3475e45a 100644
--- a/internal/adapters/runtime/kubernetes/game_room_convert_test.go
+++ b/internal/adapters/runtime/kubernetes/game_room_convert_test.go
@@ -675,7 +675,7 @@ func TestConvertGameSpec(t *testing.T) {
 			// Name: test.scheduler,
 			// Annotations: test.expectedPod.ObjectMeta.Annotations,
 			//}
-			res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec)
+			res, err := convertGameRoomSpec(test.scheduler, test.roomName, test.gameSpec, KubernetesConfig{MaxSkew: 1})
 			if test.withError {
 				require.Error(t, err)
 				return
diff --git a/internal/adapters/runtime/kubernetes/game_room_test.go b/internal/adapters/runtime/kubernetes/game_room_test.go
index 9e6901338..0ef4047e3 100644
--- a/internal/adapters/runtime/kubernetes/game_room_test.go
+++ b/internal/adapters/runtime/kubernetes/game_room_test.go
@@ -44,7 +44,7 @@ func TestGameRoomCreation(t *testing.T) {
 	ctx := context.Background()
 	gameRoomName := "some-game-room-name"
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 	t.Run("successfully create a room", func(t *testing.T) {
 		t.Parallel()
@@ -98,7 +98,7 @@ func TestGameRoomDeletion(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
 	gameRoomName := "some-game-room"
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 	t.Run("successfully delete a room", func(t *testing.T) {
 		t.Parallel()
@@ -155,7 +155,7 @@ func TestCreateGameRoomName(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 	t.Run("When scheduler name is greater than max name length minus randomLength", func(t *testing.T) {
 		t.Run("return the name with randomLength", func(t *testing.T) {
diff --git a/internal/adapters/runtime/kubernetes/game_room_watcher_test.go b/internal/adapters/runtime/kubernetes/game_room_watcher_test.go
index 35a6f2aea..5832fd5b0 100644
--- a/internal/adapters/runtime/kubernetes/game_room_watcher_test.go
+++ b/internal/adapters/runtime/kubernetes/game_room_watcher_test.go
@@ -45,7 +45,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-addition"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
@@ -96,7 +96,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-ready"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
@@ -158,7 +158,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-error"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
@@ -213,7 +213,7 @@ func TestGameRoomsWatch(t *testing.T) {
 		t.Parallel()
 		ctx := context.Background()
 		client := test.GetKubernetesClientSet(t, kubernetesContainer)
-		kubernetesRuntime := New(client)
+		kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 		scheduler := &entities.Scheduler{Name: "watch-room-delete"}
 		err := kubernetesRuntime.CreateScheduler(ctx, scheduler)
diff --git a/internal/adapters/runtime/kubernetes/kubernetes.go b/internal/adapters/runtime/kubernetes/kubernetes.go
index aeca5e3ba..0607fa9fc 100644
--- a/internal/adapters/runtime/kubernetes/kubernetes.go
+++ b/internal/adapters/runtime/kubernetes/kubernetes.go
@@ -35,16 +35,35 @@ import (
 
 var _ ports.Runtime = (*kubernetes)(nil)
 
+const (
+	DefaultMaxSkew     = 5
+	DefaultTopologyKey = "topology.kubernetes.io/zone"
+)
+
+type KubernetesConfig struct {
+	MaxSkew                         int
+	TopologyKey                     string
+	WhenUnsatisfiableScheduleAnyway bool
+}
+
 type kubernetes struct {
 	clientSet     kube.Interface
 	logger        *zap.Logger
 	eventRecorder record.EventRecorder
+	config        KubernetesConfig
 }
 
-func New(clientSet kube.Interface) *kubernetes {
+func New(clientSet kube.Interface, config KubernetesConfig) *kubernetes {
+	if config.MaxSkew <= 0 {
+		config.MaxSkew = DefaultMaxSkew
+	}
+	if config.TopologyKey == "" {
+		config.TopologyKey = DefaultTopologyKey
+	}
 	k := &kubernetes{
 		clientSet: clientSet,
 		logger:    zap.L().With(zap.String(logs.LogFieldRuntime, "kubernetes")),
+		config:    config,
 	}
 
 	eventBroadcaster := record.NewBroadcaster()
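Reviewer note: `New` treats a non-positive `MaxSkew` and an empty `TopologyKey` as "unset" and backfills the package defaults, which keeps a zero-valued config from producing an invalid constraint (the Kubernetes API requires `maxSkew >= 1`). A minimal sketch of that defaulting, using client-go's fake clientset purely for illustration — this mirrors the logic in `New` rather than any existing repository test:

```go
package kubernetes

import (
	"testing"

	"github.com/stretchr/testify/require"
	"k8s.io/client-go/kubernetes/fake"
)

// Sketch: an empty KubernetesConfig falls back to the package defaults.
// fake.NewSimpleClientset stands in for a real cluster connection.
func TestNewBackfillsDefaults(t *testing.T) {
	runtime := New(fake.NewSimpleClientset(), KubernetesConfig{})

	require.Equal(t, DefaultMaxSkew, runtime.config.MaxSkew)         // 5
	require.Equal(t, DefaultTopologyKey, runtime.config.TopologyKey) // "topology.kubernetes.io/zone"
}
```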
diff --git a/internal/adapters/runtime/kubernetes/scheduler_test.go b/internal/adapters/runtime/kubernetes/scheduler_test.go
index c80fff3f1..6ae45980a 100644
--- a/internal/adapters/runtime/kubernetes/scheduler_test.go
+++ b/internal/adapters/runtime/kubernetes/scheduler_test.go
@@ -41,7 +41,7 @@ import (
 func TestSchedulerCreation(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 	t.Run("create single scheduler", func(t *testing.T) {
 		scheduler := &entities.Scheduler{Name: "single-scheduler-test", PdbMaxUnavailable: "5%"}
@@ -66,7 +66,7 @@ func TestSchedulerCreation(t *testing.T) {
 func TestSchedulerDeletion(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 	t.Run("delete scheduler", func(t *testing.T) {
 		scheduler := &entities.Scheduler{Name: "delete-scheduler-test", PdbMaxUnavailable: "5%"}
@@ -92,7 +92,7 @@ func TestSchedulerDeletion(t *testing.T) {
 func TestPDBCreationAndDeletion(t *testing.T) {
 	ctx := context.Background()
 	client := test.GetKubernetesClientSet(t, kubernetesContainer)
-	kubernetesRuntime := New(client)
+	kubernetesRuntime := New(client, KubernetesConfig{MaxSkew: 1})
 
 	t.Run("create pdb from scheduler without autoscaling", func(t *testing.T) {
 		if !kubernetesRuntime.isPDBSupported() {
diff --git a/internal/service/adapters.go b/internal/service/adapters.go
index ae3f8bd5f..7d1f6d3bf 100644
--- a/internal/service/adapters.go
+++ b/internal/service/adapters.go
@@ -69,11 +69,14 @@ const (
 	grpcKeepAliveTimePath    = "adapters.grpc.keepalive.time"
 	grpcKeepAliveTimeoutPath = "adapters.grpc.keepalive.timeout"
 	// Kubernetes runtime
-	runtimeKubernetesMasterURLPath  = "adapters.runtime.kubernetes.masterUrl"
-	runtimeKubernetesKubeconfigPath = "adapters.runtime.kubernetes.kubeconfig"
-	runtimeKubernetesInClusterPath  = "adapters.runtime.kubernetes.inCluster"
-	runtimeKubernetesQPS            = "adapters.runtime.kubernetes.qps"
-	runtimeKubernetesBurst          = "adapters.runtime.kubernetes.burst"
+	runtimeKubernetesMasterURLPath                   = "adapters.runtime.kubernetes.masterUrl"
+	runtimeKubernetesKubeconfigPath                  = "adapters.runtime.kubernetes.kubeconfig"
+	runtimeKubernetesInClusterPath                   = "adapters.runtime.kubernetes.inCluster"
+	runtimeKubernetesQPS                             = "adapters.runtime.kubernetes.qps"
+	runtimeKubernetesBurst                           = "adapters.runtime.kubernetes.burst"
+	runtimeKubernetesMaxSkew                         = "adapters.runtime.kubernetes.topologySpreadConstraint.maxSkew"
+	runtimeKubernetesTopologyKey                     = "adapters.runtime.kubernetes.topologySpreadConstraint.topologyKey"
+	runtimeKubernetesWhenUnsatisfiableScheduleAnyway = "adapters.runtime.kubernetes.topologySpreadConstraint.whenUnsatisfiableScheduleAnyway"
 	// Redis operation storage
 	operationStorageRedisURLPath      = "adapters.operationStorage.redis.url"
 	operationLeaseStorageRedisURLPath = "adapters.operationLeaseStorage.redis.url"
@@ -146,7 +149,11 @@ func NewRuntimeKubernetes(c config.Config) (ports.Runtime, error) {
 		return nil, fmt.Errorf("failed to initialize Kubernetes runtime: %w", err)
 	}
 
-	return kubernetesRuntime.New(clientSet), nil
+	return kubernetesRuntime.New(clientSet, kubernetesRuntime.KubernetesConfig{
+		MaxSkew:                         c.GetInt(runtimeKubernetesMaxSkew),
+		TopologyKey:                     c.GetString(runtimeKubernetesTopologyKey),
+		WhenUnsatisfiableScheduleAnyway: c.GetBool(runtimeKubernetesWhenUnsatisfiableScheduleAnyway),
+	}), nil
 }
 
 // NewOperationStorageRedis instantiates redis as operation storage.
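Reviewer note: if a deployment omits the `topologySpreadConstraint` block entirely, the getters above return Go zero values (0, "", false), so the defaulting in `New` applies and `whenUnsatisfiable` stays `DoNotSchedule`. A sketch of that assumption using viper directly — the repository's `config.Config` wrapper is not shown in this diff and may behave differently:

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// Sketch: unset keys yield zero values, which New later backfills with
// DefaultMaxSkew and DefaultTopologyKey. Assumes a viper-like config source.
func main() {
	v := viper.New() // no topologySpreadConstraint block loaded

	fmt.Println(v.GetInt("adapters.runtime.kubernetes.topologySpreadConstraint.maxSkew"))        // 0
	fmt.Println(v.GetString("adapters.runtime.kubernetes.topologySpreadConstraint.topologyKey")) // ""
	fmt.Println(v.GetBool("adapters.runtime.kubernetes.topologySpreadConstraint.whenUnsatisfiableScheduleAnyway")) // false
}
```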