Skip to content

Commit

Permalink
Add public key comment to pubsub ls
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniomika committed Oct 9, 2024
1 parent 215a842 commit 79f17fd
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ replace github.com/gdamore/tcell/v2 => github.com/delthas/tcell/v2 v2.4.1-0.2023
require (
git.sr.ht/~delthas/senpai v0.3.1-0.20240425235039-206be659439e
github.com/alecthomas/chroma/v2 v2.14.0
github.com/antoniomika/syncmap v1.0.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/charmbracelet/bubbles v0.18.0
github.com/charmbracelet/bubbletea v1.1.1
Expand Down Expand Up @@ -60,7 +61,6 @@ require (
github.com/PuerkitoBio/goquery v1.9.2 // indirect
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/antoniomika/syncmap v1.0.0 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aws/aws-sdk-go-v2 v1.32.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
Expand Down
81 changes: 65 additions & 16 deletions pubsub/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"fmt"
"io"
"log/slog"
"slices"
"strings"
"text/tabwriter"
"time"

"github.com/antoniomika/syncmap"
"github.com/charmbracelet/ssh"
"github.com/charmbracelet/wish"
"github.com/google/uuid"
Expand Down Expand Up @@ -79,10 +81,10 @@ func clientInfo(clients []*psub.Client, clientType string) string {
return ""
}

outputData := fmt.Sprintf("\t%s:\r\n", clientType)
outputData := fmt.Sprintf(" %s:\r\n", clientType)

for _, client := range clients {
outputData += fmt.Sprintf("\t- %s\r\n", client.ID)
outputData += fmt.Sprintf(" - %s\r\n", client.ID)
}

return outputData
Expand Down Expand Up @@ -110,10 +112,11 @@ data is being sent:
}

type CliHandler struct {
DBPool db.DB
Logger *slog.Logger
PubSub psub.PubSub
Cfg *shared.ConfigSite
DBPool db.DB
Logger *slog.Logger
PubSub psub.PubSub
Cfg *shared.ConfigSite
Waiters *syncmap.Map[string, []string]
}

func toSshCmd(cfg *shared.ConfigSite) string {
Expand Down Expand Up @@ -162,22 +165,29 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
}

var channels []*psub.Channel
waitingChannels := map[string][]string{}

for topic, channel := range pubsub.GetChannels() {
if strings.HasPrefix(topic, topicFilter) {
channels = append(channels, channel)
}
}

if len(channels) == 0 {
for channel, clients := range handler.Waiters.Range {
if strings.HasPrefix(channel, topicFilter) {
waitingChannels[channel] = clients
}
}

if len(channels) == 0 && len(waitingChannels) == 0 {
wish.Println(sesh, "no pubsub channels found")
} else {
var outputData string
if len(channels) > 0 {
if len(channels) > 0 || len(waitingChannels) > 0 {
outputData += "Channel Information\r\n"
for _, channel := range channels {
outputData += fmt.Sprintf("- %s:\r\n", channel.Topic)
outputData += "\tClients:\r\n"
outputData += " Clients:\r\n"

var pubs []*psub.Client
var subs []*psub.Client
Expand All @@ -196,6 +206,15 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
outputData += clientInfo(subs, "Subs")
outputData += clientInfo(pipes, "Pipes")
}

for waitingChannel, channelPubs := range waitingChannels {
outputData += fmt.Sprintf("- %s:\r\n", waitingChannel)
outputData += " Clients:\r\n"
outputData += fmt.Sprintf(" %s:\r\n", "Waiting Pubs")
for _, client := range channelPubs {
outputData += fmt.Sprintf(" - %s\r\n", client)
}
}
}

_, _ = sesh.Write([]byte(outputData))
Expand All @@ -220,6 +239,14 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
"cmdArgs", cmdArgs,
)

userName := user.Name

if user.PublicKey.Name != "" {
userName += fmt.Sprintf("-%s", user.PublicKey.Name)
}

clientID := fmt.Sprintf("%s (%s@%s)", uuid.NewString(), userName, sesh.RemoteAddr().String())

if cmd == "pub" {
pubCmd := flagSet("pub", sesh)
empty := pubCmd.Bool("e", false, "Send an empty message to subs")
Expand Down Expand Up @@ -270,14 +297,12 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {

wish.Printf(
sesh,
"subscribe to this channel:\n\tssh %s sub %s%s\n",
"subscribe to this channel:\n ssh %s sub %s%s\n",
toSshCmd(handler.Cfg),
msgFlag,
topic,
)

wish.Println(sesh, "sending msg ...")

var pubCtx context.Context = ctx

if *block {
Expand All @@ -295,6 +320,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {

tt := *timeout
if count == 0 {
currentWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
handler.Waiters.Store(name, append(currentWaiters, clientID))

termMsg := "no subs found ... waiting"
if tt > 0 {
termMsg += " " + tt.String()
Expand Down Expand Up @@ -335,16 +363,37 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {

select {
case <-ready:
case <-ctx.Done():
case <-time.After(tt):
cancelFunc()
wish.Fatalln(sesh, "timeout reached, exiting ...")
}

newWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
newWaiters = slices.DeleteFunc(newWaiters, func(cl string) bool {
return cl == clientID
})
handler.Waiters.Store(name, newWaiters)

var toDelete []string

for channel, clients := range handler.Waiters.Range {
if len(clients) == 0 {
toDelete = append(toDelete, channel)
}
}

for _, channel := range toDelete {
handler.Waiters.Delete(channel)
}
}
}

wish.Println(sesh, "sending msg ...")

err = pubsub.Pub(
pubCtx,
fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
clientID,
rw,
[]*psub.Channel{
psub.NewChannel(name),
Expand Down Expand Up @@ -389,7 +438,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {

err = pubsub.Sub(
ctx,
fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
clientID,
sesh,
[]*psub.Channel{
psub.NewChannel(name),
Expand Down Expand Up @@ -441,7 +490,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
if isCreator {
wish.Printf(
sesh,
"subscribe to this topic:\n\tssh %s sub %s%s\n",
"subscribe to this topic:\n ssh %s sub %s%s\n",
toSshCmd(handler.Cfg),
flagMsg,
topic,
Expand All @@ -450,7 +499,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {

readErr, writeErr := pubsub.Pipe(
ctx,
fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
clientID,
sesh,
[]*psub.Channel{
psub.NewChannel(name),
Expand Down
10 changes: 6 additions & 4 deletions pubsub/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"syscall"
"time"

"github.com/antoniomika/syncmap"
"github.com/charmbracelet/promwish"
"github.com/charmbracelet/wish"
"github.com/picosh/pico/db/postgres"
Expand All @@ -32,10 +33,11 @@ func StartSshServer() {

pubsub := psub.NewMulticast(logger)
handler := &CliHandler{
Logger: logger,
DBPool: dbh,
PubSub: pubsub,
Cfg: cfg,
Logger: logger,
DBPool: dbh,
PubSub: pubsub,
Cfg: cfg,
Waiters: syncmap.New[string, []string](),
}

sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)
Expand Down

0 comments on commit 79f17fd

Please sign in to comment.