Skip to content

Commit

Permalink
tracker: remove parallel transaction scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
RaghavSood committed Jun 22, 2024
1 parent d370f8a commit 105cff4
Showing 1 changed file with 7 additions and 169 deletions.
176 changes: 7 additions & 169 deletions tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"math/big"
"os"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -412,22 +411,15 @@ func (t *Tracker) processBlock(height int64) error {
start := time.Now()
txLosses, txTransactions, spentTxids, spentVouts = t.scanTransactions(block.Hash, block.Height, block.Tx)
elapsed := time.Since(start)
fastStart := time.Now()
fastTxLosses, fastTxTransactions, fastSpentTxids, fastSpentVouts := t.fastScanTransactions(block.Hash, block.Height, block.Tx)
fastElapsed := time.Since(fastStart)

log.Info().
Stringer("slow_elapsed", elapsed).
Stringer("fast_elapsed", fastElapsed).
Int("slow_losses", len(txLosses)).
Int("fast_losses", len(fastTxLosses)).
Int("slow_transactions", len(txTransactions)).
Int("fast_transactions", len(fastTxTransactions)).
Int("slow_spent_txids", len(spentTxids)).
Int("fast_spent_txids", len(fastSpentTxids)).
Int("slow_spent_vouts", len(spentVouts)).
Int("fast_spent_vouts", len(fastSpentVouts)).
Msg("Transaction scan results")
Int64("block_height", height).
Stringer("elapsed", elapsed).
Int("losses", len(txLosses)).
Int("transactions", len(txTransactions)).
Int("spent_txids", len(spentTxids)).
Int("spent_vouts", len(spentVouts)).
Msg("Block transactions scanned")

losses = append(losses, txLosses...)
transactions = append(transactions, txTransactions...)
Expand Down Expand Up @@ -542,157 +534,3 @@ func (t *Tracker) scanTransactions(blockhash string, blockHeight int64, transact

return losses, txs, spentTxids, spentVouts
}

func (t *Tracker) fastScanTransactions(blockhash string, blockHeight int64, transactions []btypes.TransactionDetail) ([]types.Loss, []types.Transaction, []string, []int) {
numCPUs := runtime.NumCPU()
transactionCh := make(chan btypes.TransactionDetail, len(transactions))
lossCh := make(chan types.Loss)
txCh := make(chan types.Transaction)
spentTxidVoutCh := make(chan [2]interface{})

var wg sync.WaitGroup
wg.Add(numCPUs)
log.Info().Int("num_cpus", numCPUs).Msg("Starting fast transaction scan")

for i := 0; i < numCPUs; i++ {
go func() {
defer wg.Done()
for tx := range transactionCh {
t.processTransaction(blockhash, blockHeight, tx, lossCh, txCh, spentTxidVoutCh)
}
}()
}

log.Info().
Msg("Started runners")

go func() {
wg.Wait()
log.Info().Msg("All runners done")
close(lossCh)
close(txCh)
close(spentTxidVoutCh)
}()

log.Info().
Msg("Sending transactions to channel")
for _, tx := range transactions {
transactionCh <- tx
}
close(transactionCh)
log.Info().
Msg("Transactions sent to channel")

var losses []types.Loss
var txs []types.Transaction
var spentTxids []string
var spentVouts []int

for lossCh != nil || txCh != nil || spentTxidVoutCh != nil {
select {
case loss, ok := <-lossCh:
if !ok {
lossCh = nil
continue
}
losses = append(losses, loss)
case tx, ok := <-txCh:
if !ok {
txCh = nil
continue
}
txs = append(txs, tx)
case spent, ok := <-spentTxidVoutCh:
if !ok {
spentTxidVoutCh = nil
continue
}
spentTxids = append(spentTxids, spent[0].(string))
spentVouts = append(spentVouts, spent[1].(int))
}
}

return losses, txs, spentTxids, spentVouts
}

func (t *Tracker) processTransaction(blockhash string, blockHeight int64, tx btypes.TransactionDetail, lossCh chan types.Loss, txCh chan types.Transaction, spentTxidVoutCh chan [2]interface{}) {
var atLeastOneBurn bool

for _, vin := range tx.Vin {
if vin.Coinbase != "" {
continue
}

spentScript := vin.Prevout.ScriptPubKey.Hex

if t.bf.TestString(spentScript) {
exists, err := t.db.BurnScriptExists(spentScript)
if err != nil {
log.Error().Err(err).Str("script", spentScript).Msg("Failed to check if script exists")
continue
}

log.Info().
Str("script", spentScript).
Bool("exists", exists).
Str("txid", vin.Txid).
Int("vout", vin.Vout).
Msg("Identified spending of burn script output")

if exists {
spentTxidVoutCh <- [2]interface{}{vin.Txid, vin.Vout}
}
}
}

for _, vout := range tx.Vout {
if vout.ScriptPubKey.Type == "nulldata" {
lossCh <- types.Loss{
TxID: tx.Txid,
BlockHash: blockhash,
BlockHeight: blockHeight,
Vout: vout.N,
Amount: types.FromBTCString(types.BTCString(vout.Value)),
BurnScript: vout.ScriptPubKey.Hex,
}

atLeastOneBurn = true
} else if t.bf.TestString(vout.ScriptPubKey.Hex) {
exists, err := t.db.BurnScriptExists(vout.ScriptPubKey.Hex)
if err != nil {
log.Error().Err(err).Str("script", vout.ScriptPubKey.Hex).Msg("Failed to check if script exists")
continue
}

log.Info().Str("script", vout.ScriptPubKey.Hex).Bool("exists", exists).Msg("Burn script identified")

if exists {
lossCh <- types.Loss{
TxID: tx.Txid,
BlockHash: blockhash,
BlockHeight: blockHeight,
Vout: vout.N,
Amount: types.FromBTCString(types.BTCString(vout.Value)),
BurnScript: vout.ScriptPubKey.Hex,
}

atLeastOneBurn = true
}
}
}

if atLeastOneBurn {
jsonTx, err := json.Marshal(tx)
if err != nil {
log.Error().Err(err).Str("txid", tx.Txid).Msg("Failed to marshal transaction")
return
}

txCh <- types.Transaction{
TxID: tx.Txid,
TransactionDetails: string(jsonTx),
BlockHeight: blockHeight,
BlockHash: blockhash,
}
}
}

0 comments on commit 105cff4

Please sign in to comment.