Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with lambda labs machines #285

Merged
merged 4 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 19 additions & 36 deletions pkg/api/v1/machine.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package apiv1

import (
"encoding/json"
"fmt"
"net/http"
"strings"
"strconv"

"github.com/beam-cloud/beta9/pkg/auth"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/providers"
"github.com/beam-cloud/beta9/pkg/repository"
"github.com/beam-cloud/beta9/pkg/scheduler"
"github.com/beam-cloud/beta9/pkg/storage"
"github.com/beam-cloud/beta9/pkg/types"
"github.com/labstack/echo/v4"
)
Expand Down Expand Up @@ -40,6 +39,7 @@ type RegisterMachineRequest struct {
PoolName string `json:"pool_name"`
Cpu string `json:"cpu"`
Memory string `json:"memory"`
GpuCount string `json:"gpu_count"`
}

func (g *MachineGroup) RegisterMachine(ctx echo.Context) error {
Expand All @@ -50,63 +50,46 @@ func (g *MachineGroup) RegisterMachine(ctx echo.Context) error {

var request RegisterMachineRequest
if err := ctx.Bind(&request); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid payload")
return echo.NewHTTPError(http.StatusBadRequest, "invalid payload")
}

configBytes, err := json.Marshal(g.config)
remoteConfig, err := providers.GetRemoteConfig(g.config, g.tailscale)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Unable to serialize config")
return echo.NewHTTPError(http.StatusInternalServerError, "unable to create remote config")
}

// Overwrite certain config fields with tailscale hostnames
// TODO: figure out a more elegant to override these fields without hardcoding service names
// possibly, use proxy config values
remoteConfig := types.AppConfig{}
if err = json.Unmarshal(configBytes, &remoteConfig); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Unable to deserialize config")
}

redisHostname, err := g.tailscale.GetHostnameForService("redis")
cpu, err := scheduler.ParseCPU(request.Cpu)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Unable to lookup service: redis")
return echo.NewHTTPError(http.StatusInternalServerError, "invalid machine cpu value")
}

if g.config.Storage.Mode == storage.StorageModeJuiceFS {
juiceFsRedisHostname, err := g.tailscale.GetHostnameForService("juicefs-redis")
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Unable to lookup service: juicefs-redis")
}
remoteConfig.Storage.JuiceFS.RedisURI = fmt.Sprintf("rediss://%s/0", juiceFsRedisHostname)
}

gatewayGrpcHostname, err := g.tailscale.GetHostnameForService("gateway-grpc")
memory, err := scheduler.ParseMemory(request.Memory)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Unable to lookup service: gateway-grpc")
return echo.NewHTTPError(http.StatusInternalServerError, "invalid machine memory value")
}

remoteConfig.Database.Redis.Addrs[0] = redisHostname
remoteConfig.GatewayService.Host = strings.Split(gatewayGrpcHostname, ":")[0]

cpu, err := scheduler.ParseCPU(request.Cpu)
gpuCount, err := strconv.ParseUint(request.GpuCount, 10, 32)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Invalid machine CPU value")
return echo.NewHTTPError(http.StatusInternalServerError, "invalid gpu count")
}

memory, err := scheduler.ParseMemory(request.Memory)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Invalid machine memory value")
hostName := fmt.Sprintf("%s.%s", request.MachineID, g.config.Tailscale.HostName)

// If user is != "", add it into hostname (for self-managed control servers like headscale)
if g.config.Tailscale.User != "" {
hostName = fmt.Sprintf("%s.%s.%s", request.MachineID, g.config.Tailscale.User, g.config.Tailscale.HostName)
}

hostName := fmt.Sprintf("%s.%s.%s", request.MachineID, g.config.Tailscale.User, g.config.Tailscale.HostName)
err = g.providerRepo.RegisterMachine(request.ProviderName, request.PoolName, request.MachineID, &types.ProviderMachineState{
MachineId: request.MachineID,
Token: request.Token,
HostName: hostName,
Cpu: cpu,
Memory: memory,
GpuCount: uint32(gpuCount),
})
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to register machine")
return echo.NewHTTPError(http.StatusInternalServerError, "failed to register machine")
}

return ctx.JSON(http.StatusOK, map[string]interface{}{
Expand Down
1 change: 1 addition & 0 deletions pkg/common/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ database:
- redis-master.beta9:6379
password:
enableTLS: false
insecureSkipVerify: false
dialTimeout: 3s
storage:
mode: juicefs
Expand Down
4 changes: 3 additions & 1 deletion pkg/common/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func NewRedisClient(config types.RedisConfig, options ...func(*redis.UniversalOp
}

if config.EnableTLS {
opts.TLSConfig = &tls.Config{}
opts.TLSConfig = &tls.Config{
InsecureSkipVerify: config.InsecureSkipVerify,
}
}

var client redis.UniversalClient
Expand Down
51 changes: 51 additions & 0 deletions pkg/providers/remote_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package providers

import (
"encoding/json"
"fmt"
"strings"

"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/storage"
"github.com/beam-cloud/beta9/pkg/types"
)

func GetRemoteConfig(baseConfig types.AppConfig, tailscale *network.Tailscale) (*types.AppConfig, error) {
configBytes, err := json.Marshal(baseConfig)
if err != nil {
return nil, err
}

// Overwrite certain config fields with tailscale hostnames
// TODO: figure out a more elegant to override these fields without hardcoding service names
// possibly, use proxy config values
remoteConfig := types.AppConfig{}
if err = json.Unmarshal(configBytes, &remoteConfig); err != nil {
return nil, err
}

redisHostname, err := tailscale.GetHostnameForService("redis")
if err != nil {
return nil, err
}

if baseConfig.Storage.Mode == storage.StorageModeJuiceFS {
juiceFsRedisHostname, err := tailscale.GetHostnameForService("juicefs-redis")
if err != nil {
return nil, err
}

remoteConfig.Storage.JuiceFS.RedisURI = fmt.Sprintf("rediss://%s/0", juiceFsRedisHostname)
}

gatewayGrpcHostname, err := tailscale.GetHostnameForService("gateway-grpc")
if err != nil {
return nil, err
}

remoteConfig.Database.Redis.Addrs[0] = redisHostname
remoteConfig.Database.Redis.InsecureSkipVerify = true
remoteConfig.GatewayService.Host = strings.Split(gatewayGrpcHostname, ":")[0]

return &remoteConfig, nil
}
1 change: 1 addition & 0 deletions pkg/repository/provider_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (r *ProviderRedisRepository) RegisterMachine(providerName, poolName, machin
machineInfo.Status = types.MachineStatusRegistered
machineInfo.Cpu = newMachineInfo.Cpu
machineInfo.Memory = newMachineInfo.Memory
machineInfo.GpuCount = newMachineInfo.GpuCount

err = r.rdb.HSet(context.TODO(), stateKey, common.ToSlice(machineInfo)).Err()
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/repository/worker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,22 +286,22 @@ func (r *WorkerRedisRepository) UpdateWorkerCapacity(worker *types.Worker, reque

switch CapacityUpdateType {
case types.AddCapacity:
updatedWorker.Cpu = updatedWorker.Cpu + request.Cpu
updatedWorker.Memory = updatedWorker.Memory + request.Memory
updatedWorker.FreeCpu = updatedWorker.FreeCpu + request.Cpu
updatedWorker.FreeMemory = updatedWorker.FreeMemory + request.Memory

if request.Gpu != "" {
updatedWorker.GpuCount += request.GpuCount
updatedWorker.FreeGpuCount += request.GpuCount
}

case types.RemoveCapacity:
updatedWorker.Cpu = updatedWorker.Cpu - request.Cpu
updatedWorker.Memory = updatedWorker.Memory - request.Memory
updatedWorker.FreeCpu = updatedWorker.FreeCpu - request.Cpu
updatedWorker.FreeMemory = updatedWorker.FreeMemory - request.Memory

if request.Gpu != "" {
updatedWorker.GpuCount -= request.GpuCount
updatedWorker.FreeGpuCount -= request.GpuCount
}

if updatedWorker.Cpu < 0 || updatedWorker.Memory < 0 || updatedWorker.GpuCount < 0 {
if updatedWorker.FreeCpu < 0 || updatedWorker.FreeMemory < 0 || updatedWorker.FreeGpuCount < 0 {
return errors.New("unable to schedule container, worker out of cpu, memory, or gpu")
}

Expand Down
Loading
Loading