Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new job type for sync canary jobs #1370

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/commons/logger"
ctemplate "github.com/flanksource/commons/template"
"github.com/flanksource/duty"
dutyCtx "github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/flanksource/kommons"
Expand All @@ -19,6 +20,8 @@ import (
"k8s.io/client-go/kubernetes"
)

var DefaultContext dutyCtx.Context

type KubernetesContext struct {
gocontext.Context
Kommons *kommons.Client
Expand Down
5 changes: 5 additions & 0 deletions api/v1/canary_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/flanksource/canary-checker/api/external"
"github.com/flanksource/commons/logger"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

type ResultMode string
Expand Down Expand Up @@ -234,6 +235,10 @@ func (c Canary) GetDescription(check external.Check) string {
return check.GetEndpoint()
}

func (c Canary) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Name: c.Name, Namespace: c.Namespace}
}

func (c *Canary) SetRunnerName(name string) {
c.Status.runnerName = name
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/operator.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cmd

import (
gocontext "context"
"os"
"time"

apicontext "github.com/flanksource/canary-checker/api/context"
"github.com/flanksource/canary-checker/pkg/cache"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs"
Expand All @@ -16,9 +18,12 @@ import (
"github.com/flanksource/canary-checker/pkg"
"github.com/flanksource/canary-checker/pkg/controllers"
"github.com/flanksource/canary-checker/pkg/labels"
commonsCtx "github.com/flanksource/commons/context"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/context"
"github.com/go-logr/zapr"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -73,6 +78,17 @@ func run(cmd *cobra.Command, args []string) {
if err := db.Init(); err != nil {
logger.Fatalf("error connecting with postgres: %v", err)
}
kommonsClient, k8s, err := pkg.NewKommonsClient()
if err != nil {
logger.Warnf("failed to get kommons client, checks that read kubernetes configs will fail: %v", err)
}

apicontext.DefaultContext = context.NewContext(gocontext.Background(), commonsCtx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))).
WithDB(db.Gorm, db.Pool).
WithKubernetes(k8s).
WithKommons(kommonsClient).
WithNamespace(runner.WatchNamespace)

cache.PostgresCache = cache.NewPostgresCache(db.Pool)
if operatorExecutor {
logger.Infof("Starting executors")
Expand Down
27 changes: 20 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/canary-checker/pkg/telemetry"
"github.com/flanksource/commons/logger"
gomplate "github.com/flanksource/gomplate/v3"
"github.com/spf13/cobra"
Expand All @@ -31,16 +32,25 @@ var Root = &cobra.Command{
if canary.UpstreamConf.Valid() {
logger.Infof("Pushing checks to %s with name=%s user=%s", canary.UpstreamConf.Host, canary.UpstreamConf.AgentName, canary.UpstreamConf.Username)
}

if otelcollectorURL != "" {
telemetry.InitTracer(otelServiceName, otelcollectorURL, true)
}
},
}

var httpPort = 8080
var publicEndpoint = "http://localhost:8080"
var prometheusURL string
var pushServers, pullServers []string
var sharedLibrary []string
var exposeEnv bool
var logPass, logFail bool
var (
httpPort = 8080
publicEndpoint = "http://localhost:8080"
prometheusURL string
pushServers, pullServers []string
sharedLibrary []string
exposeEnv bool
logPass, logFail bool

otelcollectorURL string
otelServiceName string
)

func ServerFlags(flags *pflag.FlagSet) {
flags.IntVar(&httpPort, "httpPort", httpPort, "Port to expose a health dashboard ")
Expand Down Expand Up @@ -76,6 +86,9 @@ func ServerFlags(flags *pflag.FlagSet) {
flags.StringVar(&canary.UpstreamConf.Password, "upstream-password", os.Getenv("UPSTREAM_PASSWORD"), "upstream password")
flags.StringVar(&canary.UpstreamConf.AgentName, "agent-name", os.Getenv("UPSTREAM_NAME"), "name of this agent")
flags.BoolVar(&canary.UpstreamConf.InsecureSkipVerify, "upstream-insecure-skip-verify", os.Getenv("UPSTREAM_INSECURE_SKIP_VERIFY") == "true", "Skip TLS verification on the upstream servers certificate")

flags.StringVar(&otelcollectorURL, "otel-collector-url", "", "OpenTelemetry gRPC Collector URL in host:port format")
flags.StringVar(&otelServiceName, "otel-service-name", "canary-checker", "OpenTelemetry service name for the resource")
}

func readFromEnv(v string) string {
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
go.mongodb.org/mongo-driver v1.12.1
go.opentelemetry.io/otel v1.19.0
golang.org/x/crypto v0.14.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.4.0
Expand Down Expand Up @@ -111,6 +112,7 @@ require (
github.com/aws/smithy-go v1.14.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
Expand Down Expand Up @@ -160,6 +162,7 @@ require (
github.com/gosimple/slug v1.13.1 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf // indirect
github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down Expand Up @@ -231,9 +234,12 @@ require (
github.com/yuin/gopher-lua v1.1.0 // indirect
github.com/zclconf/go-cty v1.14.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.starlark.net v0.0.0-20230925163745-10651d5192ab // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
Expand Down Expand Up @@ -1108,9 +1110,12 @@ github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6
github.com/gosimple/unidecode v1.0.1/go.mod h1:CP0Cr1Y1kogOtx0bJblKzsVWrqYaqfNOnHzpgWw4Awc=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf h1:I1sbT4ZbIt9i+hB1zfKw2mE8C12TuGxPiW7YmtLbPa4=
github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf/go.mod h1:jDHmWDKZY6MIIYltYYfW4Rs7hQ50oS4qf/6spSiZAxY=
github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce h1:cVkYhlWAxwuS2/Yp6qPtcl0fGpcWxuZNonywHZ6/I+s=
Expand Down Expand Up @@ -1527,14 +1532,21 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 h1:Nw7Dv4lwvGrI68+wULbcq7su9K2cebeCUrDjVrUJHxM=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o=
go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.starlark.net v0.0.0-20230925163745-10651d5192ab h1:7QkXlIVjYdSsKKSGnM0jQdw/2w9W5qcFDGTc00zKqgI=
go.starlark.net v0.0.0-20230925163745-10651d5192ab/go.mod h1:LcLNIzVOMp4oV+uusnpk+VU+SzXaJakUuBjoCSWH5dM=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c

// Sync jobs if canary is created or updated
if canary.Generation == 1 {
if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
logger.Error(err, "failed to sync canary job")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
Expand Down Expand Up @@ -143,7 +144,7 @@ func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary
}
r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration)

if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
return nil, err
}
return dbCanary, nil
Expand Down
20 changes: 10 additions & 10 deletions pkg/db/canary.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package db

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/google/uuid"
Expand All @@ -23,7 +23,7 @@ import (
"gorm.io/gorm/clause"
)

func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
func GetAllCanariesForSync(ctx context.Context, namespace string) ([]pkg.Canary, error) {
query := `
SELECT json_agg(
jsonb_set_lax(to_jsonb(canaries),'{checks}', (
Expand All @@ -49,7 +49,7 @@ func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
args["namespace"] = namespace
}

rows, err := Pool.Query(context.Background(), query, args)
rows, err := ctx.Pool().Query(ctx, query, args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,17 +125,17 @@ func PersistCheck(check pkg.Check, canaryID uuid.UUID) (uuid.UUID, error) {
return check.ID, nil
}

func GetTransformedCheckIDs(canaryID string) ([]string, error) {
func GetTransformedCheckIDs(ctx context.Context, canaryID string) ([]string, error) {
var ids []string
err := Gorm.Table("checks").
err := ctx.DB().Table("checks").
Select("id").
Where("canary_id = ? AND transformed = true AND deleted_at IS NULL", canaryID).
Find(&ids).
Error
return ids, err
}

func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
func AddCheckStatuses(ctx context.Context, ids []string, status models.CheckHealthStatus) error {
if len(ids) == 0 {
return nil
}
Expand All @@ -158,20 +158,20 @@ func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
})
}
}
return Gorm.Table("check_statuses").
return ctx.DB().Table("check_statuses").
Create(objs).
Error
}

func RemoveTransformedChecks(ids []string) error {
func RemoveTransformedChecks(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}
updates := map[string]any{
"deleted_at": gorm.Expr("NOW()"),
}

return Gorm.Table("checks").
return ctx.DB().Table("checks").
Where("id in (?)", ids).
Where("transformed = true").
Updates(updates).
Expand Down Expand Up @@ -280,7 +280,7 @@ func FindCheck(canary pkg.Canary, name string) (*pkg.Check, error) {

func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, error) {
var ids []string
err := Gorm.Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error
err := ctx.DB().Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error
return ids, err
}

Expand Down
Loading
Loading