diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e018df1..8e0a6877 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - Worker pool abstraction implementation & tests - Switch prices to worker pool and remove concurrency for quotes since we support only one quote denom atm. - Switch ingest block processing system to rely on worker pool with 2 block processing workers. +- Wait for cold-start (first block) to be processed before starting the next block to avoid overloading the system. ## v25.2.0 diff --git a/docs/architecture/ingest.md b/docs/architecture/ingest.md index 2478ff8c..da53961e 100644 --- a/docs/architecture/ingest.md +++ b/docs/architecture/ingest.md @@ -23,6 +23,11 @@ under a few seconds. Additionally, this mechanism helps to control resources and avoid overloading the system at cold start with many pre-computation requests. +We keep a wait group in the `mvc.IngestUsecase` implementation to wait for the first block to finish processing before starting the next one. + +Given the above architecture with block process jobs being queued up during start-up, the workers must differentiate updates by height. +That is, if a block process job for height X is being processed to compute a price for uosmo when uosmo already has a price for height X+1, the worker must discard the update for height X. + ## Parsing Block Pool Metadata Since we may push either all pools or only the ones updated within a block, we diff --git a/domain/pricing.go b/domain/pricing.go index 804b77bd..9f24cdcd 100644 --- a/domain/pricing.go +++ b/domain/pricing.go @@ -120,10 +120,16 @@ func FormatPricingCacheKey(a, b string) string { } type PricingWorker interface { - // UpdatePrices updates prices for the tokens from the unique block pool metadata + // UpdatePricesAsync updates prices for the tokens from the unique block pool metadata // that contains information about changed denoms and pools within a block. // Propagates the results to the listeners. + // Performs the update asynchronously. UpdatePricesAsync(height uint64, uniqueBlockPoolMetaData BlockPoolMetadata) + // UpdatePricesSync updates prices for the tokens from the unique block pool metadata + // that contains information about changed denoms and pools within a block. + // Propagates the results to the listeners. + // Performs the update synchronously. + UpdatePricesSync(height uint64, uniqueBlockPoolMetaData BlockPoolMetadata) // RegisterListener registers a listener for pricing updates. RegisterListener(listener PricingUpdateListener) diff --git a/ingest/usecase/ingest_usecase.go b/ingest/usecase/ingest_usecase.go index b222948d..4f4d6db6 100644 --- a/ingest/usecase/ingest_usecase.go +++ b/ingest/usecase/ingest_usecase.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "strings" + "sync" + "sync/atomic" "time" "github.com/cosmos/cosmos-sdk/codec" @@ -36,6 +38,11 @@ type ingestUseCase struct { // Worker that computes prices for all tokens with the default quote. defaultQuotePriceUpdateWorker domain.PricingWorker + // Flag to check if the first block has been processed. + hasProcessedFirstBlock atomic.Bool + // Wait group to wait for the first block to be processed. + firstBlockWg sync.WaitGroup + logger log.Logger } @@ -63,6 +70,8 @@ func NewIngestUsecase(poolsUseCase mvc.PoolsUsecase, routerUseCase mvc.RouterUse logger: logger, defaultQuotePriceUpdateWorker: quotePriceUpdateWorker, + + hasProcessedFirstBlock: atomic.Bool{}, }, nil } @@ -95,9 +104,26 @@ func (p *ingestUseCase) ProcessBlockData(ctx context.Context, height uint64, tak p.sortAndStorePools(allPools) - // Note: we must queue the update before we start updating prices as pool liquidity - // worker listens for the pricing updates at the same height. - p.defaultQuotePriceUpdateWorker.UpdatePricesAsync(height, uniqueBlockPoolMetadata) + if !p.hasProcessedFirstBlock.Load() { + // For the first block, we need to update the prices synchronously. + // and let any subsequent block wait before starting its computation + // to avoid overloading the system. + p.firstBlockWg.Add(1) + defer p.firstBlockWg.Done() + + // Pre-compute the prices for all + p.defaultQuotePriceUpdateWorker.UpdatePricesSync(height, uniqueBlockPoolMetadata) + + p.hasProcessedFirstBlock.Store(true) + } else { + + // Wait for the first block to be processed before + // updating the prices for the next block. + p.firstBlockWg.Wait() + + // For any block after the first block, we can update the prices asynchronously. + p.defaultQuotePriceUpdateWorker.UpdatePricesAsync(height, uniqueBlockPoolMetadata) + } // Store the latest ingested height. p.chainInfoUseCase.StoreLatestHeight(height) diff --git a/tokens/usecase/pricing/worker/pricing_worker.go b/tokens/usecase/pricing/worker/pricing_worker.go index 75548d9c..b5d7c8b6 100644 --- a/tokens/usecase/pricing/worker/pricing_worker.go +++ b/tokens/usecase/pricing/worker/pricing_worker.go @@ -38,10 +38,10 @@ func New(tokensUseCase mvc.TokensUsecase, quoteDenom string, minLiquidityCap uin // UpdatePrices implements PricingWorker. func (p *pricingWorker) UpdatePricesAsync(height uint64, uniqueBlockPoolMetaData domain.BlockPoolMetadata) { - go p.UpdatePrices(height, uniqueBlockPoolMetaData) + go p.UpdatePricesSync(height, uniqueBlockPoolMetaData) } -func (p *pricingWorker) UpdatePrices(height uint64, uniqueBlockPoolMetaData domain.BlockPoolMetadata) { +func (p *pricingWorker) UpdatePricesSync(height uint64, uniqueBlockPoolMetaData domain.BlockPoolMetadata) { baseDenoms := domain.KeysFromMap(uniqueBlockPoolMetaData.UpdatedDenoms) ctx, cancel := context.WithTimeout(context.Background(), priceUpdateTimeout)