diff --git a/api/context/context.go b/api/context/context.go
index 46700dce0..16afffbd2 100644
--- a/api/context/context.go
+++ b/api/context/context.go
@@ -11,6 +11,7 @@ import (
 	"github.com/flanksource/commons/logger"
 	ctemplate "github.com/flanksource/commons/template"
 	"github.com/flanksource/duty"
+	dutyCtx "github.com/flanksource/duty/context"
 	"github.com/flanksource/duty/models"
 	"github.com/flanksource/duty/types"
 	"github.com/flanksource/kommons"
@@ -19,6 +20,8 @@ import (
 	"k8s.io/client-go/kubernetes"
 )
 
+var DefaultContext dutyCtx.Context
+
 type KubernetesContext struct {
 	gocontext.Context
 	Kommons    *kommons.Client
diff --git a/api/v1/canary_types.go b/api/v1/canary_types.go
index 7528739ba..adca8d8d3 100644
--- a/api/v1/canary_types.go
+++ b/api/v1/canary_types.go
@@ -25,6 +25,7 @@ import (
 	"github.com/flanksource/canary-checker/api/external"
 	"github.com/flanksource/commons/logger"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 )
 
 type ResultMode string
@@ -234,6 +235,10 @@ func (c Canary) GetDescription(check external.Check) string {
 	return check.GetEndpoint()
 }
 
+func (c Canary) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Name: c.Name, Namespace: c.Namespace}
+}
+
 func (c *Canary) SetRunnerName(name string) {
 	c.Status.runnerName = name
 }
diff --git a/cmd/operator.go b/cmd/operator.go
index 6c9c473c8..3c47638b8 100644
--- a/cmd/operator.go
+++ b/cmd/operator.go
@@ -1,9 +1,11 @@
 package cmd
 
 import (
+	gocontext "context"
 	"os"
 	"time"
 
+	apicontext "github.com/flanksource/canary-checker/api/context"
 	"github.com/flanksource/canary-checker/pkg/cache"
 	"github.com/flanksource/canary-checker/pkg/db"
 	"github.com/flanksource/canary-checker/pkg/jobs"
@@ -16,9 +18,12 @@ import (
 	"github.com/flanksource/canary-checker/pkg"
 	"github.com/flanksource/canary-checker/pkg/controllers"
 	"github.com/flanksource/canary-checker/pkg/labels"
+	commonsCtx "github.com/flanksource/commons/context"
 	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/duty/context"
 	"github.com/go-logr/zapr"
 	"github.com/spf13/cobra"
+	"go.opentelemetry.io/otel"
 	"k8s.io/apimachinery/pkg/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -73,6 +78,17 @@ func run(cmd *cobra.Command, args []string) {
 	if err := db.Init(); err != nil {
 		logger.Fatalf("error connecting with postgres: %v", err)
 	}
+	kommonsClient, k8s, err := pkg.NewKommonsClient()
+	if err != nil {
+		logger.Warnf("failed to get kommons client, checks that read kubernetes configs will fail: %v", err)
+	}
+
+	apicontext.DefaultContext = context.NewContext(gocontext.Background(), commonsCtx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))).
+		WithDB(db.Gorm, db.Pool).
+		WithKubernetes(k8s).
+		WithKommons(kommonsClient).
+		WithNamespace(runner.WatchNamespace)
+
 	cache.PostgresCache = cache.NewPostgresCache(db.Pool)
 	if operatorExecutor {
 		logger.Infof("Starting executors")
diff --git a/cmd/root.go b/cmd/root.go
index 4805b3b1a..93596971e 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -8,6 +8,7 @@ import (
 	"github.com/flanksource/canary-checker/pkg/db"
 	"github.com/flanksource/canary-checker/pkg/jobs/canary"
 	"github.com/flanksource/canary-checker/pkg/runner"
+	"github.com/flanksource/canary-checker/pkg/telemetry"
 	"github.com/flanksource/commons/logger"
 	gomplate "github.com/flanksource/gomplate/v3"
 	"github.com/spf13/cobra"
@@ -31,16 +32,25 @@ var Root = &cobra.Command{
 		if canary.UpstreamConf.Valid() {
 			logger.Infof("Pushing checks to %s with name=%s user=%s", canary.UpstreamConf.Host, canary.UpstreamConf.AgentName, canary.UpstreamConf.Username)
 		}
+
+		if otelcollectorURL != "" {
+			telemetry.InitTracer(otelServiceName, otelcollectorURL, true)
+		}
 	},
 }
 
-var httpPort = 8080
-var publicEndpoint = "http://localhost:8080"
-var prometheusURL string
-var pushServers, pullServers []string
-var sharedLibrary []string
-var exposeEnv bool
-var logPass, logFail bool
+var (
+	httpPort       = 8080
+	publicEndpoint = "http://localhost:8080"
+	prometheusURL  string
+	pushServers, pullServers []string
+	sharedLibrary  []string
+	exposeEnv      bool
+	logPass, logFail bool
+
+	otelcollectorURL string
+	otelServiceName  string
+)
 
 func ServerFlags(flags *pflag.FlagSet) {
 	flags.IntVar(&httpPort, "httpPort", httpPort, "Port to expose a health dashboard ")
@@ -76,6 +86,9 @@ func ServerFlags(flags *pflag.FlagSet) {
 	flags.StringVar(&canary.UpstreamConf.Password, "upstream-password", os.Getenv("UPSTREAM_PASSWORD"), "upstream password")
 	flags.StringVar(&canary.UpstreamConf.AgentName, "agent-name", os.Getenv("UPSTREAM_NAME"), "name of this agent")
 	flags.BoolVar(&canary.UpstreamConf.InsecureSkipVerify, "upstream-insecure-skip-verify", os.Getenv("UPSTREAM_INSECURE_SKIP_VERIFY") == "true", "Skip TLS verification on the upstream servers certificate")
+
+	flags.StringVar(&otelcollectorURL, "otel-collector-url", "", "OpenTelemetry gRPC Collector URL in host:port format")
+	flags.StringVar(&otelServiceName, "otel-service-name", "canary-checker", "OpenTelemetry service name for the resource")
 }
 
 func readFromEnv(v string) string {
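Note: the `cmd/root.go` change above registers the flags and calls `telemetry.InitTracer`, discarding the shutdown func it returns. A minimal standalone sketch of the same wiring (flag values hardcoded here purely for illustration) that also flushes buffered spans on exit:

```go
package main

import (
	"context"

	"github.com/flanksource/canary-checker/pkg/telemetry"
)

func main() {
	// Hypothetical values; in the PR these come from --otel-collector-url
	// and --otel-service-name registered in ServerFlags.
	otelcollectorURL := "localhost:4317"
	otelServiceName := "canary-checker"

	if otelcollectorURL != "" {
		// InitTracer returns the exporter's shutdown func; deferring it
		// flushes buffered spans on exit (root.go currently ignores it).
		shutdown := telemetry.InitTracer(otelServiceName, otelcollectorURL, true)
		defer shutdown(context.Background()) //nolint:errcheck
	}
}
```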
diff --git a/go.mod b/go.mod
index 2b24625a5..9c8e74e1f 100644
--- a/go.mod
+++ b/go.mod
@@ -63,6 +63,7 @@ require (
 	github.com/spf13/cobra v1.7.0
 	github.com/spf13/pflag v1.0.5
 	go.mongodb.org/mongo-driver v1.12.1
+	go.opentelemetry.io/otel v1.19.0
 	golang.org/x/crypto v0.14.0
 	golang.org/x/net v0.17.0
 	golang.org/x/sync v0.4.0
@@ -111,6 +112,7 @@ require (
 	github.com/aws/smithy-go v1.14.2 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect
+	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
@@ -160,6 +162,7 @@ require (
 	github.com/gosimple/slug v1.13.1 // indirect
 	github.com/gosimple/unidecode v1.0.1 // indirect
 	github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
+	github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
 	github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf // indirect
 	github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce // indirect
 	github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
@@ -231,9 +234,12 @@ require (
 	github.com/yuin/gopher-lua v1.1.0 // indirect
 	github.com/zclconf/go-cty v1.14.1 // indirect
 	go.opencensus.io v0.24.0 // indirect
-	go.opentelemetry.io/otel v1.19.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
 	go.opentelemetry.io/otel/metric v1.19.0 // indirect
+	go.opentelemetry.io/otel/sdk v1.19.0 // indirect
 	go.opentelemetry.io/otel/trace v1.19.0 // indirect
+	go.opentelemetry.io/proto/otlp v1.0.0 // indirect
 	go.starlark.net v0.0.0-20230925163745-10651d5192ab // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	go.uber.org/zap v1.26.0 // indirect
diff --git a/go.sum b/go.sum
index ff9a2e3aa..3458e0866 100644
--- a/go.sum
+++ b/go.sum
@@ -737,6 +737,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
 github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
 github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY=
 github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
+github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
+github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
@@ -1108,9 +1110,12 @@ github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6
 github.com/gosimple/unidecode v1.0.1/go.mod h1:CP0Cr1Y1kogOtx0bJblKzsVWrqYaqfNOnHzpgWw4Awc=
 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
+github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
 github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf h1:I1sbT4ZbIt9i+hB1zfKw2mE8C12TuGxPiW7YmtLbPa4=
 github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf/go.mod h1:jDHmWDKZY6MIIYltYYfW4Rs7hQ50oS4qf/6spSiZAxY=
 github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce h1:cVkYhlWAxwuS2/Yp6qPtcl0fGpcWxuZNonywHZ6/I+s=
@@ -1527,14 +1532,21 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
 go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
 go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
 go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I=
 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 h1:Nw7Dv4lwvGrI68+wULbcq7su9K2cebeCUrDjVrUJHxM=
 go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
 go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
 go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o=
+go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A=
 go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
+go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
+go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
 go.starlark.net v0.0.0-20230925163745-10651d5192ab h1:7QkXlIVjYdSsKKSGnM0jQdw/2w9W5qcFDGTc00zKqgI=
 go.starlark.net v0.0.0-20230925163745-10651d5192ab/go.mod h1:LcLNIzVOMp4oV+uusnpk+VU+SzXaJakUuBjoCSWH5dM=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
diff --git a/pkg/controllers/canary_controller.go b/pkg/controllers/canary_controller.go
index 98686bb52..56ec2958c 100644
--- a/pkg/controllers/canary_controller.go
+++ b/pkg/controllers/canary_controller.go
@@ -24,6 +24,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+	"github.com/flanksource/canary-checker/api/context"
 	v1 "github.com/flanksource/canary-checker/api/v1"
 	"github.com/flanksource/canary-checker/pkg"
 	canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
@@ -105,7 +106,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
 
 	// Sync jobs if canary is created or updated
 	if canary.Generation == 1 {
-		if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
+		if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
 			logger.Error(err, "failed to sync canary job")
 			return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
 		}
@@ -143,7 +144,7 @@ func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary
 	}
 	r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration)
 
-	if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
+	if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
 		return nil, err
 	}
 	return dbCanary, nil
diff --git a/pkg/db/canary.go b/pkg/db/canary.go
index fa21d4ddc..167291ac2 100644
--- a/pkg/db/canary.go
+++ b/pkg/db/canary.go
@@ -1,7 +1,6 @@
 package db
 
 import (
-	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -15,6 +14,7 @@ import (
 	"github.com/flanksource/canary-checker/pkg/utils"
 	"github.com/flanksource/commons/logger"
 	"github.com/flanksource/duty"
+	"github.com/flanksource/duty/context"
 	"github.com/flanksource/duty/models"
 	dutyTypes "github.com/flanksource/duty/types"
 	"github.com/google/uuid"
@@ -23,7 +23,7 @@ import (
 	"gorm.io/gorm/clause"
 )
 
-func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
+func GetAllCanariesForSync(ctx context.Context, namespace string) ([]pkg.Canary, error) {
 	query := `
 		SELECT json_agg(
 			jsonb_set_lax(to_jsonb(canaries),'{checks}', (
@@ -49,7 +49,7 @@ func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
 		args["namespace"] = namespace
 	}
 
-	rows, err := Pool.Query(context.Background(), query, args)
+	rows, err := ctx.Pool().Query(ctx, query, args)
 	if err != nil {
 		return nil, err
 	}
@@ -125,9 +125,9 @@ func PersistCheck(check pkg.Check, canaryID uuid.UUID) (uuid.UUID, error) {
 	return check.ID, nil
 }
 
-func GetTransformedCheckIDs(canaryID string) ([]string, error) {
+func GetTransformedCheckIDs(ctx context.Context, canaryID string) ([]string, error) {
 	var ids []string
-	err := Gorm.Table("checks").
+	err := ctx.DB().Table("checks").
 		Select("id").
 		Where("canary_id = ? AND transformed = true AND deleted_at IS NULL", canaryID).
 		Find(&ids).
@@ -135,7 +135,7 @@ func GetTransformedCheckIDs(canaryID string) ([]string, error) {
 	return ids, err
 }
 
-func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
+func AddCheckStatuses(ctx context.Context, ids []string, status models.CheckHealthStatus) error {
 	if len(ids) == 0 {
 		return nil
 	}
@@ -158,12 +158,12 @@ func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
 			})
 		}
 	}
-	return Gorm.Table("check_statuses").
+	return ctx.DB().Table("check_statuses").
 		Create(objs).
 		Error
 }
 
-func RemoveTransformedChecks(ids []string) error {
+func RemoveTransformedChecks(ctx context.Context, ids []string) error {
 	if len(ids) == 0 {
 		return nil
 	}
@@ -171,7 +171,7 @@ func RemoveTransformedChecks(ids []string) error {
 		"deleted_at": gorm.Expr("NOW()"),
 	}
 
-	return Gorm.Table("checks").
+	return ctx.DB().Table("checks").
 		Where("id in (?)", ids).
 		Where("transformed = true").
 		Updates(updates).
@@ -280,7 +280,7 @@ func FindCheck(canary pkg.Canary, name string) (*pkg.Check, error) {
 
 func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, error) {
 	var ids []string
-	err := Gorm.Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error
+	err := ctx.DB().Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error
 	return ids, err
 }
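The `pkg/db` refactor above threads a duty context through instead of reading the package-level `Gorm`/`Pool` globals. A minimal sketch of the new calling convention, assuming `db.Init` has already populated `db.Gorm` and `db.Pool` (this mirrors the test setup later in this diff); `listCanaries` is a hypothetical helper:

```go
package main

import (
	gocontext "context"

	"github.com/flanksource/canary-checker/pkg/db"
	dutycontext "github.com/flanksource/duty/context"
)

// listCanaries shows the pattern: callers build a duty context carrying the
// DB handles and pass it explicitly to the db package.
func listCanaries(namespace string) error {
	ctx := dutycontext.NewContext(gocontext.Background()).WithDB(db.Gorm, db.Pool)

	canaries, err := db.GetAllCanariesForSync(ctx, namespace)
	if err != nil {
		return err
	}
	_ = canaries // e.g. hand each one to canaryJobs.SyncCanaryJob(ctx, c)
	return nil
}
```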
diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go
index 5fb8fd60a..0adb8f37c 100644
--- a/pkg/jobs/canary/canary_jobs.go
+++ b/pkg/jobs/canary/canary_jobs.go
@@ -1,12 +1,13 @@
 package canary
 
 import (
+	gocontext "context"
 	"fmt"
 	"path"
 	"sync"
 	"time"
 
-	"github.com/flanksource/canary-checker/api/context"
+	canarycontext "github.com/flanksource/canary-checker/api/context"
 	v1 "github.com/flanksource/canary-checker/api/v1"
 	"github.com/flanksource/canary-checker/checks"
 	"github.com/flanksource/canary-checker/pkg"
@@ -17,9 +18,14 @@ import (
 	"github.com/flanksource/canary-checker/pkg/runner"
 	"github.com/flanksource/canary-checker/pkg/utils"
 	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/duty/context"
+	dutyjob "github.com/flanksource/duty/job"
 	"github.com/flanksource/duty/models"
 	"github.com/flanksource/kommons"
+	"go.opentelemetry.io/otel/trace"
+
 	"github.com/robfig/cron/v3"
+	"go.opentelemetry.io/otel/attribute"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
@@ -50,36 +56,36 @@ func StartScanCanaryConfigs(dataFile string, configFiles []string) {
 }
 
 type CanaryJob struct {
-	*kommons.Client
-	Kubernetes kubernetes.Interface
-	Canary     v1.Canary
-	DBCanary   pkg.Canary
-	LogPass    bool
-	LogFail    bool
+	Canary   v1.Canary
+	DBCanary pkg.Canary
+	LogPass  bool
+	LogFail  bool
 }
 
-func (job CanaryJob) GetNamespacedName() types.NamespacedName {
-	return types.NamespacedName{Name: job.Canary.Name, Namespace: job.Canary.Namespace}
+func (j CanaryJob) GetNamespacedName() types.NamespacedName {
+	return types.NamespacedName{Name: j.Canary.Name, Namespace: j.Canary.Namespace}
 }
 
 var minimumTimeBetweenCanaryRuns = 10 * time.Second
 var canaryLastRuntimes = sync.Map{}
 
-func (job CanaryJob) Run() {
-	if runner.IsCanaryIgnored(&job.Canary.ObjectMeta) {
-		return
+func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error {
+	ctx.GetSpan().SetAttributes(attribute.String("canary-id", j.DBCanary.ID.String()))
+	if runner.IsCanaryIgnored(&j.Canary.ObjectMeta) {
+		return nil
 	}
-	canaryID := job.DBCanary.ID.String()
+
+	canaryID := j.DBCanary.ID.String()
 	val, _ := concurrentJobLocks.LoadOrStore(canaryID, &sync.Mutex{})
 	lock, ok := val.(*sync.Mutex)
 	if !ok {
 		logger.Warnf("expected mutex but got %T for canary(id=%s)", lock, canaryID)
-		return
+		return nil
 	}
 
 	if !lock.TryLock() {
 		logger.Debugf("canary (id=%s) is already running. skipping this run ...", canaryID)
-		return
+		return nil
 	}
 	defer lock.Unlock()
@@ -93,25 +99,32 @@
 	// Skip run if job ran too recently
 	if lastRunDelta < minimumTimeBetweenCanaryRuns {
-		logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, job.GetNamespacedName(), lastRunDelta.Seconds())
-		return
+		logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, j.Canary.GetNamespacedName(), lastRunDelta.Seconds())
+		return nil
+	}
+
+	canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), j.Canary)
+	var span trace.Span
+	ctx.Context, span = ctx.StartSpan("RunCanaryChecks")
+	results, err := checks.RunChecks(canaryCtx)
+	span.End()
+	if err != nil {
+		logger.Errorf("error running checks for canary %s: %v", canaryID, err)
+		return nil
 	}
 
 	// Get transformed checks before and after, and then delete the olds ones that are not in new set
-	existingTransformedChecks, _ := db.GetTransformedCheckIDs(canaryID)
+	existingTransformedChecks, _ := db.GetTransformedCheckIDs(ctx.Context, canaryID)
 	var transformedChecksCreated []string
 	// Transformed checks have a delete strategy
 	// On deletion they can either be marked healthy, unhealthy or left as is
 	checkIDDeleteStrategyMap := make(map[string]string)
 
-	results, err := checks.RunChecks(job.NewContext())
-	if err != nil {
-		logger.Errorf("error running checks for canary %s: %v", job.Canary.GetPersistedID(), err)
-		job.Errorf("error running checks for canary %s: %v", job.Canary.GetPersistedID(), err)
-		return
-	}
+	// TODO: Use ctx with object here
+	logPass := j.Canary.IsTrace() || j.Canary.IsDebug() || LogPass
+	logFail := j.Canary.IsTrace() || j.Canary.IsDebug() || LogFail
 
 	for _, result := range results {
-		if job.LogPass && result.Pass || job.LogFail && !result.Pass {
+		if logPass && result.Pass || logFail && !result.Pass {
 			logger.Infof(result.String())
 		}
 		transformedChecksAdded := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.FromResult(*result))
@@ -120,7 +133,7 @@
 			checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy()
 		}
 	}
-	job.updateStatusAndEvent(results)
+	updateCanaryStatusAndEvent(j.Canary, results)
 
 	checkDeleteStrategyGroup := make(map[string][]string)
 	checksToRemove := utils.SetDifference(existingTransformedChecks, transformedChecksCreated)
@@ -137,10 +150,10 @@
 		checkDeleteStrategyGroup[status] = append(checkDeleteStrategyGroup[status], checkID)
 	}
 	for status, checkIDs := range checkDeleteStrategyGroup {
-		if err := db.AddCheckStatuses(checkIDs, models.CheckHealthStatus(status)); err != nil {
+		if err := db.AddCheckStatuses(ctx.Context, checkIDs, models.CheckHealthStatus(status)); err != nil {
 			logger.Errorf("error adding statuses for transformed checks: %v", err)
 		}
-		if err := db.RemoveTransformedChecks(checkIDs); err != nil {
+		if err := db.RemoveTransformedChecks(ctx.Context, checkIDs); err != nil {
 			logger.Errorf("error deleting transformed checks for canary %s: %v", canaryID, err)
 		}
 	}
@@ -148,13 +161,10 @@
 
 	// Update last runtime map
 	canaryLastRuntimes.Store(canaryID, time.Now())
+	return nil
 }
 
-func (job *CanaryJob) NewContext() *context.Context {
-	return context.New(job.Client, job.Kubernetes, db.Gorm, db.Pool, job.Canary)
-}
-
-func (job CanaryJob) updateStatusAndEvent(results []*pkg.CheckResult) {
+func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) {
 	if CanaryStatusChannel == nil {
 		return
 	}
@@ -175,11 +185,12 @@
 		duration += result.Duration
 
 		// Set uptime and latency
-		uptime, latency := metrics.Record(job.Canary, result)
-		checkKey := job.Canary.GetKey(result.Check)
-		checkStatus[checkKey] = &v1.CheckStatus{}
-		checkStatus[checkKey].Uptime1H = uptime.String()
-		checkStatus[checkKey].Latency1H = latency.String()
+		uptime, latency := metrics.Record(canary, result)
+		checkKey := canary.GetKey(result.Check)
+		checkStatus[checkKey] = &v1.CheckStatus{
+			Uptime1H:  uptime.String(),
+			Latency1H: latency.String(),
+		}
 
 		// Increment aggregate uptime
 		uptimeAgg.Passed += uptime.Passed
@@ -192,8 +203,8 @@
 
 		// Transition
 		q := cache.QueryParams{Check: checkKey, StatusCount: 1}
-		if job.Canary.Status.LastTransitionedTime != nil {
-			q.Start = job.Canary.Status.LastTransitionedTime.Format(time.RFC3339)
+		if canary.Status.LastTransitionedTime != nil {
+			q.Start = canary.Status.LastTransitionedTime.Format(time.RFC3339)
 		}
 		lastStatus, err := cache.PostgresCache.Query(q)
 		if err != nil || len(lastStatus) == 0 || len(lastStatus[0].Statuses) == 0 {
@@ -206,7 +217,8 @@
 			lastTransitionedTime = &metav1.Time{Time: time.Now()}
 		}
 
-		push.Queue(pkg.FromV1(job.Canary, result.Check), pkg.FromResult(*result))
+		// TODO: Why is this here?
+		push.Queue(pkg.FromV1(canary, result.Check), pkg.FromResult(*result))
 
 		// Update status message
 		if len(messages) == 1 {
@@ -235,7 +247,7 @@
 		ErrorMessage:   errorMsg,
 		Uptime:         uptimeAgg.String(),
 		Latency:        utils.Age(time.Duration(highestLatency) * time.Millisecond),
-		NamespacedName: job.GetNamespacedName(),
+		NamespacedName: canary.GetNamespacedName(),
 	}
 
 	CanaryStatusChannel <- payload
@@ -255,7 +267,7 @@ type CanaryStatusPayload struct {
 
 func findCronEntry(id string) *cron.Entry {
 	for _, entry := range CanaryScheduler.Entries() {
-		if entry.Job.(CanaryJob).DBCanary.ID.String() == id {
+		if entry.Job.(*dutyjob.Job).ID == id {
 			return &entry
 		}
 	}
@@ -265,7 +277,7 @@
 func getAllCanaryIDsInCron() []string {
 	var ids []string
 	for _, entry := range CanaryScheduler.Entries() {
-		ids = append(ids, entry.Job.(CanaryJob).DBCanary.ID.String())
+		ids = append(ids, entry.Job.(*dutyjob.Job).ID)
 	}
 	return ids
 }
@@ -295,7 +307,7 @@
 var canaryUpdateTimeCache = sync.Map{}
 
 // TODO: Refactor to use database object instead of kubernetes
-func SyncCanaryJob(dbCanary pkg.Canary) error {
+func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error {
 	canary, err := dbCanary.ToV1()
 	if err != nil {
 		return err
@@ -313,21 +325,21 @@
 	if Kommons == nil {
 		var err error
 		Kommons, Kubernetes, err = pkg.NewKommonsClient()
+		ctx = ctx.WithKommons(Kommons).WithKubernetes(Kubernetes)
 		if err != nil {
 			logger.Warnf("Failed to get kommons client, features that read kubernetes config will fail: %v", err)
 		}
 	}
 
-	job := CanaryJob{
-		Client:     Kommons,
-		Kubernetes: Kubernetes,
-		Canary:     *canary,
-		DBCanary:   dbCanary,
-		LogPass:    canary.IsTrace() || canary.IsDebug() || LogPass,
-		LogFail:    canary.IsTrace() || canary.IsDebug() || LogFail,
+	updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String())
+	cj := CanaryJob{
+		Canary:   *canary,
+		DBCanary: dbCanary,
 	}
 
-	updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String())
+	// Create new job context from empty context to create root spans for jobs
+	jobCtx := ctx.Wrap(gocontext.Background()).WithObject(canary.ObjectMeta)
+	newJob := dutyjob.NewJob(jobCtx, "SyncCanaryJob", canary.Spec.GetSchedule(), cj.Run).SetID(dbCanary.ID.String())
 	entry := findCronEntry(dbCanary.ID.String())
 	if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil {
 		// Remove entry if it exists
@@ -336,11 +348,11 @@
 		}
 
 		// Schedule canary for the first time
-		entryID, err := CanaryScheduler.AddJob(canary.Spec.GetSchedule(), job)
-		if err != nil {
+		if err := newJob.AddToScheduler(CanaryScheduler); err != nil {
 			return fmt.Errorf("failed to schedule canary %s/%s: %v", canary.Namespace, canary.Name, err)
 		}
-		entry = utils.Ptr(CanaryScheduler.Entry(entryID))
+
+		entry = newJob.GetEntry(CanaryScheduler)
 		logger.Infof("Scheduled %s: %s", canary, canary.Spec.GetSchedule())
 
 		canaryUpdateTimeCache.Store(dbCanary.ID.String(), dbCanary.UpdatedAt)
@@ -354,17 +366,17 @@
 	return nil
 }
 
-func SyncCanaryJobs() {
-	logger.Debugf("Syncing canary jobs")
+func SyncCanaryJobs(ctx dutyjob.JobRuntime) error {
+	ctx.Debugf("Syncing canary jobs")
 
-	canaries, err := db.GetAllCanariesForSync(runner.WatchNamespace)
+	canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace)
 	if err != nil {
 		logger.Errorf("Failed to get canaries: %v", err)
 
 		jobHistory := models.NewJobHistory("SyncCanaries", "canary", "").Start()
 		logIfError(db.PersistJobHistory(jobHistory.AddError(err.Error()).End()), "failed to persist job history [SyncCanaries]")
 
-		return
+		return err
 	}
 
 	existingIDsInCron := getAllCanaryIDsInCron()
@@ -373,7 +385,7 @@
 		jobHistory := models.NewJobHistory("CanarySync", "canary", c.ID.String()).Start()
 		idsInNewFetch = append(idsInNewFetch, c.ID.String())
-		if err := SyncCanaryJob(c); err != nil {
+		if err := SyncCanaryJob(ctx.Context, c); err != nil {
 			logger.Errorf("Error syncing canary[%s]: %v", c.ID, err.Error())
 			logIfError(db.PersistJobHistory(jobHistory.AddError(err.Error()).End()), "failed to persist job history [CanarySync]")
 			continue
@@ -386,6 +398,7 @@
 	}
 
 	logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries()))
+	return nil
 }
 
 func DeleteCanaryJob(id string) {
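`SyncCanaryJob` now builds a `duty/job` instead of implementing `cron.Job` directly, and scheduler entries are keyed by the job's `ID` rather than a type assertion on `CanaryJob`. A hedged sketch of that scheduling pattern in isolation, with a made-up job name and schedule:

```go
package main

import (
	gocontext "context"

	dutycontext "github.com/flanksource/duty/context"
	dutyjob "github.com/flanksource/duty/job"
	"github.com/robfig/cron/v3"
)

func scheduleExample(ctx dutycontext.Context, scheduler *cron.Cron) error {
	// Wrapping a fresh context makes every scheduled run start a root span
	// instead of attaching to whatever span was live at scheduling time.
	jobCtx := ctx.Wrap(gocontext.Background())

	job := dutyjob.NewJob(jobCtx, "ExampleJob", "@every 5m", func(rt dutyjob.JobRuntime) error {
		rt.Debugf("tick") // JobRuntime carries the duty context, DB handles, and span
		return nil
	})
	return job.AddToScheduler(scheduler)
}
```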
diff --git a/pkg/jobs/canary/canary_jobs_test.go b/pkg/jobs/canary/canary_jobs_test.go
index d32f1fbfd..e90277c55 100644
--- a/pkg/jobs/canary/canary_jobs_test.go
+++ b/pkg/jobs/canary/canary_jobs_test.go
@@ -1,6 +1,7 @@
 package canary
 
 import (
+	gocontext "context"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -8,6 +9,8 @@ import (
 	v1 "github.com/flanksource/canary-checker/api/v1"
 	"github.com/flanksource/canary-checker/pkg/db"
+	"github.com/flanksource/duty/context"
+	"github.com/flanksource/duty/job"
 	"github.com/flanksource/duty/models"
 	"github.com/flanksource/duty/types"
 	"github.com/google/uuid"
@@ -42,7 +45,8 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() {
 		err = db.Gorm.Create(canaryM).Error
 		Expect(err).To(BeNil())
 
-		response, err := db.GetAllCanariesForSync("")
+		ctx := context.NewContext(gocontext.Background()).WithDB(db.Gorm, db.Pool)
+		response, err := db.GetAllCanariesForSync(ctx, "")
 		Expect(err).To(BeNil())
 		Expect(len(response)).To(Equal(1))
 	})
@@ -50,7 +54,12 @@ var _ = ginkgo.Describe("Test Sync Canary Job", ginkgo.Ordered, func() {
 	ginkgo.It("schedule the canary job", func() {
 		CanaryScheduler.Start()
 		minimumTimeBetweenCanaryRuns = 0 // reset this for now so it doesn't hinder test with small schedules
-		SyncCanaryJobs()
+		ctx := context.NewContext(gocontext.Background()).WithDB(db.Gorm, db.Pool)
+		jobCtx := job.JobRuntime{
+			Context: ctx,
+		}
+		err := SyncCanaryJobs(jobCtx)
+		Expect(err).To(BeNil())
 	})
 
 	ginkgo.It("should verify that the endpoint wasn't called more than once after 3 seconds", func() {
diff --git a/pkg/jobs/canary/prometheus_cleanup.go b/pkg/jobs/canary/prometheus_cleanup.go
index b8ae80593..8c0db729b 100644
--- a/pkg/jobs/canary/prometheus_cleanup.go
+++ b/pkg/jobs/canary/prometheus_cleanup.go
@@ -1,9 +1,9 @@
 package canary
 
 import (
-	"context"
 	"time"
 
+	"github.com/flanksource/canary-checker/api/context"
 	"github.com/flanksource/canary-checker/pkg/db"
 	"github.com/flanksource/canary-checker/pkg/metrics"
 	"github.com/flanksource/commons/logger"
@@ -12,10 +12,8 @@
 
 // CleanupMetricsGauges removes gauges for checks that no longer exist.
 func CleanupMetricsGauges() {
-	ctx := context.Background()
-
 	sevenDaysAgo := time.Now().Add(-time.Hour * 24 * 7)
-	deletedCheckIDs, err := db.FindDeletedChecksSince(ctx, sevenDaysAgo)
+	deletedCheckIDs, err := db.FindDeletedChecksSince(context.DefaultContext, sevenDaysAgo)
 	if err != nil {
 		logger.Errorf("Error finding deleted checks: %v", err)
 		return
diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go
index 7a31b448a..e16d95f11 100644
--- a/pkg/jobs/canary/sync_upstream.go
+++ b/pkg/jobs/canary/sync_upstream.go
@@ -37,12 +37,12 @@ const (
 // that are missing on the upstream.
 func ReconcileChecks() {
 	jobHistory := models.NewJobHistory("PushChecksToUpstream", "Canary", "")
+	_ = db.PersistJobHistory(jobHistory.Start())
 	defer func() {
 		_ = db.PersistJobHistory(jobHistory.End())
 	}()
 
-	ctx := dutyContext.NewContext(gocontext.TODO()).WithDB(db.Gorm, db.Pool)
 	reconciler := upstream.NewUpstreamReconciler(UpstreamConf, 5)
-	if err := reconciler.SyncAfter(ctx, "checks", ReconcileMaxAge); err != nil {
+	if err := reconciler.SyncAfter(context.DefaultContext, "checks", ReconcileMaxAge); err != nil {
 		jobHistory.AddError(err.Error())
 		logger.Errorf("failed to sync table 'checks': %v", err)
 	} else {
diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index bcc06ebde..a4cd258e8 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -3,6 +3,7 @@ package jobs
 import (
 	"github.com/flanksource/canary-checker/api/context"
 	v1 "github.com/flanksource/canary-checker/api/v1"
+	"github.com/flanksource/canary-checker/pkg/db"
 	canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
 	systemJobs "github.com/flanksource/canary-checker/pkg/jobs/system"
@@ -10,6 +11,7 @@ import (
 	"github.com/flanksource/canary-checker/pkg/topology/checks"
 	"github.com/flanksource/canary-checker/pkg/topology/configs"
 	"github.com/flanksource/commons/logger"
+	dutyjob "github.com/flanksource/duty/job"
 	"github.com/robfig/cron/v3"
 )
@@ -67,9 +69,11 @@ func Start() {
 		}
 	}
 
-	if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs); err != nil {
-		logger.Errorf("Failed to schedule sync jobs for canary: %v", err)
+	if err := dutyjob.NewJob(context.DefaultContext, "SyncCanaryJobs", SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs).
+		RunOnStart().AddToScheduler(FuncScheduler); err != nil {
+		logger.Fatalf("Failed to schedule job [canaryJobs.SyncCanaryJobs]: %v", err)
 	}
+
 	if _, err := ScheduleFunc(SyncSystemsJobsSchedule, systemJobs.SyncTopologyJobs); err != nil {
 		logger.Errorf("Failed to schedule sync jobs for systems: %v", err)
 	}
@@ -114,7 +118,6 @@ func Start() {
 	}
 
 	canaryJobs.CleanupMetricsGauges()
-	canaryJobs.SyncCanaryJobs()
 	systemJobs.SyncTopologyJobs()
 }
diff --git a/pkg/telemetry/tracer.go b/pkg/telemetry/tracer.go
new file mode 100644
index 000000000..962bda4e6
--- /dev/null
+++ b/pkg/telemetry/tracer.go
@@ -0,0 +1,59 @@
+package telemetry
+
+import (
+	"context"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
+	"go.opentelemetry.io/otel/propagation"
+	"google.golang.org/grpc/credentials"
+
+	"github.com/flanksource/commons/logger"
+	"go.opentelemetry.io/otel/sdk/resource"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+)
+
+func InitTracer(serviceName, collectorURL string, insecure bool) func(context.Context) error {
+	var secureOption otlptracegrpc.Option
+	if !insecure {
+		secureOption = otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, ""))
+	} else {
+		secureOption = otlptracegrpc.WithInsecure()
+	}
+
+	exporter, err := otlptrace.New(
+		context.Background(),
+		otlptracegrpc.NewClient(
+			secureOption,
+			otlptracegrpc.WithEndpoint(collectorURL),
+		),
+	)
+
+	if err != nil {
+		logger.Fatalf("Failed to create exporter: %v", err)
+	}
+	resources, err := resource.New(
+		context.Background(),
+		resource.WithAttributes(
+			attribute.String("service.name", serviceName),
+		),
+	)
+	if err != nil {
+		logger.Fatalf("Could not set resources: %v", err)
+	}
+
+	otel.SetTracerProvider(
+		sdktrace.NewTracerProvider(
+			sdktrace.WithSampler(sdktrace.AlwaysSample()),
+			sdktrace.WithBatcher(exporter),
+			sdktrace.WithResource(resources),
+		),
+	)
+
+	// Register the TraceContext propagator globally.
+	otel.SetTextMapPropagator(propagation.TraceContext{})
+
+	return exporter.Shutdown
+}
diff --git a/pkg/topology/checks/component_check.go b/pkg/topology/checks/component_check.go
index 553e671b3..e1d8e8cb1 100644
--- a/pkg/topology/checks/component_check.go
+++ b/pkg/topology/checks/component_check.go
@@ -3,6 +3,7 @@ package checks
 import (
 	"time"
 
+	"github.com/flanksource/canary-checker/api/context"
 	"github.com/flanksource/canary-checker/pkg"
 	"github.com/flanksource/canary-checker/pkg/db"
 	canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
@@ -70,7 +71,7 @@ func GetCheckComponentRelationshipsForComponent(component *pkg.Component) (relat
 		logger.Debugf("error creating canary from inline: %v", err)
 	}
 
-	if err := canaryJobs.SyncCanaryJob(*canary); err != nil {
+	if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *canary); err != nil {
 		logger.Debugf("error creating canary job: %v", err)
 	}
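With `InitTracer` registering a global tracer provider and the `TraceContext` propagator, any package can start spans that are batched to the collector configured via `--otel-collector-url`. A short sketch of that standard OpenTelemetry pattern (the function and attribute names here are illustrative, not part of the diff):

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

func doWork(ctx context.Context, canaryID string) {
	// Uses the provider set by telemetry.InitTracer via otel.SetTracerProvider.
	tracer := otel.GetTracerProvider().Tracer("canary-checker")
	ctx, span := tracer.Start(ctx, "doWork")
	defer span.End()

	span.SetAttributes(attribute.String("canary-id", canaryID))
	_ = ctx // pass ctx downstream so child spans nest under this one
}
```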