Skip to content

Commit

Permalink
server: add proxy server
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Apr 21, 2024
1 parent 4ea6545 commit eb0d760
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 8 deletions.
82 changes: 81 additions & 1 deletion cli/serverv2/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/andydunstall/pico/serverv2/gossip"
"github.com/andydunstall/pico/serverv2/netmap"
adminserver "github.com/andydunstall/pico/serverv2/server/admin"
proxyserver "github.com/andydunstall/pico/serverv2/server/proxy"
"github.com/hashicorp/go-sockaddr"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -64,6 +65,22 @@ The host/port to listen for incoming proxy HTTP and WebSocket connections.
If the host is unspecified it defaults to all listeners, such as
'--proxy.bind-addr :8080' will listen on '0.0.0.0:8080'`,
)
cmd.Flags().StringVar(
&conf.Gossip.AdvertiseAddr,
"proxy.advertise-addr",
"",
`
Proxy listen address to advertise to other nodes in the cluster. This is the
address other nodes will used to forward proxy requests.
Such as if the listen address is ':8080', the advertised address may be
'10.26.104.45:8080' or 'node1.cluster:8080'.
By default, if the bind address includes an IP to bind to that will be used.
If the bind address does not include an IP (such as ':8080') the nodes
private IP will be used, such as a bind address of ':8080' may have an
advertise address of '10.26.104.14:8080'.`,
)

cmd.Flags().StringVar(
&conf.Admin.BindAddr,
Expand All @@ -75,6 +92,22 @@ The host/port to listen for incoming admin connections.
If the host is unspecified it defaults to all listeners, such as
'--admin.bind-addr :8081' will listen on '0.0.0.0:8081'`,
)
cmd.Flags().StringVar(
&conf.Gossip.AdvertiseAddr,
"admin.advertise-addr",
"",
`
Admin listen address to advertise to other nodes in the cluster. This is the
address other nodes will used to forward admin requests.
Such as if the listen address is ':8081', the advertised address may be
'10.26.104.45:8081' or 'node1.cluster:8081'.
By default, if the bind address includes an IP to bind to that will be used.
If the bind address does not include an IP (such as ':8081') the nodes
private IP will be used, such as a bind address of ':8081' may have an
advertise address of '10.26.104.14:8081'.`,
)

cmd.Flags().StringVar(
&conf.Gossip.BindAddr,
Expand All @@ -93,7 +126,7 @@ If the host is unspecified it defaults to all listeners, such as
"",
`
Gossip listen address to advertise to other nodes in the cluster. This is the
address other nodes will used to gossip with the.
address other nodes will used to gossip with the node.
Such as if the listen address is ':7000', the advertised address may be
'10.26.104.45:7000' or 'node1.cluster:7000'.
Expand Down Expand Up @@ -171,6 +204,22 @@ Such as you can enable 'gossip' logs with '--log.subsystems gossip'.`,
conf.Cluster.NodeID = netmap.GenerateNodeID()
}

if conf.Proxy.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Proxy.BindAddr)
if err != nil {
logger.Error("invalid configuration", zap.Error(err))
os.Exit(1)
}
conf.Proxy.AdvertiseAddr = advertiseAddr
}
if conf.Admin.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Admin.BindAddr)
if err != nil {
logger.Error("invalid configuration", zap.Error(err))
os.Exit(1)
}
conf.Admin.AdvertiseAddr = advertiseAddr
}
if conf.Gossip.AdvertiseAddr == "" {
advertiseAddr, err := advertiseAddrFromBindAddr(conf.Gossip.BindAddr)
if err != nil {
Expand All @@ -194,6 +243,8 @@ func run(conf *config.Config, logger *log.Logger) {
networkMap := netmap.NewNetworkMap(&netmap.Node{
ID: conf.Cluster.NodeID,
Status: netmap.NodeStatusJoining,
ProxyAddr: conf.Proxy.AdvertiseAddr,
AdminAddr: conf.Admin.AdvertiseAddr,
GossipAddr: conf.Gossip.AdvertiseAddr,
}, logger)
gossip, err := gossip.NewGossip(
Expand Down Expand Up @@ -224,6 +275,12 @@ func run(conf *config.Config, logger *log.Logger) {
networkMapStatus := netmap.NewStatus(networkMap)
adminServer.AddStatus("/netmap", networkMapStatus)

proxyServer := proxyserver.NewServer(
conf.Proxy.BindAddr,
registry,
logger,
)

ctx, cancel := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM,
)
Expand Down Expand Up @@ -253,6 +310,29 @@ func run(conf *config.Config, logger *log.Logger) {
}
return nil
})
g.Go(func() error {
if err := proxyServer.Serve(); err != nil {
return fmt.Errorf("proxy server serve: %w", err)
}
return nil
})
g.Go(func() error {
<-ctx.Done()

logger.Info("shutting down proxy server")

shutdownCtx, cancel := context.WithTimeout(
context.Background(),
// TODO(andydunstall): Add configuration.
time.Second*30,
)
defer cancel()

if err := proxyServer.Shutdown(shutdownCtx); err != nil {
logger.Warn("failed to gracefully shutdown server", zap.Error(err))
}
return nil
})

networkMap.UpdateLocalStatus(netmap.NodeStatusActive)

Expand Down
6 changes: 6 additions & 0 deletions serverv2/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import "fmt"
type ProxyConfig struct {
// BindAddr is the address to bind to listen for incoming HTTP connections.
BindAddr string `json:"bind_addr"`

// AdvertiseAddr is the address to advertise to other nodes.
AdvertiseAddr string `json:"advertise_addr"`
}

func (c *ProxyConfig) Validate() error {
Expand All @@ -17,6 +20,9 @@ func (c *ProxyConfig) Validate() error {
type AdminConfig struct {
// BindAddr is the address to bind to listen for incoming HTTP connections.
BindAddr string `json:"bind_addr"`

// AdvertiseAddr is the address to advertise to other nodes.
AdvertiseAddr string `json:"advertise_addr"`
}

func (c *AdminConfig) Validate() error {
Expand Down
6 changes: 6 additions & 0 deletions serverv2/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (g *Gossip) Close() error {
// propagated to other nodes.
func (g *Gossip) updateLocalState() {
localNode := g.networkMap.LocalNode()
g.kite.UpsertLocal("proxy_addr", localNode.ProxyAddr)
g.kite.UpsertLocal("admin_addr", localNode.AdminAddr)
g.kite.UpsertLocal("gossip_addr", localNode.GossipAddr)
// Note adding the status last since a node is considered 'pending' until
// the status is known.
Expand Down Expand Up @@ -183,6 +185,10 @@ func (g *Gossip) onRemoteUpdate(nodeID, key, value string) {
}

switch key {
case "proxy_addr":
node.ProxyAddr = value
case "admin_addr":
node.AdminAddr = value
case "gossip_addr":
node.GossipAddr = value
case "status":
Expand Down
6 changes: 6 additions & 0 deletions serverv2/netmap/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type Node struct {

Status NodeStatus `json:"status"`

// ProxyAddr is the advertised proxy address.
ProxyAddr string `json:"proxy_addr"`
// AdminAddr is the advertised admin address.
AdminAddr string `json:"admin_addr"`
// GossipAddr is the advertised gossip address.
GossipAddr string `json:"gossip_addr"`
}
Expand All @@ -31,6 +35,8 @@ func (n *Node) Copy() *Node {
return &Node{
ID: n.ID,
Status: n.Status,
ProxyAddr: n.ProxyAddr,
AdminAddr: n.AdminAddr,
GossipAddr: n.GossipAddr,
}
}
Expand Down
2 changes: 1 addition & 1 deletion serverv2/server/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewServer(

server.router.Use(middleware.NewLogger(logger))
if registry != nil {
router.Use(middleware.NewMetrics(registry))
router.Use(middleware.NewMetrics("admin", registry))
}

server.registerRoutes()
Expand Down
14 changes: 8 additions & 6 deletions serverv2/server/middleware/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (
)

// NewMetrics creates metrics middleware.
func NewMetrics(registry *prometheus.Registry) gin.HandlerFunc {
func NewMetrics(namespace string, registry *prometheus.Registry) gin.HandlerFunc {
var requests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "HTTP requests.",
Namespace: namespace,
Name: "http_requests_total",
Help: "HTTP requests.",
},
[]string{"status"},
)
var requestLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_latency_seconds",
Help: "HTTP request latency.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 10),
Namespace: namespace,
Name: "http_request_latency_seconds",
Help: "HTTP request latency.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 10),
},
[]string{"status"},
)
Expand Down
109 changes: 109 additions & 0 deletions serverv2/server/proxy/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package server

import (
"context"
"fmt"
"net/http"
"strings"

"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/serverv2/server/middleware"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

// Server is the HTTP server for the proxy, which is used for both upstream
// listeners and downstream clients.
//
// /pico is reserved for upstream listeners, all other routes will be proxied.
type Server struct {
addr string

router *gin.Engine

httpServer *http.Server

logger *log.Logger
}

func NewServer(
addr string,
registry *prometheus.Registry,
logger *log.Logger,
) *Server {
router := gin.New()
server := &Server{
addr: addr,
router: router,
httpServer: &http.Server{
Addr: addr,
Handler: router,
},
logger: logger.WithSubsystem("proxy.server"),
}

// Recover from panics.
server.router.Use(gin.CustomRecovery(server.panicRoute))

server.router.Use(middleware.NewLogger(logger))
if registry != nil {
router.Use(middleware.NewMetrics("proxy", registry))
}

server.registerRoutes()

return server
}

func (s *Server) Serve() error {
s.logger.Info("starting http server", zap.String("addr", s.addr))

if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("http serve: %w", err)
}
return nil
}

func (s *Server) Shutdown(ctx context.Context) error {
return s.httpServer.Shutdown(ctx)
}

func (s *Server) registerRoutes() {
pico := s.router.Group("/pico/v1")
pico.GET("/listener/:endpointID", s.listenerRoute)

// Handle not found routes, which includes all proxied endpoints.
s.router.NoRoute(s.notFoundRoute)
}

func (s *Server) listenerRoute(c *gin.Context) {
c.Status(http.StatusNotImplemented)
}

func (s *Server) proxyRoute(c *gin.Context) {
c.Status(http.StatusNotImplemented)
}

func (s *Server) notFoundRoute(c *gin.Context) {
// All /pico endpoints are reserved. All others are proxied.
if strings.HasPrefix(c.Request.URL.Path, "/pico") {
c.Status(http.StatusNotFound)
return
}
s.proxyRoute(c)
}

func (s *Server) panicRoute(c *gin.Context, err any) {
s.logger.Error(
"handler panic",
zap.String("path", c.FullPath()),
zap.Any("err", err),
)
c.AbortWithStatus(http.StatusInternalServerError)
}

func init() {
// Disable Gin debug logs.
gin.SetMode(gin.ReleaseMode)
}

0 comments on commit eb0d760

Please sign in to comment.