Skip to content

Commit

Permalink
HMS-5244: fix snapshot task cleanup (#939)
Browse files Browse the repository at this point in the history
* HMS-5224: fix snapshot task cleanup

* fix: cleanup, save snap uuid along ident

* fix: rebase fixes
  • Loading branch information
dominikvagner authored Jan 31, 2025
1 parent 13ef705 commit cb2da85
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 38 deletions.
4 changes: 1 addition & 3 deletions pkg/dao/repository_configs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/tasks/add_uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type AddUploadsPayload struct {
PublicationTaskHref *string
DistributionTaskHref *string
SnapshotIdent *string
SnapshotUUID *string
}

type AddUploads struct {
Expand Down Expand Up @@ -301,6 +302,15 @@ func (ur *AddUploads) GetSnapshotIdent() *string {
return ur.payload.SnapshotIdent
}

func (ur *AddUploads) SaveSnapshotUUID(uuid string) error {
ur.payload.SnapshotUUID = &uuid
return ur.UpdatePayload()
}

func (ur *AddUploads) GetSnapshotUUID() *string {
return ur.payload.SnapshotUUID
}

func (ur *AddUploads) ImportPackageData(versionHref string) error {
pkgs, err := ur.pulpClient.ListVersionAllPackages(ur.ctx, versionHref)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/tasks/payloads/repository_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const Snapshot = "snapshot"

type SnapshotPayload struct {
SnapshotIdent *string
SnapshotUUID *string
SyncTaskHref *string
PublicationTaskHref *string
DistributionTaskHref *string
Expand Down
44 changes: 38 additions & 6 deletions pkg/tasks/repository_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,19 @@ func (sr *SnapshotRepository) Run() (err error) {

defer func() {
if errors.Is(err, context.Canceled) {
cleanupErr := sr.cleanupOnCancel()
cleanupErr := helper.Cleanup()
if cleanupErr != nil {
sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper")
}
cleanupErr = sr.cleanupOnCancel()
if cleanupErr != nil {
sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot")
}
cleanupErr = helper.Cleanup()
cleanupErr = sr.UpdatePayload()
if cleanupErr != nil {
sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper")
sr.logger.Err(cleanupErr).Msg("error cleaning up payload")
}
sr.logger.Warn().Msg("task was cancelled")
}
}()

Expand Down Expand Up @@ -252,11 +257,13 @@ func (sr *SnapshotRepository) cleanupOnCancel() error {
if err != nil {
return err
}
sr.payload.SyncTaskHref = nil
if sr.payload.PublicationTaskHref != nil {
_, err := pulpClient.CancelTask(ctxWithLogger, *sr.payload.PublicationTaskHref)
if err != nil {
return err
}
sr.payload.PublicationTaskHref = nil
}
versionHref := pulp_client.SelectVersionHref(&task)
if versionHref != nil {
Expand Down Expand Up @@ -323,7 +330,11 @@ func (sr *SnapshotRepository) GetOrphanedLatestVersion(repoConfigUUID string) (*
}

func (sr *SnapshotRepository) SavePublicationTaskHref(href string) error {
sr.payload.PublicationTaskHref = &href
if href == "" {
sr.payload.PublicationTaskHref = nil
} else {
sr.payload.PublicationTaskHref = &href
}
return sr.UpdatePayload()
}

Expand All @@ -332,7 +343,11 @@ func (sr *SnapshotRepository) GetPublicationTaskHref() *string {
}

func (sr *SnapshotRepository) SaveDistributionTaskHref(href string) error {
sr.payload.DistributionTaskHref = &href
if href == "" {
sr.payload.DistributionTaskHref = nil
} else {
sr.payload.DistributionTaskHref = &href
}
return sr.UpdatePayload()
}

Expand All @@ -341,10 +356,27 @@ func (sr *SnapshotRepository) GetDistributionTaskHref() *string {
}

func (sr *SnapshotRepository) SaveSnapshotIdent(id string) error {
sr.payload.SnapshotIdent = &id
if id == "" {
sr.payload.SnapshotIdent = nil
} else {
sr.payload.SnapshotIdent = &id
}
return sr.UpdatePayload()
}

func (sr *SnapshotRepository) GetSnapshotIdent() *string {
return sr.payload.SnapshotIdent
}

func (sr *SnapshotRepository) SaveSnapshotUUID(uuid string) error {
if uuid == "" {
sr.payload.SnapshotUUID = nil
} else {
sr.payload.SnapshotUUID = &uuid
}
return sr.UpdatePayload()
}

func (sr *SnapshotRepository) GetSnapshotUUID() *string {
return sr.payload.SnapshotUUID
}
22 changes: 4 additions & 18 deletions pkg/tasks/repository_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,32 +204,18 @@ func (s *SnapshotSuite) TestSnapshotResyncWithOrphanVersion() {
s.MockPulpClient.On("UpdateDomainIfNeeded", ctx, domainName).Return(nil)
s.MockPulpClient.On("SyncRpmRepository", ctx, *(repoResp.PulpHref), &remoteHref).Return(taskHref, nil)

_, syncTask := s.mockSync(ctx, taskHref, false)
_, _ = s.mockSync(ctx, taskHref, false)

task := models.TaskInfo{
Id: uuid.UUID{},
OrgId: repoConfig.OrgID,
ObjectUUID: repoUuid,
ObjectType: utils.Ptr(config.ObjectTypeRepository)}

pubHref, pubTask := s.mockPublish(ctx, existingVersionHref, false)
distHref, distTask := s.mockCreateDist(ctx, pubHref)
pubHref, _ := s.mockPublish(ctx, existingVersionHref, false)
distHref, _ := s.mockCreateDist(ctx, pubHref)

s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{
SnapshotIdent: &snapshotId,
SyncTaskHref: &syncTask,
}).Return(&task, nil)
s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{
SnapshotIdent: &snapshotId,
SyncTaskHref: &syncTask,
PublicationTaskHref: &pubTask,
}).Return(&task, nil)
s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{
SnapshotIdent: &snapshotId,
SyncTaskHref: &syncTask,
PublicationTaskHref: &pubTask,
DistributionTaskHref: &distTask,
}).Return(&task, nil)
s.MockQueue.On("UpdatePayload", &task, mock.Anything).Return(&task, nil)

// Lookup the version
counts := zest.ContentSummaryResponse{
Expand Down
73 changes: 62 additions & 11 deletions pkg/tasks/snapshot_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tasks

import (
"context"
"errors"
"fmt"
"path/filepath"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/content-services/content-sources-backend/pkg/tasks/helpers"
"github.com/google/uuid"
"github.com/rs/zerolog"
"gorm.io/gorm"
)

// ResumableSnapshotInterface used to store various references needed
Expand All @@ -26,6 +28,9 @@ type ResumableSnapshotInterface interface {

SaveSnapshotIdent(id string) error
GetSnapshotIdent() *string

SaveSnapshotUUID(uuid string) error
GetSnapshotUUID() *string
}

// SnapshotHelper is meant to be used by another task, and be able to turn a repository Version into a
Expand Down Expand Up @@ -62,6 +67,10 @@ func (sh *SnapshotHelper) Run(versionHref string) error {
if err != nil {
return err
}
err = sh.payload.SaveDistributionTaskHref(distHref)
if err != nil {
return fmt.Errorf("unable to save distribution task href: %w", err)
}

latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID)

Expand Down Expand Up @@ -96,33 +105,75 @@ func (sh *SnapshotHelper) Run(versionHref string) error {
if err != nil {
return err
}
err = sh.payload.SaveSnapshotUUID(snap.UUID)
if err != nil {
return fmt.Errorf("unable to save snapshot uuid: %w", err)
}

return nil
}

func (sh *SnapshotHelper) Cleanup() error {
if sh.payload.GetDistributionTaskHref() != nil {
task, err := sh.pulpClient.CancelTask(sh.ctx, *sh.payload.GetDistributionTaskHref())
if distHref := sh.payload.GetDistributionTaskHref(); distHref != nil {
deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *distHref)
if err != nil {
return err
}
task, err = sh.pulpClient.GetTask(sh.ctx, *sh.payload.GetDistributionTaskHref())
_, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref)
if err != nil {
return err
}
versionHref := pulp_client.SelectRpmDistributionHref(&task)
if versionHref != nil {
_, err = sh.pulpClient.DeleteRpmDistribution(sh.ctx, *versionHref)
if err != nil {
return err
}

err = sh.payload.SavePublicationTaskHref("")
if err != nil {
return err
}
err = sh.payload.SaveDistributionTaskHref("")
if err != nil {
return err
}
}

if sh.payload.GetSnapshotUUID() != nil {
err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotUUID())
if err != nil {
return err
}
}
if sh.payload.GetSnapshotIdent() != nil {
err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotIdent())
err := sh.payload.SaveSnapshotUUID("")
if err != nil {
return err
}

helper := helpers.NewPulpDistributionHelper(sh.ctx, sh.pulpClient)
latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID)
latestDistro, err := sh.pulpClient.FindDistributionByPath(sh.ctx, latestPathIdent)
if err != nil {
return err
}
if latestDistro != nil {
latestSnap, err := sh.daoReg.Snapshot.FetchLatestSnapshotModel(sh.ctx, sh.repo.UUID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *latestDistro.PulpHref)
if err != nil {
return err
}
_, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref)
if err != nil {
return err
}
return nil
}
return err
}

_, err = helper.CreateOrUpdateDistribution(sh.repo, latestSnap.PublicationHref, sh.repo.UUID, latestPathIdent)
if err != nil {
return err
}
}

return nil
}

Expand Down

0 comments on commit cb2da85

Please sign in to comment.