Skip to content

Commit

Permalink
server: add proxy status
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Apr 22, 2024
1 parent 17d93ca commit 87e9e3d
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 1 deletion.
6 changes: 5 additions & 1 deletion cli/server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,14 +303,18 @@ func run(conf *config.Config, logger *log.Logger) {
networkMapStatus := netmap.NewStatus(networkMap)
adminServer.AddStatus("/netmap", networkMapStatus)

p := proxy.NewProxy(networkMap, registry, logger)
proxyServer := proxyserver.NewServer(
conf.Proxy.BindAddr,
proxy.NewProxy(networkMap, registry, logger),
p,
&conf.Proxy,
registry,
logger,
)

proxyStatus := proxy.NewStatus(p)
adminServer.AddStatus("/proxy", proxyStatus)

ctx, cancel := signal.NotifyContext(
context.Background(), syscall.SIGINT, syscall.SIGTERM,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ type Conn interface {
NextReader() (io.Reader, error)
WriteMessage(b []byte) error
NextWriter() (io.WriteCloser, error)
Addr() string
Close() error
}
4 changes: 4 additions & 0 deletions pkg/conn/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (t *WebsocketConn) NextWriter() (io.WriteCloser, error) {
return t.wsConn.NextWriter(websocket.BinaryMessage)
}

func (t *WebsocketConn) Addr() string {
return t.wsConn.RemoteAddr().String()
}

func (t *WebsocketConn) Close() error {
return t.wsConn.Close()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/rpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewStream(conn conn.Conn, handler *Handler, logger *log.Logger) *Stream {
return stream
}

func (s *Stream) Addr() string {
return s.conn.Addr()
}

// RPC sends the given request message to the peer and returns the response or
// an error.
//
Expand Down
21 changes: 21 additions & 0 deletions server/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ func (e *localEndpoint) Next() *rpc.Stream {
return s
}

func (e *localEndpoint) UpstreamAddrs() []string {
var addrs []string
for _, stream := range e.streams {
addrs = append(addrs, stream.Addr())
}
return addrs
}

// Proxy is responsible for forwarding requests to upstream listeners.
type Proxy struct {
localEndpoints map[string]*localEndpoint
Expand Down Expand Up @@ -178,6 +186,19 @@ func (p *Proxy) RemoveUpstream(endpointID string, stream *rpc.Stream) {
p.metrics.Listeners.Dec()
}

// LocalEndpoints returns a mapping from endpoint ID to listener addresses for
// all local registered endpoints.
func (p *Proxy) LocalEndpoints() map[string][]string {
p.mu.Lock()
defer p.mu.Unlock()

endpoints := make(map[string][]string)
for endpointID, endpoint := range p.localEndpoints {
endpoints[endpointID] = endpoint.UpstreamAddrs()
}
return endpoints
}

func (p *Proxy) request(
ctx context.Context,
endpointID string,
Expand Down
29 changes: 29 additions & 0 deletions server/proxy/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package proxy

import (
"net/http"

"github.com/andydunstall/pico/server/status"
"github.com/gin-gonic/gin"
)

type Status struct {
proxy *Proxy
}

func NewStatus(proxy *Proxy) *Status {
return &Status{
proxy: proxy,
}
}

func (s *Status) Register(group *gin.RouterGroup) {
group.GET("/endpoints", s.listEndpointsRoute)
}

func (s *Status) listEndpointsRoute(c *gin.Context) {
endpoints := s.proxy.LocalEndpoints()
c.JSON(http.StatusOK, endpoints)
}

var _ status.Handler = &Status{}
1 change: 1 addition & 0 deletions server/server/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (s *Server) registerRoutes() {
}

func (s *Server) healthRoute(c *gin.Context) {
c.Status(http.StatusOK)
}

func (s *Server) panicRoute(c *gin.Context, err any) {
Expand Down

0 comments on commit 87e9e3d

Please sign in to comment.