From 41530d3ead2be1a1bc97fdad60c4802162e942f9 Mon Sep 17 00:00:00 2001 From: Philemon Ukane Date: Thu, 8 Dec 2022 00:04:21 +0100 Subject: [PATCH 1/2] connect to signalR websocket without signalR client - Remove external dependency used to connect to a signalR websocket. - Fix an error in the exchanges live test. - Update bittrex signalR websocket version. --- cmd/dcrdata/go.mod | 4 - cmd/dcrdata/go.sum | 9 -- exchanges/exchanges.go | 140 ++++++++++------------ exchanges/exchanges_live_test.go | 2 +- exchanges/exchanges_test.go | 29 +++-- exchanges/go.mod | 4 - exchanges/go.sum | 9 -- exchanges/rateserver/go.mod | 4 - exchanges/rateserver/go.sum | 9 -- exchanges/signalr.go | 195 +++++++++++++++++++++++++++++++ exchanges/websocket.go | 137 +++++----------------- 11 files changed, 305 insertions(+), 237 deletions(-) create mode 100644 exchanges/signalr.go diff --git a/cmd/dcrdata/go.mod b/cmd/dcrdata/go.mod index 0e327ef60..e6fd6e27a 100644 --- a/cmd/dcrdata/go.mod +++ b/cmd/dcrdata/go.mod @@ -63,8 +63,6 @@ require ( github.com/btcsuite/btcwallet/wtxmgr v1.5.0 // indirect github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect - github.com/carterjones/go-cloudflare-scraper v0.1.2 // indirect - github.com/carterjones/signalr v0.3.5 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/companyzero/sntrup4591761 v0.0.0-20200131011700-2b0d299dbd22 // indirect @@ -146,7 +144,6 @@ require ( github.com/prometheus/tsdb v0.7.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rjeczalik/notify v0.9.1 // indirect - github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 // indirect @@ -167,6 +164,5 @@ require ( google.golang.org/protobuf v1.26.0 // indirect gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect - gopkg.in/sourcemap.v1 v1.0.5 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/cmd/dcrdata/go.sum b/cmd/dcrdata/go.sum index a6560d97b..852ca24b7 100644 --- a/cmd/dcrdata/go.sum +++ b/cmd/dcrdata/go.sum @@ -183,10 +183,6 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f github.com/bufbuild/buf v0.37.0/go.mod h1:lQ1m2HkIaGOFba6w/aC3KYBHhKEOESP3gaAEpS3dAFM= github.com/caarlos0/env/v6 v6.9.3 h1:Tyg69hoVXDnpO5Qvpsu8EoquarbPyQb+YwExWHP8wWU= github.com/caarlos0/env/v6 v6.9.3/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc= -github.com/carterjones/go-cloudflare-scraper v0.1.2 h1:GNmlJEfhIVPVXaEItnPSDtwOpuJo6rFH2SFWuAjeEuM= -github.com/carterjones/go-cloudflare-scraper v0.1.2/go.mod h1:maO/ygX7QWbdh/TzHqr5uR42b2BW81g/05QRx7fpw38= -github.com/carterjones/signalr v0.3.5 h1:kJSw+6a9XmsOb/+9HWTnY8SjTrVOdpzCSPV/9IVS2nI= -github.com/carterjones/signalr v0.3.5/go.mod h1:SOGIwr/0/4GGNjHWSSginY66OVSaOeM85yWCNytdEwE= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -453,7 +449,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= -github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -1064,8 +1059,6 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= -github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d h1:1VUlQbCfkoSGv7qP7Y+ro3ap1P1pPZxgdGVqiTVy5C4= -github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -1727,8 +1720,6 @@ gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= -gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= diff --git a/exchanges/exchanges.go b/exchanges/exchanges.go index e6a179fbe..3130d130f 100644 --- a/exchanges/exchanges.go +++ b/exchanges/exchanges.go @@ -26,8 +26,6 @@ import ( "decred.org/dcrdex/dex" dexcandles "decred.org/dcrdex/dex/candles" "decred.org/dcrdex/dex/msgjson" - "github.com/carterjones/signalr" - "github.com/carterjones/signalr/hubs" dcrrates "github.com/decred/dcrdata/exchanges/v3/ratesproto" ) @@ -122,7 +120,7 @@ var ( }, // Bittrex uses SignalR, which retrieves the actual websocket endpoint via // HTTP. - Websocket: "socket.bittrex.com", + Websocket: "socket-v3.bittrex.com", } DragonExURLs = URLs{ Price: "https://openapi.dragonex.io/api/v1/market/real/?symbol_id=1520101", @@ -453,7 +451,6 @@ type CommonExchange struct { channels *BotChannels wsMtx sync.RWMutex ws websocketFeed - sr signalrClient wsSync struct { err error errCount int @@ -461,12 +458,10 @@ type CommonExchange struct { update time.Time fail time.Time } - // wsProcessor is only used for websockets, not SignalR. For SignalR, the - // callback function is passed as part of the signalrConfig. + // wsProcessor is used to process messages from websockets. wsProcessor WebsocketProcessor - // Exchanges that use websockets or signalr to maintain a live orderbook can - // use the buy and sell slices to leverage some useful methods on - // CommonExchange. + // Exchanges that use websockets to maintain a live orderbook can use the + // buy and sell slices to leverage some useful methods on CommonExchange. orderMtx sync.RWMutex buys wsOrders asks wsOrders @@ -602,21 +597,9 @@ func (xc *CommonExchange) websocket() (websocketFeed, WebsocketProcessor) { return xc.ws, xc.wsProcessor } -// Grab the SignalR client, which is nil for most exchanges. -func (xc *CommonExchange) signalr() signalrClient { - xc.mtx.RLock() - defer xc.mtx.RUnlock() - return xc.sr -} - -// Creates a websocket connection and starts a listen loop. Closes any existing -// connections for this exchange. -func (xc *CommonExchange) connectWebsocket(processor WebsocketProcessor, cfg *socketConfig) error { - ws, err := newSocketConnection(cfg) - if err != nil { - return err - } - +// addWebsocketConnection adds a websocket connection and it's message processor +// to an exchange. +func (xc *CommonExchange) addWebsocketConnection(ws websocketFeed, processor WebsocketProcessor) { xc.wsMtx.Lock() // Ensure that any previous websocket is closed. if xc.ws != nil { @@ -627,6 +610,17 @@ func (xc *CommonExchange) connectWebsocket(processor WebsocketProcessor, cfg *so xc.wsMtx.Unlock() xc.startWebsocket() +} + +// Creates a websocket connection and starts a listen loop. Closes any existing +// connections for this exchange. +func (xc *CommonExchange) connectWebsocket(processor WebsocketProcessor, cfg *socketConfig) error { + ws, err := newSocketConnection(cfg) + if err != nil { + return err + } + + xc.addWebsocketConnection(ws, processor) return nil } @@ -645,8 +639,7 @@ func (xc *CommonExchange) startWebsocket() { }() } -// wsSend sends a message on a standard websocket connection. For SignalR -// connections, use xc.sr.Send directly. +// wsSend sends a message on a standard websocket connection. func (xc *CommonExchange) wsSend(msg interface{}) error { ws, _ := xc.websocket() if ws == nil { @@ -656,6 +649,15 @@ func (xc *CommonExchange) wsSend(msg interface{}) error { return ws.Write(msg) } +// wsSendJSON is like wsSend but it encodes msg to JSON before sending. +func (xc *CommonExchange) wsSendJSON(msg interface{}) error { + ws, _ := xc.websocket() + if ws == nil { + return errors.New("no connection") + } + return ws.WriteJSON(msg) +} + // Checks whether the websocketFeed Done channel is closed. func (xc *CommonExchange) wsListening() bool { xc.wsMtx.RLock() @@ -673,15 +675,6 @@ func (xc *CommonExchange) setWsFail(err error) { // Clear the field to prevent double Close'ing. xc.ws = nil } - if xc.sr != nil { - // The carterjones/signalr can hang on Close. The goroutine is a stopgap while - // we migrate to a new signalr client. - // https://github.com/decred/dcrdata/issues/1818 - go xc.sr.Close() - // Clear the field to prevent double Close'ing. signalr will hang on - // second call. - xc.sr = nil - } xc.wsSync.err = err xc.wsSync.errCount++ xc.wsSync.fail = time.Now() @@ -731,26 +724,6 @@ func (xc *CommonExchange) wsErrorCount() int { return xc.wsSync.errCount } -// For exchanges that have SignalR-wrapped websockets, connectSignalr will be -// used instead of connectWebsocket. -func (xc *CommonExchange) connectSignalr(cfg *signalrConfig) (err error) { - if cfg.errHandler == nil { - cfg.errHandler = func(err error) { - xc.wsMtx.Lock() - xc.sr = nil - xc.wsMtx.Unlock() - xc.setWsFail(err) - } - } - xc.wsMtx.Lock() - defer xc.wsMtx.Unlock() - if xc.sr != nil { - xc.sr.Close() - } - xc.sr, err = newSignalrConnection(cfg) - return -} - // An intermediate order representation used to track an orderbook over a // websocket connection. type wsOrder struct { @@ -860,7 +833,6 @@ func (xc *CommonExchange) wsDepthStatus(connector func()) (tryHttp, initializing log.Errorf("%s websocket disabled. Too many errors. Will attempt to reconnect after %.1f minutes", xc.token, time.Until(okToTry).Minutes()) } return - } // Used to initialize the embedding exchanges. @@ -1305,9 +1277,9 @@ func NewBittrex(client *http.Client, channels *BotChannels) (bittrex Exchange, e } go func() { <-channels.done - sr := b.signalr() - if sr != nil { - sr.Close() + ws, _ := b.websocket() + if ws != nil { + ws.Close() } }() bittrex = b @@ -1429,7 +1401,7 @@ func translateBittrexCandlesticks(inSticks []*BittrexCandlestick) Candlesticks { const maxBittrexQueueSize = 50 -var bittrexSubscribeOrderbook = hubs.ClientMsg{ +var bittrexSubscribeOrderbook = signalRClientMsg{ H: "c3", M: "Subscribe", A: []interface{}{[]interface{}{"orderbook_DCR-BTC_500", "heartbeat"}}, @@ -1443,7 +1415,7 @@ func (bittrex *BittrexExchange) processBittrexOrderbookPoint(order *BittrexOrder case 0: _, found := book[k] if !found { - bittrex.setWsFail(fmt.Errorf("no order found for bittrex orderbook removal-type update at key %d\n", k)) + bittrex.setWsFail(fmt.Errorf("no order found for bittrex orderbook removal-type update at key %d", k)) return } delete(book, k) @@ -1554,7 +1526,7 @@ type BittrexOrderbookDelta struct { Rate StringFloat `json:"rate"` } -// BittrexWSMsg is used to parse the ridiculous signalr message format into +// BittrexWSMsg is used to parse the ridiculous signalR message format into // something sane. type BittrexWSMsg struct { Name string @@ -1566,7 +1538,7 @@ const ( BittrexMsgBookUpdate = "orderBook" ) -func decodeBittrexWSMessage(msg signalr.Message) ([]*BittrexWSMsg, error) { +func decodeBittrexWSMessage(msg signalRMessage) ([]*BittrexWSMsg, error) { msgs := make([]*BittrexWSMsg, 0, len(msg.M)) for _, hubMsg := range msg.M { msg := &BittrexWSMsg{ @@ -1613,11 +1585,23 @@ func decodeBittrexWSMessage(msg signalr.Message) ([]*BittrexWSMsg, error) { return msgs, nil } -// Handle the SignalR message. The message can be either a full orderbook at -// msg.R (msg.I == "1"), or a list of updates in msg.M[i].A. -func (bittrex *BittrexExchange) msgHandler(inMsg signalr.Message) { +// processWsMessage handles message from the bittrex websocket. The message can +// be either a full orderbook at msg.R (msg.I == "1"), or a list of updates in +// msg.M[i].A. +func (bittrex *BittrexExchange) processWsMessage(inMsg []byte) { + // Ignore KeepAlive messages. + if len(inMsg) == 2 && inMsg[0] == '{' && inMsg[1] == '}' { + return + } + + var msg signalRMessage + err := json.Unmarshal(inMsg, &msg) + if err != nil { + bittrex.setWsFail(fmt.Errorf("unable to read message bytes: %v", err)) + return + } - msgs, err := decodeBittrexWSMessage(inMsg) + msgs, err := decodeBittrexWSMessage(msg) if err != nil { bittrex.setWsFail(fmt.Errorf("Bittrex websocket message decode error: %v", err)) return @@ -1685,20 +1669,18 @@ func (bittrex *BittrexExchange) connectWs() { bittrex.orderSeq = 0 bittrex.orderMtx.Unlock() - err := bittrex.connectSignalr(&signalrConfig{ - host: "socket-v3.bittrex.com", - protocol: "1.5", - endpoint: "/signalr", - msgHandler: bittrex.msgHandler, - }) + conn, err := connectSignalRWebsocket(BittrexURLs.Websocket, "/signalr", nil) if err != nil { - bittrex.setWsFail(err) + bittrex.setWsFail(fmt.Errorf("connectSignalrWebsocket error: %v", err)) return } - // Subscribe to the feed. The full orderbook will be requested once the first - // delta is received. - err = bittrex.sr.Send(bittrexSubscribeOrderbook) + // Add the websocket feed to the bittrex exchange. + bittrex.addWebsocketConnection(conn, bittrex.processWsMessage) + + // Subscribe to the feed. The full orderbook will be requested once the + // first delta is received. + err = bittrex.wsSendJSON(bittrexSubscribeOrderbook) if err != nil { bittrex.setWsFail(fmt.Errorf("Failed to send order update request to bittrex: %v", err)) return @@ -1713,8 +1695,8 @@ func (bittrex *BittrexExchange) connectWs() { bittrex.processFullOrderbook(book) } -// Refresh retrieves and parses API data from Bittrex. -// Bittrex provides timestamps in a string format that is not quite RFC 3339. +// Refresh retrieves and parses API data from Bittrex. Bittrex provides +// timestamps in a string format that is not quite RFC 3339. func (bittrex *BittrexExchange) Refresh() { bittrex.LogRequest() priceResponse := new(BittrexPriceResponse) diff --git a/exchanges/exchanges_live_test.go b/exchanges/exchanges_live_test.go index e6214be2d..ce3d3adb1 100644 --- a/exchanges/exchanges_live_test.go +++ b/exchanges/exchanges_live_test.go @@ -344,7 +344,7 @@ func TestDecredDEXLive(t *testing.T) { case <-chans.index: log.Infof("Why are we receiving index updates?") case u := <-chans.exchange: - log.Infof("Exchange update received: %s", mustEncode(t, u)) + log.Infof("Exchange update received: %s", mustEncode(u)) } } }() diff --git a/exchanges/exchanges_test.go b/exchanges/exchanges_test.go index 2ff46b2f8..7b060b96d 100644 --- a/exchanges/exchanges_test.go +++ b/exchanges/exchanges_test.go @@ -19,8 +19,6 @@ import ( "time" "decred.org/dcrdex/dex/msgjson" - "github.com/carterjones/signalr" - "github.com/carterjones/signalr/hubs" "github.com/decred/slog" ) @@ -149,6 +147,9 @@ func (p *fakePoloniexWebsocket) Read() ([]byte, error) { func (p *fakePoloniexWebsocket) Write(interface{}) error { return nil } +func (p *fakePoloniexWebsocket) WriteJSON(interface{}) error { + return nil +} var poloMtx sync.Mutex var poloOn bool = true @@ -268,7 +269,7 @@ func (conn testBittrexConnection) On() bool { return false } -func (conn testBittrexConnection) Send(hubs.ClientMsg) error { +func (conn testBittrexConnection) Send(signalRClientMsg) error { return nil } @@ -289,7 +290,6 @@ func newTestBittrexExchange() (*BittrexExchange, *tDoer) { }, queue: make([]*BittrexOrderbookUpdate, 0), } - bittrex.sr = testBittrexConnection{xc: bittrex} return bittrex, doer } @@ -297,7 +297,7 @@ func TestBittrexWebsocket(t *testing.T) { var seq uint64 // helper function to prepare a websocket orderbook update. - obUpdate := func(bids []*BittrexOrderbookDelta, asks []*BittrexOrderbookDelta) signalr.Message { + obUpdate := func(bids []*BittrexOrderbookDelta, asks []*BittrexOrderbookDelta) []byte { t.Helper() u := BittrexOrderbookUpdate{ MarketSymbol: "DCR-BTC", @@ -326,14 +326,21 @@ func TestBittrexWebsocket(t *testing.T) { b64 := base64.StdEncoding.EncodeToString(buf.Bytes()) - return signalr.Message{ - M: []hubs.ClientMsg{ + msg := signalRMessage{ + M: []signalRClientMsg{ { M: BittrexMsgBookUpdate, A: []interface{}{b64}, }, }, } + + msgBytes, err := json.Marshal(msg) + if err != nil { + t.Fatalf("error encoding msg to json: %v", err) + } + + return msgBytes } bittrex, doer := newTestBittrexExchange() @@ -357,7 +364,7 @@ func TestBittrexWebsocket(t *testing.T) { Qty: 456789, Rate: 987654, }} - bittrex.msgHandler(obUpdate(nil, asks)) + bittrex.processWsMessage(obUpdate(nil, asks)) bittrex.processFullOrderbook(ob) @@ -382,7 +389,7 @@ func TestBittrexWebsocket(t *testing.T) { Qty: 0, Rate: 987654, }} - bittrex.msgHandler(obUpdate(nil, asks)) + bittrex.processWsMessage(obUpdate(nil, asks)) depths := bittrex.wsDepths() if len(depths.Asks) != 0 { @@ -494,6 +501,10 @@ func (ws *dexWS) Write(thing interface{}) error { return nil } +func (ws *dexWS) WriteJSON(thing interface{}) error { + return nil +} + func (ws *dexWS) Close() { if !atomic.CompareAndSwapUint32(&ws.closed, 0, 1) { return diff --git a/exchanges/go.mod b/exchanges/go.mod index a979e2217..95d88b212 100644 --- a/exchanges/go.mod +++ b/exchanges/go.mod @@ -4,7 +4,6 @@ go 1.18 require ( decred.org/dcrdex v0.5.5 - github.com/carterjones/signalr v0.3.5 github.com/decred/dcrd/dcrutil/v4 v4.0.0 github.com/decred/slog v1.2.0 github.com/golang/protobuf v1.5.2 @@ -33,7 +32,6 @@ require ( github.com/btcsuite/btcwallet/wtxmgr v1.5.0 // indirect github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect - github.com/carterjones/go-cloudflare-scraper v0.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/companyzero/sntrup4591761 v0.0.0-20200131011700-2b0d299dbd22 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect @@ -106,7 +104,6 @@ require ( github.com/prometheus/tsdb v0.7.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rjeczalik/notify v0.9.1 // indirect - github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect @@ -129,6 +126,5 @@ require ( google.golang.org/protobuf v1.26.0 // indirect gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect - gopkg.in/sourcemap.v1 v1.0.5 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/exchanges/go.sum b/exchanges/go.sum index 34d6959bc..23efa244e 100644 --- a/exchanges/go.sum +++ b/exchanges/go.sum @@ -162,10 +162,6 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/bufbuild/buf v0.37.0/go.mod h1:lQ1m2HkIaGOFba6w/aC3KYBHhKEOESP3gaAEpS3dAFM= -github.com/carterjones/go-cloudflare-scraper v0.1.2 h1:GNmlJEfhIVPVXaEItnPSDtwOpuJo6rFH2SFWuAjeEuM= -github.com/carterjones/go-cloudflare-scraper v0.1.2/go.mod h1:maO/ygX7QWbdh/TzHqr5uR42b2BW81g/05QRx7fpw38= -github.com/carterjones/signalr v0.3.5 h1:kJSw+6a9XmsOb/+9HWTnY8SjTrVOdpzCSPV/9IVS2nI= -github.com/carterjones/signalr v0.3.5/go.mod h1:SOGIwr/0/4GGNjHWSSginY66OVSaOeM85yWCNytdEwE= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -292,7 +288,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= -github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -843,8 +838,6 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= -github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d h1:1VUlQbCfkoSGv7qP7Y+ro3ap1P1pPZxgdGVqiTVy5C4= -github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1473,8 +1466,6 @@ gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= -gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= diff --git a/exchanges/rateserver/go.mod b/exchanges/rateserver/go.mod index 7f866a9a5..0b0e7c615 100644 --- a/exchanges/rateserver/go.mod +++ b/exchanges/rateserver/go.mod @@ -36,8 +36,6 @@ require ( github.com/btcsuite/btcwallet/wtxmgr v1.5.0 // indirect github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect - github.com/carterjones/go-cloudflare-scraper v0.1.2 // indirect - github.com/carterjones/signalr v0.3.5 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/companyzero/sntrup4591761 v0.0.0-20200131011700-2b0d299dbd22 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect @@ -110,7 +108,6 @@ require ( github.com/prometheus/tsdb v0.7.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rjeczalik/notify v0.9.1 // indirect - github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect @@ -133,6 +130,5 @@ require ( google.golang.org/protobuf v1.26.0 // indirect gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect - gopkg.in/sourcemap.v1 v1.0.5 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/exchanges/rateserver/go.sum b/exchanges/rateserver/go.sum index 865945e99..f141a96b1 100644 --- a/exchanges/rateserver/go.sum +++ b/exchanges/rateserver/go.sum @@ -162,10 +162,6 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/bufbuild/buf v0.37.0/go.mod h1:lQ1m2HkIaGOFba6w/aC3KYBHhKEOESP3gaAEpS3dAFM= -github.com/carterjones/go-cloudflare-scraper v0.1.2 h1:GNmlJEfhIVPVXaEItnPSDtwOpuJo6rFH2SFWuAjeEuM= -github.com/carterjones/go-cloudflare-scraper v0.1.2/go.mod h1:maO/ygX7QWbdh/TzHqr5uR42b2BW81g/05QRx7fpw38= -github.com/carterjones/signalr v0.3.5 h1:kJSw+6a9XmsOb/+9HWTnY8SjTrVOdpzCSPV/9IVS2nI= -github.com/carterjones/signalr v0.3.5/go.mod h1:SOGIwr/0/4GGNjHWSSginY66OVSaOeM85yWCNytdEwE= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -292,7 +288,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= -github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -844,8 +839,6 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= -github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d h1:1VUlQbCfkoSGv7qP7Y+ro3ap1P1pPZxgdGVqiTVy5C4= -github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1474,8 +1467,6 @@ gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= -gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= -gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= diff --git a/exchanges/signalr.go b/exchanges/signalr.go new file mode 100644 index 000000000..5b2a3516e --- /dev/null +++ b/exchanges/signalr.go @@ -0,0 +1,195 @@ +// Copyright (c) 2019-2021, The Decred developers +// See LICENSE for details. + +// This was almost entirely written using +// https://blog.3d-logic.com/2015/03/29/signalr-on-the-wire-an-informal-description-of-the-signalr-protocol/ +// and github.com/carterjones/signalr as a reference guide. + +package exchanges + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "math" + "math/rand" + "net/http" + "net/url" + "time" +) + +// defaultClientProtocol is the default protocol version used when connecting to +// a signalR websocket. +const defaultClientProtocol = "1.5" + +// signalRClientMsg represents a message sent from or to the signalR server on a +// persistent websocket connection. +type signalRClientMsg struct { + // invocation identifier – allows to match up responses with requests + I int + // the name of the hub + H string + // the name of the method + M string + // arguments (an array, can be empty if the method does not have any + // parameters) + A []interface{} + // state – a dictionary containing additional custom data (optional) + S *json.RawMessage `json:",omitempty"` +} + +// signalRMessage represents a signalR message sent from the server to the +// persistent websocket connection. +type signalRMessage struct { + // message id, present for all non-KeepAlive messages + C string + // an array containing actual data + M []signalRClientMsg + // indicates that the transport was initialized (a.k.a. init message) + S int + // groups token – an encrypted string representing group membership + G string + // other miscellaneous variables that sometimes are sent by the server + I string + E string + R json.RawMessage + H json.RawMessage // could be bool or string depending on a message type + D json.RawMessage + T json.RawMessage +} + +// signalRNegotiation represents a response sent after a negotiation with a +// signalR server. A bunch of other fields have been removed because they are +// not needed. +type signalRNegotiation struct { + ConnectionToken string +} + +// connectSignalRWebsocket connects to a signalR websocket in three steps +// (negotiate, connect, and start) and returns a websocketFeed which can be used +// to read websocket messages from the signalR server. There are no retires if +// connection to signalR websocket fails. +func connectSignalRWebsocket(host, endpoint string, tlsConfig *tls.Config) (websocketFeed, error) { + params := map[string]string{ + "clientProtocol": defaultClientProtocol, + } + + // Step 1: Negotiate with the signalR server to receive a connection token. + sn := new(signalRNegotiation) + err := fetch(http.MethodGet, makeURL("negotiate", host, endpoint, params), sn) + if err != nil { + return nil, err + } + + // Step 2: Connect to signalR websocket. + params["transport"] = "webSockets" + params["connectionToken"] = sn.ConnectionToken + + cfg := &socketConfig{ + address: makeURL("connect", host, endpoint, params), + tlsConfig: tlsConfig, + headers: map[string][]string{ + "User-Agent": {fauxBrowserUA}, + }, + } + + var success bool + ws, err := newSocketConnection(cfg) + if err != nil { + return nil, err + } + defer func() { + if success { + return + } + + // Gracefully close this websocket connection if we encounter an error + // below. + ws.Close() + }() + + // Step 3: Start the connection before returning the websocket connection. + // The websocket connection can be used without this step but we'd like to + // keep this step to be sure we can successfully received websocket messages + // from the signalR server. + confirmation := &struct{ Response string }{} + err = fetch(http.MethodGet, makeURL("start", host, endpoint, params), confirmation) + if err != nil { + return nil, err + } + + // Wait for the init message. + initMsg, err := ws.Read() + if err != nil { + return nil, err + } + + // Extract the server message. + var msg signalRMessage + err = json.Unmarshal(initMsg, &msg) + if err != nil { + return nil, fmt.Errorf("json.Unmarshal error: %w", err) + } + + serverInitialized := 1 + if msg.S != serverInitialized { + return nil, fmt.Errorf("unexpected S value received from server: %d | message: %s", msg.S, string(initMsg)) + } + + success = true + return ws, nil +} + +// makeURL is used to construct a signalR connection URL for the action +// specified. +func makeURL(action, host, endpoint string, params map[string]string) string { + var u url.URL + u.Scheme = "https" + u.Host = host + u.Path = endpoint + + param := url.Values{} + for key, value := range params { + param.Set(key, value) + } + + switch action { + case "negotiate": + u.Path += "/negotiate" + case "connect": + u.Path += "/connect" + u.Scheme = "wss" + param.Set("tid", fmt.Sprintf("%.0f", math.Floor(rand.Float64()*11))) + case "start": + u.Path += "/start" + } + + u.RawQuery = param.Encode() + return u.String() +} + +// fetch makes a request to the provided url and decodes the request response +// into response. +func fetch(method, url string, response interface{}) error { + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + + request, err := http.NewRequestWithContext(ctx, method, url, nil) + if err != nil { + return fmt.Errorf("http.NewRequestWithContext error: %w", err) + } + + resp, err := http.DefaultClient.Do(request) + if err != nil { + return fmt.Errorf("http.DefaultClient.Do error: %w", err) + } + defer resp.Body.Close() + + err = json.NewDecoder(resp.Body).Decode(response) + if err != nil { + return fmt.Errorf("Failed to decode json from %s: %w", request.URL.String(), err) + } + + return nil +} diff --git a/exchanges/websocket.go b/exchanges/websocket.go index 4ebc2cc00..bc3321cdd 100644 --- a/exchanges/websocket.go +++ b/exchanges/websocket.go @@ -6,12 +6,11 @@ package exchanges import ( "crypto/tls" "encoding/json" + "fmt" "net/http" "sync" "time" - "github.com/carterjones/signalr" - "github.com/carterjones/signalr/hubs" "github.com/gorilla/websocket" ) @@ -28,6 +27,12 @@ type websocketFeed interface { // Write sends a message. Write will safely sequence messages from multiple // threads. Write(interface{}) error + // WriteJSON is modeled after the function defined at + // https://godoc.org/github.com/gorilla/websocket#Conn.WriteJSON + // + // WriteJSON is like Write but it will encode a message to json before + // sending it. + WriteJSON(interface{}) error // Close will disconnect, causing any pending Read operations to error out. Close() } @@ -38,6 +43,7 @@ type websocketFeed interface { type socketConfig struct { address string tlsConfig *tls.Config + headers http.Header } // A manager for a gorilla websocket connection. @@ -68,6 +74,15 @@ func (client *socketClient) Write(msg interface{}) error { return client.conn.WriteMessage(websocket.TextMessage, b) } +// WriteJSON is a wrapper for gorilla WriteJSON. Satisfies +// websocketFeed.WriteJSON. Writes are sequenced with a mutex lock for +// per-connection multi-threaded use. +func (client *socketClient) WriteJSON(msg interface{}) error { + client.mtx.Lock() + defer client.mtx.Unlock() + return client.conn.WriteJSON(msg) +} + // Done returns a channel that will be closed when the websocket connection is // closed. Satisfies websocketFeed.Done. func (client *socketClient) Done() chan struct{} { @@ -94,115 +109,19 @@ func newSocketConnection(cfg *socketConfig) (websocketFeed, error) { TLSClientConfig: cfg.tlsConfig, } - conn, _, err := dialer.Dial(cfg.address, nil) - if err != nil { - return nil, err - } - return &socketClient{ - conn: conn, - done: make(chan struct{}), - on: true, - }, nil -} - -/* -// Quickly encode the thing to a JSON-encoded string. -func jsonify(thing interface{}) string { - s, _ := json.MarshalIndent(thing, "", " ") - return string(s) -} - -// Dump the signalr.Message to something readable. -func dumpSignalrMsg(msg signalr.Message) { - fmt.Println("=================================") - fmt.Printf("C: %s\n", jsonify(msg.C)) - fmt.Printf("S: %s\n", jsonify(msg.S)) - fmt.Printf("G: %s\n", jsonify(msg.G)) - fmt.Printf("I: %s\n", jsonify(msg.I)) - fmt.Printf("E: %s\n", jsonify(msg.E)) - s, _ := msg.R.MarshalJSON() - fmt.Printf("R: %s\n", string(s)) - s, _ = msg.H.MarshalJSON() - fmt.Printf("H: %s\n", string(s)) - s, _ = msg.D.MarshalJSON() - fmt.Printf("D: %s\n", string(s)) - s, _ = msg.T.MarshalJSON() - fmt.Printf("T: %s\n", string(s)) - for _, hubMsg := range msg.M { - fmt.Printf(" M: %s\n", hubMsg.M) - for _, arg := range hubMsg.A { - fmt.Printf(" A: %s\n", jsonify(arg)) - } + conn, resp, err := dialer.Dial(cfg.address, cfg.headers) + if err == nil { // Return early if no error. + return &socketClient{ + conn: conn, + done: make(chan struct{}), + on: true, + }, nil } - fmt.Println("=================================") -} -*/ - -// The interface for a signalr connection. -type signalrClient interface { - Send(hubs.ClientMsg) error - Close() -} - -type signalrConfig struct { - host string - protocol string - endpoint string - connectionData string - params map[string]string - msgHandler signalr.MsgHandler // func(msg signalr.Message) - errHandler signalr.ErrHandler // func(err error) -} - -// A wrapper for the signalr.Client. Satisfies signalrClient. -type signalrConnection struct { - c *signalr.Client - mtx sync.Mutex - on bool -} -// Send sends the ClientMsg on the connection. A mutex makes Send thread-safe. -func (conn *signalrConnection) Send(msg hubs.ClientMsg) error { - conn.mtx.Lock() - defer conn.mtx.Unlock() - return conn.c.Send(msg) -} - -// Close closes the underlying signalr connection. -func (conn *signalrConnection) Close() { - // Underlying connection Close can block, so measures should be taken prevent - // calls to Close on an already closed connection. - conn.mtx.Lock() - defer conn.mtx.Unlock() - if !conn.on { - return + // Handle response properly. + if resp == nil { + return nil, fmt.Errorf("received empty response body and an error: %w", err) } - conn.on = false - conn.c.Close() -} -// Create a new signalr connection. Returns the signalrClient interface rather -// than the signalrConnection. -func newSignalrConnection(cfg *signalrConfig) (signalrClient, error) { - // Prepare a SignalR client. - c := signalr.New( - cfg.host, - cfg.protocol, - cfg.endpoint, - cfg.connectionData, - cfg.params, - ) - - // Set the user agent to one that looks like a browser. - c.Headers["User-Agent"] = fauxBrowserUA - - // Start the connection. - err := c.Run(cfg.msgHandler, cfg.errHandler) - if err != nil { - return nil, err - } - return &signalrConnection{ - c: c, - on: true, - }, nil + return nil, fmt.Errorf("unexpected response status %s and error: %w", resp.Status, err) } From 4c73526842ddd54bb07f2261771de83ca52872fc Mon Sep 17 00:00:00 2001 From: Philemon Ukane Date: Fri, 9 Dec 2022 18:48:31 +0100 Subject: [PATCH 2/2] clean up exchanges test --- exchanges/exchanges_live_test.go | 19 ++++++-- exchanges/exchanges_test.go | 81 +++++++++++++------------------- 2 files changed, 46 insertions(+), 54 deletions(-) diff --git a/exchanges/exchanges_live_test.go b/exchanges/exchanges_live_test.go index ce3d3adb1..bee835f39 100644 --- a/exchanges/exchanges_live_test.go +++ b/exchanges/exchanges_live_test.go @@ -193,7 +193,7 @@ func TestPoloniexLiveWebsocket(t *testing.T) { } testConnectWs := func() { - poloniexDoneChannel = make(chan struct{}) + poloniex.ws = newFakePoloniexWebsocket() poloniex.connectWebsocket(processor, &socketConfig{ address: PoloniexURLs.Websocket, }) @@ -208,9 +208,13 @@ func TestPoloniexLiveWebsocket(t *testing.T) { } // Test reconnection by forcing a fail, then checking the wsDepthStatus poloniex.setWsFail(fmt.Errorf("test failure. ignore")) - // subsequent calls to close should be inconsequential. - poloniex.ws.Close() - poloniex.ws.Close() + + // poloniex.setWsFail above closes the ws connection and set poloniex.ws to + // nil. + if poloniex.ws != nil { + t.Fatal("expected nil'ed and closed poloniex websocketFeed") + } + // wsDepthStatus should recognize the closed connection and create a real // websocket connection, signalling to use the HTTP fallback in the meantime. tryHttp, initializing, depth := poloniex.wsDepthStatus(testConnectWs) @@ -234,6 +238,10 @@ func TestPoloniexLiveWebsocket(t *testing.T) { } checkWsDepths(t, poloniex.wsDepths()) poloniex.ws.Close() + + // Subsequent calls to Close should be inconsequential. + poloniex.ws.Close() + poloniex.ws.Close() } func TestBittrexLiveWebsocket(t *testing.T) { enableTestLog() @@ -292,6 +300,7 @@ func TestDecredDEXLive(t *testing.T) { enableTestLog() ctx, cancel := context.WithCancel(context.Background()) + defer cancel() chans := &BotChannels{ index: make(chan *IndexUpdate), @@ -318,7 +327,7 @@ func TestDecredDEXLive(t *testing.T) { xc, err := constructor(nil, chans) if err != nil { - t.Fatalf("NewDecredDEX error: %v", err) + t.Fatalf("NewDecredDEXConstructor error: %v", err) } dcr := xc.(*DecredDEX) defer func() { dcr.ws.Close() }() diff --git a/exchanges/exchanges_test.go b/exchanges/exchanges_test.go index 7b060b96d..cd9bd0211 100644 --- a/exchanges/exchanges_test.go +++ b/exchanges/exchanges_test.go @@ -29,7 +29,8 @@ func enableTestLog() { } } -var initialPoloniexOrderbook = []byte(`[ +var ( + initialPoloniexOrderbook = []byte(`[ 14, 8767, [ @@ -74,11 +75,11 @@ var initialPoloniexOrderbook = []byte(`[ ] ]`) -var poloniexEmptyUpdate = []byte(`[ + poloniexEmptyUpdate = []byte(`[ 1010 ]`) -var poloniexOrderbookUpdate = []byte(`[ + poloniexOrderbookUpdate = []byte(`[ 14, 8768, [ @@ -97,7 +98,7 @@ var poloniexOrderbookUpdate = []byte(`[ ] ]`) -var poloniexTrade = []byte(`[ + poloniexTrade = []byte(`[ 14, 8769, [ @@ -111,26 +112,28 @@ var poloniexTrade = []byte(`[ ] ] ]`) +) // Satisfies the websocketFeed interface -type fakePoloniexWebsocket struct{} - -var poloniexDoneChannel = make(chan struct{}) - -var poloniexReadCount int - -// Done() chan struct{} -// Read() ([]byte, error) -// Write(interface{}) error -// Close() +type fakePoloniexWebsocket struct { + readCount int + done chan struct{} + mtx sync.Mutex + on bool +} +func newFakePoloniexWebsocket() *fakePoloniexWebsocket { + return &fakePoloniexWebsocket{ + done: make(chan struct{}), + on: true, + } +} func (p *fakePoloniexWebsocket) Done() chan struct{} { - return poloniexDoneChannel + return p.done } - func (p *fakePoloniexWebsocket) Read() ([]byte, error) { - poloniexReadCount++ - switch poloniexReadCount { + p.readCount++ + switch p.readCount { case 1: return initialPoloniexOrderbook, nil case 2: @@ -140,33 +143,28 @@ func (p *fakePoloniexWebsocket) Read() ([]byte, error) { time.Sleep(100 * time.Millisecond) return poloniexOrderbookUpdate, nil } - <-poloniexDoneChannel + <-p.done return nil, fmt.Errorf("closed (expected)") } - func (p *fakePoloniexWebsocket) Write(interface{}) error { return nil } func (p *fakePoloniexWebsocket) WriteJSON(interface{}) error { return nil } - -var poloMtx sync.Mutex -var poloOn bool = true - func (p *fakePoloniexWebsocket) Close() { - poloMtx.Lock() - defer poloMtx.Unlock() - if poloOn { - poloOn = false - close(poloniexDoneChannel) + p.mtx.Lock() + defer p.mtx.Unlock() + if !p.on { + return } + p.on = false + close(p.done) } - func (p *fakePoloniexWebsocket) On() bool { - poloMtx.Lock() - defer poloMtx.Unlock() - return poloOn + p.mtx.Lock() + defer p.mtx.Unlock() + return p.on } func newTestPoloniexExchange() *PoloniexExchange { @@ -189,7 +187,7 @@ func TestPoloniexWebsocket(t *testing.T) { enableTestLog() poloniex := newTestPoloniexExchange() - poloniex.ws = &fakePoloniexWebsocket{} + poloniex.ws = newFakePoloniexWebsocket() checkLengths := func(askLen, buyLen int) { if len(poloniex.asks) != askLen || len(poloniex.buys) != buyLen { @@ -258,21 +256,6 @@ func (d *tDoer) queue(body interface{}) *http.Response { return resp } -type testBittrexConnection struct { - xc *BittrexExchange -} - -func (conn testBittrexConnection) Close() {} - -func (conn testBittrexConnection) On() bool { - // Doesn't matter right now. - return false -} - -func (conn testBittrexConnection) Send(signalRClientMsg) error { - return nil -} - func newTestBittrexExchange() (*BittrexExchange, *tDoer) { doer := &tDoer{} bittrex := &BittrexExchange{