Skip to content

Commit

Permalink
Merge branch 'mempool'
Browse files Browse the repository at this point in the history
  • Loading branch information
martinboehm committed Apr 15, 2019
2 parents 3ef9426 + dffcded commit 9642e30
Show file tree
Hide file tree
Showing 58 changed files with 945 additions and 543 deletions.
13 changes: 13 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,16 @@ type SystemInfo struct {
Blockbook *BlockbookInfo `json:"blockbook"`
Backend *bchain.ChainInfo `json:"backend"`
}

// MempoolTxid contains information about a transaction in mempool
type MempoolTxid struct {
Time int64 `json:"time"`
Txid string `json:"txid"`
}

// MempoolTxids contains a list of mempool txids with paging information
type MempoolTxids struct {
Paging
Mempool []MempoolTxid `json:"mempool"`
MempoolSize int `json:"mempoolSize"`
}
33 changes: 31 additions & 2 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@ type Worker struct {
chain bchain.BlockChain
chainParser bchain.BlockChainParser
chainType bchain.ChainType
mempool bchain.Mempool
is *common.InternalState
}

// NewWorker creates new api worker
func NewWorker(db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState) (*Worker, error) {
func NewWorker(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, is *common.InternalState) (*Worker, error) {
w := &Worker{
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
chainType: chain.GetChainParser().GetChainType(),
mempool: mempool,
is: is,
}
return w, nil
Expand Down Expand Up @@ -292,6 +294,10 @@ func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height uint32,
return nil, err
}
}
// for mempool transaction get first seen time
if bchainTx.Confirmations == 0 {
bchainTx.Blocktime = int64(w.mempool.GetTransactionTime(bchainTx.Txid))
}
r := &Tx{
Blockhash: blockhash,
Blockheight: int(height),
Expand Down Expand Up @@ -348,7 +354,7 @@ func (w *Worker) getAddressTxids(addrDesc bchain.AddressDescriptor, mempool bool
}
if mempool {
uniqueTxs := make(map[string]struct{})
o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc)
o, err := w.mempool.GetAddrDescTransactions(addrDesc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1068,3 +1074,26 @@ func (w *Worker) GetSystemInfo(internal bool) (*SystemInfo, error) {
glog.Info("GetSystemInfo finished in ", time.Since(start))
return &SystemInfo{bi, ci}, nil
}

// GetMempool returns a page of mempool txids
func (w *Worker) GetMempool(page int, itemsOnPage int) (*MempoolTxids, error) {
page--
if page < 0 {
page = 0
}
entries := w.mempool.GetAllEntries()
pg, from, to, page := computePaging(len(entries), page, itemsOnPage)
r := &MempoolTxids{
Paging: pg,
MempoolSize: len(entries),
}
r.Mempool = make([]MempoolTxid, to-from)
for i := from; i < to; i++ {
entry := &entries[i]
r.Mempool[i-from] = MempoolTxid{
Txid: entry.Txid,
Time: int64(entry.Time),
}
}
return r, nil
}
28 changes: 17 additions & 11 deletions api/xpub.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (w *Worker) xpubGetAddressTxids(addrDesc bchain.AddressDescriptor, mempool
}
if mempool {
uniqueTxs := make(map[string]int)
o, err := w.chain.GetMempoolTransactionsForAddrDesc(addrDesc)
o, err := w.mempool.GetAddrDescTransactions(addrDesc)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -383,13 +383,13 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
return nil, err
}
// setup filtering of txids
var useTxids func(txid *xpubTxid, ad *xpubAddress) bool
var txidFilter func(txid *xpubTxid, ad *xpubAddress) bool
if !(filter.FromHeight == 0 && filter.ToHeight == 0 && filter.Vout == AddressFilterVoutOff) {
toHeight := maxUint32
if filter.ToHeight != 0 {
toHeight = filter.ToHeight
}
useTxids = func(txid *xpubTxid, ad *xpubAddress) bool {
txidFilter = func(txid *xpubTxid, ad *xpubAddress) bool {
if txid.height < filter.FromHeight || txid.height > toHeight {
return false
}
Expand All @@ -406,6 +406,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
// process mempool, only if ToHeight is not specified
if filter.ToHeight == 0 && !filter.OnlyConfirmed {
txmMap = make(map[string]*Tx)
mempoolEntries := make(bchain.MempoolTxidEntries, 0)
for _, da := range [][]xpubAddress{data.addresses, data.changeAddresses} {
for i := range da {
ad := &da[i]
Expand All @@ -432,18 +433,23 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
}
uBalSat.Add(&uBalSat, tx.getAddrVoutValue(ad.addrDesc))
uBalSat.Sub(&uBalSat, tx.getAddrVinValue(ad.addrDesc))
if page == 0 && !foundTx && (useTxids == nil || useTxids(&txid, ad)) {
if option == AccountDetailsTxidHistory {
txids = append(txids, tx.Txid)
} else if option >= AccountDetailsTxHistoryLight {
txs = append(txs, tx)
}
// mempool txs are returned only on the first page, uniquely and filtered
if page == 0 && !foundTx && (txidFilter == nil || txidFilter(&txid, ad)) {
mempoolEntries = append(mempoolEntries, bchain.MempoolTxidEntry{Txid: txid.txid, Time: uint32(tx.Blocktime)})
}
}

}
}
}
// sort the entries by time descending
sort.Sort(mempoolEntries)
for _, entry := range mempoolEntries {
if option == AccountDetailsTxidHistory {
txids = append(txids, entry.Txid)
} else if option >= AccountDetailsTxHistoryLight {
txs = append(txs, txmMap[entry.Txid])
}
}
}
if option >= AccountDetailsTxidHistory {
txcMap := make(map[string]bool)
Expand All @@ -459,7 +465,7 @@ func (w *Worker) GetXpubAddress(xpub string, page int, txsOnPage int, option Acc
}
// add tx only once
if !added {
add := useTxids == nil || useTxids(&txid, ad)
add := txidFilter == nil || txidFilter(&txid, ad)
txcMap[txid.txid] = add
if add {
txc = append(txc, txid)
Expand Down
5 changes: 5 additions & 0 deletions bchain/basechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (b *BaseChain) GetNetworkName() string {
return b.Network
}

// GetMempoolEntry is not supported by default
func (b *BaseChain) GetMempoolEntry(txid string) (*MempoolEntry, error) {
return nil, errors.New("GetMempoolEntry: not supported")
}

// EthereumTypeGetBalance is not supported
func (b *BaseChain) EthereumTypeGetBalance(addrDesc AddressDescriptor) (*big.Int, error) {
return nil, errors.New("Not supported")
Expand Down
115 changes: 115 additions & 0 deletions bchain/basemempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package bchain

import (
"sort"
"sync"
)

type addrIndex struct {
addrDesc string
n int32
}

type txEntry struct {
addrIndexes []addrIndex
time uint32
}

type txidio struct {
txid string
io []addrIndex
}

// BaseMempool is mempool base handle
type BaseMempool struct {
chain BlockChain
mux sync.Mutex
txEntries map[string]txEntry
addrDescToTx map[string][]Outpoint
OnNewTxAddr OnNewTxAddrFunc
}

// GetTransactions returns slice of mempool transactions for given address
func (m *BaseMempool) GetTransactions(address string) ([]Outpoint, error) {
parser := m.chain.GetChainParser()
addrDesc, err := parser.GetAddrDescFromAddress(address)
if err != nil {
return nil, err
}
return m.GetAddrDescTransactions(addrDesc)
}

// GetAddrDescTransactions returns slice of mempool transactions for given address descriptor, in reverse order
func (m *BaseMempool) GetAddrDescTransactions(addrDesc AddressDescriptor) ([]Outpoint, error) {
m.mux.Lock()
defer m.mux.Unlock()
outpoints := m.addrDescToTx[string(addrDesc)]
rv := make([]Outpoint, len(outpoints))
for i, j := len(outpoints)-1, 0; i >= 0; i-- {
rv[j] = outpoints[i]
j++
}
return rv, nil
}

func (a MempoolTxidEntries) Len() int { return len(a) }
func (a MempoolTxidEntries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a MempoolTxidEntries) Less(i, j int) bool {
// if the Time is equal, sort by txid to make the order defined
hi := a[i].Time
hj := a[j].Time
if hi == hj {
return a[i].Txid > a[j].Txid
}
// order in reverse
return hi > hj
}

// removeEntryFromMempool removes entry from mempool structs. The caller is responsible for locking!
func (m *BaseMempool) removeEntryFromMempool(txid string, entry txEntry) {
delete(m.txEntries, txid)
for _, si := range entry.addrIndexes {
outpoints, found := m.addrDescToTx[si.addrDesc]
if found {
newOutpoints := make([]Outpoint, 0, len(outpoints)-1)
for _, o := range outpoints {
if o.Txid != txid {
newOutpoints = append(newOutpoints, o)
}
}
if len(newOutpoints) > 0 {
m.addrDescToTx[si.addrDesc] = newOutpoints
} else {
delete(m.addrDescToTx, si.addrDesc)
}
}
}
}

// GetAllEntries returns all mempool entries sorted by fist seen time in descending order
func (m *BaseMempool) GetAllEntries() MempoolTxidEntries {
i := 0
m.mux.Lock()
entries := make(MempoolTxidEntries, len(m.txEntries))
for txid, entry := range m.txEntries {
entries[i] = MempoolTxidEntry{
Txid: txid,
Time: entry.time,
}
i++
}
m.mux.Unlock()
sort.Sort(entries)
return entries
}

// GetTransactionTime returns first seen time of a transaction
func (m *BaseMempool) GetTransactionTime(txid string) uint32 {
m.mux.Lock()
e, found := m.txEntries[txid]
m.mux.Unlock()
if !found {
return 0
}
return e.time
}
3 changes: 2 additions & 1 deletion bchain/coins/bch/bcashrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ func NewBCashRPC(config json.RawMessage, pushHandler func(bchain.NotificationTyp

// Initialize initializes BCashRPC instance.
func (b *BCashRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain

params := GetChainParams(chainName)

Expand Down
3 changes: 2 additions & 1 deletion bchain/coins/bellcoin/bellcoinrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ func NewBellcoinRPC(config json.RawMessage, pushHandler func(bchain.Notification

// Initialize initializes BellcoinRPC instance.
func (b *BellcoinRPC) Initialize() error {
chainName, err := b.GetChainInfoAndInitializeMempool(b)
ci, err := b.GetChainInfo()
if err != nil {
return err
}
chainName := ci.Chain

glog.Info("Chain name ", chainName)
params := GetChainParams(chainName)
Expand Down
Loading

0 comments on commit 9642e30

Please sign in to comment.