From cfadd8372fe9830e30feedbaeb1651450cb86f02 Mon Sep 17 00:00:00 2001 From: blackandred Date: Sun, 20 Feb 2022 21:46:06 +0100 Subject: [PATCH] feat: #164 implemented support for filesize/quota validation with support for "extra space" --- server-go/Makefile | 12 ++++ server-go/collections/entity.go | 36 ++++++++++ server-go/crd/Collection.yaml | 4 +- server-go/docs/examples/collection.yaml | 4 +- server-go/go.mod | 5 +- server-go/http/collection.go | 32 ++++++--- server-go/storage/entity.go | 2 +- server-go/storage/service.go | 94 ++++++++++++++++++++++++- server-go/storage/upload.go | 15 ++-- server-go/storage/validation.go | 31 +++++--- 10 files changed, 199 insertions(+), 36 deletions(-) diff --git a/server-go/Makefile b/server-go/Makefile index 9aec1966..5290588e 100644 --- a/server-go/Makefile +++ b/server-go/Makefile @@ -29,6 +29,18 @@ test_list_auths_other_user: test_upload_by_form: curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@./storage/.test_data/test.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' +test_upload_by_form_1mb: + @echo "-----BEGIN PGP MESSAGE-----" > /tmp/1mb.gpg + @openssl rand -base64 $$((735*1024*1)) >> /tmp/1mb.gpg + @echo "-----END PGP MESSAGE-----" >> /tmp/1mb.gpg + curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/1mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' + +test_upload_by_form_5mb: + @echo "-----BEGIN PGP MESSAGE-----" > /tmp/5mb.gpg + @openssl rand -base64 $$((735*1024*5)) >> /tmp/5mb.gpg + @echo "-----END PGP MESSAGE-----" >> /tmp/5mb.gpg + curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/5mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' + postgres: docker run -d \ --name br_postgres \ diff --git a/server-go/collections/entity.go b/server-go/collections/entity.go index 94991733..e462cb81 100644 --- a/server-go/collections/entity.go +++ b/server-go/collections/entity.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/labstack/gommon/bytes" "github.com/riotkit-org/backup-repository/config" "github.com/riotkit-org/backup-repository/security" "github.com/riotkit-org/backup-repository/users" @@ -110,3 +111,38 @@ func (c Collection) CanUploadToMe(user *users.User) bool { func (c *Collection) GenerateNextVersionFilename(version int) string { return strings.Replace(c.Spec.FilenameTemplate, "${version}", fmt.Sprintf("%v", version), 1) } + +// getEstimatedDiskSpaceForFullCollectionInBytes returns a calculation how many disk space would be required to store all versions (excluding extra disk space) +// in ideal case it would be: MaxBackupsCount * MaxOneVersionSize +func (c *Collection) getEstimatedDiskSpaceForFullCollectionInBytes() (int64, error) { + maxVersionSizeInBytes, err := c.GetMaxOneVersionSizeInBytes() + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate estimated collection size: %v", err)) + } + return int64(c.Spec.MaxBackupsCount) * maxVersionSizeInBytes, nil +} + +func (c *Collection) GetMaxOneVersionSizeInBytes() (int64, error) { + return bytes.Parse(c.Spec.MaxOneVersionSize) +} + +// GetEstimatedCollectionExtraSpace returns total space that can be extra allocated in case, when a single version exceeds its limit. Returned value is estimated, does not include real state. +func (c *Collection) GetEstimatedCollectionExtraSpace() (int64, error) { + estimatedStandardCollectionSize, err := c.getEstimatedDiskSpaceForFullCollectionInBytes() + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate GetEstimatedCollectionExtraSpace(): %v", err)) + } + maxCollectionSizeInBytes, err := c.getMaxCollectionSizeInBytes() + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate GetEstimatedCollectionExtraSpace(): %v", err)) + } + return maxCollectionSizeInBytes - estimatedStandardCollectionSize, nil +} + +func (c *Collection) getMaxCollectionSizeInBytes() (int64, error) { + return bytes.Parse(c.Spec.MaxCollectionSize) +} + +func (c *Collection) GetId() string { + return c.Metadata.Name +} diff --git a/server-go/crd/Collection.yaml b/server-go/crd/Collection.yaml index 27459a57..f771a9fa 100644 --- a/server-go/crd/Collection.yaml +++ b/server-go/crd/Collection.yaml @@ -36,10 +36,10 @@ spec: type: integer maxOneVersionSize: type: string - pattern: '([0-9]+)(K|G|B|T)' + pattern: '([0-9]+)(B|K|M|G|T)' maxCollectionSize: type: string - pattern: '([0-9]+)(K|G|B|T)' + pattern: '([0-9]+)(B|K|M|G|T)' window: type: array items: diff --git a/server-go/docs/examples/collection.yaml b/server-go/docs/examples/collection.yaml index 42a05e5e..16bb9be1 100644 --- a/server-go/docs/examples/collection.yaml +++ b/server-go/docs/examples/collection.yaml @@ -7,8 +7,8 @@ spec: description: IWA-AIT website files filenameTemplate: iwa-ait-${version}.tar.gz maxBackupsCount: 5 - maxOneVersionSize: 10G - maxCollectionSize: 55G + maxOneVersionSize: 1M + maxCollectionSize: 10M # optional window: diff --git a/server-go/go.mod b/server-go/go.mod index a2fd2bf7..04b52ac9 100644 --- a/server-go/go.mod +++ b/server-go/go.mod @@ -65,8 +65,9 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/juju/ratelimit v1.0.1 // indirect + github.com/labstack/gommon v0.3.1 // indirect github.com/leodido/go-urn v1.2.0 // indirect - github.com/mattn/go-isatty v0.0.12 // indirect + github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-sqlite3 v1.14.9 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -79,7 +80,7 @@ require ( go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect - golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8 // indirect + golang.org/x/sys v0.0.0-20211103235746-7861aae1554b // indirect golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect diff --git a/server-go/http/collection.go b/server-go/http/collection.go index d2228c42..8aef41b7 100644 --- a/server-go/http/collection.go +++ b/server-go/http/collection.go @@ -14,7 +14,6 @@ import ( func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { r.POST("/repository/collection/:collectionId/version", func(c *gin.Context) { // todo: deactivate token if temporary token is used - // todo: check uploaded file size, respect quotas and additional space // todo: handle upload interruptions // todo: locking support! There should be no concurrent uploads to the same collection @@ -27,12 +26,12 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { return } - // Check permissions + // [SECURITY] Check permissions if !collection.CanUploadToMe(ctxUser) { UnauthorizedResponse(c, errors.New("not authorized to upload versions to this collection")) } - // Backup Windows support + // [SECURITY] Backup Windows support if !ctx.Collections.ValidateIsBackupWindowAllowingToUpload(collection, time.Now()) && !ctxUser.Spec.Roles.HasRole(security.RoleUploadsAnytime) { @@ -41,7 +40,7 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { return } - // Increment a version, generate target file path name that will be used on storage + // [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage sessionId := GetCurrentSessionId(c) version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0) if factoryError != nil { @@ -49,7 +48,7 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { return } - // Check rotation strategy: Is it allowed to upload? Is there enough space? + // [ROTATION STRATEGY] Is it allowed to upload? Is there enough space? rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection) if strategyFactorialError != nil { logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError)) @@ -63,7 +62,7 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { var stream io.ReadCloser - // Support form data + // [HTTP] Support form data if c.ContentType() == "application/x-www-form-urlencoded" || c.ContentType() == "multipart/form-data" { var openErr error fh, ffErr := c.FormFile("file") @@ -75,14 +74,23 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { if openErr != nil { ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot open file from multipart/urlencoded form: %v", openErr))) } + defer stream.Close() } else { - // Support RAW sent data via body + // [HTTP] Support RAW sent data via body stream = c.Request.Body } - // Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully - wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version) + // [VALIDATION] Middlewares + versionsToDelete := rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version) + middlewares, err := ctx.Storage.CreateStandardMiddleWares(versionsToDelete, collection) + if err != nil { + ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot construct validators %v", err))) + return + } + + // [HTTP] Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully + wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version, &middlewares) if uploadError != nil { _ = ctx.Storage.Delete(&version) @@ -97,7 +105,7 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { if err := ctx.Storage.RegisterVersion(&version); err != nil { _ = ctx.Storage.Delete(&version) } - ctx.Storage.CleanUpOlderVersions(rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version)) + ctx.Storage.CleanUpOlderVersions(versionsToDelete) logrus.Infof("Uploaded v%v for collectionId=%v, size=%v", version.VersionNumber, version.CollectionId, version.Filesize) OKResponse(c, gin.H{ @@ -105,3 +113,7 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { }) }) } + +// todo: healthcheck route +// to detect if anything was uploaded in previous Backup Window +// to detect if any version is bigger than expected diff --git a/server-go/storage/entity.go b/server-go/storage/entity.go index 57649f3b..a969ce90 100644 --- a/server-go/storage/entity.go +++ b/server-go/storage/entity.go @@ -10,7 +10,7 @@ type UploadedVersion struct { CollectionId string `json:"collectionId"` VersionNumber int `json:"versionNumber"` Filename string `json:"filename"` // full filename e.g. iwa-ait-v1-db.tar.gz - Filesize int `json:"filesize"` // in bytes + Filesize int64 `json:"filesize"` // in bytes // auditing UploadedBySessionId string `json:"uploadedBySessionId"` diff --git a/server-go/storage/service.go b/server-go/storage/service.go index b3defd78..566edfd8 100644 --- a/server-go/storage/service.go +++ b/server-go/storage/service.go @@ -30,7 +30,7 @@ func (s *Service) FindNextVersionForCollectionId(name string) (int, error) { return lastHigherVersion + 1, nil } -func (s *Service) CreateNewVersionFromCollection(c *collections.Collection, uploader string, uploaderSessionId string, filesize int) (UploadedVersion, error) { +func (s *Service) CreateNewVersionFromCollection(c *collections.Collection, uploader string, uploaderSessionId string, filesize int64) (UploadedVersion, error) { nextVersion, err := s.FindNextVersionForCollectionId(c.Metadata.Name) if err != nil { return UploadedVersion{}, err @@ -92,6 +92,89 @@ func (s *Service) RegisterVersion(version *UploadedVersion) error { return s.repository.create(version) } +func (s *Service) CalculateMaximumAllowedUploadFilesize(collection *collections.Collection, excluding []UploadedVersion) (int64, error) { + maxExtraSpace, err := collection.GetEstimatedCollectionExtraSpace() + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err)) + } + maxOneVersionSize, err := collection.GetMaxOneVersionSizeInBytes() + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err)) + } + + // collection does not have extra space + if maxExtraSpace == 0 { + logrus.Debugf("Collection id=%v does not have extra space", collection.GetId()) + return maxOneVersionSize, nil + } + + currentVersionsInCollection, err := s.repository.findAllVersionsForCollectionId(collection.GetId()) + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err)) + } + + // collection has additional extra space to use + usedExtraSpace, err := s.CalculateAllocatedSpaceAboveSingleVersionLimit( + collection, + currentVersionsInCollection, + excluding, + ) + logrus.Debugf("Remaining extra space in collection (id=%v) is %vb", collection.GetId(), usedExtraSpace) + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate maximum allowed filesize for upload: %v", err)) + } + remainedExtraSpace := maxExtraSpace - usedExtraSpace + + if remainedExtraSpace < 0 { + return 0, errors.New(fmt.Sprintf("weird thing happened, maxExtraSpace-usedExtraSpace gave minus result. Corrupted collection")) + } + + return remainedExtraSpace + maxOneVersionSize, nil +} + +func (s *Service) CalculateAllocatedSpaceAboveSingleVersionLimit(collection *collections.Collection, existing []UploadedVersion, excluding []UploadedVersion) (int64, error) { + var ids []string + var allocatedSpaceAboveLimit int64 + maxOneVersionSize, err := collection.GetMaxOneVersionSizeInBytes() + if err != nil { + return 0, errors.New(fmt.Sprintf("cannot calculate allocated space above single version limit in collection, error: %v", err)) + } + + for _, version := range excluding { + ids = append(ids, version.Id) + } + + for _, version := range existing { + // optionally exclude selected versions + if contains(ids, version.Id) { + logrus.Debugf("Excluding version id=%v", version.Id) + continue + } + if version.Filesize > maxOneVersionSize { + diff := version.Filesize - maxOneVersionSize + logrus.Debugf("Version id=%v is exceeding its limit by %v", version.Id, diff) + allocatedSpaceAboveLimit += diff + } + } + + return allocatedSpaceAboveLimit, nil +} + +func (s *Service) CreateStandardMiddleWares(versionsToDelete []UploadedVersion, collection *collections.Collection) (NestedStreamMiddlewares, error) { + maxAllowedFilesize, err := s.CalculateMaximumAllowedUploadFilesize(collection, versionsToDelete) + logrus.Debugf("CalculateMaximumAllowedUploadFilesize(%v) = %v", collection.GetId(), maxAllowedFilesize) + + if err != nil { + return NestedStreamMiddlewares{}, errors.New(fmt.Sprintf("cannot construct standard middlewares, error: %v", err)) + } + + return NestedStreamMiddlewares{ + s.createQuotaMaxFileSizeMiddleware(maxAllowedFilesize), + s.createNonEmptyMiddleware(), + s.createGPGStreamMiddleware(), + }, nil +} + // NewService is a factory method that knows how to construct a Storage provider, distincting multiple types of providers func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error) { repository := VersionsRepository{db: db} @@ -132,3 +215,12 @@ func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error) return Service{storage: driver, repository: &repository}, nil } + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} diff --git a/server-go/storage/upload.go b/server-go/storage/upload.go index 4d010646..0b297960 100644 --- a/server-go/storage/upload.go +++ b/server-go/storage/upload.go @@ -9,7 +9,7 @@ import ( "io" ) -func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion) (int, error) { +func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion, middlewares *NestedStreamMiddlewares) (int64, error) { writeStream, err := s.storage.NewWriter(context.Background(), version.GetTargetPath(), &blob.WriterOptions{}) defer func() { writeStream.Close() @@ -19,11 +19,6 @@ func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion return 0, errors.New(fmt.Sprintf("cannot upload file, attempted to open a writable stream, error: %v", err)) } - middlewares := nestedStreamMiddlewares{ - s.createNonEmptyMiddleware(), - s.createGPGStreamMiddleware(), - } - wroteLen, writeErr := s.CopyStream(inputStream, writeStream, 1024*1024, middlewares) if writeErr != nil { return wroteLen, errors.New(fmt.Sprintf("cannot upload file, cannot copy stream, error: %v", writeErr)) @@ -47,10 +42,10 @@ func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion } // CopyStream copies a readable stream to writable stream, while providing a possibility to use a validation callbacks on-the-fly -func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteCloser, bufferLen int, middlewares nestedStreamMiddlewares) (int, error) { +func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteCloser, bufferLen int, middlewares *NestedStreamMiddlewares) (int64, error) { buff := make([]byte, bufferLen) previousBuff := make([]byte, bufferLen) - totalLength := 0 + var totalLength int64 chunkNum := 0 for { @@ -59,7 +54,7 @@ func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteClos if err != nil { if err == io.EOF { - totalLength += len(buff[:n]) + totalLength += int64(len(buff[:n])) // validation callbacks if err := middlewares.processChunk(buff[:n], totalLength, previousBuff, chunkNum); err != nil { @@ -77,7 +72,7 @@ func (s *Service) CopyStream(inputStream io.ReadCloser, writeStream io.WriteClos return totalLength, errors.New(fmt.Sprintf("cannot copy stream, error: %v", err)) } - totalLength += len(buff[:n]) + totalLength += int64(len(buff[:n])) previousBuff = buff[:n] // validation callbacks diff --git a/server-go/storage/validation.go b/server-go/storage/validation.go index f4434902..78277193 100644 --- a/server-go/storage/validation.go +++ b/server-go/storage/validation.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "errors" + "fmt" ) // @@ -12,11 +13,11 @@ import ( type streamMiddleware struct { // []byte - current buffer value - // int - total read size till this moment + // int64 - total read size till this moment // []byte - if current buffer is an END OF STREAM, then this parameter will contain previous hunk, // so you can join previous+last to have full information in case, when last hunk would be too small // int - processed chunk number - processor func([]byte, int, []byte, int) error + processor func([]byte, int64, []byte, int) error resultReporter func() error } @@ -24,9 +25,9 @@ type streamMiddleware struct { // Aggregation of middlewares // -type nestedStreamMiddlewares []streamMiddleware +type NestedStreamMiddlewares []streamMiddleware -func (nv nestedStreamMiddlewares) processChunk(chunk []byte, processedTotalBytes int, previousHunkBeforeEof []byte, chunkNum int) error { +func (nv NestedStreamMiddlewares) processChunk(chunk []byte, processedTotalBytes int64, previousHunkBeforeEof []byte, chunkNum int) error { for _, processor := range nv { if processingError := processor.processor(chunk, processedTotalBytes, previousHunkBeforeEof, chunkNum); processingError != nil { return processingError @@ -35,7 +36,7 @@ func (nv nestedStreamMiddlewares) processChunk(chunk []byte, processedTotalBytes return nil } -func (nv nestedStreamMiddlewares) checkFinalStatusAfterFilesWasUploaded() error { +func (nv NestedStreamMiddlewares) checkFinalStatusAfterFilesWasUploaded() error { for _, processor := range nv { if processingError := processor.resultReporter(); processingError != nil { return processingError @@ -50,7 +51,7 @@ func (nv nestedStreamMiddlewares) checkFinalStatusAfterFilesWasUploaded() error // createGPGStreamMiddleware Checks if stream is a valid GPG encrypted file by checking GPG header and footer func (s *Service) createGPGStreamMiddleware() streamMiddleware { - validator := func(buff []byte, totalLength int, previousHunkBeforeEof []byte, chunkNum int) error { + validator := func(buff []byte, totalLength int64, previousHunkBeforeEof []byte, chunkNum int) error { if chunkNum == 1 && !bytes.Contains(buff, []byte("-----BEGIN PGP MESSAGE")) { return errors.New("first chunk of uploaded data does not contain a valid GPG header") } @@ -74,9 +75,9 @@ func (s *Service) createGPGStreamMiddleware() streamMiddleware { // createNonEmptyMiddleware Checks if anything was sent at all func (s *Service) createNonEmptyMiddleware() streamMiddleware { - recordedTotalLength := 0 + var recordedTotalLength int64 - validator := func(buff []byte, totalLength int, previousHunkBeforeEof []byte, chunkNum int) error { + validator := func(buff []byte, totalLength int64, previousHunkBeforeEof []byte, chunkNum int) error { recordedTotalLength = totalLength return nil } @@ -91,3 +92,17 @@ func (s *Service) createNonEmptyMiddleware() streamMiddleware { return streamMiddleware{processor: validator, resultReporter: resultReporter} } + +// createQuotaMaxFileSizeMiddleware Takes care about the maximum allowed filesize limit +func (s *Service) createQuotaMaxFileSizeMiddleware(maxFileSize int64) streamMiddleware { + validator := func(buff []byte, totalLength int64, previousHunkBeforeEof []byte, chunkNum int) error { + if totalLength > maxFileSize { + return errors.New(fmt.Sprintf("filesize reached allowed limit. Uploaded %vbytes, allowed to upload only %v bytes", totalLength, maxFileSize)) + } + return nil + } + + return streamMiddleware{processor: validator, resultReporter: func() error { + return nil + }} +}