From 04522e338a5d80acd100ee2c4e5fcc8bf0ce8327 Mon Sep 17 00:00:00 2001 From: Luke Lombardi Date: Tue, 25 Feb 2025 18:00:34 -0500 Subject: [PATCH] wip --- pkg/storage/geese.go | 114 +++++++++++++++++++++++++++++++++++++++++++ pkg/types/config.go | 18 +++++-- 2 files changed, 128 insertions(+), 4 deletions(-) diff --git a/pkg/storage/geese.go b/pkg/storage/geese.go index 75c7da7f1..803157357 100644 --- a/pkg/storage/geese.go +++ b/pkg/storage/geese.go @@ -1,7 +1,13 @@ package storage import ( + "fmt" + "os/exec" + "time" + "github.com/beam-cloud/beta9/pkg/types" + "github.com/cenkalti/backoff" + "github.com/rs/zerolog/log" ) type GeeseStorage struct { @@ -15,6 +21,90 @@ func NewGeeseStorage(config types.GeeseConfig) (Storage, error) { } func (s *GeeseStorage) Mount(localPath string) error { + log.Info().Str("local_path", localPath).Msg("geese filesystem mounting") + + args := []string{} + if s.config.Debug { + args = append(args, "--debug") + } + if s.config.Force { + args = append(args, "-f") + } + if s.config.FsyncOnClose { + args = append(args, "--fsync-on-close") + } + + if s.config.MemoryLimit > 0 { + args = append(args, fmt.Sprintf("--memory-limit=%d", s.config.MemoryLimit)) + } + if s.config.MaxFlushers > 0 { + args = append(args, fmt.Sprintf("--max-flushers=%d", s.config.MaxFlushers)) + } + if s.config.MaxParallelParts > 0 { + args = append(args, fmt.Sprintf("--max-parallel-parts=%d", s.config.MaxParallelParts)) + } + if s.config.PartSizes > 0 { + args = append(args, fmt.Sprintf("--part-sizes=%d", s.config.PartSizes)) + } + if s.config.DirMode != "" { + args = append(args, fmt.Sprintf("--dir-mode=%s", s.config.DirMode)) + } + if s.config.FileMode != "" { + args = append(args, fmt.Sprintf("--file-mode=%s", s.config.FileMode)) + } + if s.config.ListType > 0 { + args = append(args, fmt.Sprintf("--list-type=%d", s.config.ListType)) + } + if s.config.EndpointUrl != "" { + args = append(args, fmt.Sprintf("--endpoint=%s", s.config.EndpointUrl)) + } + + // Add bucket and mount point + args = append(args, s.config.BucketName, localPath) + + cmd := exec.Command("geesefs", args...) + + // Set bucket credentials as env vars + if s.config.AccessKey != "" || s.config.SecretKey != "" { + cmd.Env = append(cmd.Env, + fmt.Sprintf("AWS_ACCESS_KEY_ID=%s", s.config.AccessKey), + fmt.Sprintf("AWS_SECRET_ACCESS_KEY=%s", s.config.SecretKey), + ) + } + + // Start the mount command in the background + go func() { + output, err := cmd.CombinedOutput() + if err != nil { + log.Error().Err(err).Str("output", string(output)).Msg("error executing geesefs mount") + } + }() + + ticker := time.NewTicker(100 * time.Millisecond) + timeout := time.After(30 * time.Second) + + done := make(chan bool) + go func() { + for { + select { + case <-timeout: + done <- false + return + case <-ticker.C: + if isMounted(localPath) { + done <- true + return + } + } + } + }() + + // Wait for confirmation or timeout + if !<-done { + return fmt.Errorf("failed to mount GeeseFS filesystem to: '%s'", localPath) + } + + log.Info().Str("local_path", localPath).Msg("geesefs filesystem mounted") return nil } @@ -23,5 +113,29 @@ func (s *GeeseStorage) Format(fsName string) error { } func (s *GeeseStorage) Unmount(localPath string) error { + geeseFsUmount := func() error { + cmd := exec.Command("geesefs", "umount", localPath) + + output, err := cmd.CombinedOutput() + if err != nil { + log.Error().Err(err).Str("output", string(output)).Msg("error executing geesefs umount") + return err + } + + log.Info().Str("local_path", localPath).Msg("geesefs filesystem unmounted") + return nil + } + + err := backoff.Retry(geeseFsUmount, backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 10)) + if err == nil { + return nil + } + + // Forcefully kill the fuse mount devices + err = exec.Command("fuser", "-k", "/dev/fuse").Run() + if err != nil { + return fmt.Errorf("error executing fuser -k /dev/fuse: %v", err) + } + return nil } diff --git a/pkg/types/config.go b/pkg/types/config.go index 5de31debc..cee237c8b 100644 --- a/pkg/types/config.go +++ b/pkg/types/config.go @@ -203,10 +203,20 @@ type JuiceFSConfig struct { } type GeeseConfig struct { - S3AccessKey string `key:"s3AccessKey" json:"s3_access_key"` - S3SecretKey string `key:"s3SecretKey" json:"s3_secret_key"` - S3EndpointUrl string `key:"s3EndpointURL" json:"s3_endpoint_url"` - S3BucketName string `key:"s3BucketName" json:"s3_bucket_name"` + Debug bool `key:"debug" json:"debug"` // --debug + Force bool `key:"force" json:"force"` // -f (force) + FsyncOnClose bool `key:"fsyncOnClose" json:"fsync_on_close"` // --fsync-on-close + MemoryLimit int64 `key:"memoryLimit" json:"memory_limit"` // --memory-limit + MaxFlushers int `key:"maxFlushers" json:"max_flushers"` // --max-flushers + MaxParallelParts int `key:"maxParallelParts" json:"max_parallel_parts"` // --max-parallel-parts + PartSizes int64 `key:"partSizes" json:"part_sizes"` // --part-sizes + DirMode string `key:"dirMode" json:"dir_mode"` // --dir-mode, e.g., "0777" + FileMode string `key:"fileMode" json:"file_mode"` // --file-mode, e.g., "0666" + ListType int `key:"listType" json:"list_type"` // --list-type + AccessKey string `key:"accessKey" json:"access_key"` + SecretKey string `key:"secretKey" json:"secret_key"` + EndpointUrl string `key:"endpointURL" json:"endpoint_url"` // --endpoint + BucketName string `key:"bucketName" json:"bucket_name"` } // @go2proto