diff --git a/prscd/.gitignore b/prscd/.gitignore new file mode 100644 index 0000000..199a7f9 --- /dev/null +++ b/prscd/.gitignore @@ -0,0 +1,7 @@ +.DS_Store +build/ +bin/ +dist/ +.vscode/ +test_pages/ +prscd.pid diff --git a/prscd/Makefile b/prscd/Makefile new file mode 100644 index 0000000..9c6dbe7 --- /dev/null +++ b/prscd/Makefile @@ -0,0 +1,55 @@ +GO ?= go +GOFMT ?= gofmt "-s" +GOFILES := $(shell find . -name "*.go") +VETPACKAGES ?= $(shell $(GO) list ./... | grep -v /example/) + +.PHONY: fmt +fmt: + $(GOFMT) -w $(GOFILES) + +.PHONY: vet +vet: + $(GO) vet $(VETPACKAGES) + +.PHONY: lint +lint: + revive -exclude chirp/*_test.go -exclude cmd/prscd/epoll.go -formatter friendly ./... + +.PHONY: build +build: + $(GO) build -o bin/prscd ./cmd/prscd + +.PHONY: dist +dist: clean + GOOS=linux GOARCH=amd64 $(GO) build -ldflags "-s -w" -o dist/prscd-x86_64-linux ./cmd/prscd + GOOS=linux GOARCH=arm64 $(GO) build -ldflags "-s -w" -o dist/prscd-arm64-linux ./cmd/prscd + GOOS=darwin GOARCH=amd64 $(GO) build -ldflags "-s -w" -o dist/prscd-x86_64-darwin ./cmd/prscd + GOOS=darwin GOARCH=arm64 $(GO) build -ldflags "-s -w" -o dist/prscd-arm64-darwin ./cmd/prscd + GOOS=windows GOARCH=amd64 $(GO) build -ldflags "-s -w" -o dist/prscd-x86_64-windows.exe ./cmd/prscd + GOOS=windows GOARCH=arm64 $(GO) build -ldflags "-s -w" -o dist/prscd-arm64-windows.exe ./cmd/prscd + +.PHONY: dev +dev: + $(GO) run -race ./cmd/prscd + +.PHONY: test +test: + MESH_ID=test go test ./... + +.PHONY: bench +bench: + MESH_ID=bench LOG_LEVEL=2 go test -bench=. -benchmem yomo.run/prscd/chirp + +.PHONY: testpage +testpage: + @mkdir -p ./test_pages + @cp msgpack.js ./test_pages + @cp websocket.html test_pages/. + @sed -i '' 's/URL_DEBG/URL_PROD/g' test_pages/websocket.html + @cp webtrans.html test_pages/. + @sed -i '' 's/URL_DEBG/URL_PROD/g' test_pages/webtrans.html + +.PHONY: clean +clean: + @rm -rf dist + @rm -rf bin diff --git a/prscd/README.md b/prscd/README.md new file mode 100644 index 0000000..a3eb1ff --- /dev/null +++ b/prscd/README.md @@ -0,0 +1,141 @@ + +# 🌎 Prscd + +The open source backend for Presencejs v2.0 + +## 🎯 Roadmap + +- [x] Websocket arraybuffer support +- [x] Zero-copy upgrade to WebSocket +- [x] SO_REUSEPORT on Darwin and Linux +- [x] Implement WebSocket native Ping/Pong frame to keep alive +- [ ] reuse goroutine +- [x] WebTransport Datagram support, unreliable but fast communication +- [ ] WebTransport Stream support, reliable +- [ ] pprof +- [x] Prscd Clusters by YoMo +- [x] Geo-distributed System / Distributed Cloud arch by YoMo + +## 🥷🏻 Development + +1. Start prscd service in terminal-2:`make dev` +1. Open `webtransport.html` by Chrome with Dev Tools +1. Open `websocket.html` by Chrome with Dev Tools + +![](https://github.com/fanweixiao/gifs-repo/blob/main/prscd-readme.gif) + +[![asciicast](https://asciinema.org/a/565542.svg)](https://asciinema.org/a/565542) + +## 🦸🏻 Self-hosting + +Compile: + +```bash +make dist +``` + +### ☝🏻 Host on Single Cloud Region + +TODO: how to deploy `prscd` on digitalocean + +TODO: introducing [geoping.gg](https://geoping.gg), lighthouse for realtime applications. + +### 🌍 Host as Geo-distributed System + +DNS improvements explaination + +#### by Vercel edge functions + +Redirect end-user connect to node close to them. + +#### by AWS Global Accelarator + +Anycast IP + +#### by Azure Traffic Manager + +Geo-IP + +## ☕️ FAQ + +### about https://lo.yomo.dev + +```bash +$ openssl x509 -enddate -noout -in prscd/lo.yomo.dev.cert +notAfter=May 22 07:40:45 2023 GMT +``` + +### how to generate SSL for your own domain + +1. `brew install certbot` +2. `sudo certbot certonly --manual --preferred-challenges dns -d prscd.example.com` +3. create a TXT record followed the instruction by certbot +4. `nslookup -type=TXT _acme-challenge.prscd.example.com` to verify the process +5. `sudo chown -Rv "$(whoami)":staff /etc/letsencrypt/` set permission +6. cert and key: `/etc/letsencrypt/live/prscd.example.com/{fullchain, privkey}.pem` +7. verify the expiratioin time: `openssl x509 -enddate -noout -in prscd.example.com.cert.pem` + +### if you are behind a proxy on Mac + +Most of proxy applications drop WebTransport or HTTP/3, so if you are a macos user, +this bash script can helped bypass `*.yomo.dev` domain to proxy. + +```bash +networksetup -setproxybypassdomains "Wi-Fi" $(networksetup -getproxybypassdomains "Wi-Fi" | awk '{ printf "\"%s\" ", $0 }') "*.yomo.dev" +``` + +### Integrate to your own Auth system + +Currently, provide `public_key` for authentication, the endpoint looks like: `/v1?app_id=&public_key=` + +### Live inspection + +Execute `make dev` in terminal-1: + +```bash +$ make dev +go run -race main.go +pid: 20079 +Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT... +``` + +Open terminal-2, execute: + +```bash +$ kill -SIGUSR1 20079 +$ kill -SIGUSR2 20079 +``` + +The output of terminal-1 will looks like: + +```bash +$ make dev +go run -race main.go +pid: 20079 +Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT... +Received signal: user defined signal 1 +SIGUSR1 +Dump start -------- +Peers: 1 +Channel:room-1 + Peer:127.0.0.1:62577 +Dump doen -------- +Received signal: user defined signal 2 + NumGC = 0 +``` + +### Configure Firewall of Cloud Provider + +TCP and UDP on the `PORT` shall has to be allowed in security rules. + +## .env File + +- `DEBUG=true`: debug mode +- `PORT=443`: indicate the PORT used to listen, both WebSocket and WebTransport +- `MESH_ID=MID_EAST`: indicate nodes in distributed cloud archtecture +- `YOMO_SNDR_NAME`: the name of YoMo Source +- `YOMO_RCVR_NAME`: the name of YoMo Stream Function +- `CERT_FILE`: The SSL cert file path of prscd +- `YOMO_TRACE_JAEGER_ENDPOINT`: Jaeger collector endpoint, e.g., http://localhost:14268/api/traces + +- `KEY_FILE`: The SSL key file path of prscd diff --git a/prscd/build.sh b/prscd/build.sh new file mode 100644 index 0000000..822da95 --- /dev/null +++ b/prscd/build.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash + +set -e + +# prscd build script for Linux +# Environment variable options: +# - PRSCD_VERSION: App version +# - PRSCD_PLATFORMS: Platforms to build for (e.g. "windows/amd64,linux/amd64,darwin/amd64") + +export LC_ALL=C +export LC_DATE=C + +make_ldflags() { + local ldflags="-s -w -X 'main.appDate=$(date -u '+%F %T')'" + if [ -n "$PRSCD_VERSION" ]; then + ldflags="$ldflags -X 'main.appVersion=$PRSCD_VERSION'" + else + ldflags="$ldflags -X 'main.appVersion=$(git describe --tags --always --match 'v*')'" + fi + echo "$ldflags" +} + +build_for_platform() { + local platform="$1" + local ldflags="$2" + + local GOOS="${platform%/*}" + local GOARCH="${platform#*/}" + if [[ -z "$GOOS" || -z "$GOARCH" ]]; then + echo "Invalid platform $platform" >&2 + return 1 + fi + echo "Building $GOOS/$GOARCH" + local output="build/prscd" + if [[ "$GOOS" = "windows" ]]; then + output="$output.exe" + fi + # compress to .zip file + local binfile="build/prscd-$GOARCH-$GOOS.zip" + local exit_val=0 + GOOS=$GOOS GOARCH=$GOARCH go build -o "$output" -ldflags "$ldflags" -trimpath || exit_val=$? + # compress compiled binary to .zip + zip -r -j "$binfile" "$output" + rm -rf $output + if [[ "$exit_val" -ne 0 ]]; then + echo "Error: failed to build $GOOS/$GOARCH" >&2 + return $exit_val + fi +} + + +if [ -z "$PRSCD_PLATFORMS" ]; then + PRSCD_PLATFORMS="$(go env GOOS)/$(go env GOARCH)" +fi +platforms=(${PRSCD_PLATFORMS//,/ }) +ldflags="$(make_ldflags)" + +mkdir -p build +rm -rf build/* + +echo "Starting build..." + +for platform in "${platforms[@]}"; do + build_for_platform "$platform" "$ldflags" +done + +echo "Build complete." + +ls -lh build/ | awk '{print $9, $5}' diff --git a/prscd/chirp/channel.go b/prscd/chirp/channel.go new file mode 100644 index 0000000..f36c88f --- /dev/null +++ b/prscd/chirp/channel.go @@ -0,0 +1,79 @@ +// Package chirp describes peer-to-peer communication protocol. +package chirp + +import ( + "sync" + + "github.com/vmihailenco/msgpack/v5" + "github.com/yomorun/psig" + "yomo.run/prscd/util" +) + +// Channel describes a message channel. +type Channel struct { + UniqID string // uniq id + AppID string // TODO: APP_ID + pdic sync.Map // all peers subscribed this channel +} + +// AddPeer add peer to this channel. +func (c *Channel) AddPeer(p *Peer) { + c.pdic.Store(p.Sid, p) +} + +// RemovePeer remove peer from this channel. +func (c *Channel) RemovePeer(p *Peer) { + c.pdic.Delete(p.Sid) +} + +// Broadcast message to all peers in this channel by yomo, +// yomo create a distributed cloud network, peers from different location +// will connect to different nodes in this network, so the message will be +// broadcast to all nodes. +func (c *Channel) Broadcast(sig *psig.Signalling) { + sigSentOverYoMo := sig.Clone() + sigSentOverYoMo.AppID = c.AppID + sigSentOverYoMo.MeshID = Node.MeshID + go Node.BroadcastToYoMo(&sigSentOverYoMo) +} + +// Dispatch messages to all peers in this channel of current node. +func (c *Channel) Dispatch(sig *psig.Signalling) { + // sig.Sid is sender's sid when sending message + log.Debug("[%s]\tSND>: %+v", sig.Sid, sig) + var sender = sig.Sid + // do not broadcast APP_ID and Sid to end user + sig.AppID = "" + sig.Sid = "" + resp, err := msgpack.Marshal(sig) + if err != nil { + log.Error("msgpack marshal: %+v", err) + return + } + + c.pdic.Range(func(k, v interface{}) bool { + // do not broadcast to sender-self + sid := k.(string) + p := v.(*Peer) + if sid == sender { + util.Log.Debug("-----------ignore sender-self: %s", sender) + return true + } + util.Log.Debug("[%s] BroadcastPresence to ch:%s, for sid:%s", sender, c.UniqID, p.Sid) + err = p.conn.Write(resp) + if err != nil { + log.Error("ws.write error: %+v", err) + } + return true + }) +} + +// getLen returns the number of peers in this channel of current node. +func (c *Channel) getLen() int { + var count int + c.pdic.Range(func(k, v interface{}) bool { + count++ + return true + }) + return count +} diff --git a/prscd/chirp/connection.go b/prscd/chirp/connection.go new file mode 100644 index 0000000..92459dc --- /dev/null +++ b/prscd/chirp/connection.go @@ -0,0 +1,71 @@ +package chirp + +import ( + "net" + + "github.com/gobwas/ws/wsutil" + "github.com/quic-go/quic-go" +) + +// Connection is connection either WebSocket or WebTransport +type Connection interface { + // RemoteAddr returns the client network address. + RemoteAddr() string + // Write the data to the connection + Write(msg []byte) error +} + +/*** WebSocket ***/ + +// NewWebSocketConnection creates a new WebSocketConnection +func NewWebSocketConnection(conn net.Conn) Connection { + return &WebSocketConnection{ + underlyingConn: conn, + } +} + +// WebSocketConnection is a WebSocket connection +type WebSocketConnection struct { + underlyingConn net.Conn +} + +// RemoteAddr returns the client network address. +func (c *WebSocketConnection) RemoteAddr() string { + return (c.underlyingConn).RemoteAddr().String() +} + +// Write the data to the connection +func (c *WebSocketConnection) Write(msg []byte) error { + return wsutil.WriteServerBinary(c.underlyingConn, msg) +} + +/*** WebTransport ***/ + +// NewWebTransportConnection creates a new WebTransportConnection +func NewWebTransportConnection(conn quic.Connection) Connection { + return &WebTransportConnection{ + underlyingConn: conn, + } +} + +// WebTransportConnection is a WebTransport connection +type WebTransportConnection struct { + underlyingConn quic.Connection +} + +// RemoteAddr returns the client network address. +func (c *WebTransportConnection) RemoteAddr() string { + return c.underlyingConn.RemoteAddr().String() +} + +// Write the data to the connection +func (c *WebTransportConnection) Write(msg []byte) error { + // add 0x00 to msg + buf := []byte{0x00} + buf = append(buf, msg...) + if err := c.underlyingConn.SendMessage(buf); err != nil { + log.Error("[%s] SendMessage error: %v", c.RemoteAddr(), err) + return err + } + return nil +} diff --git a/prscd/chirp/node.go b/prscd/chirp/node.go new file mode 100644 index 0000000..cc30aa3 --- /dev/null +++ b/prscd/chirp/node.go @@ -0,0 +1,222 @@ +package chirp + +import ( + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/vmihailenco/msgpack/v5" + "github.com/yomorun/psig" + "github.com/yomorun/yomo" + "github.com/yomorun/yomo/serverless" + "yomo.run/prscd/util" +) + +var log = util.Log + +const ( + // Endpoint is the base path of service + Endpoint string = "/v1" +) + +type node struct { + cdic sync.Map // all channels on this node + pdic sync.Map // all peers on this node + Env string // Env describes the environment of this node, e.g. "dev", "prod" + MeshID string // MeshID describes the id of this node + sndr yomo.Source // the yomo source used to send data to the geo-distributed network which built by yomo + rcvr yomo.StreamFunction // the yomo stream function used to receive data from the geo-distributed network which built by yomo +} + +// AuthUser auth user by public key +func (n *node) AuthUser(publicKey string) (appID string, ok bool) { + log.Info("Node| auth_user: publicKey=%s", publicKey) + + // implement your own auth logic if needed + + if n.Env == "dev" { + log.Debug("Node| auth_user: DEV MODE, skip") + return "DEV_APP", true + } + + return "YOMO_APP", true +} + +// AddPeer add peer to channel named `cid` on this node. +func (n *node) AddPeer(conn Connection, cid, appID string) *Peer { + log.Info("[%s] node.add_peer: %s", conn.RemoteAddr(), cid) + peer := &Peer{ + Sid: conn.RemoteAddr(), + Cid: cid, + Channels: make(map[string]*Channel), + conn: conn, + AppID: appID, + } + + n.pdic.Store(n.getIDOnNode(appID, peer.Sid), peer) + + return peer +} + +// RemovePeer remove peer on this node. +func (n *node) RemovePeer(appID, pid string) { + log.Info("[%s] node.remove_peer", pid) + n.pdic.Delete(n.getIDOnNode(appID, pid)) +} + +// getIDOnNode get the unique id of peer or channel on this node. +func (n *node) getIDOnNode(appID, name string) string { + return appID + "|" + name +} + +// GetOrCreateChannel get or create channel on this node. +func (n *node) GetOrAddChannel(appID, name string) *Channel { + channelNameOnNode := n.getIDOnNode(appID, name) + channel, ok := n.cdic.LoadOrStore(channelNameOnNode, &Channel{ + UniqID: name, + AppID: appID, + }) + + if !ok { + log.Info("create channel: %s", name) + } + + return channel.(*Channel) +} + +// FindChannel returns the channel on this node by name. +func (n *node) FindChannel(appID, name string) *Channel { + channelNameOnNode := n.getIDOnNode(appID, name) + ch, ok := n.cdic.Load(channelNameOnNode) + if !ok { + log.Debug("channel not found: %s", channelNameOnNode) + return nil + } + return ch.(*Channel) +} + +// ConnectToYoMo connect this node to who geo-distributed network which built by yomo. +func (n *node) ConnectToYoMo(sndr yomo.Source, rcvr yomo.StreamFunction) error { + // connect yomo source to zipper + err := sndr.Connect() + if err != nil { + return err + } + + sfnHandler := func(ctx serverless.Context) { + var sig *psig.Signalling + err := msgpack.Unmarshal(ctx.Data(), &sig) + if err != nil { + log.Error("Read from YoMo error: %v, msg=%# x, string(msg)=%s", err, ctx.Data(), ctx.Data()) + } + log.Debug("\033[32m[\u21CA\u21CA]\t%s\033[36m", sig) + + channel := n.FindChannel(sig.AppID, sig.Channel) + if channel != nil { + channel.Dispatch(sig) + log.Debug("[\u21CA]\t dispatched to %s", sig.Cid) + } else { + log.Debug("[\u21CA]\t dispatch to channel failed cause of not exist: %s", sig.Cid) + } + } + + // set observe data tags from yomo network by yomo stream function + // 0x20 comes from other prscd nodes + // 0x21 comes from backend sfn + rcvr.SetObserveDataTags(0x20, 0x21) + + // handle data from yomo network, and dispatch to the same channel on this node. + rcvr.SetHandler(sfnHandler) + + err = rcvr.Connect() + if err != nil { + return err + } + + n.sndr = sndr + n.rcvr = rcvr + return nil +} + +// BroadcastToYoMo broadcast presence to yomo +func (n *node) BroadcastToYoMo(sig *psig.Signalling) { + // sig.Sid is sender's sid when sending message + log.Debug("\033[34m[%s][\u21C8\u21C8]\t %s\033[36m", sig.AppID, sig) + buf, err := msgpack.Marshal(sig) + if err != nil { + log.Error("msgpack marshal: %+v", err) + return + } + + err = n.sndr.Write(0x20, buf) + if err != nil { + log.Error("broadcast to yomo error: %+v", err) + } +} + +// Node describes current node, which is a singleton. There is only one node in a `prscd` process. +// But multiple `prscd` processes can be served on the same server. +var Node *node + +// CreateNodeSingleton create the singleton node instance. +func CreateNodeSingleton() { + log.Info("init Node instance, mesh_id=%s", os.Getenv("MESH_ID")) + Node = &node{ + MeshID: os.Getenv("MESH_ID"), + } +} + +// DumpNodeState prints the user and room information to stdout. +func DumpNodeState() { + log.Info("Dump start --------") + Node.cdic.Range(func(k1, v1 interface{}) bool { + log.Info("Channel:%s", k1) + ch := v1.(*Channel) + log.Info("\t Peers count: %d", ch.getLen()) + ch.pdic.Range(func(key, value interface{}) bool { + log.Info("\tPeer: sid=%s, cid=%s", key, value) + return true + }) + return true + }) + log.Info("Dump done --------") +} + +// DumpConnectionsState prints the user and room information to stdout. +func DumpConnectionsState() { + log.Info("Dump start --------") + counter := make(map[string]int) + Node.cdic.Range(func(k1, v1 interface{}) bool { + log.Info("Channel:%s", k1) + chName := k1.(string) + ch := v1.(*Channel) + peersCount := ch.getLen() + // chName is like "appID|channelName", so we need to split it to get appID + appID := strings.Split(chName, "|")[0] + log.Info("\t[%s] %s Peers count: %d", appID, chName, peersCount) + if _, ok := counter[appID]; !ok { + counter[appID] = peersCount + } else { + counter[appID] += peersCount + } + return true + }) + // list all counter + for appID, count := range counter { + log.Info("->[%s] connections: %d", appID, count) + } + // write counter to /tmp/conns.log + f, err := os.OpenFile("/tmp/conns.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Error("open file: %v", err) + } + defer f.Close() + timestamp := time.Now().Unix() + for appID, count := range counter { + if count > 0 { + f.WriteString(fmt.Sprintf("{\"timestamp\": %d, \"conns\": %d, \"app_id\": \"%s\", \"mesh_id\": \"%s\"}\n\r", timestamp, count, appID, Node.MeshID)) + } + } +} diff --git a/prscd/chirp/node_test.go b/prscd/chirp/node_test.go new file mode 100644 index 0000000..58a93a6 --- /dev/null +++ b/prscd/chirp/node_test.go @@ -0,0 +1,133 @@ +package chirp + +import ( + "testing" + + "github.com/yomorun/yomo/core/frame" + "yomo.run/prscd/util" +) + +// NewMockConnection creates a new WebSocketConnection +func NewMockConnection(sid string) Connection { + return &MockConnection{ + sid: sid, + } +} + +// MockConnection is a WebSocket connection +type MockConnection struct { + sid string +} + +// RemoteAddr returns the client network address. +func (c *MockConnection) RemoteAddr() string { + return c.sid +} + +// Write the data to the connection +func (c *MockConnection) Write(msg []byte) error { + return nil +} + +// SenderMock implement yomo.Source interface +type SenderMock struct{} + +func (s *SenderMock) Close() error { + return nil +} + +func (s *SenderMock) Connect() error { + return nil +} + +func (s *SenderMock) Write(tag frame.Tag, data []byte) error { + return nil +} + +func (s *SenderMock) SetErrorHandler(fn func(err error)) { +} + +func (s *SenderMock) SetReceiveHandler(fn func(tag frame.Tag, data []byte)) { +} + +func (s *SenderMock) Broadcast(tag uint32, data []byte) error { + return nil +} + +func (s *SenderMock) SetDataTag(tag frame.Tag) {} + +var channelName, appID, peerName string + +func init() { + CreateNodeSingleton() + + // mock YoMo Source + Node.sndr = &SenderMock{} + + channelName = "test_channel" + appID = "test_appid" + peerName = "test_peer" + + // error level + util.Log.SetLogLevel(2) +} + +func Test_node_AddPeer(t *testing.T) { + peer := Node.AddPeer(NewMockConnection(peerName), channelName, appID) + peer.Join(channelName) + + assert(t, peer != nil, "peer should not be nil") + assert(t, peer.AppID == appID, "peer.AppID should be %s, but got %s", appID, peer.AppID) + assert(t, peer.Channels != nil, "peer.Channels should not be nil") + assert(t, len(peer.Channels) == 1, "len(peer.Channels) should be 1, but got %d", len(peer.Channels)) + assert(t, peer.Channels[channelName] != nil, "peer.Channels[%s] should not be nil", channelName) + ch := Node.FindChannel(appID, channelName) + assert(t, ch != nil, "node.cdic[%s] should not be nil", appID+"|"+channelName) + assert(t, ch.getLen() > 0, "len(node.cdic[%s].peers) should > 0", appID+"|"+channelName) + p, ok := Node.pdic.Load(appID + "|" + peerName) + assert(t, ok, "node.pdic[%s] should not be nil", appID+"|"+peerName) + assert(t, p.(*Peer).Sid == peerName, "node.pdic[%s] should not be nil", appID+"|"+peerName) + + peer.Leave(channelName) + assert(t, len(peer.Channels) == 0, "len(peer.Channels) should be 1, but got %d", len(peer.Channels)) + ch = Node.FindChannel(appID, channelName) + assert(t, ch != nil, "node.cdic[%s] should not be nil", appID+"|"+channelName) + assert(t, ch.getLen() == 0, "len(node.cdic[%s].pdic) should be 0, but got %d", appID+"|"+channelName, ch.getLen()) + p, ok = Node.pdic.Load(appID + "|" + peerName) + assert(t, ok, "node.pdic[%s] should not be nil", appID+"|"+peerName) + assert(t, p.(*Peer).Sid == peerName, "node.pdic[%s] should not be nil", appID+"|"+peerName) + + peer.Disconnect() + ch = Node.FindChannel(appID, channelName) + assert(t, ch != nil, "node.cdic[%s] should not be nil", appID+"|"+channelName) + assert(t, ch.getLen() == 0, "len(node.cdic[%s].pdic) should be 0, but got %d", appID+"|"+channelName, ch.getLen()) + p, ok = Node.pdic.Load(appID + "|" + peerName) + assert(t, !ok, "node.pdic[%s] should be nil", appID+"|"+peerName) + assert(t, p == nil, "node.pdic[%s] should not be nil", appID+"|"+peerName) +} + +func assert(t *testing.T, condition bool, format string, args ...any) { + if !condition { + t.Errorf(format, args...) + } +} + +func BenchmarkPeerJoinAndLeave(b *testing.B) { + for i := 0; i < b.N; i++ { + peer := Node.AddPeer(NewMockConnection(peerName), channelName, appID) + peer.Join(channelName) + peer.Leave(channelName) + peer.Disconnect() + } +} + +func Test_node_AuthUser(t *testing.T) { + var wantAppID = "YOMO_APP" + gotAppID, gotOk := Node.AuthUser("kmJAUnCtkWbkNnhXYtZAGEJzGDGpFo1e1vkp6cm") + if gotAppID != wantAppID { + t.Errorf("node.AuthUser() gotAppID = %v, want %v", gotAppID, wantAppID) + } + if gotOk != true { + t.Errorf("node.AuthUser() gotOk = %v, want %v", gotOk, true) + } +} diff --git a/prscd/chirp/peer.go b/prscd/chirp/peer.go new file mode 100644 index 0000000..7de624a --- /dev/null +++ b/prscd/chirp/peer.go @@ -0,0 +1,150 @@ +package chirp + +import ( + "errors" + "io" + "sync" + + "github.com/vmihailenco/msgpack/v5" + "github.com/yomorun/psig" +) + +// Peer describes user on this node. +type Peer struct { + // Sid describes the unique id of this peer on this node, only used for backend. + Sid string + // Cid describes the unique id of this peer on who geo-distributed network, set by developer. + Cid string + // Channel describes the channel which this peer joined. + Channels map[string]*Channel + // conn is the connection of this peer. + conn Connection + mu sync.Mutex + // AppID is the id of the app which this peer belongs to. + AppID string +} + +// Join this peer to channel named `channelName`. +func (p *Peer) Join(channelName string) { + // find channel on this node, if not exist, create it. + c := Node.GetOrAddChannel(p.AppID, channelName) + + // add peer to this channel + c.AddPeer(p) + + // and this channel to peer's channel list + p.Channels[channelName] = c + + // ACK to peer has joined + p.NotifyBack(NewSigChannelJoined(channelName)) + + log.Info("[%s] ack peer.join_chanel:%s, cid=%s", p.Sid, c.UniqID, p.Cid) +} + +// NotifyBack to peer with message. +func (p *Peer) NotifyBack(sig *psig.Signalling) { + resp, err := msgpack.Marshal(sig) + if err != nil { + log.Error("msgpack marshal: %+v", err) + } + + p.mu.Lock() + defer p.mu.Unlock() + + err = p.conn.Write(resp) + + if err != nil { + log.Error("NotifyBack error: %+v", err) + } + log.Debug("[%s]\tSND>: %s", p.Sid, sig) +} + +// Leave a channel +func (p *Peer) Leave(channelName string) { + // remove channel from peer's channel list + p.mu.Lock() + delete(p.Channels, channelName) + p.mu.Unlock() + + // remove peer from channel's peer list + c := Node.FindChannel(p.AppID, channelName) + if c == nil { + log.Error("peer.Leave(): channel is nil. pid: %s, channel: %s", p.Sid, channelName) + return + } + + c.RemovePeer(p) + + // Notify others on this channel that this peer has left + c.Broadcast(NewSigPeerOffline(channelName, p)) + log.Info("[%s] peer.leave: %s", p.Sid, c.UniqID) +} + +// Disconnect clears resources of this peer when leave. +func (p *Peer) Disconnect() { + log.Info("[%s] peer.disconnect", p.Sid) + // wipe this peer from all channels joined before + for _, ch := range p.Channels { + p.Leave(ch.UniqID) + } + // wipe this peer from current node + Node.RemovePeer(p.AppID, p.Sid) +} + +// BroadcastToChannel will broadcast message to channel. +func (p *Peer) BroadcastToChannel(sig *psig.Signalling) { + sig.Cid = p.Cid + c := p.Channels[sig.Channel] + if c == nil { + log.Error("BroadcastToChannel: channel=%s is nil, should panic here", sig.Channel) + return + } + + c.Broadcast(sig) +} + +// HandleSignal handle message sent from connection. +func (p *Peer) HandleSignal(r io.Reader) error { + decoder := msgpack.NewDecoder(r) + sig := &psig.Signalling{} + if err := decoder.Decode(sig); err != nil { + log.Error("msgpack.decode err, ignore: %+v", err) + return err + } + + // p.Sid is the id of connection, set by backend. + sig.Sid = p.Sid + log.Debug("[%s] >RCV: %v", p.Sid, sig) + + if sig.Type == psig.SigControl { + // handle the Control Signalling + switch sig.OpCode { + case psig.OpChannelJoin: // `channel_join` signalling + // join channel + p.Join(sig.Channel) + case psig.OpState: // `peer_state` signalling + // Alice can notify Bob that her state has been updated, also, + // Bob can use this signalling to initialize or update Alice's state + if sig.Sid != "" && sig.Cid != "" { + // if peer sid and client id are both set, then update the client id of this peer + p.Cid = sig.Cid + log.Info("Peer: %s state new ClientID: %s", p.Sid, p.Cid) + } + p.BroadcastToChannel(sig) + case psig.OpPeerOffline: // `peer_offline` signalling + p.Leave(sig.Channel) + case psig.OpPeerOnline: // `peer_online` signalling + p.BroadcastToChannel(sig) + default: + log.Error("Unknown control opcode: %d", sig.OpCode) + } + } else if sig.Type == psig.SigData { + // handle the Data Signalling + p.BroadcastToChannel(sig) + } else { + log.Error("ILLEGAL sig.Type, should be `data` or `control`: %+v", sig) + return errors.New("ILLEGAL sig.Type, should be `data` or `control`") + } + + return nil +} diff --git a/prscd/chirp/sig.go b/prscd/chirp/sig.go new file mode 100644 index 0000000..9509cd6 --- /dev/null +++ b/prscd/chirp/sig.go @@ -0,0 +1,35 @@ +package chirp + +import "github.com/yomorun/psig" + +// NewSigPeerOnline create OpPeerOnline message. +func NewSigPeerOnline(chid string, p *Peer) *psig.Signalling { + return &psig.Signalling{ + Type: psig.SigControl, + OpCode: psig.OpPeerOnline, + Channel: chid, + Cid: p.Cid, + Sid: p.Sid, + } +} + +// NewSigPeerOffline create OpPeerOffline message. +func NewSigPeerOffline(chid string, p *Peer) *psig.Signalling { + return &psig.Signalling{ + Type: psig.SigControl, + OpCode: psig.OpPeerOffline, + Channel: chid, + Cid: p.Cid, + Sid: p.Sid, + } +} + +// NewSigChannelJoined create OpChannelJoin message. +func NewSigChannelJoined(chName string) *psig.Signalling { + return &psig.Signalling{ + Type: psig.SigControl, + OpCode: psig.OpChannelJoin, + Channel: chName, + MeshID: Node.MeshID, + } +} diff --git a/prscd/cmd/prscd/epoll.go b/prscd/cmd/prscd/epoll.go new file mode 100644 index 0000000..0bb4351 --- /dev/null +++ b/prscd/cmd/prscd/epoll.go @@ -0,0 +1,86 @@ +//go:build ignore_vet + +package main + +import ( + "log" + "net" + "reflect" + "sync" + "syscall" + + "golang.org/x/sys/unix" +) + +type epoll struct { + fd int + connections map[int]net.Conn + lock *sync.RWMutex +} + +func MkEpoll() (*epoll, error) { + fd, err := EpollCreate1(0) + if err != nil { + return nil, err + } + return &epoll{ + fd: fd, + lock: &sync.RWMutex{}, + connections: make(map[int]net.Conn), + }, nil +} + +func (e *epoll) Add(conn net.Conn) error { + // Extract file descriptor associated with the connection + fd := websocketFD(conn) + err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.POLLIN | unix.POLLHUP, Fd: int32(fd)}) + if err != nil { + return err + } + e.lock.Lock() + defer e.lock.Unlock() + e.connections[fd] = conn + if len(e.connections)%100 == 0 { + log.Printf("Total number of connections: %v", len(e.connections)) + } + return nil +} + +func (e *epoll) Remove(conn net.Conn) error { + fd := websocketFD(conn) + err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil) + if err != nil { + return err + } + e.lock.Lock() + defer e.lock.Unlock() + delete(e.connections, fd) + if len(e.connections)%100 == 0 { + log.Printf("Total number of connections: %v", len(e.connections)) + } + return nil +} + +func (e *epoll) Wait() ([]net.Conn, error) { + events := make([]unix.EpollEvent, 100) + n, err := unix.EpollWait(e.fd, events, 100) + if err != nil { + return nil, err + } + e.lock.RLock() + defer e.lock.RUnlock() + var connections []net.Conn + for i := 0; i < n; i++ { + conn := e.connections[int(events[i].Fd)] + connections = append(connections, conn) + } + return connections, nil +} + +func websocketFD(conn net.Conn) int { + tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn") + fdVal := tcpConn.FieldByName("fd") + pfdVal := reflect.Indirect(fdVal).FieldByName("pfd") + + return int(pfdVal.FieldByName("Sysfd").Int()) +} diff --git a/prscd/cmd/prscd/main.go b/prscd/cmd/prscd/main.go new file mode 100644 index 0000000..e990496 --- /dev/null +++ b/prscd/cmd/prscd/main.go @@ -0,0 +1,169 @@ +// package main start the service +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "time" + + "yomo.run/prscd/chirp" + "yomo.run/prscd/util" + "yomo.run/prscd/websocket" + "yomo.run/prscd/webtransport" + + "github.com/joho/godotenv" + "github.com/yomorun/yomo" + "github.com/yomorun/yomo/pkg/config" + "github.com/yomorun/yomo/pkg/trace" +) + +var log = util.Log + +func main() { + err := godotenv.Load(".env") + if err != nil { + log.Fatal(err) + } + + chirp.CreateNodeSingleton() + + // DEBUG env indicates development mode, verbose log + if os.Getenv("DEBUG") == "true" { + chirp.Node.Env = "development" + log.SetLogLevel(util.DEBUG) + log.Debug("IN DEVELOPMENT ENV") + } + + // AS_YOMO_ZIPPER env indicates start YOMO Zipper in this process + if os.Getenv("AS_YOMO_ZIPPER") == "true" { + go startYomoZipper() + // sleep 2 seconds to wait for YoMo Zipper ready + time.Sleep(2 * time.Second) + } else { + log.Debug("Skip start YOMO Zipper") + } + + // YOMO_ZIPPER env indicates the endpoint of YoMo Zipper to connect + log.Debug("connect to YoMo Zipper: %s", os.Getenv("YOMO_ZIPPER")) + + // add open tracing + tp, shutdown, err := trace.NewTracerProviderWithJaeger("prscd") + if err == nil { + log.Info("[%s] 🛰 trace enabled", "prscd") + } + defer shutdown(context.Background()) + + // sndr is sender to send data to other prscd nodes by YoMo + sndr := yomo.NewSource( + os.Getenv("YOMO_SNDR_NAME"), + os.Getenv("YOMO_ZIPPER"), + yomo.WithCredential(os.Getenv("YOMO_CREDENTIAL")), + yomo.WithTracerProvider(tp), + ) + + // rcvr is receiver to receive data from other prscd nodes by YoMo + rcvr := yomo.NewStreamFunction( + os.Getenv("YOMO_RCVR_NAME"), + os.Getenv("YOMO_ZIPPER"), + yomo.WithSfnCredential(os.Getenv("YOMO_CREDENTIAL")), + yomo.WithSfnTracerProvider(tp), + ) + + // connect to YoMo + chirp.Node.ConnectToYoMo(sndr, rcvr) + + // default addr and port listening + addr := "0.0.0.0:443" + if os.Getenv("PORT") != "" { + addr = fmt.Sprintf("0.0.0.0:%s", os.Getenv("PORT")) + } + + // load TLS cert and key, halt if error occurs, + // this helped developers to find out TLS related issues asap. + config, err := loadTLS(os.Getenv("CERT_FILE"), os.Getenv("KEY_FILE"), os.Getenv("DOMAIN")) + if err != nil { + log.Fatal(err) + os.Exit(-2) + } + + // start WebSocket listener + go websocket.ListenAndServe(addr, config) + + // start WebTransport listener + go webtransport.ListenAndServe(addr, config) + + // start Probe Server for AWS health check + go startProbeServer(61226) + + // Ctrl-C or kill graceful shutdown + // - `kill -SIGUSR1 ` customize + // - `kill -SIGTERM ` graceful shutdown + // - `kill -SIGUSR2 ` inspect golang GC + log.Info("PID: %d", os.Getpid()) + // write pid to ./prscd.pid, overwrite if exists + pidFile := "./prscd.pid" + f, err := os.OpenFile(pidFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + log.Fatal(err) + } + defer f.Close() + _, err = f.WriteString(fmt.Sprintf("%d", os.Getpid())) + if err != nil { + log.Fatal(err) + } + + log.Debug("Prscd Dev Server is running on https://%s:%s/v1", os.Getenv("DOMAIN"), os.Getenv("PORT")) + + c := make(chan os.Signal, 1) + registerSignal(c) +} + +func startYomoZipper() { + conf, err := config.ParseConfigFile("./yomo.yaml") + if err != nil { + log.Fatal(err) + } + log.Debug("integrated YoMo config: %v", conf) + log.Debug("integrated YoMo zipper: %s", fmt.Sprintf("%s:%d", conf.Host, conf.Port)) + + zipper, err := yomo.NewZipper(conf.Name, conf.Functions, conf.Downstreams) + if err != nil { + log.Fatal(err) + } + + err = zipper.ListenAndServe(context.Background(), fmt.Sprintf("%s:%d", conf.Host, conf.Port)) + if err != nil { + log.Fatal(err) + } +} + +func loadTLS(certFile, keyFile, domain string) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + + // check if TLS cert is expired + // Parse the X.509 certificate + parsedCert, err := x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return nil, err + } + + // Get the expiration date + expirationDate := parsedCert.NotAfter + log.Debug("check TLS cert expiration date: %s", expirationDate) + + // determine if the certificate is expired + if time.Now().After(expirationDate) { + return nil, fmt.Errorf("tls cert is expired") + } + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"http/1.1", "h2", "h3", "http/0.9", "http/1.0", "spdy/1", "spdy/2", "spdy/3"}, + }, nil +} diff --git a/prscd/cmd/prscd/ossignal_notwin.go b/prscd/cmd/prscd/ossignal_notwin.go new file mode 100644 index 0000000..0ade52c --- /dev/null +++ b/prscd/cmd/prscd/ossignal_notwin.go @@ -0,0 +1,35 @@ +//go:build !windows +// +build !windows + +package main + +import ( + "fmt" + "os" + "os/signal" + "runtime" + "syscall" + + "yomo.run/prscd/chirp" +) + +func registerSignal(c chan os.Signal) { + signal.Notify(c, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGUSR1, syscall.SIGINT) + log.Info("Listening SIGUSR1, SIGUSR2, SIGTERM/SIGINT...") + for p1 := range c { + log.Info("Received signal: %s", p1) + if p1 == syscall.SIGTERM || p1 == syscall.SIGINT { + log.Info("graceful shutting down ... %s", p1) + os.Exit(0) + } else if p1 == syscall.SIGUSR2 { + // kill -SIGUSR2 will write ystat logs to /tmp/conns.log + chirp.DumpConnectionsState() + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Printf("\tNumGC = %v\n", m.NumGC) + } else if p1 == syscall.SIGUSR1 { + log.Info("SIGUSR1") + chirp.DumpNodeState() + } + } +} diff --git a/prscd/cmd/prscd/ossignal_windows.go b/prscd/cmd/prscd/ossignal_windows.go new file mode 100644 index 0000000..2954e77 --- /dev/null +++ b/prscd/cmd/prscd/ossignal_windows.go @@ -0,0 +1,19 @@ +//go:build windows +// +build windows + +package main + +import ( + "os" + "os/signal" + "syscall" +) + +func registerSignal(c chan os.Signal) { + signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) + log.Info("Listening SIGTERM/SIGINT...") + for p1 := range c { + log.Info("Received signal: %s", p1) + os.Exit(0) + } +} diff --git a/prscd/cmd/prscd/probe-server.go b/prscd/cmd/prscd/probe-server.go new file mode 100644 index 0000000..757e1df --- /dev/null +++ b/prscd/cmd/prscd/probe-server.go @@ -0,0 +1,39 @@ +package main + +// implement a tcp server that accept netcat request, use as a probe server for AWS health check +import ( + "fmt" + "io" + "net" +) + +// startProbeServer create a tcp listener on port, accept command: +// nc -z -v -w1 lo.yomo.dev 61226 2>&1 |grep succeeded +func startProbeServer(port int) { + // launch TCP server + listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) + + if err != nil { + log.Error("can not start probe server: %v", err) + return + } + + log.Info("Listening for connections on %s", listener.Addr().String()) + + for { + conn, err := listener.Accept() + if err != nil { + log.Error("Error accepting connection from client: %s", err) + } else { + go (func(conn net.Conn) { + _, err := io.ReadAll(conn) + if err != nil { + log.Error("probe server process error: %v", err) + conn.Close() + } else { + log.Inspect("probed from %s", conn.RemoteAddr().String()) + } + })(conn) + } + } +} diff --git a/prscd/cmd/prscd/tls_test.go b/prscd/cmd/prscd/tls_test.go new file mode 100644 index 0000000..34039a8 --- /dev/null +++ b/prscd/cmd/prscd/tls_test.go @@ -0,0 +1,144 @@ +package main + +import ( + "os" + "testing" +) + +func TestLoadExpiredTLSCert(t *testing.T) { + // create temporary cert and key files + certFile, err := os.CreateTemp("", "cert") + if err != nil { + t.Fatal(err) + } + defer os.Remove(certFile.Name()) + + keyFile, err := os.CreateTemp("", "key") + if err != nil { + t.Fatal(err) + } + defer os.Remove(keyFile.Name()) + + // write test cert and key to files + certData := []byte(`-----BEGIN CERTIFICATE----- +MIIEUTCCAzmgAwIBAgISA5kLdtgzs2UyewrCom3FOA/KMA0GCSqGSIb3DQEBCwUA +MDIxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MQswCQYDVQQD +EwJSMzAeFw0yMzA1MTYxNDA0MzZaFw0yMzA4MTQxNDA0MzVaMBYxFDASBgNVBAMT +C2xvLnlvbW8uZGV2MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEit5O491bdDXk +HTdVuUukyicP8rHPva+/6Brl/RJKQC45OJr3RhLS8mC7uAkZh2RZIYiBgxfh5Yia +gJRltWsMv6OCAkYwggJCMA4GA1UdDwEB/wQEAwIHgDAdBgNVHSUEFjAUBggrBgEF +BQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNVHQ4EFgQUVZ6v5JE+/c5q +EwKKkQRpZzRcOZ4wHwYDVR0jBBgwFoAUFC6zF7dYVsuuUAlA5h+vnYsUwsYwVQYI +KwYBBQUHAQEESTBHMCEGCCsGAQUFBzABhhVodHRwOi8vcjMuby5sZW5jci5vcmcw +IgYIKwYBBQUHMAKGFmh0dHA6Ly9yMy5pLmxlbmNyLm9yZy8wFgYDVR0RBA8wDYIL +bG8ueW9tby5kZXYwTAYDVR0gBEUwQzAIBgZngQwBAgEwNwYLKwYBBAGC3xMBAQEw +KDAmBggrBgEFBQcCARYaaHR0cDovL2Nwcy5sZXRzZW5jcnlwdC5vcmcwggEEBgor +BgEEAdZ5AgQCBIH1BIHyAPAAdQC3Pvsk35xNunXyOcW6WPRsXfxCz3qfNcSeHQmB +Je20mQAAAYglF33lAAAEAwBGMEQCIAX4fiR1DF/Ww366U8DeILAioTy/mn5+8ljH +oK+aIM08AiAIPDHkniIgqnr3mfoJfc8YSC0Fu/CXDHn4wNE42Za3+AB3AHoyjFTY +ty22IOo44FIe6YQWcDIThU070ivBOlejUutSAAABiCUXffkAAAQDAEgwRgIhAMKr +EGcEZV3TyKg7Sq5sHe35kO5gnaHSPSAj2ysv2m7WAiEA1rK+v9d6D2qhhzmLyUbt +e2NrSSopvL2MmZ4xjiVmIbQwDQYJKoZIhvcNAQELBQADggEBAIviOYpUJKS4h6wo +oD81il5zLh+KLQdKm3OrM6j7BQuemKqnf4Fe806qd9ovE7AGbuz4gq+jpW5Q295v +mPHOM+qJvZnAVCWgX5PHNifE+Fuo3VCdpaqXhChMENcviQfPSzlY1ZPt2iYj9JQ2 +hGvooNhMlfEi2gF9FBLdhFyUVg/N8zHa1q1vMnGVgvchrWUOPIxptfptQmBk1F7K +ExvD9FzL/Fd8hJw9XSXHJOd0ngLjTLrPsto+3V4T82RsTUQbKlKWqNTL57j+jxem +75y+eNGe07KBcSB48GT1avXe2KCFxv7XhMdVligs5RkWFRYYR31WOhMijEOzvcIR +yafxQEc= +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFFjCCAv6gAwIBAgIRAJErCErPDBinU/bWLiWnX1owDQYJKoZIhvcNAQELBQAw +TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh +cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMjAwOTA0MDAwMDAw +WhcNMjUwOTE1MTYwMDAwWjAyMQswCQYDVQQGEwJVUzEWMBQGA1UEChMNTGV0J3Mg +RW5jcnlwdDELMAkGA1UEAxMCUjMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQC7AhUozPaglNMPEuyNVZLD+ILxmaZ6QoinXSaqtSu5xUyxr45r+XXIo9cP +R5QUVTVXjJ6oojkZ9YI8QqlObvU7wy7bjcCwXPNZOOftz2nwWgsbvsCUJCWH+jdx +sxPnHKzhm+/b5DtFUkWWqcFTzjTIUu61ru2P3mBw4qVUq7ZtDpelQDRrK9O8Zutm +NHz6a4uPVymZ+DAXXbpyb/uBxa3Shlg9F8fnCbvxK/eG3MHacV3URuPMrSXBiLxg +Z3Vms/EY96Jc5lP/Ooi2R6X/ExjqmAl3P51T+c8B5fWmcBcUr2Ok/5mzk53cU6cG +/kiFHaFpriV1uxPMUgP17VGhi9sVAgMBAAGjggEIMIIBBDAOBgNVHQ8BAf8EBAMC +AYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMBIGA1UdEwEB/wQIMAYB +Af8CAQAwHQYDVR0OBBYEFBQusxe3WFbLrlAJQOYfr52LFMLGMB8GA1UdIwQYMBaA +FHm0WeZ7tuXkAXOACIjIGlj26ZtuMDIGCCsGAQUFBwEBBCYwJDAiBggrBgEFBQcw +AoYWaHR0cDovL3gxLmkubGVuY3Iub3JnLzAnBgNVHR8EIDAeMBygGqAYhhZodHRw +Oi8veDEuYy5sZW5jci5vcmcvMCIGA1UdIAQbMBkwCAYGZ4EMAQIBMA0GCysGAQQB +gt8TAQEBMA0GCSqGSIb3DQEBCwUAA4ICAQCFyk5HPqP3hUSFvNVneLKYY611TR6W +PTNlclQtgaDqw+34IL9fzLdwALduO/ZelN7kIJ+m74uyA+eitRY8kc607TkC53wl +ikfmZW4/RvTZ8M6UK+5UzhK8jCdLuMGYL6KvzXGRSgi3yLgjewQtCPkIVz6D2QQz +CkcheAmCJ8MqyJu5zlzyZMjAvnnAT45tRAxekrsu94sQ4egdRCnbWSDtY7kh+BIm +lJNXoB1lBMEKIq4QDUOXoRgffuDghje1WrG9ML+Hbisq/yFOGwXD9RiX8F6sw6W4 +avAuvDszue5L3sz85K+EC4Y/wFVDNvZo4TYXao6Z0f+lQKc0t8DQYzk1OXVu8rp2 +yJMC6alLbBfODALZvYH7n7do1AZls4I9d1P4jnkDrQoxB3UqQ9hVl3LEKQ73xF1O +yK5GhDDX8oVfGKF5u+decIsH4YaTw7mP3GFxJSqv3+0lUFJoi5Lc5da149p90Ids +hCExroL1+7mryIkXPeFM5TgO9r0rvZaBFOvV2z0gp35Z0+L4WPlbuEjN/lxPFin+ +HlUjr8gRsI3qfJOQFy/9rKIJR0Y/8Omwt/8oTWgy1mdeHmmjk7j1nYsvC9JSQ6Zv +MldlTTKB3zhThV1+XWYp6rjd5JW1zbVWEkLNxE7GJThEUG3szgBVGP7pSWTUTsqX +nLRbwHOoq7hHwg== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFYDCCBEigAwIBAgIQQAF3ITfU6UK47naqPGQKtzANBgkqhkiG9w0BAQsFADA/ +MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT +DkRTVCBSb290IENBIFgzMB4XDTIxMDEyMDE5MTQwM1oXDTI0MDkzMDE4MTQwM1ow +TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh +cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwggIiMA0GCSqGSIb3DQEB +AQUAA4ICDwAwggIKAoICAQCt6CRz9BQ385ueK1coHIe+3LffOJCMbjzmV6B493XC +ov71am72AE8o295ohmxEk7axY/0UEmu/H9LqMZshftEzPLpI9d1537O4/xLxIZpL +wYqGcWlKZmZsj348cL+tKSIG8+TA5oCu4kuPt5l+lAOf00eXfJlII1PoOK5PCm+D +LtFJV4yAdLbaL9A4jXsDcCEbdfIwPPqPrt3aY6vrFk/CjhFLfs8L6P+1dy70sntK +4EwSJQxwjQMpoOFTJOwT2e4ZvxCzSow/iaNhUd6shweU9GNx7C7ib1uYgeGJXDR5 +bHbvO5BieebbpJovJsXQEOEO3tkQjhb7t/eo98flAgeYjzYIlefiN5YNNnWe+w5y +sR2bvAP5SQXYgd0FtCrWQemsAXaVCg/Y39W9Eh81LygXbNKYwagJZHduRze6zqxZ +Xmidf3LWicUGQSk+WT7dJvUkyRGnWqNMQB9GoZm1pzpRboY7nn1ypxIFeFntPlF4 +FQsDj43QLwWyPntKHEtzBRL8xurgUBN8Q5N0s8p0544fAQjQMNRbcTa0B7rBMDBc +SLeCO5imfWCKoqMpgsy6vYMEG6KDA0Gh1gXxG8K28Kh8hjtGqEgqiNx2mna/H2ql +PRmP6zjzZN7IKw0KKP/32+IVQtQi0Cdd4Xn+GOdwiK1O5tmLOsbdJ1Fu/7xk9TND +TwIDAQABo4IBRjCCAUIwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYw +SwYIKwYBBQUHAQEEPzA9MDsGCCsGAQUFBzAChi9odHRwOi8vYXBwcy5pZGVudHJ1 +c3QuY29tL3Jvb3RzL2RzdHJvb3RjYXgzLnA3YzAfBgNVHSMEGDAWgBTEp7Gkeyxx ++tvhS5B1/8QVYIWJEDBUBgNVHSAETTBLMAgGBmeBDAECATA/BgsrBgEEAYLfEwEB +ATAwMC4GCCsGAQUFBwIBFiJodHRwOi8vY3BzLnJvb3QteDEubGV0c2VuY3J5cHQu +b3JnMDwGA1UdHwQ1MDMwMaAvoC2GK2h0dHA6Ly9jcmwuaWRlbnRydXN0LmNvbS9E +U1RST09UQ0FYM0NSTC5jcmwwHQYDVR0OBBYEFHm0WeZ7tuXkAXOACIjIGlj26Ztu +MA0GCSqGSIb3DQEBCwUAA4IBAQAKcwBslm7/DlLQrt2M51oGrS+o44+/yQoDFVDC +5WxCu2+b9LRPwkSICHXM6webFGJueN7sJ7o5XPWioW5WlHAQU7G75K/QosMrAdSW +9MUgNTP52GE24HGNtLi1qoJFlcDyqSMo59ahy2cI2qBDLKobkx/J3vWraV0T9VuG +WCLKTVXkcGdtwlfFRjlBz4pYg1htmf5X6DYO8A4jqv2Il9DjXA6USbW1FzXSLr9O +he8Y4IWS6wY7bCkjCWDcRQJMEhg76fsO3txE+FiYruq9RUWhiF1myv4Q6W+CyBFC +Dfvp7OOGAN6dEOM4+qR9sdjoSYKEBpsr6GtPAQw4dy753ec5 +-----END CERTIFICATE-----`) + if _, err := certFile.Write(certData); err != nil { + t.Fatal(err) + } + + keyData := []byte(`-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgyV0r+quRldFgyg5U +75mr2caKIKGxnbfn4c5J+7eurkihRANCAASK3k7j3Vt0NeQdN1W5S6TKJw/ysc+9 +r7/oGuX9EkpALjk4mvdGEtLyYLu4CRmHZFkhiIGDF+HliJqAlGW1awy/ +-----END PRIVATE KEY-----`) + if _, err := keyFile.Write(keyData); err != nil { + t.Fatal(err) + } + + // call loadTLS with test cert and key files + _, err = loadTLS(certFile.Name(), keyFile.Name(), "lo.yomo.dev") + + // check if tls cert is expired + if err == nil { + t.Fatal("should return error if tls cert is expired") + } else { + if err.Error() != "tls cert is expired" { + t.Fatal("should return error of tls cert is expired") + } + } +} + +func TestLoadCurrentTLSCert(t *testing.T) { + // call loadTLS with test cert and key files + _, err := loadTLS("../../lo.yomo.dev.cert", "../../lo.yomo.dev.key", "lo.yomo.dev") + + // check if tls cert is expired + if err != nil { + t.Fatalf("should return error if tls cert is expired, err: %v", err) + } +} diff --git a/prscd/deploy/env.prod b/prscd/deploy/env.prod new file mode 100644 index 0000000..8403f72 --- /dev/null +++ b/prscd/deploy/env.prod @@ -0,0 +1,10 @@ +DEBUG=true +PORT=443 +MESH_ID=PRSCD_NODE + +YOMO_ZIPPER=127.0.0.1:9000 +YOMO_SNDR_NAME=prscd-v2-sender +YOMO_RCVR_NAME=prscd-v2-receiver + +CERT_FILE=/home/ubuntu/prscd/realtime.example.com.cert +KEY_FILE=/home/ubuntu/prscd/realtime.example.com.key diff --git a/prscd/deploy/prscd.service b/prscd/deploy/prscd.service new file mode 100644 index 0000000..0d83ec8 --- /dev/null +++ b/prscd/deploy/prscd.service @@ -0,0 +1,29 @@ +[Unit] +Description=Start Prscd Service +Documentation=https://presence.js.org/ +After=network.target + +[Service] +Type=simple +User=ubuntu +ExecStart=/home/ubuntu/prscd/prscd +WorkingDirectory=/home/ubuntu/prscd +Restart=on-failure +LimitNOFILE=1000000 +LimitNPROC=102400 + +Restart=on-failure +RestartSec=3 + +AmbientCapabilities=CAP_NET_BIND_SERVICE + +EnvironmentFile=/home/ubuntu/prscd/env.prod + +StandardOutput=append:/var/log/prscd.log +StandardError=append:/var/log/prscd.err + +[Install] +WantedBy=multi-user.target + +[Install] +WantedBy=sockets.target diff --git a/prscd/deploy/yomo.yaml b/prscd/deploy/yomo.yaml new file mode 100644 index 0000000..7ac758a --- /dev/null +++ b/prscd/deploy/yomo.yaml @@ -0,0 +1,5 @@ +name: yomo_prscd_bridge +host: 0.0.0.0 +port: 9000 +functions: + - name: prscd-v2-receiver diff --git a/prscd/go.mod b/prscd/go.mod new file mode 100644 index 0000000..b52372d --- /dev/null +++ b/prscd/go.mod @@ -0,0 +1,43 @@ +module yomo.run/prscd + +go 1.20 + +require ( + github.com/gobwas/ws v1.3.0 + github.com/joho/godotenv v1.5.1 + github.com/quic-go/qpack v0.4.0 + github.com/quic-go/quic-go v0.38.1 + github.com/vmihailenco/msgpack/v5 v5.3.5 + github.com/yomorun/psig v0.0.0-20230912060731-4cf105aaae0e + github.com/yomorun/yomo v1.15.1 + golang.org/x/sys v0.12.0 +) + +require ( + github.com/caarlos0/env/v6 v6.10.1 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/gobwas/httphead v0.1.0 // indirect + github.com/gobwas/pool v0.2.1 // indirect + github.com/golang/mock v1.6.0 // indirect + github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/matoous/go-nanoid/v2 v2.0.0 // indirect + github.com/onsi/ginkgo/v2 v2.11.0 // indirect + github.com/quic-go/qtls-go1-20 v0.3.3 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/yomorun/y3 v1.0.5 // indirect + go.opentelemetry.io/otel v1.17.0 // indirect + go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect + go.opentelemetry.io/otel/metric v1.17.0 // indirect + go.opentelemetry.io/otel/sdk v1.17.0 // indirect + go.opentelemetry.io/otel/trace v1.17.0 // indirect + golang.org/x/crypto v0.12.0 // indirect + golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect + golang.org/x/mod v0.12.0 // indirect + golang.org/x/net v0.14.0 // indirect + golang.org/x/tools v0.12.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/prscd/go.sum b/prscd/go.sum new file mode 100644 index 0000000..5ae17a1 --- /dev/null +++ b/prscd/go.sum @@ -0,0 +1,126 @@ +github.com/caarlos0/env/v6 v6.10.1 h1:t1mPSxNpei6M5yAeu1qtRdPAK29Nbcf/n3G7x+b3/II= +github.com/caarlos0/env/v6 v6.10.1/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.3.0 h1:sbeU3Y4Qzlb+MOzIe6mQGf7QR4Hkv6ZD0qhGkBFL2O0= +github.com/gobwas/ws v1.3.0/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 h1:hR7/MlvK23p6+lIw9SN1TigNLn9ZnF3W4SYRKq2gAHs= +github.com/google/pprof v0.0.0-20230602150820-91b7bce49751/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/matoous/go-nanoid v1.5.0/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= +github.com/matoous/go-nanoid/v2 v2.0.0 h1:d19kur2QuLeHmJBkvYkFdhFBzLoo1XVm2GgTpL+9Tj0= +github.com/matoous/go-nanoid/v2 v2.0.0/go.mod h1:FtS4aGPVfEkxKxhdWPAspZpZSh1cOjtM7Ej/So3hR0g= +github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= +github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= +github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc= +github.com/onsi/gomega v1.27.8/go.mod h1:2J8vzI/s+2shY9XHRApDkdgPo1TKT7P2u6fXeJKFnNQ= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= +github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= +github.com/quic-go/qtls-go1-20 v0.3.3 h1:17/glZSLI9P9fDAeyCHBFSWSqJcwx1byhLwP5eUIDCM= +github.com/quic-go/qtls-go1-20 v0.3.3/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k= +github.com/quic-go/quic-go v0.38.1 h1:M36YWA5dEhEeT+slOu/SwMEucbYd0YFidxG3KlGPZaE= +github.com/quic-go/quic-go v0.38.1/go.mod h1:ijnZM7JsFIkp4cRyjxJNIzdSfCLmUMg9wdyhGmg+SN4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/yomorun/psig v0.0.0-20230912060731-4cf105aaae0e h1:zRCoZmeeBj6XWd05Bn9/kAUA2/SPBvxLptqd9Ftwhgs= +github.com/yomorun/psig v0.0.0-20230912060731-4cf105aaae0e/go.mod h1:2Hwf3MF1/QELDJjCzB6FtxjyYDHT1Pia1Lc9cD7Y8Ys= +github.com/yomorun/y3 v1.0.5 h1:1qoZrDX+47hgU2pVJgoCEpeeXEOqml/do5oHjF9Wef4= +github.com/yomorun/y3 v1.0.5/go.mod h1:+zwvZrKHe8D3fTMXNTsUsZXuI+kYxv3LRA2fSJEoWbo= +github.com/yomorun/yomo v1.15.1 h1:4w8ZQCtqM/dLogMbfFxO6cySMjJN69g4fzdM7Y0JYcE= +github.com/yomorun/yomo v1.15.1/go.mod h1:dFwExCKXaB6kWEMkktbeWaUCyfBiGY+yZAxr1J8dZ2Q= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= +go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= +go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4= +go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI= +go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= +go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= +go.opentelemetry.io/otel/sdk v1.17.0 h1:FLN2X66Ke/k5Sg3V623Q7h7nt3cHXaW1FOvKKrW0IpE= +go.opentelemetry.io/otel/sdk v1.17.0/go.mod h1:U87sE0f5vQB7hwUoW98pW5Rz4ZDuCFBZFNUBlSgmDFQ= +go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= +go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss= +golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/prscd/lo.yomo.dev.cert b/prscd/lo.yomo.dev.cert new file mode 100644 index 0000000..b46199d --- /dev/null +++ b/prscd/lo.yomo.dev.cert @@ -0,0 +1,85 @@ +-----BEGIN CERTIFICATE----- +MIIEFzCCAv+gAwIBAgISBHUQ0HXpRSNLmFy745t55FveMA0GCSqGSIb3DQEBCwUA +MDIxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MQswCQYDVQQD +EwJSMzAeFw0yMzA5MDEwMTAxMDVaFw0yMzExMzAwMTAxMDRaMBYxFDASBgNVBAMT +C2xvLnlvbW8uZGV2MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEs7GIHXcCCisL +IOwR+5kfjffpggp9qOnLbNYySSdVi/+GOg/qx0et3lzbBzdXstmGq/4buMQEaieR +y7ZcetQIuKOCAgwwggIIMA4GA1UdDwEB/wQEAwIHgDAdBgNVHSUEFjAUBggrBgEF +BQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNVHQ4EFgQUcOQH3eWQ3D+c +H6JEgEmDGA6vRIkwHwYDVR0jBBgwFoAUFC6zF7dYVsuuUAlA5h+vnYsUwsYwVQYI +KwYBBQUHAQEESTBHMCEGCCsGAQUFBzABhhVodHRwOi8vcjMuby5sZW5jci5vcmcw +IgYIKwYBBQUHMAKGFmh0dHA6Ly9yMy5pLmxlbmNyLm9yZy8wFgYDVR0RBA8wDYIL +bG8ueW9tby5kZXYwEwYDVR0gBAwwCjAIBgZngQwBAgEwggEDBgorBgEEAdZ5AgQC +BIH0BIHxAO8AdQC3Pvsk35xNunXyOcW6WPRsXfxCz3qfNcSeHQmBJe20mQAAAYpO +ePsuAAAEAwBGMEQCIB1DPZBrPJbcSoUrq1QIR/2hrQp/JCEmm5CWXKYaXbESAiAj +SnwNrjyRwcvvcduBwLWoQZa4iM5chv3XBphaB1gTzgB2AOg+0No+9QY1MudXKLyJ +a8kD08vREWvs62nhd31tBr1uAAABik54+0cAAAQDAEcwRQIhAJ9ovTth28cAdaAL +FPyMEq68/q2nFkpa8ptrznD9DX++AiAtYSFUcZUl5uOpHFMP5IiH9WUKcUBlEv57 +7q0zp3uf5jANBgkqhkiG9w0BAQsFAAOCAQEArk3djld1DPq1snMh/9sRgNc9ff7n +rNzUS3dupTUBkloNmYxbKIwDVmM0+QVgahgOu5EfKJjFcVfyEf8+hZ7qc9n/ZYaB +QyVPeYWoXN1z+lzv88QqXu4mbUO3PIU7QmWaGEYbfNMQ7TGVQSAa1D5AaiFtcnW6 +eaUKnp1A7hWTHCUR7ZDa6bYC5WySaxvfdOxoaPpg0kY6zDVp1b0bl1I6NuzwbvWB ++r94XxEFB9dHoYJoNZrUfe9kHmsNUjVpxebzWLmGXv9lcJ8378J3J5Sszvq3nz3Y +E8uYUcL3n+z2jj1rNYkHulCo9UGVCbYyvq5HNMAiqWIV6BdGZab/IaoLzg== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFFjCCAv6gAwIBAgIRAJErCErPDBinU/bWLiWnX1owDQYJKoZIhvcNAQELBQAw +TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh +cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMjAwOTA0MDAwMDAw +WhcNMjUwOTE1MTYwMDAwWjAyMQswCQYDVQQGEwJVUzEWMBQGA1UEChMNTGV0J3Mg +RW5jcnlwdDELMAkGA1UEAxMCUjMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQC7AhUozPaglNMPEuyNVZLD+ILxmaZ6QoinXSaqtSu5xUyxr45r+XXIo9cP +R5QUVTVXjJ6oojkZ9YI8QqlObvU7wy7bjcCwXPNZOOftz2nwWgsbvsCUJCWH+jdx +sxPnHKzhm+/b5DtFUkWWqcFTzjTIUu61ru2P3mBw4qVUq7ZtDpelQDRrK9O8Zutm +NHz6a4uPVymZ+DAXXbpyb/uBxa3Shlg9F8fnCbvxK/eG3MHacV3URuPMrSXBiLxg +Z3Vms/EY96Jc5lP/Ooi2R6X/ExjqmAl3P51T+c8B5fWmcBcUr2Ok/5mzk53cU6cG +/kiFHaFpriV1uxPMUgP17VGhi9sVAgMBAAGjggEIMIIBBDAOBgNVHQ8BAf8EBAMC +AYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMBIGA1UdEwEB/wQIMAYB +Af8CAQAwHQYDVR0OBBYEFBQusxe3WFbLrlAJQOYfr52LFMLGMB8GA1UdIwQYMBaA +FHm0WeZ7tuXkAXOACIjIGlj26ZtuMDIGCCsGAQUFBwEBBCYwJDAiBggrBgEFBQcw +AoYWaHR0cDovL3gxLmkubGVuY3Iub3JnLzAnBgNVHR8EIDAeMBygGqAYhhZodHRw +Oi8veDEuYy5sZW5jci5vcmcvMCIGA1UdIAQbMBkwCAYGZ4EMAQIBMA0GCysGAQQB +gt8TAQEBMA0GCSqGSIb3DQEBCwUAA4ICAQCFyk5HPqP3hUSFvNVneLKYY611TR6W +PTNlclQtgaDqw+34IL9fzLdwALduO/ZelN7kIJ+m74uyA+eitRY8kc607TkC53wl +ikfmZW4/RvTZ8M6UK+5UzhK8jCdLuMGYL6KvzXGRSgi3yLgjewQtCPkIVz6D2QQz +CkcheAmCJ8MqyJu5zlzyZMjAvnnAT45tRAxekrsu94sQ4egdRCnbWSDtY7kh+BIm +lJNXoB1lBMEKIq4QDUOXoRgffuDghje1WrG9ML+Hbisq/yFOGwXD9RiX8F6sw6W4 +avAuvDszue5L3sz85K+EC4Y/wFVDNvZo4TYXao6Z0f+lQKc0t8DQYzk1OXVu8rp2 +yJMC6alLbBfODALZvYH7n7do1AZls4I9d1P4jnkDrQoxB3UqQ9hVl3LEKQ73xF1O +yK5GhDDX8oVfGKF5u+decIsH4YaTw7mP3GFxJSqv3+0lUFJoi5Lc5da149p90Ids +hCExroL1+7mryIkXPeFM5TgO9r0rvZaBFOvV2z0gp35Z0+L4WPlbuEjN/lxPFin+ +HlUjr8gRsI3qfJOQFy/9rKIJR0Y/8Omwt/8oTWgy1mdeHmmjk7j1nYsvC9JSQ6Zv +MldlTTKB3zhThV1+XWYp6rjd5JW1zbVWEkLNxE7GJThEUG3szgBVGP7pSWTUTsqX +nLRbwHOoq7hHwg== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFYDCCBEigAwIBAgIQQAF3ITfU6UK47naqPGQKtzANBgkqhkiG9w0BAQsFADA/ +MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT +DkRTVCBSb290IENBIFgzMB4XDTIxMDEyMDE5MTQwM1oXDTI0MDkzMDE4MTQwM1ow +TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh +cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwggIiMA0GCSqGSIb3DQEB +AQUAA4ICDwAwggIKAoICAQCt6CRz9BQ385ueK1coHIe+3LffOJCMbjzmV6B493XC +ov71am72AE8o295ohmxEk7axY/0UEmu/H9LqMZshftEzPLpI9d1537O4/xLxIZpL +wYqGcWlKZmZsj348cL+tKSIG8+TA5oCu4kuPt5l+lAOf00eXfJlII1PoOK5PCm+D +LtFJV4yAdLbaL9A4jXsDcCEbdfIwPPqPrt3aY6vrFk/CjhFLfs8L6P+1dy70sntK +4EwSJQxwjQMpoOFTJOwT2e4ZvxCzSow/iaNhUd6shweU9GNx7C7ib1uYgeGJXDR5 +bHbvO5BieebbpJovJsXQEOEO3tkQjhb7t/eo98flAgeYjzYIlefiN5YNNnWe+w5y +sR2bvAP5SQXYgd0FtCrWQemsAXaVCg/Y39W9Eh81LygXbNKYwagJZHduRze6zqxZ +Xmidf3LWicUGQSk+WT7dJvUkyRGnWqNMQB9GoZm1pzpRboY7nn1ypxIFeFntPlF4 +FQsDj43QLwWyPntKHEtzBRL8xurgUBN8Q5N0s8p0544fAQjQMNRbcTa0B7rBMDBc +SLeCO5imfWCKoqMpgsy6vYMEG6KDA0Gh1gXxG8K28Kh8hjtGqEgqiNx2mna/H2ql +PRmP6zjzZN7IKw0KKP/32+IVQtQi0Cdd4Xn+GOdwiK1O5tmLOsbdJ1Fu/7xk9TND +TwIDAQABo4IBRjCCAUIwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYw +SwYIKwYBBQUHAQEEPzA9MDsGCCsGAQUFBzAChi9odHRwOi8vYXBwcy5pZGVudHJ1 +c3QuY29tL3Jvb3RzL2RzdHJvb3RjYXgzLnA3YzAfBgNVHSMEGDAWgBTEp7Gkeyxx ++tvhS5B1/8QVYIWJEDBUBgNVHSAETTBLMAgGBmeBDAECATA/BgsrBgEEAYLfEwEB +ATAwMC4GCCsGAQUFBwIBFiJodHRwOi8vY3BzLnJvb3QteDEubGV0c2VuY3J5cHQu +b3JnMDwGA1UdHwQ1MDMwMaAvoC2GK2h0dHA6Ly9jcmwuaWRlbnRydXN0LmNvbS9E +U1RST09UQ0FYM0NSTC5jcmwwHQYDVR0OBBYEFHm0WeZ7tuXkAXOACIjIGlj26Ztu +MA0GCSqGSIb3DQEBCwUAA4IBAQAKcwBslm7/DlLQrt2M51oGrS+o44+/yQoDFVDC +5WxCu2+b9LRPwkSICHXM6webFGJueN7sJ7o5XPWioW5WlHAQU7G75K/QosMrAdSW +9MUgNTP52GE24HGNtLi1qoJFlcDyqSMo59ahy2cI2qBDLKobkx/J3vWraV0T9VuG +WCLKTVXkcGdtwlfFRjlBz4pYg1htmf5X6DYO8A4jqv2Il9DjXA6USbW1FzXSLr9O +he8Y4IWS6wY7bCkjCWDcRQJMEhg76fsO3txE+FiYruq9RUWhiF1myv4Q6W+CyBFC +Dfvp7OOGAN6dEOM4+qR9sdjoSYKEBpsr6GtPAQw4dy753ec5 +-----END CERTIFICATE----- diff --git a/prscd/lo.yomo.dev.key b/prscd/lo.yomo.dev.key new file mode 100644 index 0000000..01b2828 --- /dev/null +++ b/prscd/lo.yomo.dev.key @@ -0,0 +1,5 @@ +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgS45Z3zOVSgpbJbw+ +bTd+qfvw2FWz9XPcsJGi8Fd1YoGhRANCAASzsYgddwIKKwsg7BH7mR+N9+mCCn2o +6cts1jJJJ1WL/4Y6D+rHR63eXNsHN1ey2Yar/hu4xARqJ5HLtlx61Ai4 +-----END PRIVATE KEY----- diff --git a/prscd/spec/sequence.md b/prscd/spec/sequence.md new file mode 100644 index 0000000..718c690 --- /dev/null +++ b/prscd/spec/sequence.md @@ -0,0 +1,41 @@ +![graph](https://www.websequencediagrams.com/cgi-bin/cdraw?lz=dGl0bGUgUHJlc2VuY2VqcyB2MgoKYWN0b3IgQWxpY2UABQdCb2IKAAsFLT5Cb2I6IEJvYiBpcyBvbmxpbmUsACUGIGpvaW4gdGhpcyBjaGFubmVsLCBzeW5jIHN0YXRlLgoKbm90ZSBvdmUAVAcsIEJvYiwgcHJzY2Q6IFN0ZXAgMSAtIEF1dGgAZAgAFQdXZWJTb2NrZXQvV2ViVHJhbnNwb3J0LCB3aXRoIGBhdXRoYCBhbmQgYGlkYABgBnJpZ2h0IG9mAFYIAFAFAGUFLT4rQXV0aFN2YwAQBlN2YwoACgctLT4tACULUmVzdWx0AC4IAIIJBTogNHh4IGlmIGF1dGggZmFpbCwgVXBncmFkZSBpZiBzdWNjZXNzAIFGJDIgLSBKb2luIEMAgiUGAIJWCCsAgggHYACCPAdfam9pbmAAgTQHLT4tAIEABwARDiBBQ0sAgxcIKDIpAIMdBWBwZWVyXwCDGwZgAIIkBnN0YXRfb2JqIHBheWxvYWQAghwPAINUBWFkZACDRwd0bwCDWAcgdXNlcnMgY2FjaGUAHxRzaG91bGQgcmVzcG9uZAByBwCDbwVgCkJvYgCBDgUAgS8IABAMAIQKBWxlZgCDIwUADhMAUQhhZGQAhGsFdG8AfQcAgVUWAC4HYXQgYW55IHRpbQCBGBV1cGRhdGUAhSoHAIIJCACEdyQzIC0gQnJvYWRjYXN0IGN1c3RvbWUgZGF0YQCCaRFkYXRhYACFSyQ0IC0gTGVhdmUAg3YQAIM2DGZmAIM-BQCDeQoACRQAgzcTcmVtb3YAgWIIZnJvbSBsb2NhbACCPAg&s=default) + +Source file: + +Open: https://www.websequencediagrams.com/#, Source code is: + +```text +title Presencejs v2 - 20221009 + +actor Alice +actor Bob +Alice->Bob: Bob is online, Alice join this channel, sync state. + +note over Alice, Bob, prscd: Step 1 - Auth +Alice->prscd: WebSocket/WebTransport, with `auth` and `id` +note right of prscd: Auth +prscd->+AuthSvc: AuthSvc +AuthSvc-->-prscd: AuthResult +prscd->Alice: 4xx if auth fail, Upgrade if success + +note over Alice, Bob, prscd: Step 2 - Join Channel +Alice->+prscd: `channel_join` +prscd-->-Alice: `channel_join` ACK +Alice->(2)Bob: `peer_online` with stat_obj payload +note right of Bob: add Alice to online users cache +note right of Bob: should respond `peer_state` +Bob->(2)Alice: `peer_state` +note left of Alice: `peer_state` should add Bob to cache + +Alice->(2)Bob: `peer_state` at any time +note right of Bob: update Alice stat_obj + +note over Alice, Bob, prscd: Step 3 - Broadcast custome data +Alice->(2)Bob: `data` + +note over Alice, Bob, prscd: Step 4 - Leave Channel +Alice->Bob: `peer_offline` +prscd-->-Bob: `peer_offline` +note right of Bob: remove Alice from local cache +``` + diff --git a/prscd/util/log.go b/prscd/util/log.go new file mode 100644 index 0000000..544ffea --- /dev/null +++ b/prscd/util/log.go @@ -0,0 +1,73 @@ +// Package util is a collection of utility functions. +package util + +import ( + "fmt" + "os" +) + +type logLevelType int + +const ( + // DEBUG level + DEBUG logLevelType = iota + // INFO level + INFO + // ERROR level + ERROR +) + +type plog struct { + logLevel logLevelType +} + +// Info prints log to Stdout. +func (l *plog) Info(format string, a ...any) { + if l.logLevel > INFO { + return + } + _, _ = fmt.Fprintf(os.Stdout, format+"\r\n", a...) +} + +// Inspect prints log to stdout, but will not add a newline +func (l *plog) Inspect(format string, a ...any) { + if l.logLevel > INFO { + return + } + _, _ = fmt.Fprintf(os.Stdout, format+"\r", a...) +} + +// Error prints log to stderr. +func (l *plog) Error(format string, a ...any) { + if l.logLevel > DEBUG { + _, _ = fmt.Fprintf(os.Stderr, format+"\r\n", a...) + } else { + _, _ = fmt.Fprintf(os.Stderr, "\033[31m"+format+"\033[0m\r\n", a...) + } +} + +// Debug log to stdout with colors. +func (l *plog) Debug(format string, a ...any) { + if l.logLevel > DEBUG { + return + } + l.Info("\033[36m"+format+"\033[0m", a...) +} + +// Fatal prints log to stderr and exit. +func (l *plog) Fatal(err error) { + l.Error("FATAL:%s", err) + os.Exit(1) +} + +// SetLogLevel set log level. +func (l *plog) SetLogLevel(lvl logLevelType) { + l.logLevel = lvl +} + +// Log is a global logger +var Log *plog + +func init() { + Log = &plog{logLevel: INFO} +} diff --git a/prscd/websocket.html b/prscd/websocket.html new file mode 100644 index 0000000..f2d6d84 --- /dev/null +++ b/prscd/websocket.html @@ -0,0 +1,134 @@ + + + + +

Realtime communication over Websocket

+
    +
+ + + + \ No newline at end of file diff --git a/prscd/websocket/listen_darwin.go b/prscd/websocket/listen_darwin.go new file mode 100644 index 0000000..4697324 --- /dev/null +++ b/prscd/websocket/listen_darwin.go @@ -0,0 +1,25 @@ +//go:build darwin +// +build darwin + +package websocket + +import ( + "net" + "syscall" + + "golang.org/x/sys/unix" +) + +// reuse port on darwin +var lc = net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + var opErr error + if err := c.Control(func(fd uintptr) { + // 端口复用,这样可以多进程监听该端口,充分利用 CPU 资源;同时也可以实现热更新 + opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + }); err != nil { + return err + } + return opErr + }, +} diff --git a/prscd/websocket/listen_unix.go b/prscd/websocket/listen_unix.go new file mode 100644 index 0000000..5cd1c65 --- /dev/null +++ b/prscd/websocket/listen_unix.go @@ -0,0 +1,27 @@ +//go:build freebsd || linux || netbsd || openbsd +// +build freebsd linux netbsd openbsd + +package websocket + +import ( + "golang.org/x/sys/unix" + "net" + "syscall" +) + +var lc = net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + var opErr error + if err := c.Control(func(fd uintptr) { + // reuse port + opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + // TCP_NODELAY + opErr = unix.SetsockoptInt(int(fd), unix.IPPROTO_TCP, unix.TCP_NODELAY, 1) + // set priority of the socket + opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_PRIORITY, 6) + }); err != nil { + return err + } + return opErr + }, +} diff --git a/prscd/websocket/listen_windows.go b/prscd/websocket/listen_windows.go new file mode 100644 index 0000000..f9befa5 --- /dev/null +++ b/prscd/websocket/listen_windows.go @@ -0,0 +1,10 @@ +//go:build windows +// +build windows + +package websocket + +import ( + "net" +) + +var lc = net.ListenConfig{} diff --git a/prscd/websocket/main.go b/prscd/websocket/main.go new file mode 100644 index 0000000..61aa0ce --- /dev/null +++ b/prscd/websocket/main.go @@ -0,0 +1,264 @@ +// Package websocket serve websocket connections +package websocket + +import ( + "context" + "crypto/tls" + "encoding/binary" + "errors" + "io" + "net" + "net/http" + "net/url" + "os" + "time" + + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" + + "yomo.run/prscd/chirp" + "yomo.run/prscd/util" +) + +var log = util.Log + +const ( + // DurationOfPing describes the interval of ping + DurationOfPing = 10 * time.Second +) + +// ListenAndServe create the websocket server +func ListenAndServe(addr string, config *tls.Config) { + // create TCP listener + lp, err := lc.Listen(context.Background(), "tcp", addr) + if err != nil { + log.Fatal(err) + } + defer lp.Close() + + // wrap TCP listener with TLS + ln := tls.NewListener(lp, config) + defer ln.Close() + + log.Info("Prscd - WebSocket Server - Listening on %s", ln.Addr()) + + var node = chirp.Node + + for { + // TCP has new connection + conn, err := ln.Accept() + if err != nil { + log.Error("ln.accept error: %s", err) + conn.Close() + continue + } + + var cuid, appID string // presencejs client user id + + rejectionHeader := ws.RejectionHeader(ws.HandshakeHeaderString("X-Prscd-Version: v2\r\nX-Prscd-MeshID: " + os.Getenv("MESH_ID") + "\r\n")) + + // HTTP layer + u := ws.Upgrader{ + OnRequest: func(req []byte) error { + // the request url should be like: /v1?id=xxx&publickey=xxx + url, err := url.ParseRequestURI(string(req)) + if err != nil { + log.Error("url parse error: %s", err) + return ws.RejectConnectionError( + ws.RejectionStatus(500), + rejectionHeader, + ws.RejectionReason("url parse error"), + ) + } + log.Info("path: %s, query: %+v", url.Path, url.Query()) + if url.Path != chirp.Endpoint { + return ws.RejectConnectionError( + ws.RejectionStatus(404), + rejectionHeader, + ws.RejectionReason("path not allowed"), + ) + } + cuid = url.Query().Get("id") + if cuid == "" { + return ws.RejectConnectionError( + ws.RejectionStatus(401), + rejectionHeader, + ws.RejectionReason("id must not be empty"), + ) + } + // publickey can be used for identify user if developer want integrate with other systems + authPublicKey := url.Query().Get("publickey") + if authPublicKey == "" { + return ws.RejectConnectionError( + ws.RejectionStatus(401), + rejectionHeader, + ws.RejectionReason("publickey must not be empty"), + ) + } + var ok bool + appID, ok = chirp.Node.AuthUser(authPublicKey) + if !ok { + return ws.RejectConnectionError( + ws.RejectionStatus(403), + rejectionHeader, + ws.RejectionReason("illegal public key"), + ) + } + log.Info("query.id: %s", cuid) + return nil + }, + OnHeader: func(key, value []byte) error { + // implement this method to check request headers if needed + // log.Info("header: %s=%s", string(key), string(value)) + return nil + }, + OnBeforeUpgrade: func() (ws.HandshakeHeader, error) { + // before upgrade to websocket, logic can be implemented here + return ws.HandshakeHeaderHTTP(http.Header{ + "X-Prscd-Version": []string{"v2.0.0-alpha"}, + "X-Prscd-MESHID": []string{os.Getenv("MESH_ID")}, + }), nil + }, + } + + // zero-copy resuse the TCP connection + p, err := u.Upgrade(conn) + if err != nil { + if err == io.EOF { + log.Inspect("[%s] connection closed by peer.", conn.RemoteAddr().String()) + } else { + log.Info("[ws] new conn: %s", conn.RemoteAddr().String()) + // if is rejected connection error, send close frame to client + var rejectErr *ws.ConnectionRejectedError + if errors.As(err, &rejectErr) { + ws.WriteFrame(conn, ws.NewCloseFrame(ws.NewCloseFrameBody(ws.StatusCode(rejectErr.StatusCode()), rejectErr.Error()))) + log.Error("[%s] u.upgrade reject error: %v, close connection.", conn.RemoteAddr().String(), err) + } else { + log.Error("[%s] u.upgrade unknown error: %+v, close connection", conn.RemoteAddr().String(), err) + } + } + + // closeConn(conn, "886") + conn.Write(ws.CompiledClose) + continue + } + + log.Info("[%s] upgrade success, start serving: %v", conn.RemoteAddr().String(), p) + + // create peer instance after Websocket handshake + pconn := chirp.NewWebSocketConnection(conn) + peer := node.AddPeer(pconn, cuid, appID) + log.Debug("[%s-%s] Upgrade done!", peer.Sid, peer.Cid) + + keepaliveDone := make(chan bool) + go func(c net.Conn) { + // 浏览器不会发送 Ping,一定是服务器端发 Ping,浏览器会自动回应 Pong(但在 DevTools 里是不显示Ping/Pong的) + // according to https://tools.ietf.org/html/rfc6455#section-5.5.2, Web Browsers will not send Ping frame, + // backend server should send Ping frame to keep connection alive, and Web Browsers will auto reply Pong frame when receive Ping frame. But in Chrome DevTools, Ping/Pong frame is not shown. + ticker := time.NewTicker(DurationOfPing) + defer ticker.Stop() + for { + select { + case <-keepaliveDone: + log.Debug("[%s] ticker done", peer.Sid) + return + case <-ticker.C: + c.Write(generatePingFrame()) + } + } + }(conn) + + // handle WebSocket requests + go func() { + defer conn.Close() + defer close(keepaliveDone) + + for { + // read data + header, r, err := wsutil.NextReader(conn, ws.StateServerSide) + if err != nil { + log.Error("read from ws error: %+v", err) + switch et := err.(type) { + case wsutil.ClosedError: + // Client close the connection: + log.Info("[client disconnect] ClosedError: %v, %v", et.Code, et.Reason) + default: + // detect connection has been closed + log.Info("read error: [%v] %v", et, err) + // send Close frame to client + conn.Write(ws.MustCompileFrame(ws.NewCloseFrame(ws.NewCloseFrameBody(ws.StatusGoingAway, "bye")))) + } + // clear connection + peer.Disconnect() + return + } + + // handle Websocket Control Frame: https://www.rfc-editor.org/rfc/rfc6455#section-5.5 + // there are three types of Control Frame: 0x08(Close), 0x09(Ping) and 0x0A(Pong) + // be careful that Control frames can be interjected in the middle of a fragmented message. + if header.OpCode.IsControl() { + // Close Frame + if header.OpCode == ws.OpClose { + log.Info("[%s] >GOT CLOSE", peer.Sid) + peer.Disconnect() + wsutil.ControlFrameHandler(conn, ws.StateServerSide) + return + } + + // Pong Frame + if header.OpCode == ws.OpPong { + handlePongFrame(peer.Sid, r, header) + continue + } + + log.Debug("[%s] >GOT Unhandled Control Frame: %+v", peer.Sid, header.OpCode) + wsutil.ControlFrameHandler(conn, ws.StateServerSide) + + continue + } + + // handle Websocket Data Frames: https://www.rfc-editor.org/rfc/rfc6455#section-5.6 + // only accept Binary mode message, will break if receive Text mode message + if header.OpCode == ws.OpText { + log.Error("Peer: %s sent text which not allowed", peer.Sid) + break + } + + _ = peer.HandleSignal(r) + } + }() + } +} + +// generatePingFrame return a Ping Frame +func generatePingFrame() []byte { + // according to RFC6455: https://www.rfc-editor.org/rfc/rfc6455#section-5.5.2, + // Application Data can be carried by Ping frame, and the payload will be returned in Pong frame from Web Browser automatically, so we can calculate the RTT by this. + ts := time.Now().UnixMilli() + tsbuf := make([]byte, 8) + binary.BigEndian.PutUint64(tsbuf, uint64(ts)) + pf := ws.MustCompileFrame(ws.NewPingFrame(tsbuf)) + return pf +} + +// handlePongFrame handle Pong Frame from Web Browser +func handlePongFrame(sid string, r io.Reader, header ws.Header) error { + // read the Application Data from Pong frame + buf := make([]byte, header.Length) + _, err := io.ReadFull(r, buf) + if err != nil { + log.Error("Read PONG payload err: %+v", err) + return err + } + // calculate the RTT and prints to stdout + appData := int64(binary.BigEndian.Uint64(buf)) + now := time.Now().UnixMilli() + log.Inspect("[%s]\tPONG Payload, len=%d, data=%d, 𝚫=%dms", sid, len(buf), appData, now-appData) + return nil +} + +// closeConn send Close Frame to client and close the connection +func closeConn(conn net.Conn, reason string) { + ws.WriteFrame(conn, ws.NewCloseFrame(ws.NewCloseFrameBody(ws.StatusNormalClosure, reason))) + conn.Close() +} diff --git a/prscd/webtransport.html b/prscd/webtransport.html new file mode 100644 index 0000000..e560bcc --- /dev/null +++ b/prscd/webtransport.html @@ -0,0 +1,160 @@ + + + + + +

Realtime communication over WebTransport

+
    +
+ + + + + \ No newline at end of file diff --git a/prscd/webtransport/http3.go b/prscd/webtransport/http3.go new file mode 100644 index 0000000..103e651 --- /dev/null +++ b/prscd/webtransport/http3.go @@ -0,0 +1,242 @@ +package webtransport + +import ( + "bufio" + "bytes" + "errors" + "io" + "net/http" + "strconv" + "strings" + + "github.com/quic-go/qpack" + "github.com/quic-go/quic-go" + "github.com/quic-go/quic-go/quicvarint" +) + +func sendSettingsFrame(sess quic.Connection) { + // server should send HTTP SETTINGS frame to client + log.Debug("[1] Send SETTINGS frame") + // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-02.html#section-3.1 + // In order to indicate support for WebTransport, both the client and the server MUST send a SETTINGS_ENABLE_WEBTRANSPORT value set to "1" in their SETTINGS frame. + // [1] send SETTINGS frame + // https://www.w3.org/TR/webtransport/#webtransport-constructor + // 6. Wait for connection to receive the first SETTINGS frame, and let settings be a dictionary that represents the SETTINGS frame. + // 7. If settings doesn’t contain SETTINGS_ENABLE_WEBTRANPORT with a value of 1, or it doesn’t contain H3_DATAGRAM with a value of 1, then abort the remaining steps and queue a network task with transport to run these steps: + respStream, err := sess.OpenUniStream() + if err != nil { + log.Error("sess.OpenUniStream error: %v", err) + return + } + + // https://datatracker.ietf.org/doc/draft-ietf-masque-h3-datagram/ + // Implementations of HTTP/3 that support HTTP Datagrams can indicate + // that to their peer by sending the H3_DATAGRAM SETTINGS parameter with + // a value of 1. The value of the H3_DATAGRAM SETTINGS parameter MUST + // be either 0 or 1. A value of 0 indicates that HTTP Datagrams are not + // supported. An endpoint that receives the H3_DATAGRAM SETTINGS + // parameter with a value that is neither 0 or 1 MUST terminate the + // connection with error H3_SETTINGS_ERROR. + // buf := &bytes.Buffer{} + buf := make([]byte, 0, 64) + // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-6.2.1 + // A control stream is indicated by a stream type of 0x00. Data on this stream consists of HTTP/3 frames, as defined in Section 7.2. + // Each side MUST initiate a single control stream at the beginning of the connection and send its SETTINGS frame as the first frame on this stream. If the first frame of the control stream is any other frame type, this MUST be treated as a connection error of type H3_MISSING_SETTINGS. Only one control stream per peer is permitted; receipt of a second stream claiming to be a control stream MUST be treated as a connection error of type + buf = quicvarint.Append(buf, 0x00) + // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-http-framing-layer + // 7. HTTP Framing Layer + // HTTP/3 Frame Format { + // Type (i), + // Length (i), + // Frame Payload (..), + // } + // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-settings + // 7.2.4. SETTINGS + // The SETTINGS frame (type=0x04) conveys configuration parameters that affect how endpoints communicate, such as preferences and constraints on peer behavior. Individually, a SETTINGS parameter can also be referred to as a "setting"; the identifier and value of each setting parameter can be referred to as a "setting identifier" and a "setting value". + buf = quicvarint.Append(buf, 0x04) + var l uint64 + // H3_DATAGRAM + // https://datatracker.ietf.org/doc/html/draft-ietf-masque-h3-datagram-05#section-9.1 + // +==============+==========+===============+=========+ + // | Setting Name | Value | Specification | Default | + // +==============+==========+===============+=========+ + // | H3_DATAGRAM | 0xffd277 | This Document | 0 | + // +--------------+----------+---------------+---------+ + l += uint64(quicvarint.Len(0xffd277) + quicvarint.Len(1)) + // SETTINGS_ENABLE_WEBTRANPORT + // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-02.html#section-8.2 + // The SETTINGS_ENABLE_WEBTRANSPORT parameter indicates that the specified HTTP/3 connection is + // WebTransport-capable. + // Setting Name:ENABLE_WEBTRANSPORT + // Value:0x2b603742 + // Default:0 + l += uint64(quicvarint.Len(0x2b603742) + quicvarint.Len(1)) + // // ??? + // l += uint64(quicvarint.Len(0x276) + quicvarint.Len(1)) + // write Length + buf = quicvarint.Append(buf, l) + // Write value + // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-settings + // The payload of a SETTINGS frame consists of zero or more parameters. Each parameter consists of a setting identifier and a value, both encoded as QUIC variable-length integers. + // + // Setting { + // Identifier (i), + // Value (i), + // } + + // SETTINGS Frame { + // Type (i) = 0x04, + // Length (i), + // Setting (..) ..., + // } + // + // quicvarint.Write(buf, 0x276) + // quicvarint.Write(buf, 1) + buf = quicvarint.Append(buf, 0xffd277) // H3_DATAGRAM + buf = quicvarint.Append(buf, 1) + buf = quicvarint.Append(buf, 0x2b603742) // SETTINGS_ENABLE_WEBTRANSPORT + buf = quicvarint.Append(buf, 1) + + log.Debug("\t[len=%d] %# x", len(buf), buf) + _, err = respStream.Write(buf) + if err != nil { + log.Error("sendSettingsFrame error: %v", err) + } + log.Debug("\tSettings frame sent") +} + +func receiveSettingsFrame(sess quic.Connection) error { + recvSettingStream, _ := sess.AcceptUniStream(sess.Context()) + log.Debug("[2] receive client SETTINGS frame") + sqr := quicvarint.NewReader(recvSettingStream) + // stream type should = 0x00, control stream + sty, err := quicvarint.Read(sqr) + if err != nil { + return err + } + log.Debug("\tStreamType: %# x\r", sty) + // frame type should = 0x04, SETTINGS frame + ftype, err := quicvarint.Read(sqr) + if err != nil { + return err + } + log.Debug("\tFrameType: %# x\r", ftype) + // Settings length + flen, err := quicvarint.Read(sqr) + if err != nil { + return err + } + log.Debug("\tLength: %# x(oct=%d)\r", flen, flen) + // Frame Payload ... + // total length is `flen` + settingsPayload := make(map[uint64]uint64) + payloadBuf := make([]byte, flen) + if _, err := io.ReadFull(recvSettingStream, payloadBuf); err != nil { + return err + } + bb := bytes.NewReader(payloadBuf) + for bb.Len() > 0 { + identifier, err := quicvarint.Read(bb) + if err != nil { + return err + } + value, err := quicvarint.Read(bb) + if err != nil { + return err + } + settingsPayload[identifier] = value + log.Debug("\tidentifier:%# x, value: %d (%#x)\r", identifier, value, value) + } + + return nil +} + +func readHeaderFrame(reqStream io.Reader) ([]qpack.HeaderField, error) { + // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-http-framing-layer + // HEADERS Frame { + // Type (i) = 0x01, + // Length (i), + // Encoded Field Section (..), + // } + qr := quicvarint.NewReader(reqStream) + // read header frame + hdr, err := quicvarint.Read(qr) + if err != nil { + return nil, err + } + log.Debug("\theader: %# x", hdr) + // read the length of the header block + headerBlockLength, err := quicvarint.Read(qr) + if err != nil { + log.Error("readHeaderFrame error: %v", err) + } + log.Debug("\theader block: %# x", headerBlockLength) + + // header frame id is 0x01 + if hdr != 0x01 { + return nil, errors.New("not header frame, should force close connection") + } + + headerBlock := make([]byte, headerBlockLength) + if _, err = io.ReadFull(reqStream, headerBlock); err != nil { + return nil, err + } + decoder := qpack.NewDecoder(nil) + return decoder.DecodeFull(headerBlock) +} + +func writeResponseHeaderFrame(w io.Writer, status int) error { + // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-02.html#name-negotiating-the-draft-versi + // The header corresponding to the + // version described in this draft is Sec-Webtransport-Http3-Draft02; + // its value SHALL be 1. The server SHALL reply with a Sec- + // Webtransport-Http3-Draft header indicating the selected version; its + // value SHALL be draft02 for the version described in this draft. + respHeader := http.Header{} + respHeader.Add("Sec-Webtransport-Http3-Draft", "draft02") + + // From the client's perspective, a WebTransport session is established + // when the client receives a 2xx response. From the server's + // perspective, a session is established once it sends a 2xx response. + var qpackHeaders bytes.Buffer + encoder := qpack.NewEncoder(&qpackHeaders) + encoder.WriteField(qpack.HeaderField{ + Name: ":status", + Value: strconv.Itoa(status), + }) + for k, v := range respHeader { + for index := range v { + encoder.WriteField(qpack.HeaderField{ + Name: strings.ToLower(k), + Value: v[index], + }) + } + } + + // buf := &bytes.Buffer{} + buf := make([]byte, 0, 64) + // https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.2 + // HEADERS Frame { + // Type (i) = 0x01, + // Length (i), + // Encoded Field Section (..), + // } + buf = quicvarint.Append(buf, 0x01) + buf = quicvarint.Append(buf, uint64(qpackHeaders.Len())) + + respWriter := bufio.NewWriter(w) + if _, err := respWriter.Write(buf); err != nil { + return err + } + if _, err := respWriter.Write(qpackHeaders.Bytes()); err != nil { + return err + } + if err := respWriter.Flush(); err != nil { + return err + } + + log.Debug("[4] Response HEADER frame with status:%d", status) + log.Debug("\t%v", respHeader) + + return nil +} diff --git a/prscd/webtransport/main.go b/prscd/webtransport/main.go new file mode 100644 index 0000000..c05e13f --- /dev/null +++ b/prscd/webtransport/main.go @@ -0,0 +1,266 @@ +// Package webtransport runs the webtrans server service +package webtransport + +import ( + "bytes" + "context" + "crypto/tls" + "errors" + "net/url" + "time" + + "github.com/quic-go/quic-go" + + "yomo.run/prscd/chirp" + "yomo.run/prscd/util" +) + +var log = util.Log + +// ListenAndServe create webtransport server +func ListenAndServe(addr string, tlsConfig *tls.Config) { + quicConfig := &quic.Config{ + EnableDatagrams: true, + KeepAlivePeriod: 30 * time.Second, + MaxIncomingStreams: 10000, + MaxIdleTimeout: 6 * time.Second, // when Read timeout + } + + ln, err := quic.ListenAddr(addr, tlsConfig, quicConfig) + if err != nil { + log.Fatal(err) + return + } + log.Info("Prscd - WebTransport Server - Listening on %s", ln.Addr()) + log.Debug("tls: %+v", tlsConfig.NextProtos) + + // processing request + for { + sess, err := ln.Accept(context.Background()) + if err != nil { + log.Error("ln.accept error: %s", err) + continue + } + log.Info("+Session: %s", sess.RemoteAddr().String()) + go handleConnection(sess) + } +} + +func handleConnection(sess quic.Connection) { + closeReason := "cc-88-cc" + defer sess.CloseWithError(0x2, closeReason) + log.Debug("handleConnection: %s", sess.RemoteAddr().String()) + // https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-02.html#section-2 + // 2. Protocol Overview + // When an HTTP/3 connection is established, both the client and server have + // to send a SETTINGS_ENABLE_WEBTRANSPORT setting in order to indicate that + // they both support WebTransport over HTTP/3. + + // WebTransport sessions are initiated inside a given HTTP/3 connection by + // the client, who sends an extended CONNECT request [RFC8441]. If the server + // accepts the request, an WebTransport session is established. + // all the first is create a http3 connection: + + // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-6 + // In version 1 of QUIC, the stream data containing HTTP frames is carried + // by QUIC STREAM frames, but this framing is invisible to the HTTP framing layer. + + // right now, a QUIC connection has been established. So we focus on HTTP frames. + + // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-3.2 + // While connection-level options pertaining to the core QUIC protocol are + // set in the initial crypto handshake, HTTP/3-specific settings are conveyed + // in the SETTINGS frame. After the QUIC connection is established, a SETTINGS + // frame (Section 7.2.4) MUST be sent by each endpoint as the initial frame + // of their respective HTTP control stream; see Section 6.2.1. + + // so, + // Step 1: Server send SETTINGS frame + go sendSettingsFrame(sess) + + // Step 2: Server receive SETTINGS frame from client + err := receiveSettingsFrame(sess) + if err != nil { + log.Error("webtrans|receiveSettingsFrame error: %s", err) + closeReason = "error in receive settings frame" + return + } + + // Step 3: wait for reading client HTTP CONNECT (client indicatation) + stream, err := sess.AcceptStream(context.Background()) + if err != nil { + log.Error("webtrans|acceptStream error: %s", err) + closeReason = "error in accept stream" + return + } + log.Debug("\trequest stream accepted: %d", stream.StreamID()) + + var publicKey, userID string + status, err := receiveHTTPConnectHeaderFrame(stream, &publicKey, &userID) + if err != nil { + log.Error("webtrans|receiveHTTPConnectHeaderFrame error: %s", err) + closeReason = "error in receive http connect header frame" + return + } + + appID, ok := chirp.Node.AuthUser(publicKey) + if !ok { + status = 401 + } + + // Step 4: response HEADER frame if client is valid + err = writeResponseHeaderFrame(stream, status) + if err != nil { + log.Error("webtrans|writeResponseHeaderFrame error: %s", err) + closeReason = "error in write response header frame" + return + } + + if status >= 300 { + closeReason = "failed with response status other than 2xx" + return + } + + log.Debug("Prepared! Start to work ... uid: %s", userID) + + // Step 5: start to processing presencejs protocol + pconn := chirp.NewWebTransportConnection(sess) + peer := chirp.Node.AddPeer(pconn, userID, appID) + log.Info("[%s-%s] Upgrade done!", peer.Sid, peer.Cid) + + // Handle Datagram + go func() { + for { + msg, err := sess.ReceiveMessage(context.Background()) + if err != nil { + // ignore errors here, we will handle client close event in stream loop + return + } + log.Debug("ReceiveMessage: %s", msg) + // log.Debug("ReceiveMessage: %# x", msg) + // be careful, the first byte of msg is 0x00 + reader := bytes.NewReader(msg[1:]) + peer.HandleSignal(reader) + } + }() + + for { + var buf = make([]byte, 1024) + _, err := stream.Read(buf) + if err != nil { + // if client close the connection, error will occurs here + // for example: err:timeout: no recent network activity + log.Error("[%s] stream.Read error: %s", pconn.RemoteAddr(), err) + peer.Disconnect() + stream.Close() + sess.CloseWithError(0, "client disconnected") + break + } + } +} + +// [3]: wait for reading client HTTP CONNECT (client indicatation) +// https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-3.3 +// In order to create a new WebTransport session, a client can send an +// HTTP CONNECT request. The :protocol pseudo-header field ([RFC8441]) +// MUST be set to webtransport. The :scheme field MUST be https. Both +// the :authority and the :path value MUST be set; those fields indicate +// the desired WebTransport server. An Origin header [RFC6454] MUST be +// provided within the request. + +// Upon receiving an extended CONNECT request with a :protocol field set +// to webtransport, the HTTP/3 server can check if it has a WebTransport +// server associated with the specified :authority and :path values. If +// it does not, it SHOULD reply with status code 404 (Section 6.5.4, +// [RFC7231]). If it does, it MAY accept the session by replying with a +// 2xx series status code, as defined in Section 15.3 of [SEMANTICS]. +// The WebTransport server MUST verify the Origin header to ensure that +// the specified origin is allowed to access the server in question. +// +// From the client's perspective, a WebTransport session is established +// when the client receives a 2xx response. From the server's +// perspective, a session is established once it sends a 2xx response. +// WebTransport over HTTP/3 does not support 0-RTT. +func receiveHTTPConnectHeaderFrame(reqStream quic.Stream, publicKey, userID *string) (status int, err error) { + log.Debug("[3] Receive HTTP CONNECT from client") + + // read header frame which client requested + headers, err := readHeaderFrame(reqStream) + if err != nil { + return 401, err + } + + // if developers need validate request header, below is the best place to do it + // The :protocol pseudo-header field ([RFC8441]) + // MUST be set to webtransport. + // The :scheme field MUST be https. + // Both the :authority and the :path value MUST be set; those fields indicate + // the desired WebTransport server. An Origin header [RFC6454] MUST be + // provided within the request. + // Which looks like: + // 2022/02/07 11:24:59 [header] 0: {:scheme https} + // 2022/02/07 11:24:59 [header] 1: {:method CONNECT} + // 2022/02/07 11:24:59 [header] 2: {:authority lo.yomo.dev:4433} + // 2022/02/07 11:24:59 [header] 3: {:path /counter} + // 2022/02/07 11:24:59 [header] 4: {:protocol webtransport} + // 2022/02/07 11:24:59 [header] 5: {sec-webtransport-http3-draft02 1} + // 2022/02/07 11:24:59 [header] 6: {origin https://webtransport-client.vercel.app} + + var authority, path, scheme, protocol, origin, method, version string + for key, val := range headers { + log.Debug("\t[header] %d: %s", key, val) + if val.Name == ":authority" { // like prscd.yomo.dev:443 + authority = val.Value + } else if val.Name == ":path" { // `/v1/webtrans?publickey=123&id=yomo-1` + path = val.Value + } else if val.Name == ":scheme" { // must be https + scheme = val.Value + } else if val.Name == ":method" { // CONNECT + method = val.Value + } else if val.Name == ":protocol" { // must be webtransport + protocol = val.Value + } else if val.Name == "origin" { // origin of client + origin = val.Value + } else if val.Name == "sec-webtransport-http3-draft02" { // must be 1 + version = val.Value + } + } + + if protocol != "webtransport" { + return 401, errors.New("protocol has to be webtransport") + } + + if scheme != "https" { + return 401, errors.New("scheme has to be https") + } + + if method != "CONNECT" { + return 401, errors.New("method has to be CONNECT") + } + + if version != "1" { + return 401, errors.New("sec-webtransport-http3-draft02 has to be 1") + } + + // if origin need to be validated, do it here + log.Debug("origin: %s", origin) + + // by checking authority, I'd like tell out the environment of service, like dev, test and prod, because I have different domains for them + log.Debug("authority: %s", authority) + + // validate service version and auth + reqPath, err := url.Parse(path) + if err != nil { + return 401, errors.New("path is invalid: " + err.Error()) + } + log.Debug("request Path: %s, QueryString: %+v", reqPath.Path, reqPath.Query()) + + if reqPath.Path != chirp.Endpoint { + return 404, errors.New("path has to be /v1/webtrans") + } + + *userID = reqPath.Query().Get("id") + *publicKey = reqPath.Query().Get("publickey") + + return 200, nil +} diff --git a/prscd/yomo.yaml b/prscd/yomo.yaml new file mode 100644 index 0000000..28dc20a --- /dev/null +++ b/prscd/yomo.yaml @@ -0,0 +1,6 @@ +name: yomo_prscd_bridge +host: 0.0.0.0 +port: 9000 +functions: + - name: prscd-sender + - name: prscd-receiver