Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
feat: #164 implemented storage write validation and storage factory v…
Browse files Browse the repository at this point in the history
…alidation
  • Loading branch information
blackandred committed Feb 17, 2022
1 parent b1e48bf commit 221282e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
15 changes: 13 additions & 2 deletions server-go/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,28 @@ func (s *Service) CreateNewVersionFromCollection(c *collections.Collection, uplo
}, nil
}

// NewService is a factory method that knows how to construct a Storage provider, distincting multiple types of providers
func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error) {
repository := VersionsRepository{db: db}

// Google Cloud requires extra support
if isUsingGCS {
gcsCredentials, err := gcp.DefaultCredentials(context.Background())
gcsCredentials, err := gcp.DefaultCredentials(context.TODO())
if err != nil {
return Service{}, errors.New(fmt.Sprintf("cannot grab credentials for Google Cloud Storage: %v", err))
}
client, loginErr := gcp.NewHTTPClient(gcp.DefaultTransport(), gcp.CredentialsTokenSource(gcsCredentials))
if loginErr != nil {
return Service{}, errors.New(fmt.Sprintf("cannot login to Google Cloud Storage: %v", loginErr))
}
driver, openErr := gcsblob.OpenBucket(context.Background(), client, driverUrl, nil)
driver, openErr := gcsblob.OpenBucket(context.TODO(), client, driverUrl, nil)
if openErr != nil {
return Service{}, errors.New(fmt.Sprintf("cannot open Google Cloud Storage bucket: %v", openErr))
}
if result, err := driver.IsAccessible(context.TODO()); err != nil || !result {
logrus.Warningln("If connection status is still failing without a message then, check if bucket exists")
return Service{}, errors.New(fmt.Sprintf("Google Cloud Storage bucket is not accessible: %v || connection status = %v", err, result))
}
return Service{storage: driver, repository: &repository}, nil
}

Expand All @@ -74,6 +79,12 @@ func NewService(db *gorm.DB, driverUrl string, isUsingGCS bool) (Service, error)
logrus.Errorf("Cannot construct storage driver: %v", err)
return Service{}, err
}
if result, err := driver.IsAccessible(context.TODO()); err != nil || !result {
logrus.Warningln("For S3-compatible adapters it may be need to set environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY")
logrus.Warningln("For Min.io example connection string is: 's3://mybucket?endpoint=localhost:9000&disableSSL=true&s3ForcePathStyle=true&region=eu-central-1'")
logrus.Warningln("If connection status is still failing without a message then, check if bucket exists")
return Service{}, errors.New(fmt.Sprintf("bucket is not accessible: %v || connection status = %v", err, result))
}

return Service{storage: driver, repository: &repository}, nil
}
21 changes: 19 additions & 2 deletions server-go/storage/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"gocloud.dev/blob"
"io"
)

func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion) (int, error) {
writeStream, err := s.storage.NewWriter(context.Background(), version.GetTargetPath(), &blob.WriterOptions{})
defer writeStream.Close()
defer func() {
writeStream.Close()
}()

if err != nil {
return 0, errors.New(fmt.Sprintf("cannot upload file, attempted to open a writable stream, error: %v", err))
Expand All @@ -22,11 +25,25 @@ func (s *Service) UploadFile(inputStream io.ReadCloser, version *UploadedVersion
s.createGPGStreamMiddleware(),
}

wroteLen, writeErr := s.CopyStream(inputStream, writeStream, 1024, middlewares)
wroteLen, writeErr := s.CopyStream(inputStream, writeStream, 1024*1024, middlewares)
if writeErr != nil {
return wroteLen, errors.New(fmt.Sprintf("cannot upload file, cannot copy stream, error: %v", writeErr))
}

// Check if file exists at the storage
_ = writeStream.Close()
if exists, err := s.storage.Exists(context.TODO(), version.GetTargetPath()); !exists || err != nil {
logrus.Error(fmt.Sprintf("file was uploaded but does not exists on the storage at path '%v'. Error: %v", version.GetTargetPath(), err))
return wroteLen, errors.New("storage error")
}

// Check if filesize matches buffered stream size
attributes, err := s.storage.Attributes(context.TODO(), version.GetTargetPath())
if attributes.Size != int64(wroteLen) {
logrus.Errorln(fmt.Sprintf("file written to the storage does not match uploaded file, the filesize is not matching %v != %v for file '%v'", wroteLen, attributes.Size, version.GetTargetPath()))
return wroteLen, errors.New("storage error")
}

return wroteLen, nil
}

Expand Down

0 comments on commit 221282e

Please sign in to comment.