Skip to content

Commit

Permalink
HMS-5224: fix snapshot task cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikvagner committed Jan 14, 2025
1 parent bff6903 commit 8c9b3c2
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 34 deletions.
31 changes: 25 additions & 6 deletions pkg/tasks/repository_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,19 @@ func (sr *SnapshotRepository) Run() (err error) {

defer func() {
if errors.Is(err, context.Canceled) {
cleanupErr := sr.cleanupOnCancel()
cleanupErr := helper.Cleanup()
if cleanupErr != nil {
sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper")
}
cleanupErr = sr.cleanupOnCancel()
if cleanupErr != nil {
sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot")
}
cleanupErr = helper.Cleanup()
cleanupErr = sr.UpdatePayload()
if cleanupErr != nil {
sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper")
sr.logger.Err(cleanupErr).Msg("error cleaning up payload")
}
sr.logger.Warn().Msg("task was cancelled")
}
}()

Expand Down Expand Up @@ -252,11 +257,13 @@ func (sr *SnapshotRepository) cleanupOnCancel() error {
if err != nil {
return err
}
sr.payload.SyncTaskHref = nil
if sr.payload.PublicationTaskHref != nil {
_, err := pulpClient.CancelTask(ctxWithLogger, *sr.payload.PublicationTaskHref)
if err != nil {
return err
}
sr.payload.PublicationTaskHref = nil
}
versionHref := pulp_client.SelectVersionHref(&task)
if versionHref != nil {
Expand Down Expand Up @@ -323,7 +330,11 @@ func (sr *SnapshotRepository) GetOrphanedLatestVersion(repoConfigUUID string) (*
}

func (sr *SnapshotRepository) SavePublicationTaskHref(href string) error {
sr.payload.PublicationTaskHref = &href
if href == "" {
sr.payload.PublicationTaskHref = nil
} else {
sr.payload.PublicationTaskHref = &href
}
return sr.UpdatePayload()
}

Expand All @@ -332,7 +343,11 @@ func (sr *SnapshotRepository) GetPublicationTaskHref() *string {
}

func (sr *SnapshotRepository) SaveDistributionTaskHref(href string) error {
sr.payload.DistributionTaskHref = &href
if href == "" {
sr.payload.DistributionTaskHref = nil
} else {
sr.payload.DistributionTaskHref = &href
}
return sr.UpdatePayload()
}

Expand All @@ -341,7 +356,11 @@ func (sr *SnapshotRepository) GetDistributionTaskHref() *string {
}

func (sr *SnapshotRepository) SaveSnapshotIdent(id string) error {
sr.payload.SnapshotIdent = &id
if id == "" {
sr.payload.SnapshotIdent = nil
} else {
sr.payload.SnapshotIdent = &id
}
return sr.UpdatePayload()
}

Expand Down
22 changes: 4 additions & 18 deletions pkg/tasks/repository_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,32 +204,18 @@ func (s *SnapshotSuite) TestSnapshotResyncWithOrphanVersion() {
s.MockPulpClient.On("UpdateDomainIfNeeded", ctx, domainName).Return(nil)
s.MockPulpClient.On("SyncRpmRepository", ctx, *(repoResp.PulpHref), &remoteHref).Return(taskHref, nil)

_, syncTask := s.mockSync(ctx, taskHref, false)
_, _ = s.mockSync(ctx, taskHref, false)

task := models.TaskInfo{
Id: uuid.UUID{},
OrgId: repoConfig.OrgID,
ObjectUUID: repoUuid,
ObjectType: utils.Ptr(config.ObjectTypeRepository)}

pubHref, pubTask := s.mockPublish(ctx, existingVersionHref, false)
distHref, distTask := s.mockCreateDist(ctx, pubHref)
pubHref, _ := s.mockPublish(ctx, existingVersionHref, false)
distHref, _ := s.mockCreateDist(ctx, pubHref)

s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{
SnapshotIdent: &snapshotId,
SyncTaskHref: &syncTask,
}).Return(&task, nil)
s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{
SnapshotIdent: &snapshotId,
SyncTaskHref: &syncTask,
PublicationTaskHref: &pubTask,
}).Return(&task, nil)
s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{
SnapshotIdent: &snapshotId,
SyncTaskHref: &syncTask,
PublicationTaskHref: &pubTask,
DistributionTaskHref: &distTask,
}).Return(&task, nil)
s.MockQueue.On("UpdatePayload", &task, mock.Anything).Return(&task, nil)

// Lookup the version
counts := zest.ContentSummaryResponse{
Expand Down
71 changes: 61 additions & 10 deletions pkg/tasks/snapshot_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tasks

import (
"context"
"errors"
"fmt"
"path/filepath"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/content-services/content-sources-backend/pkg/tasks/helpers"
"github.com/google/uuid"
"github.com/rs/zerolog"
"gorm.io/gorm"
)

// ResumableSnapshotInterface used to store various references needed
Expand Down Expand Up @@ -62,9 +64,12 @@ func (sh *SnapshotHelper) Run(versionHref string) error {
if err != nil {
return err
}
err = sh.payload.SaveDistributionTaskHref(distHref)
if err != nil {
return fmt.Errorf("unable to save distribution task href: %w", err)
}

latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID)

_, _, err = helper.CreateOrUpdateDistribution(sh.orgId, publicationHref, sh.repo.UUID, latestPathIdent)
if err != nil {
return err
Expand Down Expand Up @@ -97,33 +102,79 @@ func (sh *SnapshotHelper) Run(versionHref string) error {
if err != nil {
return err
}
err = sh.payload.SaveSnapshotIdent(snap.UUID)
if err != nil {
return fmt.Errorf("unable to save snapshot ident: %w", err)
}
err = sh.payload.SaveDistributionTaskHref(snap.DistributionHref)
if err != nil {
return fmt.Errorf("unable to save distribution task href: %w", err)
}

return nil
}

func (sh *SnapshotHelper) Cleanup() error {
if sh.payload.GetDistributionTaskHref() != nil {
task, err := sh.pulpClient.CancelTask(sh.ctx, *sh.payload.GetDistributionTaskHref())
if distHref := sh.payload.GetDistributionTaskHref(); distHref != nil {
deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *distHref)
if err != nil {
return err
}
task, err = sh.pulpClient.GetTask(sh.ctx, *sh.payload.GetDistributionTaskHref())
_, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref)
if err != nil {
return err
}
versionHref := pulp_client.SelectRpmDistributionHref(&task)
if versionHref != nil {
_, err = sh.pulpClient.DeleteRpmDistribution(sh.ctx, *versionHref)
if err != nil {
return err
}

err = sh.payload.SavePublicationTaskHref("")
if err != nil {
return err
}
err = sh.payload.SaveDistributionTaskHref("")
if err != nil {
return err
}
}

if sh.payload.GetSnapshotIdent() != nil {
err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotIdent())
if err != nil {
return err
}
}
err := sh.payload.SaveSnapshotIdent("")
if err != nil {
return err
}

helper := helpers.NewPulpDistributionHelper(sh.ctx, sh.pulpClient)
latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID)
latestDistro, err := sh.pulpClient.FindDistributionByPath(sh.ctx, latestPathIdent)
if err != nil {
return err
}
if latestDistro != nil {
latestSnap, err := sh.daoReg.Snapshot.FetchLatestSnapshotModel(sh.ctx, sh.repo.UUID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *latestDistro.PulpHref)
if err != nil {
return err
}
_, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref)
if err != nil {
return err
}
return nil
}
return err
}

_, _, err = helper.CreateOrUpdateDistribution(sh.repo.OrgID, latestSnap.PublicationHref, sh.repo.UUID, latestPathIdent)
if err != nil {
return err
}
}

return nil
}

Expand Down

0 comments on commit 8c9b3c2

Please sign in to comment.