diff --git a/pkg/dao/repository_configs_mock.go b/pkg/dao/repository_configs_mock.go index e3302f27d..e2e11242a 100644 --- a/pkg/dao/repository_configs_mock.go +++ b/pkg/dao/repository_configs_mock.go @@ -6,10 +6,8 @@ import ( context "context" api "github.com/content-services/content-sources-backend/pkg/api" - - mock "github.com/stretchr/testify/mock" - models "github.com/content-services/content-sources-backend/pkg/models" + mock "github.com/stretchr/testify/mock" ) // MockRepositoryConfigDao is an autogenerated mock type for the RepositoryConfigDao type diff --git a/pkg/tasks/add_uploads.go b/pkg/tasks/add_uploads.go index 55af2411e..026e78aaf 100644 --- a/pkg/tasks/add_uploads.go +++ b/pkg/tasks/add_uploads.go @@ -31,6 +31,7 @@ type AddUploadsPayload struct { PublicationTaskHref *string DistributionTaskHref *string SnapshotIdent *string + SnapshotUUID *string } type AddUploads struct { @@ -301,6 +302,15 @@ func (ur *AddUploads) GetSnapshotIdent() *string { return ur.payload.SnapshotIdent } +func (ur *AddUploads) SaveSnapshotUUID(uuid string) error { + ur.payload.SnapshotUUID = &uuid + return ur.UpdatePayload() +} + +func (ur *AddUploads) GetSnapshotUUID() *string { + return ur.payload.SnapshotUUID +} + func (ur *AddUploads) ImportPackageData(versionHref string) error { pkgs, err := ur.pulpClient.ListVersionAllPackages(ur.ctx, versionHref) if err != nil { diff --git a/pkg/tasks/payloads/repository_snapshot.go b/pkg/tasks/payloads/repository_snapshot.go index 936b66be6..4394f6962 100644 --- a/pkg/tasks/payloads/repository_snapshot.go +++ b/pkg/tasks/payloads/repository_snapshot.go @@ -4,6 +4,7 @@ const Snapshot = "snapshot" type SnapshotPayload struct { SnapshotIdent *string + SnapshotUUID *string SyncTaskHref *string PublicationTaskHref *string DistributionTaskHref *string diff --git a/pkg/tasks/repository_snapshot.go b/pkg/tasks/repository_snapshot.go index 2263c3fae..56849cf56 100644 --- a/pkg/tasks/repository_snapshot.go +++ b/pkg/tasks/repository_snapshot.go @@ -94,14 +94,19 @@ func (sr *SnapshotRepository) Run() (err error) { defer func() { if errors.Is(err, context.Canceled) { - cleanupErr := sr.cleanupOnCancel() + cleanupErr := helper.Cleanup() + if cleanupErr != nil { + sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper") + } + cleanupErr = sr.cleanupOnCancel() if cleanupErr != nil { sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot") } - cleanupErr = helper.Cleanup() + cleanupErr = sr.UpdatePayload() if cleanupErr != nil { - sr.logger.Err(cleanupErr).Msg("error cleaning up canceled snapshot helper") + sr.logger.Err(cleanupErr).Msg("error cleaning up payload") } + sr.logger.Warn().Msg("task was cancelled") } }() @@ -252,11 +257,13 @@ func (sr *SnapshotRepository) cleanupOnCancel() error { if err != nil { return err } + sr.payload.SyncTaskHref = nil if sr.payload.PublicationTaskHref != nil { _, err := pulpClient.CancelTask(ctxWithLogger, *sr.payload.PublicationTaskHref) if err != nil { return err } + sr.payload.PublicationTaskHref = nil } versionHref := pulp_client.SelectVersionHref(&task) if versionHref != nil { @@ -323,7 +330,11 @@ func (sr *SnapshotRepository) GetOrphanedLatestVersion(repoConfigUUID string) (* } func (sr *SnapshotRepository) SavePublicationTaskHref(href string) error { - sr.payload.PublicationTaskHref = &href + if href == "" { + sr.payload.PublicationTaskHref = nil + } else { + sr.payload.PublicationTaskHref = &href + } return sr.UpdatePayload() } @@ -332,7 +343,11 @@ func (sr *SnapshotRepository) GetPublicationTaskHref() *string { } func (sr *SnapshotRepository) SaveDistributionTaskHref(href string) error { - sr.payload.DistributionTaskHref = &href + if href == "" { + sr.payload.DistributionTaskHref = nil + } else { + sr.payload.DistributionTaskHref = &href + } return sr.UpdatePayload() } @@ -341,10 +356,27 @@ func (sr *SnapshotRepository) GetDistributionTaskHref() *string { } func (sr *SnapshotRepository) SaveSnapshotIdent(id string) error { - sr.payload.SnapshotIdent = &id + if id == "" { + sr.payload.SnapshotIdent = nil + } else { + sr.payload.SnapshotIdent = &id + } return sr.UpdatePayload() } func (sr *SnapshotRepository) GetSnapshotIdent() *string { return sr.payload.SnapshotIdent } + +func (sr *SnapshotRepository) SaveSnapshotUUID(uuid string) error { + if uuid == "" { + sr.payload.SnapshotUUID = nil + } else { + sr.payload.SnapshotUUID = &uuid + } + return sr.UpdatePayload() +} + +func (sr *SnapshotRepository) GetSnapshotUUID() *string { + return sr.payload.SnapshotUUID +} diff --git a/pkg/tasks/repository_snapshot_test.go b/pkg/tasks/repository_snapshot_test.go index bbeecdac6..45aa2aa54 100644 --- a/pkg/tasks/repository_snapshot_test.go +++ b/pkg/tasks/repository_snapshot_test.go @@ -204,7 +204,7 @@ func (s *SnapshotSuite) TestSnapshotResyncWithOrphanVersion() { s.MockPulpClient.On("UpdateDomainIfNeeded", ctx, domainName).Return(nil) s.MockPulpClient.On("SyncRpmRepository", ctx, *(repoResp.PulpHref), &remoteHref).Return(taskHref, nil) - _, syncTask := s.mockSync(ctx, taskHref, false) + _, _ = s.mockSync(ctx, taskHref, false) task := models.TaskInfo{ Id: uuid.UUID{}, @@ -212,24 +212,10 @@ func (s *SnapshotSuite) TestSnapshotResyncWithOrphanVersion() { ObjectUUID: repoUuid, ObjectType: utils.Ptr(config.ObjectTypeRepository)} - pubHref, pubTask := s.mockPublish(ctx, existingVersionHref, false) - distHref, distTask := s.mockCreateDist(ctx, pubHref) + pubHref, _ := s.mockPublish(ctx, existingVersionHref, false) + distHref, _ := s.mockCreateDist(ctx, pubHref) - s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{ - SnapshotIdent: &snapshotId, - SyncTaskHref: &syncTask, - }).Return(&task, nil) - s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{ - SnapshotIdent: &snapshotId, - SyncTaskHref: &syncTask, - PublicationTaskHref: &pubTask, - }).Return(&task, nil) - s.MockQueue.On("UpdatePayload", &task, payloads.SnapshotPayload{ - SnapshotIdent: &snapshotId, - SyncTaskHref: &syncTask, - PublicationTaskHref: &pubTask, - DistributionTaskHref: &distTask, - }).Return(&task, nil) + s.MockQueue.On("UpdatePayload", &task, mock.Anything).Return(&task, nil) // Lookup the version counts := zest.ContentSummaryResponse{ diff --git a/pkg/tasks/snapshot_helper.go b/pkg/tasks/snapshot_helper.go index 5f0d6d86b..92eb6b96b 100644 --- a/pkg/tasks/snapshot_helper.go +++ b/pkg/tasks/snapshot_helper.go @@ -2,6 +2,7 @@ package tasks import ( "context" + "errors" "fmt" "path/filepath" @@ -12,6 +13,7 @@ import ( "github.com/content-services/content-sources-backend/pkg/tasks/helpers" "github.com/google/uuid" "github.com/rs/zerolog" + "gorm.io/gorm" ) // ResumableSnapshotInterface used to store various references needed @@ -26,6 +28,9 @@ type ResumableSnapshotInterface interface { SaveSnapshotIdent(id string) error GetSnapshotIdent() *string + + SaveSnapshotUUID(uuid string) error + GetSnapshotUUID() *string } // SnapshotHelper is meant to be used by another task, and be able to turn a repository Version into a @@ -62,6 +67,10 @@ func (sh *SnapshotHelper) Run(versionHref string) error { if err != nil { return err } + err = sh.payload.SaveDistributionTaskHref(distHref) + if err != nil { + return fmt.Errorf("unable to save distribution task href: %w", err) + } latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID) @@ -96,33 +105,75 @@ func (sh *SnapshotHelper) Run(versionHref string) error { if err != nil { return err } + err = sh.payload.SaveSnapshotUUID(snap.UUID) + if err != nil { + return fmt.Errorf("unable to save snapshot uuid: %w", err) + } + return nil } func (sh *SnapshotHelper) Cleanup() error { - if sh.payload.GetDistributionTaskHref() != nil { - task, err := sh.pulpClient.CancelTask(sh.ctx, *sh.payload.GetDistributionTaskHref()) + if distHref := sh.payload.GetDistributionTaskHref(); distHref != nil { + deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *distHref) if err != nil { return err } - task, err = sh.pulpClient.GetTask(sh.ctx, *sh.payload.GetDistributionTaskHref()) + _, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref) if err != nil { return err } - versionHref := pulp_client.SelectRpmDistributionHref(&task) - if versionHref != nil { - _, err = sh.pulpClient.DeleteRpmDistribution(sh.ctx, *versionHref) - if err != nil { - return err - } + + err = sh.payload.SavePublicationTaskHref("") + if err != nil { + return err + } + err = sh.payload.SaveDistributionTaskHref("") + if err != nil { + return err + } + } + + if sh.payload.GetSnapshotUUID() != nil { + err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotUUID()) + if err != nil { + return err } } - if sh.payload.GetSnapshotIdent() != nil { - err := sh.daoReg.Snapshot.Delete(sh.ctx, *sh.payload.GetSnapshotIdent()) + err := sh.payload.SaveSnapshotUUID("") + if err != nil { + return err + } + + helper := helpers.NewPulpDistributionHelper(sh.ctx, sh.pulpClient) + latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID) + latestDistro, err := sh.pulpClient.FindDistributionByPath(sh.ctx, latestPathIdent) + if err != nil { + return err + } + if latestDistro != nil { + latestSnap, err := sh.daoReg.Snapshot.FetchLatestSnapshotModel(sh.ctx, sh.repo.UUID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + deleteDistributionHref, err := sh.pulpClient.DeleteRpmDistribution(sh.ctx, *latestDistro.PulpHref) + if err != nil { + return err + } + _, err = sh.pulpClient.PollTask(sh.ctx, deleteDistributionHref) + if err != nil { + return err + } + return nil + } + return err + } + + _, err = helper.CreateOrUpdateDistribution(sh.repo, latestSnap.PublicationHref, sh.repo.UUID, latestPathIdent) if err != nil { return err } } + return nil }