From 3e516519498d4a5f8d7fcabd61624c2ac454645c Mon Sep 17 00:00:00 2001
From: Aditya Thebe
Date: Fri, 29 Dec 2023 11:41:08 +0545
Subject: [PATCH] feat: move check details query to duty

---
 cmd/root.go                      |   6 +-
 cmd/serve.go                     |   3 +-
 go.mod                           |   6 +-
 go.sum                           |   4 +-
 hack/generate-schemas/go.mod     |   3 +-
 hack/generate-schemas/go.sum     |   6 +-
 pkg/api.go                       |  60 +------------
 pkg/api/api.go                   |  36 +++-----
 pkg/cache/cache.go               | 135 -----------------------------
 pkg/cache/postgres_query.go      | 144 -------------------------------
 pkg/cache/postgres_util.go       |  28 ------
 pkg/jobs/canary/canary_jobs.go   |   6 +-
 pkg/jobs/canary/sync_upstream.go |   3 +-
 pkg/metrics/metrics.go           |  22 ++---
 14 files changed, 45 insertions(+), 417 deletions(-)
 delete mode 100644 pkg/cache/cache.go
 delete mode 100644 pkg/cache/postgres_query.go

diff --git a/cmd/root.go b/cmd/root.go
index 0f9a1eb1c..0f1363c10 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -6,7 +6,6 @@ import (
 	"github.com/flanksource/canary-checker/checks"
 	"github.com/flanksource/canary-checker/pkg"
-	"github.com/flanksource/canary-checker/pkg/cache"
 	"github.com/flanksource/canary-checker/pkg/db"
 	"github.com/flanksource/canary-checker/pkg/jobs/canary"
 	"github.com/flanksource/canary-checker/pkg/prometheus"
@@ -16,6 +15,7 @@ import (
 	"github.com/flanksource/commons/logger"
 	"github.com/flanksource/duty"
 	"github.com/flanksource/duty/context"
+	"github.com/flanksource/duty/query"
 	gomplate "github.com/flanksource/gomplate/v3"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
@@ -94,12 +94,12 @@ func ServerFlags(flags *pflag.FlagSet) {
 	flags.StringSliceVar(&runner.IncludeCanaries, "include-check", []string{}, "Run matching canaries - useful for debugging")
 	flags.StringSliceVar(&runner.IncludeTypes, "include-type", []string{}, "Check type to disable")
 	flags.StringSliceVar(&runner.IncludeNamespaces, "include-namespace", []string{}, "Check type to disable")
-	flags.IntVar(&cache.DefaultCacheCount, "maxStatusCheckCount", 5, "Maximum number of past checks in the in memory cache")
+	flags.IntVar(&query.DefaultCacheCount, "maxStatusCheckCount", 5, "Maximum number of past checks in the in memory cache")
 	flags.StringVar(&runner.RunnerName, "name", "local", "Server name shown in aggregate dashboard")
 	flags.StringVar(&prometheus.PrometheusURL, "prometheus", "", "URL of the prometheus server that is scraping this instance")
 	flags.StringVar(&db.ConnectionString, "db", "DB_URL", "Connection string for the postgres database. Use embedded:///path/to/dir to use the embedded database")
 	flags.IntVar(&db.DefaultExpiryDays, "cache-timeout", 90, "Cache timeout in days")
-	flags.StringVarP(&cache.DefaultWindow, "default-window", "", "1h", "Default search window")
+	flags.StringVarP(&query.DefaultCheckQueryWindow, "default-window", "", "1h", "Default search window")
 	flags.IntVar(&db.CheckStatusRetention, "check-status-retention-period", db.CheckStatusRetention, "Check status retention period in days")
 	flags.IntVar(&topology.CheckRetentionDays, "check-retention-period", topology.DefaultCheckRetentionDays, "Check retention period in days")
 	flags.IntVar(&topology.CanaryRetentionDays, "canary-retention-period", topology.DefaultCanaryRetentionDays, "Canary retention period in days")
diff --git a/cmd/serve.go b/cmd/serve.go
index 6cf96a81c..83697d3b5 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -18,7 +18,6 @@ import (
 	"github.com/flanksource/canary-checker/pkg/echo"
 	"github.com/flanksource/canary-checker/pkg/jobs"
 	canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
-	"github.com/flanksource/duty"
 
 	echov4 "github.com/labstack/echo/v4"
 	"go.opentelemetry.io/otel"
@@ -62,7 +61,7 @@ func setup() dutyContext.Context {
 	cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext)
 
-	if err := duty.UpdatePropertiesFromFile(apicontext.DefaultContext, propertiesFile); err != nil {
+	if err := dutyContext.LoadPropertiesFromFile(apicontext.DefaultContext, propertiesFile); err != nil {
 		logger.Fatalf("Error setting properties in database: %v", err)
 	}
 	return apicontext.DefaultContext
diff --git a/go.mod b/go.mod
index cffd651a3..cf6364cce 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
 	github.com/fergusstrange/embedded-postgres v1.25.0
 	github.com/flanksource/artifacts v1.0.3
 	github.com/flanksource/commons v1.19.3
-	github.com/flanksource/duty v1.0.258
+	github.com/flanksource/duty v1.0.261
 	github.com/flanksource/gomplate/v3 v3.20.26
 	github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7
 	github.com/flanksource/kommons v0.31.4
@@ -284,6 +284,4 @@ require (
 	sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
 )
 
-// replace github.com/flanksource/commons => ../commons
-
-// replace github.com/flanksource/duty => ../duty
+// replace "github.com/flanksource/duty" => ../duty
diff --git a/go.sum b/go.sum
index 90d9407e3..5a78e5f09 100644
--- a/go.sum
+++ b/go.sum
@@ -827,8 +827,8 @@ github.com/flanksource/artifacts v1.0.3 h1:Ci26mDhVFyxPefX96tgPGTe5fR0wfntXotSxa
 github.com/flanksource/artifacts v1.0.3/go.mod h1:KfDDG7B4wsiGqJYOamcRU19u5hBvpYjlru3GfcORvko=
 github.com/flanksource/commons v1.19.3 h1:J4s5WWicUMNavEDCC5AIJQWBDaaloJ/tgztWUXTtRXc=
 github.com/flanksource/commons v1.19.3/go.mod h1:ZUgFy0Wwrm2LxV/AltBx36FShzXcn7wPmXJldK8Aa0E=
-github.com/flanksource/duty v1.0.258 h1:+51Z8arzzks9hWHhUwV0A606UTvNMk0up9rj0asai0Y=
-github.com/flanksource/duty v1.0.258/go.mod h1:tXMyMlcSGOXvXoP5KZApwzSDFs07NDQGhnJi9ArjWf4=
+github.com/flanksource/duty v1.0.261 h1:G8qfPPk8ABIIQ/Bwv4OyAU7l6NrIuBN4eYJ9MYrzSJ0=
+github.com/flanksource/duty v1.0.261/go.mod h1:4r5cU+iu89HXK2NtQpPw3neE6CRxrqZLshrKLN2G3CU=
 github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
 github.com/flanksource/gomplate/v3 v3.20.26 h1:85lUzlKgZjb1uIkzoa4zN03OcdOnFPG+oWxshZTYqz4=
 github.com/flanksource/gomplate/v3 v3.20.26/go.mod h1:GKmptFMdr2LbOuqwQZrmo9a/UygyZ0pbXffks8MuYhE=
diff --git a/hack/generate-schemas/go.mod b/hack/generate-schemas/go.mod
index 07e2e6cc1..6087253b6 100644
--- a/hack/generate-schemas/go.mod
+++ b/hack/generate-schemas/go.mod
@@ -37,6 +37,7 @@ require (
 	github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
 	github.com/antonmedv/expr v1.15.5 // indirect
 	github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
+	github.com/asecurityteam/rolling v2.0.4+incompatible // indirect
 	github.com/aws/aws-sdk-go v1.48.15 // indirect
 	github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect
 	github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b // indirect
@@ -44,7 +45,7 @@ require (
 	github.com/dustin/go-humanize v1.0.1 // indirect
 	github.com/evanphx/json-patch v5.7.0+incompatible // indirect
 	github.com/exaring/otelpgx v0.5.2 // indirect
-	github.com/flanksource/duty v1.0.258 // indirect
+	github.com/flanksource/duty v1.0.261 // indirect
 	github.com/flanksource/is-healthy v0.0.0-20231003215854-76c51e3a3ff7 // indirect
 	github.com/flanksource/kommons v0.31.4 // indirect
 	github.com/ghodss/yaml v1.0.0 // indirect
diff --git a/hack/generate-schemas/go.sum b/hack/generate-schemas/go.sum
index 8fde98ca9..56cae8e51 100644
--- a/hack/generate-schemas/go.sum
+++ b/hack/generate-schemas/go.sum
@@ -647,6 +647,8 @@ github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmms
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
+github.com/asecurityteam/rolling v2.0.4+incompatible h1:WOSeokINZT0IDzYGc5BVcjLlR9vPol08RvI2GAsmB0s=
+github.com/asecurityteam/rolling v2.0.4+incompatible/go.mod h1:2D4ba5ZfYCWrIMleUgTvc8pmLExEuvu3PDwl+vnG58Q=
 github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
 github.com/aws/aws-sdk-go v1.48.15 h1:Gad2C4pLzuZDd5CA0Rvkfko6qUDDTOYru145gkO7w/Y=
 github.com/aws/aws-sdk-go v1.48.15/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
@@ -717,8 +719,8 @@ github.com/exaring/otelpgx v0.5.2/go.mod h1:4dBiAqwzDNmpj3TwX5Syti1/Nw2bIoDQItdL
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/flanksource/commons v1.19.3 h1:J4s5WWicUMNavEDCC5AIJQWBDaaloJ/tgztWUXTtRXc=
 github.com/flanksource/commons v1.19.3/go.mod h1:ZUgFy0Wwrm2LxV/AltBx36FShzXcn7wPmXJldK8Aa0E=
-github.com/flanksource/duty v1.0.258 h1:+51Z8arzzks9hWHhUwV0A606UTvNMk0up9rj0asai0Y=
-github.com/flanksource/duty v1.0.258/go.mod h1:tXMyMlcSGOXvXoP5KZApwzSDFs07NDQGhnJi9ArjWf4=
+github.com/flanksource/duty v1.0.261 h1:G8qfPPk8ABIIQ/Bwv4OyAU7l6NrIuBN4eYJ9MYrzSJ0=
+github.com/flanksource/duty v1.0.261/go.mod h1:4r5cU+iu89HXK2NtQpPw3neE6CRxrqZLshrKLN2G3CU=
 github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
 github.com/flanksource/gomplate/v3 v3.20.26 h1:85lUzlKgZjb1uIkzoa4zN03OcdOnFPG+oWxshZTYqz4=
 github.com/flanksource/gomplate/v3 v3.20.26/go.mod h1:GKmptFMdr2LbOuqwQZrmo9a/UygyZ0pbXffks8MuYhE=
diff --git a/pkg/api.go b/pkg/api.go
index 3fdf05c1d..202a4aff7 100644
--- a/pkg/api.go
+++ b/pkg/api.go
@@ -64,62 +64,6 @@ func (s CheckStatus) GetTime() (time.Time, error) {
 	return time.Parse("2006-01-02 15:04:05", s.Time)
 }
 
-type Latency struct {
-	Percentile99 float64 `json:"p99,omitempty" db:"p99"`
-	Percentile97 float64 `json:"p97,omitempty" db:"p97"`
-	Percentile95 float64 `json:"p95,omitempty" db:"p95"`
-	Rolling1H float64 `json:"rolling1h"`
-}
-
-func (l Latency) String() string {
-	s := ""
-	if l.Percentile99 != 0 {
-		s += fmt.Sprintf("p99=%s", utils.Age(time.Duration(l.Percentile99)*time.Millisecond))
-	}
-	if l.Percentile95 != 0 {
-		s += fmt.Sprintf("p95=%s", utils.Age(time.Duration(l.Percentile95)*time.Millisecond))
-	}
-	if l.Percentile97 != 0 {
-		s += fmt.Sprintf("p97=%s", utils.Age(time.Duration(l.Percentile97)*time.Millisecond))
-	}
-	if l.Rolling1H != 0 {
-		s += fmt.Sprintf("rolling1h=%s", utils.Age(time.Duration(l.Rolling1H)*time.Millisecond))
-	}
-	return s
-}
-
-type Uptime struct {
-	Passed int `json:"passed"`
-	Failed int `json:"failed"`
-	P100 float64 `json:"p100,omitempty"`
-	LastPass *time.Time `json:"last_pass,omitempty"`
-	LastFail *time.Time `json:"last_fail,omitempty"`
-}
-
-func (u Uptime) String() string {
-	if u.Passed == 0 && u.Failed == 0 {
-		return ""
-	}
-	if u.Passed == 0 {
-		return fmt.Sprintf("0/%d 0%%", u.Failed)
-	}
-	percentage := 100.0 * (1 - (float64(u.Failed) / float64(u.Passed+u.Failed)))
-	return fmt.Sprintf("%d/%d (%0.1f%%)", u.Passed, u.Passed+u.Failed, percentage)
-}
-
-type Timeseries struct {
-	Key string `json:"key,omitempty"`
-	Time string `json:"time,omitempty"`
-	Status bool `json:"status,omitempty"`
-	Message string `json:"message,omitempty"`
-	Error string `json:"error,omitempty"`
-	Duration int `json:"duration"`
-	// Count is the number of times the check has been run in the specified time window
-	Count int `json:"count,omitempty"`
-	Passed int `json:"passed,omitempty"`
-	Failed int `json:"failed,omitempty"`
-}
-
 type Canary struct {
 	ID uuid.UUID `gorm:"default:generate_ulid()"`
 	AgentID uuid.UUID
@@ -212,8 +156,8 @@ type Check struct {
 	Labels types.JSONStringMap `json:"labels" gorm:"type:jsonstringmap"`
 	Description string `json:"description,omitempty"`
 	Status string `json:"status,omitempty"`
-	Uptime Uptime `json:"uptime" gorm:"-"`
-	Latency Latency `json:"latency" gorm:"-"`
+	Uptime types.Uptime `json:"uptime" gorm:"-"`
+	Latency types.Latency `json:"latency" gorm:"-"`
 	Statuses []CheckStatus `json:"checkStatuses" gorm:"-"`
 	Owner string `json:"owner,omitempty"`
 	Severity string `json:"severity,omitempty"`
diff --git a/pkg/api/api.go b/pkg/api/api.go
index 71aba9c3e..896b17239 100644
--- a/pkg/api/api.go
+++ b/pkg/api/api.go
@@ -9,6 +9,7 @@ import (
 	"github.com/flanksource/duty/context"
 	"github.com/flanksource/duty/models"
 	"github.com/flanksource/duty/query"
+	"github.com/flanksource/duty/types"
 	"github.com/labstack/echo/v4"
 
 	"github.com/flanksource/canary-checker/pkg"
@@ -44,11 +45,11 @@ type Response struct {
 }
 
 type DetailResponse struct {
-	Duration int `json:"duration,omitempty"`
-	RunnerName string `json:"runnerName"`
-	Status []pkg.Timeseries `json:"status"`
-	Latency pkg.Latency `json:"latency"`
-	Uptime pkg.Uptime `json:"uptime"`
+	Duration int `json:"duration,omitempty"`
+	RunnerName string `json:"runnerName"`
+	Status []query.Timeseries `json:"status"`
+	Latency types.Latency `json:"latency"`
+	Uptime types.Uptime `json:"uptime"`
 }
 
 func About(c echo.Context) error {
@@ -60,30 +61,15 @@ func About(c echo.Context) error {
 }
 
 func CheckDetails(c echo.Context) error {
-	q, err := cache.ParseQuery(c)
-	if err != nil {
+	ctx := c.Request().Context().(context.Context)
+
+	var q query.CheckQueryParams
+	if err := q.Init(c.QueryParams()); err != nil {
 		return errorResponse(c, err, http.StatusBadRequest)
 	}
 
 	start := time.Now()
-
-	end := q.GetEndTime()
-	since := q.GetStartTime()
-	timeRange := end.Sub(*since)
-
-	if timeRange <= time.Hour*2 {
-		q.WindowDuration = time.Minute
-	} else if timeRange >= time.Hour*24 {
-		q.WindowDuration = time.Minute * 15
-	} else if timeRange >= time.Hour*24*7 {
-		q.WindowDuration = time.Minute * 60
-	} else {
-		q.WindowDuration = time.Hour * 4
-	}
-
-	ctx := c.Request().Context().(context.Context)
-
-	results, uptime, latency, err := q.ExecuteDetails(ctx, ctx.Pool())
+	results, uptime, latency, err := q.ExecuteDetails(ctx)
 	if err != nil {
 		return errorResponse(c, err, http.StatusInternalServerError)
 	}
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
deleted file mode 100644
index 755d63f09..000000000
--- a/pkg/cache/cache.go
+++ /dev/null
@@ -1,135 +0,0 @@
-package cache
-
-import (
-	"fmt"
-	"strconv"
-	"time"
-
-	v1 "github.com/flanksource/canary-checker/api/v1"
-	"github.com/flanksource/canary-checker/pkg"
-	"github.com/labstack/echo/v4"
-	"github.com/pkg/errors"
-	"github.com/samber/lo"
-)
-
-var DefaultCacheCount int
-
-var DefaultWindow string
-
-const AllStatuses = -1
-
-type QueryParams struct {
-	Check string
-	CanaryID string
-	Start, End string
-	Window string
-	IncludeMessages bool
-	IncludeDetails bool
-	_start, _end *time.Time
-	StatusCount int
-	Labels map[string]string
-	Trace bool
-	WindowDuration time.Duration
-}
-
-func (q QueryParams) Validate() error {
-	start, err := timeV(q.Start)
-	if err != nil {
-		return errors.Wrap(err, "start is invalid")
-	}
-	end, err := timeV(q.End)
-	if err != nil {
-		return errors.Wrap(err, "end is invalid")
-	}
-	if start != nil && end != nil {
-		if end.Before(*start) {
-			return fmt.Errorf("end time must be after start time")
-		}
-	}
-	return nil
-}
-
-func (q QueryParams) GetStartTime() *time.Time {
-	if q._start != nil || q.Start == "" {
-		return q._start
-	}
-	q._start, _ = timeV(q.Start)
-	return q._start
-}
-
-func (q QueryParams) GetEndTime() *time.Time {
-	if q._end != nil {
-		return q._end
-	}
-	if q.End == "" {
-		q._end = lo.ToPtr(time.Now())
-	}
-	q._end, _ = timeV(q.End)
-	return q._end
-}
-
-func (q QueryParams) String() string {
-	return fmt.Sprintf("check:=%s, start=%s, end=%s, count=%d", q.Check, q.Start, q.End, q.StatusCount)
-}
-
-func ParseQuery(c echo.Context) (*QueryParams, error) {
-	queryParams := c.Request().URL.Query()
-	count := queryParams.Get("count")
-	var cacheCount int64
-	var err error
-	if count != "" {
-		cacheCount, err = strconv.ParseInt(count, 10, 64)
-		if err != nil {
-			return nil, fmt.Errorf("count must be a number: %s", count)
-		}
-	} else {
-		cacheCount = int64(DefaultCacheCount)
-	}
-
-	since := queryParams.Get("since")
-	if since == "" {
-		since = queryParams.Get("start")
-	}
-	if since == "" {
-		since = DefaultWindow
-	}
-
-	until := queryParams.Get("until")
-	if until == "" {
-		until = queryParams.Get("end")
-	}
-	if until == "" {
-		until = "0s"
-	}
-
-	q := QueryParams{
-		Start: since,
-		End: until,
-		Window: queryParams.Get("window"),
-		IncludeMessages: isTrue(queryParams.Get("includeMessages")),
-		IncludeDetails: isTrue(queryParams.Get("includeDetails")),
-		Check: queryParams.Get("check"),
-		StatusCount: int(cacheCount),
-		Trace: isTrue(queryParams.Get("trace")),
-		CanaryID: queryParams.Get("canary_id"),
-	}
-
-	if err := q.Validate(); err != nil {
-		return nil, err
-	}
-
-	return &q, nil
-}
-
-func isTrue(v string) bool {
-	return v == "true"
-}
-
-type Cache interface {
-	Add(check pkg.Check, status ...pkg.CheckStatus)
-	GetDetails(checkkey string, time string) interface{}
-	RemoveChecks(canary v1.Canary)
-	Query(q QueryParams) (pkg.Checks, error)
-	QueryStatus(q QueryParams) ([]pkg.Timeseries, error)
-	RemoveCheckByKey(key string)
-}
diff --git a/pkg/cache/postgres_query.go b/pkg/cache/postgres_query.go
deleted file mode 100644
index 6a7c2fdc8..000000000
--- a/pkg/cache/postgres_query.go
+++ /dev/null
@@ -1,144 +0,0 @@
-package cache
-
-import (
-	"context"
-	"fmt"
-	"strings"
-	"time"
-
-	"github.com/asecurityteam/rolling"
-	"github.com/flanksource/canary-checker/pkg"
-	"github.com/flanksource/commons/duration"
-	"github.com/jackc/pgx/v5"
-)
-
-type Querier interface {
-	Query(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error)
-}
-
-func parseDuration(d string, name string) (clause string, arg interface{}, err error) {
-	if d == "" {
-		return "", nil, nil
-	}
-	dur, err := duration.ParseDuration(d)
-	if err == nil {
-		return fmt.Sprintf("(NOW() at TIME ZONE 'utc' - Interval '1 minute' * :%s)", name), dur.Minutes(), nil
-	}
-	if timestamp, err := time.Parse(time.RFC3339, d); err == nil {
-		return ":" + name, timestamp, nil
-	}
-	return "", nil, fmt.Errorf("start time must be a duration or RFC3339 timestamp")
-}
-
-func (q QueryParams) GetWhereClause() (string, map[string]interface{}, error) {
-	clause := ""
-	args := make(map[string]interface{})
-	and := " AND "
-	if q.Check != "" {
-		clause = "check_id = :check_key"
-		args["check_key"] = q.Check
-	}
-	if q.Start != "" && q.End == "" {
-		if clause != "" {
-			clause += and
-		}
-		start, arg, err := parseDuration(q.Start, "start")
-		if err != nil {
-			return "", nil, err
-		}
-		args["start"] = arg
-		clause += "time > " + start
-	} else if q.Start == "" && q.End != "" {
-		if clause != "" {
-			clause += and
-		}
-		end, arg, err := parseDuration(q.End, "end")
-		if err != nil {
-			return "", nil, err
-		}
-		args["end"] = arg
-		clause += "time < " + end
-	}
-	if q.Start != "" && q.End != "" {
-		if clause != "" {
-			clause += and
-		}
-		start, arg, err := parseDuration(q.Start, "start")
-		if err != nil {
-			return "", nil, err
-		}
-		args["start"] = arg
-		end, arg, err := parseDuration(q.End, "end")
-		if err != nil {
-			return "", nil, err
-		}
-		args["end"] = arg
-		clause += "time BETWEEN " + start + and + end
-	}
-	return strings.TrimSpace(clause), args, nil
-}
-
-func (q QueryParams) ExecuteDetails(ctx context.Context, db Querier) ([]pkg.Timeseries, pkg.Uptime, pkg.Latency, error) {
-	start := q.GetStartTime().Format(time.RFC3339)
-	end := q.GetEndTime().Format(time.RFC3339)
-
-	query := `
-With grouped_by_window AS (
-	SELECT
-		duration,
-		status,
-		CASE WHEN check_statuses.status = TRUE THEN 1 ELSE 0 END AS passed,
-		CASE WHEN check_statuses.status = FALSE THEN 1 ELSE 0 END AS failed,
-		to_timestamp(floor((extract(epoch FROM time) + $1) / $2) * $2) AS time
-	FROM check_statuses
-	WHERE
-		time >= $3 AND
-		time <= $4 AND
-		check_id = $5
-)
-SELECT
-	time,
-	bool_and(status),
-	AVG(duration)::integer as duration,
-	sum(passed) as passed,
-	sum(failed) as failed
-FROM
-	grouped_by_window
-GROUP BY time
-ORDER BY time
-`
-	args := []any{q.WindowDuration.Seconds() / 2, q.WindowDuration.Seconds(), start, end, q.Check}
-
-	if q.WindowDuration == 0 {
-		// FIXME
-		query = `SELECT time, status, duration,
-			CASE WHEN check_statuses.status = TRUE THEN 1 ELSE 0 END AS passed,
-			CASE WHEN check_statuses.status = FALSE THEN 1 ELSE 0 END AS failed
-			FROM check_statuses WHERE time >= $1 AND time <= $2 AND check_id = $3`
-		args = []any{start, end, q.Check}
-	}
-	uptime := pkg.Uptime{}
-	latency := rolling.NewPointPolicy(rolling.NewWindow(100))
-
-	rows, err := db.Query(ctx, query, args...)
-	if err != nil {
-		return nil, uptime, pkg.Latency{}, err
-	}
-	defer rows.Close()
-
-	var results []pkg.Timeseries
-	for rows.Next() {
-		var datapoint pkg.Timeseries
-		var ts time.Time
-		if err := rows.Scan(&ts, &datapoint.Status, &datapoint.Duration, &datapoint.Passed, &datapoint.Failed); err != nil {
-			return nil, uptime, pkg.Latency{}, err
-		}
-		uptime.Failed += datapoint.Failed
-		uptime.Passed += datapoint.Passed
-		latency.Append(float64(datapoint.Duration))
-		datapoint.Time = ts.Format(time.RFC3339)
-		results = append(results, datapoint)
-	}
-
-	return results, uptime, pkg.Latency{Percentile95: latency.Reduce(rolling.Percentile(95))}, nil
-}
diff --git a/pkg/cache/postgres_util.go b/pkg/cache/postgres_util.go
index 1582bda0b..cc0a38602 100644
--- a/pkg/cache/postgres_util.go
+++ b/pkg/cache/postgres_util.go
@@ -3,9 +3,6 @@ package cache
 import (
 	"fmt"
 	"strings"
-	"time"
-
-	"github.com/flanksource/commons/duration"
 )
 
 func ConvertNamedParamsDebug(sql string, namedArgs map[string]interface{}) string {
@@ -27,28 +24,3 @@ func ConvertNamedParams(sql string, namedArgs map[string]interface{}) (string, [
 	}
 	return sql, args
 }
-
-func timeV(v interface{}) (*time.Time, error) {
-	if v == nil {
-		return nil, nil
-	}
-	switch v := v.(type) {
-	case time.Time:
-		return &v, nil
-	case time.Duration:
-		t := time.Now().Add(v * -1)
-		return &t, nil
-	case string:
-		if v == "" {
-			return nil, nil
-		}
-		if t, err := time.Parse(time.RFC3339, v); err == nil {
-			return &t, nil
-		} else if d, err := duration.ParseDuration(v); err == nil {
-			t := time.Now().Add(time.Duration(d) * -1)
-			return &t, nil
-		}
-		return nil, fmt.Errorf("time must be a duration or RFC3339 timestamp")
-	}
-	return nil, fmt.Errorf("unknown time type %T", v)
-}
diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go
index 58f83c17e..744cbf152 100644
--- a/pkg/jobs/canary/canary_jobs.go
+++ b/pkg/jobs/canary/canary_jobs.go
@@ -21,6 +21,8 @@ import (
 	"github.com/flanksource/duty/context"
 	dutyjob "github.com/flanksource/duty/job"
 	"github.com/flanksource/duty/models"
+	"github.com/flanksource/duty/query"
+	dutyTypes "github.com/flanksource/duty/types"
 	"github.com/flanksource/kommons"
 	"github.com/google/uuid"
 	"go.opentelemetry.io/otel/trace"
@@ -276,7 +278,7 @@ func updateCanaryStatusAndEvent(ctx context.Context, canary v1.Canary, results [
 	var pass = true
 	var lastTransitionedTime *metav1.Time
 	var highestLatency float64
-	var uptimeAgg pkg.Uptime
+	var uptimeAgg dutyTypes.Uptime
 	transitioned := false
 
 	for _, result := range results {
@@ -301,7 +303,7 @@ func updateCanaryStatusAndEvent(ctx context.Context, canary v1.Canary, results [
 	}
 
 	// Transition
-	q := cache.QueryParams{Check: checkID, StatusCount: 1}
+	q := query.CheckQueryParams{Check: checkID, StatusCount: 1}
 	if canary.Status.LastTransitionedTime != nil {
 		q.Start = canary.Status.LastTransitionedTime.Format(time.RFC3339)
 	}
diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go
index 7f264f66f..20d8b4ff6 100644
--- a/pkg/jobs/canary/sync_upstream.go
+++ b/pkg/jobs/canary/sync_upstream.go
@@ -41,7 +41,8 @@ var ReconcileChecks = job.Job{
 	Schedule: "@every 30m",
 	Fn: func(ctx job.JobRuntime) error {
 		reconciler := upstream.NewUpstreamReconciler(UpstreamConf, 5)
-		return reconciler.SyncAfter(ctx.Context, "checks", ReconcileMaxAge)
+		_, err := reconciler.SyncAfter(ctx.Context, "checks", ReconcileMaxAge)
+		return err
 	},
 }
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 982b935c5..fa56974c5 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -8,8 +8,10 @@ import (
 	"github.com/flanksource/canary-checker/pkg"
 	"github.com/flanksource/canary-checker/pkg/runner"
 	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/duty/types"
 	cmap "github.com/orcaman/concurrent-map"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/samber/lo"
 )
 
 var (
@@ -120,8 +122,8 @@ func RemoveCheckByKey(key string) {
 	latencies.Remove(key)
 }
 
-func GetMetrics(key string) (uptime pkg.Uptime, latency pkg.Latency) {
-	uptime = pkg.Uptime{}
+func GetMetrics(key string) (uptime types.Uptime, latency types.Latency) {
+	uptime = types.Uptime{}
 
 	fail, ok := failed.Get(key)
 	if ok {
@@ -135,12 +137,12 @@ func GetMetrics(key string) (uptime pkg.Uptime, latency pkg.Latency) {
 
 	lat, ok := latencies.Get(key)
 	if ok {
-		latency = pkg.Latency{Rolling1H: lat.(*rolling.TimePolicy).Reduce(rolling.Percentile(95))}
+		latency = types.Latency{Rolling1H: lat.(*rolling.TimePolicy).Reduce(rolling.Percentile(95))}
 	}
 	return
 }
 
-func Record(canary v1.Canary, result *pkg.CheckResult) (_uptime pkg.Uptime, _latency pkg.Latency) {
+func Record(canary v1.Canary, result *pkg.CheckResult) (_uptime types.Uptime, _latency types.Latency) {
 	defer func() {
 		e := recover()
 		if e != nil {
@@ -236,11 +238,11 @@ func Record(canary v1.Canary, result *pkg.CheckResult) (_uptime pkg.Uptime, _lat
 		OpsFailedCount.WithLabelValues(checkType, endpoint, canaryName, canaryNamespace, owner, severity, key, name).Inc()
 	}
 
-	_uptime = pkg.Uptime{Passed: int(pass.Reduce(rolling.Sum)), Failed: int(fail.Reduce(rolling.Sum))}
+	_uptime = types.Uptime{Passed: int(pass.Reduce(rolling.Sum)), Failed: int(fail.Reduce(rolling.Sum))}
 	if latency != nil {
-		_latency = pkg.Latency{Rolling1H: latency.Reduce(rolling.Percentile(95))}
+		_latency = types.Latency{Rolling1H: latency.Reduce(rolling.Percentile(95))}
 	} else {
-		_latency = pkg.Latency{}
+		_latency = types.Latency{}
 	}
 	return _uptime, _latency
 }
@@ -308,7 +310,7 @@ func getOrCreateHistogram(m pkg.Metric) error {
 		}
 	}
 
-func FillLatencies(checkKey string, duration string, latency *pkg.Latency) error {
+func FillLatencies(checkKey string, duration string, latency *types.Latency) error {
 	if runner.Prometheus == nil || duration == "" {
 		return nil
 	}
@@ -331,7 +333,7 @@ func FillLatencies(checkKey string, duration string, latency *pkg.Latency) error
 	return nil
 }
 
-func FillUptime(checkKey, duration string, uptime *pkg.Uptime) error {
+func FillUptime(checkKey, duration string, uptime *types.Uptime) error {
 	if runner.Prometheus == nil || duration == "" {
 		return nil
 	}
@@ -339,7 +341,7 @@ func FillUptime(checkKey, duration string, uptime *pkg.Uptime) error {
 	if err != nil {
 		return err
 	}
-	uptime.P100 = value
+	uptime.P100 = lo.ToPtr(value)
 	return nil
 }
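The Uptime formatting deleted from pkg/api.go reports availability as passed runs over total runs. A minimal, self-contained sketch of that arithmetic using only the standard library (the type itself now comes from duty's types package; whether it keeps an identical String method is not shown by this patch, and uptimeString is an illustrative name):

package main

import "fmt"

// uptimeString mirrors the removed pkg.Uptime.String(): passed/total runs
// followed by the percentage of runs that passed.
func uptimeString(passed, failed int) string {
	if passed == 0 && failed == 0 {
		return ""
	}
	if passed == 0 {
		return fmt.Sprintf("0/%d 0%%", failed)
	}
	percentage := 100.0 * (1 - (float64(failed) / float64(passed+failed)))
	return fmt.Sprintf("%d/%d (%0.1f%%)", passed, passed+failed, percentage)
}

func main() {
	fmt.Println(uptimeString(98, 2)) // 98/100 (98.0%)
}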
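The deleted postgres_query.go grouped check statuses with to_timestamp(floor((extract(epoch FROM time) + $1) / $2) * $2), where $1 is half the window and $2 is the window in seconds, so each status lands on the nearest window boundary rather than being truncated downwards. A small sketch of the same rounding in Go (the bucket function and sample values are illustrative only):

package main

import (
	"fmt"
	"time"
)

// bucket rounds t to the nearest multiple of window, matching the deleted
// SQL expression: adding half the window before integer-dividing turns a
// floor/truncate into round-to-nearest.
func bucket(t time.Time, window time.Duration) time.Time {
	w := int64(window.Seconds())
	return time.Unix(((t.Unix()+w/2)/w)*w, 0).UTC()
}

func main() {
	t := time.Date(2023, 12, 29, 11, 41, 8, 0, time.UTC)
	fmt.Println(bucket(t, 15*time.Minute)) // 2023-12-29 11:45:00 +0000 UTC
}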
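Both the deleted ExecuteDetails and the metrics package aggregate latency with github.com/asecurityteam/rolling, which is why that module now surfaces in hack/generate-schemas/go.mod and go.sum. A sketch of the usage pattern taken from the removed code, with made-up duration samples:

package main

import (
	"fmt"

	"github.com/asecurityteam/rolling"
)

func main() {
	// A point-based window over the last 100 samples, as in the deleted
	// ExecuteDetails; Reduce with rolling.Percentile(95) yields the p95.
	latency := rolling.NewPointPolicy(rolling.NewWindow(100))
	for _, d := range []float64{120, 95, 230, 310, 80} {
		latency.Append(d)
	}
	fmt.Printf("p95=%.0fms\n", latency.Reduce(rolling.Percentile(95)))
}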
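The removed timeV and parseDuration helpers accepted either an RFC3339 timestamp or a relative duration such as "1h" for the since/until parameters; with this change that parsing presumably sits behind duty's query.CheckQueryParams. A rough stand-in using only the standard library (the original used flanksource/commons' duration parser, which also understands units like "1d" that time.ParseDuration does not; parseSince is an illustrative name):

package main

import (
	"fmt"
	"time"
)

// parseSince resolves v to an absolute time: RFC3339 timestamps are used
// as-is, durations are subtracted from now, and "" means unset.
func parseSince(v string, now time.Time) (*time.Time, error) {
	if v == "" {
		return nil, nil
	}
	if t, err := time.Parse(time.RFC3339, v); err == nil {
		return &t, nil
	}
	if d, err := time.ParseDuration(v); err == nil {
		t := now.Add(-d)
		return &t, nil
	}
	return nil, fmt.Errorf("time must be a duration or RFC3339 timestamp")
}

func main() {
	now := time.Now()
	t, _ := parseSince("1h", now)
	fmt.Println(t.Equal(now.Add(-time.Hour))) // true
}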