Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Lombardi committed Mar 4, 2025
1 parent c77e494 commit 04522e3
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 4 deletions.
114 changes: 114 additions & 0 deletions pkg/storage/geese.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package storage

import (
"fmt"
"os/exec"
"time"

"github.com/beam-cloud/beta9/pkg/types"
"github.com/cenkalti/backoff"
"github.com/rs/zerolog/log"
)

type GeeseStorage struct {
Expand All @@ -15,6 +21,90 @@ func NewGeeseStorage(config types.GeeseConfig) (Storage, error) {
}

func (s *GeeseStorage) Mount(localPath string) error {
log.Info().Str("local_path", localPath).Msg("geese filesystem mounting")

args := []string{}
if s.config.Debug {
args = append(args, "--debug")
}
if s.config.Force {
args = append(args, "-f")
}
if s.config.FsyncOnClose {
args = append(args, "--fsync-on-close")
}

if s.config.MemoryLimit > 0 {
args = append(args, fmt.Sprintf("--memory-limit=%d", s.config.MemoryLimit))
}
if s.config.MaxFlushers > 0 {
args = append(args, fmt.Sprintf("--max-flushers=%d", s.config.MaxFlushers))
}
if s.config.MaxParallelParts > 0 {
args = append(args, fmt.Sprintf("--max-parallel-parts=%d", s.config.MaxParallelParts))
}
if s.config.PartSizes > 0 {
args = append(args, fmt.Sprintf("--part-sizes=%d", s.config.PartSizes))
}
if s.config.DirMode != "" {
args = append(args, fmt.Sprintf("--dir-mode=%s", s.config.DirMode))
}
if s.config.FileMode != "" {
args = append(args, fmt.Sprintf("--file-mode=%s", s.config.FileMode))
}
if s.config.ListType > 0 {
args = append(args, fmt.Sprintf("--list-type=%d", s.config.ListType))
}
if s.config.EndpointUrl != "" {
args = append(args, fmt.Sprintf("--endpoint=%s", s.config.EndpointUrl))
}

// Add bucket and mount point
args = append(args, s.config.BucketName, localPath)

cmd := exec.Command("geesefs", args...)

// Set bucket credentials as env vars
if s.config.AccessKey != "" || s.config.SecretKey != "" {
cmd.Env = append(cmd.Env,
fmt.Sprintf("AWS_ACCESS_KEY_ID=%s", s.config.AccessKey),
fmt.Sprintf("AWS_SECRET_ACCESS_KEY=%s", s.config.SecretKey),
)
}

// Start the mount command in the background
go func() {
output, err := cmd.CombinedOutput()
if err != nil {
log.Error().Err(err).Str("output", string(output)).Msg("error executing geesefs mount")
}
}()

ticker := time.NewTicker(100 * time.Millisecond)
timeout := time.After(30 * time.Second)

done := make(chan bool)
go func() {
for {
select {
case <-timeout:
done <- false
return
case <-ticker.C:
if isMounted(localPath) {
done <- true
return
}
}
}
}()

// Wait for confirmation or timeout
if !<-done {
return fmt.Errorf("failed to mount GeeseFS filesystem to: '%s'", localPath)
}

log.Info().Str("local_path", localPath).Msg("geesefs filesystem mounted")
return nil
}

Expand All @@ -23,5 +113,29 @@ func (s *GeeseStorage) Format(fsName string) error {
}

func (s *GeeseStorage) Unmount(localPath string) error {
geeseFsUmount := func() error {
cmd := exec.Command("geesefs", "umount", localPath)

output, err := cmd.CombinedOutput()
if err != nil {
log.Error().Err(err).Str("output", string(output)).Msg("error executing geesefs umount")
return err
}

log.Info().Str("local_path", localPath).Msg("geesefs filesystem unmounted")
return nil
}

err := backoff.Retry(geeseFsUmount, backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), 10))
if err == nil {
return nil
}

// Forcefully kill the fuse mount devices
err = exec.Command("fuser", "-k", "/dev/fuse").Run()
if err != nil {
return fmt.Errorf("error executing fuser -k /dev/fuse: %v", err)
}

return nil
}
18 changes: 14 additions & 4 deletions pkg/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,20 @@ type JuiceFSConfig struct {
}

type GeeseConfig struct {
S3AccessKey string `key:"s3AccessKey" json:"s3_access_key"`
S3SecretKey string `key:"s3SecretKey" json:"s3_secret_key"`
S3EndpointUrl string `key:"s3EndpointURL" json:"s3_endpoint_url"`
S3BucketName string `key:"s3BucketName" json:"s3_bucket_name"`
Debug bool `key:"debug" json:"debug"` // --debug
Force bool `key:"force" json:"force"` // -f (force)
FsyncOnClose bool `key:"fsyncOnClose" json:"fsync_on_close"` // --fsync-on-close
MemoryLimit int64 `key:"memoryLimit" json:"memory_limit"` // --memory-limit
MaxFlushers int `key:"maxFlushers" json:"max_flushers"` // --max-flushers
MaxParallelParts int `key:"maxParallelParts" json:"max_parallel_parts"` // --max-parallel-parts
PartSizes int64 `key:"partSizes" json:"part_sizes"` // --part-sizes
DirMode string `key:"dirMode" json:"dir_mode"` // --dir-mode, e.g., "0777"
FileMode string `key:"fileMode" json:"file_mode"` // --file-mode, e.g., "0666"
ListType int `key:"listType" json:"list_type"` // --list-type
AccessKey string `key:"accessKey" json:"access_key"`
SecretKey string `key:"secretKey" json:"secret_key"`
EndpointUrl string `key:"endpointURL" json:"endpoint_url"` // --endpoint
BucketName string `key:"bucketName" json:"bucket_name"`
}

// @go2proto
Expand Down

0 comments on commit 04522e3

Please sign in to comment.