From 778aa66625b87aee42798a70affea571002995f2 Mon Sep 17 00:00:00 2001 From: Dominik Vagner Date: Tue, 14 Jan 2025 12:40:40 +0100 Subject: [PATCH 1/3] HMS-5224: fix snapshot task cleanup --- pkg/tasks/repository_snapshot.go | 31 +++++++++--- pkg/tasks/repository_snapshot_test.go | 22 ++------- pkg/tasks/snapshot_helper.go | 70 +++++++++++++++++++++++---- 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/pkg/tasks/repository_snapshot.go b/pkg/tasks/repository_snapshot.go index 2263c3fae..e40b7e2f9 100644 --- a/pkg/tasks/repository_snapshot.go +++ b/pkg/tasks/repository_snapshot.go @@ -94,14 +94,19 @@ func (sr *SnapshotRepository) Run() (err error) { defer func() { if errors.Is(err, context.Canceled) { - cleanupErr := sr.cleanupOnCancel() + cleanupErr := helper.Cleanup() + if cleanupErr != nil { + sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper") + } + cleanupErr = sr.cleanupOnCancel() if cleanupErr != nil { sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot") } - cleanupErr = helper.Cleanup() + cleanupErr = sr.UpdatePayload() if cleanupErr != nil { - sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper") + sr.logger.Err(cleanupErr).Msg("error cleaning up payload") } + sr.logger.Warn().Msg("task was cancelled") } }() @@ -252,11 +257,13 @@ func (sr *SnapshotRepository) cleanupOnCancel() error { if err != nil { return err } + sr.payload.SyncTaskHref = nil if sr.payload.PublicationTaskHref != nil { _, err := pulpClient.CancelTask(ctxWithLogger, *sr.payload.PublicationTaskHref) if err != nil { return err } + sr.payload.PublicationTaskHref = nil } versionHref := pulp_client.SelectVersionHref(&task) if versionHref != nil { @@ -323,7 +330,11 @@ func (sr *SnapshotRepository) GetOrphanedLatestVersion(repoConfigUUID string) (* } func (sr *SnapshotRepository) SavePublicationTaskHref(href string) error { - sr.payload.PublicationTaskHref = &href + if href == "" { + sr.payload.PublicationTaskHref = nil + } else { + sr.payload.PublicationTaskHref = &href + } return sr.UpdatePayload() } @@ -332,7 +343,11 @@ func (sr *SnapshotRepository) GetPublicationTaskHref() *string { } func (sr *SnapshotRepository) SaveDistributionTaskHref(href string) error { - sr.payload.DistributionTaskHref = &href + if href == "" { + sr.payload.DistributionTaskHref = nil + } else { + sr.payload.DistributionTaskHref = &href + } return sr.UpdatePayload() } @@ -341,7 +356,11 @@ func (sr *SnapshotRepository) GetDistributionTaskHref() *string { } func (sr *SnapshotRepository) SaveSnapshotIdent(id string) error { - sr.payload.SnapshotIdent = &id + if id == "" { + sr.payload.SnapshotIdent = nil + } else { + sr.payload.SnapshotIdent = &id + } return sr.UpdatePayload() } diff --git a/pkg/tasks/repository_snapshot_test.go b/pkg/tasks/repository_snapshot_test.go index bbeecdac6..45aa2aa54 100644 --- a/pkg/tasks/repository_snapshot_test.go +++ b/pkg/tasks/repository_snapshot_test.go @@ -204,7 +204,7 @@ func (s *SnapshotSuite) TestSnapshotResyncWithOrphanVersion() { s.MockPulpClient.On("UpdateDomainIfNeeded", ctx, domainName).Return(nil) s.MockPulpClient.On("SyncRpmRepository", ctx, *(repoResp.PulpHref), &remoteHref).Return(taskHref, nil) - _, syncTask := s.mockSync(ctx, taskHref, false) + _, _ = s.mockSync(ctx, taskHref, false) task := models.TaskInfo{ Id: uuid.UUID{}, @@ -212,24 +212,10 @@ func (s *SnapshotSuite) TestSnapshotResyncWithOrphanVersion() { ObjectUUID: repoUuid, ObjectType: utils.Ptr(config.ObjectTypeRepository)} - pubHref, pubTask := s.mockPublish(ctx, existingVersionHref, false) - distHref, distTask := s.mockCreateDist(ctx, pubHref) + pubHref, _ := s.mockPublish(ctx, existingVersionHref, false) + distHref, _ := s.mockCreateDist(ctx, pubHref) - s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{ - SnapshotIdent: &snapshotId, - SyncTaskHref: &syncTask, - }).Return(&task, nil) - s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{ - SnapshotIdent: &snapshotId, - SyncTaskHref: &syncTask, - PublicationTaskHref: &pubTask, - }).Return(&task, nil) - s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{ - SnapshotIdent: &snapshotId, - SyncTaskHref: &syncTask, - PublicationTaskHref: &pubTask, - DistributionTaskHref: &distTask, - }).Return(&task, nil) + s.MockQueue.On("UpdatePayload", &task, mock.Anything).Return(&task, nil) // Lookup the version counts := zest.ContentSummaryResponse{ diff --git a/pkg/tasks/snapshot_helper.go b/pkg/tasks/snapshot_helper.go index 5f0d6d86b..1f1af88e7 100644 --- a/pkg/tasks/snapshot_helper.go +++ b/pkg/tasks/snapshot_helper.go @@ -2,6 +2,7 @@ package tasks import ( "context" + "errors" "fmt" "path/filepath" @@ -12,6 +13,7 @@ import ( "github.com/content-services/content-sources-backend/pkg/tasks/helpers" "github.com/google/uuid" "github.com/rs/zerolog" + "gorm.io/gorm" ) // ResumableSnapshotInterface used to store various references needed @@ -62,6 +64,10 @@ func (sh *SnapshotHelper) Run(versionHref string) error { if err != nil { return err } + err = sh.payload.SaveDistributionTaskHref(distHref) + if err != nil { + return fmt.Errorf("unable to save distribution task href: %w", err) + } latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID) @@ -96,33 +102,79 @@ func (sh *SnapshotHelper) Run(versionHref string) error { if err != nil { return err } + err = sh.payload.SaveSnapshotIdent(snap.UUID) + if err != nil { + return fmt.Errorf("unable to save snapshot ident: %w", err) + } + err = sh.payload.SaveDistributionTaskHref(snap.DistributionHref) + if err != nil { + return fmt.Errorf("unable to save distribution task href: %w", err) + } + return nil } func (sh *SnapshotHelper) Cleanup() error { - if sh.payload.GetDistributionTaskHref() != nil { - task, err := sh.pulpClient.CancelTask(sh.ctx, *sh.payload.GetDistributionTaskHref()) + if distHref := sh.payload.GetDistributionTaskHref(); distHref != nil { + deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *distHref) if err != nil { return err } - task, err = sh.pulpClient.GetTask(sh.ctx, *sh.payload.GetDistributionTaskHref()) + _, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref) if err != nil { return err } - versionHref := pulp_client.SelectRpmDistributionHref(&task) - if versionHref != nil { - _, err = sh.pulpClient.DeleteRpmDistribution(sh.ctx, *versionHref) - if err != nil { - return err - } + + err = sh.payload.SavePublicationTaskHref("") + if err != nil { + return err + } + err = sh.payload.SaveDistributionTaskHref("") + if err != nil { + return err } } + if sh.payload.GetSnapshotIdent() != nil { err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotIdent()) if err != nil { return err } } + err := sh.payload.SaveSnapshotIdent("") + if err != nil { + return err + } + + helper := helpers.NewPulpDistributionHelper(sh.ctx, sh.pulpClient) + latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID) + latestDistro, err := sh.pulpClient.FindDistributionByPath(sh.ctx, latestPathIdent) + if err != nil { + return err + } + if latestDistro != nil { + latestSnap, err := sh.daoReg.Snapshot.FetchLatestSnapshotModel(sh.ctx, sh.repo.UUID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *latestDistro.PulpHref) + if err != nil { + return err + } + _, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref) + if err != nil { + return err + } + return nil + } + return err + } + + _, _, err = helper.CreateOrUpdateDistribution(sh.repo.OrgID, latestSnap.PublicationHref, sh.repo.UUID, latestPathIdent) + if err != nil { + return err + } + } + return nil } From c0c97dc3e801cf376cbcbaaa1f6345bcf06c7cc4 Mon Sep 17 00:00:00 2001 From: Dominik Vagner Date: Thu, 23 Jan 2025 10:10:04 +0100 Subject: [PATCH 2/3] fix: cleanup, save snap uuid along ident --- pkg/dao/repository_configs_mock.go | 4 +--- pkg/tasks/add_uploads.go | 10 ++++++++++ pkg/tasks/payloads/repository_snapshot.go | 1 + pkg/tasks/repository_snapshot.go | 13 +++++++++++++ pkg/tasks/snapshot_helper.go | 17 ++++++++--------- 5 files changed, 33 insertions(+), 12 deletions(-) diff --git a/pkg/dao/repository_configs_mock.go b/pkg/dao/repository_configs_mock.go index e3302f27d..e2e11242a 100644 --- a/pkg/dao/repository_configs_mock.go +++ b/pkg/dao/repository_configs_mock.go @@ -6,10 +6,8 @@ import ( context "context" api "github.com/content-services/content-sources-backend/pkg/api" - - mock "github.com/stretchr/testify/mock" - models "github.com/content-services/content-sources-backend/pkg/models" + mock "github.com/stretchr/testify/mock" ) // MockRepositoryConfigDao is an autogenerated mock type for the RepositoryConfigDao type diff --git a/pkg/tasks/add_uploads.go b/pkg/tasks/add_uploads.go index 55af2411e..026e78aaf 100644 --- a/pkg/tasks/add_uploads.go +++ b/pkg/tasks/add_uploads.go @@ -31,6 +31,7 @@ type AddUploadsPayload struct { PublicationTaskHref *string DistributionTaskHref *string SnapshotIdent *string + SnapshotUUID *string } type AddUploads struct { @@ -301,6 +302,15 @@ func (ur *AddUploads) GetSnapshotIdent() *string { return ur.payload.SnapshotIdent } +func (ur *AddUploads) SaveSnapshotUUID(uuid string) error { + ur.payload.SnapshotUUID = &uuid + return ur.UpdatePayload() +} + +func (ur *AddUploads) GetSnapshotUUID() *string { + return ur.payload.SnapshotUUID +} + func (ur *AddUploads) ImportPackageData(versionHref string) error { pkgs, err := ur.pulpClient.ListVersionAllPackages(ur.ctx, versionHref) if err != nil { diff --git a/pkg/tasks/payloads/repository_snapshot.go b/pkg/tasks/payloads/repository_snapshot.go index 936b66be6..4394f6962 100644 --- a/pkg/tasks/payloads/repository_snapshot.go +++ b/pkg/tasks/payloads/repository_snapshot.go @@ -4,6 +4,7 @@ const Snapshot = "snapshot" type SnapshotPayload struct { SnapshotIdent *string + SnapshotUUID *string SyncTaskHref *string PublicationTaskHref *string DistributionTaskHref *string diff --git a/pkg/tasks/repository_snapshot.go b/pkg/tasks/repository_snapshot.go index e40b7e2f9..56849cf56 100644 --- a/pkg/tasks/repository_snapshot.go +++ b/pkg/tasks/repository_snapshot.go @@ -367,3 +367,16 @@ func (sr *SnapshotRepository) SaveSnapshotIdent(id string) error { func (sr *SnapshotRepository) GetSnapshotIdent() *string { return sr.payload.SnapshotIdent } + +func (sr *SnapshotRepository) SaveSnapshotUUID(uuid string) error { + if uuid == "" { + sr.payload.SnapshotUUID = nil + } else { + sr.payload.SnapshotUUID = &uuid + } + return sr.UpdatePayload() +} + +func (sr *SnapshotRepository) GetSnapshotUUID() *string { + return sr.payload.SnapshotUUID +} diff --git a/pkg/tasks/snapshot_helper.go b/pkg/tasks/snapshot_helper.go index 1f1af88e7..63c442943 100644 --- a/pkg/tasks/snapshot_helper.go +++ b/pkg/tasks/snapshot_helper.go @@ -28,6 +28,9 @@ type ResumableSnapshotInterface interface { SaveSnapshotIdent(id string) error GetSnapshotIdent() *string + + SaveSnapshotUUID(uuid string) error + GetSnapshotUUID() *string } // SnapshotHelper is meant to be used by another task, and be able to turn a repository Version into a @@ -102,13 +105,9 @@ func (sh *SnapshotHelper) Run(versionHref string) error { if err != nil { return err } - err = sh.payload.SaveSnapshotIdent(snap.UUID) - if err != nil { - return fmt.Errorf("unable to save snapshot ident: %w", err) - } - err = sh.payload.SaveDistributionTaskHref(snap.DistributionHref) + err = sh.payload.SaveSnapshotUUID(snap.UUID) if err != nil { - return fmt.Errorf("unable to save distribution task href: %w", err) + return fmt.Errorf("unable to save snapshot uuid: %w", err) } return nil @@ -135,13 +134,13 @@ func (sh *SnapshotHelper) Cleanup() error { } } - if sh.payload.GetSnapshotIdent() != nil { - err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotIdent()) + if sh.payload.GetSnapshotUUID() != nil { + err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotUUID()) if err != nil { return err } } - err := sh.payload.SaveSnapshotIdent("") + err := sh.payload.SaveSnapshotUUID("") if err != nil { return err } From 3fea0be8e559ee403c9b94c1c4bd0c267b64650d Mon Sep 17 00:00:00 2001 From: Dominik Vagner Date: Thu, 30 Jan 2025 09:32:46 +0100 Subject: [PATCH 3/3] fix: rebase fixes --- pkg/tasks/snapshot_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tasks/snapshot_helper.go b/pkg/tasks/snapshot_helper.go index 63c442943..92eb6b96b 100644 --- a/pkg/tasks/snapshot_helper.go +++ b/pkg/tasks/snapshot_helper.go @@ -168,7 +168,7 @@ func (sh *SnapshotHelper) Cleanup() error { return err } - _, _, err = helper.CreateOrUpdateDistribution(sh.repo.OrgID, latestSnap.PublicationHref, sh.repo.UUID, latestPathIdent) + _, err = helper.CreateOrUpdateDistribution(sh.repo, latestSnap.PublicationHref, sh.repo.UUID, latestPathIdent) if err != nil { return err }