Skip to content

Commit

Permalink
server: add listener handler
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Apr 21, 2024
1 parent eb0d760 commit 7121751
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cli/serverv2/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/andydunstall/pico/serverv2/config"
"github.com/andydunstall/pico/serverv2/gossip"
"github.com/andydunstall/pico/serverv2/netmap"
"github.com/andydunstall/pico/serverv2/proxy"
adminserver "github.com/andydunstall/pico/serverv2/server/admin"
proxyserver "github.com/andydunstall/pico/serverv2/server/proxy"
"github.com/hashicorp/go-sockaddr"
Expand Down Expand Up @@ -277,6 +278,7 @@ func run(conf *config.Config, logger *log.Logger) {

proxyServer := proxyserver.NewServer(
conf.Proxy.BindAddr,
proxy.NewProxy(),
registry,
logger,
)
Expand Down
17 changes: 17 additions & 0 deletions serverv2/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package proxy

import "github.com/andydunstall/pico/pkg/rpc"

// Proxy is responsible for forwarding requests to upstream listeners.
type Proxy struct {
}

func NewProxy() *Proxy {
return &Proxy{}
}

func (p *Proxy) AddUpstream(_ string, _ *rpc.Stream) {
}

func (p *Proxy) RemoveUpstream(_ string, _ *rpc.Stream) {
}
23 changes: 23 additions & 0 deletions serverv2/server/proxy/rpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package server

import "github.com/andydunstall/pico/pkg/rpc"

type rpcServer struct {
rpcHandler *rpc.Handler
}

func newRPCServer() *rpcServer {
server := &rpcServer{
rpcHandler: rpc.NewHandler(),
}
server.rpcHandler.Register(rpc.TypeHeartbeat, server.Heartbeat)
return server
}

func (s *rpcServer) Handler() *rpc.Handler {
return s.rpcHandler
}

func (s *rpcServer) Heartbeat(m []byte) []byte {
return m
}
57 changes: 55 additions & 2 deletions serverv2/server/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import (
"fmt"
"net/http"
"strings"
"time"

"github.com/andydunstall/pico/pkg/conn"
"github.com/andydunstall/pico/pkg/log"
"github.com/andydunstall/pico/pkg/rpc"
"github.com/andydunstall/pico/serverv2/proxy"
"github.com/andydunstall/pico/serverv2/server/middleware"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand All @@ -23,15 +28,26 @@ type Server struct {
router *gin.Engine

httpServer *http.Server
rpcServer *rpcServer

websocketUpgrader *websocket.Upgrader

proxy *proxy.Proxy

shutdownCtx context.Context
shutdownCancel func()

logger *log.Logger
}

func NewServer(
addr string,
proxy *proxy.Proxy,
registry *prometheus.Registry,
logger *log.Logger,
) *Server {
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())

router := gin.New()
server := &Server{
addr: addr,
Expand All @@ -40,7 +56,12 @@ func NewServer(
Addr: addr,
Handler: router,
},
logger: logger.WithSubsystem("proxy.server"),
rpcServer: newRPCServer(),
websocketUpgrader: &websocket.Upgrader{},
shutdownCtx: shutdownCtx,
shutdownCancel: shutdownCancel,
proxy: proxy,
logger: logger.WithSubsystem("proxy.server"),
}

// Recover from panics.
Expand Down Expand Up @@ -77,8 +98,40 @@ func (s *Server) registerRoutes() {
s.router.NoRoute(s.notFoundRoute)
}

// listenerRoute handles WebSocket connections from upstream listeners.
func (s *Server) listenerRoute(c *gin.Context) {
c.Status(http.StatusNotImplemented)
endpointID := c.Param("endpointID")

wsConn, err := s.websocketUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
// Upgrade replies to the client so nothing else to do.
s.logger.Warn("failed to upgrade websocket", zap.Error(err))
return
}
stream := rpc.NewStream(
conn.NewWebsocketConn(wsConn),
s.rpcServer.Handler(),
s.logger,
)
defer stream.Close()

s.logger.Debug(
"listener connected",
zap.String("endpoint-id", endpointID),
zap.String("client-ip", c.ClientIP()),
)

s.proxy.AddUpstream(endpointID, stream)
defer s.proxy.RemoveUpstream(endpointID, stream)

if err := stream.Monitor(
s.shutdownCtx,
// TODO(andydunstall): Configurable.
time.Second*10,
time.Second*10,
); err != nil {
s.logger.Debug("listener disconnected", zap.Error(err))
}
}

func (s *Server) proxyRoute(c *gin.Context) {
Expand Down

0 comments on commit 7121751

Please sign in to comment.