From b7246b1ffaa0555fb099640c3969f697d36dba74 Mon Sep 17 00:00:00 2001 From: Yuriy Losev Date: Thu, 12 Oct 2023 20:05:38 +0400 Subject: [PATCH] http --- pkg/app/app.go | 15 ++---- pkg/shell-operator/bootstrap.go | 13 ++--- pkg/shell-operator/http_server.go | 79 +++++++++++++++++++++++------ pkg/shell-operator/metrics_hooks.go | 25 ++------- pkg/shell-operator/operator.go | 47 +++++++++-------- 5 files changed, 102 insertions(+), 77 deletions(-) diff --git a/pkg/app/app.go b/pkg/app/app.go index 3f80cb3f..24318705 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -20,10 +20,9 @@ var ( ) var ( - Namespace = "" - ListenAddress = "0.0.0.0" - ListenPort = "9115" - HookMetricsListenPort = "" + Namespace = "" + ListenAddress = "0.0.0.0" + ListenPort = "9115" ) var PrometheusMetricsPrefix = "shell_operator_" @@ -124,14 +123,6 @@ func DefineStartCommandFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause) StringVar(&PrometheusMetricsPrefix) } - flag = CommonFlagsInfo["hook-metrics-listen-port"] - if flag.Define { - cmd.Flag(flag.Name, flag.Help). - Envar(flag.Envar). - Default(HookMetricsListenPort). - StringVar(&HookMetricsListenPort) - } - flag = CommonFlagsInfo["namespace"] if flag.Define { cmd.Flag(flag.Name, flag.Help). diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index 06dae499..38222f61 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -3,7 +3,6 @@ package shell_operator import ( "context" "fmt" - "net/http" log "github.com/sirupsen/logrus" @@ -51,7 +50,7 @@ func Init() (*ShellOperator, error) { return nil, err } - err = op.AssembleCommonOperator() + err = op.AssembleCommonOperator(app.ListenAddress, app.ListenPort) if err != nil { log.Errorf("Fatal: %s", err) return nil, err @@ -68,15 +67,13 @@ func Init() (*ShellOperator, error) { // AssembleCommonOperator instantiate common dependencies. These dependencies // may be used for shell-operator derivatives, like addon-operator. -func (op *ShellOperator) AssembleCommonOperator() (err error) { - err = startHttpServer(app.ListenAddress, app.ListenPort, http.DefaultServeMux) - if err != nil { - return fmt.Errorf("start HTTP server: %s", err) - } +// requires listenAddress, listenPort to run http server for operator APIs +func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string) (err error) { + op.APIServer = newBaseHTTPServer(listenAddress, listenPort) op.MetricStorage = defaultMetricStorage(op.ctx) - op.HookMetricStorage, err = setupHookMetricStorageAndServer(op.ctx) + op.setupHookMetricStorage() if err != nil { return fmt.Errorf("start HTTP server for hook metrics: %s", err) } diff --git a/pkg/shell-operator/http_server.go b/pkg/shell-operator/http_server.go index b2ab9acf..bcb1290b 100644 --- a/pkg/shell-operator/http_server.go +++ b/pkg/shell-operator/http_server.go @@ -1,39 +1,88 @@ package shell_operator import ( + "context" "fmt" - "net" "net/http" - "os" + "time" + + "github.com/go-chi/chi/v5" log "github.com/sirupsen/logrus" "github.com/flant/shell-operator/pkg/app" ) -func startHttpServer(ip string, port string, mux *http.ServeMux) error { - address := fmt.Sprintf("%s:%s", ip, port) +type baseHTTPServer struct { + router chi.Router + + address string + port string +} - // Check if port is available - listener, err := net.Listen("tcp", address) - if err != nil { - return fmt.Errorf("listen on '%s' fails: %v", address, err) +// Start runs http server +func (bhs *baseHTTPServer) Start(ctx context.Context) { + srv := &http.Server{ + Addr: bhs.address + ":" + bhs.port, + Handler: bhs.router, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, } - log.Infof("Listen on %s", address) + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("base http server listen: %s\n", err) + } + }() + log.Infof("base http server started at %s:%s", bhs.address, bhs.port) go func() { - if err := http.Serve(listener, mux); err != nil { - log.Errorf("Fatal: error starting HTTP server: %s", err) - os.Exit(1) + <-ctx.Done() + log.Info("base http server stopped") + + cctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer func() { + // extra handling here + cancel() + }() + + if err := srv.Shutdown(cctx); err != nil { + log.Fatalf("base http server shutdown failed:%+v", err) } }() +} + +// RegisterRoute register http.HandlerFunc +func (bhs *baseHTTPServer) RegisterRoute(method, pattern string, h http.HandlerFunc) { + switch method { + case http.MethodGet: + bhs.router.Get(pattern, h) + + case http.MethodPost: + bhs.router.Post(pattern, h) + + case http.MethodPut: + bhs.router.Put(pattern, h) + + case http.MethodDelete: + bhs.router.Delete(pattern, h) + } +} + +func newBaseHTTPServer(address, port string) *baseHTTPServer { + router := chi.NewRouter() + + srv := &baseHTTPServer{ + router: router, + address: address, + port: port, + } - return nil + return srv } func registerDefaultRoutes(op *ShellOperator) { - http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + op.APIServer.RegisterRoute(http.MethodGet, "/", func(writer http.ResponseWriter, request *http.Request) { _, _ = fmt.Fprintf(writer, ` Shell operator @@ -43,7 +92,7 @@ func registerDefaultRoutes(op *ShellOperator) { `, app.ListenPort) }) - http.HandleFunc("/metrics", func(writer http.ResponseWriter, request *http.Request) { + op.APIServer.RegisterRoute(http.MethodGet, "/metrics", func(writer http.ResponseWriter, request *http.Request) { if op.MetricStorage != nil { op.MetricStorage.Handler().ServeHTTP(writer, request) } diff --git a/pkg/shell-operator/metrics_hooks.go b/pkg/shell-operator/metrics_hooks.go index a34e792a..08bb7521 100644 --- a/pkg/shell-operator/metrics_hooks.go +++ b/pkg/shell-operator/metrics_hooks.go @@ -1,34 +1,17 @@ package shell_operator import ( - "context" "net/http" "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/metric_storage" ) -// defaultHookMetricStorage creates MetricStorage object -// with new registry to scrape hook metrics on separate port. -func defaultHookMetricStorage(ctx context.Context) *metric_storage.MetricStorage { - metricStorage := metric_storage.NewMetricStorage(ctx, app.PrometheusMetricsPrefix, true) - return metricStorage -} +func (op *ShellOperator) setupHookMetricStorage() { + metricStorage := metric_storage.NewMetricStorage(op.ctx, app.PrometheusMetricsPrefix, true) -func setupHookMetricStorageAndServer(ctx context.Context) (*metric_storage.MetricStorage, error) { - if app.HookMetricsListenPort == "" || app.HookMetricsListenPort == app.ListenPort { - // No separate metric storage required. - return nil, nil - } + op.APIServer.RegisterRoute(http.MethodGet, "/metrics/hooks", metricStorage.Handler().ServeHTTP) // create new metric storage for hooks - metricStorage := defaultHookMetricStorage(ctx) - // Create new ServeMux, and serve on custom port. - mux := http.NewServeMux() - err := startHttpServer(app.ListenAddress, app.HookMetricsListenPort, mux) - if err != nil { - return nil, err - } // register scrape handler - mux.Handle("/metrics", metricStorage.Handler()) - return metricStorage, nil + op.MetricStorage = metricStorage } diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index a6e08ea5..979a3997 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -34,6 +34,9 @@ type ShellOperator struct { ctx context.Context cancel context.CancelFunc + // APIServer common http server for liveness and metrics endpoints + APIServer *baseHTTPServer + MetricStorage *metric_storage.MetricStorage // separate metric storage for hook metrics if separate listen port is configured HookMetricStorage *metric_storage.MetricStorage @@ -64,6 +67,29 @@ func NewShellOperator(ctx context.Context) *ShellOperator { } } +// Start run the operator +func (op *ShellOperator) Start() { + log.Info("start shell-operator") + + op.APIServer.Start(op.ctx) + + // Create 'main' queue and add onStartup tasks and enable bindings tasks. + op.bootstrapMainQueue(op.TaskQueues) + // Start main task queue handler + op.TaskQueues.StartMain() + op.initAndStartHookQueues() + + // Start emit "live" metrics + op.runMetrics() + + // Managers are generating events. This go-routine handles all events and converts them into queued tasks. + // Start it before start all informers to catch all kubernetes events (#42) + op.ManagerEventsHandler.Start() + + // Unlike KubeEventsManager, ScheduleManager has one go-routine. + op.ScheduleManager.Start() +} + func (op *ShellOperator) Stop() { if op.cancel != nil { op.cancel() @@ -362,27 +388,6 @@ func (op *ShellOperator) conversionEventHandler(event conversion.Event) (*conver }, nil } -// Start -func (op *ShellOperator) Start() { - log.Info("start shell-operator") - - // Create 'main' queue and add onStartup tasks and enable bindings tasks. - op.bootstrapMainQueue(op.TaskQueues) - // Start main task queue handler - op.TaskQueues.StartMain() - op.initAndStartHookQueues() - - // Start emit "live" metrics - op.runMetrics() - - // Managers are generating events. This go-routine handles all events and converts them into queued tasks. - // Start it before start all informers to catch all kubernetes events (#42) - op.ManagerEventsHandler.Start() - - // Unlike KubeEventsManager, ScheduleManager has one go-routine. - op.ScheduleManager.Start() -} - // taskHandler func (op *ShellOperator) taskHandler(t task.Task) queue.TaskResult { logEntry := log.WithField("operator.component", "taskRunner")