diff --git a/go.mod b/go.mod index b78c7123c..3e688cbc7 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( buf.build/gen/go/cedana/task/protocolbuffers/go v1.35.2-20241203191352-12c25eb032cd.1 github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Masterminds/squirrel v1.5.4 + github.com/VictoriaMetrics/metrics v1.35.2 github.com/alicebob/miniredis/v2 v2.30.5 github.com/aws/aws-sdk-go-v2 v1.30.1 github.com/aws/aws-sdk-go-v2/config v1.27.24 @@ -255,7 +256,9 @@ require ( github.com/ulikunitz/xz v0.5.12 // indirect github.com/urfave/cli v1.22.14 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect + github.com/valyala/histogram v1.2.0 // indirect github.com/vbatts/go-mtree v0.5.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xeonx/timeago v1.0.0-rc5 // indirect diff --git a/go.sum b/go.sum index aab19b1d5..4a4922aa7 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/VictoriaMetrics/metrics v1.35.2 h1:Bj6L6ExfnakZKYPpi7mGUnkJP4NGQz2v5wiChhXNyWQ= +github.com/VictoriaMetrics/metrics v1.35.2/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/akutz/memconn v0.1.0 h1:NawI0TORU4hcOMsMr11g7vwlCdkYeLKXBcxWu2W/P8A= github.com/akutz/memconn v0.1.0/go.mod h1:Jo8rI7m0NieZyLI5e2CDlRdRqRRB4S7Xp77ukDjH+Fw= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= @@ -649,8 +651,12 @@ github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= +github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= +github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= +github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= github.com/vbatts/go-mtree v0.5.0 h1:dM+5XZdqH0j9CSZeerhoN/tAySdwnmevaZHO1XGW2Vc= github.com/vbatts/go-mtree v0.5.0/go.mod h1:7JbaNHyBMng+RP8C3Q4E+4Ca8JnGQA2R/MB+jb4tSOk= github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs= diff --git a/manifests/kustomize/overlays/cluster-dev/kustomization.yaml b/manifests/kustomize/overlays/cluster-dev/kustomization.yaml index e1a14b9ae..dfd78d792 100644 --- a/manifests/kustomize/overlays/cluster-dev/kustomization.yaml +++ b/manifests/kustomize/overlays/cluster-dev/kustomization.yaml @@ -38,28 +38,35 @@ patches: path: /spec/template/spec/initContainers/0/resources - op: remove path: /spec/template/spec/containers/0/resources -# Disable unused monitoring components - target: kind: Deployment name: victoria-metrics-agent patch: |- - op: replace path: /spec/replicas - value: 0 + value: 1 - target: kind: StatefulSet name: victoria-metrics-single patch: |- - op: replace path: /spec/replicas - value: 0 + value: 1 - target: kind: StatefulSet name: victoria-logs-single patch: |- - op: replace path: /spec/replicas - value: 0 + value: 1 +- target: + kind: Deployment + name: grafana + patch: |- + - op: replace + path: /spec/replicas + value: 1 + # Replace RollingUpdate with Recreate for all Deployments in this overlay - target: kind: Deployment @@ -68,3 +75,18 @@ patches: path: /spec/strategy value: type: Recreate + +- target: + kind: Service + name: grafana + patch: |- + - op: replace + path: /spec/type + value: LoadBalancer + - op: replace + path: /spec/ports + value: + - name: service + port: 3000 + targetPort: 3000 + protocol: TCP \ No newline at end of file diff --git a/pkg/abstractions/endpoint/buffer.go b/pkg/abstractions/endpoint/buffer.go index 50dfe1573..a59b744b8 100644 --- a/pkg/abstractions/endpoint/buffer.go +++ b/pkg/abstractions/endpoint/buffer.go @@ -19,6 +19,7 @@ import ( abstractions "github.com/beam-cloud/beta9/pkg/abstractions/common" "github.com/beam-cloud/beta9/pkg/common" + "github.com/beam-cloud/beta9/pkg/metrics" "github.com/beam-cloud/beta9/pkg/network" "github.com/beam-cloud/beta9/pkg/repository" "github.com/beam-cloud/beta9/pkg/task" @@ -353,10 +354,13 @@ func (rb *RequestBuffer) getHttpClient(address string, timeout time.Duration) (* return rb.httpClient, nil } + start := time.Now() conn, err := network.ConnectToHost(rb.ctx, address, timeout, rb.tailscale, rb.tsConfig) if err != nil { return nil, err } + // Use goroutine to avoid potential blocking from rate limiter's internal mutex + go metrics.RecordDialTime(time.Since(start)) // Create a custom transport that uses the established connection // Either using tailscale or not diff --git a/pkg/abstractions/image/build.go b/pkg/abstractions/image/build.go index dd20264e7..c5a6e1727 100644 --- a/pkg/abstractions/image/build.go +++ b/pkg/abstractions/image/build.go @@ -22,6 +22,7 @@ import ( "github.com/beam-cloud/beta9/pkg/auth" "github.com/beam-cloud/beta9/pkg/common" "github.com/beam-cloud/beta9/pkg/network" + "github.com/beam-cloud/beta9/pkg/registry" "github.com/beam-cloud/beta9/pkg/repository" "github.com/beam-cloud/beta9/pkg/scheduler" "github.com/beam-cloud/beta9/pkg/types" @@ -42,7 +43,7 @@ const ( type Builder struct { config types.AppConfig scheduler *scheduler.Scheduler - registry *common.ImageRegistry + registry *registry.ImageRegistry containerRepo repository.ContainerRepository tailscale *network.Tailscale eventBus *common.EventBus @@ -142,7 +143,7 @@ func (o *BuildOpts) addPythonRequirements() { o.PythonPackages = append(filteredPythonPackages, baseRequirementsSlice...) } -func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository, rdb *common.RedisClient) (*Builder, error) { +func NewBuilder(config types.AppConfig, registry *registry.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository, rdb *common.RedisClient) (*Builder, error) { return &Builder{ config: config, scheduler: scheduler, diff --git a/pkg/abstractions/image/image.go b/pkg/abstractions/image/image.go index b29259033..aecb0b710 100644 --- a/pkg/abstractions/image/image.go +++ b/pkg/abstractions/image/image.go @@ -8,6 +8,7 @@ import ( "github.com/beam-cloud/beta9/pkg/auth" "github.com/beam-cloud/beta9/pkg/common" "github.com/beam-cloud/beta9/pkg/network" + "github.com/beam-cloud/beta9/pkg/registry" "github.com/beam-cloud/beta9/pkg/repository" "github.com/beam-cloud/beta9/pkg/scheduler" "github.com/beam-cloud/beta9/pkg/types" @@ -48,7 +49,7 @@ func NewRuncImageService( ctx context.Context, opts ImageServiceOpts, ) (ImageService, error) { - registry, err := common.NewImageRegistry(opts.Config.ImageService) + registry, err := registry.NewImageRegistry(opts.Config) if err != nil { return nil, err } diff --git a/pkg/abstractions/pod/proxy.go b/pkg/abstractions/pod/proxy.go index c32126f38..45de83166 100644 --- a/pkg/abstractions/pod/proxy.go +++ b/pkg/abstractions/pod/proxy.go @@ -11,6 +11,7 @@ import ( abstractions "github.com/beam-cloud/beta9/pkg/abstractions/common" "github.com/beam-cloud/beta9/pkg/common" + "github.com/beam-cloud/beta9/pkg/metrics" "github.com/beam-cloud/beta9/pkg/network" "github.com/beam-cloud/beta9/pkg/repository" "github.com/beam-cloud/beta9/pkg/types" @@ -178,11 +179,13 @@ func (pb *PodProxyBuffer) handleConnection(conn *connection) { return } + start := time.Now() containerConn, err := network.ConnectToHost(request.Context(), container.addressMap[int32(port)], containerDialTimeoutDurationS, pb.tailscale, pb.tsConfig) if err != nil { conn.ctx.String(http.StatusServiceUnavailable, "Failed to connect to service") return } + go metrics.RecordDialTime(time.Since(start)) defer containerConn.Close() abstractions.SetConnOptions(containerConn, true, connectionKeepAliveInterval, connectionReadTimeout) diff --git a/pkg/common/skopeo.go b/pkg/common/skopeo.go index 49fdd4804..4473f1d18 100644 --- a/pkg/common/skopeo.go +++ b/pkg/common/skopeo.go @@ -22,6 +22,7 @@ const ( type SkopeoClient interface { Inspect(ctx context.Context, sourceImage string, creds string) (ImageMetadata, error) + InspectSizeInBytes(ctx context.Context, sourceImage string, creds string) (int64, error) Copy(ctx context.Context, sourceImage string, dest string, creds string) error } @@ -88,6 +89,20 @@ func (p *skopeoClient) Inspect(ctx context.Context, sourceImage string, creds st return imageMetadata, nil } +func (p *skopeoClient) InspectSizeInBytes(ctx context.Context, sourceImage string, creds string) (int64, error) { + imageMetadata, err := p.Inspect(ctx, sourceImage, creds) + if err != nil { + return 0, err + } + + size := int64(0) + for _, layer := range imageMetadata.LayersData { + size += int64(layer.Size) + } + + return size, nil +} + func (p *skopeoClient) Copy(ctx context.Context, sourceImage string, dest string, creds string) error { args := []string{"copy", fmt.Sprintf("docker://%s", sourceImage), dest} diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index f97c317d7..36f957b25 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -40,7 +40,7 @@ import ( repositoryservices "github.com/beam-cloud/beta9/pkg/gateway/services/repository" "github.com/beam-cloud/beta9/pkg/network" "github.com/beam-cloud/beta9/pkg/repository" - metrics "github.com/beam-cloud/beta9/pkg/repository/metrics" + usage "github.com/beam-cloud/beta9/pkg/repository/usage" "github.com/beam-cloud/beta9/pkg/scheduler" "github.com/beam-cloud/beta9/pkg/storage" "github.com/beam-cloud/beta9/pkg/task" @@ -55,27 +55,27 @@ import ( type Gateway struct { pb.UnimplementedSchedulerServer - Config types.AppConfig - httpServer *http.Server - grpcServer *grpc.Server - RedisClient *common.RedisClient - TaskDispatcher *task.Dispatcher - TaskRepo repository.TaskRepository - WorkspaceRepo repository.WorkspaceRepository - ContainerRepo repository.ContainerRepository - BackendRepo repository.BackendRepository - ProviderRepo repository.ProviderRepository - WorkerPoolRepo repository.WorkerPoolRepository - EventRepo repository.EventRepository - Tailscale *network.Tailscale - metricsRepo repository.MetricsRepository - workerRepo repository.WorkerRepository - Storage storage.Storage - Scheduler *scheduler.Scheduler - ctx context.Context - cancelFunc context.CancelFunc - baseRouteGroup *echo.Group - rootRouteGroup *echo.Group + Config types.AppConfig + httpServer *http.Server + grpcServer *grpc.Server + RedisClient *common.RedisClient + TaskDispatcher *task.Dispatcher + TaskRepo repository.TaskRepository + WorkspaceRepo repository.WorkspaceRepository + ContainerRepo repository.ContainerRepository + BackendRepo repository.BackendRepository + ProviderRepo repository.ProviderRepository + WorkerPoolRepo repository.WorkerPoolRepository + EventRepo repository.EventRepository + Tailscale *network.Tailscale + usageMetricsRepo repository.UsageMetricsRepository + workerRepo repository.WorkerRepository + Storage storage.Storage + Scheduler *scheduler.Scheduler + ctx context.Context + cancelFunc context.CancelFunc + baseRouteGroup *echo.Group + rootRouteGroup *echo.Group } func NewGateway() (*Gateway, error) { @@ -90,7 +90,7 @@ func NewGateway() (*Gateway, error) { return nil, err } - metricsRepo, err := metrics.NewMetrics(config.Monitoring, string(metrics.MetricsSourceGateway)) + metricsRepo, err := usage.NewUsage(config.Monitoring, string(usage.MetricsSourceGateway)) if err != nil { return nil, err } @@ -157,7 +157,7 @@ func NewGateway() (*Gateway, error) { gateway.BackendRepo = backendRepo gateway.Tailscale = tailscale gateway.TaskDispatcher = taskDispatcher - gateway.metricsRepo = metricsRepo + gateway.usageMetricsRepo = metricsRepo gateway.EventRepo = eventRepo gateway.workerRepo = workerRepo diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 000000000..f19884918 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,143 @@ +package metrics + +import ( + "context" + "fmt" + "os" + "time" + + vmetrics "github.com/VictoriaMetrics/metrics" + "github.com/beam-cloud/beta9/pkg/types" + "github.com/rs/zerolog/log" + "tailscale.com/tstime/rate" +) + +var ( + dialMetricLimiter *rate.Limiter +) + +const ( + metricRequestSchedulingDuration = "scheduler_request_scheduling_duration_ms" + metricRequestRetries = "scheduler_request_retries" + metricImagePullTime = "worker_image_pull_time_seconds" + metricImageBuildSpeed = "worker_image_build_speed_mbps" + metricImageUnpackSpeed = "worker_image_unpack_speed_mbps" + metricImageCopySpeed = "worker_image_copy_speed_mbps" + metricImageArchiveSpeed = "worker_image_archive_speed_mbps" + metricImagePushSpeed = "worker_image_push_speed_mbps" + metricS3PutSpeed = "s3_put_speed_mbps" + metricS3GetSpeed = "s3_get_speed_mbps" + metricDialTime = "dial_time_ms" + metricContainerStartLatency = "container_start_latency_ms" +) + +func InitializeMetricsRepository(config types.VictoriaMetricsConfig) { + // ENV to collect for default labels + workerPoolName := os.Getenv("WORKER_POOL_NAME") + gpuType := os.Getenv("GPU_TYPE") + podHostname := os.Getenv("POD_HOSTNAME") + + opts := &vmetrics.PushOptions{ + Headers: []string{ + fmt.Sprintf("Authorization: Bearer %s", config.AuthToken), + }, + } + + if workerPoolName != "" && podHostname != "" { + opts.ExtraLabels = fmt.Sprintf(`worker_pool_name="%s",pod_hostname="%s"`, workerPoolName, podHostname) + } + + if gpuType != "" { + opts.ExtraLabels = fmt.Sprintf(`%s,gpu_type="%s"`, opts.ExtraLabels, gpuType) + } + + vmetrics.GetDefaultSet().InitPushWithOptions( + context.Background(), + config.PushURL, + time.Duration(config.PushSecs)*time.Second, + opts, + ) + + dialMetricLimiter = rate.NewLimiter(2, 1) +} + +func RecordRequestSchedulingDuration(duration time.Duration, request *types.ContainerRequest) { + log.Info().Interface("request", request).Msg("recording request scheduling duration") + metricName := fmt.Sprintf("%s{gpu=\"%s\",gpu_count=\"%d\",cpu=\"%d\",memory=\"%d\"}", + metricRequestSchedulingDuration, + request.Gpu, + request.GpuCount, + request.Cpu, + request.Memory) + + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricName).Update(float64(duration.Milliseconds())) +} + +func RecordRequestRetry(request *types.ContainerRequest) { + metricName := fmt.Sprintf("%s{container_id=\"%s\",gpu=\"%s\",gpu_count=\"%d\",cpu=\"%d\",memory=\"%d\",retry_count=\"%d\"}", + metricRequestRetries, + request.ContainerId, + request.Gpu, + request.GpuCount, + request.Cpu, + request.Memory, + request.RetryCount) + + vmetrics.GetDefaultSet().GetOrCreateCounter(metricName).Inc() +} + +func RecordImagePullTime(duration time.Duration) { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImagePullTime).Update(duration.Seconds()) +} + +func RecordImageBuildSpeed(sizeInMB float64, duration time.Duration) { + if sizeInMB > 0 { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageBuildSpeed).Update(sizeInMB / duration.Seconds()) + } +} + +func RecordImageUnpackSpeed(sizeInMB float64, duration time.Duration) { + if sizeInMB > 0 { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageUnpackSpeed).Update(sizeInMB / duration.Seconds()) + } +} + +func RecordImageCopySpeed(sizeInMB float64, duration time.Duration) { + if sizeInMB > 0 { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageCopySpeed).Update(sizeInMB / duration.Seconds()) + } +} + +func RecordImageArchiveSpeed(sizeInMB float64, duration time.Duration) { + if sizeInMB > 0 { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageArchiveSpeed).Update(sizeInMB / duration.Seconds()) + } +} + +func RecordImagePushSpeed(sizeInMB float64, duration time.Duration) { + if sizeInMB > 0 { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImagePushSpeed).Update(sizeInMB / duration.Seconds()) + } +} + +func RecordS3PutSpeed(sizeInMB float64, duration time.Duration) { + if sizeInMB > 0 { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricS3PutSpeed).Update(sizeInMB / duration.Seconds()) + } +} + +func RecordS3GetSpeed(sizeInMB float64, duration time.Duration) { + if sizeInMB > 0 { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricS3GetSpeed).Update(sizeInMB / duration.Seconds()) + } +} + +func RecordDialTime(duration time.Duration) { + if dialMetricLimiter.Allow() { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricDialTime).Update(float64(duration.Milliseconds())) + } +} + +func RecordContainerStartLatency(duration time.Duration) { + vmetrics.GetDefaultSet().GetOrCreateHistogram(metricContainerStartLatency).Update(float64(duration.Milliseconds())) +} diff --git a/pkg/common/registry.go b/pkg/registry/registry.go similarity index 82% rename from pkg/common/registry.go rename to pkg/registry/registry.go index ab4ac9714..f05191397 100644 --- a/pkg/common/registry.go +++ b/pkg/registry/registry.go @@ -1,4 +1,4 @@ -package common +package registry import ( "context" @@ -6,10 +6,13 @@ import ( "io" "os" "path/filepath" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/beam-cloud/beta9/pkg/common" + "github.com/beam-cloud/beta9/pkg/metrics" "github.com/beam-cloud/beta9/pkg/types" "github.com/google/uuid" "github.com/rs/zerolog/log" @@ -28,15 +31,15 @@ type ImageRegistry struct { ImageFileExtension string } -func NewImageRegistry(config types.ImageServiceConfig) (*ImageRegistry, error) { +func NewImageRegistry(config types.AppConfig) (*ImageRegistry, error) { var err error var store ObjectStore var imageFileExtension string = localImageFileExtension - switch config.RegistryStore { + switch config.ImageService.RegistryStore { case S3ImageRegistryStore: imageFileExtension = remoteImageFileExtension - store, err = NewS3Store(config.Registries.S3) + store, err = NewS3Store(config) if err != nil { return nil, err } @@ -50,7 +53,7 @@ func NewImageRegistry(config types.ImageServiceConfig) (*ImageRegistry, error) { return &ImageRegistry{ store: store, - config: config, + config: config.ImageService, ImageFileExtension: imageFileExtension, }, nil } @@ -78,28 +81,29 @@ type ObjectStore interface { Size(ctx context.Context, key string) (int64, error) } -func NewS3Store(config types.S3ImageRegistryConfig) (*S3Store, error) { - cfg, err := GetAWSConfig(config.AccessKey, config.SecretKey, config.Region, config.Endpoint) +type S3Store struct { + client *s3.Client + config types.S3ImageRegistryConfig +} + +func NewS3Store(config types.AppConfig) (*S3Store, error) { + cfg, err := common.GetAWSConfig(config.ImageService.Registries.S3.AccessKey, config.ImageService.Registries.S3.SecretKey, config.ImageService.Registries.S3.Region, config.ImageService.Registries.S3.Endpoint) if err != nil { return nil, err } return &S3Store{ client: s3.NewFromConfig(cfg, func(o *s3.Options) { - if config.ForcePathStyle { + if config.ImageService.Registries.S3.ForcePathStyle { o.UsePathStyle = true } }), - config: config, + config: config.ImageService.Registries.S3, }, nil } -type S3Store struct { - client *s3.Client - config types.S3ImageRegistryConfig -} - func (s *S3Store) Put(ctx context.Context, localPath string, key string) error { + start := time.Now() f, err := os.Open(localPath) if err != nil { return err @@ -117,10 +121,19 @@ func (s *S3Store) Put(ctx context.Context, localPath string, key string) error { return err } + info, err := os.Stat(localPath) + if err != nil { + log.Error().Str("key", key).Err(err).Msg("error getting file size") + return err + } + sizeMB := float64(info.Size()) / 1024 / 1024 + + metrics.RecordS3PutSpeed(sizeMB, time.Since(start)) return nil } func (s *S3Store) Get(ctx context.Context, key string, localPath string) error { + start := time.Now() tmpLocalPath := fmt.Sprintf("%s.%s", localPath, uuid.New().String()[:6]) f, err := os.Create(tmpLocalPath) @@ -151,6 +164,14 @@ func (s *S3Store) Get(ctx context.Context, key string, localPath string) error { return err } + info, err := os.Stat(localPath) + if err != nil { + log.Error().Str("key", key).Err(err).Msg("error getting file size") + return err + } + sizeMB := float64(info.Size()) / 1024 / 1024 + metrics.RecordS3GetSpeed(sizeMB, time.Since(start)) + return nil } diff --git a/pkg/repository/base.go b/pkg/repository/base.go index 3ac2d84f6..b7fac28a6 100755 --- a/pkg/repository/base.go +++ b/pkg/repository/base.go @@ -208,7 +208,7 @@ type EventRepository interface { PushWorkerPoolHealthyEvent(poolName string, poolState *types.WorkerPoolState) } -type MetricsRepository interface { +type UsageMetricsRepository interface { Init(source string) error IncrementCounter(name string, metadata map[string]interface{}, value float64) error SetGauge(name string, metadata map[string]interface{}, value float64) error diff --git a/pkg/repository/metrics/metrics.go b/pkg/repository/usage/usage.go similarity index 71% rename from pkg/repository/metrics/metrics.go rename to pkg/repository/usage/usage.go index dca675d1c..3076b41fb 100644 --- a/pkg/repository/metrics/metrics.go +++ b/pkg/repository/usage/usage.go @@ -12,14 +12,14 @@ var ( MetricsSourceWorker MetricsSource = "worker" ) -func NewMetrics(config types.MonitoringConfig, source string) (repository.MetricsRepository, error) { - var metricsRepo repository.MetricsRepository +func NewUsage(config types.MonitoringConfig, source string) (repository.UsageMetricsRepository, error) { + var metricsRepo repository.UsageMetricsRepository switch config.MetricsCollector { case string(types.MetricsCollectorPrometheus): metricsRepo = NewPrometheusMetricsRepository(config.Prometheus) case string(types.MetricsCollectorOpenMeter): - metricsRepo = NewOpenMeterMetricsRepository(config.OpenMeter) + metricsRepo = NewOpenMeterUsageRepository(config.OpenMeter) } err := metricsRepo.Init(source) diff --git a/pkg/repository/metrics/metrics_openmeter.go b/pkg/repository/usage/usage_openmeter.go similarity index 71% rename from pkg/repository/metrics/metrics_openmeter.go rename to pkg/repository/usage/usage_openmeter.go index 97e32846c..f7ae80f40 100644 --- a/pkg/repository/metrics/metrics_openmeter.go +++ b/pkg/repository/usage/usage_openmeter.go @@ -13,20 +13,20 @@ import ( openmeter "github.com/openmeterio/openmeter/api/client/go" ) -type OpenMeterMetricsRepository struct { +type OpenMeterUsageRepository struct { client *openmeter.ClientWithResponses config types.OpenMeterConfig source string } -func NewOpenMeterMetricsRepository(omConfig types.OpenMeterConfig) repository.MetricsRepository { - return &OpenMeterMetricsRepository{ +func NewOpenMeterUsageRepository(omConfig types.OpenMeterConfig) repository.UsageMetricsRepository { + return &OpenMeterUsageRepository{ config: omConfig, source: "", } } -func (o *OpenMeterMetricsRepository) Init(source string) error { +func (o *OpenMeterUsageRepository) Init(source string) error { om, err := openmeter.NewAuthClientWithResponses(o.config.ServerUrl, o.config.ApiKey) if err != nil { return err @@ -37,15 +37,15 @@ func (o *OpenMeterMetricsRepository) Init(source string) error { return nil } -func (o *OpenMeterMetricsRepository) SetGauge(name string, data map[string]interface{}, value float64) error { +func (o *OpenMeterUsageRepository) SetGauge(name string, data map[string]interface{}, value float64) error { return o.sendEvent(name, data) } -func (o *OpenMeterMetricsRepository) IncrementCounter(name string, data map[string]interface{}, value float64) error { +func (o *OpenMeterUsageRepository) IncrementCounter(name string, data map[string]interface{}, value float64) error { return o.sendEvent(name, data) } -func (o *OpenMeterMetricsRepository) sendEvent(name string, data map[string]interface{}) error { +func (o *OpenMeterUsageRepository) sendEvent(name string, data map[string]interface{}) error { // NOTE: in openmeter, meters are really just counters with different aggregation functions so you don't need // separate functions defined here (i.e. gauge, counter). // Events are based directly on the data payload and "value" is unused. diff --git a/pkg/repository/metrics/metrics_prometheus.go b/pkg/repository/usage/usage_prometheus.go similarity index 99% rename from pkg/repository/metrics/metrics_prometheus.go rename to pkg/repository/usage/usage_prometheus.go index 1a0b98d72..93fe00404 100644 --- a/pkg/repository/metrics/metrics_prometheus.go +++ b/pkg/repository/usage/usage_prometheus.go @@ -36,7 +36,7 @@ type PrometheusMetricsRepository struct { histogramVecs *common.SafeMap[*prometheus.HistogramVec] } -func NewPrometheusMetricsRepository(promConfig types.PrometheusConfig) repository.MetricsRepository { +func NewPrometheusMetricsRepository(promConfig types.PrometheusConfig) repository.UsageMetricsRepository { collectorRegistrar := prometheus.NewRegistry() collectorRegistrar.MustRegister( collectors.NewGoCollector(), // Metrics from Go runtime. diff --git a/pkg/scheduler/metrics.go b/pkg/scheduler/metrics.go deleted file mode 100644 index c5f4559dc..000000000 --- a/pkg/scheduler/metrics.go +++ /dev/null @@ -1,42 +0,0 @@ -package scheduler - -import ( - "github.com/beam-cloud/beta9/pkg/repository" - "github.com/beam-cloud/beta9/pkg/types" -) - -type SchedulerMetrics struct { - metricsRepo repository.MetricsRepository -} - -func NewSchedulerMetrics(metricsRepo repository.MetricsRepository) SchedulerMetrics { - return SchedulerMetrics{ - metricsRepo: metricsRepo, - } -} - -func (sm *SchedulerMetrics) CounterIncContainerScheduled(request *types.ContainerRequest) { - if sm.metricsRepo == nil { - return - } - - sm.metricsRepo.IncrementCounter(types.MetricsSchedulerContainerScheduled, map[string]interface{}{ - "value": 1, - "workspace_id": request.WorkspaceId, - "stub_id": request.StubId, - "gpu": request.Gpu, - }, 1.0) -} - -func (sm *SchedulerMetrics) CounterIncContainerRequested(request *types.ContainerRequest) { - if sm.metricsRepo == nil { - return - } - - sm.metricsRepo.IncrementCounter(types.MetricsSchedulerContainerRequested, map[string]interface{}{ - "value": 1, - "workspace_id": request.WorkspaceId, - "stub_id": request.StubId, - "gpu": request.Gpu, - }, 1.0) -} diff --git a/pkg/scheduler/pool_health.go b/pkg/scheduler/pool_health.go index 863869099..3b850559a 100644 --- a/pkg/scheduler/pool_health.go +++ b/pkg/scheduler/pool_health.go @@ -5,6 +5,7 @@ import ( "errors" "time" + "github.com/beam-cloud/beta9/pkg/metrics" "github.com/beam-cloud/beta9/pkg/repository" "github.com/beam-cloud/beta9/pkg/types" "github.com/rs/zerolog/log" @@ -150,6 +151,7 @@ func (p *PoolHealthMonitor) getPoolState() (*types.WorkerPoolState, error) { } latency := time.Unix(container.StartedAt, 0).Sub(time.Unix(container.ScheduledAt, 0)) + metrics.RecordContainerStartLatency(latency) schedulingLatencies = append(schedulingLatencies, latency) } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index df7691707..f718b4308 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -10,6 +10,7 @@ import ( "time" "github.com/beam-cloud/beta9/pkg/common" + "github.com/beam-cloud/beta9/pkg/metrics" "github.com/beam-cloud/beta9/pkg/network" repo "github.com/beam-cloud/beta9/pkg/repository" "github.com/beam-cloud/beta9/pkg/types" @@ -29,11 +30,11 @@ type Scheduler struct { containerRepo repo.ContainerRepository workspaceRepo repo.WorkspaceRepository eventRepo repo.EventRepository - schedulerMetrics SchedulerMetrics + schedulerUsage SchedulerUsage eventBus *common.EventBus } -func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, metricsRepo repo.MetricsRepository, backendRepo repo.BackendRepository, workspaceRepo repo.WorkspaceRepository, tailscale *network.Tailscale) (*Scheduler, error) { +func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, usageRepo repo.UsageMetricsRepository, backendRepo repo.BackendRepository, workspaceRepo repo.WorkspaceRepository, tailscale *network.Tailscale) (*Scheduler, error) { eventBus := common.NewEventBus(redisClient) workerRepo := repo.NewWorkerRedisRepository(redisClient, config.Worker) providerRepo := repo.NewProviderRedisRepository(redisClient) @@ -41,7 +42,7 @@ func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *comm containerRepo := repo.NewContainerRedisRepository(redisClient) workerPoolRepo := repo.NewWorkerPoolRedisRepository(redisClient) - schedulerMetrics := NewSchedulerMetrics(metricsRepo) + schedulerUsage := NewSchedulerUsage(usageRepo) eventRepo := repo.NewTCPEventClientRepo(config.Monitoring.FluentBit.Events) // Load worker pools @@ -99,7 +100,7 @@ func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *comm workerPoolManager: workerPoolManager, requestBacklog: requestBacklog, containerRepo: containerRepo, - schedulerMetrics: schedulerMetrics, + schedulerUsage: schedulerUsage, eventRepo: eventRepo, workspaceRepo: workspaceRepo, }, nil @@ -120,7 +121,7 @@ func (s *Scheduler) Run(request *types.ContainerRequest) error { } } - go s.schedulerMetrics.CounterIncContainerRequested(request) + go s.schedulerUsage.CounterIncContainerRequested(request) go s.eventRepo.PushContainerRequestedEvent(request) quota, err := s.getConcurrencyLimit(request) @@ -298,7 +299,12 @@ func (s *Scheduler) StartProcessingRequests() { err = s.scheduleRequest(worker, request) if err != nil { s.addRequestToBacklog(request) + continue } + + // Record the request processing duration + schedulingDuration := time.Since(request.Timestamp) + metrics.RecordRequestSchedulingDuration(schedulingDuration, request) } } @@ -309,7 +315,7 @@ func (s *Scheduler) scheduleRequest(worker *types.Worker, request *types.Contain request.Gpu = worker.Gpu - go s.schedulerMetrics.CounterIncContainerScheduled(request) + go s.schedulerUsage.CounterIncContainerScheduled(request) go s.eventRepo.PushContainerScheduledEvent(request.ContainerId, worker.Id, request) return s.workerRepo.ScheduleContainerRequest(worker, request) } @@ -465,6 +471,8 @@ func (s *Scheduler) addRequestToBacklog(request *types.ContainerRequest) error { return s.requestBacklog.Push(request) } + go metrics.RecordRequestRetry(request) + // TODO: add some sort of signaling mechanism to alert the caller if the request failed to be pushed to the requestBacklog go func() { if request.RetryCount < maxScheduleRetryCount && time.Since(request.Timestamp) < maxScheduleRetryDuration { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 0c192e137..e21e55c68 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -45,8 +45,8 @@ func NewSchedulerForTest() (*Scheduler, error) { config := configManager.GetConfig() eventRepo := repo.NewTCPEventClientRepo(config.Monitoring.FluentBit.Events) - schedulerMetrics := SchedulerMetrics{ - metricsRepo: nil, + schedulerUsage := SchedulerUsage{ + UsageRepo: nil, } workerPoolManager := NewWorkerPoolManager(false) @@ -66,7 +66,7 @@ func NewSchedulerForTest() (*Scheduler, error) { workerPoolManager: workerPoolManager, requestBacklog: requestBacklog, containerRepo: containerRepo, - schedulerMetrics: schedulerMetrics, + schedulerUsage: schedulerUsage, eventRepo: eventRepo, workspaceRepo: workspaceRepo, }, nil diff --git a/pkg/scheduler/usage.go b/pkg/scheduler/usage.go new file mode 100644 index 000000000..2692e2a8c --- /dev/null +++ b/pkg/scheduler/usage.go @@ -0,0 +1,42 @@ +package scheduler + +import ( + "github.com/beam-cloud/beta9/pkg/repository" + "github.com/beam-cloud/beta9/pkg/types" +) + +type SchedulerUsage struct { + UsageRepo repository.UsageMetricsRepository +} + +func NewSchedulerUsage(usageRepo repository.UsageMetricsRepository) SchedulerUsage { + return SchedulerUsage{ + UsageRepo: usageRepo, + } +} + +func (sm *SchedulerUsage) CounterIncContainerScheduled(request *types.ContainerRequest) { + if sm.UsageRepo == nil { + return + } + + sm.UsageRepo.IncrementCounter(types.MetricsSchedulerContainerScheduled, map[string]interface{}{ + "value": 1, + "workspace_id": request.WorkspaceId, + "stub_id": request.StubId, + "gpu": request.Gpu, + }, 1.0) +} + +func (sm *SchedulerUsage) CounterIncContainerRequested(request *types.ContainerRequest) { + if sm.UsageRepo == nil { + return + } + + sm.UsageRepo.IncrementCounter(types.MetricsSchedulerContainerRequested, map[string]interface{}{ + "value": 1, + "workspace_id": request.WorkspaceId, + "stub_id": request.StubId, + "gpu": request.Gpu, + }, 1.0) +} diff --git a/pkg/types/config.go b/pkg/types/config.go index 79d80c4c4..c353427ea 100644 --- a/pkg/types/config.go +++ b/pkg/types/config.go @@ -412,13 +412,21 @@ var ( ) type MonitoringConfig struct { - MetricsCollector string `key:"metricsCollector" json:"metrics_collector"` - Prometheus PrometheusConfig `key:"prometheus" json:"prometheus"` - OpenMeter OpenMeterConfig `key:"openmeter" json:"openmeter"` - FluentBit FluentBitConfig `key:"fluentbit" json:"fluentbit"` - Telemetry TelemetryConfig `key:"telemetry" json:"telemetry"` - ContainerMetricsInterval time.Duration `key:"containerMetricsInterval" json:"container_metrics_interval"` + MetricsCollector string `key:"metricsCollector" json:"metrics_collector"` + Prometheus PrometheusConfig `key:"prometheus" json:"prometheus"` + OpenMeter OpenMeterConfig `key:"openmeter" json:"openmeter"` + FluentBit FluentBitConfig `key:"fluentbit" json:"fluentbit"` + Telemetry TelemetryConfig `key:"telemetry" json:"telemetry"` + ContainerMetricsInterval time.Duration `key:"containerMetricsInterval" json:"container_metrics_interval"` + VictoriaMetrics VictoriaMetricsConfig `key:"victoriametrics" json:"victoriametrics"` } + +type VictoriaMetricsConfig struct { + PushURL string `key:"pushURL" json:"push_url"` + AuthToken string `key:"authToken" json:"auth_token"` + PushSecs int `key:"pushSecs" json:"push_secs"` +} + type PrometheusConfig struct { AgentUrl string `key:"agentUrl" json:"agent_url"` AgentUsername string `key:"agentUsername" json:"agent_username"` diff --git a/pkg/worker/image.go b/pkg/worker/image.go index 60806f444..f1f21a034 100644 --- a/pkg/worker/image.go +++ b/pkg/worker/image.go @@ -13,6 +13,8 @@ import ( "github.com/beam-cloud/beta9/pkg/abstractions/image" common "github.com/beam-cloud/beta9/pkg/common" + "github.com/beam-cloud/beta9/pkg/metrics" + "github.com/beam-cloud/beta9/pkg/registry" types "github.com/beam-cloud/beta9/pkg/types" pb "github.com/beam-cloud/beta9/proto" blobcache "github.com/beam-cloud/blobcache-v2/pkg" @@ -29,8 +31,9 @@ import ( ) const ( - imageBundlePath string = "/dev/shm/images" - imageTmpDir string = "/tmp" + imageBundlePath string = "/dev/shm/images" + imageTmpDir string = "/tmp" + metricsSourceLabel = "image_client" ) var ( @@ -60,8 +63,28 @@ func getImageMountPath(workerId string) string { return path } +type PathInfo struct { + Path string + Size float64 +} + +func NewPathInfo(path string) *PathInfo { + size := float64(0) + info, err := os.Stat(path) + if err == nil { + size = float64(info.Size()) / 1024 / 1024 + } else { + log.Warn().Err(err).Str("path", path).Msg("unable to get bundle path size") + } + + return &PathInfo{ + Path: path, + Size: size, + } +} + type ImageClient struct { - registry *common.ImageRegistry + registry *registry.ImageRegistry cacheClient *blobcache.BlobCacheClient imageCachePath string imageMountPath string @@ -75,7 +98,7 @@ type ImageClient struct { } func NewImageClient(config types.AppConfig, workerId string, workerRepoClient pb.WorkerRepositoryServiceClient, fileCacheManager *FileCacheManager) (*ImageClient, error) { - registry, err := common.NewImageRegistry(config.ImageService) + registry, err := registry.NewImageRegistry(config) if err != nil { return nil, err } @@ -129,6 +152,7 @@ func (c *ImageClient) PullLazy(ctx context.Context, request *types.ContainerRequ if _, err := os.Stat(baseBlobFsContentPath); err == nil && c.cacheClient.IsPathCachedNearby(ctx, "/"+sourcePath) { localCachePath = baseBlobFsContentPath } else { + pullStartTime := time.Now() c.logger.Log(request.ContainerId, request.StubId, "image <%s> not found in cache, caching nearby", imageId) // Otherwise, lets cache it in a nearby blobcache host @@ -138,6 +162,7 @@ func (c *ImageClient) PullLazy(ctx context.Context, request *types.ContainerRequ } else { c.logger.Log(request.ContainerId, request.StubId, "unable to cache image nearby <%s>: %v\n", imageId, err) } + metrics.RecordImagePullTime(time.Since(pullStartTime)) } } @@ -252,6 +277,7 @@ func (c *ExecWriter) Write(p []byte) (n int, err error) { func (c *ImageClient) BuildAndArchiveImage(ctx context.Context, outputLogger *slog.Logger, request *types.ContainerRequest) error { outputLogger.Info("Building image from Dockerfile\n") + startTime := time.Now() buildCtxPath, err := getBuildContext(request) if err != nil { @@ -273,8 +299,8 @@ func (c *ImageClient) BuildAndArchiveImage(ctx context.Context, outputLogger *sl imagePath := filepath.Join(buildPath, "image") ociPath := filepath.Join(buildPath, "oci") - tmpBundlePath := filepath.Join(c.imageBundlePath, request.ImageId) - defer os.RemoveAll(tmpBundlePath) + tmpBundlePath := NewPathInfo(filepath.Join(c.imageBundlePath, request.ImageId)) + defer os.RemoveAll(tmpBundlePath.Path) os.MkdirAll(imagePath, 0755) os.MkdirAll(ociPath, 0755) @@ -293,7 +319,15 @@ func (c *ImageClient) BuildAndArchiveImage(ctx context.Context, outputLogger *sl if err != nil { return err } + ociImageInfo, err := os.Stat(ociPath) + if err == nil { + ociImageMB := float64(ociImageInfo.Size()) / 1024 / 1024 + metrics.RecordImageBuildSpeed(ociImageMB, time.Since(startTime)) + } else { + log.Warn().Err(err).Str("path", ociPath).Msg("unable to inspect image size") + } + startTime = time.Now() engine, err := dir.Open(ociPath) if err != nil { return err @@ -305,13 +339,13 @@ func (c *ImageClient) BuildAndArchiveImage(ctx context.Context, outputLogger *sl engineExt := casext.NewEngine(engine) defer engineExt.Close() - err = umoci.Unpack(engineExt, "latest", tmpBundlePath, unpackOptions) + err = umoci.Unpack(engineExt, "latest", tmpBundlePath.Path, unpackOptions) if err != nil { return err } for _, dir := range requiredContainerDirectories { - fullPath := filepath.Join(tmpBundlePath, "rootfs", dir) + fullPath := filepath.Join(tmpBundlePath.Path, "rootfs", dir) err := os.MkdirAll(fullPath, 0755) if err != nil { return err @@ -345,30 +379,43 @@ func (c *ImageClient) PullAndArchiveImage(ctx context.Context, outputLogger *slo dest := fmt.Sprintf("oci:%s:%s", baseImage.Repo, baseImage.Tag) - outputLogger.Info("Copying image...\n") + imageBytes, err := c.skopeoClient.InspectSizeInBytes(ctx, *request.BuildOptions.SourceImage, request.BuildOptions.SourceImageCreds) + if err != nil { + log.Warn().Err(err).Msg("unable to inspect image size") + } + imageSizeMB := float64(imageBytes) / 1024 / 1024 + + outputLogger.Info(fmt.Sprintf("Copying image (size: %.2f MB)...\n", imageSizeMB)) + startTime := time.Now() err = c.skopeoClient.Copy(ctx, *request.BuildOptions.SourceImage, dest, request.BuildOptions.SourceImageCreds) if err != nil { return err } + metrics.RecordImageCopySpeed(imageSizeMB, time.Since(startTime)) outputLogger.Info("Unpacking image...\n") - tmpBundlePath := filepath.Join(baseTmpBundlePath, request.ImageId) + tmpBundlePath := NewPathInfo(filepath.Join(baseTmpBundlePath, request.ImageId)) err = c.unpack(ctx, baseImage.Repo, baseImage.Tag, tmpBundlePath) if err != nil { return fmt.Errorf("unable to unpack image: %v", err) } - defer os.RemoveAll(baseTmpBundlePath) defer os.RemoveAll(copyDir) outputLogger.Info("Archiving base image...\n") - return c.Archive(ctx, tmpBundlePath, request.ImageId, nil) + err = c.Archive(ctx, tmpBundlePath, request.ImageId, nil) + if err != nil { + return err + } + + return nil } -func (c *ImageClient) unpack(ctx context.Context, baseImageName string, baseImageTag string, bundlePath string) error { +func (c *ImageClient) unpack(ctx context.Context, baseImageName string, baseImageTag string, bundlePath *PathInfo) error { if ctx.Err() != nil { return ctx.Err() } + startTime := time.Now() unpackOptions := umociUnpackOptions() @@ -383,7 +430,7 @@ func (c *ImageClient) unpack(ctx context.Context, baseImageName string, baseImag engineExt := casext.NewEngine(engine) defer engineExt.Close() - tmpBundlePath := filepath.Join(bundlePath + "_") + tmpBundlePath := filepath.Join(bundlePath.Path + "_") err = umoci.Unpack(engineExt, baseImageTag, tmpBundlePath, unpackOptions) if err == nil { for _, dir := range requiredContainerDirectories { @@ -395,14 +442,15 @@ func (c *ImageClient) unpack(ctx context.Context, baseImageName string, baseImag } } - return os.Rename(tmpBundlePath, bundlePath) + return os.Rename(tmpBundlePath, bundlePath.Path) } + metrics.RecordImageUnpackSpeed(bundlePath.Size, time.Since(startTime)) return err } // Generate and upload archived version of the image for distribution -func (c *ImageClient) Archive(ctx context.Context, bundlePath string, imageId string, progressChan chan int) error { +func (c *ImageClient) Archive(ctx context.Context, bundlePath *PathInfo, imageId string, progressChan chan int) error { if ctx.Err() != nil { return ctx.Err() } @@ -418,9 +466,9 @@ func (c *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st var err error = nil switch c.config.ImageService.RegistryStore { - case common.S3ImageRegistryStore: + case registry.S3ImageRegistryStore: err = clip.CreateAndUploadArchive(ctx, clip.CreateOptions{ - InputPath: bundlePath, + InputPath: bundlePath.Path, OutputPath: archivePath, Credentials: storage.ClipStorageCredentials{ S3: &storage.S3ClipStorageCredentials{ @@ -436,9 +484,9 @@ func (c *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st Key: fmt.Sprintf("%s.clip", imageId), ForcePathStyle: c.config.ImageService.Registries.S3.ForcePathStyle, }) - case common.LocalImageRegistryStore: + case registry.LocalImageRegistryStore: err = clip.CreateArchive(clip.CreateOptions{ - InputPath: bundlePath, + InputPath: bundlePath.Path, OutputPath: archivePath, }) } @@ -448,6 +496,7 @@ func (c *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st return err } log.Info().Str("container_id", imageId).Dur("duration", time.Since(startTime)).Msg("container archive took") + metrics.RecordImageArchiveSpeed(bundlePath.Size, time.Since(startTime)) // Push the archive to a registry startTime = time.Now() @@ -458,6 +507,7 @@ func (c *ImageClient) Archive(ctx context.Context, bundlePath string, imageId st } log.Info().Str("image_id", imageId).Dur("duration", time.Since(startTime)).Msg("image push took") + metrics.RecordImagePushSpeed(bundlePath.Size, time.Since(startTime)) return nil } diff --git a/pkg/worker/lifecycle.go b/pkg/worker/lifecycle.go index 1c6cef181..0b24707b2 100644 --- a/pkg/worker/lifecycle.go +++ b/pkg/worker/lifecycle.go @@ -644,7 +644,7 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output } // Log metrics - go s.workerMetrics.EmitContainerUsage(ctx, request) + go s.workerUsageMetrics.EmitContainerUsage(ctx, request) go s.eventRepo.PushContainerStartedEvent(containerId, s.workerId, request) defer func() { go s.eventRepo.PushContainerStoppedEvent(containerId, s.workerId, request) }() diff --git a/pkg/worker/metrics.go b/pkg/worker/metrics.go deleted file mode 100644 index f82e72cc3..000000000 --- a/pkg/worker/metrics.go +++ /dev/null @@ -1,67 +0,0 @@ -package worker - -import ( - "context" - "time" - - repo "github.com/beam-cloud/beta9/pkg/repository" - metrics "github.com/beam-cloud/beta9/pkg/repository/metrics" - - types "github.com/beam-cloud/beta9/pkg/types" -) - -type WorkerMetrics struct { - workerId string - metricsRepo repo.MetricsRepository - ctx context.Context -} - -func NewWorkerMetrics( - ctx context.Context, - workerId string, - config types.MonitoringConfig, -) (*WorkerMetrics, error) { - metricsRepo, err := metrics.NewMetrics(config, string(metrics.MetricsSourceWorker)) - if err != nil { - return nil, err - } - - return &WorkerMetrics{ - ctx: ctx, - workerId: workerId, - metricsRepo: metricsRepo, - }, nil -} - -func (wm *WorkerMetrics) metricsContainerDuration(request *types.ContainerRequest, duration time.Duration) { - wm.metricsRepo.IncrementCounter(types.MetricsWorkerContainerDuration, map[string]interface{}{ - "container_id": request.ContainerId, - "worker_id": wm.workerId, - "stub_id": request.StubId, - "workspace_id": request.WorkspaceId, - "cpu_millicores": request.Cpu, - "mem_mb": request.Memory, - "gpu": request.Gpu, - "gpu_count": request.GpuCount, - "duration_ms": duration.Milliseconds(), - }, float64(duration.Milliseconds())) -} - -// Periodically send metrics to track container duration -func (wm *WorkerMetrics) EmitContainerUsage(ctx context.Context, request *types.ContainerRequest) { - cursorTime := time.Now() - ticker := time.NewTicker(types.ContainerDurationEmissionInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - go wm.metricsContainerDuration(request, time.Since(cursorTime)) - cursorTime = time.Now() - case <-ctx.Done(): - // Consolidate any remaining time - go wm.metricsContainerDuration(request, time.Since(cursorTime)) - return - } - } -} diff --git a/pkg/worker/runc_server.go b/pkg/worker/runc_server.go index 1fa1be7e0..ca3f567a3 100644 --- a/pkg/worker/runc_server.go +++ b/pkg/worker/runc_server.go @@ -270,8 +270,9 @@ func (s *RunCServer) RunCArchive(req *pb.RunCArchiveRequest, stream pb.RunCServi wg.Wait() }() + topLayerPath := NewPathInfo(instance.Overlay.TopLayerPath()) err = stream.Send(&pb.RunCArchiveResponse{ - Done: true, Success: s.imageClient.Archive(ctx, instance.Overlay.TopLayerPath(), req.ImageId, progressChan) == nil, + Done: true, Success: s.imageClient.Archive(ctx, topLayerPath, req.ImageId, progressChan) == nil, }) close(doneChan) diff --git a/pkg/worker/usage.go b/pkg/worker/usage.go new file mode 100644 index 000000000..a13100f39 --- /dev/null +++ b/pkg/worker/usage.go @@ -0,0 +1,67 @@ +package worker + +import ( + "context" + "time" + + repo "github.com/beam-cloud/beta9/pkg/repository" + usage "github.com/beam-cloud/beta9/pkg/repository/usage" + + types "github.com/beam-cloud/beta9/pkg/types" +) + +type WorkerUsageMetrics struct { + workerId string + usageRepo repo.UsageMetricsRepository + ctx context.Context +} + +func NewWorkerUsageMetrics( + ctx context.Context, + workerId string, + config types.MonitoringConfig, +) (*WorkerUsageMetrics, error) { + metricsRepo, err := usage.NewUsage(config, string(usage.MetricsSourceWorker)) + if err != nil { + return nil, err + } + + return &WorkerUsageMetrics{ + ctx: ctx, + workerId: workerId, + usageRepo: metricsRepo, + }, nil +} + +func (wm *WorkerUsageMetrics) usageContainerDuration(request *types.ContainerRequest, duration time.Duration) { + wm.usageRepo.IncrementCounter(types.MetricsWorkerContainerDuration, map[string]interface{}{ + "container_id": request.ContainerId, + "worker_id": wm.workerId, + "stub_id": request.StubId, + "workspace_id": request.WorkspaceId, + "cpu_millicores": request.Cpu, + "mem_mb": request.Memory, + "gpu": request.Gpu, + "gpu_count": request.GpuCount, + "duration_ms": duration.Milliseconds(), + }, float64(duration.Milliseconds())) +} + +// Periodically send usage to track container duration +func (wm *WorkerUsageMetrics) EmitContainerUsage(ctx context.Context, request *types.ContainerRequest) { + cursorTime := time.Now() + ticker := time.NewTicker(types.ContainerDurationEmissionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + go wm.usageContainerDuration(request, time.Since(cursorTime)) + cursorTime = time.Now() + case <-ctx.Done(): + // Consolidate any remaining time + go wm.usageContainerDuration(request, time.Since(cursorTime)) + return + } + } +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 90282ce74..34742b343 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -56,7 +56,7 @@ type Worker struct { containerLock sync.Mutex containerWg sync.WaitGroup containerLogger *ContainerLogger - workerMetrics *WorkerMetrics + workerUsageMetrics *WorkerUsageMetrics completedRequests chan *types.ContainerRequest stopContainerChan chan stopContainerEvent workerRepoClient pb.WorkerRepositoryServiceClient @@ -216,7 +216,7 @@ func NewWorker() (*Worker, error) { return nil, err } - workerMetrics, err := NewWorkerMetrics(ctx, workerId, config.Monitoring) + workerMetrics, err := NewWorkerUsageMetrics(ctx, workerId, config.Monitoring) if err != nil { cancel() return nil, err @@ -252,7 +252,7 @@ func NewWorker() (*Worker, error) { containerInstances: containerInstances, logLinesPerHour: config.Worker.ContainerLogLinesPerHour, }, - workerMetrics: workerMetrics, + workerUsageMetrics: workerMetrics, containerRepoClient: containerRepoClient, workerRepoClient: workerRepoClient, eventRepo: eventRepo,