diff --git a/server-go/http/collection.go b/server-go/http/collection.go index 9adcdd8e..1800773d 100644 --- a/server-go/http/collection.go +++ b/server-go/http/collection.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" "github.com/riotkit-org/backup-repository/core" "github.com/riotkit-org/backup-repository/security" + "github.com/sirupsen/logrus" "io" "time" ) @@ -15,8 +16,8 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { // todo: check if rotation strategy allows uploading // todo: deactivate token if temporary token is used // todo: check uploaded file size, respect quotas and additional space - // todo: check if there are gpg header and footer // todo: handle upload interruptions + // todo: locking support! There should be no concurrent uploads to the same collection ctxUser, _ := GetContextUser(ctx, c) @@ -49,6 +50,17 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { return } + // Check rotation strategy: Is it allowed to upload? Is there enough space? + rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection) + if strategyFactorialError != nil { + logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError)) + ServerErrorResponse(c, errors.New("internal error while trying to create rotation strategy. Check server logs")) + } + if err := rotationStrategyCase.CanUpload(version); err != nil { + UnauthorizedResponse(c, errors.New(fmt.Sprintf("backup collection strategy declined a possibility to upload, %v", err))) + return + } + var stream io.ReadCloser // Support form data @@ -82,8 +94,11 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { version.Filesize = wroteLen // Append version to the registry + // todo //ctx.Storage.SubmitVersion(version) + //ctx.Storage.CleanUpOlderVersions(rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version)) + // todo: Rotate collection // todo: add UploadedVersion to database OKResponse(c, gin.H{ "version": version, diff --git a/server-go/storage/repository.go b/server-go/storage/repository.go index bbd10181..291d140a 100644 --- a/server-go/storage/repository.go +++ b/server-go/storage/repository.go @@ -17,6 +17,16 @@ func (vr VersionsRepository) findLastHighestVersionNumber(collectionId string) ( return maxNum, nil } +func (vr VersionsRepository) findAllVersionsForCollectionId(collectionId string) ([]UploadedVersion, error) { + var foundVersions []UploadedVersion + + err := vr.db.Model(&UploadedVersion{}).Where("uploaded_versions.collection_id = ?", collectionId).Order("uploaded_versions.version_number DESC").Find(&foundVersions).Error + if err != nil { + return []UploadedVersion{}, err + } + return foundVersions, nil +} + func InitializeModel(db *gorm.DB) error { return db.AutoMigrate(&UploadedVersion{}) } diff --git a/server-go/storage/rotation.go b/server-go/storage/rotation.go new file mode 100644 index 00000000..1f878503 --- /dev/null +++ b/server-go/storage/rotation.go @@ -0,0 +1,51 @@ +package storage + +import ( + "github.com/riotkit-org/backup-repository/collections" + "sort" +) + +type RotationStrategy interface { + // CanUpload returns nil if YES, error if NO + CanUpload(version UploadedVersion) error + + // GetVersionsThatShouldBeDeletedIfThisVersionUploaded lists all the versions that should be deleted if a new version would be submitted + GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version UploadedVersion) []UploadedVersion +} + +// +// FifoRotationStrategy implements a simple queue, first is appended, oldest will be deleted +// +type FifoRotationStrategy struct { + collection *collections.Collection + existingVersions []UploadedVersion +} + +func (frs *FifoRotationStrategy) CanUpload(version UploadedVersion) error { + return nil +} + +func (frs *FifoRotationStrategy) GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version UploadedVersion) []UploadedVersion { + existingVersions := frs.existingVersions + + // nothing to do, there is still enough slots + if len(existingVersions) < frs.collection.Spec.MaxBackupsCount { + return []UploadedVersion{} + } + + // order by version number DESCENDING + sort.SliceStable(&existingVersions, func(i, j int) bool { + return existingVersions[i].VersionNumber > existingVersions[j].VersionNumber + }) + + // oldest element + return existingVersions[0:1] +} + +// todo: test reference +func NewFifoRotationStrategy(collection *collections.Collection, existingVersions []UploadedVersion) *FifoRotationStrategy { + return &FifoRotationStrategy{ + collection: collection, + existingVersions: existingVersions, + } +} diff --git a/server-go/storage/rotation_test.go b/server-go/storage/rotation_test.go new file mode 100644 index 00000000..82be0547 --- /dev/null +++ b/server-go/storage/rotation_test.go @@ -0,0 +1 @@ +package storage diff --git a/server-go/storage/service.go b/server-go/storage/service.go index 5b4e9f35..da1366af 100644 --- a/server-go/storage/service.go +++ b/server-go/storage/service.go @@ -49,6 +49,19 @@ func (s *Service) CreateNewVersionFromCollection(c *collections.Collection, uplo }, nil } +func (s *Service) CreateRotationStrategyCase(collection *collections.Collection) (RotationStrategy, error) { + foundVersions, err := s.repository.findAllVersionsForCollectionId(collection.Metadata.Name) + if err != nil { + return &FifoRotationStrategy{}, errors.New(fmt.Sprintf("cannot construct rotation strategy, cannot findAllVersionsForCollectionId, error: %v", err)) + } + + if collection.Spec.StrategyName == "fifo" { + return NewFifoRotationStrategy(collection, foundVersions), nil + } + + return &FifoRotationStrategy{}, errors.New(fmt.Sprintf("collection configuration error: unrecognized backup strategy type '%v'", collection.Spec.StrategyName)) +} + // NewService is a factory method that knows how to construct a Storage provider, distincting multiple types of providers func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error) { repository := VersionsRepository{db: db} @@ -74,6 +87,7 @@ func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error) return Service{storage: driver, repository: &repository}, nil } + // AWS S3, Min.io, CEPH and others compatible with S3 protocol driver, err := blob.OpenBucket(context.Background(), driverUrl) if err != nil { logrus.Errorf("Cannot construct storage driver: %v", err) diff --git a/server-go/storage/upload.go b/server-go/storage/upload.go index 278a7c7e..4d010646 100644 --- a/server-go/storage/upload.go +++ b/server-go/storage/upload.go @@ -20,7 +20,6 @@ func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion } middlewares := nestedStreamMiddlewares{ - // todo: add a middleware to abort file upload if filesize reached the limit s.createNonEmptyMiddleware(), s.createGPGStreamMiddleware(), }