From d524fbc6aebe4970d705b4af25da41507a8c9b97 Mon Sep 17 00:00:00 2001 From: FZambia Date: Mon, 2 Sep 2024 21:39:12 +0300 Subject: [PATCH] drop sockjs, drop tarantool --- .github/workflows/test.yml | 7 - go.mod | 4 - go.sum | 10 - internal/middleware/log.go | 4 +- internal/sockjs/cancelctx.go | 34 - internal/sockjs/handler_sockjs.go | 278 -------- internal/sockjs/handler_sockjs_test.go | 280 --------- internal/sockjs/pool.go | 45 -- internal/sockjs/pool_test.go | 34 - internal/tntengine/broker.go | 839 ------------------------- internal/tntengine/broker_test.go | 396 ------------ internal/tntengine/multi_conn.go | 191 ------ internal/tntengine/presence.go | 179 ------ internal/tntengine/shard.go | 135 ---- internal/tntengine/util.go | 109 ---- internal/usage/usage.go | 4 - main.go | 129 +--- misc/tnt/init.lua | 52 -- misc/tnt/readme.md | 6 - 19 files changed, 5 insertions(+), 2731 deletions(-) delete mode 100644 internal/sockjs/cancelctx.go delete mode 100644 internal/sockjs/handler_sockjs.go delete mode 100644 internal/sockjs/handler_sockjs_test.go delete mode 100644 internal/sockjs/pool.go delete mode 100644 internal/sockjs/pool_test.go delete mode 100644 internal/tntengine/broker.go delete mode 100644 internal/tntengine/broker_test.go delete mode 100644 internal/tntengine/multi_conn.go delete mode 100644 internal/tntengine/presence.go delete mode 100644 internal/tntengine/shard.go delete mode 100644 internal/tntengine/util.go delete mode 100755 misc/tnt/init.lua delete mode 100644 misc/tnt/readme.md diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9760e6e669..1b20a8028a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,6 @@ jobs: strategy: matrix: go-version: [1.22.6] - tarantool-version: [2.7.2] steps: - name: Install Go uses: actions/setup-go@v5 @@ -37,11 +36,5 @@ jobs: - name: Start services run: docker compose up -d --wait - - name: Install rocks - run: docker run --rm -v $(pwd)/misc/tnt:/opt/tarantool tarantool/tarantool:${{ matrix.tarantool-version }} tarantoolctl rocks install https://raw.githubusercontent.com/centrifugal/tarantool-centrifuge/main/centrifuge-scm-1.rockspec - - - name: Start Tarantool - run: docker run -d -p 3301:3301 -v $(pwd)/misc/tnt:/opt/tarantool tarantool/tarantool:${{ matrix.tarantool-version }} tarantool /opt/tarantool/init.lua - - name: Test run: make test-integration diff --git a/go.mod b/go.mod index a2722bf239..fd1a7e1257 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( github.com/FZambia/eagle v0.1.0 github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b - github.com/FZambia/tarantool v0.3.1 github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e github.com/centrifugal/centrifuge v0.33.2 github.com/centrifugal/protocol v0.13.4 @@ -33,7 +32,6 @@ require ( github.com/twmb/franz-go/pkg/kadm v1.13.0 github.com/twmb/franz-go/pkg/kmsg v1.8.0 github.com/valyala/fasttemplate v1.2.2 - github.com/vmihailenco/msgpack/v5 v5.4.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 go.opentelemetry.io/otel v1.29.0 @@ -76,7 +74,6 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect - github.com/igm/sockjs-go/v3 v3.0.3 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.17.9 // indirect @@ -99,7 +96,6 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.29.0 go.opentelemetry.io/otel/metric v1.29.0 // indirect diff --git a/go.sum b/go.sum index f702338b93..f5c0829618 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,6 @@ github.com/FZambia/eagle v0.1.0 h1:9gyX6x+xjoIfglgyPTcYm7dvY7FJ93us1QY5De4CyXA= github.com/FZambia/eagle v0.1.0/go.mod h1:YjGSPVkQTNcVLfzEUQJNgW9ScPR0K4u/Ky0yeFa4oDA= github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b h1:D3CXZ/tXFtPMSN5FlhHVezJJp9eqDPR3m27OVptqZYE= github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b/go.mod h1:EqC55Pa/sH33dUeH/rkKY8EHRfnm2JyV+ORPzURdGm0= -github.com/FZambia/tarantool v0.3.1 h1:M6FiJrUBu1TvE8aySwSu47He7aYrJvufr+VPzP8FPWo= -github.com/FZambia/tarantool v0.3.1/go.mod h1:YHnvW/H6TPJP04s3RtbBFqvxTvqfYnPBd+TVM1GWdsw= github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e h1:COyWHWCYUotWRo+Z1Lk8B9NDceEybV61C9diY7YVj8g= github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e/go.mod h1:hx7D3T4iFXiy0QWL4m3yNfzz5CQCtbV5yNdE4UlWo0s= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -68,7 +66,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kXD8ePA= github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= @@ -76,8 +73,6 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1 github.com/hashicorp/go-envparse v0.1.0 h1:bE++6bhIsNCPLvgDZkYqo3nA+/PFI51pkrHdmPSDFPY= github.com/hashicorp/go-envparse v0.1.0/go.mod h1:OHheN1GoygLlAkTlXLXvAdnXdZxy8JUweQ1rAXx1xnc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/igm/sockjs-go/v3 v3.0.3 h1:TlRBWiMzYO73iF6F9Q2Frgz90sN35VJB88qPDkNUJHc= -github.com/igm/sockjs-go/v3 v3.0.3/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -211,11 +206,6 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= -github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= -github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= -github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= -github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= diff --git a/internal/middleware/log.go b/internal/middleware/log.go index 9aaa713873..9783ccf49d 100644 --- a/internal/middleware/log.go +++ b/internal/middleware/log.go @@ -57,7 +57,7 @@ func (lrw *logResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return hijacker.Hijack() } -// Flush as SockJS uses http.Flusher. +// Flush implements http.Flusher. func (lrw *logResponseWriter) Flush() { lrw.ResponseWriter.(http.Flusher).Flush() } @@ -72,7 +72,7 @@ func (lrw *logResponseWriter) HTTPStream() http3.Stream { return lrw.ResponseWriter.(http3.HTTPStreamer).HTTPStream() } -// CloseNotify as SockJS uses http.CloseNotifier. +// CloseNotify implements http.CloseNotifier. // //goland:noinspection GoDeprecation func (lrw *logResponseWriter) CloseNotify() <-chan bool { diff --git a/internal/sockjs/cancelctx.go b/internal/sockjs/cancelctx.go deleted file mode 100644 index 54c3996419..0000000000 --- a/internal/sockjs/cancelctx.go +++ /dev/null @@ -1,34 +0,0 @@ -package sockjs - -import ( - "context" - "time" -) - -// customCancelContext wraps context and cancels as soon as channel closed. -type customCancelContext struct { - context.Context - ch <-chan struct{} -} - -// Deadline not used. -func (c customCancelContext) Deadline() (time.Time, bool) { return time.Time{}, false } - -// Done returns channel that will be closed as soon as connection closed. -func (c customCancelContext) Done() <-chan struct{} { return c.ch } - -// Err returns context error. -func (c customCancelContext) Err() error { - select { - case <-c.ch: - return context.Canceled - default: - return nil - } -} - -// NewCancelContext returns a wrapper context around original context that will -// be canceled on channel close. -func NewCancelContext(ctx context.Context, ch <-chan struct{}) context.Context { - return customCancelContext{Context: ctx, ch: ch} -} diff --git a/internal/sockjs/handler_sockjs.go b/internal/sockjs/handler_sockjs.go deleted file mode 100644 index ffc631ef73..0000000000 --- a/internal/sockjs/handler_sockjs.go +++ /dev/null @@ -1,278 +0,0 @@ -package sockjs - -import ( - "net/http" - "sync" - "time" - - "github.com/centrifugal/centrifuge" - - "github.com/centrifugal/protocol" - "github.com/gorilla/websocket" - "github.com/igm/sockjs-go/v3/sockjs" -) - -// Config represents config for SockJS handler. -type Config struct { - // HandlerPrefix sets prefix for SockJS handler endpoint path. - HandlerPrefix string - - // URL is an address to SockJS client javascript library. Required for iframe-based - // transports to work. This URL should lead to the same SockJS client version as used - // for connecting on the client side. - URL string - - // CheckOrigin allows deciding whether to use CORS or not in XHR case. - // When false returned then CORS headers won't be set. - CheckOrigin func(*http.Request) bool - - // WebsocketCheckOrigin allows setting custom CheckOrigin func for underlying - // Gorilla Websocket based websocket.Upgrader. - WebsocketCheckOrigin func(*http.Request) bool - - // WebsocketReadBufferSize is a parameter that is used for raw websocket.Upgrader. - // If set to zero reasonable default value will be used. - WebsocketReadBufferSize int - - // WebsocketWriteBufferSize is a parameter that is used for raw websocket.Upgrader. - // If set to zero reasonable default value will be used. - WebsocketWriteBufferSize int - - // WebsocketUseWriteBufferPool enables using buffer pool for writes in Websocket transport. - WebsocketUseWriteBufferPool bool - - // WebsocketWriteTimeout is maximum time of write message operation. - // Slow client will be disconnected. - // By default, 1 * time.Second will be used. - WebsocketWriteTimeout time.Duration - - centrifuge.PingPongConfig -} - -// Handler accepts SockJS connections. SockJS has a bunch of fallback -// transports when WebSocket connection is not supported. It comes with additional -// costs though: small protocol framing overhead, lack of binary support, more -// goroutines per connection, and you need to use sticky session mechanism on -// your load balancer in case you are using HTTP-based SockJS fallbacks and have -// more than one Centrifuge Node on a backend (so SockJS to be able to emulate -// bidirectional protocol). So if you can afford it - use WebsocketHandler only. -type Handler struct { - node *centrifuge.Node - config Config - handler http.Handler -} - -var writeBufferPool = &sync.Pool{} - -// NewHandler creates new Handler. -func NewHandler(node *centrifuge.Node, config Config) *Handler { - options := sockjs.DefaultOptions - - wsUpgrader := &websocket.Upgrader{ - ReadBufferSize: config.WebsocketReadBufferSize, - WriteBufferSize: config.WebsocketWriteBufferSize, - Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {}, - } - wsUpgrader.CheckOrigin = config.WebsocketCheckOrigin - if config.WebsocketUseWriteBufferPool { - wsUpgrader.WriteBufferPool = writeBufferPool - } else { - wsUpgrader.WriteBufferSize = config.WebsocketWriteBufferSize - } - options.WebsocketUpgrader = wsUpgrader - - // Override sockjs url. It's important to use the same SockJS - // library version on client and server sides when using iframe - // based SockJS transports, otherwise SockJS will raise error - // about version mismatch. - options.SockJSURL = config.URL - options.CheckOrigin = config.CheckOrigin - - wsWriteTimeout := config.WebsocketWriteTimeout - if wsWriteTimeout == 0 { - wsWriteTimeout = 1 * time.Second - } - options.WebsocketWriteTimeout = wsWriteTimeout - - s := &Handler{ - node: node, - config: config, - } - - options.HeartbeatDelay = 0 - s.handler = sockjs.NewHandler(config.HandlerPrefix, options, s.sockJSHandler) - return s -} - -func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { - s.handler.ServeHTTP(rw, r) -} - -// sockJSHandler called when new client connection comes to SockJS endpoint. -func (s *Handler) sockJSHandler(sess sockjs.Session) { - s.handleSession(sess) -} - -// sockJSHandler called when new client connection comes to SockJS endpoint. -func (s *Handler) handleSession(sess sockjs.Session) { - // Separate goroutine for better GC of caller's data. - go func() { - transport := newSockjsTransport(sess, sockjsTransportOptions{ - pingPong: s.config.PingPongConfig, - }) - - select { - case <-s.node.NotifyShutdown(): - _ = transport.Close(centrifuge.DisconnectShutdown) - return - default: - } - - ctxCh := make(chan struct{}) - defer close(ctxCh) - c, closeFn, err := centrifuge.NewClient(NewCancelContext(sess.Request().Context(), ctxCh), s.node, transport) - if err != nil { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error creating client", map[string]any{"transport": transportSockJS})) - return - } - defer func() { _ = closeFn() }() - - if s.node.LogEnabled(centrifuge.LogLevelDebug) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"client": c.ID(), "transport": transportSockJS})) - defer func(started time.Time) { - s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"client": c.ID(), "transport": transportSockJS, "duration": time.Since(started)})) - }(time.Now()) - } - - var needWaitLoop bool - - for { - if msg, err := sess.Recv(); err == nil { - reader := GetStringReader(msg) - if ok := centrifuge.HandleReadFrame(c, reader); !ok { - PutStringReader(reader) - needWaitLoop = true - break - } - PutStringReader(reader) - continue - } - break - } - - if needWaitLoop { - // One extra loop till we get an error from session, - // this is required to wait until close frame will be sent - // into connection inside Client implementation and transport - // closed with proper disconnect reason. - for { - if _, err := sess.Recv(); err != nil { - break - } - } - } - }() -} - -const ( - transportSockJS = "sockjs" -) - -type sockjsTransportOptions struct { - pingPong centrifuge.PingPongConfig -} - -type sockjsTransport struct { - mu sync.RWMutex - closeCh chan struct{} - session sockjs.Session - opts sockjsTransportOptions - closed bool -} - -func newSockjsTransport(s sockjs.Session, opts sockjsTransportOptions) *sockjsTransport { - t := &sockjsTransport{ - session: s, - closeCh: make(chan struct{}), - opts: opts, - } - return t -} - -// Name returns name of transport. -func (t *sockjsTransport) Name() string { - return transportSockJS -} - -// Protocol returns transport protocol. -func (t *sockjsTransport) Protocol() centrifuge.ProtocolType { - return centrifuge.ProtocolTypeJSON -} - -// ProtocolVersion returns transport ProtocolVersion. -func (t *sockjsTransport) ProtocolVersion() centrifuge.ProtocolVersion { - return centrifuge.ProtocolVersion2 -} - -// Unidirectional returns whether transport is unidirectional. -func (t *sockjsTransport) Unidirectional() bool { - return false -} - -// Emulation ... -func (t *sockjsTransport) Emulation() bool { - return false -} - -// DisabledPushFlags ... -func (t *sockjsTransport) DisabledPushFlags() uint64 { - // SockJS has its own close frames to mimic WebSocket Close frames, - // so we don't need to send Disconnect pushes. - return centrifuge.PushFlagDisconnect -} - -// PingPongConfig ... -func (t *sockjsTransport) PingPongConfig() centrifuge.PingPongConfig { - return t.opts.pingPong -} - -// Write data to transport. -func (t *sockjsTransport) Write(message []byte) error { - select { - case <-t.closeCh: - return nil - default: - // No need to use protocol encoders here since - // SockJS only supports JSON. - return t.session.Send(string(message)) - } -} - -// WriteMany messages to transport. -func (t *sockjsTransport) WriteMany(messages ...[]byte) error { - select { - case <-t.closeCh: - return nil - default: - encoder := protocol.GetDataEncoder(protocol.Type(centrifuge.ProtocolTypeJSON)) - defer protocol.PutDataEncoder(protocol.Type(centrifuge.ProtocolTypeJSON), encoder) - for i := range messages { - _ = encoder.Encode(messages[i]) - } - return t.session.Send(string(encoder.Finish())) - } -} - -// Close closes transport. -func (t *sockjsTransport) Close(disconnect centrifuge.Disconnect) error { - t.mu.Lock() - if t.closed { - // Already closed, noop. - t.mu.Unlock() - return nil - } - t.closed = true - close(t.closeCh) - t.mu.Unlock() - return t.session.Close(disconnect.Code, disconnect.Reason) -} diff --git a/internal/sockjs/handler_sockjs_test.go b/internal/sockjs/handler_sockjs_test.go deleted file mode 100644 index dd5ff30726..0000000000 --- a/internal/sockjs/handler_sockjs_test.go +++ /dev/null @@ -1,280 +0,0 @@ -package sockjs - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "strings" - "testing" - "time" - - "github.com/centrifugal/centrifuge" - - "github.com/centrifugal/protocol" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/require" -) - -func sockjsData(data []byte) []byte { - quoted, _ := json.Marshal(string(data)) - return []byte(fmt.Sprintf("[%s]", string(quoted))) -} - -func TestSockjsHandler(t *testing.T) { - n, _ := centrifuge.New(centrifuge.Config{}) - require.NoError(t, n.Run()) - defer func() { _ = n.Shutdown(context.Background()) }() - mux := http.NewServeMux() - - n.OnConnecting(func(ctx context.Context, event centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { - require.Equal(t, transportSockJS, event.Transport.Name()) - require.Equal(t, centrifuge.ProtocolTypeJSON, event.Transport.Protocol()) - return centrifuge.ConnectReply{ - Credentials: ¢rifuge.Credentials{UserID: "user"}, - Data: []byte(`{"SockJS connect response": 1}`), - }, nil - }) - - doneCh := make(chan struct{}) - - n.OnConnect(func(client *centrifuge.Client) { - err := client.Send([]byte(`{"SockJS write": 1}`)) - require.NoError(t, err) - client.Disconnect(centrifuge.DisconnectForceReconnect) - }) - - mux.Handle("/connection/sockjs/", NewHandler(n, Config{ - HandlerPrefix: "/connection/sockjs", - })) - server := httptest.NewServer(mux) - defer server.Close() - - url := "ws" + server.URL[4:] - - conn, resp, err := websocket.DefaultDialer.Dial(url+"/connection/sockjs/220/fi0988475/websocket", nil) - require.NoError(t, err) - defer func() { _ = resp.Body.Close() }() - require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) - require.NotNil(t, conn) - defer func() { _ = conn.Close() }() - _, p, err := conn.ReadMessage() - require.NoError(t, err) - require.Equal(t, "o", string(p)) // open frame of SockJS protocol. - - connectRequest := &protocol.ConnectRequest{ - Token: "boom", - } - cmd := &protocol.Command{ - Id: 1, - Connect: connectRequest, - } - cmdBytes, _ := json.Marshal(cmd) - err = conn.WriteMessage(websocket.TextMessage, sockjsData(cmdBytes)) - require.NoError(t, err) - - go func() { - pos := 0 - contentExpected := []string{ - "SockJS connect response", - "SockJS write", - "force reconnect", - } - - loop: - for { - _, p, err = conn.ReadMessage() - if err != nil { - break loop - } - - for { - if strings.Contains(string(p), contentExpected[pos]) { - pos++ - if pos >= len(contentExpected) { - close(doneCh) - break loop - } - } else { - break - } - } - } - }() - - waitWithTimeout(t, doneCh) -} - -func waitWithTimeout(t *testing.T, ch chan struct{}) { - t.Helper() - select { - case <-ch: - case <-time.After(3 * time.Second): - require.Fail(t, "timeout") - } -} - -func defaultNodeNoHandlers() *centrifuge.Node { - n, err := centrifuge.New(centrifuge.Config{ - LogLevel: centrifuge.LogLevelTrace, - LogHandler: func(entry centrifuge.LogEntry) {}, - }) - if err != nil { - panic(err) - } - err = n.Run() - if err != nil { - panic(err) - } - return n -} - -func TestSockjsTransportWrite(t *testing.T) { - node := defaultNodeNoHandlers() - defer func() { _ = node.Shutdown(context.Background()) }() - - node.OnConnecting(func(ctx context.Context, event centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { - require.Equal(t, event.Transport.Protocol(), centrifuge.ProtocolTypeJSON) - transport := event.Transport.(centrifuge.Transport) - // Write to transport directly - this is only valid for tests, in normal situation - // we write over client methods. - require.NoError(t, transport.Write([]byte("hello"))) - return centrifuge.ConnectReply{}, centrifuge.DisconnectForceNoReconnect - }) - - mux := http.NewServeMux() - mux.Handle("/connection/sockjs/", NewHandler(node, Config{ - HandlerPrefix: "/connection/sockjs", - })) - server := httptest.NewServer(mux) - defer server.Close() - - url := "ws" + server.URL[4:] - - conn, resp, err := websocket.DefaultDialer.Dial(url+"/connection/sockjs/220/fi0988475/websocket", nil) - require.NoError(t, err) - defer func() { _ = resp.Body.Close() }() - require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) - require.NotNil(t, conn) - defer func() { _ = conn.Close() }() - _, p, err := conn.ReadMessage() - require.NoError(t, err) - require.Equal(t, "o", string(p)) // open frame of SockJS protocol. - - connectRequest := &protocol.ConnectRequest{ - Token: "boom", - } - cmd := &protocol.Command{ - Id: 1, - Connect: connectRequest, - } - cmdBytes, _ := json.Marshal(cmd) - err = conn.WriteMessage(websocket.TextMessage, sockjsData(cmdBytes)) - require.NoError(t, err) - - _, p, err = conn.ReadMessage() - require.NoError(t, err) - require.Equal(t, "a[\"hello\"]", string(p)) -} - -func TestSockjsTransportWriteMany(t *testing.T) { - node := defaultNodeNoHandlers() - defer func() { _ = node.Shutdown(context.Background()) }() - - node.OnConnecting(func(ctx context.Context, event centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { - require.Equal(t, event.Transport.Protocol(), centrifuge.ProtocolTypeJSON) - transport := event.Transport.(centrifuge.Transport) - // Write to transport directly - this is only valid for tests, in normal situation - // we write over client methods. - require.NoError(t, transport.WriteMany([]byte("1"), []byte("22"))) - return centrifuge.ConnectReply{}, centrifuge.DisconnectForceNoReconnect - }) - - mux := http.NewServeMux() - mux.Handle("/connection/sockjs/", NewHandler(node, Config{ - HandlerPrefix: "/connection/sockjs", - })) - server := httptest.NewServer(mux) - defer server.Close() - - url := "ws" + server.URL[4:] - - conn, resp, err := websocket.DefaultDialer.Dial(url+"/connection/sockjs/220/fi0988475/websocket", nil) - require.NoError(t, err) - defer func() { _ = resp.Body.Close() }() - require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) - require.NotNil(t, conn) - defer func() { _ = conn.Close() }() - _, p, err := conn.ReadMessage() - require.NoError(t, err) - require.Equal(t, "o", string(p)) // open frame of SockJS protocol. - - connectRequest := &protocol.ConnectRequest{ - Token: "boom", - } - cmd := &protocol.Command{ - Id: 1, - Connect: connectRequest, - } - cmdBytes, _ := json.Marshal(cmd) - err = conn.WriteMessage(websocket.TextMessage, sockjsData(cmdBytes)) - require.NoError(t, err) - - _, p, err = conn.ReadMessage() - require.NoError(t, err) - require.Equal(t, "a[\"1\\n22\"]", string(p)) -} - -func TestSockjsHandlerURLParams(t *testing.T) { - n, _ := centrifuge.New(centrifuge.Config{}) - require.NoError(t, n.Run()) - defer func() { _ = n.Shutdown(context.Background()) }() - mux := http.NewServeMux() - - n.OnConnecting(func(ctx context.Context, event centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { - return centrifuge.ConnectReply{ - Credentials: ¢rifuge.Credentials{UserID: "user"}, - }, nil - }) - - doneCh := make(chan struct{}) - - n.OnConnect(func(client *centrifuge.Client) { - require.Equal(t, transportSockJS, client.Transport().Name()) - require.Equal(t, centrifuge.ProtocolTypeJSON, client.Transport().Protocol()) - require.Equal(t, centrifuge.ProtocolVersion2, client.Transport().ProtocolVersion()) - close(doneCh) - }) - - mux.Handle("/connection/sockjs/", NewHandler(n, Config{ - HandlerPrefix: "/connection/sockjs", - })) - server := httptest.NewServer(mux) - defer server.Close() - - url := "ws" + server.URL[4:] - - conn, resp, err := websocket.DefaultDialer.Dial(url+"/connection/sockjs/220/fi0988475/websocket?cf_protocol_version=v2", nil) - require.NoError(t, err) - defer func() { _ = resp.Body.Close() }() - require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) - require.NotNil(t, conn) - defer func() { _ = conn.Close() }() - _, p, err := conn.ReadMessage() - require.NoError(t, err) - require.Equal(t, "o", string(p)) // open frame of SockJS protocol. - - connectRequest := &protocol.ConnectRequest{ - Token: "boom", - } - cmd := &protocol.Command{ - Id: 1, - Connect: connectRequest, - } - cmdBytes, _ := json.Marshal(cmd) - err = conn.WriteMessage(websocket.TextMessage, sockjsData(cmdBytes)) - require.NoError(t, err) - - waitWithTimeout(t, doneCh) -} diff --git a/internal/sockjs/pool.go b/internal/sockjs/pool.go deleted file mode 100644 index 0ce2b6a00a..0000000000 --- a/internal/sockjs/pool.go +++ /dev/null @@ -1,45 +0,0 @@ -package sockjs - -import ( - "bytes" - "strings" - "sync" -) - -var stringReaderPool sync.Pool - -// GetStringReader from pool. -func GetStringReader(data string) *strings.Reader { - r := bytesReaderPool.Get() - if r == nil { - return strings.NewReader(data) - } - reader := r.(*strings.Reader) - reader.Reset(data) - return reader -} - -// PutStringReader to pool. -func PutStringReader(reader *strings.Reader) { - reader.Reset("") - stringReaderPool.Put(reader) -} - -var bytesReaderPool sync.Pool - -// GetBytesReader from pool. -func GetBytesReader(data []byte) *bytes.Reader { - r := bytesReaderPool.Get() - if r == nil { - return bytes.NewReader(data) - } - reader := r.(*bytes.Reader) - reader.Reset(data) - return reader -} - -// PutBytesReader to pool. -func PutBytesReader(reader *bytes.Reader) { - reader.Reset(nil) - bytesReaderPool.Put(reader) -} diff --git a/internal/sockjs/pool_test.go b/internal/sockjs/pool_test.go deleted file mode 100644 index 7055a694de..0000000000 --- a/internal/sockjs/pool_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package sockjs - -import ( - "io" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestStringReaderPool(t *testing.T) { - r := GetStringReader("string1") - d, err := io.ReadAll(r) - require.NoError(t, err) - require.Equal(t, []byte("string1"), d) - PutStringReader(r) - r = GetStringReader("string2") - defer PutStringReader(r) - d, err = io.ReadAll(r) - require.NoError(t, err) - require.Equal(t, []byte("string2"), d) -} - -func TestBytesReaderPool(t *testing.T) { - r := GetBytesReader([]byte("bytes1")) - d, err := io.ReadAll(r) - require.NoError(t, err) - require.Equal(t, []byte("bytes1"), d) - PutBytesReader(r) - r = GetBytesReader([]byte("bytes2")) - defer PutBytesReader(r) - d, err = io.ReadAll(r) - require.NoError(t, err) - require.Equal(t, []byte("bytes2"), d) -} diff --git a/internal/tntengine/broker.go b/internal/tntengine/broker.go deleted file mode 100644 index 511b81c6d4..0000000000 --- a/internal/tntengine/broker.go +++ /dev/null @@ -1,839 +0,0 @@ -package tntengine - -import ( - "context" - "errors" - "fmt" - "runtime" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/centrifugal/centrifugo/v5/internal/tools" - - "github.com/FZambia/tarantool" - "github.com/centrifugal/centrifuge" - "github.com/centrifugal/protocol" - "github.com/google/uuid" - "github.com/vmihailenco/msgpack/v5" -) - -const internalChannelPrefix = "__" - -const ( - // tarantoolControlChannel is a name for control channel. - tarantoolControlChannel = internalChannelPrefix + "control" - // tarantoolNodeChannelPrefix is a prefix for node channel. - tarantoolNodeChannelPrefix = internalChannelPrefix + "node." -) - -// Broker uses Tarantool to implement centrifuge.Broker functionality. -type Broker struct { - controlRound uint64 // Keep atomic on struct top for 32-bit architectures. - node *centrifuge.Node - sharding bool - config BrokerConfig - shards []*Shard - nodeChannel string -} - -var _ centrifuge.Broker = (*Broker)(nil) - -// BrokerConfig is a config for Tarantool Broker. -type BrokerConfig struct { - // UsePolling allows to turn on polling mode instead of push. - UsePolling bool - - // Shards is a list of Tarantool instances to shard data by channel. - Shards []*Shard -} - -// NewBroker initializes Tarantool Broker. -func NewBroker(n *centrifuge.Node, config BrokerConfig) (*Broker, error) { - if len(config.Shards) == 0 { - return nil, errors.New("no Tarantool shards provided in configuration") - } - if len(config.Shards) > 1 { - n.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Tarantool sharding enabled: %d shards", len(config.Shards)))) - } - e := &Broker{ - node: n, - shards: config.Shards, - config: config, - sharding: len(config.Shards) > 1, - nodeChannel: nodeChannel(n.ID()), - } - return e, nil -} - -// Run runs broker after node initialized. -func (b *Broker) Run(h centrifuge.BrokerEventHandler) error { - for _, shard := range b.shards { - if err := b.runShard(shard, h); err != nil { - return err - } - } - return nil -} - -func (b *Broker) runForever(fn func(), minDelay time.Duration) { - for { - started := time.Now() - fn() - elapsed := time.Since(started) - if elapsed < minDelay { - // Sleep for a while to prevent busy loop when reconnecting. - // If elapsed >= minDelay then fn will be restarted right away – this is - // intentional for fast reconnect in case of one random error. - time.Sleep(minDelay - elapsed) - } - } -} - -const pubSubRoutineMinDelay = 300 * time.Millisecond - -// Run Tarantool shard. -func (b *Broker) runShard(s *Shard, h centrifuge.BrokerEventHandler) error { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("start Tarantool shard: %v", tools.GetLogAddresses(s.config.Addresses)))) - go b.runForever(func() { - b.runPubSub(s, h) - }, pubSubRoutineMinDelay) - go b.runForever(func() { - b.runControlPubSub(s, h) - }, pubSubRoutineMinDelay) - return nil -} - -type pubRequest struct { - MsgType string - Channel string - Data string - HistoryTTL int - HistorySize int - HistoryMetaTTL int -} - -type pubResponse struct { - Offset uint64 - Epoch string -} - -func (m *pubResponse) DecodeMsgpack(d *msgpack.Decoder) error { - var err error - var l int - if l, err = d.DecodeArrayLen(); err != nil { - return err - } - if l != 2 { - return fmt.Errorf("malformed array len: %d", l) - } - if m.Offset, err = d.DecodeUint64(); err != nil { - return err - } - if m.Epoch, err = d.DecodeString(); err != nil { - return err - } - return nil -} - -// Publish - see centrifuge.Broker interface description. -func (b *Broker) Publish(ch string, data []byte, opts centrifuge.PublishOptions) (centrifuge.StreamPosition, bool, error) { - s := consistentShard(ch, b.shards) - - protoPub := &protocol.Publication{ - Data: data, - Info: infoToProto(opts.ClientInfo), - Tags: opts.Tags, - Delta: opts.UseDelta, // Will be cleaned up before passing to Node. - Time: time.Now().UnixMilli(), - } - byteMessage, err := protoPub.MarshalVT() - if err != nil { - return centrifuge.StreamPosition{}, false, err - } - - historyMetaTTL := opts.HistoryMetaTTL - if historyMetaTTL == 0 { - historyMetaTTL = b.node.Config().HistoryMetaTTL - } - historyMetaTTLSeconds := int(historyMetaTTL.Seconds()) - - pr := &pubRequest{ - MsgType: "p", - Channel: ch, - Data: string(byteMessage), - HistoryTTL: int(opts.HistoryTTL.Seconds()), - HistorySize: opts.HistorySize, - HistoryMetaTTL: historyMetaTTLSeconds, - } - var resp pubResponse - err = s.ExecTyped(tarantool.Call("centrifuge.publish", pr), &resp) - if err != nil { - return centrifuge.StreamPosition{}, false, err - } - return centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch}, false, err -} - -// PublishJoin - see centrifuge.Broker interface description. -func (b *Broker) PublishJoin(ch string, info *centrifuge.ClientInfo) error { - s := consistentShard(ch, b.shards) - pr := pubRequest{ - MsgType: "j", - Channel: ch, - Data: b.clientInfoString(info), - } - _, err := s.Exec(tarantool.Call("centrifuge.publish", pr)) - return err -} - -// PublishLeave - see centrifuge.Broker interface description. -func (b *Broker) PublishLeave(ch string, info *centrifuge.ClientInfo) error { - s := consistentShard(ch, b.shards) - pr := pubRequest{ - MsgType: "l", - Channel: ch, - Data: b.clientInfoString(info), - } - _, err := s.Exec(tarantool.Call("centrifuge.publish", pr)) - return err -} - -func (b *Broker) clientInfoString(clientInfo *centrifuge.ClientInfo) string { - var info string - if clientInfo != nil { - byteMessage, err := infoToProto(clientInfo).MarshalVT() - if err != nil { - return info - } - info = string(byteMessage) - } - return info -} - -// PublishControl - see centrifuge.Broker interface description. -func (b *Broker) PublishControl(data []byte, nodeID, _ string) error { - currentRound := atomic.AddUint64(&b.controlRound, 1) - index := currentRound % uint64(len(b.shards)) - var channel string - if nodeID != "" { - channel = nodeChannel(nodeID) - } else { - channel = b.controlChannel() - } - pr := pubRequest{ - MsgType: "c", - Channel: channel, - Data: string(data), - } - _, err := b.shards[index].Exec(tarantool.Call("centrifuge.publish", pr)) - return err -} - -func (b *Broker) controlChannel() string { - return tarantoolControlChannel -} - -func nodeChannel(nodeID string) string { - return tarantoolNodeChannelPrefix + nodeID -} - -// Subscribe - see centrifuge.Broker interface description. -func (b *Broker) Subscribe(ch string) error { - if strings.HasPrefix(ch, internalChannelPrefix) { - return centrifuge.ErrorBadRequest - } - if b.node.LogEnabled(centrifuge.LogLevelDebug) { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "subscribe node on channel", map[string]any{"channel": ch})) - } - r := newSubRequest([]string{ch}, true) - s := b.shards[consistentIndex(ch, len(b.shards))] - return b.sendSubscribe(s, r) -} - -// Unsubscribe - see centrifuge.Broker interface description. -func (b *Broker) Unsubscribe(ch string) error { - if b.node.LogEnabled(centrifuge.LogLevelDebug) { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "unsubscribe node from channel", map[string]any{"channel": ch})) - } - r := newSubRequest([]string{ch}, false) - s := b.shards[consistentIndex(ch, len(b.shards))] - return b.sendSubscribe(s, r) -} - -var errOpTimeout = errors.New("operation timed out") - -func (b *Broker) sendSubscribe(shard *Shard, r subRequest) error { - select { - case shard.subCh <- r: - default: - timer := AcquireTimer(defaultRequestTimeout) - defer ReleaseTimer(timer) - select { - case shard.subCh <- r: - case <-timer.C: - return errOpTimeout - } - } - return r.result() -} - -type historyRequest struct { - Channel string - Offset uint64 - Limit int - Reverse bool - IncludePubs bool - HistoryMetaTTL int -} - -type historyResponse struct { - Offset uint64 - Epoch string - Pubs []*centrifuge.Publication -} - -func (m *historyResponse) DecodeMsgpack(d *msgpack.Decoder) error { - var err error - var l int - if l, err = d.DecodeArrayLen(); err != nil { - return err - } - if l != 3 { - return fmt.Errorf("malformed array len: %d", l) - } - if m.Offset, err = d.DecodeUint64(); err != nil { - return err - } - if m.Epoch, err = d.DecodeString(); err != nil { - return err - } - if l, err = d.DecodeArrayLen(); err != nil { - return err - } - if l == -1 { - return nil - } - - pubs := make([]*centrifuge.Publication, 0, l) - - for i := 0; i < l; i++ { - var l int - if l, err = d.DecodeArrayLen(); err != nil { - return err - } - if l != 5 { - return fmt.Errorf("malformed array len: %d", l) - } - if _, err = d.DecodeUint64(); err != nil { - return err - } - if _, err = d.DecodeString(); err != nil { - return err - } - offset, err := d.DecodeUint64() - if err != nil { - return err - } - if _, err = d.DecodeFloat64(); err != nil { - return err - } - data, err := d.DecodeString() - if err != nil { - return err - } - var p protocol.Publication - if err = p.UnmarshalVT([]byte(data)); err != nil { - return err - } - pub := pubFromProto(&p) - pub.Offset = offset - pubs = append(pubs, pub) - } - m.Pubs = pubs - return nil -} - -func pubFromProto(pub *protocol.Publication) *centrifuge.Publication { - if pub == nil { - return nil - } - return ¢rifuge.Publication{ - Offset: pub.GetOffset(), - Data: pub.Data, - Info: infoFromProto(pub.GetInfo()), - Tags: pub.GetTags(), - } -} - -// History - see centrifuge.Broker interface description. -func (b *Broker) History(ch string, opts centrifuge.HistoryOptions) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) { - filter := opts.Filter - var includePubs = true - var offset uint64 - if filter.Since != nil { - if filter.Reverse { - offset = filter.Since.Offset - 1 - if offset == 0 { - includePubs = false - } - } else { - offset = filter.Since.Offset + 1 - } - } - var limit int - if filter.Limit == 0 { - includePubs = false - } - if filter.Limit > 0 { - limit = filter.Limit - } - - historyMetaTTL := opts.MetaTTL - if historyMetaTTL == 0 { - historyMetaTTL = b.node.Config().HistoryMetaTTL - } - historyMetaTTLSeconds := int(historyMetaTTL.Seconds()) - - s := consistentShard(ch, b.shards) - req := historyRequest{ - Channel: ch, - Offset: offset, - Limit: limit, - Reverse: filter.Reverse, - IncludePubs: includePubs, - HistoryMetaTTL: historyMetaTTLSeconds, - } - var resp historyResponse - err := s.ExecTyped(tarantool.Call("centrifuge.history", req), &resp) - if err != nil { - return nil, centrifuge.StreamPosition{}, err - } - streamPosition := centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch} - return resp.Pubs, streamPosition, nil -} - -type removeHistoryRequest struct { - Channel string -} - -// RemoveHistory - see centrifuge.Broker interface description. -func (b *Broker) RemoveHistory(ch string) error { - s := consistentShard(ch, b.shards) - _, err := s.Exec(tarantool.Call("centrifuge.remove_history", removeHistoryRequest{Channel: ch})) - return err -} - -const ( - // tarantoolPubSubWorkerChannelSize sets buffer size of channel to which we send all - // messages received from Tarantool PUB/SUB connection to process in separate goroutine. - tarantoolPubSubWorkerChannelSize = 512 - // tarantoolSubscribeBatchLimit is a maximum number of channels to include in a single - // batch subscribe call. - tarantoolSubscribeBatchLimit = 512 -) - -func (b *Broker) getShard(channel string) *Shard { - if !b.sharding { - return b.shards[0] - } - return b.shards[consistentIndex(channel, len(b.shards))] -} - -type pollRequest struct { - ConnID string - UsePolling bool - Timeout int -} - -type subscribeRequest struct { - ConnID string - Channels []string -} - -type pubSubMessage struct { - Type string - Channel string - Offset uint64 - Epoch string - Data []byte -} - -func (m *pubSubMessage) DecodeMsgpack(d *msgpack.Decoder) error { - var err error - var l int - if l, err = d.DecodeArrayLen(); err != nil { - return err - } - if l != 5 { - return fmt.Errorf("wrong array len: %d", l) - } - if m.Type, err = d.DecodeString(); err != nil { - return err - } - if m.Channel, err = d.DecodeString(); err != nil { - return err - } - if m.Offset, err = d.DecodeUint64(); err != nil { - return err - } - if m.Epoch, err = d.DecodeString(); err != nil { - return err - } - if data, err := d.DecodeString(); err != nil { - return err - } else { - m.Data = []byte(data) - } - return nil -} - -func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) { - logError := func(errString string) { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart pub/sub", map[string]any{"error": errString})) - } - - u, err := uuid.NewRandom() - if err != nil { - logError(err.Error()) - return - } - connID := u.String() - - conn, cancel, err := s.pubSubConn() - if err != nil { - logError(err.Error()) - return - } - defer cancel() - defer func() { _ = conn.Close() }() - - // Register poller with unique ID. - _, err = conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0})) - if err != nil { - logError(err.Error()) - return - } - - numWorkers := runtime.NumCPU() - - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool PUB/SUB, num workers: %d", numWorkers))) - defer func() { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool PUB/SUB")) - }() - - done := make(chan struct{}) - var doneOnce sync.Once - closeDoneOnce := func() { - doneOnce.Do(func() { - close(done) - _ = conn.Close() - }) - } - defer closeDoneOnce() - - // Run subscriber goroutine. - go func(conn *tarantool.Connection) { - for { - select { - case <-done: - return - case r := <-s.subCh: - isSubscribe := r.subscribe - channelBatch := []subRequest{r} - - chIDs := r.channels - - var otherR *subRequest - - loop: - for len(chIDs) < tarantoolSubscribeBatchLimit { - select { - case r := <-s.subCh: - if r.subscribe != isSubscribe { - // We can not mix subscribe and unsubscribe request into one batch - // so must stop here. As we consumed a subRequest value from channel - // we should take care of it later. - otherR = &r - break loop - } - channelBatch = append(channelBatch, r) - chIDs = append(chIDs, r.channels...) - default: - break loop - } - } - - var opErr error - if isSubscribe { - _, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) - opErr = err - } else { - _, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) - opErr = err - } - - if opErr != nil { - for _, r := range channelBatch { - r.done(opErr) - } - if otherR != nil { - otherR.done(opErr) - } - // Close conn, this should cause Receive to return with err below - // and whole runPubSub method to restart. - closeDoneOnce() - return - } - for _, r := range channelBatch { - r.done(nil) - } - if otherR != nil { - chIDs := otherR.channels - var opErr error - if otherR.subscribe { - _, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) - opErr = err - } else { - _, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) - opErr = err - } - if opErr != nil { - otherR.done(opErr) - // Close conn, this should cause Receive to return with err below - // and whole runPubSub method to restart. - closeDoneOnce() - return - } - otherR.done(nil) - } - } - } - }(conn) - - // Run workers to spread received message processing work over worker goroutines. - workers := make(map[int]chan pubSubMessage) - for i := 0; i < numWorkers; i++ { - workerCh := make(chan pubSubMessage, tarantoolPubSubWorkerChannelSize) - workers[i] = workerCh - go func(ch chan pubSubMessage) { - for { - select { - case <-done: - return - case n := <-ch: - err := b.handleMessage(eventHandler, n) - if err != nil { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling client message", map[string]any{"error": err.Error()})) - continue - } - } - } - }(workerCh) - } - - go func() { - var chIDs []string - - channels := b.node.Hub().Channels() - for i := 0; i < len(channels); i++ { - if b.getShard(channels[i]) == s { - chIDs = append(chIDs, channels[i]) - } - } - - batch := make([]string, 0) - - for i, ch := range chIDs { - if len(batch) > 0 && i%tarantoolSubscribeBatchLimit == 0 { - r := newSubRequest(batch, true) - err := b.sendSubscribe(s, r) - if err != nil { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]any{"error": err.Error()})) - closeDoneOnce() - return - } - batch = nil - } - batch = append(batch, ch) - } - if len(batch) > 0 { - r := newSubRequest(batch, true) - err := b.sendSubscribe(s, r) - if err != nil { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]any{"error": err.Error()})) - closeDoneOnce() - return - } - } - }() - - processPubSubMessages := func(messages []pubSubMessage) { - for _, msg := range messages { - // Add message to worker channel preserving message order - i.e. messages - // from the same channel will be processed in the same worker. - workers[index(msg.Channel, numWorkers)] <- msg - } - } - - for { - err := b.waitPubSubMessages(conn, connID, processPubSubMessages) - if err != nil { - logError(err.Error()) - return - } - } -} - -func (b *Broker) waitPubSubMessages(conn *tarantool.Connection, connID string, cb func([]pubSubMessage)) error { - if !b.config.UsePolling { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - _, err := conn.ExecContext(ctx, tarantool.Call( - "centrifuge.get_messages", - pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25}, - ).WithPushTyped(func(decode func(any) error) { - var m [][]pubSubMessage - if err := decode(&m); err != nil { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding push", map[string]any{"error": err.Error()})) - return - } - if len(m) == 1 { - cb(m[0]) - } - })) - if err != nil { - return err - } - } else { - var m [][]pubSubMessage - err := conn.ExecTyped(tarantool.Call( - "centrifuge.get_messages", - pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25}), - &m, - ) - if err != nil { - return err - } - if len(m) == 1 { - cb(m[0]) - } - } - return nil -} - -func (b *Broker) handleMessage(eventHandler centrifuge.BrokerEventHandler, msg pubSubMessage) error { - switch msg.Type { - case "p": - var pub protocol.Publication - err := pub.UnmarshalVT(msg.Data) - if err == nil { - delta := pub.Delta - pub.Delta = false - publication := pubFromProto(&pub) - publication.Offset = msg.Offset - _ = eventHandler.HandlePublication( - msg.Channel, publication, - centrifuge.StreamPosition{Offset: msg.Offset, Epoch: msg.Epoch}, - delta, nil) - } - case "j": - var info protocol.ClientInfo - err := info.UnmarshalVT(msg.Data) - if err == nil { - _ = eventHandler.HandleJoin(msg.Channel, infoFromProto(&info)) - } - case "l": - var info protocol.ClientInfo - err := info.UnmarshalVT(msg.Data) - if err == nil { - _ = eventHandler.HandleLeave(msg.Channel, infoFromProto(&info)) - } - } - return nil -} - -func (b *Broker) runControlPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) { - logError := func(errString string) { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart control pub/sub", map[string]any{"error": errString})) - } - - u, err := uuid.NewRandom() - if err != nil { - logError(err.Error()) - return - } - connID := u.String() - - conn, cancel, err := s.pubSubConn() - if err != nil { - logError(err.Error()) - return - } - defer cancel() - defer func() { _ = conn.Close() }() - - // Register poller with unique ID. - _, err = conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0})) - if err != nil { - logError(err.Error()) - return - } - - numWorkers := runtime.NumCPU() - - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool control PUB/SUB, num workers: %d", numWorkers))) - defer func() { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool control PUB/SUB")) - }() - - done := make(chan struct{}) - var doneOnce sync.Once - closeDoneOnce := func() { - doneOnce.Do(func() { - close(done) - _ = conn.Close() - }) - } - defer closeDoneOnce() - - // Run workers to spread message processing work over worker goroutines. - workCh := make(chan pubSubMessage) - for i := 0; i < numWorkers; i++ { - go func() { - for { - select { - case <-done: - return - case n := <-workCh: - err := eventHandler.HandleControl(n.Data) - if err != nil { - b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling control message", map[string]any{"error": err.Error()})) - continue - } - } - } - }() - } - - controlChannel := b.controlChannel() - _, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: []string{controlChannel, b.nodeChannel}})) - if err != nil { - logError(err.Error()) - return - } - - processPubSubMessages := func(messages []pubSubMessage) { - for _, msg := range messages { - workCh <- msg - } - } - - for { - err := b.waitPubSubMessages(conn, connID, processPubSubMessages) - if err != nil { - logError(err.Error()) - return - } - } -} diff --git a/internal/tntengine/broker_test.go b/internal/tntengine/broker_test.go deleted file mode 100644 index b7bff372f6..0000000000 --- a/internal/tntengine/broker_test.go +++ /dev/null @@ -1,396 +0,0 @@ -//go:build integration - -package tntengine - -import ( - "context" - "log" - "strconv" - "testing" - "time" - - "github.com/centrifugal/centrifuge" - "github.com/centrifugal/protocol" - "github.com/stretchr/testify/require" -) - -func newTestTarantoolEngine(tb testing.TB) (*Broker, *PresenceManager) { - n, _ := centrifuge.New(centrifuge.Config{}) - var shards []*Shard - for _, port := range []string{"3301"} { - shard, err := NewShard(ShardConfig{Addresses: []string{"127.0.0.1:" + port}}) - if err != nil { - log.Fatal(err) - } - shards = append(shards, shard) - } - - broker, err := NewBroker(n, BrokerConfig{ - UsePolling: false, - Shards: shards, - }) - if err != nil { - tb.Fatal(err) - } - - presenceManager, err := NewPresenceManager(n, PresenceManagerConfig{ - Shards: shards, - }) - if err != nil { - tb.Fatal(err) - } - - n.SetBroker(broker) - n.SetPresenceManager(presenceManager) - err = n.Run() - if err != nil { - tb.Fatal(err) - } - return broker, presenceManager -} - -type recoverTest struct { - Name string - HistorySize int - HistoryLifetime int - NumPublications int - SinceOffset uint64 - NumRecovered int - Sleep int - Limit int - Recovered bool -} - -var recoverTests = []recoverTest{ - {"empty_stream", 10, 60, 0, 0, 0, 0, 0, true}, - {"from_position", 10, 60, 10, 8, 2, 0, 0, true}, - {"from_position_limited", 10, 60, 10, 5, 2, 0, 2, false}, - {"from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, false}, - {"from_position_that_already_gone", 10, 60, 20, 8, 10, 0, 0, false}, - {"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false}, - {"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true}, - {"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true}, - {"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false}, - {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true}, -} - -func TestTarantoolClientSubscribeRecover(t *testing.T) { - for _, tt := range recoverTests { - t.Run(tt.Name, func(t *testing.T) { - testTarantoolClientSubscribeRecover(t, tt) - }) - } -} - -func nodeWithTarantoolBroker(tb testing.TB) *centrifuge.Node { - c := centrifuge.Config{} - return nodeWithTarantoolBrokerWithConfig(tb, c) -} - -func nodeWithTarantoolBrokerWithConfig(tb testing.TB, c centrifuge.Config) *centrifuge.Node { - n, err := centrifuge.New(c) - if err != nil { - tb.Fatal(err) - } - e, _ := newTestTarantoolEngine(tb) - n.SetBroker(e) - err = n.Run() - if err != nil { - tb.Fatal(err) - } - return n -} - -func pubToProto(pub *centrifuge.Publication) *protocol.Publication { - if pub == nil { - return nil - } - return &protocol.Publication{ - Offset: pub.Offset, - Data: pub.Data, - Info: infoToProto(pub.Info), - } -} - -func isRecovered(historyResult centrifuge.HistoryResult, cmdOffset uint64, cmdEpoch string) ([]*protocol.Publication, bool) { - latestOffset := historyResult.Offset - latestEpoch := historyResult.Epoch - - recoveredPubs := make([]*protocol.Publication, 0, len(historyResult.Publications)) - for _, pub := range historyResult.Publications { - protoPub := pubToProto(pub) - recoveredPubs = append(recoveredPubs, protoPub) - } - - nextOffset := cmdOffset + 1 - var recovered bool - if len(recoveredPubs) == 0 { - recovered = latestOffset == cmdOffset && latestEpoch == cmdEpoch - } else { - recovered = recoveredPubs[0].Offset == nextOffset && - recoveredPubs[len(recoveredPubs)-1].Offset == latestOffset && - latestEpoch == cmdEpoch - } - - return recoveredPubs, recovered -} - -// recoverHistory recovers publications since StreamPosition last seen by client. -func recoverHistory(node *centrifuge.Node, ch string, since centrifuge.StreamPosition, maxPublicationLimit int) (centrifuge.HistoryResult, error) { - limit := centrifuge.NoLimit - if maxPublicationLimit > 0 { - limit = maxPublicationLimit - } - return node.History(ch, centrifuge.WithLimit(limit), centrifuge.WithSince(&since)) -} - -func testTarantoolClientSubscribeRecover(t *testing.T, tt recoverTest) { - node := nodeWithTarantoolBroker(t) - defer func() { _ = node.Shutdown(context.Background()) }() - - channel := "test_recovery_tarantool_" + tt.Name - - for i := 1; i <= tt.NumPublications; i++ { - _, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), centrifuge.WithHistory(tt.HistorySize, time.Duration(tt.HistoryLifetime)*time.Second)) - require.NoError(t, err) - } - - time.Sleep(time.Duration(tt.Sleep) * time.Second) - - res, err := node.History(channel) - require.NoError(t, err) - streamTop := res.StreamPosition - - historyResult, err := recoverHistory(node, channel, centrifuge.StreamPosition{Offset: tt.SinceOffset, Epoch: streamTop.Epoch}, tt.Limit) - require.NoError(t, err) - recoveredPubs, recovered := isRecovered(historyResult, tt.SinceOffset, streamTop.Epoch) - require.Equal(t, tt.NumRecovered, len(recoveredPubs)) - require.Equal(t, tt.Recovered, recovered) -} - -const historyIterationChannel = "test" - -type historyIterationTest struct { - NumMessages int - IterateBy int -} - -func (it *historyIterationTest) prepareHistoryIteration(t testing.TB, node *centrifuge.Node) centrifuge.StreamPosition { - numMessages := it.NumMessages - - channel := historyIterationChannel - - historyResult, err := node.History(channel) - require.NoError(t, err) - startPosition := historyResult.StreamPosition - - for i := 1; i <= numMessages; i++ { - _, err := node.Publish(channel, []byte(`{}`), centrifuge.WithHistory(numMessages, time.Hour)) - require.NoError(t, err) - } - - historyResult, err = node.History(channel, centrifuge.WithLimit(centrifuge.NoLimit)) - require.NoError(t, err) - require.Equal(t, numMessages, len(historyResult.Publications)) - return startPosition -} - -func (it *historyIterationTest) testHistoryIteration(t testing.TB, node *centrifuge.Node, startPosition centrifuge.StreamPosition) { - var ( - n int - offset = startPosition.Offset - epoch = startPosition.Epoch - iterateBy = it.IterateBy - ) - for { - res, err := node.History( - historyIterationChannel, - centrifuge.WithSince(¢rifuge.StreamPosition{Offset: offset, Epoch: epoch}), - centrifuge.WithLimit(iterateBy), - ) - if err != nil { - t.Fatal(err) - } - offset += uint64(iterateBy) - if len(res.Publications) == 0 { - break - } - n += len(res.Publications) - } - if n != it.NumMessages { - t.Fatal("num messages mismatch") - } -} - -func (it *historyIterationTest) testHistoryIterationReverse(t testing.TB, node *centrifuge.Node, startPosition centrifuge.StreamPosition) { - var ( - n int - offset = startPosition.Offset - epoch = startPosition.Epoch - iterateBy = it.IterateBy - ) - var since *centrifuge.StreamPosition -outer: - for { - res, err := node.History( - historyIterationChannel, - centrifuge.WithSince(since), - centrifuge.WithLimit(iterateBy), - centrifuge.WithReverse(true), - ) - if err != nil { - t.Fatal(err) - } - var checkOffset uint64 - loop: - for _, pub := range res.Publications { - n += 1 - if pub.Offset == startPosition.Offset+1 { - break outer - } - if checkOffset == 0 { - checkOffset = pub.Offset - continue loop - } - if pub.Offset > checkOffset { - t.Fatal("incorrect order") - } - checkOffset = pub.Offset - } - if len(res.Publications) < iterateBy { - break - } - earliestPub := res.Publications[len(res.Publications)-1] - offset = earliestPub.Offset - since = ¢rifuge.StreamPosition{Offset: offset, Epoch: epoch} - } - if n != it.NumMessages { - t.Fatalf("num messages mismatch, expected %d, got %d", it.NumMessages, n) - } -} - -func TestTarantoolBrokerHistoryIteration(t *testing.T) { - e, _ := newTestTarantoolEngine(t) - it := historyIterationTest{100, 10} - startPosition := it.prepareHistoryIteration(t, e.node) - it.testHistoryIteration(t, e.node, startPosition) -} - -func TestTarantoolBrokerHistoryIterationReverse(t *testing.T) { - e, _ := newTestTarantoolEngine(t) - it := historyIterationTest{100, 10} - startPosition := it.prepareHistoryIteration(t, e.node) - it.testHistoryIterationReverse(t, e.node, startPosition) -} - -func BenchmarkTarantoolPublish_1Ch(b *testing.B) { - broker, _ := newTestTarantoolEngine(b) - rawData := []byte(`{"bench": true}`) - b.SetParallelism(128) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, _, err := broker.Publish("channel", rawData, centrifuge.PublishOptions{}) - if err != nil { - b.Fatal(err) - } - } - }) -} - -func BenchmarkTarantoolSubscribe(b *testing.B) { - broker, _ := newTestTarantoolEngine(b) - i := 0 - b.SetParallelism(128) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - i++ - err := broker.Subscribe("subscribe" + strconv.Itoa(i)) - if err != nil { - b.Fatal(err) - } - } - }) -} - -func BenchmarkTarantoolRecover_1Ch(b *testing.B) { - broker, _ := newTestTarantoolEngine(b) - rawData := []byte("{}") - numMessages := 1000 - numMissing := 5 - for i := 1; i <= numMessages; i++ { - _, _, err := broker.Publish("channel", rawData, centrifuge.PublishOptions{HistorySize: numMessages, HistoryTTL: 300 * time.Second}) - require.NoError(b, err) - } - _, sp, err := broker.History("channel", centrifuge.HistoryOptions{}) - require.NoError(b, err) - b.ResetTimer() - b.SetParallelism(128) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - pubs, _, err := broker.History("channel", centrifuge.HistoryOptions{ - Filter: centrifuge.HistoryFilter{ - Limit: -1, - Since: ¢rifuge.StreamPosition{Offset: sp.Offset - uint64(numMissing), Epoch: ""}, - }, - }) - if err != nil { - b.Fatal(err) - } - if len(pubs) != numMissing { - b.Fatalf("len pubs: %d, expected: %d", len(pubs), numMissing) - } - } - }) -} - -func BenchmarkTarantoolPresence_1Ch(b *testing.B) { - _, pm := newTestTarantoolEngine(b) - b.SetParallelism(128) - _ = pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - p, err := pm.Presence("channel") - if err != nil { - b.Fatal(err) - } - if len(p) != 1 { - b.Fatal("wrong presence len") - } - } - }) -} - -func BenchmarkTarantoolAddPresence_1Ch(b *testing.B) { - _, pm := newTestTarantoolEngine(b) - b.SetParallelism(128) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - err := pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) - if err != nil { - b.Fatal(err) - } - } - }) -} - -func BenchmarkTarantoolPresenceStats_1Ch(b *testing.B) { - _, pm := newTestTarantoolEngine(b) - _ = pm.AddPresence("channel", "uid", ¢rifuge.ClientInfo{}) - b.SetParallelism(128) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - p, err := pm.PresenceStats("channel") - if err != nil { - b.Fatal(err) - } - if p.NumClients != 1 { - b.Fatal("wrong presence stats") - } - } - }) -} diff --git a/internal/tntengine/multi_conn.go b/internal/tntengine/multi_conn.go deleted file mode 100644 index 47c27898ef..0000000000 --- a/internal/tntengine/multi_conn.go +++ /dev/null @@ -1,191 +0,0 @@ -package tntengine - -import ( - "context" - "errors" - "sync" - "time" - - "github.com/FZambia/tarantool" -) - -type ConnectionMode string - -const ( - // ConnectionModeSingleInstance means single Tarantool (single leader). - ConnectionModeSingleInstance ConnectionMode = "standalone" - // ConnectionModeLeaderFollower means Tarantool with replica and automatic failover configured. - ConnectionModeLeaderFollower ConnectionMode = "leader_follower" - // ConnectionModeLeaderFollowerRaft means Tarantool with Raft. - ConnectionModeLeaderFollowerRaft ConnectionMode = "leader_follower_raft" -) - -type MultiConnection struct { - opts MultiOpts - leaderMu sync.RWMutex - leaderAddr string - conns map[string]*tarantool.Connection - closeCh chan struct{} - closeOnce sync.Once -} - -type MultiOpts struct { - ConnectionMode ConnectionMode - LeaderCheckInterval time.Duration -} - -func Connect(addrs []string, opts tarantool.Opts, multiOpts MultiOpts) (*MultiConnection, error) { - conns, err := getConns(addrs, opts) - if err != nil { - return nil, err - } - mc := &MultiConnection{ - opts: multiOpts, - conns: conns, - closeCh: make(chan struct{}), - } - leaderFound := mc.checkLeaderOnce() - if !leaderFound { - return nil, ErrNoLeader - } - go mc.checkLeader() - return mc, nil -} - -var ErrNoLeader = errors.New("no leader") - -func (c *MultiConnection) NewLeaderConn(opts tarantool.Opts) (*tarantool.Connection, error) { - c.leaderMu.RLock() - if c.leaderAddr == "" { - c.leaderMu.RUnlock() - return nil, ErrNoLeader - } - leaderAddr := c.leaderAddr - c.leaderMu.RUnlock() - return tarantool.Connect(leaderAddr, opts) -} - -func (c *MultiConnection) LeaderChanged() { - if c.opts.ConnectionMode == ConnectionModeSingleInstance { - return - } - c.leaderMu.Lock() - defer c.leaderMu.Unlock() - c.leaderAddr = "" -} - -func (c *MultiConnection) LeaderConn() (*tarantool.Connection, error) { - c.leaderMu.RLock() - defer c.leaderMu.RUnlock() - if c.leaderAddr != "" { - return c.conns[c.leaderAddr], nil - } - return nil, ErrNoLeader -} - -func getConns(addrs []string, opts tarantool.Opts) (map[string]*tarantool.Connection, error) { - conns := map[string]*tarantool.Connection{} - var wg sync.WaitGroup - var connsMu sync.Mutex - var firstErr error - var numErrors int - wg.Add(len(addrs)) - for _, addr := range addrs { - go func(addr string) { - defer wg.Done() - conn, err := tarantool.Connect(addr, opts) - if err != nil { - connsMu.Lock() - if firstErr == nil { - firstErr = err - } - numErrors++ - connsMu.Unlock() - return - } - connsMu.Lock() - conns[addr] = conn - connsMu.Unlock() - }(addr) - - } - wg.Wait() - if numErrors == len(addrs) { - return nil, firstErr - } - return conns, nil -} - -func (c *MultiConnection) checkLeader() { - if c.opts.ConnectionMode == ConnectionModeSingleInstance { - return - } - checkInterval := c.opts.LeaderCheckInterval - if checkInterval == 0 { - checkInterval = time.Second - } - for { - select { - case <-c.closeCh: - return - case <-time.After(checkInterval): - c.checkLeaderOnce() - } - } -} - -func (c *MultiConnection) IsLeader(conn *tarantool.Connection) (bool, error) { - if c.opts.ConnectionMode == ConnectionModeSingleInstance { - return true, nil - } - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel() - leaderCheck := "return box.info.ro == false" - if c.opts.ConnectionMode == ConnectionModeLeaderFollowerRaft { - leaderCheck = "return box.info.election.state == 'leader'" - } - result, err := conn.ExecContext(ctx, tarantool.Eval(leaderCheck, []any{})) - if err != nil { - return false, err - } - if len(result) < 1 { - return false, errors.New("unexpected leader check result") - } - isLeader, ok := result[0].(bool) - if !ok { - return false, errors.New("malformed leader check result") - } - return isLeader, nil -} - -func (c *MultiConnection) checkLeaderOnce() bool { - for addr, conn := range c.conns { - if len(c.conns) == 1 { - c.leaderMu.Lock() - c.leaderAddr = addr - c.leaderMu.Unlock() - return true - } - isLeader, err := c.IsLeader(conn) - if err != nil { - continue - } - if isLeader { - c.leaderMu.Lock() - c.leaderAddr = addr - c.leaderMu.Unlock() - return true - } - } - return false -} - -func (c *MultiConnection) Close() error { - c.closeOnce.Do(func() { - close(c.closeCh) - for _, conn := range c.conns { - _ = conn.Close() - } - }) - return nil -} diff --git a/internal/tntengine/presence.go b/internal/tntengine/presence.go deleted file mode 100644 index e155db3b1e..0000000000 --- a/internal/tntengine/presence.go +++ /dev/null @@ -1,179 +0,0 @@ -package tntengine - -import ( - "errors" - "fmt" - "time" - - "github.com/FZambia/tarantool" - "github.com/centrifugal/centrifuge" - "github.com/centrifugal/protocol" - "github.com/vmihailenco/msgpack/v5" -) - -// DefaultPresenceTTL is a default value for presence TTL in Tarantool. -const DefaultPresenceTTL = 60 * time.Second - -// PresenceManagerConfig is a config for Tarantool-based PresenceManager. -type PresenceManagerConfig struct { - // PresenceTTL is an interval how long to consider presence info - // valid after receiving presence update. This allows to automatically - // clean up unnecessary presence entries after TTL passed. - PresenceTTL time.Duration - - // Shards is a list of Tarantool instances to shard data by channel. - Shards []*Shard -} - -// NewPresenceManager initializes Tarantool-based centrifuge.PresenceManager. -func NewPresenceManager(n *centrifuge.Node, config PresenceManagerConfig) (*PresenceManager, error) { - if len(config.Shards) == 0 { - return nil, errors.New("no Tarantool shards provided in configuration") - } - if len(config.Shards) > 1 { - n.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Tarantool sharding enabled: %d shards", len(config.Shards)))) - } - e := &PresenceManager{ - node: n, - shards: config.Shards, - config: config, - sharding: len(config.Shards) > 1, - } - return e, nil -} - -var _ centrifuge.PresenceManager = (*PresenceManager)(nil) - -// PresenceManager uses Tarantool to implement centrifuge.PresenceManager functionality. -type PresenceManager struct { - node *centrifuge.Node - sharding bool - config PresenceManagerConfig - shards []*Shard -} - -type presenceRequest struct { - Channel string -} - -func (m *PresenceManager) Presence(ch string) (map[string]*centrifuge.ClientInfo, error) { - s := consistentShard(ch, m.shards) - result, err := s.Exec(tarantool.Call("centrifuge.presence", presenceRequest{Channel: ch})) - if err != nil { - return nil, err - } - if len(result) == 0 { - return nil, errors.New("malformed presence result") - } - presenceInterfaceSlice, ok := result[0].([]any) - if !ok { - return nil, errors.New("malformed presence format: map expected") - } - presence := make(map[string]*centrifuge.ClientInfo, len(presenceInterfaceSlice)) - for _, v := range presenceInterfaceSlice { - presenceRow, ok := v.([]any) - if !ok { - return nil, errors.New("malformed presence format: tuple expected") - } - var ci protocol.ClientInfo - ciData, ok := presenceRow[3].(string) - if !ok { - return nil, errors.New("malformed presence format: data expected") - } - err := ci.UnmarshalVT([]byte(ciData)) - if err != nil { - return nil, fmt.Errorf("unmarshal ClientInfo: %w", err) - } - presence[ci.Client] = ¢rifuge.ClientInfo{ - ClientID: ci.Client, - UserID: ci.User, - ConnInfo: ci.ConnInfo, - ChanInfo: ci.ChanInfo, - } - } - return presence, nil -} - -type presenceStatsRequest struct { - Channel string -} - -type presenceStatsResponse struct { - NumClients uint32 - NumUsers uint32 -} - -func (m *presenceStatsResponse) DecodeMsgpack(d *msgpack.Decoder) error { - var err error - var l int - if l, err = d.DecodeArrayLen(); err != nil { - return err - } - if l != 2 { - return fmt.Errorf("array len doesn't match: %d", l) - } - if m.NumClients, err = d.DecodeUint32(); err != nil { - return err - } - if m.NumUsers, err = d.DecodeUint32(); err != nil { - return err - } - return nil -} - -func (m *PresenceManager) PresenceStats(ch string) (centrifuge.PresenceStats, error) { - s := consistentShard(ch, m.shards) - var resp presenceStatsResponse - err := s.ExecTyped(tarantool.Call("centrifuge.presence_stats", presenceStatsRequest{Channel: ch}), &resp) - if err != nil { - return centrifuge.PresenceStats{}, err - } - return centrifuge.PresenceStats{NumClients: int(resp.NumClients), NumUsers: int(resp.NumUsers)}, err -} - -type addPresenceRequest struct { - Channel string - TTL int - ClientID string - UserID string - Data string -} - -func (m *PresenceManager) AddPresence(ch string, clientID string, info *centrifuge.ClientInfo) error { - s := consistentShard(ch, m.shards) - ttl := DefaultPresenceTTL - if m.config.PresenceTTL > 0 { - ttl = m.config.PresenceTTL - } - _, err := s.Exec(tarantool.Call("centrifuge.add_presence", addPresenceRequest{ - Channel: ch, - TTL: int(ttl.Seconds()), - ClientID: clientID, - UserID: info.UserID, - Data: m.clientInfoString(info), - })) - return err -} - -func (m *PresenceManager) clientInfoString(clientInfo *centrifuge.ClientInfo) string { - var info string - if clientInfo != nil { - byteMessage, err := infoToProto(clientInfo).MarshalVT() - if err != nil { - return info - } - info = string(byteMessage) - } - return info -} - -type removePresenceRequest struct { - Channel string - ClientID string -} - -func (m *PresenceManager) RemovePresence(ch string, clientID string, _ string) error { - s := consistentShard(ch, m.shards) - _, err := s.Exec(tarantool.Call("centrifuge.remove_presence", removePresenceRequest{Channel: ch, ClientID: clientID})) - return err -} diff --git a/internal/tntengine/shard.go b/internal/tntengine/shard.go deleted file mode 100644 index 738c710df1..0000000000 --- a/internal/tntengine/shard.go +++ /dev/null @@ -1,135 +0,0 @@ -package tntengine - -import ( - "context" - "fmt" - "time" - - "github.com/centrifugal/centrifugo/v5/internal/tools" - - "github.com/FZambia/tarantool" -) - -const ( - defaultConnectTimeout = time.Second - defaultRequestTimeout = time.Second - defaultReadTimeout = 5 * time.Second - defaultWriteTimeout = time.Second -) - -// Shard represents single Tarantool instance. -type Shard struct { - config ShardConfig - subCh chan subRequest - mc *MultiConnection -} - -// ShardConfig allows providing options to connect to Tarantool. -type ShardConfig struct { - // Addresses of Tarantool instances. - Addresses []string - // User for auth. - User string - // Password for auth. - Password string - // ConnectionMode for shard. - ConnectionMode ConnectionMode -} - -func NewShard(c ShardConfig) (*Shard, error) { - shard := &Shard{ - config: c, - subCh: make(chan subRequest), - } - - mc, err := Connect(c.Addresses, tarantool.Opts{ - ConnectTimeout: defaultConnectTimeout, - RequestTimeout: defaultRequestTimeout, - ReadTimeout: defaultReadTimeout, - WriteTimeout: defaultWriteTimeout, - ReconnectDelay: 50 * time.Millisecond, - User: c.User, - Password: c.Password, - SkipSchema: true, - }, MultiOpts{ - ConnectionMode: c.ConnectionMode, - }) - if err != nil { - return nil, fmt.Errorf("error creating connection to %#v: %w", tools.GetLogAddresses(c.Addresses), err) - } - shard.mc = mc - return shard, nil -} - -func (s *Shard) Exec(request *tarantool.Request) ([]any, error) { - conn, err := s.mc.LeaderConn() - if err != nil { - return nil, err - } - return conn.Exec(request) -} - -func (s *Shard) ExecTyped(request *tarantool.Request, result any) error { - conn, err := s.mc.LeaderConn() - if err != nil { - return err - } - return conn.ExecTyped(request, result) -} - -func (s *Shard) pubSubConn() (*tarantool.Connection, func(), error) { - conn, err := s.mc.NewLeaderConn(tarantool.Opts{ - ConnectTimeout: defaultConnectTimeout, - RequestTimeout: 5 * time.Second, - ReadTimeout: defaultReadTimeout, - WriteTimeout: defaultWriteTimeout, - ReconnectDelay: 0, - User: s.config.User, - Password: s.config.Password, - SkipSchema: true, - }) - if err != nil { - return nil, nil, err - } - ctx, cancel := context.WithCancel(context.Background()) - go func() { - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second): - ok, err := s.mc.IsLeader(conn) - if err != nil || !ok { - s.mc.LeaderChanged() - _ = conn.Close() - } - } - } - }() - return conn, cancel, nil -} - -// subRequest is an internal request to subscribe or unsubscribe from one or more channels -type subRequest struct { - channels []string - subscribe bool - err chan error -} - -// newSubRequest creates a new request to subscribe or unsubscribe form a channel. -func newSubRequest(chIDs []string, subscribe bool) subRequest { - return subRequest{ - channels: chIDs, - subscribe: subscribe, - err: make(chan error, 1), - } -} - -// done should only be called once for subRequest. -func (sr *subRequest) done(err error) { - sr.err <- err -} - -func (sr *subRequest) result() error { - return <-sr.err -} diff --git a/internal/tntengine/util.go b/internal/tntengine/util.go deleted file mode 100644 index 2873ccc300..0000000000 --- a/internal/tntengine/util.go +++ /dev/null @@ -1,109 +0,0 @@ -package tntengine - -import ( - "hash/fnv" - "sync" - "time" - - "github.com/centrifugal/centrifuge" - "github.com/centrifugal/protocol" -) - -// index chooses bucket number in range [0, numBuckets). -func index(s string, numBuckets int) int { - if numBuckets == 1 { - return 0 - } - hash := fnv.New64a() - _, _ = hash.Write([]byte(s)) - return int(hash.Sum64() % uint64(numBuckets)) -} - -// consistentIndex is an adapted function from https://github.com/dgryski/go-jump -// package by Damian Gryski. It consistently chooses a hash bucket number in the -// range [0, numBuckets) for the given string. numBuckets must be >= 1. -func consistentIndex(s string, numBuckets int) int { - hash := fnv.New64a() - _, _ = hash.Write([]byte(s)) - key := hash.Sum64() - - var ( - b int64 = -1 - j int64 - ) - - for j < int64(numBuckets) { - b = j - key = key*2862933555777941757 + 1 - j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1))) - } - - return int(b) -} - -func consistentShard(ch string, shards []*Shard) *Shard { - if len(shards) == 1 { - return shards[0] - } - return shards[consistentIndex(ch, len(shards))] -} - -func infoToProto(v *centrifuge.ClientInfo) *protocol.ClientInfo { - if v == nil { - return nil - } - info := &protocol.ClientInfo{ - Client: v.ClientID, - User: v.UserID, - } - if len(v.ConnInfo) > 0 { - info.ConnInfo = v.ConnInfo - } - if len(v.ChanInfo) > 0 { - info.ChanInfo = v.ChanInfo - } - return info -} - -func infoFromProto(v *protocol.ClientInfo) *centrifuge.ClientInfo { - if v == nil { - return nil - } - info := ¢rifuge.ClientInfo{ - ClientID: v.GetClient(), - UserID: v.GetUser(), - } - if len(v.ConnInfo) > 0 { - info.ConnInfo = v.ConnInfo - } - if len(v.ChanInfo) > 0 { - info.ChanInfo = v.ChanInfo - } - return info -} - -var timerPool sync.Pool - -// AcquireTimer from pool. -func AcquireTimer(d time.Duration) *time.Timer { - v := timerPool.Get() - if v == nil { - return time.NewTimer(d) - } - - tm := v.(*time.Timer) - if tm.Reset(d) { - panic("Received an active timer from the pool!") - } - return tm -} - -// ReleaseTimer to pool. -func ReleaseTimer(tm *time.Timer) { - if !tm.Stop() { - // Do not reuse timer that has been already stopped. - // See https://groups.google.com/forum/#!topic/golang-nuts/-8O3AknKpwk - return - } - timerPool.Put(tm) -} diff --git a/internal/usage/usage.go b/internal/usage/usage.go index 9ce434d702..1f7f441700 100644 --- a/internal/usage/usage.go +++ b/internal/usage/usage.go @@ -76,7 +76,6 @@ type Features struct { Websocket bool HTTPStream bool SSE bool - SockJS bool UniWebsocket bool UniGRPC bool UniSSE bool @@ -361,9 +360,6 @@ func (s *Sender) prepareMetrics() ([]*metric, error) { if s.features.SSE { metrics = append(metrics, createPoint("transports_enabled.sse")) } - if s.features.SockJS { - metrics = append(metrics, createPoint("transports_enabled.sockjs")) - } if s.features.UniWebsocket { metrics = append(metrics, createPoint("transports_enabled.uni_websocket")) } diff --git a/main.go b/main.go index 2777247681..49aad0171e 100644 --- a/main.go +++ b/main.go @@ -52,11 +52,9 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/redisnatsbroker" "github.com/centrifugal/centrifugo/v5/internal/rule" "github.com/centrifugal/centrifugo/v5/internal/service" - "github.com/centrifugal/centrifugo/v5/internal/sockjs" "github.com/centrifugal/centrifugo/v5/internal/survey" "github.com/centrifugal/centrifugo/v5/internal/swaggerui" "github.com/centrifugal/centrifugo/v5/internal/telemetry" - "github.com/centrifugal/centrifugo/v5/internal/tntengine" "github.com/centrifugal/centrifugo/v5/internal/tools" "github.com/centrifugal/centrifugo/v5/internal/unigrpc" "github.com/centrifugal/centrifugo/v5/internal/unihttpstream" @@ -217,9 +215,6 @@ var defaults = map[string]any{ "admin_web_path": "", "admin_web_proxy_address": "", - "sockjs": false, - "sockjs_url": "https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js", - "websocket_compression": false, "websocket_compression_min_size": 0, "websocket_compression_level": 1, @@ -299,11 +294,6 @@ var defaults = map[string]any{ "proxy_grpc_compression": false, "proxy_grpc_tls": tools.TLSConfig{}, - "tarantool_mode": "standalone", - "tarantool_address": "tcp://127.0.0.1:3301", - "tarantool_user": "", - "tarantool_password": "", - "api_key": "", "api_error_mode": "", @@ -365,7 +355,6 @@ var defaults = map[string]any{ "websocket_handler_prefix": "/connection/websocket", "webtransport_handler_prefix": "/connection/webtransport", - "sockjs_handler_prefix": "/connection/sockjs", "http_stream_handler_prefix": "/connection/http_stream", "sse_handler_prefix": "/connection/sse", "uni_websocket_handler_prefix": "/connection/uni_websocket", @@ -529,9 +518,9 @@ func main() { "engine", "log_level", "log_file", "pid_file", "debug", "name", "admin", "admin_external", "client_insecure", "admin_insecure", "api_insecure", "api_external", "port", "address", "tls", "tls_cert", "tls_key", "tls_external", "internal_port", - "internal_address", "prometheus", "health", "redis_address", "tarantool_address", + "internal_address", "prometheus", "health", "redis_address", "broker", "nats_url", "grpc_api", "grpc_api_tls", "grpc_api_tls_disable", - "grpc_api_tls_cert", "grpc_api_tls_key", "grpc_api_port", "sockjs", "uni_grpc", + "grpc_api_tls_cert", "grpc_api_tls_key", "grpc_api_port", "uni_grpc", "uni_grpc_port", "uni_websocket", "uni_sse", "uni_http_stream", "sse", "http_stream", "swagger", } @@ -635,8 +624,6 @@ func main() { broker, presenceManager, engineMode, err = memoryEngine(node) } else if engineName == "redis" { broker, presenceManager, engineMode, err = redisEngine(node) - } else if engineName == "tarantool" { - broker, presenceManager, engineMode, err = tarantoolEngine(node) } else if engineName == "redisnats" { if !viper.GetBool("enable_unreleased_features") { log.Fatal().Msg("redisnats engine requires enable_unreleased_features on") @@ -906,7 +893,6 @@ func main() { Websocket: !viper.GetBool("websocket_disable"), HTTPStream: viper.GetBool("http_stream"), SSE: viper.GetBool("sse"), - SockJS: viper.GetBool("sockjs"), UniWebsocket: viper.GetBool("uni_websocket"), UniHTTPStream: viper.GetBool("uni_http_stream"), UniSSE: viper.GetBool("uni_sse"), @@ -974,7 +960,6 @@ func main() { rootCmd.Flags().BoolP("prometheus", "", false, "enable Prometheus metrics endpoint") rootCmd.Flags().BoolP("swagger", "", false, "enable Swagger UI endpoint describing server HTTP API") rootCmd.Flags().BoolP("health", "", false, "enable health check endpoint") - rootCmd.Flags().BoolP("sockjs", "", false, "enable SockJS endpoint") rootCmd.Flags().BoolP("uni_websocket", "", false, "enable unidirectional websocket endpoint") rootCmd.Flags().BoolP("uni_sse", "", false, "enable unidirectional SSE (EventSource) endpoint") rootCmd.Flags().BoolP("uni_http_stream", "", false, "enable unidirectional HTTP-streaming endpoint") @@ -1007,7 +992,6 @@ func main() { rootCmd.Flags().IntP("uni_grpc_port", "", 11000, "port to bind unidirectional GRPC server to") rootCmd.Flags().StringP("redis_address", "", "redis://127.0.0.1:6379", "Redis connection address (Redis engine)") - rootCmd.Flags().StringP("tarantool_address", "", "tcp://127.0.0.1:3301", "Tarantool connection address (Tarantool engine)") rootCmd.Flags().StringP("nats_url", "", "nats://127.0.0.1:4222", "Nats connection URL in format nats://user:pass@localhost:4222 (Nats broker)") var versionCmd = &cobra.Command{ @@ -1575,9 +1559,6 @@ func runHTTPServers(n *centrifuge.Node, ruleContainer *rule.Container, apiExecut } portFlags |= HandlerWebtransport } - if viper.GetBool("sockjs") { - portFlags |= HandlerSockJS - } if viper.GetBool("sse") { portFlags |= HandlerSSE } @@ -2545,20 +2526,6 @@ func uniGRPCHandlerConfig() unigrpc.Config { return unigrpc.Config{} } -func sockjsHandlerConfig() sockjs.Config { - v := viper.GetViper() - cfg := sockjs.Config{} - cfg.URL = v.GetString("sockjs_url") - cfg.WebsocketReadBufferSize = v.GetInt("websocket_read_buffer_size") - cfg.WebsocketWriteBufferSize = v.GetInt("websocket_write_buffer_size") - cfg.WebsocketUseWriteBufferPool = v.GetBool("websocket_use_write_buffer_pool") - cfg.WebsocketWriteTimeout = GetDuration("websocket_write_timeout") - cfg.CheckOrigin = getCheckOrigin() - cfg.WebsocketCheckOrigin = getCheckOrigin() - cfg.PingPongConfig = getPingPongConfig() - return cfg -} - func webTransportHandlerConfig() wt.Config { return wt.Config{ PingPongConfig: getPingPongConfig(), @@ -2784,85 +2751,6 @@ func redisEngine(n *centrifuge.Node) (*centrifuge.RedisBroker, centrifuge.Presen return broker, presenceManager, mode, nil } -func getTarantoolShardConfigs() ([]tntengine.ShardConfig, string, error) { - var shardConfigs []tntengine.ShardConfig - - mode := tntengine.ConnectionModeSingleInstance - if viper.IsSet("tarantool_mode") { - switch viper.GetString("tarantool_mode") { - case "standalone": - // default. - case "leader-follower": - mode = tntengine.ConnectionModeLeaderFollower - case "leader-follower-raft": - mode = tntengine.ConnectionModeLeaderFollowerRaft - default: - return nil, "", fmt.Errorf("unknown Tarantool mode: %s", viper.GetString("tarantool_mode")) - } - } - - var shardAddresses [][]string - - tarantoolAddresses := viper.GetStringSlice("tarantool_address") - for _, shardPart := range tarantoolAddresses { - shardAddresses = append(shardAddresses, strings.Split(shardPart, ",")) - } - - for _, tarantoolAddresses := range shardAddresses { - conf := &tntengine.ShardConfig{ - Addresses: tarantoolAddresses, - User: viper.GetString("tarantool_user"), - Password: viper.GetString("tarantool_password"), - ConnectionMode: mode, - } - shardConfigs = append(shardConfigs, *conf) - } - return shardConfigs, string(mode), nil -} - -func getTarantoolShards() ([]*tntengine.Shard, string, error) { - tarantoolShardConfigs, mode, err := getTarantoolShardConfigs() - if err != nil { - return nil, mode, err - } - tarantoolShards := make([]*tntengine.Shard, 0, len(tarantoolShardConfigs)) - - for _, tarantoolConf := range tarantoolShardConfigs { - tarantoolShard, err := tntengine.NewShard(tarantoolConf) - if err != nil { - return nil, mode, err - } - tarantoolShards = append(tarantoolShards, tarantoolShard) - } - - if len(tarantoolShards) > 1 { - mode += "_sharded" - } - - return tarantoolShards, mode, nil -} - -func tarantoolEngine(n *centrifuge.Node) (centrifuge.Broker, centrifuge.PresenceManager, string, error) { - tarantoolShards, mode, err := getTarantoolShards() - if err != nil { - return nil, nil, "", err - } - broker, err := tntengine.NewBroker(n, tntengine.BrokerConfig{ - Shards: tarantoolShards, - }) - if err != nil { - return nil, nil, "", err - } - presenceManager, err := tntengine.NewPresenceManager(n, tntengine.PresenceManagerConfig{ - Shards: tarantoolShards, - PresenceTTL: GetDuration("global_presence_ttl", true), - }) - if err != nil { - return nil, nil, "", err - } - return broker, presenceManager, mode, nil -} - type logHandler struct { entries chan centrifuge.LogEntry } @@ -2914,8 +2802,6 @@ type HandlerFlag int const ( // HandlerWebsocket enables Raw Websocket handler. HandlerWebsocket HandlerFlag = 1 << iota - // HandlerSockJS enables SockJS handler. - HandlerSockJS // HandlerWebtransport enables Webtransport handler (requires HTTP/3) HandlerWebtransport // HandlerAPI enables API handler. @@ -2946,7 +2832,6 @@ const ( var handlerText = map[HandlerFlag]string{ HandlerWebsocket: "websocket", - HandlerSockJS: "sockjs", HandlerWebtransport: "webtransport", HandlerAPI: "api", HandlerAdmin: "admin", @@ -2963,7 +2848,7 @@ var handlerText = map[HandlerFlag]string{ } func (flags HandlerFlag) String() string { - flagsOrdered := []HandlerFlag{HandlerWebsocket, HandlerSockJS, HandlerWebtransport, HandlerHTTPStream, HandlerSSE, HandlerEmulation, HandlerAPI, HandlerAdmin, HandlerPrometheus, HandlerDebug, HandlerHealth, HandlerUniWebsocket, HandlerUniSSE, HandlerUniHTTPStream, HandlerSwagger} + flagsOrdered := []HandlerFlag{HandlerWebsocket, HandlerWebtransport, HandlerHTTPStream, HandlerSSE, HandlerEmulation, HandlerAPI, HandlerAdmin, HandlerPrometheus, HandlerDebug, HandlerHealth, HandlerUniWebsocket, HandlerUniSSE, HandlerUniHTTPStream, HandlerSwagger} var endpoints []string for _, flag := range flagsOrdered { text, ok := handlerText[flag] @@ -3064,14 +2949,6 @@ func Mux(n *centrifuge.Node, ruleContainer *rule.Container, apiExecutor *api.Exe mux.Handle(ssePrefix, connChain.Then(centrifuge.NewSSEHandler(n, sseHandlerConfig()))) } - if flags&HandlerSockJS != 0 { - // register SockJS connection endpoints. - sockjsConfig := sockjsHandlerConfig() - sockjsPrefix := strings.TrimRight(v.GetString("sockjs_handler_prefix"), "/") - sockjsConfig.HandlerPrefix = sockjsPrefix - mux.Handle(sockjsPrefix+"/", connChain.Then(sockjs.NewHandler(n, sockjsConfig))) - } - if flags&HandlerUniWebsocket != 0 { // register unidirectional WebSocket connection endpoint. wsPrefix := strings.TrimRight(v.GetString("uni_websocket_handler_prefix"), "/") diff --git a/misc/tnt/init.lua b/misc/tnt/init.lua deleted file mode 100755 index 12c27648de..0000000000 --- a/misc/tnt/init.lua +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env tarantool - --- configure path so that you can run application --- from outside the root directory -if package.setsearchroot ~= nil then - package.setsearchroot() -else - -- Workaround for rocks loading in tarantool 1.10 - -- It can be removed in tarantool > 2.2 - -- By default, when you do require('mymodule'), tarantool looks into - -- the current working directory and whatever is specified in - -- package.path and package.cpath. If you run your app while in the - -- root directory of that app, everything goes fine, but if you try to - -- start your app with "tarantool myapp/init.lua", it will fail to load - -- its modules, and modules from myapp/.rocks. - local fio = require("fio") - local app_dir = fio.abspath(fio.dirname(arg[0])) - package.path = app_dir .. "/?.lua;" .. package.path - package.path = app_dir .. "/?/init.lua;" .. package.path - package.path = app_dir .. "/.rocks/share/tarantool/?.lua;" .. package.path - package.path = app_dir .. "/.rocks/share/tarantool/?/init.lua;" .. package.path - package.cpath = app_dir .. "/?.so;" .. package.cpath - package.cpath = app_dir .. "/?.dylib;" .. package.cpath - package.cpath = app_dir .. "/.rocks/lib/tarantool/?.so;" .. package.cpath - package.cpath = app_dir .. "/.rocks/lib/tarantool/?.dylib;" .. package.cpath -end - -require "strict".on() -local log = require("log") -fiber = require "fiber" - -local address = os.getenv("TARANTOOL_ADDRESS") or "0.0.0.0" -local port = os.getenv("TARANTOOL_PORT") or 3301 -local workdir = os.getenv("TARANTOOL_WORKDIR") or "/tmp/standalone_" .. port - -local fio = require("fio") -fio.mkdir(workdir) - -box.cfg { - listen = address .. ":" .. port, - wal_mode = "none", - wal_dir = workdir, -- though WAL not used here by default, see above. - memtx_dir = workdir, - readahead = 10 * 1024 * 1024, - net_msg_max = 10 * 1024 -} -box.schema.user.grant("guest", "super", nil, nil, {if_not_exists = true}) - -centrifuge = require "centrifuge" - -centrifuge.init_spaces({}) -centrifuge.start() diff --git a/misc/tnt/readme.md b/misc/tnt/readme.md deleted file mode 100644 index 9e8783d7e9..0000000000 --- a/misc/tnt/readme.md +++ /dev/null @@ -1,6 +0,0 @@ -Tarantool init Lua script for integration tests. - -``` -tarantoolctl rocks install https://raw.githubusercontent.com/centrifugal/tarantool-centrifuge/main/centrifuge-scm-1.rockspec -tarantool init.lua -```