Skip to content

Commit

Permalink
Feat: Add auxiliary state to help check the runtime status of a stub (#…
Browse files Browse the repository at this point in the history
…771)

Added a new redis value
`scheduler:stub:state:[stub_id]`

That stores the state of a given deployment/stub. Currently state can be
`degraded` or `warning`.
A nonexistent value for this variable indicates that the deployment
currently has no runtime issues and is healthy.

`degraded` occurs once the `max container failure` threshold is reached
`warning` occurs when at least one container has failed in the past 5
minutes (5 minutes being the TTL of the container exit code value)
`healthy` is the state when no containers have failed in the past 5 minutes.
Caveat: this means that even if the containers spin down after failing and
there are 0 running containers, the stub is still considered `healthy`.
  • Loading branch information
jsun-m authored Dec 11, 2024
1 parent a20ef03 commit cae3de1
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 11 deletions.
38 changes: 35 additions & 3 deletions pkg/abstractions/common/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type AutoscaledInstanceState struct {
RunningContainers int
PendingContainers int
StoppingContainers int
FailedContainers int
FailedContainers []string
}

type AutoscaledInstanceConfig struct {
Expand All @@ -39,6 +39,7 @@ type AutoscaledInstanceConfig struct {
InstanceLockKey string
ContainerRepo repository.ContainerRepository
BackendRepo repository.BackendRepository
EventRepo repository.EventRepository
TaskRepo repository.TaskRepository
StartContainersFunc func(containersToRun int) error
StopContainersFunc func(containersToStop int) error
Expand Down Expand Up @@ -73,6 +74,7 @@ type AutoscaledInstance struct {
ContainerRepo repository.ContainerRepository
BackendRepo repository.BackendRepository
TaskRepo repository.TaskRepository
EventRepo repository.EventRepository

// Keys
InstanceLockKey string
Expand Down Expand Up @@ -109,6 +111,7 @@ func NewAutoscaledInstance(ctx context.Context, cfg *AutoscaledInstanceConfig) (
ContainerRepo: cfg.ContainerRepo,
BackendRepo: cfg.BackendRepo,
TaskRepo: cfg.TaskRepo,
EventRepo: cfg.EventRepo,
Containers: make(map[string]bool),
ContainerEventChan: make(chan types.ContainerEvent, 1),
ScaleEventChan: make(chan int, 1),
Expand Down Expand Up @@ -236,7 +239,7 @@ func (i *AutoscaledInstance) HandleScalingEvent(desiredContainers int) error {
return err
}

if state.FailedContainers >= i.FailedContainerThreshold {
if len(state.FailedContainers) >= i.FailedContainerThreshold {
log.Printf("<%s> reached failed container threshold, scaling to zero.\n", i.Name)
desiredContainers = 0
}
Expand All @@ -258,6 +261,8 @@ func (i *AutoscaledInstance) HandleScalingEvent(desiredContainers int) error {
err = i.StopContainersFunc(-containerDelta)
}

go i.handleStubEvents(state.FailedContainers)

return err
}

Expand All @@ -267,7 +272,7 @@ func (i *AutoscaledInstance) State() (*AutoscaledInstanceState, error) {
return nil, err
}

failedContainers, err := i.ContainerRepo.GetFailedContainerCountByStubId(i.Stub.ExternalId)
failedContainers, err := i.ContainerRepo.GetFailedContainersByStubId(i.Stub.ExternalId)
if err != nil {
return nil, err
}
Expand All @@ -287,3 +292,30 @@ func (i *AutoscaledInstance) State() (*AutoscaledInstanceState, error) {
state.FailedContainers = failedContainers
return &state, nil
}

// handleStubEvents inspects the stub's failed containers and emits an
// unhealthy-state event when the failure threshold is reached (degraded) or
// at least one container has failed (warning). No failed containers means no
// event is emitted.
func (i *AutoscaledInstance) handleStubEvents(failedContainers []string) {
	failedCount := len(failedContainers)

	switch {
	case failedCount >= i.FailedContainerThreshold:
		i.emitUnhealthyEvent(i.Stub.ExternalId, types.StubStateDegraded, "reached max failed container threshold", failedContainers)
	case failedCount > 0:
		i.emitUnhealthyEvent(i.Stub.ExternalId, types.StubStateWarning, "one or more containers failed", failedContainers)
	}
}

// emitUnhealthyEvent persists the stub's new runtime state and pushes a
// stub-state event, skipping the push when the state has not changed.
// GetStubState reports "healthy" when no state key is present, so the first
// failure always produces a transition. Repository errors are logged and
// abort the emit rather than being silently swallowed.
func (i *AutoscaledInstance) emitUnhealthyEvent(stubId, currentState, reason string, containers []string) {
	previousState, err := i.ContainerRepo.GetStubState(stubId)
	if err != nil {
		log.Printf("<%s> failed to get stub state: %v\n", i.Name, err)
		return
	}

	// No transition, nothing to record or emit
	if previousState == currentState {
		return
	}

	if err := i.ContainerRepo.SetStubState(stubId, currentState); err != nil {
		log.Printf("<%s> failed to set stub state: %v\n", i.Name, err)
		return
	}

	go i.EventRepo.PushStubStateUnhealthy(i.Workspace.ExternalId, stubId, currentState, previousState, reason, containers)
}
1 change: 1 addition & 0 deletions pkg/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func (es *HttpEndpointService) getOrCreateEndpointInstance(ctx context.Context,
Scheduler: es.scheduler,
ContainerRepo: es.containerRepo,
BackendRepo: es.backendRepo,
EventRepo: es.eventRepo,
TaskRepo: es.taskRepo,
InstanceLockKey: Keys.endpointInstanceLock(stub.Workspace.Name, stubId),
StartContainersFunc: instance.startContainers,
Expand Down
1 change: 1 addition & 0 deletions pkg/abstractions/taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ func (tq *RedisTaskQueue) getOrCreateQueueInstance(stubId string, options ...fun
Scheduler: tq.scheduler,
ContainerRepo: tq.containerRepo,
BackendRepo: tq.backendRepo,
EventRepo: tq.eventRepo,
TaskRepo: tq.taskRepo,
InstanceLockKey: Keys.taskQueueInstanceLock(stub.Workspace.Name, stubId),
StartContainersFunc: instance.startContainers,
Expand Down
5 changes: 5 additions & 0 deletions pkg/common/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
schedulerContainerLock string = "scheduler:container:lock:%s"
schedulerContainerExitCode string = "scheduler:container:exit_code:%s"
schedulerCheckpointState string = "scheduler:checkpoint_state:%s:%s"
schedulerStubState string = "scheduler:stub:state:%s"
)

var (
Expand Down Expand Up @@ -146,6 +147,10 @@ func (rk *redisKeys) SchedulerCheckpointState(workspaceName, checkpointId string
return fmt.Sprintf(schedulerCheckpointState, workspaceName, checkpointId)
}

// SchedulerStubState returns the redis key that stores the runtime health
// state for the given stub.
func (rk *redisKeys) SchedulerStubState(stubId string) string {
	return fmt.Sprintf(schedulerStubState, stubId)
}

// Gateway keys
func (rk *redisKeys) GatewayPrefix() string {
return gatewayPrefix
Expand Down
6 changes: 5 additions & 1 deletion pkg/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ type ContainerRepository interface {
GetActiveContainersByStubId(stubId string) ([]types.ContainerState, error)
GetActiveContainersByWorkspaceId(workspaceId string) ([]types.ContainerState, error)
GetActiveContainersByWorkerId(workerId string) ([]types.ContainerState, error)
GetFailedContainerCountByStubId(stubId string) (int, error)
GetFailedContainersByStubId(stubId string) ([]string, error)
UpdateCheckpointState(workspaceName, checkpointId string, checkpointState *types.CheckpointState) error
GetCheckpointState(workspaceName, checkpointId string) (*types.CheckpointState, error)
GetStubState(stubId string) (string, error)
SetStubState(stubId, state string) error
DeleteStubState(stubId string) error
}

type WorkspaceRepository interface {
Expand Down Expand Up @@ -180,6 +183,7 @@ type EventRepository interface {
PushRunStubEvent(workspaceId string, stub *types.Stub)
PushTaskUpdatedEvent(task *types.TaskWithRelated)
PushTaskCreatedEvent(task *types.TaskWithRelated)
PushStubStateUnhealthy(workspaceId string, stubId string, currentState, previousState string, reason string, failedContainers []string)
}

type MetricsRepository interface {
Expand Down
39 changes: 32 additions & 7 deletions pkg/repository/container_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,33 +328,33 @@ func (cr *ContainerRedisRepository) GetActiveContainersByWorkerId(workerId strin
return cr.listContainerStateByIndex(indexKey, keys)
}

func (cr *ContainerRedisRepository) GetFailedContainerCountByStubId(stubId string) (int, error) {
func (cr *ContainerRedisRepository) GetFailedContainersByStubId(stubId string) ([]string, error) {
indexKey := common.RedisKeys.SchedulerContainerIndex(stubId)
keys, err := cr.rdb.SMembers(context.TODO(), indexKey).Result()
if err != nil {
return -1, err
return nil, fmt.Errorf("failed to retrieve container state keys: %v", err)
}

// Retrieve the value (exit code) for each key
failedCount := 0
failedContainerIds := make([]string, 0)
for _, key := range keys {
containerId := strings.Split(key, ":")[len(strings.Split(key, ":"))-1]
exitCodeKey := common.RedisKeys.SchedulerContainerExitCode(containerId)

exitCode, err := cr.rdb.Get(context.Background(), exitCodeKey).Int()
if err != nil && err != redis.Nil {
return -1, fmt.Errorf("failed to get value for key <%v>: %w", key, err)
return nil, fmt.Errorf("failed to get value for key <%v>: %w", key, err)
} else if err == redis.Nil {
continue
}

// Check if the exit code is non-zero
if exitCode != 0 {
failedCount++
failedContainerIds = append(failedContainerIds, containerId)
}
}

return failedCount, nil
return failedContainerIds, nil
}

func (c *ContainerRedisRepository) SetContainerStateWithConcurrencyLimit(quota *types.ConcurrencyLimit, request *types.ContainerRequest) error {
Expand Down Expand Up @@ -428,7 +428,7 @@ func (cr *ContainerRedisRepository) UpdateCheckpointState(workspaceName, checkpo
"stub_id", checkpointState.StubId,
"container_id", checkpointState.ContainerId,
"status", string(checkpointState.Status),
"remote_key", checkpointState.RemoteKey,
"remote_key", checkpointState.RemoteKey,
).Err()
if err != nil {
return fmt.Errorf("failed to set checkpoint state <%v>: %w", stateKey, err)
Expand Down Expand Up @@ -456,3 +456,28 @@ func (cr *ContainerRedisRepository) GetCheckpointState(workspaceName, checkpoint

return state, nil
}

// GetStubState returns the stored runtime state for a stub. A missing key is
// reported as healthy, since only unhealthy states are persisted.
func (cr *ContainerRedisRepository) GetStubState(stubId string) (string, error) {
	stateKey := common.RedisKeys.SchedulerStubState(stubId)

	state, err := cr.rdb.Get(context.TODO(), stateKey).Result()
	switch {
	case err == redis.Nil:
		return types.StubStateHealthy, nil
	case err != nil:
		return "", err
	}

	return state, nil
}

// unhealthyStateTTL bounds how long a stub state value persists before it
// expires, after which the stub reads as healthy again.
var unhealthyStateTTL = 10 * time.Minute

// SetStubState records the runtime state for a stub with an expiry, so stale
// unhealthy markers clear themselves automatically.
func (cr *ContainerRedisRepository) SetStubState(stubId, state string) error {
	key := common.RedisKeys.SchedulerStubState(stubId)
	return cr.rdb.SetEx(context.TODO(), key, state, unhealthyStateTTL).Err()
}

// DeleteStubState removes any recorded runtime state for a stub, returning it
// to the implicit healthy state.
func (cr *ContainerRedisRepository) DeleteStubState(stubId string) error {
	return cr.rdb.Del(context.TODO(), common.RedisKeys.SchedulerStubState(stubId)).Err()
}
16 changes: 16 additions & 0 deletions pkg/repository/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package repository
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -279,3 +280,18 @@ func (t *TCPEventClientRepo) PushTaskCreatedEvent(task *types.TaskWithRelated) {
event,
)
}

// PushStubStateUnhealthy emits an event recording a stub's transition into an
// unhealthy runtime state. The event name is built from the types.EventStubState
// format (e.g. "stub.state.degraded") rather than duplicating the format
// string inline, keeping the name in sync with the declared event constant.
func (t *TCPEventClientRepo) PushStubStateUnhealthy(workspaceId string, stubId string, currentState string, previousState string, reason string, failedContainers []string) {
	t.pushEvent(
		fmt.Sprintf(types.EventStubState, strings.ToLower(currentState)),
		types.EventStubStateSchemaVersion,
		types.EventStubStateSchema{
			ID:               stubId,
			WorkspaceID:      workspaceId,
			State:            currentState,
			Reason:           reason,
			PreviousState:    previousState,
			FailedContainers: failedContainers,
		},
	)
}
12 changes: 12 additions & 0 deletions pkg/types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
*/
EventTaskUpdated = "task.updated"
EventTaskCreated = "task.created"
EventStubState = "stub.state.%s" // healthy, degraded, warning

/*
TODO: Requires updates
Expand Down Expand Up @@ -130,3 +131,14 @@ type EventTaskSchema struct {
StubID string `json:"stub_id"`
CreatedAt time.Time `json:"created_at"`
}

// EventStubStateSchemaVersion is the schema version for stub-state events.
var EventStubStateSchemaVersion = "1.0"

// EventStubStateSchema is the payload for stub runtime-state change events.
type EventStubStateSchema struct {
	ID               string   `json:"id"`                // stub external id
	WorkspaceID      string   `json:"workspace_id"`      // workspace external id
	State            string   `json:"state"`             // state being transitioned into
	PreviousState    string   `json:"previous_state"`    // state prior to this transition
	Reason           string   `json:"reason"`            // human-readable cause of the transition
	FailedContainers []string `json:"failed_containers"` // ids of containers with a non-zero exit code
}
6 changes: 6 additions & 0 deletions pkg/types/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ const (
WorkerStateTtlS int = 60
)

// Stub runtime health states. A stub with no stored state value is treated as
// healthy; only unhealthy states are persisted (with a TTL).
const (
	StubStateDegraded = "degraded" // failed-container threshold reached
	StubStateWarning  = "warning" // at least one container failed recently
	StubStateHealthy  = "healthy" // no failed containers recorded
)

type Worker struct {
Id string `json:"id" redis:"id"`
Status WorkerStatus `json:"status" redis:"status"`
Expand Down

0 comments on commit cae3de1

Please sign in to comment.