Skip to content

Commit

Permalink
fix: fix stop deployment bug (#873)
Browse files Browse the repository at this point in the history
  • Loading branch information
luke-lombardi authored Jan 20, 2025
1 parent 25754fc commit c8e7e87
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 47 deletions.
95 changes: 51 additions & 44 deletions pkg/abstractions/common/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type IAutoscaledInstance interface {
ConsumeScaleResult(*AutoscalerResult)
ConsumeContainerEvent(types.ContainerEvent)
HandleScalingEvent(int) error
Reload() error
Sync() error
}

type AutoscaledInstanceState struct {
Expand Down Expand Up @@ -130,27 +130,10 @@ func NewAutoscaledInstance(ctx context.Context, cfg *AutoscaledInstanceConfig) (
instance.StubConfig.Autoscaler.TasksPerContainer = 1
}

instance.Sync()
return instance, nil
}

// Reload refreshes instance state from the backend.
// It looks up the deployments tied to this instance's stub (by external stub
// ID and workspace ID) and, when exactly one deployment comes back and it is
// not active, marks the instance as inactive. IsActive is never set to true here.
//
// Returns the error from the deployment lookup, or nil otherwise (including
// the case where no deployments are found).
func (i *AutoscaledInstance) Reload() error {
	filter := types.DeploymentFilter{
		StubIds:     []string{i.Stub.ExternalId},
		WorkspaceID: i.Stub.Workspace.Id,
	}

	found, err := i.BackendRepo.ListDeploymentsWithRelated(i.Ctx, filter)
	if err != nil {
		return err
	}

	// Only a single, unambiguous deployment is allowed to deactivate the instance.
	if len(found) != 1 {
		return nil
	}

	if !found[0].Active {
		i.IsActive = false
	}

	return nil
}

func (i *AutoscaledInstance) WaitForContainer(ctx context.Context, duration time.Duration) (*types.ContainerState, error) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -270,6 +253,27 @@ func (i *AutoscaledInstance) HandleScalingEvent(desiredContainers int) error {
return err
}

// Sync refreshes persistent state on the instance from the backend.
// For stubs whose type reports IsDeployment(), it lists the deployments for
// this stub (by external stub ID and workspace ID, with ShowDeleted set) and,
// when exactly one deployment comes back and it is not active, marks the
// instance as inactive. IsActive is never set to true here.
//
// Returns the error from the deployment lookup, or nil otherwise (including
// non-deployment stubs and the case where no deployments are found).
func (i *AutoscaledInstance) Sync() error {
	// Nothing to synchronize for stubs that are not deployments.
	if !i.Stub.Type.IsDeployment() {
		return nil
	}

	filter := types.DeploymentFilter{
		StubIds:     []string{i.Stub.ExternalId},
		WorkspaceID: i.Stub.Workspace.Id,
		ShowDeleted: true,
	}

	found, err := i.BackendRepo.ListDeploymentsWithRelated(i.Ctx, filter)
	if err != nil {
		return err
	}

	// Only a single, unambiguous deployment is allowed to deactivate the instance.
	if len(found) == 1 && !found[0].Active {
		i.IsActive = false
	}

	return nil
}

func (i *AutoscaledInstance) State() (*AutoscaledInstanceState, error) {
containers, err := i.ContainerRepo.GetActiveContainersByStubId(i.Stub.ExternalId)
if err != nil {
Expand Down Expand Up @@ -328,12 +332,13 @@ func (i *AutoscaledInstance) emitUnhealthyEvent(stubId, currentState, reason str
type InstanceController struct {
ctx context.Context
getOrCreateInstance func(ctx context.Context, stubId string, options ...func(IAutoscaledInstance)) (IAutoscaledInstance, error)
StubTypes []string
stubTypes []string
backendRepo repository.BackendRepository
redisClient *common.RedisClient
eventBus *common.EventBus
}

func NewController(
func NewInstanceController(
ctx context.Context,
getOrCreateInstance func(ctx context.Context, stubId string, options ...func(IAutoscaledInstance)) (IAutoscaledInstance, error),
stubTypes []string,
Expand All @@ -343,9 +348,10 @@ func NewController(
return &InstanceController{
ctx: ctx,
getOrCreateInstance: getOrCreateInstance,
StubTypes: stubTypes,
stubTypes: stubTypes,
backendRepo: backendRepo,
redisClient: redisClient,
eventBus: common.NewEventBus(redisClient),
}
}

Expand All @@ -357,7 +363,7 @@ func (c *InstanceController) Init() error {
stubType := e.Args["stub_type"].(string)

correctStub := false
for _, t := range c.StubTypes {
for _, t := range c.stubTypes {
if t == stubType {
correctStub = true
break
Expand All @@ -368,16 +374,23 @@ func (c *InstanceController) Init() error {
return true
}

if err := c.reload(stubId); err != nil {
if err := c.Load(&types.DeploymentFilter{
StubIds: []string{stubId},
StubType: c.stubTypes,
ShowDeleted: true,
}); err != nil {
return false
}

return true
}},
)
go eventBus.ReceiveEvents(c.ctx)
c.eventBus = eventBus

go c.eventBus.ReceiveEvents(c.ctx)

if err := c.load(); err != nil {
// Load all instances matching the defined stub types
if err := c.Load(nil); err != nil {
return err
}

Expand All @@ -396,36 +409,30 @@ func (c *InstanceController) Warmup(
return instance.HandleScalingEvent(1)
}

func (c *InstanceController) load() error {
stubs, err := c.backendRepo.ListDeploymentsWithRelated(
c.ctx,
types.DeploymentFilter{
StubType: c.StubTypes,
func (c *InstanceController) Load(filter *types.DeploymentFilter) error {
if filter == nil {
filter = &types.DeploymentFilter{
StubType: c.stubTypes,
MinContainersGTE: 1,
Active: ptr.To(true),
},
}
}

stubs, err := c.backendRepo.ListDeploymentsWithRelated(
c.ctx,
*filter,
)
if err != nil {
return err
}

for _, stub := range stubs {
_, err := c.getOrCreateInstance(c.ctx, stub.Stub.ExternalId)
instance, err := c.getOrCreateInstance(c.ctx, stub.Stub.ExternalId)
if err != nil {
return err
}
instance.Sync()
}

return nil
}

// reload fetches (or creates) the instance for the given stub ID and asks it
// to reload its state.
//
// Returns the error from getOrCreateInstance, or the error from the instance's
// Reload call; nil when both succeed.
func (c *InstanceController) reload(stubId string) error {
	instance, err := c.getOrCreateInstance(c.ctx, stubId)
	if err != nil {
		return err
	}

	// Propagate the reload result rather than discarding it, so a failed
	// backend lookup is not reported to the caller as success.
	return instance.Reload()
}
2 changes: 1 addition & 1 deletion pkg/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewHTTPEndpointService(
}
eventManager.Listen()

es.controller = abstractions.NewController(ctx, es.InstanceFactory, []string{types.StubTypeEndpointDeployment, types.StubTypeASGIDeployment}, es.backendRepo, es.rdb)
es.controller = abstractions.NewInstanceController(ctx, es.InstanceFactory, []string{types.StubTypeEndpointDeployment, types.StubTypeASGIDeployment}, es.backendRepo, es.rdb)
err = es.controller.Init()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/abstractions/taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewRedisTaskQueueService(
eventManager.Listen()

// Initialize deployment manager
tq.controller = abstractions.NewController(ctx, tq.InstanceFactory, []string{types.StubTypeTaskQueueDeployment}, opts.BackendRepo, opts.RedisClient)
tq.controller = abstractions.NewInstanceController(ctx, tq.InstanceFactory, []string{types.StubTypeTaskQueueDeployment}, opts.BackendRepo, opts.RedisClient)
err = tq.controller.Init()
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v1/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package apiv1
import (
"net/http"
"path"
"time"

"github.com/labstack/echo/v4"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -232,6 +233,7 @@ func (g *DeploymentGroup) stopDeployments(deployments []types.DeploymentWithRela
eventBus.Send(&common.Event{Type: common.EventTypeReloadInstance, Retries: 3, LockAndDelete: false, Args: map[string]any{
"stub_id": deployment.Stub.ExternalId,
"stub_type": deployment.StubType,
"timestamp": time.Now().Unix(),
}})
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/gateway/services/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strconv"
"strings"
"time"

"github.com/beam-cloud/beta9/pkg/auth"
common "github.com/beam-cloud/beta9/pkg/common"
Expand Down Expand Up @@ -180,6 +181,7 @@ func (gws *GatewayService) stopDeployments(deployments []types.DeploymentWithRel
eventBus.Send(&common.Event{Type: common.EventTypeReloadInstance, Retries: 3, LockAndDelete: false, Args: map[string]any{
"stub_id": deployment.Stub.ExternalId,
"stub_type": deployment.StubType,
"timestamp": time.Now().Unix(),
}})
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/gateway/services/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"strings"
"time"

"github.com/beam-cloud/beta9/pkg/abstractions/endpoint"
"github.com/beam-cloud/beta9/pkg/abstractions/function"
Expand Down Expand Up @@ -240,6 +241,7 @@ func (gws *GatewayService) DeployStub(ctx context.Context, in *pb.DeployStubRequ
eventBus.Send(&common.Event{Type: common.EventTypeReloadInstance, Retries: 3, LockAndDelete: false, Args: map[string]any{
"stub_id": stub.ExternalId,
"stub_type": stub.Type,
"timestamp": time.Now().Unix(),
}})
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/repository/backend_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,10 +1062,13 @@ func (c *PostgresBackendRepository) listDeploymentsQueryBuilder(filters types.De
).From("deployment d").
Join("workspace w ON d.workspace_id = w.id").
Join("stub s ON d.stub_id = s.id").
Where("d.deleted_at IS NULL").
OrderBy("d.created_at DESC")

// Apply filters
if !filters.ShowDeleted {
qb = qb.Where("d.deleted_at IS NULL")
}

if filters.WorkspaceID != 0 {
qb = qb.Where(squirrel.Eq{"d.workspace_id": filters.WorkspaceID})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/types/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type DeploymentFilter struct {
Subdomain string `query:"subdomain"`
SearchQuery string `query:"search_query"`
MinContainersGTE uint `query:"min_containers"`
ShowDeleted bool `query:"show_deleted"`
}

type TaskFilter struct {
Expand Down

0 comments on commit c8e7e87

Please sign in to comment.