HMS-5125: Add support for resumable downloads #918

Merged 1 commit on Dec 31, 2024
21 changes: 21 additions & 0 deletions api/docs.go
@@ -3597,9 +3597,19 @@ const docTemplate = `{
"api.CreateUploadRequest": {
"type": "object",
"required": [
"chunk_size",
"sha256",
"size"
],
"properties": {
"chunk_size": {
"description": "Size of the chunk",
"type": "integer"
},
"sha256": {
"description": "SHA-256 checksum of the file",
"type": "string"
},
"size": {
"description": "Size of the upload in bytes",
"type": "integer"
@@ -5127,10 +5137,21 @@ const docTemplate = `{
"api.UploadResponse": {
"type": "object",
"properties": {
"artifact_href": {
"description": "Artifact href if one exists (on create only)",
"type": "string"
},
"completed": {
"description": "Timestamp when upload is committed",
"type": "string"
},
"completed_checksums": {
"description": "A list of already completed checksums",
"type": "array",
"items": {
"type": "string"
}
},
"created": {
"description": "Timestamp of creation",
"type": "string"
21 changes: 21 additions & 0 deletions api/openapi.json
@@ -72,12 +72,22 @@
},
"api.CreateUploadRequest": {
"properties": {
"chunk_size": {
"description": "Size of the chunk",
"type": "integer"
},
"sha256": {
"description": "SHA-256 checksum of the file",
"type": "string"
},
"size": {
"description": "Size of the upload in bytes",
"type": "integer"
}
},
"required": [
"chunk_size",
"sha256",
"size"
],
"type": "object"
@@ -1571,10 +1581,21 @@
},
"api.UploadResponse": {
"properties": {
"artifact_href": {
"description": "Artifact href if one exists (on create only)",
"type": "string"
},
"completed": {
"description": "Timestamp when upload is committed",
"type": "string"
},
"completed_checksums": {
"description": "A list of already completed checksums",
"items": {
"type": "string"
},
"type": "array"
},
"created": {
"description": "Timestamp of creation",
"type": "string"
2 changes: 1 addition & 1 deletion db/migrations.latest
@@ -1 +1 @@
20241113084850
20241203143614
5 changes: 5 additions & 0 deletions db/migrations/20241203143614_create_uploads_table.down.sql
@@ -0,0 +1,5 @@
BEGIN;

DROP TABLE IF EXISTS uploads;

COMMIT;
14 changes: 14 additions & 0 deletions db/migrations/20241203143614_create_uploads_table.up.sql
@@ -0,0 +1,14 @@
BEGIN;

CREATE TABLE IF NOT EXISTS uploads (
upload_uuid TEXT NOT NULL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
org_id VARCHAR (255) NOT NULL,
chunk_size int NOT NULL,
sha256 TEXT NOT NULL,
chunk_list TEXT[] default '{}' not null
);

CREATE INDEX IF NOT EXISTS index_orgid_chunksize_sha256 ON uploads(org_id,chunk_size,sha256);

COMMIT;
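
The DAO added later in this diff reads and writes a models.Upload record whose fields mirror these columns; the model itself is not part of the PR. Below is a rough sketch of what it presumably looks like, inferred from the columns above and the field names used in pkg/dao/uploads.go (the gorm tag and the TEXT[] mapping are assumptions). Note that the composite index on (org_id, chunk_size, sha256) matches exactly the lookup performed by GetExistingUploadIDAndCompletedChunks.

package models

import "time"

// Inferred sketch only; the real definition lives in pkg/models and is not shown in this PR.
type Upload struct {
	UploadUUID string    `gorm:"primaryKey"` // upload_uuid TEXT NOT NULL PRIMARY KEY
	CreatedAt  time.Time // created_at TIMESTAMP WITH TIME ZONE
	OrgID      string    // org_id VARCHAR(255) NOT NULL
	ChunkSize  int64     // chunk_size int NOT NULL
	Sha256     string    // sha256 TEXT NOT NULL
	ChunkList  []string  // chunk_list TEXT[] DEFAULT '{}' NOT NULL (array mapping mechanism not shown here)
}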
4 changes: 3 additions & 1 deletion pkg/api/pulp.go
@@ -1,7 +1,9 @@
package api

type CreateUploadRequest struct {
Size int64 `json:"size" validate:"required"` // Size of the upload in bytes
Size int64 `json:"size" validate:"required"` // Size of the upload in bytes
ChunkSize int64 `json:"chunk_size" validate:"required"` // Size of the chunk
Sha256 string `json:"sha256" validate:"required"` // SHA-256 checksum of the file
}

type PulpUploadChunkRequest struct {
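
For reference, a minimal sketch (not part of the PR) of the request body the extended schema now expects, built from the struct above with made-up values:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/content-services/content-sources-backend/pkg/api"
)

func main() {
	req := api.CreateUploadRequest{
		Size:      32000,               // total size of the file in bytes
		ChunkSize: 16000,               // size of each chunk
		Sha256:    "e3b0c44298fc1c...", // sha256 of the whole file (truncated example value)
	}
	body, _ := json.Marshal(req)
	fmt.Println(string(body))
	// Prints: {"size":32000,"chunk_size":16000,"sha256":"e3b0c44298fc1c..."}
}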
13 changes: 8 additions & 5 deletions pkg/api/uploads.go
@@ -25,9 +25,12 @@ type UploadChunkRequest {
}

type UploadResponse struct {
UploadUuid *string `json:"upload_uuid"` // Upload UUID
Created *time.Time `json:"created"` // Timestamp of creation
LastUpdated *time.Time `json:"last_updated"` // Timestamp of last update
Size int64 `json:"size"` // Size of the upload in bytes
Completed *time.Time `json:"completed,omitempty"` // Timestamp when upload is committed
ArtifactHref *string `json:"artifact_href"` // Artifact href if one exists (on create only)
CompletedChecksums []string `json:"completed_checksums"` // A list of already completed checksums
UploadUuid *string `json:"upload_uuid"` // Upload UUID
Created *time.Time `json:"created"` // Timestamp of creation
LastUpdated *time.Time `json:"last_updated"` // Timestamp of last update
Size int64 `json:"size"` // Size of the upload in bytes
Completed *time.Time `json:"completed,omitempty"` // Timestamp when upload is committed

}
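
A hedged illustration of how the extended response could be populated when a client re-creates an upload that already has chunks recorded; the field names come from the struct above, while the values and the wiring are made up:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/content-services/content-sources-backend/pkg/api"
)

func main() {
	href := "/pulp/api/v3/artifacts/0190.../" // hypothetical artifact href
	uuid := "0190a3b2-..."                    // hypothetical upload UUID
	resp := api.UploadResponse{
		ArtifactHref:       &href,
		CompletedChecksums: []string{"sha256-of-chunk-1", "sha256-of-chunk-2"},
		UploadUuid:         &uuid,
		Size:               32000,
	}
	out, _ := json.MarshalIndent(resp, "", "  ")
	fmt.Println(string(out))
}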
8 changes: 8 additions & 0 deletions pkg/dao/interfaces.go
@@ -24,6 +24,7 @@ type DaoRegistry struct {
PackageGroup PackageGroupDao
Environment EnvironmentDao
Template TemplateDao
Uploads UploadDao
}

func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
@@ -51,6 +52,7 @@ func GetDaoRegistry(db *gorm.DB) *DaoRegistry {
db: db,
pulpClient: pulp_client.GetPulpClientWithDomain(""),
},
Uploads: uploadDaoImpl{db: db, pulpClient: pulp_client.GetPulpClientWithDomain("")},
}
return &reg
}
@@ -184,3 +186,9 @@ type TemplateDao interface {
DeleteTemplateSnapshot(ctx context.Context, snapshotUUID string) error
GetRepositoryConfigurationFile(ctx context.Context, orgID string, templateUUID string) (string, error)
}

type UploadDao interface {
StoreFileUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string, chunkSize int64) error
StoreChunkUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string) error
GetExistingUploadIDAndCompletedChunks(ctx context.Context, orgID string, sha256 string, chunkSize int64) (string, []string, error)
}
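
Taken together, the new interface supports a resume flow along these lines: look up an existing upload by (org, file sha256, chunk size), store a new record only when nothing is found, then record each chunk's checksum as it completes. A rough sketch, assuming a freshly created pulp upload UUID is already in hand and with the helper name made up:

package example

import (
	"context"

	"github.com/content-services/content-sources-backend/pkg/dao"
)

// resumeOrStartUpload is a hypothetical helper, not part of the PR.
func resumeOrStartUpload(ctx context.Context, uploads dao.UploadDao, orgID, fileSha256, newUploadUUID string, chunkSize int64) (string, []string, error) {
	// See whether this org already started uploading the same file with the same chunk size.
	uploadUUID, doneChunks, err := uploads.GetExistingUploadIDAndCompletedChunks(ctx, orgID, fileSha256, chunkSize)
	if err == nil && uploadUUID != "" {
		return uploadUUID, doneChunks, nil // resume: chunks whose sha256 is in doneChunks can be skipped
	}
	// Nothing usable to resume (a lookup miss is treated as "start fresh" here; the real
	// handler's not-found handling is not shown in this diff).
	if err := uploads.StoreFileUpload(ctx, orgID, newUploadUUID, fileSha256, chunkSize); err != nil {
		return "", nil, err
	}
	return newUploadUUID, nil, nil
}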
68 changes: 68 additions & 0 deletions pkg/dao/uploads.go
@@ -0,0 +1,68 @@
package dao

import (
"context"

"github.com/content-services/content-sources-backend/pkg/models"
"github.com/content-services/content-sources-backend/pkg/pulp_client"
"gorm.io/gorm"
)

type uploadDaoImpl struct {
db *gorm.DB
pulpClient pulp_client.PulpClient
}

func GetUploadDao(db *gorm.DB, pulpClient pulp_client.PulpClient) UploadDao {
return &uploadDaoImpl{
db: db,
pulpClient: pulpClient,
}
}

func (t uploadDaoImpl) StoreFileUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string, chunkSize int64) error {
var upload models.Upload

upload.OrgID = orgID
upload.UploadUUID = uploadUUID
upload.Sha256 = sha256
upload.ChunkSize = chunkSize

upload.ChunkList = []string{}

db := t.db.Model(models.Upload{}).WithContext(ctx).Create(&upload)
if db.Error != nil {
return db.Error
}

return nil
}

func (t uploadDaoImpl) GetExistingUploadIDAndCompletedChunks(ctx context.Context, orgID string, sha256 string, chunkSize int64) (string, []string, error) {
db := t.db.Model(models.Upload{}).WithContext(ctx)

var result models.Upload

db.Where("org_id = ?", orgID).Where("chunk_size = ?", chunkSize).Where("sha256 = ?", sha256).First(&result)

if db.Error != nil {
return "", []string{}, db.Error
}

return result.UploadUUID, result.ChunkList, nil
}

func (t uploadDaoImpl) StoreChunkUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string) error {
db := t.db.Model(models.Upload{}).
WithContext(ctx).
Where("org_id = ?", orgID).
Where("upload_uuid = ?", uploadUUID).
Where("? != all(chunk_list)", sha256).
Update("chunk_list", gorm.Expr(`array_append(chunk_list, ?)`, sha256))

if db.Error != nil {
return db.Error
}

return nil
}
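
Worth noting: the "? != all(chunk_list)" predicate in StoreChunkUpload filters the row out when the checksum is already present, so array_append never runs twice for the same chunk and retries are harmless. A minimal sketch of that call pattern (the wrapper name is made up):

package example

import (
	"context"

	"github.com/content-services/content-sources-backend/pkg/dao"
)

// recordChunkWithRetry is a hypothetical wrapper, not part of the PR.
func recordChunkWithRetry(ctx context.Context, uploads dao.UploadDao, orgID, uploadUUID, chunkSha256 string) error {
	if err := uploads.StoreChunkUpload(ctx, orgID, uploadUUID, chunkSha256); err != nil {
		return err
	}
	// Repeating the call with the same checksum is a no-op: the WHERE clause excludes the
	// row, so chunk_list still contains this checksum exactly once.
	return uploads.StoreChunkUpload(ctx, orgID, uploadUUID, chunkSha256)
}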
55 changes: 55 additions & 0 deletions pkg/dao/uploads_test.go
@@ -0,0 +1,55 @@
package dao

import (
"context"
"testing"

"github.com/content-services/content-sources-backend/pkg/pulp_client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

type UploadsSuite struct {
*DaoSuite
mockPulpClient *pulp_client.MockPulpClient
}

func TestUploadsSuite(t *testing.T) {
m := DaoSuite{}
r := UploadsSuite{DaoSuite: &m}
suite.Run(t, &r)
}

func (s *UploadsSuite) uploadsDao() uploadDaoImpl {
return uploadDaoImpl{
db: s.tx,
pulpClient: s.mockPulpClient,
}
}
func (s *UploadsSuite) SetupTest() {
s.DaoSuite.SetupTest()
}
func (s *UploadsSuite) TestStoreFileUpload() {
uploadDao := s.uploadsDao()
ctx := context.Background()

err := uploadDao.StoreFileUpload(ctx, "bananaOrg", "bananaUUID", "bananaHash256", 16000)

assert.Equal(s.T(), err, nil)

uploadUUID, chunkList, err := uploadDao.GetExistingUploadIDAndCompletedChunks(ctx, "bananaOrg", "bananaHash256", 16000)

assert.Equal(s.T(), nil, err)
assert.Equal(s.T(), "bananaUUID", uploadUUID)
assert.Equal(s.T(), []string{}, chunkList)

err = uploadDao.StoreChunkUpload(ctx, "bananaOrg", "bananaUUID", "bananaChunkHash256")

assert.Equal(s.T(), nil, err)

uploadUUID, chunkList, err = uploadDao.GetExistingUploadIDAndCompletedChunks(ctx, "bananaOrg", "bananaHash256", 16000)

assert.Equal(s.T(), nil, err)
assert.Equal(s.T(), "bananaUUID", uploadUUID)
assert.Equal(s.T(), []string{"bananaChunkHash256"}, chunkList)
}
30 changes: 22 additions & 8 deletions pkg/handler/pulp.go
@@ -36,14 +36,10 @@ func RegisterPulpRoutes(engine *echo.Group, daoReg *dao.DaoRegistry) {
addRepoRoute(engine, http.MethodGet, "/pulp/tasks/:task_href", pulpHandler.getTask, rbac.RbacVerbRead)
}

func (ph *PulpHandler) createUploadInternal(c echo.Context) (*zest.UploadResponse, error) {
func (ph *PulpHandler) createUploadInternal(c echo.Context, request api.CreateUploadRequest) (*zest.UploadResponse, error) {
_, orgId := getAccountIdOrgId(c)
dataInput := api.CreateUploadRequest{}
if err := c.Bind(&dataInput); err != nil {
return nil, ce.NewErrorResponse(http.StatusBadRequest, "Error binding parameters", err.Error())
}

if dataInput.Size <= 0 {
if request.Size <= 0 {
return nil, ce.NewErrorResponse(http.StatusBadRequest, "error creating upload", "upload size must be greater than 0")
}

Expand All @@ -53,16 +49,34 @@ func (ph *PulpHandler) createUploadInternal(c echo.Context) (*zest.UploadRespons
}
pulpClient := pulp_client.GetPulpClientWithDomain(domainName)

apiResponse, code, err := pulpClient.CreateUpload(c.Request().Context(), dataInput.Size)
apiResponse, code, err := pulpClient.CreateUpload(c.Request().Context(), request.Size)
if err != nil {
return nil, ce.NewErrorResponse(code, "error creating upload", err.Error())
}

// Get the upload uuid to put in the upload db
uploadUuid := ""
if apiResponse != nil && apiResponse.PulpHref != nil {
uploadUuid = extractUploadUuid(*apiResponse.PulpHref)
}

// Associate the file uploaduuid for later use
err = ph.DaoRegistry.Uploads.StoreFileUpload(c.Request().Context(), orgId, uploadUuid, request.Sha256, request.ChunkSize)

if err != nil {
return nil, err
}

return apiResponse, nil
}

func (ph *PulpHandler) createUpload(c echo.Context) error {
apiResponse, err := ph.createUploadInternal(c)
var req api.CreateUploadRequest

if err := c.Bind(&req); err != nil {
return ce.NewErrorResponse(http.StatusBadRequest, "Error binding parameters", err.Error())
}
apiResponse, err := ph.createUploadInternal(c, req)
if err != nil {
return err
}
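
extractUploadUuid is referenced above but not defined in this diff. Assuming pulp upload hrefs end in .../uploads/<uuid>/, one plausible shape for it (a guess, not the repository's actual implementation) would be:

package handler

import "strings"

// Hypothetical sketch; the real extractUploadUuid lives elsewhere in the repository.
func extractUploadUuid(href string) string {
	trimmed := strings.TrimSuffix(href, "/")
	idx := strings.LastIndex(trimmed, "/")
	if idx < 0 {
		return trimmed
	}
	return trimmed[idx+1:] // last path segment, e.g. the uuid in .../uploads/<uuid>/
}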