Skip to content

Commit

Permalink
Add command to snapshot a rhel repo
Browse files Browse the repository at this point in the history
  • Loading branch information
jlsherrill committed Nov 7, 2023
1 parent bd3b872 commit 2d37d7b
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 21 deletions.
47 changes: 44 additions & 3 deletions cmd/external-repos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ import (
"fmt"
"os"
"sort"
"time"

"github.com/content-services/content-sources-backend/pkg/config"
"github.com/content-services/content-sources-backend/pkg/dao"
"github.com/content-services/content-sources-backend/pkg/db"
"github.com/content-services/content-sources-backend/pkg/external_repos"
"github.com/content-services/content-sources-backend/pkg/pulp_client"
"github.com/content-services/content-sources-backend/pkg/tasks/client"
"github.com/content-services/content-sources-backend/pkg/tasks/payloads"
"github.com/content-services/content-sources-backend/pkg/tasks/queue"
_ "github.com/golang-migrate/migrate/v4/source/file"
_ "github.com/lib/pq"
"github.com/openlyinc/pointy"
"github.com/rs/zerolog/log"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -73,13 +76,31 @@ func main() {
log.Panic().Err(errors[i]).Msg("Failed to introspect repository due to fatal errors")
}
log.Debug().Msgf("Inserted %d packages", count)
} else if args[1] == "snapshot" {
if len(args) < 3 {
log.Error().Msg("Usage: ./external_repos sync URL [URL2]...")
os.Exit(1)
}
var urls []string
for i := 2; i < len(args); i++ {
urls = append(urls, args[i])
}
if config.Get().Features.Snapshots.Enabled {
waitForPulp()
err := enqueueSyncRepos(&urls)
if err != nil {
log.Warn().Msgf("Error enqueuing snapshot tasks: %v", err)
}
} else {
log.Warn().Msg("Snapshotting disabled")
}
} else if args[1] == "nightly-jobs" {
err = enqueueIntrospectAllRepos()
if err != nil {
log.Error().Err(err).Msg("error queueing introspection tasks")
}
if config.Get().Features.Snapshots.Enabled {
err = enqueueSyncAllRepos()
err = enqueueSyncRepos(nil)
if err != nil {
log.Error().Err(err).Msg("error queueing snapshot tasks")
}
Expand Down Expand Up @@ -110,6 +131,18 @@ func saveToDB(db *gorm.DB) error {
return err
}

func waitForPulp() {
for {
client := pulp_client.GetPulpClientWithDomain(context.Background(), pulp_client.DefaultDomain)
_, err := client.GetRpmRemoteList()
if err == nil {
return
}
log.Warn().Err(err).Msg("Pulp isn't up yet, waiting 5s.")
time.Sleep(5 * time.Second)
}
}

func scanForExternalRepos(path string) {
urls, err := external_repos.IBUrlsFromDir(path)
if err != nil {
Expand Down Expand Up @@ -161,15 +194,23 @@ func enqueueIntrospectAllRepos() error {
return nil
}

func enqueueSyncAllRepos() error {
func enqueueSyncRepos(urls *[]string) error {
q, err := queue.NewPgQueue(db.GetUrl())
if err != nil {
return fmt.Errorf("error getting new task queue: %w", err)
}
c := client.NewTaskClient(&q)

repoConfigDao := dao.GetRepositoryConfigDao(db.DB)
repoConfigs, err := repoConfigDao.InternalOnly_ListReposToSnapshot()
var filter *dao.ListRepoFilter
if urls != nil {
filter = &dao.ListRepoFilter{
URLs: urls,
RedhatOnly: pointy.Pointer(true),
}
}
repoConfigs, err := repoConfigDao.InternalOnly_ListReposToSnapshot(filter)

if err != nil {
return fmt.Errorf("error getting repository configurations: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions deployments/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ objects:
- introspect
- https://cdn.redhat.com/content/dist/layered/rhel8/x86_64/ansible/2/os
- https://cdn.redhat.com/content/dist/rhel8/8.8/x86_64/baseos/os
- name: snapshot-single-repo
inheritEnv: true
args:
- /external-repos
- snapshot
- https://cdn.redhat.com/content/dist/layered/rhel8/x86_64/ansible/2/os/
image: ${IMAGE}:${IMAGE_TAG}
livenessProbe:
failureThreshold: 3
Expand Down
2 changes: 1 addition & 1 deletion pkg/dao/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type RepositoryConfigDao interface {
BulkCreate(newRepositories []api.RepositoryRequest) ([]api.RepositoryResponse, []error)
Update(orgID, uuid string, repoParams api.RepositoryRequest) (bool, error)
Fetch(orgID string, uuid string) (api.RepositoryResponse, error)
InternalOnly_ListReposToSnapshot() ([]models.RepositoryConfiguration, error)
InternalOnly_ListReposToSnapshot(filter *ListRepoFilter) ([]models.RepositoryConfiguration, error)
List(orgID string, paginationData api.PaginationData, filterData api.FilterData) (api.RepositoryCollectionResponse, int64, error)
Delete(orgID string, uuid string) error
SoftDelete(orgID string, uuid string) error
Expand Down
26 changes: 20 additions & 6 deletions pkg/dao/repository_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,33 @@ func (r repositoryConfigDaoImpl) bulkCreate(tx *gorm.DB, newRepositories []api.R
return []api.RepositoryResponse{}, errorList
}

func (p repositoryConfigDaoImpl) InternalOnly_ListReposToSnapshot() ([]models.RepositoryConfiguration, error) {
type ListRepoFilter struct {
URLs *[]string
RedhatOnly *bool
}

func (p repositoryConfigDaoImpl) InternalOnly_ListReposToSnapshot(filter *ListRepoFilter) ([]models.RepositoryConfiguration, error) {
var dbRepos []models.RepositoryConfiguration
var result *gorm.DB
var query *gorm.DB
interval := fmt.Sprintf("%v hours", config.SnapshotInterval)
if config.Get().Options.AlwaysRunCronTasks {
result = p.db.Where("snapshot IS TRUE").Find(&dbRepos)
query = p.db.Where("snapshot IS TRUE")
} else {
result = p.db.Where("snapshot IS TRUE").Joins("LEFT JOIN tasks on last_snapshot_task_uuid = tasks.id").
query = p.db.Where("snapshot IS TRUE").Joins("LEFT JOIN tasks on last_snapshot_task_uuid = tasks.id").
Where(p.db.Where("tasks.queued_at <= (current_date - cast(? as interval))", interval).
Or("tasks.status NOT IN ?", []string{config.TaskStatusCompleted, config.TaskStatusPending, config.TaskStatusRunning}).
Or("last_snapshot_task_uuid is NULL")).
Find(&dbRepos)
Or("last_snapshot_task_uuid is NULL"))
}
if filter != nil {
query = query.Joins("INNER JOIN repositories r on r.uuid = repository_configurations.repository_uuid")
if filter.RedhatOnly != nil && *filter.RedhatOnly {
query = query.Where("r.origin = ?", config.OriginRedHat)
}
if filter.URLs != nil {
query = query.Where("r.url in ?", *filter.URLs)
}
}
result := query.Find(&dbRepos)

if result.Error != nil {
return dbRepos, result.Error
Expand Down
20 changes: 10 additions & 10 deletions pkg/dao/repository_configs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion pkg/dao/repository_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,7 @@ type RepoToSnapshotTest struct {
Opts *seeds.TaskSeedOptions
Included bool
OptionAlwaysRunCronTasks bool
Filter *ListRepoFilter
}

func (suite *RepositoryConfigSuite) TestListReposToSnapshot() {
Expand Down Expand Up @@ -1734,6 +1735,18 @@ func (suite *RepositoryConfigSuite) TestListReposToSnapshot() {
Opts: &seeds.TaskSeedOptions{RepoConfigUUID: repo.UUID, OrgID: repo.OrgID, Status: config.TaskStatusFailed},
Included: true,
},
{
Name: "Previous Snapshot Failed, and url specified",
Opts: &seeds.TaskSeedOptions{RepoConfigUUID: repo.UUID, OrgID: repo.OrgID, Status: config.TaskStatusFailed},
Included: true,
Filter: &ListRepoFilter{URLs: &[]string{repo.URL}},
},
{
Name: "Previous Snapshot Failed, and url specified",
Opts: &seeds.TaskSeedOptions{RepoConfigUUID: repo.UUID, OrgID: repo.OrgID, Status: config.TaskStatusFailed},
Included: false,
Filter: &ListRepoFilter{RedhatOnly: pointy.Pointer(true)},
},
{
Name: "Previous Snapshot was successful and recent",
Opts: &seeds.TaskSeedOptions{RepoConfigUUID: repo.UUID, OrgID: repo.OrgID, Status: config.TaskStatusCompleted},
Expand Down Expand Up @@ -1763,7 +1776,7 @@ func (suite *RepositoryConfigSuite) TestListReposToSnapshot() {

config.Get().Options.AlwaysRunCronTasks = testCase.OptionAlwaysRunCronTasks

afterRepos, err := dao.InternalOnly_ListReposToSnapshot()
afterRepos, err := dao.InternalOnly_ListReposToSnapshot(testCase.Filter)
assert.NoError(t, err)
for i := range afterRepos {
if repo.UUID == afterRepos[i].UUID {
Expand Down

0 comments on commit 2d37d7b

Please sign in to comment.