Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: queue block process updates waiting on pre-computation to be finished before starting next #370

Merged
merged 2 commits into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- Worker pool abstraction implementation & tests
- Switch prices to worker pool and remove concurrency for quotes since we support only one quote denom atm.
- Switch ingest block processing system to rely on worker pool with 2 block processing workers.
- Wait for cold-start (first block) to be processed before starting the next block to avoid overloading the system.

## v25.2.0

Expand Down
5 changes: 5 additions & 0 deletions docs/architecture/ingest.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ under a few seconds.

Additionally, this mechanism helps to control resources and avoid overloading the system at cold start with many pre-computation requests.

We keep a wait group in the `mvc.IngestUsecase` implementation to wait for the first block to finish processing before starting the next one.

Given the above architecture with block process jobs being queued up during start-up, the workers must differentiate updates by height.
That is, if a block process job for height X is being processed to compute a price for uosmo when uosmo already has a price for height X+1, the worker must discard the update for height X.

## Parsing Block Pool Metadata

Since we may push either all pools or only the ones updated within a block, we
Expand Down
8 changes: 7 additions & 1 deletion domain/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,16 @@ func FormatPricingCacheKey(a, b string) string {
}

type PricingWorker interface {
// UpdatePrices updates prices for the tokens from the unique block pool metadata
// UpdatePricesAsync updates prices for the tokens from the unique block pool metadata
// that contains information about changed denoms and pools within a block.
// Propagates the results to the listeners.
// Performs the update asynchronously.
UpdatePricesAsync(height uint64, uniqueBlockPoolMetaData BlockPoolMetadata)
// UpdatePricesSync updates prices for the tokens from the unique block pool metadata
// that contains information about changed denoms and pools within a block.
// Propagates the results to the listeners.
// Performs the update synchronously.
UpdatePricesSync(height uint64, uniqueBlockPoolMetaData BlockPoolMetadata)

// RegisterListener registers a listener for pricing updates.
RegisterListener(listener PricingUpdateListener)
Expand Down
32 changes: 29 additions & 3 deletions ingest/usecase/ingest_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cosmos/cosmos-sdk/codec"
Expand Down Expand Up @@ -36,6 +38,11 @@ type ingestUseCase struct {
// Worker that computes prices for all tokens with the default quote.
defaultQuotePriceUpdateWorker domain.PricingWorker

// Flag to check if the first block has been processed.
hasProcessedFirstBlock atomic.Bool
// Wait group to wait for the first block to be processed.
firstBlockWg sync.WaitGroup

logger log.Logger
}

Expand Down Expand Up @@ -63,6 +70,8 @@ func NewIngestUsecase(poolsUseCase mvc.PoolsUsecase, routerUseCase mvc.RouterUse
logger: logger,

defaultQuotePriceUpdateWorker: quotePriceUpdateWorker,

hasProcessedFirstBlock: atomic.Bool{},
p0mvn marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

Expand Down Expand Up @@ -95,9 +104,26 @@ func (p *ingestUseCase) ProcessBlockData(ctx context.Context, height uint64, tak

p.sortAndStorePools(allPools)

// Note: we must queue the update before we start updating prices as pool liquidity
// worker listens for the pricing updates at the same height.
p.defaultQuotePriceUpdateWorker.UpdatePricesAsync(height, uniqueBlockPoolMetadata)
if !p.hasProcessedFirstBlock.Load() {
// For the first block, we need to update the prices synchronously.
// and let any subsequent block wait before starting its computation
// to avoid overloading the system.
p.firstBlockWg.Add(1)
defer p.firstBlockWg.Done()

// Pre-compute the prices for all
p.defaultQuotePriceUpdateWorker.UpdatePricesSync(height, uniqueBlockPoolMetadata)

p.hasProcessedFirstBlock.Store(true)
} else {

// Wait for the first block to be processed before
// updating the prices for the next block.
p.firstBlockWg.Wait()

// For any block after the first block, we can update the prices asynchronously.
p.defaultQuotePriceUpdateWorker.UpdatePricesAsync(height, uniqueBlockPoolMetadata)
}

// Store the latest ingested height.
p.chainInfoUseCase.StoreLatestHeight(height)
Expand Down
4 changes: 2 additions & 2 deletions tokens/usecase/pricing/worker/pricing_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func New(tokensUseCase mvc.TokensUsecase, quoteDenom string, minLiquidityCap uin

// UpdatePrices implements PricingWorker.
func (p *pricingWorker) UpdatePricesAsync(height uint64, uniqueBlockPoolMetaData domain.BlockPoolMetadata) {
go p.UpdatePrices(height, uniqueBlockPoolMetaData)
go p.UpdatePricesSync(height, uniqueBlockPoolMetaData)
}

func (p *pricingWorker) UpdatePrices(height uint64, uniqueBlockPoolMetaData domain.BlockPoolMetadata) {
func (p *pricingWorker) UpdatePricesSync(height uint64, uniqueBlockPoolMetaData domain.BlockPoolMetadata) {
baseDenoms := domain.KeysFromMap(uniqueBlockPoolMetaData.UpdatedDenoms)

ctx, cancel := context.WithTimeout(context.Background(), priceUpdateTimeout)
Expand Down
Loading