Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
feat: #164 implement locking for upload endpoint, so there will be no…
Browse files Browse the repository at this point in the history
… parallel uploads to the same collection
  • Loading branch information
blackandred committed Feb 22, 2022
1 parent c460f35 commit f5a0b7a
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 6 deletions.
1 change: 1 addition & 0 deletions server-go/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/.build
10 changes: 5 additions & 5 deletions server-go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ test_upload_by_form_1mb:
@echo "-----BEGIN PGP MESSAGE-----" > /tmp/1mb.gpg
@openssl rand -base64 $$((735*1024*1)) >> /tmp/1mb.gpg
@echo "-----END PGP MESSAGE-----" >> /tmp/1mb.gpg
curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/1mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version'
curl -vvv -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/1mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' --limit-rate 100K

test_upload_by_form_5mb:
@echo "-----BEGIN PGP MESSAGE-----" > /tmp/5mb.gpg
@openssl rand -base64 $$((735*1024*5)) >> /tmp/5mb.gpg
@echo "-----END PGP MESSAGE-----" >> /tmp/5mb.gpg
curl -s -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/5mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version'
curl -vvv -X POST -H 'Authorization: Bearer ${TOKEN}' -F "file=@/tmp/5mb.gpg" 'http://localhost:8080/api/stable/repository/collection/iwa-ait/version' --limit-rate 1000K

postgres:
docker run -d \
Expand All @@ -48,21 +48,21 @@ postgres:
-e POSTGRES_USER=postgres \
-e POSTGRES_DB=postgres \
-e PGDATA=/var/lib/postgresql/data/pgdata \
-v /tmp/br_postgres:/var/lib/postgresql/data \
-v $$(pwd)/.build/postgres:/var/lib/postgresql \
-p 5432:5432 \
postgres:14.1-alpine

postgres_refresh:
docker rm -f br_postgres || true
sudo rm -rf /tmp/br_postgres
sudo rm -rf $$(pwd)/.build/postgres
make postgres

minio:
docker run -d \
--name br_minio \
-p 9000:9000 \
-p 9001:9001 \
-v /tmp/br_minio:/data \
-v $$(pwd)/.build/minio:/data \
-e "MINIO_ROOT_USER=AKIAIOSFODNN7EXAMPLE" \
-e "MINIO_ROOT_PASSWORD=wJaFuCKtnFEMI/CApItaliSM/bPxRfiCYEXAMPLEKEY" \
quay.io/minio/minio:RELEASE.2022-02-16T00-35-27Z server /data --console-address 0.0.0.0:9001
4 changes: 4 additions & 0 deletions server-go/collections/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,7 @@ func (c *Collection) getMaxCollectionSizeInBytes() (int64, error) {
func (c *Collection) GetId() string {
return c.Metadata.Name
}

func (c Collection) GetGlobalIdentifier() string {
return "collection:" + c.GetId()
}
78 changes: 78 additions & 0 deletions server-go/concurrency/locking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package concurrency

import (
"database/sql"
"errors"
"fmt"
"gorm.io/gorm"
"math/rand"
"time"
)

type LocksService struct {
db *gorm.DB
}

func (ls *LocksService) Lock(id string, howLong time.Duration) (Lock, error) {
if ls.isLockedAlready(id) {
return Lock{}, errors.New("already locked")
}
if err := ls.addLock(id, howLong); err != nil {
return Lock{}, errors.New(fmt.Sprintf("cannot lock transaction, %v", err))
}
return Lock{
Id: id,
unlock: func() {
ls.unlock(id)
},
}, nil
}

func (ls *LocksService) addLock(id string, howLong time.Duration) error {
expiration := time.Now().Add(howLong)
return ls.db.Exec("INSERT INTO locks (id, expires) VALUES (@id, @expires);", sql.Named("id", id), sql.Named("expires", expiration)).Error
}

func (ls *LocksService) unlock(id string) {
ls.db.Exec("DELETE FROM locks WHERE locks.id = @id", sql.Named("id", id))
}

func (ls *LocksService) isLockedAlready(id string) bool {
var result int
ls.db.Raw("SELECT count(*) FROM locks WHERE locks.id = @id AND locks.expires > @now", sql.Named("id", id), sql.Named("now", time.Now())).Scan(&result)

if ls.shouldPerformCleanUpNow() {
ls.cleanUp()
}

return result > 0
}

func (ls *LocksService) cleanUp() {
ls.db.Exec("DELETE FROM locks WHERE locks.expires < @now", sql.Named("now", time.Now()))
}

func (ls *LocksService) shouldPerformCleanUpNow() bool {
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)

return r1.Intn(5) == 2 // PN-VI
}

func InitializeModel(db *gorm.DB) error {
return db.AutoMigrate(&Lock{})
}

func NewService(db *gorm.DB) LocksService {
return LocksService{db}
}

type Lock struct {
Id string
Expires time.Time
unlock func()
}

func (l *Lock) Unlock() {
l.unlock()
}
2 changes: 2 additions & 0 deletions server-go/core/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"github.com/riotkit-org/backup-repository/collections"
"github.com/riotkit-org/backup-repository/concurrency"
"github.com/riotkit-org/backup-repository/config"
"github.com/riotkit-org/backup-repository/security"
"github.com/riotkit-org/backup-repository/storage"
Expand All @@ -15,4 +16,5 @@ type ApplicationContainer struct {
Collections *collections.Service
Storage *storage.Service
JwtSecretKey string
Locks *concurrency.LocksService
}
5 changes: 5 additions & 0 deletions server-go/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"fmt"
"github.com/riotkit-org/backup-repository/concurrency"
"github.com/riotkit-org/backup-repository/security"
"github.com/riotkit-org/backup-repository/storage"
"github.com/sirupsen/logrus"
Expand All @@ -25,6 +26,10 @@ func InitializeDatabase(db *gorm.DB) bool {
logrus.Errorf("Cannot initialize UploadedVersion model: %v", err)
return false
}
if err := concurrency.InitializeModel(db); err != nil {
logrus.Errorf("Cannot initialize Locks model: %v", err)
return false
}

return true
}
9 changes: 8 additions & 1 deletion server-go/http/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer, requestT
timeout.WithTimeout(requestTimeout),
timeout.WithHandler(func(c *gin.Context) {
// todo: deactivate token if temporary token is used
// todo: locking support! There should be no concurrent uploads to the same collection

ctxUser, _ := GetContextUser(ctx, c)

Expand All @@ -42,6 +41,14 @@ func addUploadRoute(r *gin.RouterGroup, ctx *core.ApplicationContainer, requestT
return
}

// [SECURITY] Do not allow parallel uploads to the same collection
lock, lockErr := ctx.Locks.Lock(collection.GetGlobalIdentifier(), requestTimeout)
if lockErr != nil {
ServerErrorResponse(c, errors.New("cannot upload to same collection in parallel"))
return
}
defer lock.Unlock()

// [ROTATION STRATEGY][VERSIONING] Increment a version, generate target file path name that will be used on storage
sessionId := GetCurrentSessionId(c)
version, factoryError := ctx.Storage.CreateNewVersionFromCollection(collection, ctxUser.Metadata.Name, sessionId, 0)
Expand Down
3 changes: 3 additions & 0 deletions server-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"github.com/jessevdk/go-flags"
"github.com/riotkit-org/backup-repository/collections"
"github.com/riotkit-org/backup-repository/concurrency"
"github.com/riotkit-org/backup-repository/config"
"github.com/riotkit-org/backup-repository/core"
"github.com/riotkit-org/backup-repository/db"
Expand Down Expand Up @@ -66,6 +67,7 @@ func main() {
log.Errorln("Cannot initialize database connection")
log.Fatal(err)
}
locksService := concurrency.NewService(dbDriver)
db.InitializeDatabase(dbDriver)

usersService := users.NewUsersService(configProvider)
Expand All @@ -84,6 +86,7 @@ func main() {
JwtSecretKey: opts.JwtSecretKey,
Collections: &collectionsService,
Storage: &storageService,
Locks: &locksService,
}

// todo: First thread - HTTP
Expand Down

0 comments on commit f5a0b7a

Please sign in to comment.