Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
feat: #164 implemented support for filesize/quota validation with sup…
Browse files Browse the repository at this point in the history
…port for "extra space"
  • Loading branch information
blackandred committed Feb 20, 2022
1 parent 3bfc290 commit cfadd83
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 36 deletions.
12 changes: 12 additions & 0 deletions server-go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ test_list_auths_other_user:
test_upload_by_form:
curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@./storage/.test_data/test.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version'

test_upload_by_form_1mb:
@echo "-----BEGIN PGP MESSAGE-----" > /tmp/1mb.gpg
@openssl rand -base64 $$((735*1024*1)) >> /tmp/1mb.gpg
@echo "-----END PGP MESSAGE-----" >> /tmp/1mb.gpg
curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/1mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version'

test_upload_by_form_5mb:
@echo "-----BEGIN PGP MESSAGE-----" > /tmp/5mb.gpg
@openssl rand -base64 $$((735*1024*5)) >> /tmp/5mb.gpg
@echo "-----END PGP MESSAGE-----" >> /tmp/5mb.gpg
curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/5mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version'

postgres:
docker run -d \
--name br_postgres \
Expand Down
36 changes: 36 additions & 0 deletions server-go/collections/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/labstack/gommon/bytes"
"github.com/riotkit-org/backup-repository/config"
"github.com/riotkit-org/backup-repository/security"
"github.com/riotkit-org/backup-repository/users"
Expand Down Expand Up @@ -110,3 +111,38 @@ func (c Collection) CanUploadToMe(user *users.User) bool {
func (c *Collection) GenerateNextVersionFilename(version int) string {
return strings.Replace(c.Spec.FilenameTemplate, "${version}", fmt.Sprintf("%v", version), 1)
}

// getEstimatedDiskSpaceForFullCollectionInBytes returns a calculation how many disk space would be required to store all versions (excluding extra disk space)
// in ideal case it would be: MaxBackupsCount * MaxOneVersionSize
func (c *Collection) getEstimatedDiskSpaceForFullCollectionInBytes() (int64, error) {
maxVersionSizeInBytes, err := c.GetMaxOneVersionSizeInBytes()
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate estimated collection size: %v", err))
}
return int64(c.Spec.MaxBackupsCount) * maxVersionSizeInBytes, nil
}

func (c *Collection) GetMaxOneVersionSizeInBytes() (int64, error) {
return bytes.Parse(c.Spec.MaxOneVersionSize)
}

// GetEstimatedCollectionExtraSpace returns total space that can be extra allocated in case, when a single version exceeds its limit. Returned value is estimated, does not include real state.
func (c *Collection) GetEstimatedCollectionExtraSpace() (int64, error) {
estimatedStandardCollectionSize, err := c.getEstimatedDiskSpaceForFullCollectionInBytes()
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate GetEstimatedCollectionExtraSpace(): %v", err))
}
maxCollectionSizeInBytes, err := c.getMaxCollectionSizeInBytes()
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate GetEstimatedCollectionExtraSpace(): %v", err))
}
return maxCollectionSizeInBytes - estimatedStandardCollectionSize, nil
}

func (c *Collection) getMaxCollectionSizeInBytes() (int64, error) {
return bytes.Parse(c.Spec.MaxCollectionSize)
}

func (c *Collection) GetId() string {
return c.Metadata.Name
}
4 changes: 2 additions & 2 deletions server-go/crd/Collection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ spec:
type: integer
maxOneVersionSize:
type: string
pattern: '([0-9]+)(K|G|B|T)'
pattern: '([0-9]+)(B|K|M|G|T)'
maxCollectionSize:
type: string
pattern: '([0-9]+)(K|G|B|T)'
pattern: '([0-9]+)(B|K|M|G|T)'
window:
type: array
items:
Expand Down
4 changes: 2 additions & 2 deletions server-go/docs/examples/collection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ spec:
description: IWA-AIT website files
filenameTemplate: iwa-ait-${version}.tar.gz
maxBackupsCount: 5
maxOneVersionSize: 10G
maxCollectionSize: 55G
maxOneVersionSize: 1M
maxCollectionSize: 10M

# optional
window:
Expand Down
5 changes: 3 additions & 2 deletions server-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/ratelimit v1.0.1 // indirect
github.com/labstack/gommon v0.3.1 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-sqlite3 v1.14.9 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -79,7 +80,7 @@ require (
go.opencensus.io v0.23.0 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8 // indirect
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b // indirect
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
Expand Down
32 changes: 22 additions & 10 deletions server-go/http/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
r.POST("/repository/collection/:collectionId/version", func(c *gin.Context) {
// todo: deactivate token if temporary token is used
// todo: check uploaded file size, respect quotas and additional space
// todo: handle upload interruptions
// todo: locking support! There should be no concurrent uploads to the same collection

Expand All @@ -27,12 +26,12 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
return
}

// Check permissions
// [SECURITY] Check permissions
if !collection.CanUploadToMe(ctxUser) {
UnauthorizedResponse(c, errors.New("not authorized to upload versions to this collection"))
}

// Backup Windows support
// [SECURITY] Backup Windows support
if !ctx.Collections.ValidateIsBackupWindowAllowingToUpload(collection, time.Now()) &&
!ctxUser.Spec.Roles.HasRole(security.RoleUploadsAnytime) {

Expand All @@ -41,15 +40,15 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
return
}

// Increment a version, generate target file path name that will be used on storage
// [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage
sessionId := GetCurrentSessionId(c)
version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0)
if factoryError != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot increment version. %v", factoryError)))
return
}

// Check rotation strategy: Is it allowed to upload? Is there enough space?
// [ROTATION STRATEGY] Is it allowed to upload? Is there enough space?
rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection)
if strategyFactorialError != nil {
logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError))
Expand All @@ -63,7 +62,7 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {

var stream io.ReadCloser

// Support form data
// [HTTP] Support form data
if c.ContentType() == "application/x-www-form-urlencoded" || c.ContentType() == "multipart/form-data" {
var openErr error
fh, ffErr := c.FormFile("file")
Expand All @@ -75,14 +74,23 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
if openErr != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot open file from multipart/urlencoded form: %v", openErr)))
}
defer stream.Close()

} else {
// Support RAW sent data via body
// [HTTP] Support RAW sent data via body
stream = c.Request.Body
}

// Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully
wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version)
// [VALIDATION] Middlewares
versionsToDelete := rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version)
middlewares, err := ctx.Storage.CreateStandardMiddleWares(versionsToDelete, collection)
if err != nil {
ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot construct validators %v", err)))
return
}

// [HTTP] Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully
wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version, &middlewares)
if uploadError != nil {
_ = ctx.Storage.Delete(&version)

Expand All @@ -97,11 +105,15 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) {
if err := ctx.Storage.RegisterVersion(&version); err != nil {
_ = ctx.Storage.Delete(&version)
}
ctx.Storage.CleanUpOlderVersions(rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version))
ctx.Storage.CleanUpOlderVersions(versionsToDelete)
logrus.Infof("Uploaded v%v for collectionId=%v, size=%v", version.VersionNumber, version.CollectionId, version.Filesize)

OKResponse(c, gin.H{
"version": version,
})
})
}

// todo: healthcheck route
// to detect if anything was uploaded in previous Backup Window
// to detect if any version is bigger than expected
2 changes: 1 addition & 1 deletion server-go/storage/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type UploadedVersion struct {
CollectionId string `json:"collectionId"`
VersionNumber int `json:"versionNumber"`
Filename string `json:"filename"` // full filename e.g. iwa-ait-v1-db.tar.gz
Filesize int `json:"filesize"` // in bytes
Filesize int64 `json:"filesize"` // in bytes

// auditing
UploadedBySessionId string `json:"uploadedBySessionId"`
Expand Down
94 changes: 93 additions & 1 deletion server-go/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *Service) FindNextVersionForCollectionId(name string) (int, error) {
return lastHigherVersion + 1, nil
}

func (s *Service) CreateNewVersionFromCollection(c *collections.Collection, uploader string, uploaderSessionId string, filesize int) (UploadedVersion, error) {
func (s *Service) CreateNewVersionFromCollection(c *collections.Collection, uploader string, uploaderSessionId string, filesize int64) (UploadedVersion, error) {
nextVersion, err := s.FindNextVersionForCollectionId(c.Metadata.Name)
if err != nil {
return UploadedVersion{}, err
Expand Down Expand Up @@ -92,6 +92,89 @@ func (s *Service) RegisterVersion(version *UploadedVersion) error {
return s.repository.create(version)
}

func (s *Service) CalculateMaximumAllowedUploadFilesize(collection *collections.Collection, excluding []UploadedVersion) (int64, error) {
maxExtraSpace, err := collection.GetEstimatedCollectionExtraSpace()
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err))
}
maxOneVersionSize, err := collection.GetMaxOneVersionSizeInBytes()
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err))
}

// collection does not have extra space
if maxExtraSpace == 0 {
logrus.Debugf("Collection id=%v does not have extra space", collection.GetId())
return maxOneVersionSize, nil
}

currentVersionsInCollection, err := s.repository.findAllVersionsForCollectionId(collection.GetId())
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err))
}

// collection has additional extra space to use
usedExtraSpace, err := s.CalculateAllocatedSpaceAboveSingleVersionLimit(
collection,
currentVersionsInCollection,
excluding,
)
logrus.Debugf("Remaining extra space in collection (id=%v) is %vb", collection.GetId(), usedExtraSpace)
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err))
}
remainedExtraSpace := maxExtraSpace - usedExtraSpace

if remainedExtraSpace < 0 {
return 0, errors.New(fmt.Sprintf("weird thing happened, maxExtraSpace-usedExtraSpace gave minus result. Corrupted collection"))
}

return remainedExtraSpace + maxOneVersionSize, nil
}

func (s *Service) CalculateAllocatedSpaceAboveSingleVersionLimit(collection *collections.Collection, existing []UploadedVersion, excluding []UploadedVersion) (int64, error) {
var ids []string
var allocatedSpaceAboveLimit int64
maxOneVersionSize, err := collection.GetMaxOneVersionSizeInBytes()
if err != nil {
return 0, errors.New(fmt.Sprintf("cannot calculate allocated space above single version limit in collection, error: %v", err))
}

for _, version := range excluding {
ids = append(ids, version.Id)
}

for _, version := range existing {
// optionally exclude selected versions
if contains(ids, version.Id) {
logrus.Debugf("Excluding version id=%v", version.Id)
continue
}
if version.Filesize > maxOneVersionSize {
diff := version.Filesize - maxOneVersionSize
logrus.Debugf("Version id=%v is exceeding its limit by %v", version.Id, diff)
allocatedSpaceAboveLimit += diff
}
}

return allocatedSpaceAboveLimit, nil
}

func (s *Service) CreateStandardMiddleWares(versionsToDelete []UploadedVersion, collection *collections.Collection) (NestedStreamMiddlewares, error) {
maxAllowedFilesize, err := s.CalculateMaximumAllowedUploadFilesize(collection, versionsToDelete)
logrus.Debugf("CalculateMaximumAllowedUploadFilesize(%v) = %v", collection.GetId(), maxAllowedFilesize)

if err != nil {
return NestedStreamMiddlewares{}, errors.New(fmt.Sprintf("cannot construct standard middlewares, error: %v", err))
}

return NestedStreamMiddlewares{
s.createQuotaMaxFileSizeMiddleware(maxAllowedFilesize),
s.createNonEmptyMiddleware(),
s.createGPGStreamMiddleware(),
}, nil
}

// NewService is a factory method that knows how to construct a Storage provider, distincting multiple types of providers
func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error) {
repository := VersionsRepository{db: db}
Expand Down Expand Up @@ -132,3 +215,12 @@ func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error)

return Service{storage: driver, repository: &repository}, nil
}

func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
15 changes: 5 additions & 10 deletions server-go/storage/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"io"
)

func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion) (int, error) {
func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion, middlewares *NestedStreamMiddlewares) (int64, error) {
writeStream, err := s.storage.NewWriter(context.Background(), version.GetTargetPath(), &blob.WriterOptions{})
defer func() {
writeStream.Close()
Expand All @@ -19,11 +19,6 @@ func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion
return 0, errors.New(fmt.Sprintf("cannot upload file, attempted to open a writable stream, error: %v", err))
}

middlewares := nestedStreamMiddlewares{
s.createNonEmptyMiddleware(),
s.createGPGStreamMiddleware(),
}

wroteLen, writeErr := s.CopyStream(inputStream, writeStream, 1024*1024, middlewares)
if writeErr != nil {
return wroteLen, errors.New(fmt.Sprintf("cannot upload file, cannot copy stream, error: %v", writeErr))
Expand All @@ -47,10 +42,10 @@ func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion
}

// CopyStream copies a readable stream to writable stream, while providing a possibility to use a validation callbacks on-the-fly
func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteCloser, bufferLen int, middlewares nestedStreamMiddlewares) (int, error) {
func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteCloser, bufferLen int, middlewares *NestedStreamMiddlewares) (int64, error) {
buff := make([]byte, bufferLen)
previousBuff := make([]byte, bufferLen)
totalLength := 0
var totalLength int64
chunkNum := 0

for {
Expand All @@ -59,7 +54,7 @@ func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteClos

if err != nil {
if err == io.EOF {
totalLength += len(buff[:n])
totalLength += int64(len(buff[:n]))

// validation callbacks
if err := middlewares.processChunk(buff[:n], totalLength, previousBuff, chunkNum); err != nil {
Expand All @@ -77,7 +72,7 @@ func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteClos
return totalLength, errors.New(fmt.Sprintf("cannot copy stream, error: %v", err))
}

totalLength += len(buff[:n])
totalLength += int64(len(buff[:n]))
previousBuff = buff[:n]

// validation callbacks
Expand Down
Loading

0 comments on commit cfadd83

Please sign in to comment.