From cae3de11d8dbd5eba23a6c35dcd632f6d85abe0d Mon Sep 17 00:00:00 2001 From: jsun-m <91754185+jsun-m@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:40:53 -0700 Subject: [PATCH] Feat: Add auxiliary state to help check the runtime status of a stub (#771) Added a new redis value `scheduler:stub:state:[stub_id]` that stores the state of a given deployment/stub. Currently state can be `degraded` or `warning`. A nonexistent value for this variable indicates that the deployment currently has no runtime issues and is healthy. `degraded` occurs once the `max container failure` threshold is reached. `warning` occurs when at least 1 failed container occurs in the past 5 minutes (5 minutes before the container exit code value expires). `healthy` is the state when 0 containers failed in the past 5 minutes. Caveat: that means even if the containers spin down after failing and there are 0 running containers, the stub is considered `healthy`. --- pkg/abstractions/common/instance.go | 38 ++++++++++++++++++++++-- pkg/abstractions/endpoint/endpoint.go | 1 + pkg/abstractions/taskqueue/taskqueue.go | 1 + pkg/common/keys.go | 5 ++++ pkg/repository/base.go | 6 +++- pkg/repository/container_redis.go | 39 ++++++++++++++++++++----- pkg/repository/events.go | 16 ++++++++++ pkg/types/event.go | 12 ++++++++ pkg/types/scheduler.go | 6 ++++ 9 files changed, 113 insertions(+), 11 deletions(-) diff --git a/pkg/abstractions/common/instance.go b/pkg/abstractions/common/instance.go index 60ccc2fec..f07de24d4 100644 --- a/pkg/abstractions/common/instance.go +++ b/pkg/abstractions/common/instance.go @@ -23,7 +23,7 @@ type AutoscaledInstanceState struct { RunningContainers int PendingContainers int StoppingContainers int - FailedContainers int + FailedContainers []string } type AutoscaledInstanceConfig struct { @@ -39,6 +39,7 @@ type AutoscaledInstanceConfig struct { InstanceLockKey string ContainerRepo repository.ContainerRepository BackendRepo repository.BackendRepository + EventRepo 
repository.EventRepository TaskRepo repository.TaskRepository StartContainersFunc func(containersToRun int) error StopContainersFunc func(containersToStop int) error @@ -73,6 +74,7 @@ type AutoscaledInstance struct { ContainerRepo repository.ContainerRepository BackendRepo repository.BackendRepository TaskRepo repository.TaskRepository + EventRepo repository.EventRepository // Keys InstanceLockKey string @@ -109,6 +111,7 @@ func NewAutoscaledInstance(ctx context.Context, cfg *AutoscaledInstanceConfig) ( ContainerRepo: cfg.ContainerRepo, BackendRepo: cfg.BackendRepo, TaskRepo: cfg.TaskRepo, + EventRepo: cfg.EventRepo, Containers: make(map[string]bool), ContainerEventChan: make(chan types.ContainerEvent, 1), ScaleEventChan: make(chan int, 1), @@ -236,7 +239,7 @@ func (i *AutoscaledInstance) HandleScalingEvent(desiredContainers int) error { return err } - if state.FailedContainers >= i.FailedContainerThreshold { + if len(state.FailedContainers) >= i.FailedContainerThreshold { log.Printf("<%s> reached failed container threshold, scaling to zero.\n", i.Name) desiredContainers = 0 } @@ -258,6 +261,8 @@ func (i *AutoscaledInstance) HandleScalingEvent(desiredContainers int) error { err = i.StopContainersFunc(-containerDelta) } + go i.handleStubEvents(state.FailedContainers) + return err } @@ -267,7 +272,7 @@ func (i *AutoscaledInstance) State() (*AutoscaledInstanceState, error) { return nil, err } - failedContainers, err := i.ContainerRepo.GetFailedContainerCountByStubId(i.Stub.ExternalId) + failedContainers, err := i.ContainerRepo.GetFailedContainersByStubId(i.Stub.ExternalId) if err != nil { return nil, err } @@ -287,3 +292,30 @@ func (i *AutoscaledInstance) State() (*AutoscaledInstanceState, error) { state.FailedContainers = failedContainers return &state, nil } + +func (i *AutoscaledInstance) handleStubEvents(failedContainers []string) { + if len(failedContainers) >= i.FailedContainerThreshold { + i.emitUnhealthyEvent(i.Stub.ExternalId, types.StubStateDegraded, 
"reached max failed container threshold", failedContainers) + } else if len(failedContainers) > 0 { + i.emitUnhealthyEvent(i.Stub.ExternalId, types.StubStateWarning, "one or more containers failed", failedContainers) + } +} + +func (i *AutoscaledInstance) emitUnhealthyEvent(stubId, currentState, reason string, containers []string) { + var state string + state, err := i.ContainerRepo.GetStubState(stubId) + if err != nil { + return + } + + if state == currentState { + return + } + + err = i.ContainerRepo.SetStubState(stubId, currentState) + if err != nil { + return + } + + go i.EventRepo.PushStubStateUnhealthy(i.Workspace.ExternalId, stubId, currentState, state, reason, containers) +} diff --git a/pkg/abstractions/endpoint/endpoint.go b/pkg/abstractions/endpoint/endpoint.go index fefe00db1..fa711ea4b 100644 --- a/pkg/abstractions/endpoint/endpoint.go +++ b/pkg/abstractions/endpoint/endpoint.go @@ -258,6 +258,7 @@ func (es *HttpEndpointService) getOrCreateEndpointInstance(ctx context.Context, Scheduler: es.scheduler, ContainerRepo: es.containerRepo, BackendRepo: es.backendRepo, + EventRepo: es.eventRepo, TaskRepo: es.taskRepo, InstanceLockKey: Keys.endpointInstanceLock(stub.Workspace.Name, stubId), StartContainersFunc: instance.startContainers, diff --git a/pkg/abstractions/taskqueue/taskqueue.go b/pkg/abstractions/taskqueue/taskqueue.go index 4a8fd1eaf..f0f6f8cc0 100644 --- a/pkg/abstractions/taskqueue/taskqueue.go +++ b/pkg/abstractions/taskqueue/taskqueue.go @@ -567,6 +567,7 @@ func (tq *RedisTaskQueue) getOrCreateQueueInstance(stubId string, options ...fun Scheduler: tq.scheduler, ContainerRepo: tq.containerRepo, BackendRepo: tq.backendRepo, + EventRepo: tq.eventRepo, TaskRepo: tq.taskRepo, InstanceLockKey: Keys.taskQueueInstanceLock(stub.Workspace.Name, stubId), StartContainersFunc: instance.startContainers, diff --git a/pkg/common/keys.go b/pkg/common/keys.go index 28cac5151..f54d2535d 100644 --- a/pkg/common/keys.go +++ b/pkg/common/keys.go @@ -21,6 +21,7 @@ 
var ( schedulerContainerLock string = "scheduler:container:lock:%s" schedulerContainerExitCode string = "scheduler:container:exit_code:%s" schedulerCheckpointState string = "scheduler:checkpoint_state:%s:%s" + schedulerStubState string = "scheduler:stub:state:%s" ) var ( @@ -146,6 +147,10 @@ func (rk *redisKeys) SchedulerCheckpointState(workspaceName, checkpointId string return fmt.Sprintf(schedulerCheckpointState, workspaceName, checkpointId) } +func (rk *redisKeys) SchedulerStubState(stubId string) string { + return fmt.Sprintf(schedulerStubState, stubId) +} + // Gateway keys func (rk *redisKeys) GatewayPrefix() string { return gatewayPrefix diff --git a/pkg/repository/base.go b/pkg/repository/base.go index 5e04ee11c..9ce4471a8 100755 --- a/pkg/repository/base.go +++ b/pkg/repository/base.go @@ -55,9 +55,12 @@ type ContainerRepository interface { GetActiveContainersByStubId(stubId string) ([]types.ContainerState, error) GetActiveContainersByWorkspaceId(workspaceId string) ([]types.ContainerState, error) GetActiveContainersByWorkerId(workerId string) ([]types.ContainerState, error) - GetFailedContainerCountByStubId(stubId string) (int, error) + GetFailedContainersByStubId(stubId string) ([]string, error) UpdateCheckpointState(workspaceName, checkpointId string, checkpointState *types.CheckpointState) error GetCheckpointState(workspaceName, checkpointId string) (*types.CheckpointState, error) + GetStubState(stubId string) (string, error) + SetStubState(stubId, state string) error + DeleteStubState(stubId string) error } type WorkspaceRepository interface { @@ -180,6 +183,7 @@ type EventRepository interface { PushRunStubEvent(workspaceId string, stub *types.Stub) PushTaskUpdatedEvent(task *types.TaskWithRelated) PushTaskCreatedEvent(task *types.TaskWithRelated) + PushStubStateUnhealthy(workspaceId string, stubId string, currentState, previousState string, reason string, failedContainers []string) } type MetricsRepository interface { diff --git 
a/pkg/repository/container_redis.go b/pkg/repository/container_redis.go index f2c2be0fb..b003e4701 100644 --- a/pkg/repository/container_redis.go +++ b/pkg/repository/container_redis.go @@ -328,33 +328,33 @@ func (cr *ContainerRedisRepository) GetActiveContainersByWorkerId(workerId strin return cr.listContainerStateByIndex(indexKey, keys) } -func (cr *ContainerRedisRepository) GetFailedContainerCountByStubId(stubId string) (int, error) { +func (cr *ContainerRedisRepository) GetFailedContainersByStubId(stubId string) ([]string, error) { indexKey := common.RedisKeys.SchedulerContainerIndex(stubId) keys, err := cr.rdb.SMembers(context.TODO(), indexKey).Result() if err != nil { - return -1, err + return nil, fmt.Errorf("failed to retrieve container state keys: %v", err) } // Retrieve the value (exit code) for each key - failedCount := 0 + failedContainerIds := make([]string, 0) for _, key := range keys { containerId := strings.Split(key, ":")[len(strings.Split(key, ":"))-1] exitCodeKey := common.RedisKeys.SchedulerContainerExitCode(containerId) exitCode, err := cr.rdb.Get(context.Background(), exitCodeKey).Int() if err != nil && err != redis.Nil { - return -1, fmt.Errorf("failed to get value for key <%v>: %w", key, err) + return nil, fmt.Errorf("failed to get value for key <%v>: %w", key, err) } else if err == redis.Nil { continue } // Check if the exit code is non-zero if exitCode != 0 { - failedCount++ + failedContainerIds = append(failedContainerIds, containerId) } } - return failedCount, nil + return failedContainerIds, nil } func (c *ContainerRedisRepository) SetContainerStateWithConcurrencyLimit(quota *types.ConcurrencyLimit, request *types.ContainerRequest) error { @@ -428,7 +428,7 @@ func (cr *ContainerRedisRepository) UpdateCheckpointState(workspaceName, checkpo "stub_id", checkpointState.StubId, "container_id", checkpointState.ContainerId, "status", string(checkpointState.Status), - "remote_key", checkpointState.RemoteKey, + "remote_key", 
checkpointState.RemoteKey, ).Err() if err != nil { return fmt.Errorf("failed to set checkpoint state <%v>: %w", stateKey, err) @@ -456,3 +456,28 @@ func (cr *ContainerRedisRepository) GetCheckpointState(workspaceName, checkpoint return state, nil } + +func (cr *ContainerRedisRepository) GetStubState(stubId string) (string, error) { + stateKey := common.RedisKeys.SchedulerStubState(stubId) + state, err := cr.rdb.Get(context.TODO(), stateKey).Result() + if err != nil { + if err == redis.Nil { + return types.StubStateHealthy, nil + } + return "", err + } + + return state, nil +} + +var unhealthyStateTTL = 10 * time.Minute + +func (cr *ContainerRedisRepository) SetStubState(stubId, state string) error { + stateKey := common.RedisKeys.SchedulerStubState(stubId) + return cr.rdb.SetEx(context.TODO(), stateKey, state, unhealthyStateTTL).Err() +} + +func (cr *ContainerRedisRepository) DeleteStubState(stubId string) error { + stateKey := common.RedisKeys.SchedulerStubState(stubId) + return cr.rdb.Del(context.TODO(), stateKey).Err() +} diff --git a/pkg/repository/events.go b/pkg/repository/events.go index 26b322fd5..ff26567c1 100644 --- a/pkg/repository/events.go +++ b/pkg/repository/events.go @@ -3,6 +3,7 @@ package repository import ( "bytes" "encoding/json" + "fmt" "log" "net" "net/http" @@ -279,3 +280,18 @@ func (t *TCPEventClientRepo) PushTaskCreatedEvent(task *types.TaskWithRelated) { event, ) } + +func (t *TCPEventClientRepo) PushStubStateUnhealthy(workspaceId string, stubId string, currentState string, previousState string, reason string, failedContainers []string) { + t.pushEvent( + fmt.Sprintf("stub.state.%s", strings.ToLower(currentState)), + types.EventStubStateSchemaVersion, + types.EventStubStateSchema{ + ID: stubId, + WorkspaceID: workspaceId, + State: currentState, + Reason: reason, + PreviousState: previousState, + FailedContainers: failedContainers, + }, + ) +} diff --git a/pkg/types/event.go b/pkg/types/event.go index 01ff7965a..6aaa841a4 100644 --- 
a/pkg/types/event.go +++ b/pkg/types/event.go @@ -20,6 +20,7 @@ var ( */ EventTaskUpdated = "task.updated" EventTaskCreated = "task.created" + EventStubState = "stub.state.%s" // healthy, degraded, warning /* TODO: Requires updates @@ -130,3 +131,14 @@ type EventTaskSchema struct { StubID string `json:"stub_id"` CreatedAt time.Time `json:"created_at"` } + +var EventStubStateSchemaVersion = "1.0" + +type EventStubStateSchema struct { + ID string `json:"id"` + WorkspaceID string `json:"workspace_id"` + State string `json:"state"` + PreviousState string `json:"previous_state"` + Reason string `json:"reason"` + FailedContainers []string `json:"failed_containers"` +} diff --git a/pkg/types/scheduler.go b/pkg/types/scheduler.go index 1cb652f40..4a5a0e07f 100644 --- a/pkg/types/scheduler.go +++ b/pkg/types/scheduler.go @@ -19,6 +19,12 @@ const ( WorkerStateTtlS int = 60 ) +const ( + StubStateDegraded = "degraded" + StubStateWarning = "warning" + StubStateHealthy = "healthy" +) + type Worker struct { Id string `json:"id" redis:"id"` Status WorkerStatus `json:"status" redis:"status"`