Add instrumentation #1001

Open · wants to merge 14 commits into base: main
3 changes: 3 additions & 0 deletions go.mod
@@ -7,6 +7,7 @@ require (
buf.build/gen/go/cedana/task/protocolbuffers/go v1.35.2-20241203191352-12c25eb032cd.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Masterminds/squirrel v1.5.4
github.com/VictoriaMetrics/metrics v1.35.2
github.com/alicebob/miniredis/v2 v2.30.5
github.com/aws/aws-sdk-go-v2 v1.30.1
github.com/aws/aws-sdk-go-v2/config v1.27.24
@@ -255,7 +256,9 @@ require (
github.com/ulikunitz/xz v0.5.12 // indirect
github.com/urfave/cli v1.22.14 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/vbatts/go-mtree v0.5.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xeonx/timeago v1.0.0-rc5 // indirect
6 changes: 6 additions & 0 deletions go.sum
@@ -23,6 +23,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/VictoriaMetrics/metrics v1.35.2 h1:Bj6L6ExfnakZKYPpi7mGUnkJP4NGQz2v5wiChhXNyWQ=
github.com/VictoriaMetrics/metrics v1.35.2/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/akutz/memconn v0.1.0 h1:NawI0TORU4hcOMsMr11g7vwlCdkYeLKXBcxWu2W/P8A=
github.com/akutz/memconn v0.1.0/go.mod h1:Jo8rI7m0NieZyLI5e2CDlRdRqRRB4S7Xp77ukDjH+Fw=
github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI=
@@ -649,8 +651,12 @@ github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
github.com/vbatts/go-mtree v0.5.0 h1:dM+5XZdqH0j9CSZeerhoN/tAySdwnmevaZHO1XGW2Vc=
github.com/vbatts/go-mtree v0.5.0/go.mod h1:7JbaNHyBMng+RP8C3Q4E+4Ca8JnGQA2R/MB+jb4tSOk=
github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs=
30 changes: 26 additions & 4 deletions manifests/kustomize/overlays/cluster-dev/kustomization.yaml
@@ -38,28 +38,35 @@ patches:
path: /spec/template/spec/initContainers/0/resources
- op: remove
path: /spec/template/spec/containers/0/resources
# Disable unused monitoring components
- target:
kind: Deployment
name: victoria-metrics-agent
patch: |-
- op: replace
path: /spec/replicas
value: 0
value: 1
- target:
kind: StatefulSet
name: victoria-metrics-single
patch: |-
- op: replace
path: /spec/replicas
value: 0
value: 1
- target:
kind: StatefulSet
name: victoria-logs-single
patch: |-
- op: replace
path: /spec/replicas
value: 0
value: 1
- target:
kind: Deployment
name: grafana
patch: |-
- op: replace
path: /spec/replicas
value: 1

# Replace RollingUpdate with Recreate for all Deployments in this overlay
- target:
kind: Deployment
@@ -68,3 +75,18 @@ patches:
path: /spec/strategy
value:
type: Recreate

- target:
kind: Service
name: grafana
patch: |-
- op: replace
path: /spec/type
value: LoadBalancer
- op: replace
path: /spec/ports
value:
- name: service
port: 3000
targetPort: 3000
protocol: TCP
4 changes: 4 additions & 0 deletions pkg/abstractions/endpoint/buffer.go
@@ -19,6 +19,7 @@ import (

abstractions "github.com/beam-cloud/beta9/pkg/abstractions/common"
"github.com/beam-cloud/beta9/pkg/common"
"github.com/beam-cloud/beta9/pkg/metrics"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/repository"
"github.com/beam-cloud/beta9/pkg/task"
@@ -353,10 +354,13 @@ func (rb *RequestBuffer) getHttpClient(address string, timeout time.Duration) (*
return rb.httpClient, nil
}

start := time.Now()
conn, err := network.ConnectToHost(rb.ctx, address, timeout, rb.tailscale, rb.tsConfig)
if err != nil {
return nil, err
}
// Use goroutine to avoid potential blocking from rate limiter's internal mutex
go metrics.RecordDialTime(time.Since(start))
Contributor:
A bit confused on this: why would it block? Doesn't the rate limiter just return if it's not "allowed"? I could see it causing other weird issues, though.

Collaborator (Author):
There is a mutex inside the rate limiter to acquire the count. It's likely a small and quick acquisition, but it could still block.
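
For illustration, a minimal self-contained sketch of that concern; the simpleLimiter below is a hypothetical stand-in, not the tailscale tstime/rate.Limiter this PR actually uses. Allow() returns immediately, but every caller briefly serializes on a mutex, which is why the dial path hands the metric recording off to a goroutine.

```go
package main

import (
	"sync"
	"time"
)

// simpleLimiter is a hypothetical stand-in for a rate limiter whose Allow()
// guards its token count with a mutex.
type simpleLimiter struct {
	mu     sync.Mutex
	tokens int
}

// Allow returns immediately, but concurrent callers briefly serialize on mu.
func (l *simpleLimiter) Allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.tokens <= 0 {
		return false
	}
	l.tokens--
	return true
}

// recordDialTime mirrors the shape of metrics.RecordDialTime: only record
// the observation when the limiter allows it.
func recordDialTime(l *simpleLimiter, d time.Duration) {
	if l.Allow() {
		_ = d.Milliseconds() // stand-in for histogram.Update(...)
	}
}

func main() {
	l := &simpleLimiter{tokens: 2}

	start := time.Now()
	// ... dial the host here ...

	// Firing the metric from a goroutine keeps the limiter's lock acquisition
	// off the dial path, which is the point of the discussion above.
	go recordDialTime(l, time.Since(start))

	time.Sleep(10 * time.Millisecond) // let the goroutine finish in this toy example
}
```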


// Create a custom transport that uses the established connection
// Either using tailscale or not
5 changes: 3 additions & 2 deletions pkg/abstractions/image/build.go
@@ -22,6 +22,7 @@ import (
"github.com/beam-cloud/beta9/pkg/auth"
"github.com/beam-cloud/beta9/pkg/common"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/registry"
"github.com/beam-cloud/beta9/pkg/repository"
"github.com/beam-cloud/beta9/pkg/scheduler"
"github.com/beam-cloud/beta9/pkg/types"
@@ -42,7 +43,7 @@ const (
type Builder struct {
config types.AppConfig
scheduler *scheduler.Scheduler
registry *common.ImageRegistry
registry *registry.ImageRegistry
containerRepo repository.ContainerRepository
tailscale *network.Tailscale
eventBus *common.EventBus
@@ -142,7 +143,7 @@ func (o *BuildOpts) addPythonRequirements() {
o.PythonPackages = append(filteredPythonPackages, baseRequirementsSlice...)
}

func NewBuilder(config types.AppConfig, registry *common.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository, rdb *common.RedisClient) (*Builder, error) {
func NewBuilder(config types.AppConfig, registry *registry.ImageRegistry, scheduler *scheduler.Scheduler, tailscale *network.Tailscale, containerRepo repository.ContainerRepository, rdb *common.RedisClient) (*Builder, error) {
return &Builder{
config: config,
scheduler: scheduler,
3 changes: 2 additions & 1 deletion pkg/abstractions/image/image.go
@@ -8,6 +8,7 @@ import (
"github.com/beam-cloud/beta9/pkg/auth"
"github.com/beam-cloud/beta9/pkg/common"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/registry"
"github.com/beam-cloud/beta9/pkg/repository"
"github.com/beam-cloud/beta9/pkg/scheduler"
"github.com/beam-cloud/beta9/pkg/types"
@@ -48,7 +49,7 @@ func NewRuncImageService(
ctx context.Context,
opts ImageServiceOpts,
) (ImageService, error) {
registry, err := common.NewImageRegistry(opts.Config.ImageService)
registry, err := registry.NewImageRegistry(opts.Config)
if err != nil {
return nil, err
}
3 changes: 3 additions & 0 deletions pkg/abstractions/pod/proxy.go
@@ -11,6 +11,7 @@ import (

abstractions "github.com/beam-cloud/beta9/pkg/abstractions/common"
"github.com/beam-cloud/beta9/pkg/common"
"github.com/beam-cloud/beta9/pkg/metrics"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/repository"
"github.com/beam-cloud/beta9/pkg/types"
@@ -178,11 +179,13 @@ func (pb *PodProxyBuffer) handleConnection(conn *connection) {
return
}

start := time.Now()
containerConn, err := network.ConnectToHost(request.Context(), container.addressMap[int32(port)], containerDialTimeoutDurationS, pb.tailscale, pb.tsConfig)
if err != nil {
conn.ctx.String(http.StatusServiceUnavailable, "Failed to connect to service")
return
}
go metrics.RecordDialTime(time.Since(start))
defer containerConn.Close()

abstractions.SetConnOptions(containerConn, true, connectionKeepAliveInterval, connectionReadTimeout)
15 changes: 15 additions & 0 deletions pkg/common/skopeo.go
@@ -22,6 +22,7 @@ const (

type SkopeoClient interface {
Inspect(ctx context.Context, sourceImage string, creds string) (ImageMetadata, error)
InspectSizeInBytes(ctx context.Context, sourceImage string, creds string) (int64, error)
Copy(ctx context.Context, sourceImage string, dest string, creds string) error
}

@@ -88,6 +89,20 @@ func (p *skopeoClient) Inspect(ctx context.Context, sourceImage string, creds st
return imageMetadata, nil
}

func (p *skopeoClient) InspectSizeInBytes(ctx context.Context, sourceImage string, creds string) (int64, error) {
    imageMetadata, err := p.Inspect(ctx, sourceImage, creds)
    if err != nil {
        return 0, err
    }

    size := int64(0)
    for _, layer := range imageMetadata.LayersData {
        size += int64(layer.Size)
    }

    return size, nil
}

func (p *skopeoClient) Copy(ctx context.Context, sourceImage string, dest string, creds string) error {
args := []string{"copy", fmt.Sprintf("docker://%s", sourceImage), dest}

6 changes: 3 additions & 3 deletions pkg/gateway/gateway.go
@@ -40,7 +40,7 @@ import (
repositoryservices "github.com/beam-cloud/beta9/pkg/gateway/services/repository"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/repository"
metrics "github.com/beam-cloud/beta9/pkg/repository/metrics"
usage "github.com/beam-cloud/beta9/pkg/repository/usage"
"github.com/beam-cloud/beta9/pkg/scheduler"
"github.com/beam-cloud/beta9/pkg/storage"
"github.com/beam-cloud/beta9/pkg/task"
@@ -68,7 +68,7 @@ type Gateway struct {
WorkerPoolRepo repository.WorkerPoolRepository
EventRepo repository.EventRepository
Tailscale *network.Tailscale
metricsRepo repository.MetricsRepository
metricsRepo repository.UsageRepository
Contributor:
Should this be called usageRepo here?

workerRepo repository.WorkerRepository
Storage storage.Storage
Scheduler *scheduler.Scheduler
Expand All @@ -90,7 +90,7 @@ func NewGateway() (*Gateway, error) {
return nil, err
}

metricsRepo, err := metrics.NewMetrics(config.Monitoring, string(metrics.MetricsSourceGateway))
metricsRepo, err := usage.NewUsage(config.Monitoring, string(usage.MetricsSourceGateway))
if err != nil {
return nil, err
}
129 changes: 129 additions & 0 deletions pkg/metrics/metrics.go
@@ -0,0 +1,129 @@
package metrics

import (
    "context"
    "fmt"
    "os"
    "time"

    vmetrics "github.com/VictoriaMetrics/metrics"
    "github.com/beam-cloud/beta9/pkg/types"
    "github.com/rs/zerolog/log"
    "tailscale.com/tstime/rate"
)

var (
    dialMetricLimiter *rate.Limiter
)

const (
    metricRequestSchedulingDuration = "scheduler_request_scheduling_duration_ms"
    metricRequestRetries            = "scheduler_request_retries"
    metricImagePullTime             = "worker_image_pull_time_seconds"
    metricImageBuildSpeed           = "worker_image_build_speed_mbps"
    metricImageUnpackSpeed          = "worker_image_unpack_speed_mbps"
    metricImageCopySpeed            = "worker_image_copy_speed_mbps"
    metricImageArchiveSpeed         = "worker_image_archive_speed_mbps"
    metricImagePushSpeed            = "worker_image_push_speed_mbps"
    metricS3PutSpeed                = "s3_put_speed_mbps"
    metricS3GetSpeed                = "s3_get_speed_mbps"
    metricDialTime                  = "dial_time_ms"
    metricContainerStartLatency     = "container_start_latency_ms"
)

func InitializeMetricsRepository(config types.VictoriaMetricsConfig) {
    // ENV to collect for default labels
    workerPoolName := os.Getenv("WORKER_POOL_NAME")
    gpuType := os.Getenv("GPU_TYPE")
    podHostname := os.Getenv("POD_HOSTNAME")

    opts := &vmetrics.PushOptions{
        Headers: []string{
            fmt.Sprintf("Authorization: Bearer %s", config.AuthToken),
        },
    }

    if workerPoolName != "" && podHostname != "" {
        opts.ExtraLabels = fmt.Sprintf(`worker_pool_name="%s",pod_hostname="%s"`, workerPoolName, podHostname)
    }

    if gpuType != "" {
        opts.ExtraLabels = fmt.Sprintf(`%s,gpu_type="%s"`, opts.ExtraLabels, gpuType)
    }

    vmetrics.GetDefaultSet().InitPushWithOptions(
        context.Background(),
        config.PushURL,
        time.Duration(config.PushSecs)*time.Second,
        opts,
    )

    dialMetricLimiter = rate.NewLimiter(2, 1)
}

func RecordRequestSchedulingDuration(duration time.Duration, request *types.ContainerRequest) {
    log.Info().Interface("request", request).Msg("recording request scheduling duration")
    metricName := fmt.Sprintf("%s{gpu=\"%s\",gpu_count=\"%d\",cpu=\"%d\",memory=\"%d\"}",
        metricRequestSchedulingDuration,
        request.Gpu,
        request.GpuCount,
        request.Cpu,
        request.Memory)

    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricName).Update(float64(duration.Milliseconds()))
}

func RecordRequestRetry(request *types.ContainerRequest) {
    metricName := fmt.Sprintf("%s{container_id=\"%s\",gpu=\"%s\",gpu_count=\"%d\",cpu=\"%d\",memory=\"%d\",retry_count=\"%d\"}",
        metricRequestRetries,
        request.ContainerId,
        request.Gpu,
        request.GpuCount,
        request.Cpu,
        request.Memory,
        request.RetryCount)

    vmetrics.GetDefaultSet().GetOrCreateCounter(metricName).Inc()
}

func RecordImagePullTime(duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImagePullTime).Update(duration.Seconds())
}

func RecordImageBuildSpeed(sizeInMB float64, duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageBuildSpeed).Update(sizeInMB / duration.Seconds())
}

func RecordImageUnpackSpeed(sizeInMB float64, duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageUnpackSpeed).Update(sizeInMB / duration.Seconds())
}

func RecordImageCopySpeed(sizeInMB float64, duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageCopySpeed).Update(sizeInMB / duration.Seconds())
}

func RecordImageArchiveSpeed(sizeInMB float64, duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImageArchiveSpeed).Update(sizeInMB / duration.Seconds())
}

func RecordImagePushSpeed(sizeInMB float64, duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricImagePushSpeed).Update(sizeInMB / duration.Seconds())
}

func RecordS3PutSpeed(sizeInMB float64, duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricS3PutSpeed).Update(sizeInMB / duration.Seconds())
}

func RecordS3GetSpeed(sizeInMB float64, duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricS3GetSpeed).Update(sizeInMB / duration.Seconds())
}

func RecordDialTime(duration time.Duration) {
    if dialMetricLimiter.Allow() {
        vmetrics.GetDefaultSet().GetOrCreateHistogram(metricDialTime).Update(float64(duration.Milliseconds()))
    }
}

func RecordContainerStartLatency(duration time.Duration) {
    vmetrics.GetDefaultSet().GetOrCreateHistogram(metricContainerStartLatency).Update(float64(duration.Milliseconds()))
}
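
For reference, a minimal usage sketch of the new package, not part of this diff: the push URL and env var name below are placeholders, but the config fields and function calls match what the file above defines.

```go
package main

import (
	"os"
	"time"

	"github.com/beam-cloud/beta9/pkg/metrics"
	"github.com/beam-cloud/beta9/pkg/types"
)

func main() {
	// Placeholder values: a real deployment reads these from app config, and
	// the push URL and env var name here are illustrative only.
	cfg := types.VictoriaMetricsConfig{
		PushURL:   "http://victoria-metrics-single:8428/api/v1/import/prometheus",
		AuthToken: os.Getenv("VM_AUTH_TOKEN"),
		PushSecs:  15,
	}
	metrics.InitializeMetricsRepository(cfg)

	// On a hot path, measure and record a duration; the default metric set is
	// pushed in the background at the configured interval.
	start := time.Now()
	// ... pull an image ...
	metrics.RecordImagePullTime(time.Since(start))
}
```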