Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] make IndexRange agnostic of business logic #13442

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ type Aggregator struct {

onFreeze kv.OnFreezeFunc

ps *background.ProgressSet
ps *background.ProgressSet
iiDomainMp map[kv.InvertedIdx]kv.Domain // ii to domain (in which they belong)

// next fields are set only if agg.doTraceCtx is true. can enable by env: TRACE_AGG=true
leakDetector *dbg.LeakDetector
Expand Down Expand Up @@ -142,6 +143,7 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
collateAndBuildWorkers: 1,
mergeWorkers: 1,
iis: make(map[kv.InvertedIdx]*InvertedIndex),
iiDomainMp: make(map[kv.InvertedIdx]kv.Domain),

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,

Expand Down Expand Up @@ -193,7 +195,7 @@ func getStateIndicesSalt(baseDir string) (salt *uint32, err error) {
return salt, nil
}

func (a *Aggregator) registerDomain(name kv.Domain, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, logger log.Logger) (err error) {
func (a *Aggregator) registerDomain(name kv.Domain, historyIdx kv.InvertedIdx, salt *uint32, dirs datadir.Dirs, aggregationStep uint64, logger log.Logger) (err error) {
cfg := Schema[name]
//TODO: move dynamic part of config to InvertedIndex
cfg.restrictSubsetFileDeletions = a.commitmentValuesTransform
Expand All @@ -207,6 +209,7 @@ func (a *Aggregator) registerDomain(name kv.Domain, salt *uint32, dirs datadir.D
if err != nil {
return err
}
a.iiDomainMp[historyIdx] = name
return nil
}

Expand Down Expand Up @@ -1560,25 +1563,16 @@ func (ac *AggregatorRoTx) HistoryStartFrom(domainName kv.Domain) uint64 {
}

func (ac *AggregatorRoTx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int, tx kv.Tx) (timestamps stream.U64, err error) {
switch name {
case kv.AccountsHistoryIdx:
return ac.d[kv.AccountsDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.StorageHistoryIdx:
return ac.d[kv.StorageDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.CodeHistoryIdx:
return ac.d[kv.CodeDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.CommitmentHistoryIdx:
return ac.d[kv.StorageDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
case kv.ReceiptHistoryIdx:
return ac.d[kv.ReceiptDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
default:
// check the ii
if v, ok := ac.iis[name]; ok {
return v.IdxRange(k, fromTs, toTs, asc, limit, tx)
}

return nil, fmt.Errorf("unexpected history name: %s", name)
// search in domain
if v, ok := ac.a.iiDomainMp[name]; ok {
return ac.d[v].ht.IdxRange(k, fromTs, toTs, asc, limit, tx)
}
// next check the ii
if v, ok := ac.iis[name]; ok {
return v.IdxRange(k, fromTs, toTs, asc, limit, tx)
}

return nil, fmt.Errorf("unexpected history name: %s", name)
}

// -- range end
Expand Down
10 changes: 5 additions & 5 deletions erigon-lib/state/aggregator2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ func NewAggregator2(ctx context.Context, dirs datadir.Dirs, aggregationStep uint
if err != nil {
return nil, err
}
if err := a.registerDomain(kv.AccountsDomain, salt, dirs, aggregationStep, logger); err != nil {
if err := a.registerDomain(kv.AccountsDomain, kv.AccountsHistoryIdx, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.StorageDomain, salt, dirs, aggregationStep, logger); err != nil {
if err := a.registerDomain(kv.StorageDomain, kv.StorageHistoryIdx, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.CodeDomain, salt, dirs, aggregationStep, logger); err != nil {
if err := a.registerDomain(kv.CodeDomain, kv.CodeHistoryIdx, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.CommitmentDomain, salt, dirs, aggregationStep, logger); err != nil {
if err := a.registerDomain(kv.CommitmentDomain, kv.CommitmentHistoryIdx, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerDomain(kv.ReceiptDomain, salt, dirs, aggregationStep, logger); err != nil {
if err := a.registerDomain(kv.ReceiptDomain, kv.ReceiptHistoryIdx, salt, dirs, aggregationStep, logger); err != nil {
return nil, err
}
if err := a.registerII(kv.LogAddrIdx, salt, dirs, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil {
Expand Down
Loading