Skip to content

Commit

Permalink
http
Browse files Browse the repository at this point in the history
  • Loading branch information
yalosev committed Oct 12, 2023
1 parent 4625f46 commit b7246b1
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 77 deletions.
15 changes: 3 additions & 12 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ var (
)

var (
Namespace = ""
ListenAddress = "0.0.0.0"
ListenPort = "9115"
HookMetricsListenPort = ""
Namespace = ""
ListenAddress = "0.0.0.0"
ListenPort = "9115"
)

var PrometheusMetricsPrefix = "shell_operator_"
Expand Down Expand Up @@ -124,14 +123,6 @@ func DefineStartCommandFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause)
StringVar(&PrometheusMetricsPrefix)
}

flag = CommonFlagsInfo["hook-metrics-listen-port"]
if flag.Define {
cmd.Flag(flag.Name, flag.Help).
Envar(flag.Envar).
Default(HookMetricsListenPort).
StringVar(&HookMetricsListenPort)
}

flag = CommonFlagsInfo["namespace"]
if flag.Define {
cmd.Flag(flag.Name, flag.Help).
Expand Down
13 changes: 5 additions & 8 deletions pkg/shell-operator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package shell_operator
import (
"context"
"fmt"
"net/http"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -51,7 +50,7 @@ func Init() (*ShellOperator, error) {
return nil, err
}

err = op.AssembleCommonOperator()
err = op.AssembleCommonOperator(app.ListenAddress, app.ListenPort)
if err != nil {
log.Errorf("Fatal: %s", err)
return nil, err
Expand All @@ -68,15 +67,13 @@ func Init() (*ShellOperator, error) {

// AssembleCommonOperator instantiate common dependencies. These dependencies
// may be used for shell-operator derivatives, like addon-operator.
func (op *ShellOperator) AssembleCommonOperator() (err error) {
err = startHttpServer(app.ListenAddress, app.ListenPort, http.DefaultServeMux)
if err != nil {
return fmt.Errorf("start HTTP server: %s", err)
}
// requires listenAddress, listenPort to run http server for operator APIs
func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string) (err error) {
op.APIServer = newBaseHTTPServer(listenAddress, listenPort)

op.MetricStorage = defaultMetricStorage(op.ctx)

op.HookMetricStorage, err = setupHookMetricStorageAndServer(op.ctx)
op.setupHookMetricStorage()
if err != nil {
return fmt.Errorf("start HTTP server for hook metrics: %s", err)
}
Expand Down
79 changes: 64 additions & 15 deletions pkg/shell-operator/http_server.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,88 @@
package shell_operator

import (
"context"
"fmt"
"net"
"net/http"
"os"
"time"

"github.com/go-chi/chi/v5"

log "github.com/sirupsen/logrus"

"github.com/flant/shell-operator/pkg/app"
)

func startHttpServer(ip string, port string, mux *http.ServeMux) error {
address := fmt.Sprintf("%s:%s", ip, port)
type baseHTTPServer struct {
router chi.Router

address string
port string
}

// Check if port is available
listener, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("listen on '%s' fails: %v", address, err)
// Start runs http server
func (bhs *baseHTTPServer) Start(ctx context.Context) {
srv := &http.Server{
Addr: bhs.address + ":" + bhs.port,
Handler: bhs.router,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}

log.Infof("Listen on %s", address)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("base http server listen: %s\n", err)
}
}()
log.Infof("base http server started at %s:%s", bhs.address, bhs.port)

go func() {
if err := http.Serve(listener, mux); err != nil {
log.Errorf("Fatal: error starting HTTP server: %s", err)
os.Exit(1)
<-ctx.Done()
log.Info("base http server stopped")

cctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer func() {
// extra handling here
cancel()
}()

if err := srv.Shutdown(cctx); err != nil {
log.Fatalf("base http server shutdown failed:%+v", err)
}
}()
}

// RegisterRoute register http.HandlerFunc
func (bhs *baseHTTPServer) RegisterRoute(method, pattern string, h http.HandlerFunc) {
switch method {
case http.MethodGet:
bhs.router.Get(pattern, h)

case http.MethodPost:
bhs.router.Post(pattern, h)

case http.MethodPut:
bhs.router.Put(pattern, h)

case http.MethodDelete:
bhs.router.Delete(pattern, h)
}
}

func newBaseHTTPServer(address, port string) *baseHTTPServer {
router := chi.NewRouter()

srv := &baseHTTPServer{
router: router,
address: address,
port: port,
}

return nil
return srv
}

func registerDefaultRoutes(op *ShellOperator) {
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
op.APIServer.RegisterRoute(http.MethodGet, "/", func(writer http.ResponseWriter, request *http.Request) {
_, _ = fmt.Fprintf(writer, `<html>
<head><title>Shell operator</title></head>
<body>
Expand All @@ -43,7 +92,7 @@ func registerDefaultRoutes(op *ShellOperator) {
</html>`, app.ListenPort)
})

http.HandleFunc("/metrics", func(writer http.ResponseWriter, request *http.Request) {
op.APIServer.RegisterRoute(http.MethodGet, "/metrics", func(writer http.ResponseWriter, request *http.Request) {
if op.MetricStorage != nil {
op.MetricStorage.Handler().ServeHTTP(writer, request)
}
Expand Down
25 changes: 4 additions & 21 deletions pkg/shell-operator/metrics_hooks.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,17 @@
package shell_operator

import (
"context"
"net/http"

"github.com/flant/shell-operator/pkg/app"
"github.com/flant/shell-operator/pkg/metric_storage"
)

// defaultHookMetricStorage creates MetricStorage object
// with new registry to scrape hook metrics on separate port.
func defaultHookMetricStorage(ctx context.Context) *metric_storage.MetricStorage {
metricStorage := metric_storage.NewMetricStorage(ctx, app.PrometheusMetricsPrefix, true)
return metricStorage
}
func (op *ShellOperator) setupHookMetricStorage() {
metricStorage := metric_storage.NewMetricStorage(op.ctx, app.PrometheusMetricsPrefix, true)

func setupHookMetricStorageAndServer(ctx context.Context) (*metric_storage.MetricStorage, error) {
if app.HookMetricsListenPort == "" || app.HookMetricsListenPort == app.ListenPort {
// No separate metric storage required.
return nil, nil
}
op.APIServer.RegisterRoute(http.MethodGet, "/metrics/hooks", metricStorage.Handler().ServeHTTP)
// create new metric storage for hooks
metricStorage := defaultHookMetricStorage(ctx)
// Create new ServeMux, and serve on custom port.
mux := http.NewServeMux()
err := startHttpServer(app.ListenAddress, app.HookMetricsListenPort, mux)
if err != nil {
return nil, err
}
// register scrape handler
mux.Handle("/metrics", metricStorage.Handler())
return metricStorage, nil
op.MetricStorage = metricStorage
}
47 changes: 26 additions & 21 deletions pkg/shell-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type ShellOperator struct {
ctx context.Context
cancel context.CancelFunc

// APIServer common http server for liveness and metrics endpoints
APIServer *baseHTTPServer

MetricStorage *metric_storage.MetricStorage
// separate metric storage for hook metrics if separate listen port is configured
HookMetricStorage *metric_storage.MetricStorage
Expand Down Expand Up @@ -64,6 +67,29 @@ func NewShellOperator(ctx context.Context) *ShellOperator {
}
}

// Start run the operator
func (op *ShellOperator) Start() {
log.Info("start shell-operator")

op.APIServer.Start(op.ctx)

// Create 'main' queue and add onStartup tasks and enable bindings tasks.
op.bootstrapMainQueue(op.TaskQueues)
// Start main task queue handler
op.TaskQueues.StartMain()
op.initAndStartHookQueues()

// Start emit "live" metrics
op.runMetrics()

// Managers are generating events. This go-routine handles all events and converts them into queued tasks.
// Start it before start all informers to catch all kubernetes events (#42)
op.ManagerEventsHandler.Start()

// Unlike KubeEventsManager, ScheduleManager has one go-routine.
op.ScheduleManager.Start()
}

func (op *ShellOperator) Stop() {
if op.cancel != nil {
op.cancel()
Expand Down Expand Up @@ -362,27 +388,6 @@ func (op *ShellOperator) conversionEventHandler(event conversion.Event) (*conver
}, nil
}

// Start
func (op *ShellOperator) Start() {
log.Info("start shell-operator")

// Create 'main' queue and add onStartup tasks and enable bindings tasks.
op.bootstrapMainQueue(op.TaskQueues)
// Start main task queue handler
op.TaskQueues.StartMain()
op.initAndStartHookQueues()

// Start emit "live" metrics
op.runMetrics()

// Managers are generating events. This go-routine handles all events and converts them into queued tasks.
// Start it before start all informers to catch all kubernetes events (#42)
op.ManagerEventsHandler.Start()

// Unlike KubeEventsManager, ScheduleManager has one go-routine.
op.ScheduleManager.Start()
}

// taskHandler
func (op *ShellOperator) taskHandler(t task.Task) queue.TaskResult {
logEntry := log.WithField("operator.component", "taskRunner")
Expand Down

0 comments on commit b7246b1

Please sign in to comment.