From 79f17fdd69232acfb78f9d82fe0da85bedb26961 Mon Sep 17 00:00:00 2001 From: Antonio Mika Date: Wed, 9 Oct 2024 15:24:58 -0400 Subject: [PATCH] Add public key comment to pubsub ls --- go.mod | 2 +- pubsub/cli.go | 81 +++++++++++++++++++++++++++++++++++++++++---------- pubsub/ssh.go | 10 ++++--- 3 files changed, 72 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index d5ea40b7..c389ae00 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ replace github.com/gdamore/tcell/v2 => github.com/delthas/tcell/v2 v2.4.1-0.2023 require ( git.sr.ht/~delthas/senpai v0.3.1-0.20240425235039-206be659439e github.com/alecthomas/chroma/v2 v2.14.0 + github.com/antoniomika/syncmap v1.0.0 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/charmbracelet/bubbles v0.18.0 github.com/charmbracelet/bubbletea v1.1.1 @@ -60,7 +61,6 @@ require ( github.com/PuerkitoBio/goquery v1.9.2 // indirect github.com/andybalholm/cascadia v1.3.2 // indirect github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect - github.com/antoniomika/syncmap v1.0.0 // indirect github.com/atotto/clipboard v0.1.4 // indirect github.com/aws/aws-sdk-go-v2 v1.32.1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect diff --git a/pubsub/cli.go b/pubsub/cli.go index 388d13b0..a3c50382 100644 --- a/pubsub/cli.go +++ b/pubsub/cli.go @@ -7,10 +7,12 @@ import ( "fmt" "io" "log/slog" + "slices" "strings" "text/tabwriter" "time" + "github.com/antoniomika/syncmap" "github.com/charmbracelet/ssh" "github.com/charmbracelet/wish" "github.com/google/uuid" @@ -79,10 +81,10 @@ func clientInfo(clients []*psub.Client, clientType string) string { return "" } - outputData := fmt.Sprintf("\t%s:\r\n", clientType) + outputData := fmt.Sprintf(" %s:\r\n", clientType) for _, client := range clients { - outputData += fmt.Sprintf("\t- %s\r\n", client.ID) + outputData += fmt.Sprintf(" - %s\r\n", client.ID) } return outputData @@ -110,10 +112,11 @@ data is being sent: } type CliHandler struct { - DBPool db.DB - Logger *slog.Logger - PubSub psub.PubSub - Cfg *shared.ConfigSite + DBPool db.DB + Logger *slog.Logger + PubSub psub.PubSub + Cfg *shared.ConfigSite + Waiters *syncmap.Map[string, []string] } func toSshCmd(cfg *shared.ConfigSite) string { @@ -162,6 +165,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { } var channels []*psub.Channel + waitingChannels := map[string][]string{} for topic, channel := range pubsub.GetChannels() { if strings.HasPrefix(topic, topicFilter) { @@ -169,15 +173,21 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { } } - if len(channels) == 0 { + for channel, clients := range handler.Waiters.Range { + if strings.HasPrefix(channel, topicFilter) { + waitingChannels[channel] = clients + } + } + + if len(channels) == 0 && len(waitingChannels) == 0 { wish.Println(sesh, "no pubsub channels found") } else { var outputData string - if len(channels) > 0 { + if len(channels) > 0 || len(waitingChannels) > 0 { outputData += "Channel Information\r\n" for _, channel := range channels { outputData += fmt.Sprintf("- %s:\r\n", channel.Topic) - outputData += "\tClients:\r\n" + outputData += " Clients:\r\n" var pubs []*psub.Client var subs []*psub.Client @@ -196,6 +206,15 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { outputData += clientInfo(subs, "Subs") outputData += clientInfo(pipes, "Pipes") } + + for waitingChannel, channelPubs := range waitingChannels { + outputData += fmt.Sprintf("- %s:\r\n", waitingChannel) + outputData += " Clients:\r\n" + outputData += fmt.Sprintf(" %s:\r\n", "Waiting Pubs") + for _, client := range channelPubs { + outputData += fmt.Sprintf(" - %s\r\n", client) + } + } } _, _ = sesh.Write([]byte(outputData)) @@ -220,6 +239,14 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { "cmdArgs", cmdArgs, ) + userName := user.Name + + if user.PublicKey.Name != "" { + userName += fmt.Sprintf("-%s", user.PublicKey.Name) + } + + clientID := fmt.Sprintf("%s (%s@%s)", uuid.NewString(), userName, sesh.RemoteAddr().String()) + if cmd == "pub" { pubCmd := flagSet("pub", sesh) empty := pubCmd.Bool("e", false, "Send an empty message to subs") @@ -270,14 +297,12 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { wish.Printf( sesh, - "subscribe to this channel:\n\tssh %s sub %s%s\n", + "subscribe to this channel:\n ssh %s sub %s%s\n", toSshCmd(handler.Cfg), msgFlag, topic, ) - wish.Println(sesh, "sending msg ...") - var pubCtx context.Context = ctx if *block { @@ -295,6 +320,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { tt := *timeout if count == 0 { + currentWaiters, _ := handler.Waiters.LoadOrStore(name, nil) + handler.Waiters.Store(name, append(currentWaiters, clientID)) + termMsg := "no subs found ... waiting" if tt > 0 { termMsg += " " + tt.String() @@ -335,16 +363,37 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { select { case <-ready: + case <-ctx.Done(): case <-time.After(tt): cancelFunc() wish.Fatalln(sesh, "timeout reached, exiting ...") } + + newWaiters, _ := handler.Waiters.LoadOrStore(name, nil) + newWaiters = slices.DeleteFunc(newWaiters, func(cl string) bool { + return cl == clientID + }) + handler.Waiters.Store(name, newWaiters) + + var toDelete []string + + for channel, clients := range handler.Waiters.Range { + if len(clients) == 0 { + toDelete = append(toDelete, channel) + } + } + + for _, channel := range toDelete { + handler.Waiters.Delete(channel) + } } } + wish.Println(sesh, "sending msg ...") + err = pubsub.Pub( pubCtx, - fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()), + clientID, rw, []*psub.Channel{ psub.NewChannel(name), @@ -389,7 +438,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { err = pubsub.Sub( ctx, - fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()), + clientID, sesh, []*psub.Channel{ psub.NewChannel(name), @@ -441,7 +490,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { if isCreator { wish.Printf( sesh, - "subscribe to this topic:\n\tssh %s sub %s%s\n", + "subscribe to this topic:\n ssh %s sub %s%s\n", toSshCmd(handler.Cfg), flagMsg, topic, @@ -450,7 +499,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware { readErr, writeErr := pubsub.Pipe( ctx, - fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()), + clientID, sesh, []*psub.Channel{ psub.NewChannel(name), diff --git a/pubsub/ssh.go b/pubsub/ssh.go index 3d55c4f3..b18f8b6e 100644 --- a/pubsub/ssh.go +++ b/pubsub/ssh.go @@ -8,6 +8,7 @@ import ( "syscall" "time" + "github.com/antoniomika/syncmap" "github.com/charmbracelet/promwish" "github.com/charmbracelet/wish" "github.com/picosh/pico/db/postgres" @@ -32,10 +33,11 @@ func StartSshServer() { pubsub := psub.NewMulticast(logger) handler := &CliHandler{ - Logger: logger, - DBPool: dbh, - PubSub: pubsub, - Cfg: cfg, + Logger: logger, + DBPool: dbh, + PubSub: pubsub, + Cfg: cfg, + Waiters: syncmap.New[string, []string](), } sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)