From 5efe27f0de96ebb3a47eac3495ff983b54139417 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Mon, 29 Mar 2021 22:41:14 +0530 Subject: [PATCH 01/20] don't sort when can be avoided --- app/data/pool.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/app/data/pool.go b/app/data/pool.go index 08a7a5d..acfca1a 100644 --- a/app/data/pool.go +++ b/app/data/pool.go @@ -145,32 +145,40 @@ func (m *MemPool) TopXQueuedWithLowGasPrice(x uint64) []*MemPoolTx { func (m *MemPool) Process(ctx context.Context, rpc *rpc.Client, pubsub *redis.Client, pending map[string]map[string]*MemPoolTx, queued map[string]map[string]*MemPoolTx) { start := time.Now().UTC() - if v := m.Queued.RemoveUnstuck(ctx, rpc, pubsub, m.Pending, pending, queued); v != 0 { - log.Printf("[➖] Removed %d unstuck tx(s) from queued tx pool, in %s\n", v, time.Now().UTC().Sub(start)) + removedQ := m.Queued.RemoveUnstuck(ctx, rpc, pubsub, m.Pending, pending, queued) + + if removedQ != 0 { + log.Printf("[➖] Removed %d unstuck tx(s) from queued tx pool, in %s\n", removedQ, time.Now().UTC().Sub(start)) } start = time.Now().UTC() - if v := m.Queued.AddQueued(ctx, pubsub, queued); v != 0 { - log.Printf("[➕] Added %d tx(s) to queued tx pool, in %s\n", v, time.Now().UTC().Sub(start)) + addedQ := m.Queued.AddQueued(ctx, pubsub, queued) + + if addedQ != 0 { + log.Printf("[➕] Added %d tx(s) to queued tx pool, in %s\n", addedQ, time.Now().UTC().Sub(start)) } start = time.Now().UTC() - if m.Queued.SortTxs() { + if (removedQ != 0 || addedQ != 0) && m.Queued.SortTxs() { log.Printf("[➕] Sorted queued pool tx(s), in %s\n", time.Now().UTC().Sub(start)) } start = time.Now().UTC() - if v := m.Pending.RemoveConfirmedAndDropped(ctx, rpc, pubsub, pending); v != 0 { - log.Printf("[➖] Removed %d confirmed/ dropped tx(s) from pending tx pool, in %s\n", v, time.Now().UTC().Sub(start)) + removedP := m.Pending.RemoveConfirmedAndDropped(ctx, rpc, pubsub, pending) + + if removedP != 0 { + log.Printf("[➖] Removed %d confirmed/ dropped tx(s) from pending tx pool, in %s\n", removedP, time.Now().UTC().Sub(start)) } start = time.Now().UTC() - if v := m.Pending.AddPendings(ctx, pubsub, pending); v != 0 { - log.Printf("[➕] Added %d tx(s) to pending tx pool, in %s\n", v, time.Now().UTC().Sub(start)) + addedP := m.Pending.AddPendings(ctx, pubsub, pending) + + if addedP != 0 { + log.Printf("[➕] Added %d tx(s) to pending tx pool, in %s\n", addedP, time.Now().UTC().Sub(start)) } start = time.Now().UTC() - if m.Pending.SortTxs() { + if (removedP != 0 || addedP != 0) && m.Pending.SortTxs() { log.Printf("[➕] Sorted pending pool tx(s), in %s\n", time.Now().UTC().Sub(start)) } From 206b2b351701c038e855be0a43797f4f4a0220ea Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 08:54:00 +0530 Subject: [PATCH 02/20] insert new tx into sorted tx slice --- app/data/txs.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 app/data/txs.go diff --git a/app/data/txs.go b/app/data/txs.go new file mode 100644 index 0000000..581f402 --- /dev/null +++ b/app/data/txs.go @@ -0,0 +1,61 @@ +package data + +// Insert - Insert into array of sorted ( in terms of gas price paid ) +// mempool txs & keep it sorted +// +// If more memory allocation is required for inserting new element, it'll +// be done & new slice to be returned +func Insert(txs MemPoolTxsDesc, tx *MemPoolTx) MemPoolTxsDesc { + + n := len(txs) + idx := findInsertionPoint(txs, 0, n-1, tx) + + if n+1 <= cap(txs) { + + _txs := txs[:n+1] + + copy(_txs[idx+1:], txs[idx:]) + copy(_txs[idx:], []*MemPoolTx{tx}) + + return _txs + + } + + _txs := make([]*MemPoolTx, 0, n+1) + + copy(_txs, txs[:idx]) + copy(_txs[idx:], []*MemPoolTx{tx}) + copy(_txs[idx+1:], txs[idx:]) + + return _txs + +} + +// findInsertionPoint - Find index at which newly arrived tx should be entered to +// keep this slice sorted +func findInsertionPoint(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { + + if low > high { + return 0 + } + + if low == high { + + if BigHexToBigDecimal(txs[low].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0 { + return low + } + + return low + 1 + + } + + mid := (low + high) / 2 + if BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0 { + + return findInsertionPoint(txs, low, mid, tx) + + } + + return findInsertionPoint(txs, mid+1, high, tx) + +} From 5235fa97a1610f0f0a109c5a45ecbc81cc477763 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 09:06:20 +0530 Subject: [PATCH 03/20] find already present tx (in slice) using binary search --- app/data/txs.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/app/data/txs.go b/app/data/txs.go index 581f402..a3b67ab 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -59,3 +59,29 @@ func findInsertionPoint(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) in return findInsertionPoint(txs, mid+1, high, tx) } + +// findTx - Find index of tx, which is already present in this slice +func findTx(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { + + if low > high { + return -1 + } + + if low == high { + + if txs[low].Hash == tx.Hash { + return low + } + + return -1 + + } + + mid := (low + high) / 2 + if BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) >= 0 { + return findTx(txs, low, mid, tx) + } + + return findTx(txs, mid+1, high, tx) + +} From 8dae67256b366e3634c5523d29412005dd163e48 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 09:21:45 +0530 Subject: [PATCH 04/20] remove existing tx entry from slice --- app/data/txs.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/app/data/txs.go b/app/data/txs.go index a3b67ab..f517658 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -31,6 +31,24 @@ func Insert(txs MemPoolTxsDesc, tx *MemPoolTx) MemPoolTxsDesc { } +// Remove - Removes existing entry from sorted ( in terms of gas price paid ) slice of txs +func Remove(txs MemPoolTxsDesc, tx *MemPoolTx) MemPoolTxsDesc { + + n := len(txs) + idx := findTx(txs, 0, n-1, tx) + if idx == -1 { + // denotes nothing to delete + return txs + } + + copy(txs[idx:], txs[idx+1:]) + txs[n-1] = nil + txs = txs[:n-1] + + return txs + +} + // findInsertionPoint - Find index at which newly arrived tx should be entered to // keep this slice sorted func findInsertionPoint(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { From e1a52281966824c5bdd546426798dbc0c51b73e6 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 10:10:00 +0530 Subject: [PATCH 05/20] descending sorting --- app/data/tx.go | 26 -------------------------- app/data/txs.go | 12 +++++++++--- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/app/data/tx.go b/app/data/tx.go index 756bdf0..36d2457 100644 --- a/app/data/tx.go +++ b/app/data/tx.go @@ -12,32 +12,6 @@ import ( "github.com/vmihailenco/msgpack/v5" ) -// MemPoolTxsDesc - List of mempool tx(s) -// -// @note This structure to be used for sorting tx(s) -// in descending way, using gas price they're paying -type MemPoolTxsDesc []*MemPoolTx - -// Len - Count of tx(s) present in mempool -func (m *MemPoolTxsDesc) Len() int { - return len(*m) -} - -// Swap - Swap two tx(s), given their index in slice -func (m *MemPoolTxsDesc) Swap(i, j int) { - - (*m)[i], (*m)[j] = (*m)[j], (*m)[i] - -} - -// Less - Actual sorting logic i.e. higher gas price -// tx gets prioritized -func (m *MemPoolTxsDesc) Less(i, j int) bool { - - return BigHexToBigDecimal((*m)[i].GasPrice).Cmp(BigHexToBigDecimal((*m)[j].GasPrice)) >= 0 - -} - // MemPoolTx - This is how tx is placed in mempool, after performing // RPC call for fetching currently pending/ queued tx(s) in mempool // it'll be destructured into this format, for further computation diff --git a/app/data/txs.go b/app/data/txs.go index f517658..ff16679 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -1,5 +1,11 @@ package data +// MemPoolTxsDesc - List of mempool tx(s) +// +// @note This structure to be used for sorting tx(s) +// in descending way, using gas price they're paying +type MemPoolTxsDesc []*MemPoolTx + // Insert - Insert into array of sorted ( in terms of gas price paid ) // mempool txs & keep it sorted // @@ -59,7 +65,7 @@ func findInsertionPoint(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) in if low == high { - if BigHexToBigDecimal(txs[low].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0 { + if !(BigHexToBigDecimal(txs[low].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { return low } @@ -68,7 +74,7 @@ func findInsertionPoint(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) in } mid := (low + high) / 2 - if BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0 { + if !(BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { return findInsertionPoint(txs, low, mid, tx) @@ -96,7 +102,7 @@ func findTx(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { } mid := (low + high) / 2 - if BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) >= 0 { + if !(BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) >= 0) { return findTx(txs, low, mid, tx) } From 841922b6d3e6b19be9f7f7daf124c1e5618ab157 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 10:10:30 +0530 Subject: [PATCH 06/20] don't explicitly call sort, to be inserted/ removed from slice, while keeping order maintained --- app/bootup/bootup.go | 2 ++ app/data/pending.go | 30 +++++------------------------- app/data/pool.go | 22 ++++------------------ app/data/queued.go | 31 +++++-------------------------- 4 files changed, 16 insertions(+), 69 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index f8f15a8..2565ed8 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -101,10 +101,12 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { pool := &data.MemPool{ Pending: &data.PendingPool{ Transactions: make(map[common.Hash]*data.MemPoolTx), + SortedTxs: make(data.MemPoolTxsDesc, 0, 1024), Lock: &sync.RWMutex{}, }, Queued: &data.QueuedPool{ Transactions: make(map[common.Hash]*data.MemPoolTx), + SortedTxs: make(data.MemPoolTxsDesc, 0, 1024), Lock: &sync.RWMutex{}, }, } diff --git a/app/data/pending.go b/app/data/pending.go index cb5df40..9634bd0 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -3,7 +3,6 @@ package data import ( "context" "log" - "sort" "sync" "time" @@ -259,6 +258,9 @@ func (p *PendingPool) Add(ctx context.Context, pubsub *redis.Client, tx *MemPool // After adding new tx in pending pool, also attempt to // publish it to pubsub topic p.PublishAdded(ctx, pubsub, tx) + // Insert into sorted pending tx list, keep sorted + p.SortedTxs = Insert(p.SortedTxs, tx) + return true } @@ -309,6 +311,8 @@ func (p *PendingPool) Remove(ctx context.Context, pubsub *redis.Client, txStat * // Publishing this confirmed tx p.PublishRemoved(ctx, pubsub, tx) + // Remove from sorted tx list, keep it sorted + p.SortedTxs = Remove(p.SortedTxs, tx) delete(p.Transactions, txStat.Hash) @@ -483,27 +487,3 @@ func (p *PendingPool) AddPendings(ctx context.Context, pubsub *redis.Client, txs return count } - -// SortTxs - Sorts current pending tx list ascendingly -// as per gas price paid by senders -// -// @note This is supposed to be invoked after every time you add -// new tx(s) to pending pool -func (p *PendingPool) SortTxs() bool { - - txs := MemPoolTxsDesc(p.ListTxs()) - - if len(txs) == 0 { - return false - } - - sort.Sort(&txs) - - p.Lock.Lock() - defer p.Lock.Unlock() - - p.SortedTxs = txs - - return true - -} diff --git a/app/data/pool.go b/app/data/pool.go index acfca1a..d3787a1 100644 --- a/app/data/pool.go +++ b/app/data/pool.go @@ -145,43 +145,29 @@ func (m *MemPool) TopXQueuedWithLowGasPrice(x uint64) []*MemPoolTx { func (m *MemPool) Process(ctx context.Context, rpc *rpc.Client, pubsub *redis.Client, pending map[string]map[string]*MemPoolTx, queued map[string]map[string]*MemPoolTx) { start := time.Now().UTC() - removedQ := m.Queued.RemoveUnstuck(ctx, rpc, pubsub, m.Pending, pending, queued) - if removedQ != 0 { + if removedQ := m.Queued.RemoveUnstuck(ctx, rpc, pubsub, m.Pending, pending, queued); removedQ != 0 { log.Printf("[➖] Removed %d unstuck tx(s) from queued tx pool, in %s\n", removedQ, time.Now().UTC().Sub(start)) } start = time.Now().UTC() - addedQ := m.Queued.AddQueued(ctx, pubsub, queued) - if addedQ != 0 { + if addedQ := m.Queued.AddQueued(ctx, pubsub, queued); addedQ != 0 { log.Printf("[➕] Added %d tx(s) to queued tx pool, in %s\n", addedQ, time.Now().UTC().Sub(start)) } start = time.Now().UTC() - if (removedQ != 0 || addedQ != 0) && m.Queued.SortTxs() { - log.Printf("[➕] Sorted queued pool tx(s), in %s\n", time.Now().UTC().Sub(start)) - } - - start = time.Now().UTC() - removedP := m.Pending.RemoveConfirmedAndDropped(ctx, rpc, pubsub, pending) - if removedP != 0 { + if removedP := m.Pending.RemoveConfirmedAndDropped(ctx, rpc, pubsub, pending); removedP != 0 { log.Printf("[➖] Removed %d confirmed/ dropped tx(s) from pending tx pool, in %s\n", removedP, time.Now().UTC().Sub(start)) } start = time.Now().UTC() - addedP := m.Pending.AddPendings(ctx, pubsub, pending) - if addedP != 0 { + if addedP := m.Pending.AddPendings(ctx, pubsub, pending); addedP != 0 { log.Printf("[➕] Added %d tx(s) to pending tx pool, in %s\n", addedP, time.Now().UTC().Sub(start)) } - start = time.Now().UTC() - if (removedP != 0 || addedP != 0) && m.Pending.SortTxs() { - log.Printf("[➕] Sorted pending pool tx(s), in %s\n", time.Now().UTC().Sub(start)) - } - } // Stat - Log current mempool state diff --git a/app/data/queued.go b/app/data/queued.go index 1bdbdf3..b74b8d6 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -3,7 +3,6 @@ package data import ( "context" "log" - "sort" "sync" "time" @@ -263,6 +262,9 @@ func (q *QueuedPool) Add(ctx context.Context, pubsub *redis.Client, tx *MemPoolT // As soon as we find new entry for queued pool // we publish that tx to pubsub topic q.PublishAdded(ctx, pubsub, tx) + // Insert into sorted pending tx list, keep sorted + q.SortedTxs = Insert(q.SortedTxs, tx) + return true } @@ -304,6 +306,8 @@ func (q *QueuedPool) Remove(ctx context.Context, pubsub *redis.Client, txHash co // Publishing unstuck tx, this is probably going to // enter pending pool now q.PublishRemoved(ctx, pubsub, q.Transactions[txHash]) + // Remove from sorted tx list, keep it sorted + q.SortedTxs = Remove(q.SortedTxs, tx) delete(q.Transactions, txHash) @@ -504,28 +508,3 @@ func (q *QueuedPool) AddQueued(ctx context.Context, pubsub *redis.Client, txs ma return count } - -// SortTxs - Keeping sort entries in tx list -// for tx(s) currently living in queued pool, where sorting -// is being done as per gas price paid by tx senders -// -// @note This function is supposed to be invoked every time you add -// any new tx to queued pool -func (q *QueuedPool) SortTxs() bool { - - txs := MemPoolTxsDesc(q.ListTxs()) - - if len(txs) == 0 { - return false - } - - sort.Sort(&txs) - - q.Lock.Lock() - defer q.Lock.Unlock() - - q.SortedTxs = txs - - return true - -} From 4f690d919fa8775a915cebced0ab25288e471dca Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 11:54:07 +0530 Subject: [PATCH 07/20] find index of tx from slice ( where gas price & tx hash both matches ) --- app/data/txs.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/app/data/txs.go b/app/data/txs.go index ff16679..9673940 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -84,6 +84,32 @@ func findInsertionPoint(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) in } +// findTxFromSlice - Given a slice where txs are ordered by gas price +// paid in descending order & one target tx, whose hash we already know +// we'll iteratively attempt to find out what is index of exactly that tx +// +// Only doing a binary search doesn't help, because there could be multiple +// tx(s) with same gas price & we need to find out exactly that specific entry +// matching tx hash +// +// Please note, `txs` slice is nothing but a view of original slice holding +// all ordered txs, where this subslice is starting from specific index which +// is starting point of tx(s) with this gas price paid by `tx` +func findTxFromSlice(txs MemPoolTxsDesc, tx *MemPoolTx) int { + + idx := -1 + + for i, v := range txs { + if v.Hash == tx.Hash { + idx = i + break + } + } + + return idx + +} + // findTx - Find index of tx, which is already present in this slice func findTx(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { @@ -92,17 +118,11 @@ func findTx(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { } if low == high { - - if txs[low].Hash == tx.Hash { - return low - } - - return -1 - + return findTxFromSlice(txs[low:], tx) } mid := (low + high) / 2 - if !(BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) >= 0) { + if !(BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { return findTx(txs, low, mid, tx) } From 85dc9b58abc77819a0435985284ca891d56683ff Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 12:11:31 +0530 Subject: [PATCH 08/20] find index of tx correctly, when using count of pending/ queued txs use slice length --- app/data/pending.go | 2 +- app/data/queued.go | 2 +- app/data/txs.go | 9 ++++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/app/data/pending.go b/app/data/pending.go index 9634bd0..6100996 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -54,7 +54,7 @@ func (p *PendingPool) Count() uint64 { p.Lock.RLock() defer p.Lock.RUnlock() - return uint64(len(p.Transactions)) + return uint64(len(p.SortedTxs)) } diff --git a/app/data/queued.go b/app/data/queued.go index b74b8d6..d5e0374 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -58,7 +58,7 @@ func (q *QueuedPool) Count() uint64 { q.Lock.RLock() defer q.Lock.RUnlock() - return uint64(len(q.Transactions)) + return uint64(len(q.SortedTxs)) } diff --git a/app/data/txs.go b/app/data/txs.go index 9673940..582842d 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -118,7 +118,14 @@ func findTx(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { } if low == high { - return findTxFromSlice(txs[low:], tx) + + idx := findTxFromSlice(txs[low:], tx) + if idx == -1 { + return -1 + } + + return low + idx + } mid := (low + high) / 2 From 194fd99ab1fddd0f057daaaab2e462e07bf86c8d Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 13:27:49 +0530 Subject: [PATCH 09/20] descending gas price based sorted tx list --- app/data/desc_gasprice.go | 64 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 app/data/desc_gasprice.go diff --git a/app/data/desc_gasprice.go b/app/data/desc_gasprice.go new file mode 100644 index 0000000..209eddb --- /dev/null +++ b/app/data/desc_gasprice.go @@ -0,0 +1,64 @@ +package data + +// MemPoolTxsDesc - List of mempool tx(s) +// +// @note This structure to be used for sorting tx(s) +// in descending way, using gas price they're paying +type MemPoolTxsDesc []*MemPoolTx + +// findInsertionPoint - Find index at which newly arrived tx should be entered to +// keep this slice sorted, where it's sorted descendically as per gas price paid +func (m MemPoolTxsDesc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { + + if low > high { + return 0 + } + + if low == high { + + if !(BigHexToBigDecimal(m[low].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { + return low + } + + return low + 1 + + } + + mid := (low + high) / 2 + if !(BigHexToBigDecimal(m[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { + + return m.findInsertionPoint(low, mid, tx) + + } + + return m.findInsertionPoint(mid+1, high, tx) + +} + +// findTx - Find index of tx, which is already present in this slice, where +// txs are sorted descendically as per gas price paid +func (m MemPoolTxsDesc) findTx(low int, high int, tx *MemPoolTx) int { + + if low > high { + return -1 + } + + if low == high { + + idx := findTxFromSlice(m[low:], tx) + if idx == -1 { + return -1 + } + + return low + idx + + } + + mid := (low + high) / 2 + if !(BigHexToBigDecimal(m[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { + return m.findTx(low, mid, tx) + } + + return m.findTx(mid+1, high, tx) + +} From 60be0cd39fa9e6926e78dcaf1fd106bdc64b5968 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 13:33:14 +0530 Subject: [PATCH 10/20] txs sorted in ascending order as per gas price paid --- app/data/asc_gasprice.go | 64 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 app/data/asc_gasprice.go diff --git a/app/data/asc_gasprice.go b/app/data/asc_gasprice.go new file mode 100644 index 0000000..ca1cedf --- /dev/null +++ b/app/data/asc_gasprice.go @@ -0,0 +1,64 @@ +package data + +// MemPoolTxsAsc - List of mempool tx(s) +// +// @note This structure to be used for sorting tx(s) +// in ascending way, using gas price they're paying +type MemPoolTxsAsc []*MemPoolTx + +// findInsertionPoint - Find index at which newly arrived tx should be entered to +// keep this slice sorted, where it's sorted descendically as per gas price paid +func (m MemPoolTxsAsc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { + + if low > high { + return 0 + } + + if low == high { + + if BigHexToBigDecimal(m[low].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0 { + return low + } + + return low + 1 + + } + + mid := (low + high) / 2 + if BigHexToBigDecimal(m[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0 { + + return m.findInsertionPoint(low, mid, tx) + + } + + return m.findInsertionPoint(mid+1, high, tx) + +} + +// findTx - Find index of tx, which is already present in this slice, where +// txs are sorted descendically as per gas price paid +func (m MemPoolTxsAsc) findTx(low int, high int, tx *MemPoolTx) int { + + if low > high { + return -1 + } + + if low == high { + + idx := findTxFromSlice(m[low:], tx) + if idx == -1 { + return -1 + } + + return low + idx + + } + + mid := (low + high) / 2 + if BigHexToBigDecimal(m[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) >= 0 { + return m.findTx(low, mid, tx) + } + + return m.findTx(mid+1, high, tx) + +} From 1ddb1d52c7331178b3bbac964687f13dee3b691f Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 13:38:32 +0530 Subject: [PATCH 11/20] implemented `len` method of interface --- app/data/asc_gasprice.go | 10 +++-- app/data/desc_gasprice.go | 10 +++-- app/data/txs.go | 82 +++++---------------------------------- 3 files changed, 23 insertions(+), 79 deletions(-) diff --git a/app/data/asc_gasprice.go b/app/data/asc_gasprice.go index ca1cedf..fc0391c 100644 --- a/app/data/asc_gasprice.go +++ b/app/data/asc_gasprice.go @@ -6,8 +6,13 @@ package data // in ascending way, using gas price they're paying type MemPoolTxsAsc []*MemPoolTx +// len - Number of tx(s) present in this slice +func (m MemPoolTxsAsc) len() int { + return len(m) +} + // findInsertionPoint - Find index at which newly arrived tx should be entered to -// keep this slice sorted, where it's sorted descendically as per gas price paid +// keep this slice sorted func (m MemPoolTxsAsc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { if low > high { @@ -35,8 +40,7 @@ func (m MemPoolTxsAsc) findInsertionPoint(low int, high int, tx *MemPoolTx) int } -// findTx - Find index of tx, which is already present in this slice, where -// txs are sorted descendically as per gas price paid +// findTx - Find index of tx, which is already present in this sorted slice func (m MemPoolTxsAsc) findTx(low int, high int, tx *MemPoolTx) int { if low > high { diff --git a/app/data/desc_gasprice.go b/app/data/desc_gasprice.go index 209eddb..f6a8721 100644 --- a/app/data/desc_gasprice.go +++ b/app/data/desc_gasprice.go @@ -6,8 +6,13 @@ package data // in descending way, using gas price they're paying type MemPoolTxsDesc []*MemPoolTx +// len - Number of txs present in slice +func (m MemPoolTxsDesc) len() int { + return len(m) +} + // findInsertionPoint - Find index at which newly arrived tx should be entered to -// keep this slice sorted, where it's sorted descendically as per gas price paid +// keep this slice sorted func (m MemPoolTxsDesc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { if low > high { @@ -35,8 +40,7 @@ func (m MemPoolTxsDesc) findInsertionPoint(low int, high int, tx *MemPoolTx) int } -// findTx - Find index of tx, which is already present in this slice, where -// txs are sorted descendically as per gas price paid +// findTx - Find index of tx, which is already present in this sorted slice func (m MemPoolTxsDesc) findTx(low int, high int, tx *MemPoolTx) int { if low > high { diff --git a/app/data/txs.go b/app/data/txs.go index 582842d..6d6d56a 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -1,10 +1,11 @@ package data -// MemPoolTxsDesc - List of mempool tx(s) -// -// @note This structure to be used for sorting tx(s) -// in descending way, using gas price they're paying -type MemPoolTxsDesc []*MemPoolTx +type MemPooltxs interface { + len() int + + findInsertionPoint(int, int, *MemPoolTx) int + findTx(int, int, *MemPoolTx) int +} // Insert - Insert into array of sorted ( in terms of gas price paid ) // mempool txs & keep it sorted @@ -55,47 +56,9 @@ func Remove(txs MemPoolTxsDesc, tx *MemPoolTx) MemPoolTxsDesc { } -// findInsertionPoint - Find index at which newly arrived tx should be entered to -// keep this slice sorted -func findInsertionPoint(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { - - if low > high { - return 0 - } - - if low == high { - - if !(BigHexToBigDecimal(txs[low].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { - return low - } - - return low + 1 - - } - - mid := (low + high) / 2 - if !(BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { - - return findInsertionPoint(txs, low, mid, tx) - - } - - return findInsertionPoint(txs, mid+1, high, tx) - -} - -// findTxFromSlice - Given a slice where txs are ordered by gas price -// paid in descending order & one target tx, whose hash we already know -// we'll iteratively attempt to find out what is index of exactly that tx -// -// Only doing a binary search doesn't help, because there could be multiple -// tx(s) with same gas price & we need to find out exactly that specific entry -// matching tx hash -// -// Please note, `txs` slice is nothing but a view of original slice holding -// all ordered txs, where this subslice is starting from specific index which -// is starting point of tx(s) with this gas price paid by `tx` -func findTxFromSlice(txs MemPoolTxsDesc, tx *MemPoolTx) int { +// findTxFromSlice - Given a slice of txs, attempt to linearly find +// out tx for which we've txHash given +func findTxFromSlice(txs []*MemPoolTx, tx *MemPoolTx) int { idx := -1 @@ -109,30 +72,3 @@ func findTxFromSlice(txs MemPoolTxsDesc, tx *MemPoolTx) int { return idx } - -// findTx - Find index of tx, which is already present in this slice -func findTx(txs MemPoolTxsDesc, low int, high int, tx *MemPoolTx) int { - - if low > high { - return -1 - } - - if low == high { - - idx := findTxFromSlice(txs[low:], tx) - if idx == -1 { - return -1 - } - - return low + idx - - } - - mid := (low + high) / 2 - if !(BigHexToBigDecimal(txs[mid].GasPrice).Cmp(BigHexToBigDecimal(tx.GasPrice)) > 0) { - return findTx(txs, low, mid, tx) - } - - return findTx(txs, mid+1, high, tx) - -} From a46010bb38f6b54f58b01a7d3a69aa04c096eacf Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 13:41:19 +0530 Subject: [PATCH 12/20] implemented `cap` method for getting capacity of underlying array backing slice --- app/data/asc_gasprice.go | 6 ++++++ app/data/desc_gasprice.go | 6 ++++++ app/data/txs.go | 9 +++++---- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/app/data/asc_gasprice.go b/app/data/asc_gasprice.go index fc0391c..2f258ac 100644 --- a/app/data/asc_gasprice.go +++ b/app/data/asc_gasprice.go @@ -11,6 +11,12 @@ func (m MemPoolTxsAsc) len() int { return len(m) } +// cap - Number of elements can be kept in slice +// without further memory allocation +func (m MemPoolTxsAsc) cap() int { + return cap(m) +} + // findInsertionPoint - Find index at which newly arrived tx should be entered to // keep this slice sorted func (m MemPoolTxsAsc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { diff --git a/app/data/desc_gasprice.go b/app/data/desc_gasprice.go index f6a8721..2ed1679 100644 --- a/app/data/desc_gasprice.go +++ b/app/data/desc_gasprice.go @@ -11,6 +11,12 @@ func (m MemPoolTxsDesc) len() int { return len(m) } +// cap - Number of elements can be kept in slice +// without further memory allocation +func (m MemPoolTxsDesc) cap() int { + return cap(m) +} + // findInsertionPoint - Find index at which newly arrived tx should be entered to // keep this slice sorted func (m MemPoolTxsDesc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { diff --git a/app/data/txs.go b/app/data/txs.go index 6d6d56a..b94c0e9 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -1,7 +1,8 @@ package data -type MemPooltxs interface { +type TxList interface { len() int + cap() int findInsertionPoint(int, int, *MemPoolTx) int findTx(int, int, *MemPoolTx) int @@ -12,10 +13,10 @@ type MemPooltxs interface { // // If more memory allocation is required for inserting new element, it'll // be done & new slice to be returned -func Insert(txs MemPoolTxsDesc, tx *MemPoolTx) MemPoolTxsDesc { +func Insert(txs TxList, tx *MemPoolTx) MemPoolTxsDesc { - n := len(txs) - idx := findInsertionPoint(txs, 0, n-1, tx) + n := txs.len() + idx := txs.findInsertionPoint(0, n-1, tx) if n+1 <= cap(txs) { From 4adb6029115519428a38f81b929793b18c5f782e Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 13:45:40 +0530 Subject: [PATCH 13/20] return all txs as slice --- app/data/asc_gasprice.go | 5 +++++ app/data/desc_gasprice.go | 5 +++++ app/data/txs.go | 3 ++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/app/data/asc_gasprice.go b/app/data/asc_gasprice.go index 2f258ac..e5b6700 100644 --- a/app/data/asc_gasprice.go +++ b/app/data/asc_gasprice.go @@ -17,6 +17,11 @@ func (m MemPoolTxsAsc) cap() int { return cap(m) } +// get - Return slice of txs +func (m MemPoolTxsAsc) get() []*MemPoolTx { + return m +} + // findInsertionPoint - Find index at which newly arrived tx should be entered to // keep this slice sorted func (m MemPoolTxsAsc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { diff --git a/app/data/desc_gasprice.go b/app/data/desc_gasprice.go index 2ed1679..8da8b31 100644 --- a/app/data/desc_gasprice.go +++ b/app/data/desc_gasprice.go @@ -17,6 +17,11 @@ func (m MemPoolTxsDesc) cap() int { return cap(m) } +// get - Return slice of txs +func (m MemPoolTxsDesc) get() []*MemPoolTx { + return m +} + // findInsertionPoint - Find index at which newly arrived tx should be entered to // keep this slice sorted func (m MemPoolTxsDesc) findInsertionPoint(low int, high int, tx *MemPoolTx) int { diff --git a/app/data/txs.go b/app/data/txs.go index b94c0e9..5149d03 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -3,6 +3,7 @@ package data type TxList interface { len() int cap() int + get() []*MemPoolTx findInsertionPoint(int, int, *MemPoolTx) int findTx(int, int, *MemPoolTx) int @@ -18,7 +19,7 @@ func Insert(txs TxList, tx *MemPoolTx) MemPoolTxsDesc { n := txs.len() idx := txs.findInsertionPoint(0, n-1, tx) - if n+1 <= cap(txs) { + if n+1 <= txs.cap() { _txs := txs[:n+1] From fe7a9bf8fdd89cc035135d19624452c7bca2ef88 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 14:49:10 +0530 Subject: [PATCH 14/20] insert into generic txlist, which is nothing but one slice which is sorted in terms of certain field --- app/data/txs.go | 59 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/app/data/txs.go b/app/data/txs.go index 5149d03..b007378 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -9,52 +9,77 @@ type TxList interface { findTx(int, int, *MemPoolTx) int } -// Insert - Insert into array of sorted ( in terms of gas price paid ) -// mempool txs & keep it sorted +// Insert - Insert tx into slice of sorted mempool txs, while keeping it sorted // // If more memory allocation is required for inserting new element, it'll // be done & new slice to be returned -func Insert(txs TxList, tx *MemPoolTx) MemPoolTxsDesc { +func Insert(txs TxList, tx *MemPoolTx) TxList { n := txs.len() idx := txs.findInsertionPoint(0, n-1, tx) if n+1 <= txs.cap() { - _txs := txs[:n+1] + _txs := txs.get()[:n+1] - copy(_txs[idx+1:], txs[idx:]) + copy(_txs[idx+1:], txs.get()[idx:]) copy(_txs[idx:], []*MemPoolTx{tx}) - return _txs + switch txs.(type) { + + case MemPoolTxsAsc: + return (MemPoolTxsAsc)(_txs) + case MemPoolTxsDesc: + return (MemPoolTxsDesc)(_txs) + default: + return nil + + } } _txs := make([]*MemPoolTx, 0, n+1) - copy(_txs, txs[:idx]) + copy(_txs, txs.get()[:idx]) copy(_txs[idx:], []*MemPoolTx{tx}) - copy(_txs[idx+1:], txs[idx:]) + copy(_txs[idx+1:], txs.get()[idx:]) + + switch txs.(type) { + + case MemPoolTxsAsc: + return (MemPoolTxsAsc)(_txs) + case MemPoolTxsDesc: + return (MemPoolTxsDesc)(_txs) + default: + return nil - return _txs + } } -// Remove - Removes existing entry from sorted ( in terms of gas price paid ) slice of txs -func Remove(txs MemPoolTxsDesc, tx *MemPoolTx) MemPoolTxsDesc { +// Remove - Removes existing entry from sorted slice of txs +func Remove(txs TxList, tx *MemPoolTx) TxList { - n := len(txs) - idx := findTx(txs, 0, n-1, tx) + n := txs.len() + idx := txs.findTx(0, n-1, tx) if idx == -1 { // denotes nothing to delete return txs } - copy(txs[idx:], txs[idx+1:]) - txs[n-1] = nil - txs = txs[:n-1] + copy(txs.get()[idx:], txs.get()[idx+1:]) + txs.get()[n-1] = nil + + switch txs.(type) { - return txs + case MemPoolTxsAsc: + return (MemPoolTxsAsc)(txs.get()[:n-1]) + case MemPoolTxsDesc: + return (MemPoolTxsDesc)(txs.get()[:n-1]) + default: + return nil + + } } From f47d7745b29aa7c33f0f1eb117955c2e4baf4fdf Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 15:00:54 +0530 Subject: [PATCH 15/20] keeping both ascending & descending sorted tx list --- app/data/pending.go | 29 +++++++++++++++++------------ app/data/queued.go | 29 +++++++++++++++++------------ app/data/txs.go | 5 +++-- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/app/data/pending.go b/app/data/pending.go index 6100996..c069c9a 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -16,9 +16,10 @@ import ( // PendingPool - Currently present pending tx(s) i.e. which are ready to // be mined in next block type PendingPool struct { - Transactions map[common.Hash]*MemPoolTx - SortedTxs MemPoolTxsDesc - Lock *sync.RWMutex + Transactions map[common.Hash]*MemPoolTx + AscTxsByGasPrice TxList + DescTxsByGasPrice TxList + Lock *sync.RWMutex } // Get - Given tx hash, attempts to find out tx in pending pool, if any @@ -54,7 +55,7 @@ func (p *PendingPool) Count() uint64 { p.Lock.RLock() defer p.Lock.RUnlock() - return uint64(len(p.SortedTxs)) + return uint64(p.AscTxsByGasPrice.len()) } @@ -127,7 +128,7 @@ func (p *PendingPool) TopXWithHighGasPrice(x uint64) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - if txs := p.SortedTxs; txs != nil { + if txs := p.DescTxsByGasPrice.get(); txs != nil { return txs[:x] } @@ -142,7 +143,7 @@ func (p *PendingPool) TopXWithLowGasPrice(x uint64) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - if txs := p.SortedTxs; txs != nil { + if txs := p.AscTxsByGasPrice.get(); txs != nil { return txs[len(txs)-int(x):] } @@ -157,9 +158,9 @@ func (p *PendingPool) SentFrom(address common.Address) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(p.SortedTxs)) + result := make([]*MemPoolTx, 0, p.DescTxsByGasPrice.len()) - for _, tx := range p.SortedTxs { + for _, tx := range p.DescTxsByGasPrice.get() { if tx.IsSentFrom(address) { result = append(result, tx) @@ -178,9 +179,9 @@ func (p *PendingPool) SentTo(address common.Address) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(p.SortedTxs)) + result := make([]*MemPoolTx, 0, p.DescTxsByGasPrice.len()) - for _, tx := range p.SortedTxs { + for _, tx := range p.DescTxsByGasPrice.get() { if tx.IsSentTo(address) { result = append(result, tx) @@ -258,8 +259,10 @@ func (p *PendingPool) Add(ctx context.Context, pubsub *redis.Client, tx *MemPool // After adding new tx in pending pool, also attempt to // publish it to pubsub topic p.PublishAdded(ctx, pubsub, tx) + // Insert into sorted pending tx list, keep sorted - p.SortedTxs = Insert(p.SortedTxs, tx) + p.AscTxsByGasPrice = Insert(p.AscTxsByGasPrice, tx) + p.DescTxsByGasPrice = Insert(p.DescTxsByGasPrice, tx) return true @@ -311,8 +314,10 @@ func (p *PendingPool) Remove(ctx context.Context, pubsub *redis.Client, txStat * // Publishing this confirmed tx p.PublishRemoved(ctx, pubsub, tx) + // Remove from sorted tx list, keep it sorted - p.SortedTxs = Remove(p.SortedTxs, tx) + p.AscTxsByGasPrice = Remove(p.AscTxsByGasPrice, tx) + p.DescTxsByGasPrice = Remove(p.DescTxsByGasPrice, tx) delete(p.Transactions, txStat.Hash) diff --git a/app/data/queued.go b/app/data/queued.go index d5e0374..1b720d0 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -20,9 +20,10 @@ import ( // when next block is going to be picked, when these tx(s) are going to be // moved to pending pool, only they can be considered before mining type QueuedPool struct { - Transactions map[common.Hash]*MemPoolTx - SortedTxs MemPoolTxsDesc - Lock *sync.RWMutex + Transactions map[common.Hash]*MemPoolTx + AscTxsByGasPrice TxList + DescTxsByGasPrice TxList + Lock *sync.RWMutex } // Get - Given tx hash, attempts to find out tx in queued pool, if any @@ -58,7 +59,7 @@ func (q *QueuedPool) Count() uint64 { q.Lock.RLock() defer q.Lock.RUnlock() - return uint64(len(q.SortedTxs)) + return uint64(q.AscTxsByGasPrice.len()) } @@ -131,7 +132,7 @@ func (q *QueuedPool) TopXWithHighGasPrice(x uint64) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - if txs := q.SortedTxs; txs != nil { + if txs := q.DescTxsByGasPrice.get(); txs != nil { return txs[:x] } @@ -146,7 +147,7 @@ func (q *QueuedPool) TopXWithLowGasPrice(x uint64) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - if txs := q.SortedTxs; txs != nil { + if txs := q.AscTxsByGasPrice.get(); txs != nil { return txs[len(txs)-int(x):] } @@ -161,9 +162,9 @@ func (q *QueuedPool) SentFrom(address common.Address) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(q.SortedTxs)) + result := make([]*MemPoolTx, 0, q.DescTxsByGasPrice.len()) - for _, tx := range q.SortedTxs { + for _, tx := range q.DescTxsByGasPrice.get() { if tx.IsSentFrom(address) { result = append(result, tx) @@ -182,9 +183,9 @@ func (q *QueuedPool) SentTo(address common.Address) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(q.SortedTxs)) + result := make([]*MemPoolTx, 0, q.DescTxsByGasPrice.len()) - for _, tx := range q.SortedTxs { + for _, tx := range q.DescTxsByGasPrice.get() { if tx.IsSentTo(address) { result = append(result, tx) @@ -262,8 +263,10 @@ func (q *QueuedPool) Add(ctx context.Context, pubsub *redis.Client, tx *MemPoolT // As soon as we find new entry for queued pool // we publish that tx to pubsub topic q.PublishAdded(ctx, pubsub, tx) + // Insert into sorted pending tx list, keep sorted - q.SortedTxs = Insert(q.SortedTxs, tx) + q.AscTxsByGasPrice = Insert(q.AscTxsByGasPrice, tx) + q.DescTxsByGasPrice = Insert(q.DescTxsByGasPrice, tx) return true @@ -306,8 +309,10 @@ func (q *QueuedPool) Remove(ctx context.Context, pubsub *redis.Client, txHash co // Publishing unstuck tx, this is probably going to // enter pending pool now q.PublishRemoved(ctx, pubsub, q.Transactions[txHash]) + // Remove from sorted tx list, keep it sorted - q.SortedTxs = Remove(q.SortedTxs, tx) + q.AscTxsByGasPrice = Remove(q.AscTxsByGasPrice, tx) + q.DescTxsByGasPrice = Remove(q.DescTxsByGasPrice, tx) delete(q.Transactions, txHash) diff --git a/app/data/txs.go b/app/data/txs.go index b007378..31fb035 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -69,13 +69,14 @@ func Remove(txs TxList, tx *MemPoolTx) TxList { copy(txs.get()[idx:], txs.get()[idx+1:]) txs.get()[n-1] = nil + _txs := txs.get()[:n-1] switch txs.(type) { case MemPoolTxsAsc: - return (MemPoolTxsAsc)(txs.get()[:n-1]) + return (MemPoolTxsAsc)(_txs) case MemPoolTxsDesc: - return (MemPoolTxsDesc)(txs.get()[:n-1]) + return (MemPoolTxsDesc)(_txs) default: return nil From 9d0cf889ffc7e525f9600df167af9a9702089252 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 18:50:32 +0530 Subject: [PATCH 16/20] starting with slice of capacity 1024 --- app/bootup/bootup.go | 14 ++++++++------ app/data/pending.go | 12 ++---------- app/data/queued.go | 12 ++---------- 3 files changed, 12 insertions(+), 26 deletions(-) diff --git a/app/bootup/bootup.go b/app/bootup/bootup.go index 2565ed8..112d18c 100644 --- a/app/bootup/bootup.go +++ b/app/bootup/bootup.go @@ -100,14 +100,16 @@ func SetGround(ctx context.Context, file string) (*data.Resource, error) { pool := &data.MemPool{ Pending: &data.PendingPool{ - Transactions: make(map[common.Hash]*data.MemPoolTx), - SortedTxs: make(data.MemPoolTxsDesc, 0, 1024), - Lock: &sync.RWMutex{}, + Transactions: make(map[common.Hash]*data.MemPoolTx), + AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024), + DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024), + Lock: &sync.RWMutex{}, }, Queued: &data.QueuedPool{ - Transactions: make(map[common.Hash]*data.MemPoolTx), - SortedTxs: make(data.MemPoolTxsDesc, 0, 1024), - Lock: &sync.RWMutex{}, + Transactions: make(map[common.Hash]*data.MemPoolTx), + AscTxsByGasPrice: make(data.MemPoolTxsAsc, 0, 1024), + DescTxsByGasPrice: make(data.MemPoolTxsDesc, 0, 1024), + Lock: &sync.RWMutex{}, }, } diff --git a/app/data/pending.go b/app/data/pending.go index c069c9a..df79229 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -128,11 +128,7 @@ func (p *PendingPool) TopXWithHighGasPrice(x uint64) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - if txs := p.DescTxsByGasPrice.get(); txs != nil { - return txs[:x] - } - - return nil + return p.DescTxsByGasPrice.get()[:x] } @@ -143,11 +139,7 @@ func (p *PendingPool) TopXWithLowGasPrice(x uint64) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - if txs := p.AscTxsByGasPrice.get(); txs != nil { - return txs[len(txs)-int(x):] - } - - return nil + return p.AscTxsByGasPrice.get()[:x] } diff --git a/app/data/queued.go b/app/data/queued.go index 1b720d0..5174592 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -132,11 +132,7 @@ func (q *QueuedPool) TopXWithHighGasPrice(x uint64) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - if txs := q.DescTxsByGasPrice.get(); txs != nil { - return txs[:x] - } - - return nil + return q.DescTxsByGasPrice.get()[:x] } @@ -147,11 +143,7 @@ func (q *QueuedPool) TopXWithLowGasPrice(x uint64) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - if txs := q.AscTxsByGasPrice.get(); txs != nil { - return txs[len(txs)-int(x):] - } - - return nil + return q.AscTxsByGasPrice.get()[:x] } From 90acc3682851803d73aa98e6c1fd08db4c5b491f Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 19:09:24 +0530 Subject: [PATCH 17/20] finding tx from slice to be done without copying tx pointer --- app/data/txs.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/app/data/txs.go b/app/data/txs.go index 31fb035..3b3d3a4 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -90,11 +90,15 @@ func findTxFromSlice(txs []*MemPoolTx, tx *MemPoolTx) int { idx := -1 - for i, v := range txs { - if v.Hash == tx.Hash { + // Don't copy tx elements from slice, rather access them by + // pointer ( directly from index ) + for i := 0; i < len(txs); i++ { + + if txs[i].Hash == tx.Hash { idx = i break } + } return idx From 0bf0aec271884e16ab3bd0a21e0b4d41742c5787 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 19:24:13 +0530 Subject: [PATCH 18/20] corrected how reallocation to be done when we need to ask for more memory during insertion --- app/data/txs.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/app/data/txs.go b/app/data/txs.go index 3b3d3a4..3a59c42 100644 --- a/app/data/txs.go +++ b/app/data/txs.go @@ -38,12 +38,17 @@ func Insert(txs TxList, tx *MemPoolTx) TxList { } - _txs := make([]*MemPoolTx, 0, n+1) + _txs := make([]*MemPoolTx, n+1) copy(_txs, txs.get()[:idx]) copy(_txs[idx:], []*MemPoolTx{tx}) copy(_txs[idx+1:], txs.get()[idx:]) + // Previous array now only contains `nil` + for i := 0; i < txs.len(); i++ { + txs.get()[i] = nil + } + switch txs.(type) { case MemPoolTxsAsc: From 3905ff335c8868f65cf9fe3d0966b0fcd4463ef2 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Tue, 30 Mar 2021 20:48:43 +0530 Subject: [PATCH 19/20] added noise over tls --- app/networking/host.go | 8 +++++--- go.mod | 3 ++- go.sum | 4 ++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/app/networking/host.go b/app/networking/host.go index 2e9f067..7c08218 100644 --- a/app/networking/host.go +++ b/app/networking/host.go @@ -12,7 +12,8 @@ import ( connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" - libp2ptls "github.com/libp2p/go-libp2p-tls" + noise "github.com/libp2p/go-libp2p-noise" + tls "github.com/libp2p/go-libp2p-tls" ma "github.com/multiformats/go-multiaddr" ) @@ -31,13 +32,14 @@ func CreateHost(ctx context.Context) (host.Host, error) { fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", config.GetNetworkingPort()), }...) - security := libp2p.Security(libp2ptls.ID, libp2ptls.New) + _tls := libp2p.Security(tls.ID, tls.New) + _noise := libp2p.Security(noise.ID, noise.New) transports := libp2p.DefaultTransports connManager := libp2p.ConnectionManager(connmgr.NewConnManager(10, 100, time.Minute)) - opts := []libp2p.Option{identity, addrs, security, transports, connManager} + opts := []libp2p.Option{identity, addrs, _tls, _noise, transports, connManager} return libp2p.New(ctx, opts...) diff --git a/go.mod b/go.mod index 6e6bc7b..c856af7 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect github.com/agnivade/levenshtein v1.1.0 // indirect github.com/deckarep/golang-set v1.7.1 // indirect - github.com/ethereum/go-ethereum v1.10.0 + github.com/ethereum/go-ethereum v1.10.1 github.com/gammazero/deque v0.0.0-20201010052221-3932da5530cc // indirect github.com/gammazero/workerpool v1.1.1 github.com/go-ole/go-ole v1.2.5 // indirect @@ -19,6 +19,7 @@ require ( github.com/libp2p/go-libp2p-core v0.8.0 github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-kad-dht v0.11.1 + github.com/libp2p/go-libp2p-noise v0.1.1 github.com/libp2p/go-libp2p-tls v0.1.3 github.com/magiconair/properties v1.8.4 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect diff --git a/go.sum b/go.sum index d1a64ba..c97b290 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ethereum/go-ethereum v1.9.25/go.mod h1:vMkFiYLHI4tgPw4k2j4MHKoovchFE8plZ0M9VMk4/oM= -github.com/ethereum/go-ethereum v1.10.0 h1:EBZuZYjk1DHboBJb2YkBN8xItELRY6mtZEiYJKuH0+M= -github.com/ethereum/go-ethereum v1.10.0/go.mod h1:E5e/zvdfUVr91JZ0AwjyuJM3x+no51zZJRz61orLLSk= +github.com/ethereum/go-ethereum v1.10.1 h1:bGQezu+kqqRBczcSAruEoqVzTjtkeDnUGI2I4uroyUE= +github.com/ethereum/go-ethereum v1.10.1/go.mod h1:E5e/zvdfUVr91JZ0AwjyuJM3x+no51zZJRz61orLLSk= github.com/fatih/color v1.3.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= From 1fc6e689ed3d2aaf21aea2a13e576618187b40f3 Mon Sep 17 00:00:00 2001 From: Anjan Roy Date: Wed, 31 Mar 2021 16:55:43 +0530 Subject: [PATCH 20/20] iterating over slice of txs, instead of map --- app/data/pending.go | 28 ++++++++-------------------- app/data/queued.go | 28 ++++++++-------------------- 2 files changed, 16 insertions(+), 40 deletions(-) diff --git a/app/data/pending.go b/app/data/pending.go index df79229..54a6fb4 100644 --- a/app/data/pending.go +++ b/app/data/pending.go @@ -82,7 +82,7 @@ func (p *PendingPool) DuplicateTxs(hash common.Hash) []*MemPoolTx { result := make([]*MemPoolTx, 0, p.Count()) - for _, tx := range p.Transactions { + for _, tx := range p.DescTxsByGasPrice.get() { // First checking if tx under radar is the one for which // we're finding duplicate tx(s). If yes, we will move to next one @@ -94,9 +94,7 @@ func (p *PendingPool) DuplicateTxs(hash common.Hash) []*MemPoolTx { // If yes, we'll include it considerable duplicate tx list, for given // txHash if tx.IsDuplicateOf(targetTx) { - result = append(result, tx) - } } @@ -111,13 +109,7 @@ func (p *PendingPool) ListTxs() []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(p.Transactions)) - - for _, v := range p.Transactions { - result = append(result, v) - } - - return result + return p.DescTxsByGasPrice.get() } @@ -150,7 +142,7 @@ func (p *PendingPool) SentFrom(address common.Address) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - result := make([]*MemPoolTx, 0, p.DescTxsByGasPrice.len()) + result := make([]*MemPoolTx, 0, p.Count()) for _, tx := range p.DescTxsByGasPrice.get() { @@ -171,7 +163,7 @@ func (p *PendingPool) SentTo(address common.Address) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - result := make([]*MemPoolTx, 0, p.DescTxsByGasPrice.len()) + result := make([]*MemPoolTx, 0, p.Count()) for _, tx := range p.DescTxsByGasPrice.get() { @@ -192,9 +184,9 @@ func (p *PendingPool) OlderThanX(x time.Duration) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(p.Transactions)) + result := make([]*MemPoolTx, 0, p.Count()) - for _, tx := range p.Transactions { + for _, tx := range p.DescTxsByGasPrice.get() { if tx.IsPendingForGTE(x) { result = append(result, tx) @@ -213,9 +205,9 @@ func (p *PendingPool) FresherThanX(x time.Duration) []*MemPoolTx { p.Lock.RLock() defer p.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(p.Transactions)) + result := make([]*MemPoolTx, 0, p.Count()) - for _, tx := range p.Transactions { + for _, tx := range p.DescTxsByGasPrice.get() { if tx.IsPendingForLTE(x) { result = append(result, tx) @@ -273,9 +265,7 @@ func (p *PendingPool) PublishAdded(ctx context.Context, pubsub *redis.Client, ms } if err := pubsub.Publish(ctx, config.GetPendingTxEntryPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish new pending tx : %s\n", err.Error()) - } } @@ -332,9 +322,7 @@ func (p *PendingPool) PublishRemoved(ctx context.Context, pubsub *redis.Client, } if err := pubsub.Publish(ctx, config.GetPendingTxExitPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish confirmed tx : %s\n", err.Error()) - } } diff --git a/app/data/queued.go b/app/data/queued.go index 5174592..e01e6c8 100644 --- a/app/data/queued.go +++ b/app/data/queued.go @@ -86,7 +86,7 @@ func (q *QueuedPool) DuplicateTxs(hash common.Hash) []*MemPoolTx { result := make([]*MemPoolTx, 0, q.Count()) - for _, tx := range q.Transactions { + for _, tx := range q.DescTxsByGasPrice.get() { // First checking if tx under radar is the one for which // we're finding duplicate tx(s). If yes, we will move to next one @@ -98,9 +98,7 @@ func (q *QueuedPool) DuplicateTxs(hash common.Hash) []*MemPoolTx { // If yes, we'll include it considerable duplicate tx list, for given // txHash if tx.IsDuplicateOf(targetTx) { - result = append(result, tx) - } } @@ -115,13 +113,7 @@ func (q *QueuedPool) ListTxs() []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(q.Transactions)) - - for _, v := range q.Transactions { - result = append(result, v) - } - - return result + return q.DescTxsByGasPrice.get() } @@ -154,7 +146,7 @@ func (q *QueuedPool) SentFrom(address common.Address) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - result := make([]*MemPoolTx, 0, q.DescTxsByGasPrice.len()) + result := make([]*MemPoolTx, 0, q.Count()) for _, tx := range q.DescTxsByGasPrice.get() { @@ -175,7 +167,7 @@ func (q *QueuedPool) SentTo(address common.Address) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - result := make([]*MemPoolTx, 0, q.DescTxsByGasPrice.len()) + result := make([]*MemPoolTx, 0, q.Count()) for _, tx := range q.DescTxsByGasPrice.get() { @@ -196,9 +188,9 @@ func (q *QueuedPool) OlderThanX(x time.Duration) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(q.Transactions)) + result := make([]*MemPoolTx, 0, q.Count()) - for _, tx := range q.Transactions { + for _, tx := range q.DescTxsByGasPrice.get() { if tx.IsQueuedForGTE(x) { result = append(result, tx) @@ -217,9 +209,9 @@ func (q *QueuedPool) FresherThanX(x time.Duration) []*MemPoolTx { q.Lock.RLock() defer q.Lock.RUnlock() - result := make([]*MemPoolTx, 0, len(q.Transactions)) + result := make([]*MemPoolTx, 0, q.Count()) - for _, tx := range q.Transactions { + for _, tx := range q.DescTxsByGasPrice.get() { if tx.IsQueuedForLTE(x) { result = append(result, tx) @@ -277,9 +269,7 @@ func (q *QueuedPool) PublishAdded(ctx context.Context, pubsub *redis.Client, msg } if err := pubsub.Publish(ctx, config.GetQueuedTxEntryPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish new queued tx : %s\n", err.Error()) - } } @@ -329,9 +319,7 @@ func (q *QueuedPool) PublishRemoved(ctx context.Context, pubsub *redis.Client, m } if err := pubsub.Publish(ctx, config.GetQueuedTxExitPublishTopic(), _msg).Err(); err != nil { - log.Printf("[❗️] Failed to publish unstuck tx : %s\n", err.Error()) - } }