Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repr agg.iis and others as array rather than map #13458

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 31 additions & 32 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import (
type Aggregator struct {
db kv.RoDB
d [kv.DomainLen]*Domain
iis map[kv.InvertedIdx]*InvertedIndex
iis []*InvertedIndex
dirs datadir.Dirs
tmpdir string
aggregationStep uint64
Expand Down Expand Up @@ -141,7 +141,6 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
logger: logger,
collateAndBuildWorkers: 1,
mergeWorkers: 1,
iis: make(map[kv.InvertedIdx]*InvertedIndex),

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

Expand Down Expand Up @@ -215,17 +214,18 @@ func (a *Aggregator) registerII(idx kv.InvertedIdx, salt *uint32, dirs datadir.D
keysTable: indexKeysTable,
valuesTable: indexTable,
compression: seg.CompressNone,
iiId: idx,
}

if _, ok := a.iis[idx]; ok {
if ii := a.searchII(idx); ii != nil {
return fmt.Errorf("inverted index %s already registered", idx)
}
var err error

a.iis[idx], err = NewInvertedIndex(idxCfg, logger)
ii, err := NewInvertedIndex(idxCfg, logger)
if err != nil {
return err
}
a.iis = append(a.iis, ii)
return nil
}

Expand Down Expand Up @@ -493,13 +493,7 @@ func (c AggV3Collation) Close() {

type AggV3StaticFiles struct {
d [kv.DomainLen]StaticFiles
ivfs map[kv.InvertedIdx]InvertedFiles
}

func NewAggV3StaticFiles() *AggV3StaticFiles {
return &AggV3StaticFiles{
ivfs: make(map[kv.InvertedIdx]InvertedFiles),
}
ivfs []InvertedFiles
}

// CleanupOnError - call it on collation fail. It's closing all files
Expand All @@ -521,7 +515,7 @@ func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error {
txTo = a.FirstTxNumOfStep(step + 1)
stepStartedAt = time.Now()

static = NewAggV3StaticFiles()
static = &AggV3StaticFiles{ivfs: make([]InvertedFiles, len(a.iis))}
closeCollations = true
collListMu = sync.Mutex{}
collations = make([]Collation, 0)
Expand Down Expand Up @@ -762,9 +756,21 @@ func (a *Aggregator) DomainTables(domains ...kv.Domain) (tables []string) {
}
func (a *Aggregator) InvertedIndexTables(indices ...kv.InvertedIdx) (tables []string) {
for _, idx := range indices {
tables = append(tables, a.iis[idx].Tables()...)
if ii := a.searchII(idx); ii != nil {
tables = append(tables, ii.Tables()...)
}
}
return tables

return
}

func (a *Aggregator) searchII(name kv.InvertedIdx) *InvertedIndex {
for _, ii := range a.iis {
if ii.iiId == name {
return ii
}
}
return nil
}

type flusher interface {
Expand Down Expand Up @@ -1071,15 +1077,15 @@ func (ac *AggregatorRoTx) Prune(ctx context.Context, tx kv.RwTx, limit uint64, l
}
}

stats := make(map[kv.InvertedIdx]*InvertedIndexPruneStat, len(ac.a.iis))
stats := make([]*InvertedIndexPruneStat, len(ac.a.iis))
for iikey := range ac.a.iis {
stat, err := ac.iis[iikey].Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil)
if err != nil {
return nil, err
}
stats[iikey] = stat
}
for iikey, _ := range ac.a.iis {
for iikey := range ac.a.iis {
aggStat.Indices[ac.iis[iikey].ii.filenameBase] = stats[iikey]
}

Expand Down Expand Up @@ -1230,13 +1236,7 @@ func (a *Aggregator) recalcVisibleFilesMinimaxTxNum() {

type RangesV3 struct {
domain [kv.DomainLen]DomainRanges
invertedIndex map[kv.InvertedIdx]*MergeRange
}

func NewRangesV3() *RangesV3 {
return &RangesV3{
invertedIndex: make(map[kv.InvertedIdx]*MergeRange),
}
invertedIndex []*MergeRange
}

func (r RangesV3) String() string {
Expand All @@ -1250,7 +1250,7 @@ func (r RangesV3) String() string {
aggStep := r.domain[kv.AccountsDomain].aggStep
for p, mr := range r.invertedIndex {
if mr != nil && mr.needMerge {
ss = append(ss, mr.String(string(p), aggStep))
sudeepdino008 marked this conversation as resolved.
Show resolved Hide resolved
ss = append(ss, mr.String(fmt.Sprintf("idx%d", p), aggStep))
}
}
return strings.Join(ss, ", ")
Expand All @@ -1271,7 +1271,7 @@ func (r RangesV3) any() bool {
}

func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3 {
r := NewRangesV3()
r := &RangesV3{invertedIndex: make([]*MergeRange, len(ac.a.iis))}
if ac.a.commitmentValuesTransform {
lmrAcc := ac.d[kv.AccountsDomain].files.LatestMergedRange()
lmrSto := ac.d[kv.StorageDomain].files.LatestMergedRange()
Expand Down Expand Up @@ -1333,7 +1333,7 @@ func (ac *AggregatorRoTx) RestrictSubsetFileDeletions(b bool) {
}

func (ac *AggregatorRoTx) mergeFiles(ctx context.Context, files *SelectedStaticFilesV3, r *RangesV3) (*MergedFilesV3, error) {
mf := NewMergedFilesV3()
mf := &MergedFilesV3{iis: make([]*filesItem, len(ac.a.iis))}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(ac.a.mergeWorkers)
closeFiles := true
Expand Down Expand Up @@ -1570,10 +1570,9 @@ func (ac *AggregatorRoTx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs
return ac.d[kv.ReceiptDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
default:
// check the ii
if v, ok := ac.iis[name]; ok {
return v.IdxRange(k, fromTs, toTs, asc, limit, tx)
if ii := ac.searchII(name); ii != nil {
return ii.IdxRange(k, fromTs, toTs, asc, limit, tx)
}

return nil, fmt.Errorf("unexpected history name: %s", name)
}
}
Expand Down Expand Up @@ -1622,7 +1621,7 @@ func (ac *AggregatorRoTx) nastyFileRead(name kv.Domain, from, to uint64) (*seg.R
type AggregatorRoTx struct {
a *Aggregator
d [kv.DomainLen]*DomainRoTx
iis map[kv.InvertedIdx]*InvertedIndexRoTx
iis []*InvertedIndexRoTx

id uint64 // auto-increment id of ctx for logs
_leakID uint64 // set only if TRACE_AGG=true
Expand All @@ -1633,7 +1632,7 @@ func (a *Aggregator) BeginFilesRo() *AggregatorRoTx {
a: a,
id: a.aggRoTxAutoIncrement.Add(1),
_leakID: a.leakDetector.Add(),
iis: make(map[kv.InvertedIdx]*InvertedIndexRoTx, len(a.iis)),
iis: make([]*InvertedIndexRoTx, len(a.iis)),
}

a.visibleFilesLock.RLock()
Expand Down
23 changes: 5 additions & 18 deletions erigon-lib/state/aggregator_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ type SelectedStaticFilesV3 struct {
d [kv.DomainLen][]*filesItem
dHist [kv.DomainLen][]*filesItem
dIdx [kv.DomainLen][]*filesItem
ii map[kv.InvertedIdx][]*filesItem
}

func NewSelectedStaticFilesV3() *SelectedStaticFilesV3 {
return &SelectedStaticFilesV3{ii: make(map[kv.InvertedIdx][]*filesItem)}
ii [][]*filesItem
}

func (sf SelectedStaticFilesV3) Close() {
Expand All @@ -37,9 +33,7 @@ func (sf SelectedStaticFilesV3) Close() {
clist = append(clist, sf.d[id], sf.dIdx[id], sf.dHist[id])
}

for _, i := range sf.ii {
clist = append(clist, i)
}
clist = append(clist, sf.ii...)
for _, group := range clist {
for _, item := range group {
if item != nil {
Expand All @@ -55,7 +49,7 @@ func (sf SelectedStaticFilesV3) Close() {
}

func (ac *AggregatorRoTx) staticFilesInRange(r *RangesV3) (*SelectedStaticFilesV3, error) {
sf := NewSelectedStaticFilesV3()
sf := &SelectedStaticFilesV3{ii: make([][]*filesItem, len(r.invertedIndex))}
for id := range ac.d {
if !r.domain[id].any() {
continue
Expand All @@ -75,11 +69,7 @@ type MergedFilesV3 struct {
d [kv.DomainLen]*filesItem
dHist [kv.DomainLen]*filesItem
dIdx [kv.DomainLen]*filesItem
iis map[kv.InvertedIdx]*filesItem
}

func NewMergedFilesV3() *MergedFilesV3 {
return &MergedFilesV3{iis: make(map[kv.InvertedIdx]*filesItem)}
iis []*filesItem
}

func (mf MergedFilesV3) FrozenList() (frozen []string) {
Expand Down Expand Up @@ -109,10 +99,7 @@ func (mf MergedFilesV3) Close() {
for id := range mf.d {
clist = append(clist, mf.d[id], mf.dHist[id], mf.dIdx[id])
}
for _, ii := range mf.iis {
clist = append(clist, ii)
}

clist = append(clist, mf.iis...)
for _, item := range clist {
if item != nil {
if item.decompressor != nil {
Expand Down
14 changes: 8 additions & 6 deletions erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type SharedDomains struct {
storage *btree2.Map[string, dataWithPrevStep]

domainWriters [kv.DomainLen]*domainBufferedWriter
iiWriters map[kv.InvertedIdx]*invertedIndexBufferedWriter
iiWriters []*invertedIndexBufferedWriter

currentChangesAccumulator *StateChangeSet
pastChangesAccumulator map[string]*StateChangeSet
Expand All @@ -114,12 +114,12 @@ type HasAgg interface {
func NewSharedDomains(tx kv.Tx, logger log.Logger) (*SharedDomains, error) {

sd := &SharedDomains{
logger: logger,
storage: btree2.NewMap[string, dataWithPrevStep](128),
iiWriters: map[kv.InvertedIdx]*invertedIndexBufferedWriter{},
logger: logger,
storage: btree2.NewMap[string, dataWithPrevStep](128),
//trace: true,
}
sd.SetTx(tx)
sd.iiWriters = make([]*invertedIndexBufferedWriter, len(sd.aggTx.iis))

sd.aggTx.a.DiscardHistory(kv.CommitmentDomain)

Expand Down Expand Up @@ -662,8 +662,10 @@ func (sd *SharedDomains) delAccountStorage(addr, loc []byte, preVal []byte, prev
}

func (sd *SharedDomains) IndexAdd(table kv.InvertedIdx, key []byte) (err error) {
if writer, ok := sd.iiWriters[table]; ok {
return writer.Add(key)
for _, writer := range sd.iiWriters {
if writer.iiId == table {
return writer.Add(key)
}
}
panic(fmt.Errorf("unknown index %s", table))
}
Expand Down
7 changes: 2 additions & 5 deletions erigon-lib/state/integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,8 @@ func (ac *AggregatorRoTx) IntegrityInvertedIndexAllValuesAreInRange(ctx context.
}
default:
// check the ii
if v, ok := ac.iis[name]; ok {
err := v.IntegrityInvertedIndexAllValuesAreInRange(ctx, failFast, fromStep)
if err != nil {
return err
}
if v := ac.searchII(name); v != nil {
return v.IntegrityInvertedIndexAllValuesAreInRange(ctx, failFast, fromStep)
}
panic(fmt.Sprintf("unexpected: %s", name))
}
Expand Down
3 changes: 3 additions & 0 deletions erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type iiCfg struct {
aggregationStep uint64 // amount of transactions inside single aggregation step
keysTable string // bucket name for index keys; txnNum_u64 -> key (k+auto_increment)
valuesTable string // bucket name for index values; k -> txnNum_u64 , Needs to be table with DupSort
iiId kv.InvertedIdx

withExistence bool // defines if existence index should be built
compression seg.FileCompression // compression type for inverted index keys and values
Expand Down Expand Up @@ -381,6 +382,7 @@ type invertedIndexBufferedWriter struct {
txNum uint64
aggregationStep uint64
txNumBytes [8]byte
iiId kv.InvertedIdx
}

// loadFunc - is analog of etl.Identity, but it signaling to etl - use .Put instead of .AppendDup - to allow duplicates
Expand Down Expand Up @@ -455,6 +457,7 @@ func (iit *InvertedIndexRoTx) newWriter(tmpdir string, discard bool) *invertedIn
// etl collector doesn't fsync: means if have enough ram, all files produced by all collectors will be in ram
indexKeys: etl.NewCollector(iit.ii.filenameBase+".flush.ii.keys", tmpdir, etl.NewSortableBuffer(WALCollectorRAM), iit.ii.logger).LogLvl(log.LvlTrace),
index: etl.NewCollector(iit.ii.filenameBase+".flush.ii.vals", tmpdir, etl.NewSortableBuffer(WALCollectorRAM), iit.ii.logger).LogLvl(log.LvlTrace),
iiId: iit.ii.iiId,
}
w.indexKeys.SortAndFlushInBackground(true)
w.index.SortAndFlushInBackground(true)
Expand Down
10 changes: 9 additions & 1 deletion erigon-lib/state/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,4 +1082,12 @@ func hasCoverVisibleFile(visibleFiles []visibleFile, item *filesItem) bool {
}

func (ac *AggregatorRoTx) DbgDomain(idx kv.Domain) *DomainRoTx { return ac.d[idx] }
func (ac *AggregatorRoTx) DbgII(idx kv.InvertedIdx) *InvertedIndexRoTx { return ac.iis[idx] }
func (ac *AggregatorRoTx) DbgII(idx kv.InvertedIdx) *InvertedIndexRoTx { return ac.searchII(idx) }
func (ac *AggregatorRoTx) searchII(idx kv.InvertedIdx) *InvertedIndexRoTx {
for _, iit := range ac.iis {
sudeepdino008 marked this conversation as resolved.
Show resolved Hide resolved
if iit.ii.iiId == idx {
return iit
}
}
return nil
}
54 changes: 28 additions & 26 deletions erigon-lib/state/squeeze.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,26 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error {
return nil
}

rng := NewRangesV3()
rng.domain = [5]DomainRanges{
kv.AccountsDomain: {
name: kv.AccountsDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: ac.a.StepSize(),
},
kv.StorageDomain: {
name: kv.StorageDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: ac.a.StepSize(),
},
kv.CommitmentDomain: {
name: kv.CommitmentDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: ac.a.StepSize(),
rng := &RangesV3{
domain: [5]DomainRanges{
kv.AccountsDomain: {
name: kv.AccountsDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: ac.a.StepSize(),
},
kv.StorageDomain: {
name: kv.StorageDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: ac.a.StepSize(),
},
kv.CommitmentDomain: {
name: kv.CommitmentDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: ac.a.StepSize(),
},
},
}
sf, err := ac.staticFilesInRange(rng)
Expand Down Expand Up @@ -320,13 +321,14 @@ func (a *Aggregator) RebuildCommitmentFiles(ctx context.Context, rwDb kv.RwDB, t
acRo := a.BeginFilesRo() // this tx is used to read existing domain files and closed in the end
defer acRo.Close()

rng := NewRangesV3()
rng.domain = [5]DomainRanges{
kv.AccountsDomain: {
name: kv.AccountsDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: a.StepSize(),
rng := &RangesV3{
domain: [5]DomainRanges{
kv.AccountsDomain: {
name: kv.AccountsDomain,
values: MergeRange{true, 0, math.MaxUint64},
history: HistoryRanges{},
aggStep: a.StepSize(),
},
},
}
sf, err := acRo.staticFilesInRange(rng)
Expand Down
Loading