Skip to content

Commit

Permalink
Merge pull request #1169 from flanksource/add-canary-id-to-job-history
Browse files Browse the repository at this point in the history
fix: add canary_id to job_history and cleanup sync canary function
  • Loading branch information
moshloop authored Jul 24, 2023
2 parents 5ab2298 + 5ef05f6 commit ac227af
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 48 deletions.
6 changes: 6 additions & 0 deletions pkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ func (c Canary) ToV1() (*v1.Canary, error) {
return &canary, nil
}

func (c Canary) GetSpec() (v1.CanarySpec, error) {
var spec v1.CanarySpec
err := json.Unmarshal(c.Spec, &spec)
return spec, err
}

func CanaryFromV1(canary v1.Canary) (Canary, error) {
spec, err := json.Marshal(canary.Spec)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
return ctrl.Result{}, r.Update(ctx, canary)
}

_, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+string(canary.ObjectMeta.UID))
dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+string(canary.ObjectMeta.UID))
if err != nil {
return ctrl.Result{Requeue: true}, err
}

// Sync jobs if canary is created or updated
if canary.Generation == 1 || changed {
if err := canaryJobs.SyncCanaryJob(*canary); err != nil {
if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
logger.Error(err, "failed to sync canary job")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
Expand Down
58 changes: 16 additions & 42 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,13 @@ func ScanCanaryConfigs() {
var canaryUpdateTimeCache = sync.Map{}

// TODO: Refactor to use database object instead of kubernetes
func SyncCanaryJob(canary v1.Canary) error {
if !canary.DeletionTimestamp.IsZero() || canary.Spec.GetSchedule() == "@never" {
func SyncCanaryJob(dbCanary pkg.Canary) error {
canary, err := dbCanary.ToV1()
if err != nil {
return err
}

if canary.Spec.GetSchedule() == "@never" {
DeleteCanaryJob(canary.GetPersistedID())
return nil
}
Expand All @@ -290,14 +295,10 @@ func SyncCanaryJob(canary v1.Canary) error {
}
}

dbCanary, err := db.GetCanary(canary.GetPersistedID())
if err != nil {
return err
}
job := CanaryJob{
Client: Kommons,
Kubernetes: Kubernetes,
Canary: canary,
Canary: *canary,
DBCanary: dbCanary,
LogPass: canary.IsTrace() || canary.IsDebug() || LogPass,
LogFail: canary.IsTrace() || canary.IsDebug() || LogFail,
Expand Down Expand Up @@ -333,52 +334,25 @@ func SyncCanaryJob(canary v1.Canary) error {
func SyncCanaryJobs() {
logger.Debugf("Syncing canary jobs")

jobHistory := models.NewJobHistory("CanarySync", "canary", "").Start()
_ = db.PersistJobHistory(jobHistory)
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

canaries, err := db.GetAllCanariesForSync()
if err != nil {
logger.Errorf("Failed to get canaries: %v", err)
jobHistory.AddError(err.Error())
return
}

for _, c := range canaries {
canary, err := c.ToV1()
if err != nil {
logger.Errorf("Error parsing canary[%s]: %v", c.ID, err)
jobHistory.AddError(err.Error())
continue
}

if len(canary.Status.Checks) == 0 && len(canary.Spec.GetAllChecks()) > 0 {
logger.Infof("Persisting %s as it has no checks", canary.Name)
pkgCanary, _, _, err := db.PersistCanary(*canary, canary.Annotations["source"])
if err != nil {
logger.Errorf("Failed to persist canary %s: %v", canary.Name, err)
jobHistory.AddError(err.Error())
continue
}

v1canary, err := pkgCanary.ToV1()
if err != nil {
logger.Errorf("Failed to convert canary to V1 %s: %v", canary.Name, err)
jobHistory.AddError(err.Error())
continue
}
jobHistory := models.NewJobHistory("CanarySync", "canary", c.ID.String()).Start()
_ = db.PersistJobHistory(jobHistory)

if err := SyncCanaryJob(*v1canary); err != nil {
logger.Errorf(err.Error())
jobHistory.AddError(err.Error())
}
} else if err := SyncCanaryJob(*canary); err != nil {
logger.Errorf(err.Error())
jobHistory.AddError(err.Error())
if err := SyncCanaryJob(c); err != nil {
logger.Errorf("Error syncing canary[%s]: %v", c.ID, err.Error())
_ = db.PersistJobHistory(jobHistory.AddError(err.Error()).End())
continue
}
jobHistory.IncrSuccess()
_ = db.PersistJobHistory(jobHistory.End())
}

jobHistory.IncrSuccess()
logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries()))
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/topology/checks/component_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ func GetCheckComponentRelationshipsForComponent(component *pkg.Component) (relat
logger.Debugf("error creating canary from inline: %v", err)
}

if v1canary, err := canary.ToV1(); err == nil {
if err := canaryJobs.SyncCanaryJob(*v1canary); err != nil {
logger.Debugf("error creating canary job: %v", err)
}
if err := canaryJobs.SyncCanaryJob(*canary); err != nil {
logger.Debugf("error creating canary job: %v", err)
}

inlineChecks, err := db.GetAllChecksForCanary(canary.ID)
if err != nil {
logger.Debugf("error getting checks for canary: %s. err: %v", canary.ID, err)
Expand Down

0 comments on commit ac227af

Please sign in to comment.