From 654c4b2835a99f2daa6a96a8ede3d58d03b7bb9c Mon Sep 17 00:00:00 2001 From: closetool Date: Thu, 23 Mar 2023 14:04:14 +0800 Subject: [PATCH 1/2] feat: merge job with core Signed-off-by: closetool --- .github/workflows/image.yml | 3 +- build/job/Dockerfile | 29 --- config.yaml | 7 +- core/cmd/cmd.go | 72 +++++- core/config/config.go | 2 + job/cmd/cmd.go | 223 ------------------ job/main.go | 25 -- pkg/config/job/job.go | 22 ++ .../jobs/jobautofree}/autofree.go | 4 +- .../jobs/jobautofree}/autofree_test.go | 4 +- pkg/jobs/jobwebhook/webhook.go | 34 +++ pkg/jobs/singleinstancejob.go | 80 +++++++ 12 files changed, 208 insertions(+), 297 deletions(-) delete mode 100644 build/job/Dockerfile delete mode 100644 job/cmd/cmd.go delete mode 100644 job/main.go create mode 100644 pkg/config/job/job.go rename {job/autofree => pkg/jobs/jobautofree}/autofree.go (96%) rename {job/autofree => pkg/jobs/jobautofree}/autofree_test.go (98%) create mode 100644 pkg/jobs/jobwebhook/webhook.go create mode 100644 pkg/jobs/singleinstancejob.go diff --git a/.github/workflows/image.yml b/.github/workflows/image.yml index f4a0898c..bb5b1916 100644 --- a/.github/workflows/image.yml +++ b/.github/workflows/image.yml @@ -11,7 +11,7 @@ jobs: release: strategy: matrix: - components: [core, job, swagger] + components: [core, swagger] runs-on: ubuntu-18.04 steps: - uses: actions/checkout@v3 @@ -25,7 +25,6 @@ jobs: go: - '**.go' - 'build/core/Dockerfile' - - 'build/job/Dockerfile' api: - 'openapi/**' - 'build/swagger/build.sh' diff --git a/build/job/Dockerfile b/build/job/Dockerfile deleted file mode 100644 index c27364d4..00000000 --- a/build/job/Dockerfile +++ /dev/null @@ -1,29 +0,0 @@ -FROM golang:1.15.3 AS builder -COPY . /horizon - -WORKDIR /horizon - -RUN CGO_ENABLED=0 GOOS=linux go build -o bin/app -ldflags '-s -w' ./job/main.go - -FROM alpine:3.9 AS runtime - -RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && \ - apk update && apk add bash curl git && \ - apk add tzdata && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone - -ARG GROUP=netease -ARG USER=appops -ARG GROUP_ID=10001 -ARG USER_ID=10001 - -COPY --from=builder /horizon/bin/app /usr/local/bin/app - -RUN addgroup --gid $GROUP_ID $GROUP && adduser -h /home/$USER -u $USER_ID -G $GROUP -D $USER && \ - chown -R $USER:$GROUP /usr/local/bin/app - - -RUN echo "hosts: files dns" > /etc/nsswitch.conf - -USER $USER - -ENTRYPOINT ["/usr/local/bin/app"] diff --git a/config.yaml b/config.yaml index a7fa7786..6a31f606 100644 --- a/config.yaml +++ b/config.yaml @@ -2,6 +2,12 @@ serverConfig: port: 8080 cloudEventServerConfig: port: 8181 +jobConfig: + lockName: horizon-lock + lockNS: horizoncd + leaseDuration: 15 + renewDeadline: 10 + retryPeriod: 2 dbConfig: host: "" port: 3331 @@ -77,4 +83,3 @@ oauth: tokenConfig: jwtSigningKey: "" callbackTokenExpireIn: 2h - diff --git a/core/cmd/cmd.go b/core/cmd/cmd.go index ffc0461e..bbf07476 100644 --- a/core/cmd/cmd.go +++ b/core/cmd/cmd.go @@ -10,7 +10,10 @@ import ( "log" "net/http" "os" + "os/signal" "regexp" + "sync" + "syscall" "github.com/horizoncd/horizon/core/config" accessctl "github.com/horizoncd/horizon/core/controller/access" @@ -90,6 +93,9 @@ import ( gitlablib "github.com/horizoncd/horizon/lib/gitlab" "github.com/horizoncd/horizon/pkg/environment/service" "github.com/horizoncd/horizon/pkg/grafana" + "github.com/horizoncd/horizon/pkg/jobs" + "github.com/horizoncd/horizon/pkg/jobs/jobautofree" + "github.com/horizoncd/horizon/pkg/jobs/jobwebhook" "github.com/horizoncd/horizon/pkg/token/generator" tokenservice "github.com/horizoncd/horizon/pkg/token/service" tokenstorage "github.com/horizoncd/horizon/pkg/token/storage" @@ -227,19 +233,21 @@ func runPProfServer(config *pprof.Config) { } } -func Init(flags *Flags) *config.Config { - // init log - InitLog(flags) - - // load coreConfig +func LoadConfig(flags *Flags) (*config.Config, error) { coreConfig, err := config.LoadConfig(flags.ConfigFile) if err != nil { - panic(err) + return nil, err } _, err = json.MarshalIndent(coreConfig, "", " ") if err != nil { - panic(err) + return nil, err } + return coreConfig, nil +} + +func Init(ctx context.Context, flags *Flags, coreConfig *config.Config) { + // init log + InitLog(flags) // init roles file, err := os.OpenFile(flags.RoleConfigFile, os.O_RDONLY, 0644) @@ -293,8 +301,6 @@ func Init(flags *Flags) *config.Config { // init manager parameter manager := managerparam.InitManager(mysqlDB) - // init service - ctx := context.Background() gitlabGitops, err := gitlablib.New(coreConfig.GitopsRepoConfig.Token, coreConfig.GitopsRepoConfig.URL) if err != nil { @@ -542,6 +548,15 @@ func Init(flags *Flags) *config.Config { webhookAPIV2 = webhookv2.NewAPI(webhookCtl) ) + // start jobs + autoFreeJob := func(ctx context.Context) { + jobautofree.Run(ctx, &coreConfig.AutoFreeConfig, parameter.UserManager, clusterCtl, prCtl) + } + webhookJob := func(ctx context.Context) { + jobwebhook.Run(ctx, coreConfig.EventHandlerConfig, coreConfig.WebhookConfig, manager) + } + go jobs.Run(ctx, &coreConfig.JobConfig, autoFreeJob, webhookJob) + // init server r := gin.New() // use middleware @@ -655,13 +670,44 @@ func Init(flags *Flags) *config.Config { // start api server log.Printf("Server started") log.Print(r.Run(fmt.Sprintf(":%d", coreConfig.ServerConfig.Port))) - return coreConfig } // Run runs the agent. func Run(flags *Flags) { - // init api - c := Init(flags) + ctx, cancelFunc := context.WithCancel(context.Background()) + + setTasksBeforeExit(cancelFunc) + + configs, err := LoadConfig(flags) + if err != nil { + panic(err) + } + // enable pprof - runPProfServer(&c.PProf) + runPProfServer(&configs.PProf) + + // init api + Init(ctx, flags, configs) +} + +// setTasksBeforeExit set stop funcs which will be executed after sigterm and sigint catched +func setTasksBeforeExit(stopFuncs ...func()) { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) + go func() { + s := <-sig + log.Printf("got %s signal, stop tasks...\n", s) + if len(stopFuncs) == 0 { + return + } + wg := sync.WaitGroup{} + wg.Add(len(stopFuncs)) + for _, stopFunc := range stopFuncs { + go func(stop func()) { + stop() + }(stopFunc) + } + wg.Wait() + log.Printf("all tasks stopped, exit now.") + }() } diff --git a/core/config/config.go b/core/config/config.go index 448e288f..e7077c93 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -12,6 +12,7 @@ import ( "github.com/horizoncd/horizon/pkg/config/git" "github.com/horizoncd/horizon/pkg/config/gitlab" "github.com/horizoncd/horizon/pkg/config/grafana" + "github.com/horizoncd/horizon/pkg/config/job" "github.com/horizoncd/horizon/pkg/config/oauth" "github.com/horizoncd/horizon/pkg/config/pprof" "github.com/horizoncd/horizon/pkg/config/redis" @@ -29,6 +30,7 @@ import ( type Config struct { ServerConfig server.Config `yaml:"serverConfig"` CloudEventServerConfig server.Config `yaml:"cloudEventServerConfig"` + JobConfig job.Config `yaml:"jobConfig"` PProf pprof.Config `yaml:"pprofConfig"` DBConfig db.Config `yaml:"dbConfig"` SessionConfig session.Config `yaml:"sessionConfig"` diff --git a/job/cmd/cmd.go b/job/cmd/cmd.go deleted file mode 100644 index 6b8e009b..00000000 --- a/job/cmd/cmd.go +++ /dev/null @@ -1,223 +0,0 @@ -package cmd - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "log" - "os" - "os/signal" - "sync" - "syscall" - - "github.com/gin-gonic/gin" - "github.com/horizoncd/horizon/core/config" - clusterctl "github.com/horizoncd/horizon/core/controller/cluster" - prctl "github.com/horizoncd/horizon/core/controller/pipelinerun" - "github.com/horizoncd/horizon/core/http/health" - ginlogmiddle "github.com/horizoncd/horizon/core/middleware/ginlog" - "github.com/horizoncd/horizon/job/autofree" - gitlablib "github.com/horizoncd/horizon/lib/gitlab" - "github.com/horizoncd/horizon/lib/orm" - "github.com/horizoncd/horizon/pkg/cluster/cd" - clustergitrepo "github.com/horizoncd/horizon/pkg/cluster/gitrepo" - eventhandlersvc "github.com/horizoncd/horizon/pkg/eventhandler" - "github.com/horizoncd/horizon/pkg/eventhandler/wlgenerator" - "github.com/horizoncd/horizon/pkg/grafana" - "github.com/horizoncd/horizon/pkg/param" - "github.com/horizoncd/horizon/pkg/param/managerparam" - "github.com/horizoncd/horizon/pkg/templaterepo" - "github.com/horizoncd/horizon/pkg/util/kube" - callbacks "github.com/horizoncd/horizon/pkg/util/ormcallbacks" - webhooksvc "github.com/horizoncd/horizon/pkg/webhook/service" - "github.com/sirupsen/logrus" -) - -// Flags defines agent CLI flags. -type Flags struct { - ConfigFile string - Environment string - LogLevel string - GitOpsRepoDefaultBranch string -} - -// ParseFlags parses agent CLI flags. -func ParseFlags() *Flags { - var flags Flags - - flag.StringVar( - &flags.ConfigFile, "config", "", "configuration file path") - - flag.StringVar( - &flags.Environment, "environment", "production", "environment string tag") - - flag.StringVar( - &flags.LogLevel, "loglevel", "info", "the loglevel(panic/fatal/error/warn/info/debug/trace))") - - flag.StringVar( - &flags.GitOpsRepoDefaultBranch, "gitOpsRepoDefaultBranch", "master", - "configure gitops git engine default branch") - - flag.Parse() - return &flags -} - -func InitLog(flags *Flags) { - if flags.Environment == "production" { - logrus.SetFormatter(&logrus.JSONFormatter{}) - } else { - logrus.SetFormatter(&logrus.TextFormatter{}) - } - logrus.SetOutput(os.Stdout) - level, err := logrus.ParseLevel(flags.LogLevel) - if err != nil { - panic(err) - } - logrus.SetLevel(level) -} - -// Run runs the agent. -func Run(flags *Flags) { - // init log - InitLog(flags) - - // load coreConfig - coreConfig, err := config.LoadConfig(flags.ConfigFile) - if err != nil { - panic(err) - } - _, err = json.MarshalIndent(coreConfig, "", " ") - if err != nil { - panic(err) - } - - // init db - mysqlDB, err := orm.NewMySQLDB(&orm.MySQL{ - Host: coreConfig.DBConfig.Host, - Port: coreConfig.DBConfig.Port, - Username: coreConfig.DBConfig.Username, - Password: coreConfig.DBConfig.Password, - Database: coreConfig.DBConfig.Database, - PrometheusEnabled: coreConfig.DBConfig.PrometheusEnabled, - }) - if err != nil { - panic(err) - } - callbacks.RegisterCustomCallbacks(mysqlDB) - - // init manager parameter - manager := managerparam.InitManager(mysqlDB) - // init context - ctx := context.Background() - - gitlabGitops, err := gitlablib.New(coreConfig.GitopsRepoConfig.Token, coreConfig.GitopsRepoConfig.URL) - if err != nil { - panic(err) - } - // check existence of gitops root group - rootGroupPath := coreConfig.GitopsRepoConfig.RootGroupPath - rootGroup, err := gitlabGitops.GetGroup(ctx, rootGroupPath) - if err != nil { - log.Printf("failed to get gitops root group, error: %s, start to create it", err.Error()) - rootGroup, err = gitlabGitops.CreateGroup(ctx, rootGroupPath, rootGroupPath, nil) - if err != nil { - panic(err) - } - } - - templateRepo, err := templaterepo.NewRepo(coreConfig.TemplateRepo) - if err != nil { - panic(err) - } - - clusterGitRepo, err := clustergitrepo.NewClusterGitlabRepo(ctx, rootGroup, templateRepo, gitlabGitops, - flags.GitOpsRepoDefaultBranch) - if err != nil { - panic(err) - } - - parameter := ¶m.Param{ - Manager: manager, - Cd: cd.NewCD(clusterGitRepo, coreConfig.ArgoCDMapper, flags.GitOpsRepoDefaultBranch), - } - - // init controller - var ( - clusterCtl = clusterctl.NewController(&config.Config{}, parameter) - prCtl = prctl.NewController(parameter) - ) - - // init kube client - _, client, err := kube.BuildClient(coreConfig.KubeConfig) - if err != nil { - panic(err) - } - - // sync grafana datasource periodically - grafanaService := grafana.NewService(coreConfig.GrafanaConfig, manager, client) - cancellableCtx, cancel := context.WithCancel(ctx) - defer cancel() - go func() { - grafanaService.SyncDatasource(cancellableCtx) - }() - - log.Printf("auto-free job Config: %+v", coreConfig.AutoFreeConfig) - // automatically release expired clusters - go func() { - autofree.AutoReleaseExpiredClusterJob(cancellableCtx, &coreConfig.AutoFreeConfig, - parameter.UserManager, clusterCtl, prCtl) - }() - - // start event handler service to generate webhook log by events - eventHandlerService := eventhandlersvc.NewService(ctx, manager, coreConfig.EventHandlerConfig) - if err := eventHandlerService.RegisterEventHandler("webhook", - wlgenerator.NewWebhookLogGenerator(manager)); err != nil { - log.Printf("failed to register event handler, error: %s", err.Error()) - } - eventHandlerService.Start() - - // start webhook service with multi workers to consume webhook logs and send webhook events - webhookService := webhooksvc.NewService(ctx, manager, coreConfig.WebhookConfig) - webhookService.Start() - - // graceful exit - setTasksBeforeExit(webhookService.StopAndWait, eventHandlerService.StopAndWait) - - r := gin.New() - // use middleware - middlewares := []gin.HandlerFunc{ - ginlogmiddle.Middleware(gin.DefaultWriter, "/health"), - gin.Recovery(), - } - r.Use(middlewares...) - - gin.ForceConsoleColor() - - health.RegisterRoutes(r) - - log.Printf("Server started") - log.Print(r.Run(fmt.Sprintf(":%d", coreConfig.ServerConfig.Port))) -} - -// setTasksBeforeExit set stop funcs which will be executed after sigterm and sigint catched -func setTasksBeforeExit(stopFuncs ...func()) { - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) - go func() { - s := <-sig - log.Printf("got %s signal, stop tasks...\n", s) - if len(stopFuncs) == 0 { - return - } - wg := sync.WaitGroup{} - wg.Add(len(stopFuncs)) - for _, stopFunc := range stopFuncs { - go func(stop func()) { - stop() - }(stopFunc) - } - wg.Wait() - log.Printf("all tasks stopped, exit now.") - }() -} diff --git a/job/main.go b/job/main.go deleted file mode 100644 index 2e90904c..00000000 --- a/job/main.go +++ /dev/null @@ -1,25 +0,0 @@ -package main - -import ( - "github.com/horizoncd/horizon/job/cmd" - - // for image registry - _ "github.com/horizoncd/horizon/pkg/cluster/registry/harbor" - - _ "github.com/horizoncd/horizon/pkg/git" - _ "github.com/horizoncd/horizon/pkg/git/github" - _ "github.com/horizoncd/horizon/pkg/git/gitlab" - - // for template repo - _ "github.com/horizoncd/horizon/pkg/templaterepo/chartmuseumbase" - - // for k8s workload - _ "github.com/horizoncd/horizon/pkg/cluster/cd/workload/deployment" - _ "github.com/horizoncd/horizon/pkg/cluster/cd/workload/kservice" - _ "github.com/horizoncd/horizon/pkg/cluster/cd/workload/pod" - _ "github.com/horizoncd/horizon/pkg/cluster/cd/workload/rollout" -) - -func main() { - cmd.Run(cmd.ParseFlags()) -} diff --git a/pkg/config/job/job.go b/pkg/config/job/job.go new file mode 100644 index 00000000..513a6dc8 --- /dev/null +++ b/pkg/config/job/job.go @@ -0,0 +1,22 @@ +package job + +type Config struct { + + // LockName is the name of the lock resource + LockName string `yaml:"lockName"` + + // LockNS is the namespace of the lock resource + LockNS string `yaml:"lockNS"` + + // LeaseDuration is the duration that non-leader candidates will + // wait to force acquire leadership + LeaseDuration int `yaml:"leaseDuration"` + + // RenewDeadline is the duration that the acting master will retry + // refreshing leadership before giving up + RenewDeadline int `yaml:"renewDeadline"` + + // RetryPeriod is the duration the LeaderElector clients should wait + // between tries of actions + RetryPeriod int `yaml:"retryPeriod"` +} diff --git a/job/autofree/autofree.go b/pkg/jobs/jobautofree/autofree.go similarity index 96% rename from job/autofree/autofree.go rename to pkg/jobs/jobautofree/autofree.go index f0f5bd87..2f1e5e09 100644 --- a/job/autofree/autofree.go +++ b/pkg/jobs/jobautofree/autofree.go @@ -1,4 +1,4 @@ -package autofree +package jobautofree import ( "context" @@ -16,7 +16,7 @@ import ( uuid "github.com/satori/go.uuid" ) -func AutoReleaseExpiredClusterJob(ctx context.Context, jobConfig *autofree.Config, userMgr usermanager.Manager, +func Run(ctx context.Context, jobConfig *autofree.Config, userMgr usermanager.Manager, clusterCtr clusterctl.Controller, prCtr prctl.Controller) { // verify account user, err := userMgr.GetUserByID(ctx, jobConfig.AccountID) diff --git a/job/autofree/autofree_test.go b/pkg/jobs/jobautofree/autofree_test.go similarity index 98% rename from job/autofree/autofree_test.go rename to pkg/jobs/jobautofree/autofree_test.go index 8484fc99..e6941dc0 100644 --- a/job/autofree/autofree_test.go +++ b/pkg/jobs/jobautofree/autofree_test.go @@ -1,4 +1,4 @@ -package autofree +package jobautofree import ( "context" @@ -189,7 +189,7 @@ func TestAutoFreeExpiredCluster(t *testing.T) { <-timer.C cancelFunc() }() - AutoReleaseExpiredClusterJob(ctx, &autofree.Config{ + Run(ctx, &autofree.Config{ AccountID: 1, JobInterval: 1 * time.Second, BatchInterval: 0 * time.Second, diff --git a/pkg/jobs/jobwebhook/webhook.go b/pkg/jobs/jobwebhook/webhook.go new file mode 100644 index 00000000..37968617 --- /dev/null +++ b/pkg/jobs/jobwebhook/webhook.go @@ -0,0 +1,34 @@ +package jobwebhook + +import ( + "context" + "log" + + eventhandlercfg "github.com/horizoncd/horizon/pkg/config/eventhandler" + webhookcfg "github.com/horizoncd/horizon/pkg/config/webhook" + eventhandlersvc "github.com/horizoncd/horizon/pkg/eventhandler" + "github.com/horizoncd/horizon/pkg/eventhandler/wlgenerator" + "github.com/horizoncd/horizon/pkg/param/managerparam" + webhooksvc "github.com/horizoncd/horizon/pkg/webhook/service" +) + +// Run runs the agent. +func Run(ctx context.Context, eventHandlerConfig eventhandlercfg.Config, + webhookCfg webhookcfg.Config, mgrs *managerparam.Manager) { + // start event handler service to generate webhook log by events + eventHandlerService := eventhandlersvc.NewService(ctx, mgrs, eventHandlerConfig) + if err := eventHandlerService.RegisterEventHandler("webhook", + wlgenerator.NewWebhookLogGenerator(mgrs)); err != nil { + log.Printf("failed to register event handler, error: %s", err.Error()) + } + eventHandlerService.Start() + + // start webhook service with multi workers to consume webhook logs and send webhook events + webhookService := webhooksvc.NewService(ctx, mgrs, webhookCfg) + webhookService.Start() + + <-ctx.Done() + // graceful exit + webhookService.StopAndWait() + eventHandlerService.StopAndWait() +} diff --git a/pkg/jobs/singleinstancejob.go b/pkg/jobs/singleinstancejob.go new file mode 100644 index 00000000..c5319ca5 --- /dev/null +++ b/pkg/jobs/singleinstancejob.go @@ -0,0 +1,80 @@ +package jobs + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + jobconfig "github.com/horizoncd/horizon/pkg/config/job" + "github.com/horizoncd/horizon/pkg/util/log" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +type Job = func(ctx context.Context) + +// Run runs the job in a single instance +func Run(ctx context.Context, jobconfig *jobconfig.Config, jobs ...Job) { + // get candidate name + candidateID := fmt.Sprintf("candidate-%s", uuid.New()) + + // get jobconfig in cluster + config, err := rest.InClusterConfig() + if err != nil { + log.Errorf(ctx, "Failed to get jobconfig: %v", err) + return + } + + // create the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(err.Error()) + } + + // create the leader elector + var elector *leaderelection.LeaderElector + electionConfig := leaderelection.LeaderElectionConfig{ + Lock: &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Namespace: jobconfig.LockNS, + Name: jobconfig.LockName, + }, + Client: clientset.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: candidateID, + }, + }, + LeaseDuration: time.Duration(jobconfig.LeaseDuration) * time.Second, + RenewDeadline: time.Duration(jobconfig.RenewDeadline) * time.Second, + RetryPeriod: time.Duration(jobconfig.RetryPeriod) * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + for _, job := range jobs { + go job(ctx) + } + }, + OnStoppedLeading: func() { + log.Debugf(ctx, "%s lost leadership", candidateID) + }, + }, + } + + elector, err = leaderelection.NewLeaderElector(electionConfig) + if err != nil { + panic(err) + } + + // start the leader elector + for { + select { + case <-ctx.Done(): + return + default: + elector.Run(ctx) + } + } +} From 29e3c4a092f2f206ced2fc416b57e412167aff0b Mon Sep 17 00:00:00 2001 From: closetool Date: Fri, 24 Mar 2023 10:26:14 +0800 Subject: [PATCH 2/2] fix: add grafana sync job Signed-off-by: closetool --- core/cmd/cmd.go | 6 +++++- pkg/grafana/grafana.go | 3 +-- pkg/jobs/jobgrafanasync/grafanasync.go | 16 ++++++++++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 pkg/jobs/jobgrafanasync/grafanasync.go diff --git a/core/cmd/cmd.go b/core/cmd/cmd.go index bbf07476..9a6de607 100644 --- a/core/cmd/cmd.go +++ b/core/cmd/cmd.go @@ -95,6 +95,7 @@ import ( "github.com/horizoncd/horizon/pkg/grafana" "github.com/horizoncd/horizon/pkg/jobs" "github.com/horizoncd/horizon/pkg/jobs/jobautofree" + "github.com/horizoncd/horizon/pkg/jobs/jobgrafanasync" "github.com/horizoncd/horizon/pkg/jobs/jobwebhook" "github.com/horizoncd/horizon/pkg/token/generator" tokenservice "github.com/horizoncd/horizon/pkg/token/service" @@ -555,7 +556,10 @@ func Init(ctx context.Context, flags *Flags, coreConfig *config.Config) { webhookJob := func(ctx context.Context) { jobwebhook.Run(ctx, coreConfig.EventHandlerConfig, coreConfig.WebhookConfig, manager) } - go jobs.Run(ctx, &coreConfig.JobConfig, autoFreeJob, webhookJob) + grafanaSyncJob := func(ctx context.Context) { + jobgrafanasync.Run(ctx, coreConfig, manager, client) + } + go jobs.Run(ctx, &coreConfig.JobConfig, autoFreeJob, webhookJob, grafanaSyncJob) // init server r := gin.New() diff --git a/pkg/grafana/grafana.go b/pkg/grafana/grafana.go index 6e5c5461..6d3684d0 100644 --- a/pkg/grafana/grafana.go +++ b/pkg/grafana/grafana.go @@ -80,9 +80,8 @@ func (s *service) SyncDatasource(ctx context.Context) { log.Debug(ctx, "Get done signal from context") return case <-ticker.C: + s.sync(ctx) } - - s.sync(ctx) } } diff --git a/pkg/jobs/jobgrafanasync/grafanasync.go b/pkg/jobs/jobgrafanasync/grafanasync.go new file mode 100644 index 00000000..321e9916 --- /dev/null +++ b/pkg/jobs/jobgrafanasync/grafanasync.go @@ -0,0 +1,16 @@ +package jobgrafanasync + +import ( + "context" + + "github.com/horizoncd/horizon/core/config" + "github.com/horizoncd/horizon/pkg/grafana" + "github.com/horizoncd/horizon/pkg/param/managerparam" + "k8s.io/client-go/kubernetes" +) + +func Run(ctx context.Context, coreConfig *config.Config, + manager *managerparam.Manager, client kubernetes.Interface) { + grafanaService := grafana.NewService(coreConfig.GrafanaConfig, manager, client) + grafanaService.SyncDatasource(ctx) +}