From 7121751d713970b73c0cff8f127d3c37eda2b99b Mon Sep 17 00:00:00 2001 From: Andrew Dunstall Date: Sun, 21 Apr 2024 11:23:43 +0100 Subject: [PATCH] server: add listener handler --- cli/serverv2/command.go | 2 ++ serverv2/proxy/proxy.go | 17 +++++++++ serverv2/server/proxy/rpcserver.go | 23 ++++++++++++ serverv2/server/proxy/server.go | 57 ++++++++++++++++++++++++++++-- 4 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 serverv2/proxy/proxy.go create mode 100644 serverv2/server/proxy/rpcserver.go diff --git a/cli/serverv2/command.go b/cli/serverv2/command.go index 77c5ef2..d5b5fd8 100644 --- a/cli/serverv2/command.go +++ b/cli/serverv2/command.go @@ -14,6 +14,7 @@ import ( "github.com/andydunstall/pico/serverv2/config" "github.com/andydunstall/pico/serverv2/gossip" "github.com/andydunstall/pico/serverv2/netmap" + "github.com/andydunstall/pico/serverv2/proxy" adminserver "github.com/andydunstall/pico/serverv2/server/admin" proxyserver "github.com/andydunstall/pico/serverv2/server/proxy" "github.com/hashicorp/go-sockaddr" @@ -277,6 +278,7 @@ func run(conf *config.Config, logger *log.Logger) { proxyServer := proxyserver.NewServer( conf.Proxy.BindAddr, + proxy.NewProxy(), registry, logger, ) diff --git a/serverv2/proxy/proxy.go b/serverv2/proxy/proxy.go new file mode 100644 index 0000000..f62817b --- /dev/null +++ b/serverv2/proxy/proxy.go @@ -0,0 +1,17 @@ +package proxy + +import "github.com/andydunstall/pico/pkg/rpc" + +// Proxy is responsible for forwarding requests to upstream listeners. +type Proxy struct { +} + +func NewProxy() *Proxy { + return &Proxy{} +} + +func (p *Proxy) AddUpstream(_ string, _ *rpc.Stream) { +} + +func (p *Proxy) RemoveUpstream(_ string, _ *rpc.Stream) { +} diff --git a/serverv2/server/proxy/rpcserver.go b/serverv2/server/proxy/rpcserver.go new file mode 100644 index 0000000..5536923 --- /dev/null +++ b/serverv2/server/proxy/rpcserver.go @@ -0,0 +1,23 @@ +package server + +import "github.com/andydunstall/pico/pkg/rpc" + +type rpcServer struct { + rpcHandler *rpc.Handler +} + +func newRPCServer() *rpcServer { + server := &rpcServer{ + rpcHandler: rpc.NewHandler(), + } + server.rpcHandler.Register(rpc.TypeHeartbeat, server.Heartbeat) + return server +} + +func (s *rpcServer) Handler() *rpc.Handler { + return s.rpcHandler +} + +func (s *rpcServer) Heartbeat(m []byte) []byte { + return m +} diff --git a/serverv2/server/proxy/server.go b/serverv2/server/proxy/server.go index 03caa50..75afb09 100644 --- a/serverv2/server/proxy/server.go +++ b/serverv2/server/proxy/server.go @@ -5,10 +5,15 @@ import ( "fmt" "net/http" "strings" + "time" + "github.com/andydunstall/pico/pkg/conn" "github.com/andydunstall/pico/pkg/log" + "github.com/andydunstall/pico/pkg/rpc" + "github.com/andydunstall/pico/serverv2/proxy" "github.com/andydunstall/pico/serverv2/server/middleware" "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -23,15 +28,26 @@ type Server struct { router *gin.Engine httpServer *http.Server + rpcServer *rpcServer + + websocketUpgrader *websocket.Upgrader + + proxy *proxy.Proxy + + shutdownCtx context.Context + shutdownCancel func() logger *log.Logger } func NewServer( addr string, + proxy *proxy.Proxy, registry *prometheus.Registry, logger *log.Logger, ) *Server { + shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) + router := gin.New() server := &Server{ addr: addr, @@ -40,7 +56,12 @@ func NewServer( Addr: addr, Handler: router, }, - logger: logger.WithSubsystem("proxy.server"), + rpcServer: newRPCServer(), + websocketUpgrader: &websocket.Upgrader{}, + shutdownCtx: shutdownCtx, + shutdownCancel: shutdownCancel, + proxy: proxy, + logger: logger.WithSubsystem("proxy.server"), } // Recover from panics. @@ -77,8 +98,40 @@ func (s *Server) registerRoutes() { s.router.NoRoute(s.notFoundRoute) } +// listenerRoute handles WebSocket connections from upstream listeners. func (s *Server) listenerRoute(c *gin.Context) { - c.Status(http.StatusNotImplemented) + endpointID := c.Param("endpointID") + + wsConn, err := s.websocketUpgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + // Upgrade replies to the client so nothing else to do. + s.logger.Warn("failed to upgrade websocket", zap.Error(err)) + return + } + stream := rpc.NewStream( + conn.NewWebsocketConn(wsConn), + s.rpcServer.Handler(), + s.logger, + ) + defer stream.Close() + + s.logger.Debug( + "listener connected", + zap.String("endpoint-id", endpointID), + zap.String("client-ip", c.ClientIP()), + ) + + s.proxy.AddUpstream(endpointID, stream) + defer s.proxy.RemoveUpstream(endpointID, stream) + + if err := stream.Monitor( + s.shutdownCtx, + // TODO(andydunstall): Configurable. + time.Second*10, + time.Second*10, + ); err != nil { + s.logger.Debug("listener disconnected", zap.Error(err)) + } } func (s *Server) proxyRoute(c *gin.Context) {