Skip to content

Commit

Permalink
Add websocket to pipe web
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniomika committed Jan 28, 2025
1 parent 46cac13 commit a812bed
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/gorilla/feeds v1.2.0
github.com/gorilla/websocket v1.5.3
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/microcosm-cc/bluemonday v1.0.27
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8=
github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0=
github.com/gorilla/feeds v1.2.0 h1:O6pBiXJ5JHhPvqy53NsjKOThq+dNFm8+DFrxBEdzSCc=
github.com/gorilla/feeds v1.2.0/go.mod h1:WMib8uJP3BbY+X8Szd1rA5Pzhdfh+HCCAYT2z7Fza6Y=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down
119 changes: 119 additions & 0 deletions pipe/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/picosh/pico/db/postgres"
"github.com/picosh/pico/shared"
"github.com/picosh/utils/pipe"
Expand All @@ -23,6 +24,11 @@ import (
var (
cleanRegex = regexp.MustCompile(`[^0-9a-zA-Z,/]`)
sshClient *pipe.Client
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)

func serveFile(file string, contentType string) http.HandlerFunc {
Expand Down Expand Up @@ -264,6 +270,118 @@ func handlePub(pubsub bool) http.HandlerFunc {
}
}

func handlePipe() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
logger := shared.GetLogger(r)

c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Error("pipe upgrade error", "err", err.Error())
return
}

defer c.Close()

clientInfo := shared.NewPicoPipeClient()
topic, _ := url.PathUnescape(shared.GetField(r, 0))

topic = cleanRegex.ReplaceAllString(topic, "")

logger.Info("pipe", "topic", topic, "info", clientInfo)

params := "-p -c"
if r.URL.Query().Get("status") == "true" {
params = params[:len(params)-3]
}

if r.URL.Query().Get("replay") == "true" {
params += " -r"
}

messageType := websocket.TextMessage
if r.URL.Query().Get("binary") == "true" {
messageType = websocket.BinaryMessage
}

if accessList := r.URL.Query().Get("access"); accessList != "" {
logger.Info("adding access list", "topic", topic, "info", clientInfo, "access", accessList)
cleanList := cleanRegex.ReplaceAllString(accessList, "")
params += fmt.Sprintf(" -a=%s", cleanList)
}

id := uuid.NewString()

p, err := sshClient.AddSession(id, fmt.Sprintf("pipe %s %s", params, topic), 0, -1, -1)
if err != nil {
logger.Error("pipe error", "topic", topic, "info", clientInfo, "err", err.Error())
http.Error(w, "server error", http.StatusInternalServerError)
return
}

go func() {
<-r.Context().Done()
err := sshClient.RemoveSession(id)
if err != nil {
logger.Error("pipe remove error", "topic", topic, "info", clientInfo, "err", err.Error())
}
c.Close()
}()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer func() {
p.Close()
c.Close()
wg.Done()
}()

for {
_, message, err := c.ReadMessage()
if err != nil {
logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
break
}

_, err = p.Write(message)
if err != nil {
logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
break
}
}
}()

go func() {
defer func() {
p.Close()
c.Close()
wg.Done()
}()

for {
buf := make([]byte, 32*1024)

n, err := p.Read(buf)
if err != nil {
logger.Error("pipe read error", "topic", topic, "info", clientInfo, "err", err.Error())
break
}

buf = buf[:n]

err = c.WriteMessage(messageType, buf)
if err != nil {
logger.Error("pipe write error", "topic", topic, "info", clientInfo, "err", err.Error())
break
}
}
}()

wg.Wait()
}
}

func createMainRoutes(staticRoutes []shared.Route) []shared.Route {
routes := []shared.Route{
shared.NewRoute("GET", "/", shared.CreatePageHandler("html/marketing.page.tmpl")),
Expand All @@ -275,6 +393,7 @@ func createMainRoutes(staticRoutes []shared.Route) []shared.Route {
shared.NewRoute("POST", "/topic/(.+)", handlePub(false)),
shared.NewRoute("GET", "/pubsub/(.+)", handleSub(true)),
shared.NewRoute("POST", "/pubsub/(.+)", handlePub(true)),
shared.NewRoute("GET", "/pipe/(.+)", handlePipe()),
}

for _, route := range pipeRoutes {
Expand Down

0 comments on commit a812bed

Please sign in to comment.