Skip to content

Commit

Permalink
chore: cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
neurosnap committed Oct 3, 2024
1 parent b0fc9fa commit a3c9e6b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
25 changes: 8 additions & 17 deletions cmd/authorized_keys/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"syscall"
"time"

"github.com/antoniomika/syncmap"
"github.com/charmbracelet/ssh"
"github.com/charmbracelet/wish"
"github.com/google/uuid"
Expand All @@ -26,7 +25,7 @@ func GetEnv(key string, defaultVal string) string {
return defaultVal
}

func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
func PubSubMiddleware(broker pubsub.PubSub, logger *slog.Logger) wish.Middleware {
return func(next ssh.Handler) ssh.Handler {
return func(sesh ssh.Session) {
args := sesh.Command()
Expand All @@ -41,7 +40,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {

topics := strings.Split(topicsRaw, ",")

logger := cfg.Logger.With(
logger := logger.With(
"cmd", cmd,
"topics", topics,
)
Expand All @@ -59,7 +58,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {

clientID := uuid.NewString()

err := errors.Join(cfg.PubSub.Sub(sesh.Context(), clientID, sesh, chans, args[len(args)-1] == "keepalive"))
err := errors.Join(broker.Sub(sesh.Context(), clientID, sesh, chans, args[len(args)-1] == "keepalive"))
if err != nil {
logger.Error("error during pub", slog.Any("error", err), slog.String("client", clientID))
}
Expand All @@ -72,7 +71,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {

clientID := uuid.NewString()

err := errors.Join(cfg.PubSub.Pub(sesh.Context(), clientID, sesh, chans))
err := errors.Join(broker.Pub(sesh.Context(), clientID, sesh, chans))
if err != nil {
logger.Error("error during pub", slog.Any("error", err), slog.String("client", clientID))
}
Expand All @@ -85,7 +84,7 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {

clientID := uuid.NewString()

err := errors.Join(cfg.PubSub.Pipe(sesh.Context(), clientID, sesh, chans, args[len(args)-1] == "replay"))
err := errors.Join(broker.Pipe(sesh.Context(), clientID, sesh, chans, args[len(args)-1] == "replay"))
if err != nil {
logger.Error(
"pipe error",
Expand All @@ -107,23 +106,15 @@ func main() {
host := GetEnv("SSH_HOST", "0.0.0.0")
port := GetEnv("SSH_PORT", "2222")
keyPath := GetEnv("SSH_AUTHORIZED_KEYS", "./ssh_data/authorized_keys")
cfg := &pubsub.Cfg{
Logger: logger,
PubSub: pubsub.NewMulticast(
&pubsub.BaseBroker{
Channels: syncmap.New[string, *pubsub.Channel](),
},
logger,
),
}
broker := pubsub.NewMulticast(logger)

s, err := wish.NewServer(
ssh.NoPty(),
wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
wish.WithAuthorizedKeys(keyPath),
wish.WithMiddleware(
PubSubMiddleware(cfg),
PubSubMiddleware(broker, logger),
),
)
if err != nil {
Expand All @@ -149,7 +140,7 @@ func main() {
slog.Info("Debug Info", slog.Int("goroutines", runtime.NumGoroutine()))
select {
case <-time.After(5 * time.Second):
for _, channel := range cfg.PubSub.GetChannels() {
for _, channel := range broker.GetChannels() {
slog.Info("channel online", slog.Any("channel topic", channel.Topic))
for _, client := range channel.GetClients() {
slog.Info("client online", slog.Any("channel topic", channel.Topic), slog.Any("client", client.ID), slog.String("direction", client.Direction.String()))
Expand Down
8 changes: 6 additions & 2 deletions multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ import (
"io"
"iter"
"log/slog"

"github.com/antoniomika/syncmap"
)

type Multicast struct {
Broker
Logger *slog.Logger
}

func NewMulticast(broker Broker, logger *slog.Logger) *Multicast {
func NewMulticast(logger *slog.Logger) *Multicast {
return &Multicast{
Broker: broker,
Logger: logger,
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
},
}
}

Expand Down

0 comments on commit a3c9e6b

Please sign in to comment.