Skip to content

Commit

Permalink
Fixes 5125: Add support for resumable downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrewgdewar committed Dec 27, 2024
1 parent 98de418 commit 4f7120b
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 18 deletions.
21 changes: 21 additions & 0 deletions api/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3597,9 +3597,19 @@ const docTemplate = `{
"api.CreateUploadRequest": {
"type": "object",
"required": [
"chunk_size",
"sha256",
"size"
],
"properties": {
"chunk_size": {
"description": "Size of the chunk",
"type": "integer"
},
"sha256": {
"description": "SHA-256 checksum of the file",
"type": "string"
},
"size": {
"description": "Size of the upload in bytes",
"type": "integer"
Expand Down Expand Up @@ -5127,10 +5137,21 @@ const docTemplate = `{
"api.UploadResponse": {
"type": "object",
"properties": {
"artifact_href": {
"description": "Artifact href if one exists (on create only)",
"type": "string"
},
"completed": {
"description": "Timestamp when upload is committed",
"type": "string"
},
"completed_checksums": {
"description": "A list of already completed checksums",
"type": "array",
"items": {
"type": "string"
}
},
"created": {
"description": "Timestamp of creation",
"type": "string"
Expand Down
21 changes: 21 additions & 0 deletions api/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,22 @@
},
"api.CreateUploadRequest": {
"properties": {
"chunk_size": {
"description": "Size of the chunk",
"type": "integer"
},
"sha256": {
"description": "SHA-256 checksum of the file",
"type": "string"
},
"size": {
"description": "Size of the upload in bytes",
"type": "integer"
}
},
"required": [
"chunk_size",
"sha256",
"size"
],
"type": "object"
Expand Down Expand Up @@ -1571,10 +1581,21 @@
},
"api.UploadResponse": {
"properties": {
"artifact_href": {
"description": "Artifact href if one exists (on create only)",
"type": "string"
},
"completed": {
"description": "Timestamp when upload is committed",
"type": "string"
},
"completed_checksums": {
"description": "A list of already completed checksums",
"items": {
"type": "string"
},
"type": "array"
},
"created": {
"description": "Timestamp of creation",
"type": "string"
Expand Down
2 changes: 1 addition & 1 deletion db/migrations.latest
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20241113084850
20241203143614
5 changes: 5 additions & 0 deletions db/migrations/20241203143614_create_uploads_table.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;

DROP TABLE IF EXISTS uploads;

COMMIT;
14 changes: 14 additions & 0 deletions db/migrations/20241203143614_create_uploads_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
BEGIN;

CREATE TABLE IF NOT EXISTS uploads (
upload_uuid TEXT NOT NULL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
org_id VARCHAR (255) NOT NULL,
chunk_size int NOT NULL,
sha256 TEXT NOT NULL,
chunk_list TEXT[] default '{}' not null
);

CREATE INDEX IF NOT EXISTS index_orgid_chunksize_sha256 ON uploads(org_id,chunk_size,sha256);

COMMIT;
4 changes: 3 additions & 1 deletion pkg/api/pulp.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package api

type CreateUploadRequest struct {
Size int64 `json:"size" validate:"required"` // Size of the upload in bytes
Size int64 `json:"size" validate:"required"` // Size of the upload in bytes
ChunkSize int64 `json:"chunk_size" validate:"required"` // Size of the chunk
Sha256 string `json:"sha256" validate:"required"` // SHA-256 checksum of the file
}

type PulpUploadChunkRequest struct {
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ type UploadChunkRequest struct {
}

type UploadResponse struct {
UploadUuid *string `json:"upload_uuid"` // Upload UUID
Created *time.Time `json:"created"` // Timestamp of creation
LastUpdated *time.Time `json:"last_updated"` // Timestamp of last update
Size int64 `json:"size"` // Size of the upload in bytes
Completed *time.Time `json:"completed,omitempty"` // Timestamp when upload is committed
ArtifactHref *string `json:"artifact_href"` // Artifact href if one exists (on create only)
CompletedChecksums []string `json:"completed_checksums"` // A list of already completed checksums
UploadUuid *string `json:"upload_uuid"` // Upload UUID
Created *time.Time `json:"created"` // Timestamp of creation
LastUpdated *time.Time `json:"last_updated"` // Timestamp of last update
Size int64 `json:"size"` // Size of the upload in bytes
Completed *time.Time `json:"completed,omitempty"` // Timestamp when upload is committed

}
8 changes: 8 additions & 0 deletions pkg/dao/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type DaoRegistry struct {
PackageGroup PackageGroupDao
Environment EnvironmentDao
Template TemplateDao
Uploads UploadDao
}

func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
Expand Down Expand Up @@ -51,6 +52,7 @@ func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
db: db,
pulpClient: pulp_client.GetPulpClientWithDomain(""),
},
Uploads: uploadDaoImpl{db: db, pulpClient: pulp_client.GetPulpClientWithDomain("")},
}
return &reg
}
Expand Down Expand Up @@ -184,3 +186,9 @@ type TemplateDao interface {
DeleteTemplateSnapshot(ctx context.Context, snapshotUUID string) error
GetRepositoryConfigurationFile(ctx context.Context, orgID string, templateUUID string) (string, error)
}

type UploadDao interface {
StoreFileUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string, chunkSize int64) error
StoreChunkUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string) error
GetExistingUploadIDAndCompletedChunks(ctx context.Context, orgID string, sha256 string, chunkSize int64) (string, []string, error)
}
68 changes: 68 additions & 0 deletions pkg/dao/uploads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package dao

import (
"context"

"github.com/content-services/content-sources-backend/pkg/models"
"github.com/content-services/content-sources-backend/pkg/pulp_client"
"gorm.io/gorm"
)

type uploadDaoImpl struct {
db *gorm.DB
pulpClient pulp_client.PulpClient
}

func GetUploadDao(db *gorm.DB, pulpClient pulp_client.PulpClient) UploadDao {
return &uploadDaoImpl{
db: db,
pulpClient: pulpClient,
}
}

func (t uploadDaoImpl) StoreFileUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string, chunkSize int64) error {
var upload models.Upload

upload.OrgID = orgID
upload.UploadUUID = uploadUUID
upload.Sha256 = sha256
upload.ChunkSize = chunkSize

upload.ChunkList = []string{}

db := t.db.Model(models.Upload{}).WithContext(ctx).Create(&upload)
if db.Error != nil {
return db.Error
}

return nil
}

func (t uploadDaoImpl) GetExistingUploadIDAndCompletedChunks(ctx context.Context, orgID string, sha256 string, chunkSize int64) (string, []string, error) {
db := t.db.Model(models.Upload{}).WithContext(ctx)

var result models.Upload

db.Where("org_id = ?", orgID).Where("chunk_size = ?", chunkSize).Where("sha256 = ?", sha256).First(&result)

if db.Error != nil {
return "", []string{}, db.Error
}

return result.UploadUUID, result.ChunkList, nil
}

func (t uploadDaoImpl) StoreChunkUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string) error {
db := t.db.Model(models.Upload{}).
WithContext(ctx).
Where("org_id = ?", orgID).
Where("upload_uuid = ?", uploadUUID).
Where("? != all(chunk_list)", sha256).
Update("chunk_list", gorm.Expr(`array_append(chunk_list, ?)`, sha256))

if db.Error != nil {
return db.Error
}

return nil
}
55 changes: 55 additions & 0 deletions pkg/dao/uploads_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package dao

import (
"context"
"testing"

"github.com/content-services/content-sources-backend/pkg/pulp_client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

type UploadsSuite struct {
*DaoSuite
mockPulpClient *pulp_client.MockPulpClient
}

func TestUploadsSuite(t *testing.T) {
m := DaoSuite{}
r := UploadsSuite{DaoSuite: &m}
suite.Run(t, &r)
}

func (s *UploadsSuite) uploadsDao() uploadDaoImpl {
return uploadDaoImpl{
db: s.tx,
pulpClient: s.mockPulpClient,
}
}
func (s *UploadsSuite) SetupTest() {
s.DaoSuite.SetupTest()
}
func (s *UploadsSuite) TestStoreFileUpload() {
uploadDao := s.uploadsDao()
ctx := context.Background()

err := uploadDao.StoreFileUpload(ctx, "bananaOrg", "bananaUUID", "bananaHash256", 16000)

assert.Equal(s.T(), err, nil)

uploadUUID, chunkList, err := uploadDao.GetExistingUploadIDAndCompletedChunks(ctx, "bananaOrg", "bananaHash256", 16000)

assert.Equal(s.T(), nil, err)
assert.Equal(s.T(), "bananaUUID", uploadUUID)
assert.Equal(s.T(), []string{}, chunkList)

err = uploadDao.StoreChunkUpload(ctx, "bananaOrg", "bananaUUID", "bananaChunkHash256")

assert.Equal(s.T(), nil, err)

uploadUUID, chunkList, err = uploadDao.GetExistingUploadIDAndCompletedChunks(ctx, "bananaOrg", "bananaHash256", 16000)

assert.Equal(s.T(), nil, err)
assert.Equal(s.T(), "bananaUUID", uploadUUID)
assert.Equal(s.T(), []string{"bananaChunkHash256"}, chunkList)
}
30 changes: 22 additions & 8 deletions pkg/handler/pulp.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,10 @@ func RegisterPulpRoutes(engine *echo.Group, daoReg *dao.DaoRegistry) {
addRepoRoute(engine, http.MethodGet, "/pulp/tasks/:task_href", pulpHandler.getTask, rbac.RbacVerbRead)
}

func (ph *PulpHandler) createUploadInternal(c echo.Context) (*zest.UploadResponse, error) {
func (ph *PulpHandler) createUploadInternal(c echo.Context, request api.CreateUploadRequest) (*zest.UploadResponse, error) {
_, orgId := getAccountIdOrgId(c)
dataInput := api.CreateUploadRequest{}
if err := c.Bind(&dataInput); err != nil {
return nil, ce.NewErrorResponse(http.StatusBadRequest, "Error binding parameters", err.Error())
}

if dataInput.Size <= 0 {
if request.Size <= 0 {
return nil, ce.NewErrorResponse(http.StatusBadRequest, "error creating upload", "upload size must be greater than 0")
}

Expand All @@ -53,16 +49,34 @@ func (ph *PulpHandler) createUploadInternal(c echo.Context) (*zest.UploadRespons
}
pulpClient := pulp_client.GetPulpClientWithDomain(domainName)

apiResponse, code, err := pulpClient.CreateUpload(c.Request().Context(), dataInput.Size)
apiResponse, code, err := pulpClient.CreateUpload(c.Request().Context(), request.Size)
if err != nil {
return nil, ce.NewErrorResponse(code, "error creating upload", err.Error())
}

// Get the upload uuid to put in the upload db
uploadUuid := ""
if apiResponse != nil && apiResponse.PulpHref != nil {
uploadUuid = extractUploadUuid(*apiResponse.PulpHref)
}

// Associate the file uploaduuid for later use
err = ph.DaoRegistry.Uploads.StoreFileUpload(c.Request().Context(), orgId, uploadUuid, request.Sha256, request.ChunkSize)

if err != nil {
return nil, err
}

return apiResponse, nil
}

func (ph *PulpHandler) createUpload(c echo.Context) error {
apiResponse, err := ph.createUploadInternal(c)
var req api.CreateUploadRequest

if err := c.Bind(&req); err != nil {
return ce.NewErrorResponse(http.StatusBadRequest, "Error binding parameters", err.Error())
}
apiResponse, err := ph.createUploadInternal(c, req)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 4f7120b

Please sign in to comment.