Skip to content

Commit

Permalink
add height indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
tubackkhoa committed Nov 8, 2023
1 parent 9ae2e3b commit a88e0b4
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 25 deletions.
182 changes: 157 additions & 25 deletions state/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
continue
}
if !hashesInitialized {
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, true, heightInfo)
filteredHashes = txi.matchRange(ctx, qr, filteredHashes, true, heightInfo)
hashesInitialized = true

// Ignore any remaining conditions if the first condition resulted
Expand All @@ -269,7 +269,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResul
break
}
} else {
filteredHashes = txi.matchRange(ctx, qr, startKey(qr.Key), filteredHashes, false, heightInfo)
filteredHashes = txi.matchRange(ctx, qr, filteredHashes, false, heightInfo)
}
}
}
Expand Down Expand Up @@ -492,7 +492,6 @@ REMOVE_LOOP:
func (txi *TxIndex) matchRange(
ctx context.Context,
qr indexer.QueryRange,
startKey []byte,
filteredHashes map[string][]byte,
firstRun bool,
heightInfo HeightInfo,
Expand All @@ -503,9 +502,14 @@ func (txi *TxIndex) matchRange(
return filteredHashes
}

// call matchRangeHeight to have fastest speed
if qr.Key == types.TxHeightKey {
return txi.matchRangeHeight(ctx, qr, filteredHashes, firstRun, heightInfo)
}

tmpHashes := make(map[string][]byte)

it, err := dbm.IteratePrefix(txi.store, startKey)
it, err := dbm.IteratePrefix(txi.store, startKey(qr.Key))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -585,6 +589,125 @@ REMOVE_LOOP:
return filteredHashes
}

// matchRangeHeight returns all matching txs by hash that meet a given queryRange and
// start key. An already filtered result (filteredHashes) is provided such that
// any non-intersecting matches are removed.
//
// NOTE: filteredHashes may be empty if no previous condition has matched.

const blockToSearch = 5000

func (txi *TxIndex) matchRangeHeight(
ctx context.Context,
qr indexer.QueryRange,
filteredHashes map[string][]byte,
firstRun bool,
heightInfo HeightInfo,
) map[string][]byte {

lowerBound, lowerOk := qr.LowerBoundValue().(*big.Int)
upperBound, upperOk := qr.UpperBoundValue().(*big.Int)
if !lowerOk && !upperOk {
return filteredHashes
}
// include =
rangeBound := big.NewInt(blockToSearch - 1)
if lowerBound == nil {
lowerBound = new(big.Int).Sub(upperBound, rangeBound)
} else if upperBound == nil {
upperBound = new(big.Int).Add(lowerBound, rangeBound)
}

lowerHeight := lowerBound.Int64()
upperHeight := upperBound.Int64() + 1

// when search with upperHeight < blockToSearch
if lowerHeight < 1 {
lowerHeight = 1
}

// upper >= lower and upperHeight-lowerHeight <= blockToSearch
if lowerHeight > upperHeight || upperHeight-lowerHeight > blockToSearch {
return filteredHashes
}

tmpHashes := make(map[string][]byte)
startKeyBz := []byte(qr.Key)
fromKey := startKeyWithHeight(startKeyBz, lowerHeight)
toKey := startKeyWithHeight(startKeyBz, upperHeight)

// already have correct range
it, err := txi.store.Iterator(fromKey, toKey)
if err != nil {
panic(err)
}
defer it.Close()

LOOP:
for ; it.Valid(); it.Next() {

if !isTagKey(it.Key()) {
continue
}

if qr.Key != types.TxHeightKey {
keyHeight, err := extractHeightFromKey(it.Key())
if err != nil || !checkHeightConditions(heightInfo, keyHeight) {
continue LOOP
}
}

txi.setTmpHashes(tmpHashes, it)

// XXX: passing time in a ABCI Events is not yet implemented
// case time.Time:
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
// if v == r.upperBound {
// break
// }

// Potentially exit early.
select {
case <-ctx.Done():
break LOOP
default:
}
}
if err := it.Error(); err != nil {
panic(err)
}

if len(tmpHashes) == 0 || firstRun {
// Either:
//
// 1. Regardless if a previous match was attempted, which may have had
// results, but no match was found for the current condition, then we
// return no matches (assuming AND operand).
//
// 2. A previous match was not attempted, so we return all results.
return tmpHashes
}

REMOVE_LOOP:
// Remove/reduce matches in filteredHashes that were not found in this
// match (tmpHashes).
for k, v := range filteredHashes {
tmpHash := tmpHashes[k]
if tmpHash == nil || !bytes.Equal(tmpHashes[k], v) {
delete(filteredHashes, k)

// Potentially exit early.
select {
case <-ctx.Done():
break REMOVE_LOOP
default:
}
}
}

return filteredHashes
}

// Keys

func isTagKey(key []byte) bool {
Expand Down Expand Up @@ -637,21 +760,40 @@ func keyForEvent(key string, value string, result *abci.TxResult, eventSeq int64
))
}

func joinBytes(s ...[]byte) []byte {
n := 0
for _, v := range s {
n += len(v)
}

b, i := make([]byte, n), 0
for _, v := range s {
i += copy(b[i:], v)
}
return b
}

func keyForHeight(result *abci.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%d/%d/%d%s",
types.TxHeightKey,
result.Height,
result.Height,
result.Index,
// Added to facilitate having the eventSeq in event keys
// Otherwise queries break expecting 5 entries
eventSeqSeparator+"0",
))
keyBytes := joinBytes([]byte(types.TxHeightKey), []byte(tagKeySeparator),
[]byte{byte(result.Height >> 24), byte(result.Height >> 16), byte(result.Height >> 8), byte(result.Height)},
[]byte(fmt.Sprintf("/%d/%d%s",
result.Height,
result.Index,
// Added to facilitate having the eventSeq in event keys
// Otherwise queries break expecting 5 entries
eventSeqSeparator+"0",
)),
)
return keyBytes
}

func startKeyWithHeight(key []byte, height int64) []byte {
return joinBytes(key, []byte(tagKeySeparator), []byte{byte(height >> 24), byte(height >> 16), byte(height >> 8), byte(height)}, []byte(tagKeySeparator))
}

func startKeyForCondition(c query.Condition, height int64) []byte {
if height > 0 {
return startKey(c.CompositeKey, c.Operand, height)
if c.CompositeKey == types.TxHeightKey {
return startKeyWithHeight([]byte(c.CompositeKey), height)
}
return startKey(c.CompositeKey, c.Operand)
}
Expand All @@ -678,13 +820,3 @@ func checkBounds(ranges indexer.QueryRange, v *big.Int) bool {

return include
}

//nolint:unused,deadcode
func lookForHeight(conditions []query.Condition) (height int64) {
for _, c := range conditions {
if c.CompositeKey == types.TxHeightKey && c.Op == query.OpEqual {
return c.Operand.(int64)
}
}
return 0
}
55 changes: 55 additions & 0 deletions state/txindex/kv/kv_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"testing"
"time"

dbm "github.com/cometbft/cometbft-db"

Expand All @@ -15,6 +16,7 @@ import (
)

func BenchmarkTxSearch(b *testing.B) {

dbDir, err := os.MkdirTemp("", "benchmark_tx_search_test")
if err != nil {
b.Errorf("failed to create temporary directory: %s", err)
Expand Down Expand Up @@ -72,3 +74,56 @@ func BenchmarkTxSearch(b *testing.B) {
}
}
}

func TestBenchmarkTxSearch(b *testing.T) {

var err error
db := dbm.NewMemDB()

indexer := NewTxIndex(db)

for i := 0; i < 200; i++ {
events := []abci.Event{
{
Type: "transfer",
Attributes: []abci.EventAttribute{
{Key: "address", Value: fmt.Sprintf("address_%d", i%100), Index: true},
},
},
}

txBz := make([]byte, 8)
if _, err := rand.Read(txBz); err != nil {
b.Errorf("failed produce random bytes: %s", err)
}

txResult := &abci.TxResult{
Height: int64(i),
Index: 0,
Tx: types.Tx(string(txBz)),
Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Events: events,
},
}

if err := indexer.Index(txResult); err != nil {
b.Errorf("failed to index tx: %s", err)
}
}

txQuery := query.MustParse("tx.height <= 110 AND tx.height >= 100")

ctx := context.Background()
var ret []*abci.TxResult
start := time.Now().UnixMilli()

if ret, err = indexer.Search(ctx, txQuery); err != nil {
b.Errorf("failed to query for txs: %s", err)
}

b.Logf("found %v, take %d ms", len(ret), time.Now().UnixMilli()-start)

}

0 comments on commit a88e0b4

Please sign in to comment.