From 53fd31330de3fc8568e68fecfd046b14aea34083 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 26 Apr 2021 07:18:33 +0530 Subject: [PATCH 1/3] rather than just keeping tx hashes which were removed/ dropped, also keeping timestamp when last time it was seen to be attempting to get into pool --- app/bootup/bootup.go | 8 ++++---- app/data/pending.go | 14 ++++++-------- app/data/queued.go | 23 ++++++++++++----------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 16d7efc..3007517 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -114,8 +114,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { pendingPool := &data.PendingPool{ Transactions: make(map[common.Hash]*data.MemPoolTx), TxsFromAddress: make(map[common.Address]data.TxList), - DroppedTxs: make(map[common.Hash]bool), - RemovedTxs: make(map[common.Hash]bool), + DroppedTxs: make(map[common.Hash]time.Time), + RemovedTxs: make(map[common.Hash]time.Time), AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetPendingPoolSize()), DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetPendingPoolSize()), Done: 0, @@ -142,8 +142,8 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { queuedPool := &data.QueuedPool{ Transactions: make(map[common.Hash]*data.MemPoolTx), TxsFromAddress: make(map[common.Address]data.TxList), - DroppedTxs: make(map[common.Hash]bool), - RemovedTxs: make(map[common.Hash]bool), + DroppedTxs: make(map[common.Hash]time.Time), + RemovedTxs: make(map[common.Hash]time.Time), AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, config.GetQueuedPoolSize()), DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, config.GetQueuedPoolSize()), AddTxChan: make(chan data.AddRequest, 1), diff --git a/app/data/pending.go b/app/data/pending.go index 7219c87..c402474 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -20,8 +20,8 @@ import ( type PendingPool struct { Transactions map[common.Hash]*MemPoolTx TxsFromAddress map[common.Address]TxList - DroppedTxs map[common.Hash]bool - RemovedTxs map[common.Hash]bool + DroppedTxs map[common.Hash]time.Time + RemovedTxs map[common.Hash]time.Time AscTxsByGasPrice TxList DescTxsByGasPrice TxList Done uint64 @@ -133,7 +133,7 @@ func (p *PendingPool) Start(ctx context.Context) { // is due to the fact, no other competing // worker attempting to read from/ write to // this one, now - p.DroppedTxs[tx.Hash] = true + p.DroppedTxs[tx.Hash] = time.Now().UTC() } @@ -145,19 +145,17 @@ func (p *PendingPool) Start(ctx context.Context) { } if _, ok := p.DroppedTxs[tx.Hash]; ok { + p.DroppedTxs[tx.Hash] = time.Now().UTC() return false } if _, ok := p.RemovedTxs[tx.Hash]; ok { + p.RemovedTxs[tx.Hash] = time.Now().UTC() return false } if needToDropTxs() { dropTx(pickTxWithLowestGasPrice()) - - if len(p.DroppedTxs)%10 == 0 { - log.Printf("๐Ÿงน Dropped 10 pending txs, was about to hit limit\n") - } } // Marking we found this tx in mempool now @@ -235,7 +233,7 @@ func (p *PendingPool) Start(ctx context.Context) { if removed { // Marking that tx has been removed, so that // it won't get picked up next time - p.RemovedTxs[req.TxStat.Hash] = true + p.RemovedTxs[req.TxStat.Hash] = time.Now().UTC() p.Done++ } diff --git a/app/data/queued.go b/app/data/queued.go index a05033e..65b78bc 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -22,8 +22,8 @@ import ( type QueuedPool struct { Transactions map[common.Hash]*MemPoolTx TxsFromAddress map[common.Address]TxList - DroppedTxs map[common.Hash]bool - RemovedTxs map[common.Hash]bool + DroppedTxs map[common.Hash]time.Time + RemovedTxs map[common.Hash]time.Time AscTxsByGasPrice TxList DescTxsByGasPrice TxList AddTxChan chan AddRequest @@ -118,7 +118,7 @@ func (q *QueuedPool) Start(ctx context.Context) { removeTx(tx) // Marking that tx has been dropped, so that // it won't get picked up next time - q.DroppedTxs[tx.Hash] = true + q.DroppedTxs[tx.Hash] = time.Now().UTC() } @@ -129,19 +129,17 @@ func (q *QueuedPool) Start(ctx context.Context) { } if _, ok := q.DroppedTxs[tx.Hash]; ok { + q.DroppedTxs[tx.Hash] = time.Now().UTC() return false } if _, ok := q.RemovedTxs[tx.Hash]; ok { + q.RemovedTxs[tx.Hash] = time.Now().UTC() return false } if needToDropTxs() { dropTx(pickTxWithLowestGasPrice()) - - if len(q.DroppedTxs)%10 == 0 { - log.Printf("๐Ÿงน Dropped 10 queued txs, was about to hit limit\n") - } } // Marking we found this tx in mempool now @@ -184,11 +182,14 @@ func (q *QueuedPool) Start(ctx context.Context) { case req := <-q.RemoveTxChan: // if removed will return non-nil reference to removed tx - req.ResponseChan <- txRemover(req.Hash) + removed := txRemover(req.Hash) + req.ResponseChan <- removed - // Marking that tx has been removed, so that - // it won't get picked up next time - q.RemovedTxs[req.Hash] = true + if removed != nil { + // Marking that tx has been removed, so that + // it won't get picked up next time + q.RemovedTxs[req.Hash] = time.Now().UTC() + } case req := <-q.TxExistsChan: From c9bc37281a52cc25bdd3f55452218d228d227472 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 26 Apr 2021 07:28:29 +0530 Subject: [PATCH 2/3] after 1 hour of keeping track of which txs were removed from queued pool, free some memory --- app/data/queued.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/app/data/queued.go b/app/data/queued.go index 65b78bc..7c1fa8e 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -261,6 +261,36 @@ func (q *QueuedPool) Start(ctx context.Context) { req.ResponseChan <- nil + case <-time.After(time.Duration(1) * time.Millisecond): + // After 1 hour of keeping entries which were previously removed + // are now being deleted from memory, so that memory usage for keeping track of + // which were removed in past doesn't become a problem for us. + // + // 1 hour is just a random time period, it can be possibly improved + // + // Just hoping after 1 hour of last time this tx was seen to be added + // into this pool, it has been either dropped/ removed/ unstuck, so it won't + // be attempted to be added here again + + for k := range q.DroppedTxs { + + if time.Now().UTC().Sub(q.DroppedTxs[k]) > time.Duration(1)*time.Hour { + delete(q.DroppedTxs, k) + } + + } + + case <-time.After(time.Duration(1) * time.Millisecond): + // Read ๐Ÿ‘† comment + + for k := range q.RemovedTxs { + + if time.Now().UTC().Sub(q.RemovedTxs[k]) > time.Duration(1)*time.Hour { + delete(q.RemovedTxs, k) + } + + } + } } From f34a3e62b0f65e66025bcbe00559ba29b8c97ad0 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 26 Apr 2021 07:30:45 +0530 Subject: [PATCH 3/3] after 1 hour of tracking, clean some memory in pending pool --- app/data/pending.go | 30 ++++++++++++++++++++++++++++++ app/data/queued.go | 2 +- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/app/data/pending.go b/app/data/pending.go index c402474..c4e6f82 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -332,6 +332,36 @@ func (p *PendingPool) Start(ctx context.Context) { req <- LastSeenBlock{Number: p.LastSeenBlock, At: p.LastSeenAt} + case <-time.After(time.Duration(1) * time.Millisecond): + // After 1 hour of keeping entries which were previously removed + // are now being deleted from memory, so that memory usage for keeping track of + // which were removed in past doesn't become a problem for us. + // + // 1 hour is just a random time period, it can be possibly improved + // + // Just hoping after 1 hour of last time this tx was seen to be added + // into this pool, it has been either dropped/ confirmed, so it won't + // be attempted to be added here again + + for k := range p.DroppedTxs { + + if time.Now().UTC().Sub(p.DroppedTxs[k]) > time.Duration(1)*time.Hour { + delete(p.DroppedTxs, k) + } + + } + + case <-time.After(time.Duration(1) * time.Millisecond): + // Read ๐Ÿ‘† comment + + for k := range p.RemovedTxs { + + if time.Now().UTC().Sub(p.RemovedTxs[k]) > time.Duration(1)*time.Hour { + delete(p.RemovedTxs, k) + } + + } + } } diff --git a/app/data/queued.go b/app/data/queued.go index 7c1fa8e..8d90bca 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -269,7 +269,7 @@ func (q *QueuedPool) Start(ctx context.Context) { // 1 hour is just a random time period, it can be possibly improved // // Just hoping after 1 hour of last time this tx was seen to be added - // into this pool, it has been either dropped/ removed/ unstuck, so it won't + // into this pool, it has been either dropped/ confirmed/ unstuck, so it won't // be attempted to be added here again for k := range q.DroppedTxs {