From c460f354746a9de13b15fd52cec4046a7ff16381 Mon Sep 17 00:00:00 2001 From: blackandred Date: Mon, 21 Feb 2022 08:46:48 +0100 Subject: [PATCH] docs: #164 add support for request cancellation and for fixed request timeout --- server-go/go.mod | 1 + server-go/http/collection.go | 202 ++++++++++++++++---------------- server-go/http/main.go | 2 +- server-go/http/responses.go | 13 +- server-go/storage/service.go | 3 +- server-go/storage/validation.go | 18 +++ 6 files changed, 138 insertions(+), 101 deletions(-) diff --git a/server-go/go.mod b/server-go/go.mod index 04b52ac9..19a1f96c 100644 --- a/server-go/go.mod +++ b/server-go/go.mod @@ -38,6 +38,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/gin-contrib/timeout v0.0.3 // indirect github.com/go-logr/logr v1.2.0 // indirect github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect diff --git a/server-go/http/collection.go b/server-go/http/collection.go index 8aef41b7..3cddc4a9 100644 --- a/server-go/http/collection.go +++ b/server-go/http/collection.go @@ -3,6 +3,7 @@ package http import ( "errors" "fmt" + "github.com/gin-contrib/timeout" "github.com/gin-gonic/gin" "github.com/riotkit-org/backup-repository/core" "github.com/riotkit-org/backup-repository/security" @@ -11,107 +12,112 @@ import ( "time" ) -func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer) { - r.POST("/repository/collection/:collectionId/version", func(c *gin.Context) { - // todo: deactivate token if temporary token is used - // todo: handle upload interruptions - // todo: locking support! There should be no concurrent uploads to the same collection - - ctxUser, _ := GetContextUser(ctx, c) - - // Check if Collection exists - collection, findError := ctx.Collections.GetCollectionById(c.Param("collectionId")) - if findError != nil { - NotFoundResponse(c, errors.New("cannot find specified collection")) - return - } - - // [SECURITY] Check permissions - if !collection.CanUploadToMe(ctxUser) { - UnauthorizedResponse(c, errors.New("not authorized to upload versions to this collection")) - } - - // [SECURITY] Backup Windows support - if !ctx.Collections.ValidateIsBackupWindowAllowingToUpload(collection, time.Now()) && - !ctxUser.Spec.Roles.HasRole(security.RoleUploadsAnytime) { - - UnauthorizedResponse(c, errors.New("backup window does not allow you to send a backup at this time. "+ - "You need a token from a user that has a special permission 'uploadsAnytime'")) - return - } - - // [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage - sessionId := GetCurrentSessionId(c) - version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0) - if factoryError != nil { - ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot increment version. %v", factoryError))) - return - } - - // [ROTATION STRATEGY] Is it allowed to upload? Is there enough space? - rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection) - if strategyFactorialError != nil { - logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError)) - ServerErrorResponse(c, errors.New("internal error while trying to create rotation strategy. Check server logs")) - return - } - if err := rotationStrategyCase.CanUpload(version); err != nil { - UnauthorizedResponse(c, errors.New(fmt.Sprintf("backup collection strategy declined a possibility to upload, %v", err))) - return - } - - var stream io.ReadCloser - - // [HTTP] Support form data - if c.ContentType() == "application/x-www-form-urlencoded" || c.ContentType() == "multipart/form-data" { - var openErr error - fh, ffErr := c.FormFile("file") - if ffErr != nil { - ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot read file from multipart/urlencoded form: %v", ffErr))) +func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer, requestTimeout time.Duration) { + timeoutMiddleware := timeout.New( + timeout.WithTimeout(requestTimeout), + timeout.WithHandler(func(c *gin.Context) { + // todo: deactivate token if temporary token is used + // todo: locking support! There should be no concurrent uploads to the same collection + + ctxUser, _ := GetContextUser(ctx, c) + + // Check if Collection exists + collection, findError := ctx.Collections.GetCollectionById(c.Param("collectionId")) + if findError != nil { + NotFoundResponse(c, errors.New("cannot find specified collection")) + return + } + + // [SECURITY] Check permissions + if !collection.CanUploadToMe(ctxUser) { + UnauthorizedResponse(c, errors.New("not authorized to upload versions to this collection")) + } + + // [SECURITY] Backup Windows support + if !ctx.Collections.ValidateIsBackupWindowAllowingToUpload(collection, time.Now()) && + !ctxUser.Spec.Roles.HasRole(security.RoleUploadsAnytime) { + + UnauthorizedResponse(c, errors.New("backup window does not allow you to send a backup at this time. "+ + "You need a token from a user that has a special permission 'uploadsAnytime'")) return } - stream, openErr = fh.Open() - if openErr != nil { - ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot open file from multipart/urlencoded form: %v", openErr))) + + // [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage + sessionId := GetCurrentSessionId(c) + version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0) + if factoryError != nil { + ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot increment version. %v", factoryError))) + return + } + + // [ROTATION STRATEGY] Is it allowed to upload? Is there enough space? + rotationStrategyCase, strategyFactorialError := ctx.Storage.CreateRotationStrategyCase(collection) + if strategyFactorialError != nil { + logrus.Errorf(fmt.Sprintf("Cannot create collection strategy for collectionId=%v, error: %v", collection.Metadata.Name, strategyFactorialError)) + ServerErrorResponse(c, errors.New("internal error while trying to create rotation strategy. Check server logs")) + return + } + if err := rotationStrategyCase.CanUpload(version); err != nil { + UnauthorizedResponse(c, errors.New(fmt.Sprintf("backup collection strategy declined a possibility to upload, %v", err))) + return } - defer stream.Close() - - } else { - // [HTTP] Support RAW sent data via body - stream = c.Request.Body - } - - // [VALIDATION] Middlewares - versionsToDelete := rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version) - middlewares, err := ctx.Storage.CreateStandardMiddleWares(versionsToDelete, collection) - if err != nil { - ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot construct validators %v", err))) - return - } - - // [HTTP] Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully - wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version, &middlewares) - if uploadError != nil { - _ = ctx.Storage.Delete(&version) - - ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot upload version. %v", uploadError))) - return - } - - // Set a valid filesize that is known after receiving the file - version.Filesize = wroteLen - - // Append version to the registry - if err := ctx.Storage.RegisterVersion(&version); err != nil { - _ = ctx.Storage.Delete(&version) - } - ctx.Storage.CleanUpOlderVersions(versionsToDelete) - logrus.Infof("Uploaded v%v for collectionId=%v, size=%v", version.VersionNumber, version.CollectionId, version.Filesize) - - OKResponse(c, gin.H{ - "version": version, - }) - }) + + var stream io.ReadCloser + + // [HTTP] Support form data + if c.ContentType() == "application/x-www-form-urlencoded" || c.ContentType() == "multipart/form-data" { + var openErr error + fh, ffErr := c.FormFile("file") + if ffErr != nil { + ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot read file from multipart/urlencoded form: %v", ffErr))) + return + } + stream, openErr = fh.Open() + if openErr != nil { + ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot open file from multipart/urlencoded form: %v", openErr))) + } + defer stream.Close() + + } else { + // [HTTP] Support RAW sent data via body + stream = c.Request.Body + } + + // [VALIDATION] Middlewares + versionsToDelete := rotationStrategyCase.GetVersionsThatShouldBeDeletedIfThisVersionUploaded(version) + middlewares, err := ctx.Storage.CreateStandardMiddleWares(c.Request.Context(), versionsToDelete, collection) + if err != nil { + ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot construct validators %v", err))) + return + } + + // [HTTP] Upload a file from selected source, then handle errors - delete file from storage if not uploaded successfully + wroteLen, uploadError := ctx.Storage.UploadFile(stream, &version, &middlewares) + if uploadError != nil { + _ = ctx.Storage.Delete(&version) + + ServerErrorResponse(c, errors.New(fmt.Sprintf("cannot upload version. %v", uploadError))) + return + } + + // Set a valid filesize that is known after receiving the file + version.Filesize = wroteLen + + // Append version to the registry + if err := ctx.Storage.RegisterVersion(&version); err != nil { + _ = ctx.Storage.Delete(&version) + } + ctx.Storage.CleanUpOlderVersions(versionsToDelete) + logrus.Infof("Uploaded v%v for collectionId=%v, size=%v", version.VersionNumber, version.CollectionId, version.Filesize) + + OKResponse(c, gin.H{ + "version": version, + }) + }), + timeout.WithResponse(RequestTimeoutResponse), + ) + + r.POST("/repository/collection/:collectionId/version", timeoutMiddleware) } // todo: healthcheck route diff --git a/server-go/http/main.go b/server-go/http/main.go index c0ad2b8e..ea0d63f6 100644 --- a/server-go/http/main.go +++ b/server-go/http/main.go @@ -29,7 +29,7 @@ func SpawnHttpApplication(ctx *core.ApplicationContainer) { addWhoamiRoute(router, ctx) addLogoutRoute(router, ctx) addGrantedAccessSearchRoute(router, ctx) - addUploadRoute(router, ctx) + addUploadRoute(router, ctx, 180*time.Minute) } _ = r.Run() diff --git a/server-go/http/responses.go b/server-go/http/responses.go index 3b8471c8..051a1aa9 100644 --- a/server-go/http/responses.go +++ b/server-go/http/responses.go @@ -1,6 +1,9 @@ package http -import "github.com/gin-gonic/gin" +import ( + "github.com/gin-gonic/gin" + "net/http" +) func NotFoundResponse(c *gin.Context, err error) { c.IndentedJSON(404, gin.H{ @@ -32,3 +35,11 @@ func ServerErrorResponse(c *gin.Context, err error) { "data": gin.H{}, }) } + +func RequestTimeoutResponse(c *gin.Context) { + c.IndentedJSON(http.StatusRequestTimeout, gin.H{ + "status": false, + "error": "Request took too long", + "data": gin.H{}, + }) +} diff --git a/server-go/storage/service.go b/server-go/storage/service.go index 566edfd8..583cf91f 100644 --- a/server-go/storage/service.go +++ b/server-go/storage/service.go @@ -160,7 +160,7 @@ func (s *Service) CalculateAllocatedSpaceAboveSingleVersionLimit(collection *col return allocatedSpaceAboveLimit, nil } -func (s *Service) CreateStandardMiddleWares(versionsToDelete []UploadedVersion, collection *collections.Collection) (NestedStreamMiddlewares, error) { +func (s *Service) CreateStandardMiddleWares(context context.Context, versionsToDelete []UploadedVersion, collection *collections.Collection) (NestedStreamMiddlewares, error) { maxAllowedFilesize, err := s.CalculateMaximumAllowedUploadFilesize(collection, versionsToDelete) logrus.Debugf("CalculateMaximumAllowedUploadFilesize(%v) = %v", collection.GetId(), maxAllowedFilesize) @@ -169,6 +169,7 @@ func (s *Service) CreateStandardMiddleWares(versionsToDelete []UploadedVersion, } return NestedStreamMiddlewares{ + s.createRequestCancelledMiddleware(context), s.createQuotaMaxFileSizeMiddleware(maxAllowedFilesize), s.createNonEmptyMiddleware(), s.createGPGStreamMiddleware(), diff --git a/server-go/storage/validation.go b/server-go/storage/validation.go index 78277193..86702598 100644 --- a/server-go/storage/validation.go +++ b/server-go/storage/validation.go @@ -2,8 +2,10 @@ package storage import ( "bytes" + "context" "errors" "fmt" + "github.com/sirupsen/logrus" ) // @@ -106,3 +108,19 @@ func (s *Service) createQuotaMaxFileSizeMiddleware(maxFileSize int64) streamMidd return nil }} } + +// createRequestCancelledMiddleware handles the request cancellation +func (s *Service) createRequestCancelledMiddleware(context context.Context) streamMiddleware { + return streamMiddleware{ + processor: func(i []byte, i2 int64, i3 []byte, i4 int) error { + if context.Err() != nil { + logrus.Warning(fmt.Sprintf("Upload was cancelled: %v", context.Err())) + return errors.New("upload was cancelled") + } + return nil + }, + resultReporter: func() error { + return nil + }, + } +}