Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a new job to reconcile forced deleted canaries linked to CRD #2255

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ import (
ctrlMetrics "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var webhookPort int
var k8sLogLevel int
var enableLeaderElection bool
var operatorExecutor bool
var Operator = &cobra.Command{
Use: "operator",
Short: "Start the kubernetes operator",
Run: run,
}
var (
	// webhookPort is the port the operator's webhook server listens on (flag: --webhookPort, default 8082).
	webhookPort int
	// k8sLogLevel is the Kubernetes controller log level (flag: --k8s-log-level, default -1).
	k8sLogLevel int
	// enableLeaderElection, when true, ensures there is only one active controller manager
	// (flag: --enable-leader-election).
	enableLeaderElection bool
	// Operator is the cobra command that starts the kubernetes operator.
	Operator = &cobra.Command{
		Use:   "operator",
		Short: "Start the kubernetes operator",
		Run:   run,
	}
)

func init() {
ServerFlags(Operator.Flags())
Operator.Flags().StringVarP(&runner.WatchNamespace, "namespace", "n", "", "Watch only specified namespace, otherwise watch all")
Operator.Flags().BoolVar(&operatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
Operator.Flags().BoolVar(&runner.OperatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
Operator.Flags().IntVar(&webhookPort, "webhookPort", 8082, "Port for webhooks ")
Operator.Flags().IntVar(&k8sLogLevel, "k8s-log-level", -1, "Kubernetes controller log level")
Operator.Flags().BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enabling this will ensure there is only one active controller manager")
Expand Down Expand Up @@ -77,14 +78,15 @@ func run(cmd *cobra.Command, args []string) {

cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext)

if operatorExecutor {
if runner.OperatorExecutor {
logger.Infof("Starting executors")

// Some synchronous jobs can take time
// so we use a goroutine to unblock server start
// to prevent health check from failing
go jobs.Start()
}

go serve()

ctrl.SetLogger(logr.FromSlogHandler(logger.Handler()))
Expand Down
8 changes: 5 additions & 3 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
"github.com/spf13/cobra"
)

var schedule, configFile string
var executor bool
var propertiesFile = "canary-checker.properties"
var (
	// schedule and configFile hold serve-command flag values
	// NOTE(review): presumably the cron override and the canary config path — confirm against the flag registrations.
	schedule, configFile string
	// executor toggles check execution for the serve command
	// NOTE(review): presumably mirrors the operator's --executor semantics — confirm.
	executor bool
	// propertiesFile is the default canary-checker properties file name.
	propertiesFile = "canary-checker.properties"
)

var Serve = &cobra.Command{
Use: "serve config.yaml",
Expand Down
56 changes: 55 additions & 1 deletion pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package canary

import (
"errors"
"fmt"
"sync"
"time"
Expand All @@ -18,10 +19,13 @@ import (
dutyEcho "github.com/flanksource/duty/echo"
dutyjob "github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel/attribute"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -194,6 +198,56 @@ func logIfError(err error, description string) {
}
}

var CleanupCRDDeleteCanaries = &dutyjob.Job{
Name: "CleanupCRDDeletedCanaries",
Schedule: "@every 1h",
moshloop marked this conversation as resolved.
Show resolved Hide resolved
RunNow: true,
Singleton: true,
JobHistory: true,
Retention: dutyjob.RetentionBalanced,
Fn: func(ctx dutyjob.JobRuntime) error {
var crdCanaries []models.Canary
if err := ctx.DB().Select("id", "name", "namespace").
Where("deleted_at IS NULL").
Where("agent_id = ?", uuid.Nil.String()).
Where("source LIKE 'kubernetes/%'").Find(&crdCanaries).Error; err != nil {
return fmt.Errorf("failed to list all canaries with source=CRD: %w", err)
}

if len(crdCanaries) == 0 {
return nil
}

canaryClient, err := ctx.KubernetesDynamicClient().GetClientByGroupVersionKind(v1.GroupVersion.Group, v1.GroupVersion.Version, "Canary")
if err != nil {
return fmt.Errorf("failed to get kubernetes client for canaries: %w", err)
}

for _, canary := range crdCanaries {
if _, err := canaryClient.Namespace(canary.Namespace).Get(ctx, canary.Name, metav1.GetOptions{}); err != nil {
var statusErr *apierrors.StatusError
if errors.As(err, &statusErr) {
if statusErr.ErrStatus.Reason == metav1.StatusReasonNotFound {
if err := db.DeleteCanary(ctx.Context, canary.ID.String()); err != nil {
ctx.History.AddErrorf("error deleting canary[%s]: %v", canary.ID, err)
} else {
ctx.History.IncrSuccess()
}

Unschedule(canary.ID.String())

continue
}
}

return fmt.Errorf("failed to delete canary %s/%s from kubernetes: %w", canary.Namespace, canary.Name, err)
}
}

return nil
},
}

var CleanupDeletedCanaryChecks = &dutyjob.Job{
Name: "CleanupDeletedCanaryChecks",
Schedule: "@every 1h",
Expand All @@ -217,7 +271,7 @@ var CleanupDeletedCanaryChecks = &dutyjob.Job{

for _, r := range rows {
if err := db.DeleteCanary(ctx.Context, r.ID); err != nil {
ctx.History.AddError(fmt.Sprintf("Error deleting components for topology[%s]: %v", r.ID, err))
ctx.History.AddErrorf("error deleting canary[%s]: %v", r.ID, err)
} else {
ctx.History.IncrSuccess()
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/flanksource/canary-checker/pkg/db"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
topologyJobs "github.com/flanksource/canary-checker/pkg/jobs/topology"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/canary-checker/pkg/topology"
"github.com/flanksource/commons/logger"
dutyEcho "github.com/flanksource/duty/echo"
Expand All @@ -19,9 +20,10 @@ var FuncScheduler = cron.New()
func Start() {
logger.Infof("Starting jobs ...")
dutyEcho.RegisterCron(FuncScheduler)

if canaryJobs.UpstreamConf.Valid() {
for _, j := range canaryJobs.UpstreamJobs {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
Expand All @@ -30,23 +32,31 @@ func Start() {
}

for _, j := range db.CheckStatusJobs {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
}
}

for _, j := range topology.Jobs {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
}
}

for _, j := range []*job.Job{topologyJobs.CleanupDeletedTopologyComponents, topologyJobs.SyncTopology, canaryJobs.SyncCanaryJobs, canaryJobs.CleanupDeletedCanaryChecks, dutyQuery.SyncComponentCacheJob} {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
}
}

if runner.OperatorExecutor {
job := canaryJobs.CleanupCRDDeleteCanaries
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
Expand Down
3 changes: 3 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// OperatorExecutor when true means the application is serving as the k8s operator
var OperatorExecutor bool

// RunnerName identifies this canary-checker runner instance
// NOTE(review): where it is set is not visible here — confirm against callers.
var RunnerName string

// Version is the application version string
// NOTE(review): presumably injected at build time via ldflags — confirm.
var Version string
Expand Down
Loading