Skip to content

Commit

Permalink
fix(webscoket): include ws connection with backoff (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-brito authored Sep 17, 2021
1 parent 9be8210 commit f48bb5c
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 42 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
- run: go test -race -cover ./...

- name: lint
if: github.event_name == 'pull_request'
uses: golangci/[email protected]
with:
version: latest
Expand Down
43 changes: 29 additions & 14 deletions exchange/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/rodrigo-brito/ninjabot/model"

"github.com/adshao/go-binance/v2"
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -53,6 +54,7 @@ func WithBinanceCredentials(key, secret string) BinanceOption {
}

func NewBinance(ctx context.Context, options ...BinanceOption) (*Binance, error) {
binance.WebsocketKeepalive = true
exchange := &Binance{ctx: ctx}
for _, option := range options {
option(exchange)
Expand Down Expand Up @@ -456,25 +458,38 @@ func (b *Binance) Position(symbol string) (asset, quote float64, err error) {
return acc.Balance(assetTick).Free, acc.Balance(quoteTick).Free, nil
}

func (b *Binance) CandlesSubscription(symbol, period string) (chan model.Candle, chan error) {
func (b *Binance) CandlesSubscription(ctx context.Context, symbol, period string) (chan model.Candle, chan error) {
ccandle := make(chan model.Candle)
cerr := make(chan error)

go func() {
done, _, err := binance.WsKlineServe(symbol, period, func(event *binance.WsKlineEvent) {
ccandle <- CandleFromWsKline(symbol, event.Kline)
}, func(err error) {
cerr <- err
})
if err != nil {
cerr <- err
close(cerr)
close(ccandle)
return
b := &backoff.Backoff{
Min: 100 * time.Millisecond,
Max: 1 * time.Second,
}
for {
done, _, err := binance.WsKlineServe(symbol, period, func(event *binance.WsKlineEvent) {
b.Reset()
ccandle <- CandleFromWsKline(symbol, event.Kline)
}, func(err error) {
cerr <- err
})
if err != nil {
cerr <- err
close(cerr)
close(ccandle)
return
}

select {
case <-ctx.Done():
close(cerr)
close(ccandle)
return
case <-done:
time.Sleep(b.Duration())
}
}
<-done
close(cerr)
close(ccandle)
}()

return ccandle, cerr
Expand Down
2 changes: 1 addition & 1 deletion exchange/csvfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (c *CSVFeed) CandlesByLimit(_ context.Context, pair, timeframe string, limi
return result, nil
}

func (c CSVFeed) CandlesSubscription(pair, timeframe string) (chan model.Candle, chan error) {
func (c CSVFeed) CandlesSubscription(_ context.Context, pair, timeframe string) (chan model.Candle, chan error) {
ccandle := make(chan model.Candle)
cerr := make(chan error)
key := c.feedTimeframeKey(pair, timeframe)
Expand Down
3 changes: 2 additions & 1 deletion exchange/exchange.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exchange

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (d *DataFeedSubscription) Connect() {
log.Infof("Connecting to the exchange.")
for _, feed := range d.Feeds {
pair, timeframe := d.pairTimeframeFromKey(feed)
ccandle, cerr := d.exchange.CandlesSubscription(pair, timeframe)
ccandle, cerr := d.exchange.CandlesSubscription(context.Background(), pair, timeframe)
d.DataFeeds[feed] = &DataFeed{
Data: ccandle,
Err: cerr,
Expand Down
4 changes: 2 additions & 2 deletions exchange/paperwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,6 @@ func (p *PaperWallet) CandlesByLimit(ctx context.Context, pair, period string, l
return p.feeder.CandlesByLimit(ctx, pair, period, limit)
}

func (p *PaperWallet) CandlesSubscription(pair, timeframe string) (chan model.Candle, chan error) {
return p.feeder.CandlesSubscription(pair, timeframe)
func (p *PaperWallet) CandlesSubscription(ctx context.Context, pair, timeframe string) (chan model.Candle, chan error) {
return p.feeder.CandlesSubscription(ctx, pair, timeframe)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
entgo.io/ent v0.8.0
github.com/adshao/go-binance/v2 v2.2.1
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/markcheno/go-talib v0.0.0-20190307022042-cd53a9264d70
github.com/mattn/go-sqlite3 v1.14.7
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-openapi/inflect v0.19.0 h1:9jCH9scKIbHeV9m12SmPilScz6krDxKRasNNSNPXu/4=
github.com/go-openapi/inflect v0.19.0/go.mod h1:lHpZVlpIQqLyKwJ4N+YSc9hchQy/i12fJykb83CRBH4=
github.com/go-sql-driver/mysql v1.5.1-0.20200311113236-681ffa848bae/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down Expand Up @@ -132,6 +131,8 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
Expand Down Expand Up @@ -215,11 +216,9 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v1.1.3 h1:xghbfqPkxzxP3C/f3n5DdpAbdKLj4ZE4BWQI362l53M=
github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down Expand Up @@ -273,7 +272,6 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -344,12 +342,10 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
Expand Down
4 changes: 3 additions & 1 deletion ninjabot.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func NewBot(ctx context.Context, settings model.Settings, exch service.Exchange,
if err != nil {
return nil, err
}
// register telegram as notifier
WithNotifier(bot.telegram)(bot)
}

return bot, nil
Expand Down Expand Up @@ -232,7 +234,7 @@ func (n *NinjaBot) Run(ctx context.Context) error {
n.strategiesControllers[pair] = strategyController

// link to ninja bot controller
// TODO: include onCandleClose: false for better precision in OCO orders
// TODO: include onCandleClose: `false` to improve precision in OCO orders (backtesting)
n.dataFeed.Subscribe(pair, n.strategy.Timeframe(), n.onCandle, true)

// preload candles to warmup strategy
Expand Down
47 changes: 33 additions & 14 deletions order/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,30 @@ func (s summary) Profit() float64 {
func (s summary) Payoff() float64 {
avgWin := 0.0
avgLose := 0.0

for _, value := range s.Win {
avgWin += value
}

for _, value := range s.Lose {
avgLose += value
}

if len(s.Win) == 0 || len(s.Lose) == 0 {
if len(s.Win) == 0 || len(s.Lose) == 0 || avgLose == 0 {
return 0
}

return (avgWin / float64(len(s.Win))) / math.Abs(avgLose/float64(len(s.Lose)))
}

func (s summary) WinPercentage() float64 {
if len(s.Win)+len(s.Lose) == 0 {
return 0
}

return float64(len(s.Win)) / float64(len(s.Win)+len(s.Lose)) * 100
}

func (s summary) String() string {
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
Expand All @@ -60,7 +70,7 @@ func (s summary) String() string {
{"Trades", strconv.Itoa(len(s.Lose) + len(s.Win))},
{"Win", strconv.Itoa(len(s.Win))},
{"Loss", strconv.Itoa(len(s.Lose))},
{"% Win", fmt.Sprintf("%.1f", float64(len(s.Win))/float64(len(s.Win)+len(s.Lose))*100)},
{"% Win", fmt.Sprintf("%.1f", s.WinPercentage())},
{"Payoff", fmt.Sprintf("%.1f", s.Payoff()*100)},
{"Profit", fmt.Sprintf("%.4f %s", s.Profit(), quote)},
{"Volume", fmt.Sprintf("%.4f %s", s.Volume, quote)},
Expand Down Expand Up @@ -154,11 +164,20 @@ func (c *Controller) notify(message string) {
}
}

func (c *Controller) notifyError(err error) {
log.Error(err)
if c.notifier != nil {
c.notifier.OrError(err)
}
}

func (c *Controller) processTrade(order *model.Order) {
profitValue, profit, volume, err := c.calculateProfit(order)
if err != nil {
log.Errorf("order/controller storage: %s", err)
c.notifyError(fmt.Errorf("order/controller storage: %s", err))
return
}

order.Profit = profit
if _, ok := c.Results[order.Symbol]; !ok {
c.Results[order.Symbol] = &summary{Symbol: order.Symbol}
Expand Down Expand Up @@ -189,7 +208,7 @@ func (c *Controller) updateOrders() {
Order(storage.Asc(order.FieldID)).
All(c.ctx)
if err != nil {
log.Error("orderController/start:", err)
c.notifyError(fmt.Errorf("orderController/start: %s", err))
c.mtx.Unlock()
return
}
Expand All @@ -216,7 +235,7 @@ func (c *Controller) updateOrders() {
SetQuantity(excOrder.Quantity).
SetPrice(excOrder.Price).Save(c.ctx)
if err != nil {
log.Error("orderControler/update: ", err)
c.notifyError(fmt.Errorf("orderControler/update: %s", err))
continue
}

Expand Down Expand Up @@ -305,14 +324,14 @@ func (c *Controller) CreateOrderOCO(side model.SideType, symbol string, size, pr
log.Infof("[ORDER] Creating OCO order for %s", symbol)
orders, err := c.exchange.CreateOrderOCO(side, symbol, size, price, stop, stopLimit)
if err != nil {
log.Errorf("order/controller exchange: %s", err)
c.notifyError(fmt.Errorf("order/controller exchange: %s", err))
return nil, err
}

for i := range orders {
err := c.createOrder(&orders[i])
if err != nil {
log.Errorf("order/controller storage: %s", err)
c.notifyError(fmt.Errorf("order/controller storage: %s", err))
return nil, err
}
go c.orderFeed.Publish(orders[i], true)
Expand All @@ -328,13 +347,13 @@ func (c *Controller) CreateOrderLimit(side model.SideType, symbol string, size,
log.Infof("[ORDER] Creating LIMIT %s order for %s", side, symbol)
order, err := c.exchange.CreateOrderLimit(side, symbol, size, limit)
if err != nil {
log.Errorf("order/controller exchange: %s", err)
c.notifyError(fmt.Errorf("order/controller exchange: %s", err))
return model.Order{}, err
}

err = c.createOrder(&order)
if err != nil {
log.Errorf("order/controller storage: %s", err)
c.notifyError(fmt.Errorf("order/controller storage: %s", err))
return model.Order{}, err
}
go c.orderFeed.Publish(order, true)
Expand All @@ -349,13 +368,13 @@ func (c *Controller) CreateOrderMarketQuote(side model.SideType, symbol string,
log.Infof("[ORDER] Creating MARKET %s order for %s", side, symbol)
order, err := c.exchange.CreateOrderMarketQuote(side, symbol, amount)
if err != nil {
log.Errorf("order/controller exchange: %s", err)
c.notifyError(fmt.Errorf("order/controller exchange: %s", err))
return model.Order{}, err
}

err = c.createOrder(&order)
if err != nil {
log.Errorf("order/controller storage: %s", err)
c.notifyError(fmt.Errorf("order/controller storage: %s", err))
return model.Order{}, err
}

Expand All @@ -376,13 +395,13 @@ func (c *Controller) CreateOrderMarket(side model.SideType, symbol string, size
log.Infof("[ORDER] Creating MARKET %s order for %s", side, symbol)
order, err := c.exchange.CreateOrderMarket(side, symbol, size)
if err != nil {
log.Errorf("order/controller exchange: %s", err)
c.notifyError(fmt.Errorf("order/controller exchange: %s", err))
return model.Order{}, err
}

err = c.createOrder(&order)
if err != nil {
log.Errorf("order/controller storage: %s", err)
c.notifyError(fmt.Errorf("order/controller storage: %s", err))
return model.Order{}, err
}

Expand Down Expand Up @@ -410,7 +429,7 @@ func (c *Controller) Cancel(order model.Order) error {
SetStatus(string(model.OrderStatusTypePendingCancel)).
Save(c.ctx)
if err != nil {
log.Errorf("order/controller storage: %s", err)
c.notifyError(fmt.Errorf("order/controller storage: %s", err))
return err
}
log.Infof("[ORDER CANCELED] %s", order)
Expand Down
4 changes: 2 additions & 2 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Exchange interface {
type Feeder interface {
CandlesByPeriod(ctx context.Context, pair, period string, start, end time.Time) ([]model.Candle, error)
CandlesByLimit(ctx context.Context, pair, period string, limit int) ([]model.Candle, error)
CandlesSubscription(pair, timeframe string) (chan model.Candle, chan error)
CandlesSubscription(ctx context.Context, pair, timeframe string) (chan model.Candle, chan error)
}

type Broker interface {
Expand All @@ -36,6 +36,6 @@ type Notifier interface {
}

type Telegram interface {
Notify(text string)
Notifier
Start()
}
1 change: 0 additions & 1 deletion strategy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type Controller struct {
}

func NewStrategyController(pair string, strategy Strategy, broker service.Broker) *Controller {

dataframe := &model.Dataframe{
Pair: pair,
Metadata: make(map[string]model.Series),
Expand Down

0 comments on commit f48bb5c

Please sign in to comment.