Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternate pubsub implementation #140

Merged
merged 6 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22 as builder-deps
FROM --platform=$BUILDPLATFORM golang:1.22 AS builder-deps
LABEL maintainer="Pico Maintainers <[email protected]>"

WORKDIR /app
Expand All @@ -8,9 +8,11 @@ RUN apt-get install -y git ca-certificates

COPY go.* ./

RUN go mod download
RUN --mount=type=cache,target=/go/pkg/,rw \
--mount=type=cache,target=/root/.cache/,rw \
go mod download

FROM builder-deps as builder-web
FROM builder-deps AS builder-web

COPY . .

Expand All @@ -23,9 +25,11 @@ ENV LDFLAGS="-s -w"

ENV GOOS=${TARGETOS} GOARCH=${TARGETARCH}

RUN go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-web ./cmd/${APP}/web
RUN --mount=type=cache,target=/go/pkg/,rw \
--mount=type=cache,target=/root/.cache/,rw \
go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-web ./cmd/${APP}/web

FROM builder-deps as builder-ssh
FROM builder-deps AS builder-ssh

COPY . .

Expand All @@ -38,9 +42,11 @@ ENV LDFLAGS="-s -w"

ENV GOOS=${TARGETOS} GOARCH=${TARGETARCH}

RUN go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-ssh ./cmd/${APP}/ssh
RUN --mount=type=cache,target=/go/pkg/,rw \
--mount=type=cache,target=/root/.cache/,rw \
go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-ssh ./cmd/${APP}/ssh

FROM scratch as release-web
FROM scratch AS release-web

WORKDIR /app

Expand All @@ -53,7 +59,7 @@ COPY --from=builder-web /app/${APP}/public ./${APP}/public

ENTRYPOINT ["/app/web"]

FROM scratch as release-ssh
FROM scratch AS release-ssh

WORKDIR /app
ENV TERM="xterm-256color"
Expand Down
2 changes: 1 addition & 1 deletion bouncer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22-alpine as builder
FROM golang:1.22-alpine AS builder

WORKDIR /app

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ services:
pubsub-ssh:
build:
args:
APP:pubsub
APP: pubsub
target: release-ssh
env_file:
- .env.example
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ replace github.com/gdamore/tcell/v2 => github.com/delthas/tcell/v2 v2.4.1-0.2023
require (
git.sr.ht/~delthas/senpai v0.3.1-0.20240425235039-206be659439e
github.com/alecthomas/chroma/v2 v2.14.0
github.com/antoniomika/syncmap v1.0.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/charmbracelet/bubbles v0.18.0
github.com/charmbracelet/bubbletea v0.27.0
Expand All @@ -36,7 +37,7 @@ require (
github.com/muesli/termenv v0.15.3-0.20240509142007-81b8f94111d5
github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a
github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b
github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2
github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc
github.com/picosh/tunkit v0.0.0-20240709033345-8315d4f3cd0e
github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsVi
github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEyMKWU=
github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
Expand Down Expand Up @@ -224,8 +226,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a h1:Cr1xODiyd/SjjBRtYA9VX6Do3D+w+DansQzkb4NGeyA=
github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a/go.mod h1:VIkR1MZBvxSK2OO47jikxikAO/sKb/vTmXX5ZuYTIvo=
github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b h1:/gGhT8y9rnrv8K9ZJKZYzdWvZcnazl8NGE1DGNrD8HU=
github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b/go.mod h1:FKC8uot+40iXmuDzTfbxYDG5PIc3ghwkmP2iItBKH0I=
github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2 h1:Em/eEiElW3OHKDLzchzJ7m8OOk+yJ8dgc7cH0d0c55Q=
github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc h1:IIsJuAFG2ju3cygKVKTIsYYZf21q5S3Dr1H4fGbfgJg=
github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=
Expand Down
7 changes: 4 additions & 3 deletions imgs/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,13 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
opts.bail(err)
return
} else if cmd == "sub" {
err = pubsub.PubSub.Sub(&psub.Subscriber{
err = pubsub.PubSub.Sub(fmt.Sprintf("%s/%s", user.Name, repoName), &psub.Sub{
ID: uuid.NewString(),
Name: fmt.Sprintf("%s@%s", user.Name, repoName),
Writer: sesh,
Chan: make(chan error),
Done: make(chan struct{}),
Data: make(chan []byte),
})

if err != nil {
wish.Errorln(sesh, err)
}
Expand Down
6 changes: 4 additions & 2 deletions imgs/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/charmbracelet/ssh"
"github.com/charmbracelet/wish"
"github.com/google/uuid"
"github.com/picosh/pico/db"
"github.com/picosh/pico/db/postgres"
"github.com/picosh/pico/shared"
Expand Down Expand Up @@ -229,8 +230,9 @@ func createServeMux(handler *CliHandler, pubsub *psub.Cfg) func(ctx ssh.Context)
)
handler.Logger.Info("sending event", "url", furl)

err := pubsub.PubSub.Pub(&psub.Msg{
Name: fmt.Sprintf("%s@%s:%s", user.Name, img, tag),
err := pubsub.PubSub.Pub(fmt.Sprintf("%s@%s:%s", user.Name, img, tag), &psub.Pub{
ID: uuid.NewString(),
Done: make(chan struct{}),
Reader: strings.NewReader(furl),
})

Expand Down
109 changes: 50 additions & 59 deletions pubsub/cli.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pubsub

import (
"bytes"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -59,24 +60,11 @@ func getUser(s ssh.Session, dbpool db.DB) (*db.User, error) {

// scope channel to user by prefixing name.
func toChannel(userName, name string) string {
return fmt.Sprintf("%s@%s", userName, name)
return fmt.Sprintf("%s/%s", userName, name)
}

func toPublicChannel(name string) string {
return fmt.Sprintf("public@%s", name)
}

// extract user and scoped channel from channel.
func fromChannel(channel string) (string, string) {
sp := strings.SplitN(channel, "@", 2)
ln := len(sp)
if ln == 0 {
return "", ""
}
if ln == 1 {
return "", ""
}
return sp[0], sp[1]
return fmt.Sprintf("public/%s", name)
}

var helpStr = `Commands: [pub, sub, ls]
Expand Down Expand Up @@ -123,26 +111,34 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
if cmd == "help" {
wish.Println(sesh, helpStr)
} else if cmd == "ls" {
subs := pubsub.PubSub.GetSubs()
channelFilter := fmt.Sprintf("%s/", user.Name)
if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
channelFilter = ""
}

channels := pubsub.PubSub.GetChannels(channelFilter)

if len(subs) == 0 {
wish.Println(sesh, "no subs found")
if len(channels) == 0 {
wish.Println(sesh, "no pubsub channels found")
} else {
writer := NewTabWriter(sesh)
fmt.Fprintln(writer, "Channel\tID")
for _, sub := range subs {
userName, _ := fromChannel(sub.Name)
if userName != "public" && userName != user.Name {
continue
}

fmt.Fprintf(
writer,
"%s\t%s\n",
sub.Name, sub.ID,
)
outputData := "Channel Information\r\n"
for _, channel := range channels {
outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
outputData += "\tPubs:\r\n"

channel.Pubs.Range(func(I string, J *psub.Pub) bool {
outputData += fmt.Sprintf("\t- %s:\r\n", I)
return true
})

outputData += "\tSubs:\r\n"

channel.Subs.Range(func(I string, J *psub.Sub) bool {
outputData += fmt.Sprintf("\t- %s:\r\n", I)
return true
})
}
writer.Flush()
_, _ = sesh.Write([]byte(outputData))
}
}
next(sesh)
Expand Down Expand Up @@ -170,7 +166,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {

var reader io.Reader
if *empty {
reader = strings.NewReader("")
reader = bytes.NewReader(make([]byte, 1))
} else {
reader = sesh
}
Expand All @@ -181,34 +177,32 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
}

wish.Println(sesh, "sending msg ...")
msg := &psub.Msg{
Name: name,
pub := &psub.Pub{
ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
Done: make(chan struct{}),
Reader: reader,
}

// hacky: we want to notify when no subs are found so
// we duplicate some logic for now
subs := pubsub.PubSub.GetSubs()
found := false
for _, sub := range subs {
if pubsub.PubSub.PubMatcher(msg, sub) {
found = true
break
}
count := 0
channelInfo := pubsub.PubSub.GetChannel(name)

if channelInfo != nil {
channelInfo.Subs.Range(func(I string, J *psub.Sub) bool {
count++
return true
})
}
if !found {

if count == 0 {
wish.Println(sesh, "no subs found ... waiting")
}

go func() {
<-ctx.Done()
err := pubsub.PubSub.UnPub(msg)
if err != nil {
wish.Errorln(sesh, err)
}
pub.Cleanup()
}()

err = pubsub.PubSub.Pub(msg)
err = pubsub.PubSub.Pub(name, pub)
wish.Println(sesh, "msg sent!")
if err != nil {
wish.Errorln(sesh, err)
Expand All @@ -226,21 +220,18 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
name = toPublicChannel(channelName)
}

sub := &psub.Subscriber{
ID: uuid.NewString(),
Name: name,
sub := &psub.Sub{
ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
Writer: sesh,
Chan: make(chan error),
Done: make(chan struct{}),
Data: make(chan []byte),
}

go func() {
<-ctx.Done()
err := pubsub.PubSub.UnSub(sub)
if err != nil {
wish.Errorln(sesh, err)
}
sub.Cleanup()
}()
err = pubsub.PubSub.Sub(sub)
err = pubsub.PubSub.Sub(name, sub)
if err != nil {
wish.Errorln(sesh, err)
}
Expand Down
5 changes: 3 additions & 2 deletions pubsub/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"syscall"
"time"

"github.com/antoniomika/syncmap"
"github.com/charmbracelet/promwish"
"github.com/charmbracelet/wish"
"github.com/picosh/pico/db/postgres"
Expand All @@ -29,8 +30,8 @@ func StartSshServer() {
pubsub := &psub.Cfg{
Logger: logger,
PubSub: &psub.PubSubMulticast{
Logger: logger,
Chan: make(chan *psub.Subscriber),
Logger: logger,
Channels: syncmap.New[string, *psub.Channel](),
},
}

Expand Down
Loading