From 92a678c1648c286e5f95775bbc7f8a59a56b582e Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 3 May 2021 09:02:23 +0530 Subject: [PATCH 1/8] removed redis from pending pool --- app/data/pending.go | 31 ++++++++++++++++--------------- go.mod | 1 + go.sum | 5 +++++ 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/app/data/pending.go b/app/data/pending.go index c4e6f82..880ec97 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -10,9 +10,9 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" "github.com/gammazero/workerpool" - "github.com/go-redis/redis/v8" "github.com/itzmeanjan/harmony/app/config" "github.com/itzmeanjan/harmony/app/listen" + "github.com/itzmeanjan/pubsub" ) // PendingPool - Currently present pending tx(s) i.e. which are ready to @@ -40,7 +40,7 @@ type PendingPool struct { DoneChan chan chan uint64 SetLastSeenBlockChan chan uint64 LastSeenBlockChan chan chan LastSeenBlock - PubSub *redis.Client + PubSub *pubsub.PubSub RPC *rpc.Client } @@ -163,7 +163,7 @@ func (p *PendingPool) Start(ctx context.Context) { tx.Pool = "pending" addTx(tx) - p.PublishAdded(ctx, p.PubSub, tx) + p.PublishAdded(ctx, tx) return true @@ -193,7 +193,7 @@ func (p *PendingPool) Start(ctx context.Context) { } removeTx(tx) - p.PublishRemoved(ctx, p.PubSub, tx) + p.PublishRemoved(ctx, tx) return true @@ -1022,18 +1022,19 @@ func (p *PendingPool) VerifiedAdd(ctx context.Context, tx *MemPoolTx) bool { // PublishAdded - Publish new pending tx pool content ( in messagepack serialized format ) // to pubsub topic -func (p *PendingPool) PublishAdded(ctx context.Context, pubsub *redis.Client, msg *MemPoolTx) { +func (p *PendingPool) PublishAdded(ctx context.Context, msg *MemPoolTx) { _msg, err := msg.ToMessagePack() if err != nil { - log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error()) return - } - if err := pubsub.Publish(ctx, config.GetPendingTxEntryPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish new pending tx : %s\n", err.Error()) + if ok, _ := p.PubSub.Publish(&pubsub.Message{ + Topics: []string{config.GetPendingTxEntryPublishTopic()}, + Data: _msg, + }); !ok { + log.Printf("[❗️] Failed to publish new pending tx\n") } } @@ -1043,7 +1044,6 @@ func (p *PendingPool) PublishAdded(ctx context.Context, pubsub *redis.Client, ms func (p *PendingPool) Remove(ctx context.Context, txStat *TxStatus) bool { respChan := make(chan bool) - p.RemoveTxChan <- RemoveRequest{TxStat: txStat, ResponseChan: respChan} return <-respChan @@ -1054,18 +1054,19 @@ func (p *PendingPool) Remove(ctx context.Context, txStat *TxStatus) bool { // to pubsub topic // // These tx(s) are leaving pending pool i.e. they're confirmed now -func (p *PendingPool) PublishRemoved(ctx context.Context, pubsub *redis.Client, msg *MemPoolTx) { +func (p *PendingPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) { _msg, err := msg.ToMessagePack() if err != nil { - log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error()) return - } - if err := pubsub.Publish(ctx, config.GetPendingTxExitPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish confirmed tx : %s\n", err.Error()) + if ok, _ := p.PubSub.Publish(&pubsub.Message{ + Topics: []string{config.GetPendingTxExitPublishTopic()}, + Data: _msg, + }); !ok { + log.Printf("[❗️] Failed to publish confirmed/dropped tx\n") } } diff --git a/go.mod b/go.mod index c856af7..dfa998e 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-redis/redis/v8 v8.7.0 github.com/gorilla/websocket v1.4.2 + github.com/itzmeanjan/pubsub v0.1.2 // indirect github.com/labstack/echo/v4 v4.2.0 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-connmgr v0.2.4 diff --git a/go.sum b/go.sum index c97b290..2ada4a3 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,7 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= @@ -351,6 +352,8 @@ github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= +github.com/itzmeanjan/pubsub v0.1.2 h1:JHFn/zPSdgmhP4+YkIMbReXRK7HmJNnLzlUGLRQo3E4= +github.com/itzmeanjan/pubsub v0.1.2/go.mod h1:LdGbd7JeJ2Z3BzyhrXjk65+tAH4QDvROkL/m6ff/lrk= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= @@ -639,6 +642,7 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -734,6 +738,7 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= From eeaf001777556fdb28b576b76c7fd4944fdea0c4 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 3 May 2021 09:06:34 +0530 Subject: [PATCH 2/8] removed redis from queued pool --- app/data/queued.go | 30 ++++++++++++++++-------------- go.mod | 2 +- go.sum | 20 ++++++++++++++++++++ 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/app/data/queued.go b/app/data/queued.go index 8d90bca..1bf3717 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -9,8 +9,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" "github.com/gammazero/workerpool" - "github.com/go-redis/redis/v8" "github.com/itzmeanjan/harmony/app/config" + "github.com/itzmeanjan/pubsub" ) // QueuedPool - Currently present queued tx(s) i.e. these tx(s) are stuck @@ -33,7 +33,7 @@ type QueuedPool struct { CountTxsChan chan CountRequest ListTxsChan chan ListRequest TxsFromAChan chan TxsFromARequest - PubSub *redis.Client + PubSub *pubsub.PubSub RPC *rpc.Client PendingPool *PendingPool } @@ -147,7 +147,7 @@ func (q *QueuedPool) Start(ctx context.Context) { tx.Pool = "queued" addTx(tx) - q.PublishAdded(ctx, q.PubSub, tx) + q.PublishAdded(ctx, tx) return true @@ -163,7 +163,7 @@ func (q *QueuedPool) Start(ctx context.Context) { tx.UnstuckAt = time.Now().UTC() removeTx(tx) - q.PublishRemoved(ctx, q.PubSub, q.Transactions[txHash]) + q.PublishRemoved(ctx, tx) return tx @@ -774,18 +774,19 @@ func (q *QueuedPool) Add(ctx context.Context, tx *MemPoolTx) bool { // PublishAdded - Publish new tx, entered queued pool, ( in messagepack serialized format ) // to pubsub topic -func (q *QueuedPool) PublishAdded(ctx context.Context, pubsub *redis.Client, msg *MemPoolTx) { +func (q *QueuedPool) PublishAdded(ctx context.Context, msg *MemPoolTx) { _msg, err := msg.ToMessagePack() if err != nil { - log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error()) return - } - if err := pubsub.Publish(ctx, config.GetQueuedTxEntryPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish new queued tx : %s\n", err.Error()) + if ok, _ := q.PubSub.Publish(&pubsub.Message{ + Topics: []string{config.GetQueuedTxEntryPublishTopic()}, + Data: _msg, + }); !ok { + log.Printf("[❗️] Failed to publish new queued tx\n") } } @@ -807,18 +808,19 @@ func (q *QueuedPool) Remove(ctx context.Context, txHash common.Hash) *MemPoolTx // These tx(s) are leaving queued pool i.e. they're ( probably ) going to // sit in pending pool now, unless they're already mined & harmony // failed to keep track of it -func (q *QueuedPool) PublishRemoved(ctx context.Context, pubsub *redis.Client, msg *MemPoolTx) { +func (q *QueuedPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) { _msg, err := msg.ToMessagePack() if err != nil { - log.Printf("[❗️] Failed to serialize into messagepack : %s\n", err.Error()) return - } - if err := pubsub.Publish(ctx, config.GetQueuedTxExitPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish unstuck tx : %s\n", err.Error()) + if ok, _ := q.PubSub.Publish(&pubsub.Message{ + Topics: []string{config.GetQueuedTxExitPublishTopic()}, + Data: _msg, + }); !ok { + log.Printf("[❗️] Failed to publish unstuck tx\n") } } diff --git a/go.mod b/go.mod index dfa998e..13d0eb0 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-redis/redis/v8 v8.7.0 github.com/gorilla/websocket v1.4.2 - github.com/itzmeanjan/pubsub v0.1.2 // indirect + github.com/itzmeanjan/pubsub v0.1.2 github.com/labstack/echo/v4 v4.2.0 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-connmgr v0.2.4 diff --git a/go.sum b/go.sum index 2ada4a3..39f38b5 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,7 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 h1:5sXbqlSomvdjlRbWyNqkPsJ3Fg+tQZCbgeX1VGljbQY= github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw= github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= @@ -150,6 +151,7 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/dvyukov/go-fuzz v0.0.0-20200318091601-be3528f3a813/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/edsrzf/mmap-go v0.0.0-20160512033002-935e0e8a636c/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -161,6 +163,7 @@ github.com/ethereum/go-ethereum v1.10.1/go.mod h1:E5e/zvdfUVr91JZ0AwjyuJM3x+no51 github.com/fatih/color v1.3.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= +github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c= github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= @@ -173,6 +176,7 @@ github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc h1:F7BbnLACph7UYiz github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc/go.mod h1:IlBLfYXnuw9sspy1XS6ctu5exGb6WHGKQsyo4s7bOEA= github.com/gammazero/workerpool v1.1.1 h1:MN29GcZtZZAgzTU+Zk54Y+J9XkE54MoXON/NCZvNulo= github.com/gammazero/workerpool v1.1.1/go.mod h1:5BN0IJVRjSFAypo9QTJCaWdijjNz9Jjl6VFS1PRjCeg= +github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= @@ -221,9 +225,11 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3-0.20201103224600-674baa8c7fc3 h1:ur2rms48b3Ep1dxh7aUV2FZEQ8jEVO2F6ILKx8ofkAg= github.com/golang/snappy v0.0.3-0.20201103224600-674baa8c7fc3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -291,7 +297,9 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= +github.com/holiman/uint256 v1.1.1 h1:4JywC80b+/hSfljFlEBLHrrh+CIONLDz9NuFl0af4Mw= github.com/holiman/uint256 v1.1.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= @@ -386,6 +394,7 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0= github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0= +github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356 h1:I/yrLt2WilKxlQKCM52clh5rGzTKpVctGT1lH4Dc8Jw= github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= github.com/kilic/bls12-381 v0.0.0-20201226121925-69dacb279461/go.mod h1:vDTTHJONJ6G+P2R74EhnyotQDTliQDnFEwhdmfzw1ig= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= @@ -642,6 +651,7 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= @@ -738,6 +748,7 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -767,6 +778,7 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9 github.com/pelletier/go-toml v1.8.1 h1:1Nf83orprkJyknT6h7zbuEGUEjcyVlCxSUGTENmNCRM= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= +github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM= github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -793,13 +805,16 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.6.2-0.20190402121629-4f204dcbc150/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= +github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v0.0.0-20160617231935-a62a804a8a00/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xhandler v0.0.0-20160618193221-ed27b6fd6521/go.mod h1:RvLn4FgxWubrpZHtQLnOf6EwhN2hEMusxZOhcW9H3UQ= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -850,6 +865,7 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= +github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw= github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3/go.mod h1:hpGUWaI9xL8pRQCTXQgocU38Qw1g0Us7n5PxxTwTCYU= @@ -867,6 +883,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca h1:Ld/zXl5t4+D69SiV4JoN7kkfvJdOWlPpfxrzxpLMoUk= github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M= @@ -874,6 +891,7 @@ github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyV github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc= github.com/tklauser/numcpus v0.2.1/go.mod h1:9aU+wOc6WjUIZEwWMP62PL/41d65P+iks1gBkr4QyP8= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef h1:wHSqTBrZW24CsNJDfeh9Ex6Pm0Rcpc7qrgKBiL44vF4= github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -1186,6 +1204,7 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -1205,6 +1224,7 @@ gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVY gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0= gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= From d1c41e330c120085ab53056f8651857333cff3df Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 3 May 2021 09:21:18 +0530 Subject: [PATCH 3/8] started pub/sub hub during boot up --- app/bootup/bootup.go | 41 ++++++++++------------------------------- app/data/resource.go | 8 ++------ main.go | 2 +- 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 3007517..6f4ccae 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -2,18 +2,19 @@ package bootup import ( "context" + "errors" "strconv" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" - "github.com/go-redis/redis/v8" "github.com/itzmeanjan/harmony/app/config" "github.com/itzmeanjan/harmony/app/data" "github.com/itzmeanjan/harmony/app/graph" "github.com/itzmeanjan/harmony/app/listen" "github.com/itzmeanjan/harmony/app/networking" + "github.com/itzmeanjan/pubsub" ) // GetNetwork - Make RPC call for reading network ID @@ -53,34 +54,12 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { return nil, err } - var options *redis.Options + _pubsub := pubsub.New() + go _pubsub.Start(ctx) - // If password is given in config file - if config.Get("RedisPassword") != "" { - - options = &redis.Options{ - Network: config.Get("RedisConnection"), - Addr: config.Get("RedisAddress"), - Password: config.Get("RedisPassword"), - DB: int(config.GetRedisDBIndex()), - } - - } else { - // If password is not given, attempting to connect with out it - // - // Though this is not recommended in production environment - options = &redis.Options{ - Network: config.Get("RedisConnection"), - Addr: config.Get("RedisAddress"), - DB: int(config.GetRedisDBIndex()), - } - - } - - _redis := redis.NewClient(options) - // Checking whether connection was successful or not - if err := _redis.Ping(ctx).Err(); err != nil { - return nil, err + <-time.After(time.Duration(1) * time.Millisecond) + if !_pubsub.Alive { + return nil, errors.New("failed to start pub/sub hub") } // Passed this redis client handle to graphql query resolver @@ -134,7 +113,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { DoneChan: make(chan chan uint64, 1), SetLastSeenBlockChan: lastSeenBlockChan, LastSeenBlockChan: make(chan chan data.LastSeenBlock, 1), - PubSub: _redis, + PubSub: _pubsub, RPC: client, } @@ -153,7 +132,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { CountTxsChan: make(chan data.CountRequest, 1), ListTxsChan: make(chan data.ListRequest, 1), TxsFromAChan: make(chan data.TxsFromARequest, 1), - PubSub: _redis, + PubSub: _pubsub, RPC: client, PendingPool: pendingPool, } @@ -242,7 +221,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { RPCClient: client, WSClient: wsClient, Pool: pool, - Redis: _redis, + PubSub: _pubsub, StartedAt: time.Now().UTC(), NetworkID: network}, nil diff --git a/app/data/resource.go b/app/data/resource.go index 82a56e9..419b3e7 100644 --- a/app/data/resource.go +++ b/app/data/resource.go @@ -1,12 +1,11 @@ package data import ( - "log" "time" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" - "github.com/go-redis/redis/v8" + "github.com/itzmeanjan/pubsub" ) // Resource - Shared resources among multiple go routines @@ -16,7 +15,7 @@ type Resource struct { RPCClient *rpc.Client WSClient *ethclient.Client Pool *MemPool - Redis *redis.Client + PubSub *pubsub.PubSub StartedAt time.Time NetworkID uint64 } @@ -27,8 +26,5 @@ func (r *Resource) Release() { r.RPCClient.Close() r.WSClient.Close() - if err := r.Redis.Close(); err != nil { - log.Printf("[❗️] Failed to close redis client : %s\n", err.Error()) - } } diff --git a/main.go b/main.go index cdb3822..e336bcd 100644 --- a/main.go +++ b/main.go @@ -30,7 +30,7 @@ func main() { // This is application's root level context, to be passed down // to worker go routines - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) resources, err := bootup.SetGround(ctx, abs) if err != nil { From 5be3e5ee3c162cf827d727f43a67134f9367d881 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 3 May 2021 09:53:59 +0530 Subject: [PATCH 4/8] subscription management in grqphql --- app/bootup/bootup.go | 2 +- app/graph/util.go | 114 +++++++++++++++++-------------------------- 2 files changed, 47 insertions(+), 69 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 6f4ccae..7bedbef 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -65,7 +65,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { // Passed this redis client handle to graphql query resolver // // To be used when subscription requests are received from clients - if err := graph.InitRedisClient(_redis); err != nil { + if err := graph.InitPubSub(_pubsub); err != nil { return nil, err } diff --git a/app/graph/util.go b/app/graph/util.go index f54b979..b880c69 100644 --- a/app/graph/util.go +++ b/app/graph/util.go @@ -7,14 +7,14 @@ import ( "regexp" "time" - "github.com/go-redis/redis/v8" "github.com/itzmeanjan/harmony/app/config" "github.com/itzmeanjan/harmony/app/data" "github.com/itzmeanjan/harmony/app/graph/model" + "github.com/itzmeanjan/pubsub" ) var memPool *data.MemPool -var redisClient *redis.Client +var pubsubHub *pubsub.PubSub var parentCtx context.Context // InitMemPool - Initializing mempool handle, in this module @@ -30,16 +30,15 @@ func InitMemPool(pool *data.MemPool) error { } -// InitRedisClient - Initializing redis client handle, so that all -// subscriptions can be done using this client -func InitRedisClient(client *redis.Client) error { +// InitPubSub - Initializing pubsub handle, for managing subscriptions +func InitPubSub(client *pubsub.PubSub) error { if client != nil { - redisClient = client + pubsubHub = client return nil } - return errors.New("bad redis client received in graphQL handler") + return errors.New("bad pub/sub received in graphQL handler") } @@ -106,19 +105,17 @@ func checkHash(hash string) bool { } -// SubscribeToTopic - Subscribes to Redis topic(s), with context of caller -// while waiting for subscription confirmation -func SubscribeToTopic(ctx context.Context, topic ...string) (*redis.PubSub, error) { +// SubscribeToTopic - Subscribes to PubSub topic(s), while configuring subscription such +// that at max 256 messages can be kept in buffer at a time. If client is consuming slowly +// it might miss some messages when buffer stays full. +func SubscribeToTopic(ctx context.Context, topic ...string) (*pubsub.Subscriber, error) { - _pubsub := redisClient.Subscribe(ctx, topic...) - - // Waiting for subscription confirmation - _, err := _pubsub.Receive(ctx) - if err != nil { - return nil, err + _sub := pubsubHub.Subscribe(256, topic...) + if _sub == nil { + return nil, errors.New("topic subscription failed") } - return _pubsub, nil + return _sub, nil } @@ -126,7 +123,7 @@ func SubscribeToTopic(ctx context.Context, topic ...string) (*redis.PubSub, erro // happening in pending tx pool // // When tx joins/ leaves pending pool, subscribers will receive notification -func SubscribeToPendingPool(ctx context.Context) (*redis.PubSub, error) { +func SubscribeToPendingPool(ctx context.Context) (*pubsub.Subscriber, error) { return SubscribeToTopic(ctx, config.GetPendingTxEntryPublishTopic(), config.GetPendingTxExitPublishTopic()) @@ -139,7 +136,7 @@ func SubscribeToPendingPool(ctx context.Context) (*redis.PubSub, error) { // // @note Tx(s) generally join queued pool, when there's nonce gap & this tx can't be // processed until some lower nonce tx(s) get(s) processed -func SubscribeToQueuedPool(ctx context.Context) (*redis.PubSub, error) { +func SubscribeToQueuedPool(ctx context.Context) (*pubsub.Subscriber, error) { return SubscribeToTopic(ctx, config.GetQueuedTxEntryPublishTopic(), config.GetQueuedTxExitPublishTopic()) @@ -153,7 +150,7 @@ func SubscribeToQueuedPool(ctx context.Context) (*redis.PubSub, error) { // // It'll subscribe to all 4 topics for listening // to tx(s) entering/ leaving any portion of mempool -func SubscribeToMemPool(ctx context.Context) (*redis.PubSub, error) { +func SubscribeToMemPool(ctx context.Context) (*pubsub.Subscriber, error) { return SubscribeToTopic(ctx, config.GetQueuedTxEntryPublishTopic(), @@ -165,7 +162,7 @@ func SubscribeToMemPool(ctx context.Context) (*redis.PubSub, error) { // SubscribeToPendingTxEntry - Subscribe to topic where new pending tx(s) // are published -func SubscribeToPendingTxEntry(ctx context.Context) (*redis.PubSub, error) { +func SubscribeToPendingTxEntry(ctx context.Context) (*pubsub.Subscriber, error) { return SubscribeToTopic(ctx, config.GetPendingTxEntryPublishTopic()) @@ -173,7 +170,7 @@ func SubscribeToPendingTxEntry(ctx context.Context) (*redis.PubSub, error) { // SubscribeToQueuedTxEntry - Subscribe to topic where new queued tx(s) // are published -func SubscribeToQueuedTxEntry(ctx context.Context) (*redis.PubSub, error) { +func SubscribeToQueuedTxEntry(ctx context.Context) (*pubsub.Subscriber, error) { return SubscribeToTopic(ctx, config.GetQueuedTxEntryPublishTopic()) @@ -181,7 +178,7 @@ func SubscribeToQueuedTxEntry(ctx context.Context) (*redis.PubSub, error) { // SubscribeToPendingTxExit - Subscribe to topic where pending tx(s), getting // confirmed are published -func SubscribeToPendingTxExit(ctx context.Context) (*redis.PubSub, error) { +func SubscribeToPendingTxExit(ctx context.Context) (*pubsub.Subscriber, error) { return SubscribeToTopic(ctx, config.GetPendingTxExitPublishTopic()) @@ -189,7 +186,7 @@ func SubscribeToPendingTxExit(ctx context.Context) (*redis.PubSub, error) { // SubscribeToQueuedTxExit - Subscribe to topic where queued tx(s), getting // unstuck are published -func SubscribeToQueuedTxExit(ctx context.Context) (*redis.PubSub, error) { +func SubscribeToQueuedTxExit(ctx context.Context) (*pubsub.Subscriber, error) { return SubscribeToTopic(ctx, config.GetQueuedTxExitPublishTopic()) @@ -206,7 +203,7 @@ func SubscribeToQueuedTxExit(ctx context.Context) (*redis.PubSub, error) { // // You can always blindly return `true` in your `evaluationCriteria` function, // so that you get to receive any tx being published on topic of your interest -func ListenToMessages(ctx context.Context, pubsub *redis.PubSub, topics []string, comm chan<- *model.MemPoolTx, pubCriteria PublishingCriteria, params ...interface{}) { +func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, topics []string, comm chan<- *model.MemPoolTx, pubCriteria PublishingCriteria, params ...interface{}) { defer func() { close(comm) @@ -230,7 +227,7 @@ func ListenToMessages(ctx context.Context, pubsub *redis.PubSub, topics []string // Denotes `harmony` is being shutdown // // We must unsubscribe from all topics & get out of this infinite loop - UnsubscribeFromTopic(context.Background(), pubsub, topics...) + UnsubscribeFromTopic(context.Background(), subscriber, topics...) break OUTER @@ -239,53 +236,32 @@ func ListenToMessages(ctx context.Context, pubsub *redis.PubSub, topics []string // Denotes client is not active anymore // // We must unsubscribe from all topics & get out of this infinite loop - UnsubscribeFromTopic(context.Background(), pubsub, topics...) + UnsubscribeFromTopic(context.Background(), subscriber, topics...) break OUTER default: - // If client is still active, we'll reach here in - // 10 ms & continue to read message published, if any - - msg, err := pubsub.ReceiveTimeout(ctx, time.Millisecond*time.Duration(10)) - if err != nil { - continue + // Read next message + received := subscriber.Next() + if received == nil { + break } - switch m := msg.(type) { - - case *redis.Subscription: - - // Pubsub broker informed we've been unsubscribed from - // this topic - // - // It's better to leave this infinite loop - if m.Kind == "unsubscribe" { - break OUTER + // New message has been published on topic + // of our interest, we'll attempt to deserialize + // data to deliver it to client in expected format + message := UnmarshalPubSubMessage(received.Data) + if message != nil && pubCriteria(message, params...) { + + // Only publish non-nil data i.e. if (de)-serialisation + // fails some how, it's better to send nothing, rather than + // sending client `nil` + sendable := message.ToGraphQL() + if sendable != nil { + comm <- sendable } - case *redis.Message: - - // New message has been published on topic - // of our interest, we'll attempt to deserialize - // data to deliver it to client in expected format - message := UnmarshalPubSubMessage([]byte(m.Payload)) - if message != nil && pubCriteria(message, params...) { - - // Only publish non-nil data i.e. if (de)-serialisation - // fails some how, it's better to send nothing, rather than - // sending client `nil` - sendable := message.ToGraphQL() - if sendable != nil { - comm <- sendable - } - - } - - default: - // @note Doing nothing yet - } } @@ -297,12 +273,14 @@ func ListenToMessages(ctx context.Context, pubsub *redis.PubSub, topics []string // UnsubscribeFromTopic - Given topic name to which client is already subscribed to, // attempts to unsubscribe from -func UnsubscribeFromTopic(ctx context.Context, pubsub *redis.PubSub, topic ...string) { - - if err := pubsub.Unsubscribe(ctx, topic...); err != nil { +func UnsubscribeFromTopic(ctx context.Context, subscriber *pubsub.Subscriber, topic ...string) { - log.Printf("[❗️] Failed to unsubscribe from Redis pubsub topic(s) : %s\n", err.Error()) + if ok, _ := subscriber.UnsubscribeAll(pubsubHub); !ok { + log.Printf("[❗️] Failed to unsubscribe from topic(s)\n") + } + if !subscriber.Close() { + log.Printf("[❗️] Failed to destroy subscriber\n") } } From f09827fd9cd9537d4c7e511e59c8f8eb5a3d78a7 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 3 May 2021 10:09:05 +0530 Subject: [PATCH 5/8] replaced redis with pubsub in p2p networking handler part --- app/bootup/bootup.go | 2 +- app/data/pool.go | 3 +- app/graph/util.go | 2 - app/networking/bootstrap.go | 14 ++--- app/networking/listen.go | 113 +++++++++++------------------------- go.mod | 4 +- go.sum | 12 ---- 7 files changed, 46 insertions(+), 104 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 7bedbef..62b42c7 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -72,7 +72,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { // Redis client to be used in p2p networking communication // handling section for letting clients know of some newly // seen mempool tx - if err := networking.InitRedisClient(_redis); err != nil { + if err := networking.InitRedisClient(_pubsub); err != nil { return nil, err } diff --git a/app/data/pool.go b/app/data/pool.go index c6e4673..5a2607f 100644 --- a/app/data/pool.go +++ b/app/data/pool.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/go-redis/redis/v8" ) // MemPool - Current state of mempool, where all pending/ queued tx(s) @@ -179,7 +178,7 @@ func (m *MemPool) Stat(start time.Time) { // is received from any `harmony` peer, it will be checked against latest state // of local mempool view, to decide whether this tx can be acted upon // somehow or not -func (m *MemPool) HandleTxFromPeer(ctx context.Context, pubsub *redis.Client, tx *MemPoolTx) bool { +func (m *MemPool) HandleTxFromPeer(ctx context.Context, tx *MemPoolTx) bool { // Checking whether we already have this tx included in pool // or not diff --git a/app/graph/util.go b/app/graph/util.go index b880c69..98a3fbd 100644 --- a/app/graph/util.go +++ b/app/graph/util.go @@ -210,10 +210,8 @@ func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, topics }() if !(len(topics) > 0) { - log.Printf("[❗️] Empty topic list was unexpected\n") return - } { diff --git a/app/networking/bootstrap.go b/app/networking/bootstrap.go index ac47043..cb0a97d 100644 --- a/app/networking/bootstrap.go +++ b/app/networking/bootstrap.go @@ -4,12 +4,12 @@ import ( "context" "errors" - "github.com/go-redis/redis/v8" "github.com/itzmeanjan/harmony/app/data" + "github.com/itzmeanjan/pubsub" ) var memPool *data.MemPool -var redisClient *redis.Client +var pubsubHub *pubsub.PubSub var connectionManager *ConnectionManager // InitMemPool - Initializing mempool handle, in this module @@ -28,22 +28,22 @@ func InitMemPool(pool *data.MemPool) error { // InitRedisClient - Initializing redis client handle, so that all // subscriptions can be done using this client -func InitRedisClient(client *redis.Client) error { +func InitRedisClient(client *pubsub.PubSub) error { if client != nil { - redisClient = client + pubsubHub = client return nil } - return errors.New("bad redis client received in p2p networking handler") + return errors.New("bad pub/sub received in p2p networking handler") } // Setup - Bootstraps `harmony`'s p2p networking stack func Setup(ctx context.Context, comm chan struct{}) error { - if !(memPool != nil && redisClient != nil) { - return errors.New("mempool/ redisClient instance not initialised") + if !(memPool != nil && pubsubHub != nil) { + return errors.New("mempool/ pubsubHub instance not initialised") } // Attempt to create a new `harmony` node diff --git a/app/networking/listen.go b/app/networking/listen.go index f25a735..27c4362 100644 --- a/app/networking/listen.go +++ b/app/networking/listen.go @@ -6,9 +6,7 @@ import ( "encoding/binary" "io" "log" - "time" - "github.com/go-redis/redis/v8" "github.com/itzmeanjan/harmony/app/config" "github.com/itzmeanjan/harmony/app/graph" "github.com/libp2p/go-libp2p-core/host" @@ -28,37 +26,30 @@ func ReadFrom(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWrit buf := make([]byte, 4) if _, err := io.ReadFull(rw.Reader, buf); err != nil { - if err == io.EOF { break } log.Printf("[❗️] Failed to read size of next chunk : %s | %s\n", err.Error(), remote) break - } size := binary.LittleEndian.Uint32(buf) - chunk := make([]byte, size) if _, err := io.ReadFull(rw.Reader, chunk); err != nil { - if err == io.EOF { break } log.Printf("[❗️] Failed to read chunk from peer : %s | %s\n", err.Error(), remote) break - } tx := graph.UnmarshalPubSubMessage(chunk) if tx == nil { - log.Printf("[❗️] Failed to deserialise message from peer | %s\n", remote) continue - } // Keeping entry of from which peer we received this tx @@ -66,11 +57,9 @@ func ReadFrom(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWrit // when it'll be published on Pub/Sub topic tx.ReceivedFrom = peerId - if memPool.HandleTxFromPeer(ctx, redisClient, tx) { - + if memPool.HandleTxFromPeer(ctx, tx) { log.Printf("✅ New tx from peer : %d bytes | %s\n", len(chunk), remote) continue - } log.Printf("👍 Seen tx from peer : %d bytes | %s\n", len(chunk), remote) @@ -91,90 +80,56 @@ func WriteTo(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWrite config.GetPendingTxEntryPublishTopic(), config.GetPendingTxExitPublishTopic()} - pubsub, err := graph.SubscribeToMemPool(ctx) + subscriber, err := graph.SubscribeToMemPool(ctx) if err != nil { - log.Printf("[❗️] Failed to subscribe to mempool changes : %s\n", err.Error()) return - } - { - OUTER: - for { - - <-time.After(time.Millisecond * time.Duration(1)) - - msg, err := pubsub.ReceiveTimeout(ctx, time.Millisecond*time.Duration(9)) - if err != nil { - continue - } - - switch m := msg.(type) { - - case *redis.Subscription: - - // Pubsub broker informed we've been unsubscribed from - // this topic - // - // It's better to leave this infinite loop - if m.Kind == "unsubscribe" { - return - } - - case *redis.Message: - - msg := graph.UnmarshalPubSubMessage([]byte(m.Payload)) - // Failed to deserialise message, we don't need - // to send it to remote - if msg == nil { - break - } - - // We found this tx from this peer, so we're - // not sending it back - if msg.ReceivedFrom == peerId { - break - } - - chunk := make([]byte, 4+len(m.Payload)) - - binary.LittleEndian.PutUint32(chunk[:4], uint32(len(m.Payload))) - n := copy(chunk[4:], []byte(m.Payload)) - - if n != len(m.Payload) { - - log.Printf("[❗️] Failed to prepare chunk for peer | %s\n", remote) - break - - } - - if _, err := rw.Write(chunk); err != nil { + for { - log.Printf("[❗️] Failed to write chunk on stream : %s | %s\n", err.Error(), remote) - break OUTER + received := subscriber.Next() + if received == nil { + continue + } - } + msg := graph.UnmarshalPubSubMessage(received.Data) + // Failed to deserialise message, we don't need + // to send it to remote + if msg == nil { + continue + } - if err := rw.Flush(); err != nil { + // We found this tx from this peer, so we're + // not sending it back + if msg.ReceivedFrom == peerId { + continue + } - log.Printf("[❗️] Failed to flush stream buffer : %s | %s\n", err.Error(), remote) - break OUTER + chunk := make([]byte, 4+len(received.Data)) - } + binary.LittleEndian.PutUint32(chunk[:4], uint32(len(received.Data))) + n := copy(chunk[4:], received.Data) - default: - // @note Doing nothing yet + if n != len(received.Data) { + log.Printf("[❗️] Failed to prepare chunk for peer | %s\n", remote) + continue + } - } + if _, err := rw.Write(chunk); err != nil { + log.Printf("[❗️] Failed to write chunk on stream : %s | %s\n", err.Error(), remote) + break + } + if err := rw.Flush(); err != nil { + log.Printf("[❗️] Failed to flush stream buffer : %s | %s\n", err.Error(), remote) + break } - } - if err := pubsub.Unsubscribe(context.Background(), topics...); err != nil { - log.Printf("[❗️] Failed to unsubscribe from Redis pubsub topic(s) : %s\n", err.Error()) } + graph.UnsubscribeFromTopic(ctx, subscriber, topics...) + } // HandleStream - Attepts new stream & handles it through out its life time diff --git a/go.mod b/go.mod index 13d0eb0..76b9948 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc // indirect github.com/gammazero/workerpool v1.1.1 github.com/go-ole/go-ole v1.2.5 // indirect - github.com/go-redis/redis/v8 v8.7.0 + github.com/google/go-cmp v0.5.4 // indirect github.com/gorilla/websocket v1.4.2 github.com/itzmeanjan/pubsub v0.1.2 github.com/labstack/echo/v4 v4.2.0 @@ -25,6 +25,8 @@ require ( github.com/magiconair/properties v1.8.4 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/multiformats/go-multiaddr v0.3.1 + github.com/onsi/ginkgo v1.15.0 // indirect + github.com/onsi/gomega v1.10.5 // indirect github.com/pelletier/go-toml v1.8.1 // indirect github.com/shirou/gopsutil v3.21.2+incompatible // indirect github.com/spf13/afero v1.5.1 // indirect diff --git a/go.sum b/go.sum index 39f38b5..c45f207 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,6 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dgryski/trifles v0.0.0-20190318185328-a8d75aae118c/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= @@ -191,8 +189,6 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-redis/redis/v8 v8.7.0 h1:LJ8sFG5eNH1u3SxlptEZ3mEgm/5J9Qx6QhiTG3HhpCo= -github.com/go-redis/redis/v8 v8.7.0/go.mod h1:BRxHBWn3pO3CfjyX6vAoyeRmCquvxr6QG+2onGV2gYs= github.com/go-sourcemap/sourcemap v2.1.2+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= @@ -931,14 +927,6 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opentelemetry.io/otel v0.18.0 h1:d5Of7+Zw4ANFOJB+TIn2K3QWsgS2Ht7OU9DqZHI6qu8= -go.opentelemetry.io/otel v0.18.0/go.mod h1:PT5zQj4lTsR1YeARt8YNKcFb88/c2IKoSABK9mX0r78= -go.opentelemetry.io/otel/metric v0.18.0 h1:yuZCmY9e1ZTaMlZXLrrbAPmYW6tW1A5ozOZeOYGaTaY= -go.opentelemetry.io/otel/metric v0.18.0/go.mod h1:kEH2QtzAyBy3xDVQfGZKIcok4ZZFvd5xyKPfPcuK6pE= -go.opentelemetry.io/otel/oteltest v0.18.0 h1:FbKDFm/LnQDOHuGjED+fy3s5YMVg0z019GJ9Er66hYo= -go.opentelemetry.io/otel/oteltest v0.18.0/go.mod h1:NyierCU3/G8DLTva7KRzGii2fdxdR89zXKH1bNWY7Bo= -go.opentelemetry.io/otel/trace v0.18.0 h1:ilCfc/fptVKaDMK1vWk0elxpolurJbEgey9J6g6s+wk= -go.opentelemetry.io/otel/trace v0.18.0/go.mod h1:FzdUu3BPwZSZebfQ1vl5/tAa8LyMLXSJN57AXIt/iDk= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= From d0e9c4dffbdf4c4fcdb75460c9b781895b137e45 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 3 May 2021 10:12:40 +0530 Subject: [PATCH 6/8] removed not needed functions --- app/bootup/bootup.go | 9 +++------ app/config/config.go | 19 ------------------- app/networking/bootstrap.go | 6 +++--- 3 files changed, 6 insertions(+), 28 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 62b42c7..149fb9d 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -62,17 +62,14 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { return nil, errors.New("failed to start pub/sub hub") } - // Passed this redis client handle to graphql query resolver - // // To be used when subscription requests are received from clients if err := graph.InitPubSub(_pubsub); err != nil { return nil, err } - // Redis client to be used in p2p networking communication - // handling section for letting clients know of some newly - // seen mempool tx - if err := networking.InitRedisClient(_pubsub); err != nil { + // Pubsub to be used in p2p networking handling section + // for letting clients know of some newly seen mempool tx + if err := networking.InitPubSub(_pubsub); err != nil { return nil, err } diff --git a/app/config/config.go b/app/config/config.go index 6dd8b16..501855b 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -4,7 +4,6 @@ import ( "log" "math" "runtime" - "strconv" "github.com/spf13/viper" ) @@ -130,24 +129,6 @@ func GetQueuedTxExitPublishTopic() string { } -// GetRedisDBIndex - Read desired redis database index, which -// user asked `harmony` to use -// -// If nothing is provided, it'll use `1`, by default -func GetRedisDBIndex() uint8 { - - db := Get("RedisDB") - - _db, err := strconv.ParseUint(db, 10, 8) - if err != nil { - log.Printf("[❗️] Failed to parse redis database index : `%s`, using 1\n", err.Error()) - return 1 - } - - return uint8(_db) - -} - // GetConcurrencyFactor - Size of worker pool, is dictated by rule below // // @note You can set floating point value for `ConcurrencyFactor` ( > 0 ) diff --git a/app/networking/bootstrap.go b/app/networking/bootstrap.go index cb0a97d..555c456 100644 --- a/app/networking/bootstrap.go +++ b/app/networking/bootstrap.go @@ -26,9 +26,9 @@ func InitMemPool(pool *data.MemPool) error { } -// InitRedisClient - Initializing redis client handle, so that all -// subscriptions can be done using this client -func InitRedisClient(client *pubsub.PubSub) error { +// InitPubSub - Initializing pubsub handle, so that all +// subscriptions can be managed using it +func InitPubSub(client *pubsub.PubSub) error { if client != nil { pubsubHub = client From 75646e1030809a3992bbe7fbeccda6b32f4ee6ae Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 3 May 2021 12:17:37 +0530 Subject: [PATCH 7/8] updated doc --- README.md | 40 ++++++++++++++++------------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index b1ab3ee..88f045d 100644 --- a/README.md +++ b/README.md @@ -81,10 +81,6 @@ During my journey of exploring Ethereum MemPool, I found good initiative from [B ## Prerequisite - Make sure you've _`Go ( >= 1.16)`_, _`make`_ installed -- You need to also have _`Redis ( >= 5.x )`_ - -> Note : Consider setting up Redis instance with password protection - - Get one Ethereum Node up & running, with `txpool` RPC API enabled. You can always use SaaS Ethereum node. ## Installation @@ -112,10 +108,6 @@ PendingTxEntryTopic=pending_pool_entry PendingTxExitTopic=pending_pool_exit QueuedTxEntryTopic=queued_pool_entry QueuedTxExitTopic=queued_pool_exit -RedisConnection=tcp -RedisAddress=127.0.0.1:6379 -RedisPassword=password -RedisDB=1 ConcurrencyFactor=10 Port=7000 ``` @@ -127,14 +119,10 @@ WSUrl | To be used for listening to newly mined block headers MemPoolPollingPeriod | RPC node's mempool to be checked every `X` milliseconds PendingPoolSize | #-of pending tx(s) to be kept in-memory at a time QueuedPoolSize | #-of queued tx(s) to be kept in-memory at a time -PendingTxEntryTopic | Whenever tx enters pending pool, it'll be published on Redis topic `t` -PendingTxExitTopic | Whenever tx leaves pending pool, it'll be published on Redis topic `t` -QueuedTxEntryTopic | Whenever tx enters queued pool, it'll be published on Redis topic `t` -QueuedTxExitTopic | Whenever tx leaves queued pool, it'll be published on Redis topic `t` -RedisConnection | Communicate with Redis over transport protocol -RedisAddress | `address:port` combination of Redis -RedisPassword | Authentication details for talking to Redis. **[ Not mandatory ]** -RedisDB | Redis database to be used. **[ By default there're 16 of them ]** +PendingTxEntryTopic | Whenever tx enters pending pool, it'll be published on Pub/Sub topic `t` +PendingTxExitTopic | Whenever tx leaves pending pool, it'll be published on Pub/Sub topic `t` +QueuedTxEntryTopic | Whenever tx enters queued pool, it'll be published on Pub/Sub topic `t` +QueuedTxExitTopic | Whenever tx leaves queued pool, it'll be published on Pub/Sub topic `t` ConcurrencyFactor | Whenever concurrency can be leveraged, `harmony` will create worker pool with `#-of logical CPUs x ConcurrencyFactor` go routines. **[ Can be float too ]** Port | Starts HTTP server on this port ( > 1024 ) @@ -157,7 +145,7 @@ NetworkingStream=this-is-stream NetworkingBootstrap= ``` -As `harmony` nodes will form a P2P network, you need to **first** switch networking on, by setting `NetworkingEnabled` to `true` ( default value is `false` ). +As `harmony` nodes will form a P2P mesh network, you need to **first** switch networking on, by setting `NetworkingEnabled` to `true` ( default value is `false` ). > If you explicitly set this field to `false`, all `Networking*` fields to be ignored. @@ -186,8 +174,6 @@ This way you can keep adding `N`-many nodes to your cluster. ✅ **This is recommended practice, but you can always test multi-node set up, while relying on same Ethereum Node. In that case your interest can be putting all these `harmony` instances behind load balancer & serving client requests in better fashion & it's perfectly okay.** -> ❗️ If you're using same Redis instance for multiple `harmony` nodes, make sure you've changed DB identifier or Pub/Sub topic names, to avoid any kind of clash. - --- - Let's build & run `harmony` @@ -215,18 +201,24 @@ You'll receive response like 👇 ```json { - "pendingPoolSize": 67, - "queuedPoolSize": 0, - "uptime": "29.214603s", - "networkID": 137 + "pendingPoolSize": 257530, + "queuedPoolSize": 55278, + "uptime": "271h54m7.240520958s", + "processed": 15808071, + "latestBlock": 12359655, + "latestSeenAgo": "8.46197605s", + "networkID": 1 } ``` Field | Interpretation --- | --- pendingPoolSize | Currently these many tx(s) are in pending state i.e. waiting to be picked up by some miner when next block gets mined -queuedPoolSize | These tx(s) are stuck, will only be eligible for mining when lower nonce tx(s) of same wallet gets mined +queuedPoolSize | These tx(s) are stuck, will only be eligible for mining when lower nonce tx(s) of same wallet joins pending pool uptime | This mempool monitoring engine is alive for last `t` time unit +processed | Mempool has seen `N` tx(s) getting confirmed/ dropped i.e. permanently leaving pool +latestBlock | Last block, mempool heard of, from RPC Node +lastestSeenAgo | Last block was seen `t` time unit ago networkID | The mempool monitoring engine keeps track of mempool of this network ### Mempool From 519b5fbae3fc5740640c23f4eb26c40b40944674 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Sat, 8 May 2021 18:06:07 +0530 Subject: [PATCH 8/8] updated to latest version of `pubsub` --- app/bootup/bootup.go | 5 +---- app/data/pending.go | 4 ++-- app/data/queued.go | 4 ++-- app/graph/util.go | 33 +++++++++++++++++++-------------- app/networking/listen.go | 7 +------ go.mod | 4 ++-- go.sum | 10 ++++++---- 7 files changed, 33 insertions(+), 34 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 149fb9d..2d1a956 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -54,10 +54,7 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { return nil, err } - _pubsub := pubsub.New() - go _pubsub.Start(ctx) - - <-time.After(time.Duration(1) * time.Millisecond) + _pubsub := pubsub.New(ctx) if !_pubsub.Alive { return nil, errors.New("failed to start pub/sub hub") } diff --git a/app/data/pending.go b/app/data/pending.go index 880ec97..694f9df 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -1031,7 +1031,7 @@ func (p *PendingPool) PublishAdded(ctx context.Context, msg *MemPoolTx) { } if ok, _ := p.PubSub.Publish(&pubsub.Message{ - Topics: []string{config.GetPendingTxEntryPublishTopic()}, + Topics: []pubsub.String{pubsub.String(config.GetPendingTxEntryPublishTopic())}, Data: _msg, }); !ok { log.Printf("[❗️] Failed to publish new pending tx\n") @@ -1063,7 +1063,7 @@ func (p *PendingPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) { } if ok, _ := p.PubSub.Publish(&pubsub.Message{ - Topics: []string{config.GetPendingTxExitPublishTopic()}, + Topics: []pubsub.String{pubsub.String(config.GetPendingTxExitPublishTopic())}, Data: _msg, }); !ok { log.Printf("[❗️] Failed to publish confirmed/dropped tx\n") diff --git a/app/data/queued.go b/app/data/queued.go index 1bf3717..ccca228 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -783,7 +783,7 @@ func (q *QueuedPool) PublishAdded(ctx context.Context, msg *MemPoolTx) { } if ok, _ := q.PubSub.Publish(&pubsub.Message{ - Topics: []string{config.GetQueuedTxEntryPublishTopic()}, + Topics: []pubsub.String{pubsub.String(config.GetQueuedTxEntryPublishTopic())}, Data: _msg, }); !ok { log.Printf("[❗️] Failed to publish new queued tx\n") @@ -817,7 +817,7 @@ func (q *QueuedPool) PublishRemoved(ctx context.Context, msg *MemPoolTx) { } if ok, _ := q.PubSub.Publish(&pubsub.Message{ - Topics: []string{config.GetQueuedTxExitPublishTopic()}, + Topics: []pubsub.String{pubsub.String(config.GetQueuedTxExitPublishTopic())}, Data: _msg, }); !ok { log.Printf("[❗️] Failed to publish unstuck tx\n") diff --git a/app/graph/util.go b/app/graph/util.go index 98a3fbd..ddabb26 100644 --- a/app/graph/util.go +++ b/app/graph/util.go @@ -105,12 +105,24 @@ func checkHash(hash string) bool { } +// _pubsubCompatibleStrings - Given topics as string ( go standard type ) +// slice converts that into pubsub compatible string slice +func _pubsubCompatibleStrings(topics []string) []pubsub.String { + _topics := make([]pubsub.String, 0, len(topics)) + + for i := 0; i < len(topics); i++ { + _topics = append(_topics, pubsub.String(topics[i])) + } + + return _topics +} + // SubscribeToTopic - Subscribes to PubSub topic(s), while configuring subscription such // that at max 256 messages can be kept in buffer at a time. If client is consuming slowly -// it might miss some messages when buffer stays full. +// buffer size will be extended. func SubscribeToTopic(ctx context.Context, topic ...string) (*pubsub.Subscriber, error) { - _sub := pubsubHub.Subscribe(256, topic...) + _sub := pubsubHub.Subscribe(ctx, 256, topic...) if _sub == nil { return nil, errors.New("topic subscription failed") } @@ -225,7 +237,7 @@ func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, topics // Denotes `harmony` is being shutdown // // We must unsubscribe from all topics & get out of this infinite loop - UnsubscribeFromTopic(context.Background(), subscriber, topics...) + UnsubscribeFromTopic(context.Background(), subscriber) break OUTER @@ -234,7 +246,7 @@ func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, topics // Denotes client is not active anymore // // We must unsubscribe from all topics & get out of this infinite loop - UnsubscribeFromTopic(context.Background(), subscriber, topics...) + UnsubscribeFromTopic(context.Background(), subscriber) break OUTER @@ -269,18 +281,11 @@ func ListenToMessages(ctx context.Context, subscriber *pubsub.Subscriber, topics } -// UnsubscribeFromTopic - Given topic name to which client is already subscribed to, -// attempts to unsubscribe from -func UnsubscribeFromTopic(ctx context.Context, subscriber *pubsub.Subscriber, topic ...string) { - - if ok, _ := subscriber.UnsubscribeAll(pubsubHub); !ok { +// UnsubscribeFromTopic - Unsubscribes subscriber from all topics +func UnsubscribeFromTopic(ctx context.Context, subscriber *pubsub.Subscriber) { + if ok, _ := subscriber.UnsubscribeAll(); !ok { log.Printf("[❗️] Failed to unsubscribe from topic(s)\n") } - - if !subscriber.Close() { - log.Printf("[❗️] Failed to destroy subscriber\n") - } - } // UnmarshalPubSubMessage - Attempts to unmarshal message pack serialized diff --git a/app/networking/listen.go b/app/networking/listen.go index 27c4362..2fe89ed 100644 --- a/app/networking/listen.go +++ b/app/networking/listen.go @@ -75,11 +75,6 @@ func WriteTo(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWrite // @note Deferred functions are executed in LIFO order defer cancel() - topics := []string{config.GetQueuedTxEntryPublishTopic(), - config.GetQueuedTxExitPublishTopic(), - config.GetPendingTxEntryPublishTopic(), - config.GetPendingTxExitPublishTopic()} - subscriber, err := graph.SubscribeToMemPool(ctx) if err != nil { log.Printf("[❗️] Failed to subscribe to mempool changes : %s\n", err.Error()) @@ -128,7 +123,7 @@ func WriteTo(ctx context.Context, cancel context.CancelFunc, rw *bufio.ReadWrite } - graph.UnsubscribeFromTopic(ctx, subscriber, topics...) + graph.UnsubscribeFromTopic(ctx, subscriber) } diff --git a/go.mod b/go.mod index 76b9948..6e93dec 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/go-ole/go-ole v1.2.5 // indirect github.com/google/go-cmp v0.5.4 // indirect github.com/gorilla/websocket v1.4.2 - github.com/itzmeanjan/pubsub v0.1.2 + github.com/itzmeanjan/pubsub v0.1.4 github.com/labstack/echo/v4 v4.2.0 github.com/libp2p/go-libp2p v0.13.0 github.com/libp2p/go-libp2p-connmgr v0.2.4 @@ -25,6 +25,7 @@ require ( github.com/magiconair/properties v1.8.4 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/multiformats/go-multiaddr v0.3.1 + github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onsi/ginkgo v1.15.0 // indirect github.com/onsi/gomega v1.10.5 // indirect github.com/pelletier/go-toml v1.8.1 // indirect @@ -38,7 +39,6 @@ require ( github.com/vmihailenco/msgpack/v5 v5.2.0 golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 // indirect - golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b // indirect golang.org/x/text v0.3.5 // indirect gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index c45f207..f012814 100644 --- a/go.sum +++ b/go.sum @@ -251,6 +251,7 @@ github.com/google/uuid v1.1.5 h1:kxhtnfFVi+rYdOALN0B3k9UT86zVJKfBimRaciULW4I= github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v0.0.0-20160226214623-1ea25387ff6f/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= @@ -356,8 +357,8 @@ github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= -github.com/itzmeanjan/pubsub v0.1.2 h1:JHFn/zPSdgmhP4+YkIMbReXRK7HmJNnLzlUGLRQo3E4= -github.com/itzmeanjan/pubsub v0.1.2/go.mod h1:LdGbd7JeJ2Z3BzyhrXjk65+tAH4QDvROkL/m6ff/lrk= +github.com/itzmeanjan/pubsub v0.1.4 h1:cKhELtYkPKW6RV60wzrwWKe/qxv1emJ24uNxy/AYXq4= +github.com/itzmeanjan/pubsub v0.1.4/go.mod h1:SaVrEsAQmxY70qWQIYQ9rWLXLElsicMcxPddiuXKfLU= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= @@ -917,6 +918,7 @@ github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+m github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= +github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -1087,8 +1089,8 @@ golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210217105451-b926d437f341/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b h1:ggRgirZABFolTmi3sn6Ivd9SipZwLedQ5wR0aAKnFxU= -golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=