Skip to content

Commit

Permalink
Merge pull request #1477 from flanksource/feat/canary-subresource
Browse files Browse the repository at this point in the history
feat: canary scale subresource
  • Loading branch information
moshloop authored Dec 4, 2023
2 parents 4e9a75d + c31d555 commit 1786d89
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 26 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ gen-schemas:

# Generate manifests e.g. CRD, RBAC etc.
manifests: .bin/controller-gen
# For debugging
yq -V

schemaPath=.spec.versions[0].schema.openAPIV3Schema.properties.spec.properties
.bin/controller-gen crd paths="./api/..." output:stdout | yq ea -P '[.] | sort_by(.metadata.name) | .[] | splitDoc' - > config/deploy/crd.yaml
$(MAKE) gen-schemas
Expand Down
8 changes: 8 additions & 0 deletions api/v1/canary_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (

// CanarySpec defines the desired state of Canary
type CanarySpec struct {
//+kubebuilder:default=1
//+optional
Replicas int `yaml:"replicas,omitempty" json:"replicas,omitempty"`

Env map[string]VarSource `yaml:"env,omitempty" json:"env,omitempty"`
HTTP []HTTPCheck `yaml:"http,omitempty" json:"http,omitempty"`
DNS []DNSCheck `yaml:"dns,omitempty" json:"dns,omitempty"`
Expand Down Expand Up @@ -281,6 +285,8 @@ type CanaryStatus struct {
Latency1H string `json:"latency1h,omitempty"`
// used for keeping history of the checks
runnerName string `json:"-"`
// Replicas keep track of the number of replicas
Replicas int `json:"replicas,omitempty"`
}

func (c Canary) GetCheckID(checkName string) string {
Expand All @@ -305,6 +311,7 @@ type CheckStatus struct {
// +kubebuilder:object:root=true

// Canary is the Schema for the canaries API
// +kubebuilder:printcolumn:name="Replicas",type=integer,priority=1,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Interval",type=string,JSONPath=`.spec.interval`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.status`
// +kubebuilder:printcolumn:name="Last Check",type=date,JSONPath=`.status.lastCheck`
Expand All @@ -314,6 +321,7 @@ type CheckStatus struct {
// +kubebuilder:printcolumn:name="Message",type=string,priority=1,JSONPath=`.status.message`
// +kubebuilder:printcolumn:name="Error",type=string,priority=1,JSONPath=`.status.errorMessage`
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
type Canary struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down
14 changes: 14 additions & 0 deletions config/deploy/crd.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
Expand All @@ -15,6 +16,10 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .spec.replicas
name: Replicas
priority: 1
type: integer
- jsonPath: .spec.interval
name: Interval
type: string
Expand Down Expand Up @@ -5191,6 +5196,9 @@ spec:
- name
type: object
type: array
replicas:
default: 1
type: integer
restic:
items:
properties:
Expand Down Expand Up @@ -5729,6 +5737,9 @@ spec:
type: integer
persistedID:
type: string
replicas:
description: Replicas keep track of the number of replicas
type: integer
status:
type: string
uptime1h:
Expand All @@ -5739,6 +5750,9 @@ spec:
served: true
storage: true
subresources:
scale:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
status: {}
---
apiVersion: apiextensions.k8s.io/v1
Expand Down
13 changes: 13 additions & 0 deletions config/deploy/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .spec.replicas
name: Replicas
priority: 1
type: integer
- jsonPath: .spec.interval
name: Interval
type: string
Expand Down Expand Up @@ -5191,6 +5195,9 @@ spec:
- name
type: object
type: array
replicas:
default: 1
type: integer
restic:
items:
properties:
Expand Down Expand Up @@ -5729,6 +5736,9 @@ spec:
type: integer
persistedID:
type: string
replicas:
description: Replicas keep track of the number of replicas
type: integer
status:
type: string
uptime1h:
Expand All @@ -5739,6 +5749,9 @@ spec:
served: true
storage: true
subresources:
scale:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
status: {}
---
apiVersion: v1
Expand Down
6 changes: 6 additions & 0 deletions config/schemas/canary.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@
},
"CanarySpec": {
"properties": {
"replicas": {
"type": "integer"
},
"env": {
"patternProperties": {
".*": {
Expand Down Expand Up @@ -726,6 +729,9 @@
},
"latency1h": {
"type": "string"
},
"replicas": {
"type": "integer"
}
},
"additionalProperties": false,
Expand Down
3 changes: 3 additions & 0 deletions config/schemas/component.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@
},
"CanarySpec": {
"properties": {
"replicas": {
"type": "integer"
},
"env": {
"patternProperties": {
".*": {
Expand Down
3 changes: 3 additions & 0 deletions config/schemas/topology.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@
},
"CanarySpec": {
"properties": {
"replicas": {
"type": "integer"
},
"env": {
"patternProperties": {
".*": {
Expand Down
27 changes: 16 additions & 11 deletions pkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg/labels"
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/collections"
"github.com/flanksource/commons/console"
"github.com/flanksource/commons/logger"
cUtils "github.com/flanksource/commons/utils"
Expand Down Expand Up @@ -118,17 +119,18 @@ type Timeseries struct {
}

type Canary struct {
ID uuid.UUID `gorm:"default:generate_ulid()"`
AgentID uuid.UUID
Spec types.JSON `json:"spec"`
Labels types.JSONStringMap `json:"labels"`
Source string
Name string
Namespace string
Checks types.JSONStringMap `gorm:"-"`
CreatedAt time.Time
UpdatedAt time.Time `json:"updated_at"`
DeletedAt *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"`
ID uuid.UUID `gorm:"default:generate_ulid()"`
AgentID uuid.UUID
Spec types.JSON `json:"spec"`
Labels types.JSONStringMap `json:"labels"`
Source string
Name string
Namespace string
Checks types.JSONStringMap `gorm:"-"`
Annotations types.JSONStringMap `json:"annotations,omitempty"`
CreatedAt time.Time
UpdatedAt time.Time `json:"updated_at"`
DeletedAt *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"`
}

func (c Canary) GetCheckID(checkName string) string {
Expand Down Expand Up @@ -156,7 +158,10 @@ func (c Canary) ToV1() (*v1.Canary, error) {
logger.Debugf("Failed to unmarshal canary spec: %s", err)
return nil, err
}

canary.Status.Checks = c.Checks
canary.ObjectMeta.Annotations = collections.MergeMap(canary.ObjectMeta.Annotations, c.Annotations)

return &canary, nil
}

Expand Down
65 changes: 57 additions & 8 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package controllers

import (
gocontext "context"
"fmt"
"time"

"github.com/flanksource/canary-checker/pkg/db"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flanksource/canary-checker/api/context"

v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
dutyContext "github.com/flanksource/duty/context"
"github.com/flanksource/kommons"
"github.com/go-logr/logr"
jsontime "github.com/liamylian/jsontime/v2/v2"
Expand Down Expand Up @@ -67,7 +70,9 @@ const FinalizerName = "canary.canaries.flanksource.com"
// +kubebuilder:rbac:groups=canaries.flanksource.com,resources=canaries/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=pods/exec,verbs=*
// +kubebuilder:rbac:groups="",resources=pods/logs,verbs=*
func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *CanaryReconciler) Reconcile(parentCtx gocontext.Context, req ctrl.Request) (ctrl.Result, error) {
ctx := context.DefaultContext.Wrap(parentCtx)

logger := r.Log.WithValues("canary", req.NamespacedName)
canary := &v1.Canary{}
err := r.Get(ctx, req.NamespacedName, canary)
Expand All @@ -82,6 +87,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
}

canary.SetRunnerName(r.RunnerName)

// Add the finalizer first if it does not exist, to avoid a race condition between init and delete
if !controllerutil.ContainsFinalizer(canary, FinalizerName) {
controllerutil.AddFinalizer(canary, FinalizerName)
Expand All @@ -99,14 +105,14 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
return ctrl.Result{}, r.Update(ctx, canary)
}

dbCanary, err := r.updateCanaryInDB(canary)
dbCanary, err := r.updateCanaryInDB(ctx, canary)
if err != nil {
return ctrl.Result{Requeue: true}, err
}

// Sync jobs when the canary is first created (generation 1)
if canary.Generation == 1 {
if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary); err != nil {
logger.Error(err, "failed to sync canary job")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
Expand All @@ -121,12 +127,55 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
}
patch := client.MergeFrom(canaryForStatus.DeepCopy())

if val, ok := canary.Annotations["next-runtime"]; ok {
runAt, err := time.Parse(time.RFC3339, val)
if err != nil {
return ctrl.Result{}, err
}

var syncCanaryJobOptions = []canaryJobs.SyncCanaryJobOption{}
syncCanaryJobOptions = append(syncCanaryJobOptions, canaryJobs.WithSchedule(
fmt.Sprintf("%d %d %d %d *", runAt.Minute(), runAt.Hour(), runAt.Day(), runAt.Month()),
))
if !runAt.After(time.Now()) {
syncCanaryJobOptions = append(syncCanaryJobOptions, canaryJobs.WithRunNow(true))
}

if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary, syncCanaryJobOptions...); err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}

delete(canary.Annotations, "next-runtime")
if err := r.Update(ctx, canary); err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
}

if canaryForStatus.Status.Replicas != canary.Spec.Replicas {
if canary.Spec.Replicas == 0 {
canaryJobs.DeleteCanaryJob(canary.GetPersistedID())
if err := db.SuspendCanary(ctx, canary.GetPersistedID(), true); err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
} else {
if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary); err != nil {
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
if err := db.SuspendCanary(ctx, canary.GetPersistedID(), false); err != nil {
logger.Error(err, "failed to suspend canary")
}
}

canaryForStatus.Status.Replicas = canary.Spec.Replicas
}

canaryForStatus.Status.Checks = dbCanary.Checks
canaryForStatus.Status.ObservedGeneration = canary.Generation
if err = r.Status().Patch(ctx, &canaryForStatus, patch); err != nil {
logger.Error(err, "failed to update status for canary")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}

return ctrl.Result{}, nil
}

Expand All @@ -137,26 +186,26 @@ func (r *CanaryReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary, error) {
func (r *CanaryReconciler) persistAndCacheCanary(ctx dutyContext.Context, canary *v1.Canary) (*pkg.Canary, error) {
dbCanary, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID())
if err != nil {
return nil, err
}
r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration)

if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(ctx, *dbCanary); err != nil {
return nil, err
}
return dbCanary, nil
}

func (r *CanaryReconciler) updateCanaryInDB(canary *v1.Canary) (*pkg.Canary, error) {
func (r *CanaryReconciler) updateCanaryInDB(ctx dutyContext.Context, canary *v1.Canary) (*pkg.Canary, error) {
var dbCanary *pkg.Canary
var err error

// Get the DBCanary from the cache if it exists; otherwise persist it in the database and update the cache
if cacheObj, exists := r.CanaryCache.Get(canary.GetPersistedID()); !exists {
dbCanary, err = r.persistAndCacheCanary(canary)
dbCanary, err = r.persistAndCacheCanary(ctx, canary)
if err != nil {
return nil, err
}
Expand All @@ -172,7 +221,7 @@ func (r *CanaryReconciler) updateCanaryInDB(canary *v1.Canary) (*pkg.Canary, err
}
opts := jsondiff.DefaultJSONOptions()
if diff, _ := jsondiff.Compare(canarySpecJSON, dbCanary.Spec, &opts); diff != jsondiff.FullMatch {
dbCanary, err = r.persistAndCacheCanary(canary)
dbCanary, err = r.persistAndCacheCanary(ctx, canary)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 1786d89

Please sign in to comment.