From bb6685e19ebb9857dd235454f3bd7464238969bb Mon Sep 17 00:00:00 2001 From: Justin Sherrill Date: Wed, 25 Oct 2023 17:18:26 -0400 Subject: [PATCH 1/2] Fixes 2282: reduce number of introspect tasks --- cmd/external-repos/main.go | 34 +-- pkg/cache/cache_mock.go | 9 +- pkg/config/value_constraints.go | 4 + pkg/dao/interfaces.go | 2 +- pkg/dao/repositories.go | 22 +- pkg/dao/repositories_mock.go | 18 +- pkg/dao/repositories_test.go | 233 ++++++++++++++------- pkg/dao/repository_configs_mock.go | 13 +- pkg/dao/snapshots_mock.go | 11 +- pkg/external_repos/introspect.go | 137 +++--------- pkg/external_repos/introspect_test.go | 123 ----------- pkg/pulp_client/pulp_client_mock.go | 9 +- pkg/pulp_client/pulp_global_client_mock.go | 9 +- pkg/tasks/client/client_mock.go | 9 +- pkg/tasks/introspect.go | 11 +- pkg/tasks/queue/queue_mock.go | 10 +- 16 files changed, 290 insertions(+), 364 deletions(-) diff --git a/cmd/external-repos/main.go b/cmd/external-repos/main.go index 99def9c7a..bf4f762d8 100644 --- a/cmd/external-repos/main.go +++ b/cmd/external-repos/main.go @@ -22,10 +22,6 @@ import ( "gorm.io/gorm" ) -var ( - forceIntrospect bool = false -) - func main() { args := os.Args config.Load() @@ -60,6 +56,7 @@ func main() { os.Exit(1) } var urls []string + forceIntrospect := false for i := 2; i < len(args); i++ { if args[i] != "--force" { urls = append(urls, args[i]) @@ -67,15 +64,7 @@ func main() { forceIntrospect = true } } - - count, introErrors, errors := external_repos.IntrospectAll(context.Background(), &urls, forceIntrospect) - for i := 0; i < len(introErrors); i++ { - log.Warn().Msgf("Introspection Error: %v", introErrors[i].Error()) - } - for i := 0; i < len(errors); i++ { - log.Panic().Err(errors[i]).Msg("Failed to introspect repository due to fatal errors") - } - log.Debug().Msgf("Inserted %d packages", count) + introspectUrls(urls, forceIntrospect) } else if args[1] == "snapshot" { if len(args) < 3 { log.Error().Msg("Usage: ./external_repos snapshot URL [URL2]...") @@ -143,6 +132,23 @@ func waitForPulp() { } } +func introspectUrls(urls []string, force bool) { + repos, err := dao.GetDaoRegistry(db.DB).Repository.ListForIntrospection(&urls, force) + if err != nil { + log.Fatal().Err(err).Msg("Could not lookup repos to introspect") + } + for _, repo := range repos { + count, introError, error := external_repos.IntrospectUrl(context.Background(), repo.URL) + if introError != nil { + log.Warn().Msgf("Introspection Error: %v", introError) + } + if error != nil { + log.Panic().Err(error).Msg("Failed to introspect repository due to fatal errors") + } + log.Debug().Msgf("Inserted %d packages for %v", count, repo.URL) + } +} + func scanForExternalRepos(path string) { urls, err := external_repos.IBUrlsFromDir(path) if err != nil { @@ -173,7 +179,7 @@ func enqueueIntrospectAllRepos() error { log.Err(err).Msg("error during task cleanup") } - repos, err := repoDao.List(true) + repos, err := repoDao.ListForIntrospection(nil, false) if err != nil { return fmt.Errorf("error getting repositories: %w", err) } diff --git a/pkg/cache/cache_mock.go b/pkg/cache/cache_mock.go index f5dd3053c..f087963ac 100644 --- a/pkg/cache/cache_mock.go +++ b/pkg/cache/cache_mock.go @@ -92,12 +92,13 @@ func (_m *MockCache) SetPulpContentPath(ctx context.Context, pulpContentPath str return r0 } -// NewMockCache creates a new instance of MockCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockCache(t interface { +type mockConstructorTestingTNewMockCache interface { mock.TestingT Cleanup(func()) -}) *MockCache { +} + +// NewMockCache creates a new instance of MockCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockCache(t mockConstructorTestingTNewMockCache) *MockCache { mock := &MockCache{} mock.Mock.Test(t) diff --git a/pkg/config/value_constraints.go b/pkg/config/value_constraints.go index 81cde314d..59577a272 100644 --- a/pkg/config/value_constraints.go +++ b/pkg/config/value_constraints.go @@ -1,5 +1,7 @@ package config +import "time" + const ( StatusValid = "Valid" // Repository introspected successfully StatusUnavailable = "Unavailable" // Repository introspected at least once, but now errors @@ -18,6 +20,8 @@ const ( const RedHatOrg = "-1" +const IntrospectTimeInterval = time.Hour * 23 + const ANY_VERSION = "any" const El7 = "7" const El8 = "8" diff --git a/pkg/dao/interfaces.go b/pkg/dao/interfaces.go index 4bab4870e..90de3c92b 100644 --- a/pkg/dao/interfaces.go +++ b/pkg/dao/interfaces.go @@ -76,7 +76,7 @@ type RpmDao interface { //go:generate mockery --name RepositoryDao --filename repositories_mock.go --inpackage type RepositoryDao interface { FetchForUrl(url string) (Repository, error) - List(ignoreFailed bool) ([]Repository, error) + ListForIntrospection(urls *[]string, force bool) ([]Repository, error) ListPublic(paginationData api.PaginationData, _ api.FilterData) (api.PublicRepositoryCollectionResponse, int64, error) Update(repo RepositoryUpdate) error FetchRepositoryRPMCount(repoUUID string) (int, error) diff --git a/pkg/dao/repositories.go b/pkg/dao/repositories.go index 8f270a7f6..b8fe12bdb 100644 --- a/pkg/dao/repositories.go +++ b/pkg/dao/repositories.go @@ -72,17 +72,27 @@ func (p repositoryDaoImpl) FetchForUrl(url string) (Repository, error) { return internalRepo, nil } -func (p repositoryDaoImpl) List(ignoreFailed bool) ([]Repository, error) { +func (p repositoryDaoImpl) ListForIntrospection(urls *[]string, force bool) ([]Repository, error) { var dbRepos []models.Repository var repos []Repository var repo Repository - var result *gorm.DB - if ignoreFailed { - result = p.db.Where("public = true OR failed_introspections_count < ?", config.FailedIntrospectionsLimit+1).Find(&dbRepos) - } else { - result = p.db.Find(&dbRepos) + db := p.db + if !force && !config.Get().Options.AlwaysRunCronTasks { + introspectThreshold := time.Now().Add(config.IntrospectTimeInterval * -1) // Add a negative duration + db = db.Where( + db.Where("status != ?", config.StatusValid). + Or("last_introspection_time is NULL"). // It was never introspected + Or("last_introspection_time < ?", introspectThreshold)). // It was introspected more than the threshold ago) + Where( + db.Where("failed_introspections_count < ?", config.FailedIntrospectionsLimit). + Or("public = true")) + } + if urls != nil { + db = db.Where("url in ?", *urls) } + + result := db.Find(&dbRepos) if result.Error != nil { return repos, result.Error } diff --git a/pkg/dao/repositories_mock.go b/pkg/dao/repositories_mock.go index 33dafbf35..63af1a299 100644 --- a/pkg/dao/repositories_mock.go +++ b/pkg/dao/repositories_mock.go @@ -60,25 +60,25 @@ func (_m *MockRepositoryDao) FetchRepositoryRPMCount(repoUUID string) (int, erro return r0, r1 } -// List provides a mock function with given fields: ignoreFailed -func (_m *MockRepositoryDao) List(ignoreFailed bool) ([]Repository, error) { - ret := _m.Called(ignoreFailed) +// ListForIntrospection provides a mock function with given fields: urls, force +func (_m *MockRepositoryDao) ListForIntrospection(urls *[]string, force bool) ([]Repository, error) { + ret := _m.Called(urls, force) var r0 []Repository var r1 error - if rf, ok := ret.Get(0).(func(bool) ([]Repository, error)); ok { - return rf(ignoreFailed) + if rf, ok := ret.Get(0).(func(*[]string, bool) ([]Repository, error)); ok { + return rf(urls, force) } - if rf, ok := ret.Get(0).(func(bool) []Repository); ok { - r0 = rf(ignoreFailed) + if rf, ok := ret.Get(0).(func(*[]string, bool) []Repository); ok { + r0 = rf(urls, force) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]Repository) } } - if rf, ok := ret.Get(1).(func(bool) error); ok { - r1 = rf(ignoreFailed) + if rf, ok := ret.Get(1).(func(*[]string, bool) error); ok { + r1 = rf(urls, force) } else { r1 = ret.Error(1) } diff --git a/pkg/dao/repositories_test.go b/pkg/dao/repositories_test.go index 70082b865..46be59fb9 100644 --- a/pkg/dao/repositories_test.go +++ b/pkg/dao/repositories_test.go @@ -128,87 +128,6 @@ func (s *RepositorySuite) TestFetchForUrl() { }, repo) } -func (s *RepositorySuite) TestList() { - tx := s.tx - t := s.T() - - expected := Repository{ - UUID: s.repo.UUID, - URL: s.repo.URL, - Status: s.repo.Status, - LastIntrospectionTime: s.repo.LastIntrospectionTime, - LastIntrospectionUpdateTime: s.repo.LastIntrospectionUpdateTime, - LastIntrospectionSuccessTime: s.repo.LastIntrospectionSuccessTime, - LastIntrospectionError: s.repo.LastIntrospectionError, - PackageCount: s.repo.PackageCount, - FailedIntrospectionsCount: s.repo.FailedIntrospectionsCount, - Public: s.repo.Public, - } - - dao := GetRepositoryDao(tx) - repoList, err := dao.List(false) - assert.NoError(t, err) - assert.Contains(t, repoList, expected) -} - -func (s *RepositorySuite) TestListIgnoreFailed() { - tx := s.tx - t := s.T() - - expectedNotIgnored := Repository{ - UUID: uuid.NewString(), - URL: "https://example1.com", - Status: s.repo.Status, - LastIntrospectionTime: s.repo.LastIntrospectionTime, - LastIntrospectionUpdateTime: s.repo.LastIntrospectionUpdateTime, - LastIntrospectionSuccessTime: s.repo.LastIntrospectionSuccessTime, - LastIntrospectionError: s.repo.LastIntrospectionError, - PackageCount: s.repo.PackageCount, - FailedIntrospectionsCount: config.FailedIntrospectionsLimit, - Public: false, - } - - expectedNotIgnoredPublic := Repository{ - UUID: uuid.NewString(), - URL: "https://public.example.com", - Status: s.repo.Status, - LastIntrospectionTime: s.repo.LastIntrospectionTime, - LastIntrospectionUpdateTime: s.repo.LastIntrospectionUpdateTime, - LastIntrospectionSuccessTime: s.repo.LastIntrospectionSuccessTime, - LastIntrospectionError: s.repo.LastIntrospectionError, - PackageCount: s.repo.PackageCount, - FailedIntrospectionsCount: config.FailedIntrospectionsLimit + 1, - Public: true, - } - - expectedIgnored := Repository{ - UUID: uuid.NewString(), - URL: "https://example2.com", - Status: s.repo.Status, - LastIntrospectionTime: s.repo.LastIntrospectionTime, - LastIntrospectionUpdateTime: s.repo.LastIntrospectionUpdateTime, - LastIntrospectionSuccessTime: s.repo.LastIntrospectionSuccessTime, - LastIntrospectionError: s.repo.LastIntrospectionError, - PackageCount: s.repo.PackageCount, - FailedIntrospectionsCount: config.FailedIntrospectionsLimit + 1, - Public: false, - } - - err := tx.Create(expectedNotIgnored).Error - require.NoError(t, err) - err = tx.Create(expectedIgnored).Error - require.NoError(t, err) - err = tx.Create(expectedNotIgnoredPublic).Error - require.NoError(t, err) - - dao := GetRepositoryDao(tx) - repoList, err := dao.List(true) - assert.NoError(t, err) - assert.Contains(t, repoList, expectedNotIgnored) - assert.NotContains(t, repoList, expectedIgnored) - assert.Contains(t, repoList, expectedNotIgnoredPublic) -} - func (s *RepositorySuite) TestListPublic() { tx := s.tx t := s.T() @@ -467,3 +386,155 @@ func (s *RepositorySuite) TestFetchRpmCount() { assert.NoError(t, err) assert.Equal(t, expected, count) } + +func (s *RepositorySuite) TestListRepositoriesForIntrospection() { + type TestCaseExpected struct { + result bool + } + type TestCase struct { + given *Repository + expected TestCaseExpected + } + + var ( + thresholdBefore24 time.Time = time.Now().Add(-(config.IntrospectTimeInterval - 2*time.Hour)) // Subtract 22 hours to the current time + thresholdAfter24 time.Time = time.Now().Add(-(config.IntrospectTimeInterval + time.Hour)) // Subtract 25 hours to the current time + + testCases []TestCase = []TestCase{ + // BEGIN: Cover all the no valid status + + // When Status is not Valid + // it returns true + { + given: &Repository{ + Status: config.StatusInvalid, + }, + expected: TestCaseExpected{ + result: true, + }, + }, + { + given: &Repository{ + Status: config.StatusPending, + }, + expected: TestCaseExpected{ + result: true, + }, + }, + { + given: &Repository{ + Status: config.StatusUnavailable, + }, + expected: TestCaseExpected{ + result: true, + }, + }, + // END: Cover all the no valid status + + // When Status is Valid + // and LastIntrospectionTime is nill + // it returns true + { + given: &Repository{ + Status: config.StatusValid, + LastIntrospectionTime: nil, + }, + expected: TestCaseExpected{ + result: true, + }, + }, + // When Status is Valid + // and LastIntrospectionTime does not reach the threshold interval (24hours) + // it returns false indicating that no introspection is needed + { + given: &Repository{ + Status: config.StatusValid, + LastIntrospectionTime: &thresholdBefore24, + }, + expected: TestCaseExpected{ + result: false, + }, + }, + // When Status is Valid + // and LastIntrospectionTime does reach the threshold interval (24hours) + // it returns true indicating that an introspection is needed + { + given: &Repository{ + Status: config.StatusValid, + LastIntrospectionTime: &thresholdAfter24, + }, + expected: TestCaseExpected{ + result: true, + }, + }, + // Test around FailedIntrospectionsCount + { // doesn't exceed the count + given: &Repository{ + Status: config.StatusInvalid, + FailedIntrospectionsCount: config.FailedIntrospectionsLimit - 1, + Public: false, + }, + expected: TestCaseExpected{ + result: true, + }, + }, + { // Exceeds the count + given: &Repository{ + Status: config.StatusInvalid, + FailedIntrospectionsCount: config.FailedIntrospectionsLimit, + Public: false, + }, + expected: TestCaseExpected{ + result: false, + }, + }, + + { // Exceeds the count but is public + given: &Repository{ + Status: config.StatusInvalid, + FailedIntrospectionsCount: config.FailedIntrospectionsLimit, + Public: true, + }, + expected: TestCaseExpected{ + result: true, + }, + }, + } + ) + + for _, tCase := range testCases { + tCase.given.URL = "https://" + uuid.NewString() + "/" + tCase.given.UUID = uuid.NewString() + result := s.tx.Create(&tCase.given) + assert.NoError(s.T(), result.Error) + } + + dao := GetRepositoryDao(s.tx) + repos, err := dao.ListForIntrospection(nil, false) + assert.NoError(s.T(), err) + repoIncluded := func(expected *Repository) bool { + for _, repo := range repos { + if repo.UUID == expected.UUID { + return true + } + } + return false + } + for _, tCase := range testCases { + found := repoIncluded(tCase.given) + assert.Equal(s.T(), tCase.expected.result, found) + } + + // Force them all + repos, err = dao.ListForIntrospection(nil, true) + assert.NoError(s.T(), err) + for _, tCase := range testCases { + found := repoIncluded(tCase.given) + assert.True(s.T(), found) + } + + // Query a single one + repos, err = dao.ListForIntrospection(&[]string{repos[0].URL}, true) + assert.NoError(s.T(), err) + assert.Equal(s.T(), 1, len(repos)) +} diff --git a/pkg/dao/repository_configs_mock.go b/pkg/dao/repository_configs_mock.go index c18dec3f6..f0dad03ad 100644 --- a/pkg/dao/repository_configs_mock.go +++ b/pkg/dao/repository_configs_mock.go @@ -1,4 +1,8 @@ +<<<<<<< HEAD // Code generated by mockery v2.36.1. DO NOT EDIT. +======= +// Code generated by mockery v2.20.0. DO NOT EDIT. +>>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) package dao @@ -360,6 +364,7 @@ func (_m *MockRepositoryConfigDao) ValidateParameters(orgId string, params api.R return r0, r1 } +<<<<<<< HEAD // WithContext provides a mock function with given fields: ctx func (_m *MockRepositoryConfigDao) WithContext(ctx context.Context) RepositoryConfigDao { ret := _m.Called(ctx) @@ -379,9 +384,15 @@ func (_m *MockRepositoryConfigDao) WithContext(ctx context.Context) RepositoryCo // NewMockRepositoryConfigDao creates a new instance of MockRepositoryConfigDao. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockRepositoryConfigDao(t interface { +======= +type mockConstructorTestingTNewMockRepositoryConfigDao interface { +>>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) mock.TestingT Cleanup(func()) -}) *MockRepositoryConfigDao { +} + +// NewMockRepositoryConfigDao creates a new instance of MockRepositoryConfigDao. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRepositoryConfigDao(t mockConstructorTestingTNewMockRepositoryConfigDao) *MockRepositoryConfigDao { mock := &MockRepositoryConfigDao{} mock.Mock.Test(t) diff --git a/pkg/dao/snapshots_mock.go b/pkg/dao/snapshots_mock.go index 6e8a040be..e9a67a953 100644 --- a/pkg/dao/snapshots_mock.go +++ b/pkg/dao/snapshots_mock.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.0. DO NOT EDIT. +// Code generated by mockery v2.20.0. DO NOT EDIT. package dao @@ -174,6 +174,7 @@ func (_m *MockSnapshotDao) List(orgID string, repoConfigUuid string, paginationD return r0, r1, r2 } +<<<<<<< HEAD // WithContext provides a mock function with given fields: ctx func (_m *MockSnapshotDao) WithContext(ctx context.Context) SnapshotDao { ret := _m.Called(ctx) @@ -193,9 +194,15 @@ func (_m *MockSnapshotDao) WithContext(ctx context.Context) SnapshotDao { // NewMockSnapshotDao creates a new instance of MockSnapshotDao. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockSnapshotDao(t interface { +======= +type mockConstructorTestingTNewMockSnapshotDao interface { +>>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) mock.TestingT Cleanup(func()) -}) *MockSnapshotDao { +} + +// NewMockSnapshotDao creates a new instance of MockSnapshotDao. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockSnapshotDao(t mockConstructorTestingTNewMockSnapshotDao) *MockSnapshotDao { mock := &MockSnapshotDao{} mock.Mock.Test(t) diff --git a/pkg/external_repos/introspect.go b/pkg/external_repos/introspect.go index cc1f9ce99..2d5cdb3c1 100644 --- a/pkg/external_repos/introspect.go +++ b/pkg/external_repos/introspect.go @@ -21,20 +21,46 @@ import ( "github.com/content-services/yummy/pkg/yum" "github.com/openlyinc/pointy" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) const ( - RhCdnHost = "cdn.redhat.com" - IntrospectTimeInterval = time.Hour * 23 + RhCdnHost = "cdn.redhat.com" ) // IntrospectUrl Fetch the metadata of a url and insert RPM data // Returns the number of new RPMs inserted system-wide, any introspection errors, // and any fatal errors -func IntrospectUrl(ctx context.Context, url string, force bool) (int64, []error, []error) { - urls := []string{url} - return IntrospectAll(ctx, &urls, force) +func IntrospectUrl(ctx context.Context, url string) (int64, error, error) { + var ( + total int64 + count int64 + err error + dao = dao.GetDaoRegistry(db.DB) + introspectionError error + introspectFailedUuids []string + introspectSuccessUuids []string + updated bool + ) + + repo, err := dao.Repository.FetchForUrl(url) + if err != nil { + return total, introspectionError, err + } + count, err, updated = Introspect(ctx, &repo, dao) + + if err != nil { + introspectionError = fmt.Errorf("error introspecting %s: %s", repo.URL, err.Error()) + introspectFailedUuids = append(introspectFailedUuids, repo.UUID) + } else if updated { + introspectSuccessUuids = append(introspectSuccessUuids, repo.UUID) + } + + err = UpdateIntrospectionStatusMetadata(repo, dao, count, err) + + // Logic to handle notifications. This should really be moved to a daily report? + sendIntrospectionNotifications(introspectSuccessUuids, introspectFailedUuids, dao) + + return total, introspectionError, err } // IsRedHat returns if the url is a 'cdn.redhat.com' url @@ -57,10 +83,6 @@ func Introspect(ctx context.Context, repo *dao.Repository, dao *dao.DaoRegistry) logger.Debug().Msg("Introspecting " + repo.URL) - if repo.FailedIntrospectionsCount >= config.FailedIntrospectionsLimit && !repo.Public { - return 0, fmt.Errorf("introspection skipped because this repository has failed more than %v times in a row", config.FailedIntrospectionsLimit), false - } - if client, err = httpClient(IsRedHat(repo.URL)); err != nil { return 0, err, false } @@ -107,76 +129,6 @@ func Introspect(ctx context.Context, repo *dao.Repository, dao *dao.DaoRegistry) return total, nil, true } -func reposForIntrospection(urls *[]string, force bool) ([]dao.Repository, []error) { - repoDao := dao.GetRepositoryDao(db.DB) - ignoredFailed := !force // when forcing introspection, include repositories over FailedIntrospectionsLimit - if urls != nil { - var repos []dao.Repository - var errors []error - for i := 0; i < len(*urls); i++ { - repo, err := repoDao.FetchForUrl((*urls)[i]) - if err != nil { - errors = append(errors, err) - } else if ignoredFailed && repo.FailedIntrospectionsCount > config.FailedIntrospectionsLimit && !repo.Public { - continue - } else { - repos = append(repos, repo) - } - } - return repos, errors - } else { - repos, err := repoDao.List(ignoredFailed) - return repos, []error{err} - } -} - -// IntrospectAll introspects all repositories -// Returns the number of new RPMs inserted system-wide, all non-fatal introspection errors, -// and separately all other fatal errors -func IntrospectAll(ctx context.Context, urls *[]string, force bool) (int64, []error, []error) { - var ( - total int64 - count int64 - err error - dao = dao.GetDaoRegistry(db.DB) - introspectionErrors []error - introspectFailedUuids []string - introspectSuccessUuids []string - updated bool - ) - repos, errors := reposForIntrospection(urls, force) - for i := 0; i < len(repos); i++ { - if !force { - hasToIntrospect, reason := needsIntrospect(&repos[i]) - log.Info().Msg(reason) - if !hasToIntrospect { - continue - } - } else { - log.Info().Msgf("Forcing introspection for '%s'", repos[i].URL) - } - count, err, updated = Introspect(ctx, &repos[i], dao) - total += count - - if err != nil { - introspectionErrors = append(introspectionErrors, fmt.Errorf("Error introspecting %s: %s", repos[i].URL, err.Error())) - introspectFailedUuids = append(introspectFailedUuids, repos[i].UUID) - } else if updated { - introspectSuccessUuids = append(introspectSuccessUuids, repos[i].UUID) - } - - err = UpdateIntrospectionStatusMetadata(repos[i], dao, count, err) - if err != nil { - errors = append(errors, err) - } - } - - // Logic to handle notifications - sendIntrospectionNotifications(introspectSuccessUuids, introspectFailedUuids, dao) - - return total, introspectionErrors, errors -} - func sendIntrospectionNotifications(successUuids []string, failedUuids []string, dao *dao.DaoRegistry) { count := 0 wg := sync.WaitGroup{} @@ -231,31 +183,6 @@ func sendIntrospectionNotifications(successUuids []string, failedUuids []string, wg.Wait() } -func needsIntrospect(repo *dao.Repository) (bool, string) { - if repo == nil { - return false, "Cannot introspect nil Repository" - } - - if config.Get().Options.AlwaysRunCronTasks { - return true, "Introspection started: AlwaysRunCronTasks is true" - } - - if repo.Status != config.StatusValid { - return true, fmt.Sprintf("Introspection started: the Status field content differs from '%s' for Repository.UUID = %s", config.StatusValid, repo.UUID) - } - - if repo.LastIntrospectionTime == nil { - return true, fmt.Sprintf("Introspection started: not expected LastIntrospectionTime = nil for Repository.UUID = %s", repo.UUID) - } - - threshold := repo.LastIntrospectionTime.Add(IntrospectTimeInterval) - if threshold.After(time.Now()) { - return false, fmt.Sprintf("Introspection skipped: Last instrospection happened before the threshold for Repository.UUID = %s", repo.UUID) - } - - return true, fmt.Sprintf("Introspection started: last introspection happened after the threshold for Repository.UUID = %s", repo.UUID) -} - func httpClient(useCert bool) (http.Client, error) { timeout := 90 * time.Second if useCert { diff --git a/pkg/external_repos/introspect_test.go b/pkg/external_repos/introspect_test.go index 562d685f1..a64aad9e3 100644 --- a/pkg/external_repos/introspect_test.go +++ b/pkg/external_repos/introspect_test.go @@ -130,20 +130,6 @@ func TestIntrospect(t *testing.T) { assert.Equal(t, int64(0), count) assert.Equal(t, false, updated) assert.Equal(t, 14, expected.PackageCount) - - // If the repository has failed more than FailedIntrospectionsLimit number of times in a row, it should not introspect - _, err, updated = Introspect( - context.Background(), - &dao.Repository{ - UUID: repoUUID, - URL: server.URL + "/content", - RepomdChecksum: templateRepoMdXmlSum, - PackageCount: 14, - FailedIntrospectionsCount: config.FailedIntrospectionsLimit + 1, - }, - mockDao.ToDaoRegistry()) - assert.Error(t, err) - assert.Equal(t, false, updated) } func TestHttpClient(t *testing.T) { @@ -346,112 +332,3 @@ func TestUpdateIntrospectionStatusMetadata(t *testing.T) { assert.Equal(t, testCase.expected.LastIntrospectionUpdateTime, result.LastIntrospectionUpdateTime) } } - -func TestNeedIntrospect(t *testing.T) { - type TestCaseExpected struct { - result bool - reason string - } - type TestCase struct { - given *dao.Repository - expected TestCaseExpected - } - - var ( - thresholdBefore24 time.Time = time.Now().Add(-(IntrospectTimeInterval - 2*time.Hour)) // Subtract 22 hours to the current time - thresholdAfter24 time.Time = time.Now().Add(-(IntrospectTimeInterval + time.Hour)) // Subtract 25 hours to the current time - result bool - reason string - testCases []TestCase = []TestCase{ - // When repo is nil - // it returns false - { - given: nil, - expected: TestCaseExpected{ - result: false, - reason: "Cannot introspect nil Repository", - }, - }, - - // BEGIN: Cover all the no valid status - - // When Status is not Valid - // it returns true - { - given: &dao.Repository{ - Status: config.StatusInvalid, - }, - expected: TestCaseExpected{ - result: true, - reason: fmt.Sprintf("Introspection started: the Status field content differs from '%s' for Repository.UUID = %s", config.StatusValid, ""), - }, - }, - { - given: &dao.Repository{ - Status: config.StatusPending, - }, - expected: TestCaseExpected{ - result: true, - reason: fmt.Sprintf("Introspection started: the Status field content differs from '%s' for Repository.UUID = %s", config.StatusValid, ""), - }, - }, - { - given: &dao.Repository{ - Status: config.StatusUnavailable, - }, - expected: TestCaseExpected{ - result: true, - reason: fmt.Sprintf("Introspection started: the Status field content differs from '%s' for Repository.UUID = %s", config.StatusValid, ""), - }, - }, - // END: Cover all the no valid status - - // When Status is Valid - // and LastIntrospectionTime is nill - // it returns true - { - given: &dao.Repository{ - Status: config.StatusValid, - LastIntrospectionTime: nil, - }, - expected: TestCaseExpected{ - result: true, - reason: "Introspection started: not expected LastIntrospectionTime = nil for Repository.UUID = ", - }, - }, - // When Status is Valid - // and LastIntrospectionTime does not reach the threshold interval (24hours) - // it returns false indicating that no introspection is needed - { - given: &dao.Repository{ - Status: config.StatusValid, - LastIntrospectionTime: &thresholdBefore24, - }, - expected: TestCaseExpected{ - result: false, - reason: "Introspection skipped: Last instrospection happened before the threshold for Repository.UUID = ", - }, - }, - // When Status is Valid - // and LastIntrospectionTime does reach the threshold interval (24hours) - // it returns true indicating that an introspection is needed - { - given: &dao.Repository{ - Status: config.StatusValid, - LastIntrospectionTime: &thresholdAfter24, - }, - expected: TestCaseExpected{ - result: true, - reason: "Introspection started: last introspection happened after the threshold for Repository.UUID = ", - }, - }, - } - ) - - // Run all the test cases - for _, testCase := range testCases { - result, reason = needsIntrospect(testCase.given) - assert.Equal(t, testCase.expected.result, result) - assert.Equal(t, testCase.expected.reason, reason) - } -} diff --git a/pkg/pulp_client/pulp_client_mock.go b/pkg/pulp_client/pulp_client_mock.go index c90f36029..2ffcbd224 100644 --- a/pkg/pulp_client/pulp_client_mock.go +++ b/pkg/pulp_client/pulp_client_mock.go @@ -678,6 +678,7 @@ func (_m *MockPulpClient) UpdateRpmRemote(pulpHref string, url string, clientCer return r0, r1 } +<<<<<<< HEAD // WithContext provides a mock function with given fields: ctx func (_m *MockPulpClient) WithContext(ctx context.Context) PulpClient { ret := _m.Called(ctx) @@ -713,9 +714,15 @@ func (_m *MockPulpClient) WithDomain(domainName string) PulpClient { // NewMockPulpClient creates a new instance of MockPulpClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockPulpClient(t interface { +======= +type mockConstructorTestingTNewMockPulpClient interface { +>>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) mock.TestingT Cleanup(func()) -}) *MockPulpClient { +} + +// NewMockPulpClient creates a new instance of MockPulpClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockPulpClient(t mockConstructorTestingTNewMockPulpClient) *MockPulpClient { mock := &MockPulpClient{} mock.Mock.Test(t) diff --git a/pkg/pulp_client/pulp_global_client_mock.go b/pkg/pulp_client/pulp_global_client_mock.go index 3d12412f5..9973da709 100644 --- a/pkg/pulp_client/pulp_global_client_mock.go +++ b/pkg/pulp_client/pulp_global_client_mock.go @@ -172,12 +172,13 @@ func (_m *MockPulpGlobalClient) UpdateDomainIfNeeded(name string) error { return r0 } -// NewMockPulpGlobalClient creates a new instance of MockPulpGlobalClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockPulpGlobalClient(t interface { +type mockConstructorTestingTNewMockPulpGlobalClient interface { mock.TestingT Cleanup(func()) -}) *MockPulpGlobalClient { +} + +// NewMockPulpGlobalClient creates a new instance of MockPulpGlobalClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockPulpGlobalClient(t mockConstructorTestingTNewMockPulpGlobalClient) *MockPulpGlobalClient { mock := &MockPulpGlobalClient{} mock.Mock.Test(t) diff --git a/pkg/tasks/client/client_mock.go b/pkg/tasks/client/client_mock.go index 48fc493d5..0725601f1 100644 --- a/pkg/tasks/client/client_mock.go +++ b/pkg/tasks/client/client_mock.go @@ -56,12 +56,13 @@ func (_m *MockTaskClient) SendCancelNotification(ctx context.Context, taskId str return r0 } -// NewMockTaskClient creates a new instance of MockTaskClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockTaskClient(t interface { +type mockConstructorTestingTNewMockTaskClient interface { mock.TestingT Cleanup(func()) -}) *MockTaskClient { +} + +// NewMockTaskClient creates a new instance of MockTaskClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockTaskClient(t mockConstructorTestingTNewMockTaskClient) *MockTaskClient { mock := &MockTaskClient{} mock.Mock.Test(t) diff --git a/pkg/tasks/introspect.go b/pkg/tasks/introspect.go index a0dec68e2..451de5d34 100644 --- a/pkg/tasks/introspect.go +++ b/pkg/tasks/introspect.go @@ -28,14 +28,15 @@ func IntrospectHandler(ctx context.Context, task *models.TaskInfo, q *queue.Queu if err := validate.Var(p.Url, "required"); err != nil { return err } - newRpms, nonFatalErrs, errs := external_repos.IntrospectUrl(logger.WithContext(context.Background()), p.Url, p.Force) - for i := 0; i < len(nonFatalErrs); i++ { - logger.Warn().Err(nonFatalErrs[i]).Msgf("Error %v introspecting repository %v", i, p.Url) + + newRpms, nonFatalErr, err := external_repos.IntrospectUrl(logger.WithContext(context.Background()), p.Url) + if nonFatalErr != nil { + logger.Warn().Err(nonFatalErr).Msgf("Error introspecting repository %v", p.Url) } // Introspection failure isn't considered a message failure, as the message has been handled - for i := 0; i < len(errs); i++ { - logger.Error().Err(errs[i]).Msgf("Error %v introspecting repository %v", i, p.Url) + if err != nil { + logger.Error().Err(err).Msgf("Error introspecting repository %v", p.Url) } logger.Debug().Msgf("IntrospectionUrl returned %d new packages", newRpms) diff --git a/pkg/tasks/queue/queue_mock.go b/pkg/tasks/queue/queue_mock.go index d8832cc7c..a7ddd35be 100644 --- a/pkg/tasks/queue/queue_mock.go +++ b/pkg/tasks/queue/queue_mock.go @@ -1,5 +1,6 @@ // Code generated by mockery v2.33.0. DO NOT EDIT. + package queue import ( @@ -232,12 +233,13 @@ func (_m *MockQueue) UpdatePayload(task *models.TaskInfo, payload interface{}) ( return r0, r1 } -// NewMockQueue creates a new instance of MockQueue. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockQueue(t interface { +type mockConstructorTestingTNewMockQueue interface { mock.TestingT Cleanup(func()) -}) *MockQueue { +} + +// NewMockQueue creates a new instance of MockQueue. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockQueue(t mockConstructorTestingTNewMockQueue) *MockQueue { mock := &MockQueue{} mock.Mock.Test(t) From ec5cb425c1ca0baf47220fbe15ef9d3a38bdb826 Mon Sep 17 00:00:00 2001 From: Justin Sherrill Date: Mon, 27 Nov 2023 14:30:06 -0500 Subject: [PATCH 2/2] more stuff --- pkg/dao/repositories.go | 4 +- pkg/dao/repositories_test.go | 43 +++++++++++---------- pkg/dao/repository_configs_mock.go | 10 ----- pkg/dao/snapshots_mock.go | 6 --- pkg/external_repos/introspect.go | 5 +++ pkg/handler/repositories.go | 5 +++ pkg/handler/repositories_test.go | 30 +++++++++++++++ pkg/pulp_client/pulp_client_mock.go | 6 --- pkg/tasks/introspect.go | 58 ++++++++++++++++++----------- pkg/tasks/queue/queue_mock.go | 1 - 10 files changed, 99 insertions(+), 69 deletions(-) diff --git a/pkg/dao/repositories.go b/pkg/dao/repositories.go index b8fe12bdb..eed64a896 100644 --- a/pkg/dao/repositories.go +++ b/pkg/dao/repositories.go @@ -84,8 +84,8 @@ func (p repositoryDaoImpl) ListForIntrospection(urls *[]string, force bool) ([]R db.Where("status != ?", config.StatusValid). Or("last_introspection_time is NULL"). // It was never introspected Or("last_introspection_time < ?", introspectThreshold)). // It was introspected more than the threshold ago) - Where( - db.Where("failed_introspections_count < ?", config.FailedIntrospectionsLimit). + Where( // It is over the introspection limit and has failed once due to being over the limit, so that last_introspection_error says 'over the limit of failed' + db.Where("failed_introspections_count < ?", config.FailedIntrospectionsLimit+1). Or("public = true")) } if urls != nil { diff --git a/pkg/dao/repositories_test.go b/pkg/dao/repositories_test.go index 46be59fb9..1133148c4 100644 --- a/pkg/dao/repositories_test.go +++ b/pkg/dao/repositories_test.go @@ -1,6 +1,7 @@ package dao import ( + "fmt" "log" "os" "strings" @@ -392,8 +393,9 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { result bool } type TestCase struct { - given *Repository - expected TestCaseExpected + description string + given *Repository + expected TestCaseExpected } var ( @@ -402,10 +404,8 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { testCases []TestCase = []TestCase{ // BEGIN: Cover all the no valid status - - // When Status is not Valid - // it returns true { + description: "When Status is not Valid it returns true", given: &Repository{ Status: config.StatusInvalid, }, @@ -414,6 +414,7 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { }, }, { + description: "Test pending", given: &Repository{ Status: config.StatusPending, }, @@ -422,6 +423,7 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { }, }, { + description: "Test unavail", given: &Repository{ Status: config.StatusUnavailable, }, @@ -431,10 +433,8 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { }, // END: Cover all the no valid status - // When Status is Valid - // and LastIntrospectionTime is nill - // it returns true { + description: "When Status is Valid and LastIntrospectionTime is nil it returns true", given: &Repository{ Status: config.StatusValid, LastIntrospectionTime: nil, @@ -443,10 +443,8 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { result: true, }, }, - // When Status is Valid - // and LastIntrospectionTime does not reach the threshold interval (24hours) - // it returns false indicating that no introspection is needed { + description: "When Status is Valid and LastIntrospectionTime does not reach the threshold interval (24hours) it returns false indicating that no introspection is needed", given: &Repository{ Status: config.StatusValid, LastIntrospectionTime: &thresholdBefore24, @@ -455,10 +453,8 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { result: false, }, }, - // When Status is Valid - // and LastIntrospectionTime does reach the threshold interval (24hours) - // it returns true indicating that an introspection is needed { + description: "When Status is Valid and LastIntrospectionTime does reach the threshold interval (24hours) it returns true indicating that an introspection is needed", given: &Repository{ Status: config.StatusValid, LastIntrospectionTime: &thresholdAfter24, @@ -467,21 +463,23 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { result: true, }, }, - // Test around FailedIntrospectionsCount - { // doesn't exceed the count + + { + description: "Test around FailedIntrospectionsCount doesn't exceed the count", given: &Repository{ Status: config.StatusInvalid, - FailedIntrospectionsCount: config.FailedIntrospectionsLimit - 1, + FailedIntrospectionsCount: config.FailedIntrospectionsLimit, Public: false, }, expected: TestCaseExpected{ result: true, }, }, - { // Exceeds the count + { + description: "Exceeds the count", given: &Repository{ Status: config.StatusInvalid, - FailedIntrospectionsCount: config.FailedIntrospectionsLimit, + FailedIntrospectionsCount: config.FailedIntrospectionsLimit + 1, Public: false, }, expected: TestCaseExpected{ @@ -489,7 +487,8 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { }, }, - { // Exceeds the count but is public + { + description: "Exceeds the count but is public", given: &Repository{ Status: config.StatusInvalid, FailedIntrospectionsCount: config.FailedIntrospectionsLimit, @@ -522,7 +521,7 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { } for _, tCase := range testCases { found := repoIncluded(tCase.given) - assert.Equal(s.T(), tCase.expected.result, found) + assert.Equal(s.T(), tCase.expected.result, found, tCase.description) } // Force them all @@ -530,7 +529,7 @@ func (s *RepositorySuite) TestListRepositoriesForIntrospection() { assert.NoError(s.T(), err) for _, tCase := range testCases { found := repoIncluded(tCase.given) - assert.True(s.T(), found) + assert.True(s.T(), found, fmt.Sprintf("Forced: %v", tCase.description)) } // Query a single one diff --git a/pkg/dao/repository_configs_mock.go b/pkg/dao/repository_configs_mock.go index f0dad03ad..829a4e8b9 100644 --- a/pkg/dao/repository_configs_mock.go +++ b/pkg/dao/repository_configs_mock.go @@ -1,8 +1,4 @@ -<<<<<<< HEAD // Code generated by mockery v2.36.1. DO NOT EDIT. -======= -// Code generated by mockery v2.20.0. DO NOT EDIT. ->>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) package dao @@ -364,7 +360,6 @@ func (_m *MockRepositoryConfigDao) ValidateParameters(orgId string, params api.R return r0, r1 } -<<<<<<< HEAD // WithContext provides a mock function with given fields: ctx func (_m *MockRepositoryConfigDao) WithContext(ctx context.Context) RepositoryConfigDao { ret := _m.Called(ctx) @@ -381,12 +376,7 @@ func (_m *MockRepositoryConfigDao) WithContext(ctx context.Context) RepositoryCo return r0 } -// NewMockRepositoryConfigDao creates a new instance of MockRepositoryConfigDao. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockRepositoryConfigDao(t interface { -======= type mockConstructorTestingTNewMockRepositoryConfigDao interface { ->>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) mock.TestingT Cleanup(func()) } diff --git a/pkg/dao/snapshots_mock.go b/pkg/dao/snapshots_mock.go index e9a67a953..905b9e1d1 100644 --- a/pkg/dao/snapshots_mock.go +++ b/pkg/dao/snapshots_mock.go @@ -174,7 +174,6 @@ func (_m *MockSnapshotDao) List(orgID string, repoConfigUuid string, paginationD return r0, r1, r2 } -<<<<<<< HEAD // WithContext provides a mock function with given fields: ctx func (_m *MockSnapshotDao) WithContext(ctx context.Context) SnapshotDao { ret := _m.Called(ctx) @@ -191,12 +190,7 @@ func (_m *MockSnapshotDao) WithContext(ctx context.Context) SnapshotDao { return r0 } -// NewMockSnapshotDao creates a new instance of MockSnapshotDao. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockSnapshotDao(t interface { -======= type mockConstructorTestingTNewMockSnapshotDao interface { ->>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) mock.TestingT Cleanup(func()) } diff --git a/pkg/external_repos/introspect.go b/pkg/external_repos/introspect.go index 2d5cdb3c1..d106282e9 100644 --- a/pkg/external_repos/introspect.go +++ b/pkg/external_repos/introspect.go @@ -81,6 +81,11 @@ func Introspect(ctx context.Context, repo *dao.Repository, dao *dao.DaoRegistry) ) logger := zerolog.Ctx(ctx) + if repo.FailedIntrospectionsCount == config.FailedIntrospectionsLimit { + logger.Debug().Msgf("introspection count reached for %v", repo.URL) + return 0, fmt.Errorf("introspection skipped because this repository has failed more than %v times in a row", config.FailedIntrospectionsLimit), false + } + logger.Debug().Msg("Introspecting " + repo.URL) if client, err = httpClient(IsRedHat(repo.URL)); err != nil { diff --git a/pkg/handler/repositories.go b/pkg/handler/repositories.go index db538eb66..d898793dd 100644 --- a/pkg/handler/repositories.go +++ b/pkg/handler/repositories.go @@ -492,6 +492,11 @@ func (rh *RepositoryHandler) introspect(c echo.Context) error { } } + if repo.FailedIntrospectionsCount >= config.FailedIntrospectionsLimit+1 && !req.ResetCount { + return ce.NewErrorResponse(http.StatusBadRequest, "Too many failed introspections", + fmt.Sprintf("This repository has failed introspecting %v times.", repo.FailedIntrospectionsCount)) + } + var repoUpdate dao.RepositoryUpdate count := 0 status := "Pending" diff --git a/pkg/handler/repositories_test.go b/pkg/handler/repositories_test.go index ae79e75c8..4a7fe1901 100644 --- a/pkg/handler/repositories_test.go +++ b/pkg/handler/repositories_test.go @@ -982,6 +982,36 @@ func (suite *ReposSuite) TestIntrospectRepository() { assert.Equal(t, http.StatusNoContent, code) } +func (suite *ReposSuite) TestIntrospectRepositoryFailedLimit() { + t := suite.T() + intReq := api.RepositoryIntrospectRequest{} + repo := dao.Repository{UUID: "12345", FailedIntrospectionsCount: 21} + repoResp := api.RepositoryResponse{ + Name: "my repo", + URL: "https://example.com", + UUID: "someuuid", + } + + // Fetch will filter the request by Org ID before updating + suite.reg.Repository.On("FetchForUrl", repoResp.URL).Return(repo, nil).NotBefore( + suite.reg.RepositoryConfig.WithContextMock().On("Fetch", test_handler.MockOrgId, repoResp.UUID).Return(repoResp, nil), + ) + + body, err := json.Marshal(intReq) + if err != nil { + t.Error("Could not marshal JSON") + } + + req := httptest.NewRequest(http.MethodPost, api.FullRootPath()+"/repositories/"+repoResp.UUID+"/introspect/?reset_count=true", + bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(api.IdentityHeader, test_handler.EncodedIdentity(t)) + + code, _, err := suite.serveRepositoriesRouter(req) + assert.Nil(t, err) + assert.Equal(t, http.StatusBadRequest, code) +} + func (suite *ReposSuite) TestIntrospectRepositoryBeforeTimeLimit() { t := suite.T() diff --git a/pkg/pulp_client/pulp_client_mock.go b/pkg/pulp_client/pulp_client_mock.go index 2ffcbd224..417da9c98 100644 --- a/pkg/pulp_client/pulp_client_mock.go +++ b/pkg/pulp_client/pulp_client_mock.go @@ -678,7 +678,6 @@ func (_m *MockPulpClient) UpdateRpmRemote(pulpHref string, url string, clientCer return r0, r1 } -<<<<<<< HEAD // WithContext provides a mock function with given fields: ctx func (_m *MockPulpClient) WithContext(ctx context.Context) PulpClient { ret := _m.Called(ctx) @@ -711,12 +710,7 @@ func (_m *MockPulpClient) WithDomain(domainName string) PulpClient { return r0 } -// NewMockPulpClient creates a new instance of MockPulpClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockPulpClient(t interface { -======= type mockConstructorTestingTNewMockPulpClient interface { ->>>>>>> cbe09e1 (Fixes 2282: reduce number of introspect tasks) mock.TestingT Cleanup(func()) } diff --git a/pkg/tasks/introspect.go b/pkg/tasks/introspect.go index 451de5d34..317f001c9 100644 --- a/pkg/tasks/introspect.go +++ b/pkg/tasks/introspect.go @@ -5,11 +5,12 @@ import ( "encoding/json" "fmt" + "github.com/content-services/content-sources-backend/pkg/dao" + "github.com/content-services/content-sources-backend/pkg/db" "github.com/content-services/content-sources-backend/pkg/external_repos" "github.com/content-services/content-sources-backend/pkg/models" "github.com/content-services/content-sources-backend/pkg/tasks/payloads" "github.com/content-services/content-sources-backend/pkg/tasks/queue" - "github.com/go-playground/validator/v10" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -17,35 +18,48 @@ import ( func IntrospectHandler(ctx context.Context, task *models.TaskInfo, q *queue.Queue) error { var p payloads.IntrospectPayload - logger := LogForTask(task.Id.String(), task.Typename, task.RequestID) - if err := json.Unmarshal(task.Payload, &p); err != nil { - return fmt.Errorf("payload incorrect type for IntrospectHandler") + return fmt.Errorf("payload incorrect type for IntrospectHandler: %w", err) } - // https://github.com/go-playground/validator - // FIXME Wrong usage of validator library - validate := validator.New() - if err := validate.Var(p.Url, "required"); err != nil { - return err + intro := IntrospectionTask{ + URL: p.Url, + daoReg: dao.GetDaoRegistry(db.DB), + ctx: ctx, + logger: LogForTask(task.Id.String(), task.Typename, task.RequestID), } + return intro.Run() +} - newRpms, nonFatalErr, err := external_repos.IntrospectUrl(logger.WithContext(context.Background()), p.Url) - if nonFatalErr != nil { - logger.Warn().Err(nonFatalErr).Msgf("Error introspecting repository %v", p.Url) - } +type IntrospectionTask struct { + URL string + daoReg *dao.DaoRegistry + ctx context.Context + logger *zerolog.Logger +} - // Introspection failure isn't considered a message failure, as the message has been handled +func (i *IntrospectionTask) Run() error { + logger := i.logger + repo, err := i.daoReg.Repository.FetchForUrl(i.URL) if err != nil { - logger.Error().Err(err).Msgf("Error introspecting repository %v", p.Url) + return fmt.Errorf("error loading repository during introspection %w", err) } - logger.Debug().Msgf("IntrospectionUrl returned %d new packages", newRpms) - - select { - case <-ctx.Done(): - return queue.ErrTaskCanceled - default: - return nil + newRpms, nonFatalErr, err := external_repos.IntrospectUrl(i.logger.WithContext(i.ctx), i.URL) + if err != nil { + logger.Error().Err(err).Msgf("Fatal error introspecting repository %v", i.URL) + return err } + if nonFatalErr != nil { + msg := fmt.Sprintf("Error introspecting repository %v", i.URL) + if repo.Public { + logger.Error().Err(nonFatalErr).Msg(msg) + } else { + logger.Info().Err(nonFatalErr).Msg(msg) + } + return nonFatalErr + } + + logger.Debug().Msgf("IntrospectionUrl returned %d new packages", newRpms) + return nil } func LogForTask(taskID, typename, requestID string) *zerolog.Logger { diff --git a/pkg/tasks/queue/queue_mock.go b/pkg/tasks/queue/queue_mock.go index a7ddd35be..628260d2f 100644 --- a/pkg/tasks/queue/queue_mock.go +++ b/pkg/tasks/queue/queue_mock.go @@ -1,6 +1,5 @@ // Code generated by mockery v2.33.0. DO NOT EDIT. - package queue import (