Skip to content

Commit

Permalink
status: add gossip status
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Apr 23, 2024
1 parent c5a9d8f commit 99aa666
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 14 deletions.
25 changes: 14 additions & 11 deletions cli/server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func run(conf *config.Config, logger *log.Logger) {
AdminAddr: conf.Admin.AdvertiseAddr,
GossipAddr: conf.Gossip.AdvertiseAddr,
}, logger)
gossip, err := gossip.NewGossip(
g, err := gossip.NewGossip(
conf.Gossip.BindAddr,
networkMap,
registry,
Expand All @@ -285,10 +285,10 @@ func run(conf *config.Config, logger *log.Logger) {
logger.Error("failed to start gossip: %w", zap.Error(err))
os.Exit(1)
}
defer gossip.Close()
defer g.Close()

if len(conf.Cluster.Join) > 0 {
if err := gossip.Join(conf.Cluster.Join); err != nil {
if err := g.Join(conf.Cluster.Join); err != nil {
logger.Error("failed to join cluster", zap.Error(err))
os.Exit(1)
}
Expand All @@ -303,6 +303,9 @@ func run(conf *config.Config, logger *log.Logger) {
networkMapStatus := netmap.NewStatus(networkMap)
adminServer.AddStatus("/netmap", networkMapStatus)

gossipStatus := gossip.NewStatus(g)
adminServer.AddStatus("/gossip", gossipStatus)

p := proxy.NewProxy(networkMap, registry, logger)
proxyServer := proxyserver.NewServer(
conf.Proxy.BindAddr,
Expand All @@ -320,14 +323,14 @@ func run(conf *config.Config, logger *log.Logger) {
)
defer cancel()

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
if err := adminServer.Serve(); err != nil {
return fmt.Errorf("admin server serve: %w", err)
}
return nil
})
g.Go(func() error {
group.Go(func() error {
<-ctx.Done()

logger.Info("shutting down admin server")
Expand All @@ -343,13 +346,13 @@ func run(conf *config.Config, logger *log.Logger) {
}
return nil
})
g.Go(func() error {
group.Go(func() error {
if err := proxyServer.Serve(); err != nil {
return fmt.Errorf("proxy server serve: %w", err)
}
return nil
})
g.Go(func() error {
group.Go(func() error {
<-ctx.Done()

logger.Info("shutting down proxy server")
Expand All @@ -365,7 +368,7 @@ func run(conf *config.Config, logger *log.Logger) {
}
return nil
})
g.Go(func() error {
group.Go(func() error {
<-ctx.Done()

logger.Info("leaving cluster")
Expand All @@ -378,15 +381,15 @@ func run(conf *config.Config, logger *log.Logger) {

// Leave as soon as we receive the shutdown signal to avoid receiving
// forward proxy requests.
if err := gossip.Leave(shutdownCtx); err != nil {
if err := g.Leave(shutdownCtx); err != nil {
logger.Warn("failed to leave cluster", zap.Error(err))
}
return nil
})

networkMap.UpdateLocalStatus(netmap.NodeStatusActive)

if err := g.Wait(); err != nil {
if err := group.Wait(); err != nil {
logger.Error("failed to run server", zap.Error(err))
}

Expand Down
2 changes: 2 additions & 0 deletions cli/status/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Each Pico server exposes a status API to inspect the state of the node, this
can be used to answer questions such as:
* What upstream listeners are attached to each node?
* What cluster state does this node know?
* What is the gossip state of each known node?
See 'status --help' for the availale commands.
Expand All @@ -29,6 +30,7 @@ Examples:

cmd.AddCommand(newProxyCommand())
cmd.AddCommand(newNetmapCommand())
cmd.AddCommand(newGossipCommand())

return cmd
}
138 changes: 138 additions & 0 deletions cli/status/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package status

import (
"fmt"
"net/url"
"os"

"github.com/andydunstall/kite"
"github.com/andydunstall/pico/status/client"
"github.com/andydunstall/pico/status/config"
yaml "github.com/goccy/go-yaml"
"github.com/spf13/cobra"
)

func newGossipCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "gossip",
Short: "inspect gossip state",
}

cmd.AddCommand(newGossipMembersCommand())
cmd.AddCommand(newGossipMemberCommand())

return cmd
}

func newGossipMembersCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "members",
Short: "inspect gossip members",
Long: `Inspect gossip members.
Queries the server for the metadata for each known gossip member in the
cluster.
Examples:
pico status gossip members
`,
}

var conf config.Config

cmd.Flags().StringVar(
&conf.Server.URL,
"server.url",
"http://localhost:8081",
`
Pico server URL. This URL should point to the server admin port.
`,
)

cmd.Run = func(cmd *cobra.Command, args []string) {
if err := conf.Validate(); err != nil {
fmt.Printf("invalid config: %s\n", err.Error())
os.Exit(1)
}

showGossipMembers(&conf)
}

return cmd
}

type gossipMembersOutput struct {
Members []*kite.MemberMeta `json:"members"`
}

func showGossipMembers(conf *config.Config) {
// The URL has already been validated in conf.
url, _ := url.Parse(conf.Server.URL)
client := client.NewClient(url)
defer client.Close()

members, err := client.GossipMembers()
if err != nil {
fmt.Printf("failed to get gossip members: %s\n", err.Error())
os.Exit(1)
}

output := gossipMembersOutput{
Members: members,
}
b, _ := yaml.Marshal(output)
fmt.Println(string(b))
}

func newGossipMemberCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "member",
Args: cobra.ExactArgs(1),
Short: "inspect a gossip member",
Long: `Inspect a gossip member.
Queries the server for the known state of the gossip member with the given ID.
Examples:
pico status gossip member bbc69214
`,
}

var conf config.Config

cmd.Flags().StringVar(
&conf.Server.URL,
"server.url",
"http://localhost:8081",
`
Pico server URL. This URL should point to the server admin port.
`,
)

cmd.Run = func(cmd *cobra.Command, args []string) {
if err := conf.Validate(); err != nil {
fmt.Printf("invalid config: %s\n", err.Error())
os.Exit(1)
}

showGossipMember(args[0], &conf)
}

return cmd
}

func showGossipMember(memberID string, conf *config.Config) {
// The URL has already been validated in conf.
url, _ := url.Parse(conf.Server.URL)
client := client.NewClient(url)
defer client.Close()

member, err := client.GossipMember(memberID)
if err != nil {
fmt.Printf("failed to get gossip member: %s: %s\n", memberID, err.Error())
os.Exit(1)
}

b, _ := yaml.Marshal(member)
fmt.Println(string(b))
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/andydunstall/pico
go 1.21.1

require (
github.com/andydunstall/kite v0.0.0-20240421130541-932f8f353c71
github.com/andydunstall/kite v0.0.0-20240423050227-a2d489e20d80
github.com/gin-gonic/gin v1.9.1
github.com/goccy/go-yaml v1.11.3
github.com/gorilla/websocket v1.5.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/andydunstall/kite v0.0.0-20240421130541-932f8f353c71 h1:GdlgZALSQUKkri2saczA3VuoxIYDYdNWcUH5+G74mJQ=
github.com/andydunstall/kite v0.0.0-20240421130541-932f8f353c71/go.mod h1:7Bn7WGcSRLvX5HQySLzcGl9dJD1m9xoYNZtx5IIkx1I=
github.com/andydunstall/kite v0.0.0-20240423050227-a2d489e20d80 h1:CTZ0i90sx0zT2mLkTJ8bMla5YmrDiuYOFm+iI4kXaW0=
github.com/andydunstall/kite v0.0.0-20240423050227-a2d489e20d80/go.mod h1:7Bn7WGcSRLvX5HQySLzcGl9dJD1m9xoYNZtx5IIkx1I=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bytedance/sonic v1.11.5 h1:G00FYjjqll5iQ1PYXynbg/hyzqBqavH8Mo9/oTopd9k=
Expand Down
8 changes: 8 additions & 0 deletions server/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ func (g *Gossip) Leave(ctx context.Context) error {
}
}

func (g *Gossip) MembersMetadata(filter kite.MemberFilter) []kite.MemberMeta {
return g.kite.MembersMetadata(filter)
}

func (g *Gossip) MemberState(id string) (kite.MemberState, bool) {
return g.kite.MemberState(id)
}

func (g *Gossip) Close() error {
return g.kite.Close()
}
Expand Down
50 changes: 50 additions & 0 deletions server/gossip/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package gossip

import (
"net/http"
"sort"

"github.com/andydunstall/kite"
"github.com/andydunstall/pico/server/status"
"github.com/gin-gonic/gin"
)

type Status struct {
gossip *Gossip
}

func NewStatus(gossip *Gossip) *Status {
return &Status{
gossip: gossip,
}
}

func (s *Status) Register(group *gin.RouterGroup) {
group.GET("/members", s.listMembersRoute)
group.GET("/members/:id", s.getMemberRoute)
}

func (s *Status) listMembersRoute(c *gin.Context) {
members := s.gossip.MembersMetadata(kite.MemberFilter{
Local: true,
})
c.JSON(http.StatusOK, members)
}

func (s *Status) getMemberRoute(c *gin.Context) {
id := c.Param("id")
state, ok := s.gossip.MemberState(id)
if !ok {
c.Status(http.StatusNotFound)
return
}

// Sort state by version.
sort.Slice(state.State, func(i, j int) bool {
return state.State[i].Version < state.State[j].Version
})

c.JSON(http.StatusOK, state)
}

var _ status.Handler = &Status{}
29 changes: 29 additions & 0 deletions status/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
fspath "path"
"time"

"github.com/andydunstall/kite"
"github.com/andydunstall/pico/server/netmap"
)

Expand Down Expand Up @@ -69,6 +70,34 @@ func (c *Client) NetmapNode(nodeID string) (*netmap.Node, error) {
return &node, nil
}

func (c *Client) GossipMembers() ([]*kite.MemberMeta, error) {
r, err := c.request("/status/gossip/members")
if err != nil {
return nil, err
}
defer r.Close()

var members []*kite.MemberMeta
if err := json.NewDecoder(r).Decode(&members); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return members, nil
}

func (c *Client) GossipMember(memberID string) (*kite.MemberState, error) {
r, err := c.request("/status/gossip/members/" + memberID)
if err != nil {
return nil, err
}
defer r.Close()

var member kite.MemberState
if err := json.NewDecoder(r).Decode(&member); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &member, nil
}

func (c *Client) Close() {
c.httpClient.CloseIdleConnections()
}
Expand Down

0 comments on commit 99aa666

Please sign in to comment.