Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
feat: #164 wip implementation of collection rotation strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed Feb 18, 2022
1 parent 221282e commit 6110d9b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 2 deletions.
17 changes: 16 additions & 1 deletion server-go/http/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/riotkit-org/backup-repository/core"
"github.com/riotkit-org/backup-repository/security"
"github.com/sirupsen/logrus"
"io"
"time"
)
Expand All @@ -15,8 +16,8 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
// todo: check if rotation strategy allows uploading
// todo: deactivate token if temporary token is used
// todo: check uploaded file size, respect quotas and additional space
// todo: check if there are gpg header and footer
// todo: handle upload interruptions
// todo: locking support! There should be no concurrent uploads to the same collection

ctxUser, _ := GetContextUser(ctx, c)

Expand Down Expand Up @@ -49,6 +50,17 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
return
}

// Check rotation strategy: Is it allowed to upload? Is there enough space?
rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection)
if strategyFactorialError != nil {
logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError))
ServerErrorResponse(c, errors.New("internal error while trying to create rotation strategy. Check server logs"))
}
if err := rotationStrategyCase.CanUpload(version); err != nil {
UnauthorizedResponse(c, errors.New(fmt.Sprintf("backup collection strategy declined a possibility to upload, %v", err)))
return
}

var stream io.ReadCloser

// Support form data
Expand Down Expand Up @@ -82,8 +94,11 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
version.Filesize = wroteLen

// Append version to the registry
// todo
//ctx.Storage.SubmitVersion(version)
//ctx.Storage.CleanUpOlderVersions(rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version))

// todo: Rotate collection
// todo: add UploadedVersion to database
OKResponse(c, gin.H{
"version": version,
Expand Down
10 changes: 10 additions & 0 deletions server-go/storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ func (vr VersionsRepository) findLastHighestVersionNumber(collectionId string) (
return maxNum, nil
}

func (vr VersionsRepository) findAllVersionsForCollectionId(collectionId string) ([]UploadedVersion, error) {
var foundVersions []UploadedVersion

err := vr.db.Model(&UploadedVersion{}).Where("uploaded_versions.collection_id = ?", collectionId).Order("uploaded_versions.version_number DESC").Find(&foundVersions).Error
if err != nil {
return []UploadedVersion{}, err
}
return foundVersions, nil
}

func InitializeModel(db *gorm.DB) error {
return db.AutoMigrate(&UploadedVersion{})
}
51 changes: 51 additions & 0 deletions server-go/storage/rotation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package storage

import (
"github.com/riotkit-org/backup-repository/collections"
"sort"
)

type RotationStrategy interface {
// CanUpload returns nil if YES, error if NO
CanUpload(version UploadedVersion) error

// GetVersionsThatShouldBeDeletedIfThisVersionUploaded lists all the versions that should be deleted if a new version would be submitted
GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version UploadedVersion) []UploadedVersion
}

//
// FifoRotationStrategy implements a simple queue, first is appended, oldest will be deleted
//
type FifoRotationStrategy struct {
collection *collections.Collection
existingVersions []UploadedVersion
}

func (frs *FifoRotationStrategy) CanUpload(version UploadedVersion) error {
return nil
}

func (frs *FifoRotationStrategy) GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version UploadedVersion) []UploadedVersion {
existingVersions := frs.existingVersions

// nothing to do, there is still enough slots
if len(existingVersions) < frs.collection.Spec.MaxBackupsCount {
return []UploadedVersion{}
}

// order by version number DESCENDING
sort.SliceStable(&existingVersions, func(i, j int) bool {
return existingVersions[i].VersionNumber > existingVersions[j].VersionNumber
})

// oldest element
return existingVersions[0:1]
}

// todo: test reference
func NewFifoRotationStrategy(collection *collections.Collection, existingVersions []UploadedVersion) *FifoRotationStrategy {
return &FifoRotationStrategy{
collection: collection,
existingVersions: existingVersions,
}
}
1 change: 1 addition & 0 deletions server-go/storage/rotation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package storage
14 changes: 14 additions & 0 deletions server-go/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ func (s *Service) CreateNewVersionFromCollection(c *collections.Collection, uplo
}, nil
}

func (s *Service) CreateRotationStrategyCase(collection *collections.Collection) (RotationStrategy, error) {
foundVersions, err := s.repository.findAllVersionsForCollectionId(collection.Metadata.Name)
if err != nil {
return &FifoRotationStrategy{}, errors.New(fmt.Sprintf("cannot construct rotation strategy, cannot findAllVersionsForCollectionId, error: %v", err))
}

if collection.Spec.StrategyName == "fifo" {
return NewFifoRotationStrategy(collection, foundVersions), nil
}

return &FifoRotationStrategy{}, errors.New(fmt.Sprintf("collection configuration error: unrecognized backup strategy type '%v'", collection.Spec.StrategyName))
}

// NewService is a factory method that knows how to construct a Storage provider, distincting multiple types of providers
func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error) {
repository := VersionsRepository{db: db}
Expand All @@ -74,6 +87,7 @@ func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error)
return Service{storage: driver, repository: &repository}, nil
}

// AWS S3, Min.io, CEPH and others compatible with S3 protocol
driver, err := blob.OpenBucket(context.Background(), driverUrl)
if err != nil {
logrus.Errorf("Cannot construct storage driver: %v", err)
Expand Down
1 change: 0 additions & 1 deletion server-go/storage/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion
}

middlewares := nestedStreamMiddlewares{
// todo: add a middleware to abort file upload if filesize reached the limit
s.createNonEmptyMiddleware(),
s.createGPGStreamMiddleware(),
}
Expand Down

0 comments on commit 6110d9b

Please sign in to comment.