Skip to content

Commit

Permalink
switch quotes onto new candidate routes system; fix integration test …
Browse files Browse the repository at this point in the history
…flakiness (#395)

* switch quotes onto new candidate routes system; fix integration test flakiness

* updates

* updates
  • Loading branch information
p0mvn authored Jul 19, 2024
1 parent 055c719 commit bd8d5e2
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 79 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

# Changelog

## v25.5.0

- Switch quotes onto new candidate routes system

## v25.4.1

- Resolve goroutine leak stemming from creating a grpc connection for every cosmwasm pool in route. Share one connection.
Expand Down
4 changes: 1 addition & 3 deletions router/usecase/candidate_routes.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package usecase

import (
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/osmosis-labs/sqs/domain"
"github.com/osmosis-labs/sqs/domain/mvc"
Expand Down Expand Up @@ -192,7 +190,7 @@ func (c candidateRouteFinder) FindCandidateRoutes(tokenIn sdk.Coin, tokenOutDeno
return sqsdomain.CandidateRoutes{}, err
}
if len(rankedPools) == 0 {
return sqsdomain.CandidateRoutes{}, fmt.Errorf("no pools found for denom %s", currenTokenInDenom)
return sqsdomain.CandidateRoutes{}, nil
}

for i := 0; i < len(rankedPools) && len(routes) < options.MaxRoutes; i++ {
Expand Down
12 changes: 9 additions & 3 deletions router/usecase/dynamic_splits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/osmosis-labs/osmosis/osmomath"
"github.com/osmosis-labs/sqs/domain"
"github.com/osmosis-labs/sqs/router/usecase"
"github.com/osmosis-labs/sqs/router/usecase/route"
"github.com/osmosis-labs/sqs/router/usecase/routertesting"
Expand Down Expand Up @@ -46,12 +47,17 @@ func (s *RouterTestSuite) setupSplitsMainnetTestCase(displayDenomIn string, amou

ctx := context.TODO()

config := useCases.Router.GetConfig()

options := domain.CandidateRouteSearchOptions{
MaxRoutes: config.MaxRoutes,
MaxPoolsPerRoute: config.MaxPoolsPerRoute,
MinPoolLiquidityCap: config.MinPoolLiquidityCap,
}
// Get candidate routes
candidateRoutes, err := useCases.Router.GetCandidateRoutes(ctx, tokenIn, chainDenomOut)
candidateRoutes, err := useCases.CandidateRouteSearcher.FindCandidateRoutes(tokenIn, chainDenomOut, options)
s.Require().NoError(err)

config := useCases.Router.GetConfig()

// TODO: consider moving to interface.
routerUseCase, ok := useCases.Router.(*usecase.RouterUseCaseImpl)
s.Require().True(ok)
Expand Down
4 changes: 2 additions & 2 deletions router/usecase/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func ValidateAndFilterRoutes(candidateRoutes [][]candidatePoolWrapper, tokenInDe
return validateAndFilterRoutes(candidateRoutes, tokenInDenom, logger)
}

func (r *routerUseCaseImpl) HandleRoutes(ctx context.Context, pools []sqsdomain.PoolI, tokenIn sdk.Coin, tokenOutDenom string, maxRoutes, maxPoolsPerRoute int) (candidateRoutes sqsdomain.CandidateRoutes, err error) {
return r.handleCandidateRoutes(ctx, pools, tokenIn, tokenOutDenom, maxRoutes, maxPoolsPerRoute)
func (r *routerUseCaseImpl) HandleRoutes(ctx context.Context, tokenIn sdk.Coin, tokenOutDenom string, candidateRouteSearchOptions domain.CandidateRouteSearchOptions) (candidateRoutes sqsdomain.CandidateRoutes, err error) {
return r.handleCandidateRoutes(ctx, tokenIn, tokenOutDenom, candidateRouteSearchOptions)
}

func EstimateAndRankSingleRouteQuote(ctx context.Context, routes []route.RouteImpl, tokenIn sdk.Coin, logger log.Logger) (domain.Quote, []RouteWithOutAmount, error) {
Expand Down
2 changes: 1 addition & 1 deletion router/usecase/optimized_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (s *RouterTestSuite) TestGetOptimalQuote_Mainnet() {
// that provides no slippage swaps. Given that 100K is under the liqudiity of kava.USDT in the
// transmuter pool, the split routes should be essentially the same.
// Update: as of 30.06.24, the kava.usdt for osmo only has one optimal route.
const usdtOsmoExpectedRoutesHighLiq = 1
const usdtOsmoExpectedRoutesHighLiq = 2
var oneHundredThousandUSDValue = osmomath.NewInt(100_000_000_000)

tests := map[string]struct {
Expand Down
50 changes: 27 additions & 23 deletions router/usecase/router_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,8 @@ func (r *routerUseCaseImpl) GetOptimalQuote(ctx context.Context, tokenIn sdk.Coi
options.MinPoolLiquidityCap = r.ConvertMinTokensPoolLiquidityCapToFilter(dynamicMinPoolLiquidityCap)
}

poolsAboveMinLiquidity := r.getSortedPoolsShallowCopy()
// Zero implies no filtering, so we skip the iterations.
if options.MinPoolLiquidityCap > 0 {
poolsAboveMinLiquidity = FilterPoolsByMinLiquidity(poolsAboveMinLiquidity, options.MinPoolLiquidityCap)
}

r.logger.Info("filtered pools", zap.Int("num_pools", len(poolsAboveMinLiquidity)))

// Find candidate routes and rank them by direct quotes.
topSingleRouteQuote, rankedRoutes, err = r.computeAndRankRoutesByDirectQuote(ctx, poolsAboveMinLiquidity, tokenIn, tokenOutDenom, options)
topSingleRouteQuote, rankedRoutes, err = r.computeAndRankRoutesByDirectQuote(ctx, tokenIn, tokenOutDenom, options)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -355,11 +347,17 @@ func (r *routerUseCaseImpl) rankRoutesByDirectQuote(ctx context.Context, candida
}

// computeAndRankRoutesByDirectQuote computes candidate routes and ranks them by token out after estimating direct quotes.
func (r *routerUseCaseImpl) computeAndRankRoutesByDirectQuote(ctx context.Context, pools []sqsdomain.PoolI, tokenIn sdk.Coin, tokenOutDenom string, routingOptions domain.RouterOptions) (domain.Quote, []route.RouteImpl, error) {
func (r *routerUseCaseImpl) computeAndRankRoutesByDirectQuote(ctx context.Context, tokenIn sdk.Coin, tokenOutDenom string, routingOptions domain.RouterOptions) (domain.Quote, []route.RouteImpl, error) {
tokenInOrderOfMagnitude := GetPrecomputeOrderOfMagnitude(tokenIn.Amount)

candidateRouteSearchOptions := domain.CandidateRouteSearchOptions{
MaxRoutes: routingOptions.MaxRoutes,
MaxPoolsPerRoute: routingOptions.MaxPoolsPerRoute,
MinPoolLiquidityCap: routingOptions.MinPoolLiquidityCap,
}

// If top routes are not present in cache, retrieve unranked candidate routes
candidateRoutes, err := r.handleCandidateRoutes(ctx, pools, tokenIn, tokenOutDenom, routingOptions.MaxRoutes, routingOptions.MaxPoolsPerRoute)
candidateRoutes, err := r.handleCandidateRoutes(ctx, tokenIn, tokenOutDenom, candidateRouteSearchOptions)
if err != nil {
r.logger.Error("error handling routes", zap.Error(err))
return nil, nil, err
Expand Down Expand Up @@ -491,7 +489,21 @@ func (r *routerUseCaseImpl) GetCustomDirectQuoteMultiPool(ctx context.Context, t

// GetCandidateRoutes implements domain.RouterUsecase.
func (r *routerUseCaseImpl) GetCandidateRoutes(ctx context.Context, tokenIn sdk.Coin, tokenOutDenom string) (sqsdomain.CandidateRoutes, error) {
candidateRoutes, err := r.handleCandidateRoutes(ctx, r.getSortedPoolsShallowCopy(), tokenIn, tokenOutDenom, r.defaultConfig.MaxRoutes, r.defaultConfig.MaxPoolsPerRoute)
candidateRouteSearchOptions := domain.CandidateRouteSearchOptions{
MaxRoutes: r.defaultConfig.MaxRoutes,
MaxPoolsPerRoute: r.defaultConfig.MaxPoolsPerRoute,
MinPoolLiquidityCap: r.defaultConfig.MinPoolLiquidityCap,
}

// Get the dynamic min pool liquidity cap for the given token in and token out denoms.
dynamicMinPoolLiquidityCap, err := r.tokenMetadataHolder.GetMinPoolLiquidityCap(tokenIn.Denom, tokenOutDenom)
if err == nil {
// Set the dynamic min pool liquidity cap only if there is no error retrieving it.
// Otherwise, use the default.
candidateRouteSearchOptions.MinPoolLiquidityCap = r.ConvertMinTokensPoolLiquidityCapToFilter(dynamicMinPoolLiquidityCap)
}

candidateRoutes, err := r.handleCandidateRoutes(ctx, tokenIn, tokenOutDenom, candidateRouteSearchOptions)
if err != nil {
return sqsdomain.CandidateRoutes{}, err
}
Expand Down Expand Up @@ -601,7 +613,7 @@ func (r *routerUseCaseImpl) GetCachedRankedRoutes(ctx context.Context, tokenInDe
// - there is an error retrieving routes from cache
// - there are no routes cached and there is an error computing them
// - fails to persist the computed routes in cache
func (r *routerUseCaseImpl) handleCandidateRoutes(ctx context.Context, pools []sqsdomain.PoolI, tokenIn sdk.Coin, tokenOutDenom string, maxRoutes, maxPoolsPerRoutes int) (candidateRoutes sqsdomain.CandidateRoutes, err error) {
func (r *routerUseCaseImpl) handleCandidateRoutes(ctx context.Context, tokenIn sdk.Coin, tokenOutDenom string, candidateRouteSearchOptions domain.CandidateRouteSearchOptions) (candidateRoutes sqsdomain.CandidateRoutes, err error) {
r.logger.Debug("getting routes")

// Check cache for routes if enabled
Expand All @@ -619,8 +631,9 @@ func (r *routerUseCaseImpl) handleCandidateRoutes(ctx context.Context, pools []s
if !isFoundCached {
r.logger.Debug("calculating routes")

candidateRoutes, err = GetCandidateRoutes(pools, tokenIn, tokenOutDenom, maxRoutes, maxPoolsPerRoutes, r.logger)
candidateRoutes, err = r.candidateRouteSearcher.FindCandidateRoutes(tokenIn, tokenOutDenom, candidateRouteSearchOptions)
if err != nil {
r.logger.Error("error getting candidate routes for pricing", zap.Error(err))
return sqsdomain.CandidateRoutes{}, err
}

Expand Down Expand Up @@ -833,15 +846,6 @@ func (r *routerUseCaseImpl) GetConfig() domain.RouterConfig {
return r.defaultConfig
}

// getSortedPoolsShallowCopy returns a shallow copy of the sorted pools.
// This is to avoid concurrent read and write to the same address by ingester.
func (r *routerUseCaseImpl) getSortedPoolsShallowCopy() []sqsdomain.PoolI {
r.sortedPoolsMu.RLock()
poolsCopy := r.sortedPools
r.sortedPoolsMu.RUnlock()
return poolsCopy
}

// filterOutGeneralizedCosmWasmPoolRoutes filters out routes that contain generalized cosm wasm pool.
// The reason for this is that making network requests to chain is expensive. Generalized cosmwasm pools
// make such network requests.
Expand Down
99 changes: 65 additions & 34 deletions router/usecase/router_usecase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,15 @@ func (s *RouterTestSuite) TestHandleRoutes() {
},
)

defaultSinglePools = []sqsdomain.PoolI{defaultPool}
recomputedRoute = WithCandidateRoutePools(
EmptyCandidateRoute,
[]sqsdomain.CandidatePool{
{
ID: defaultPool.GetId() + 1,
TokenOutDenom: tokenOutDenom,
},
},
)

singleDefaultRoutes = sqsdomain.CandidateRoutes{
Routes: []sqsdomain.CandidateRoute{defaultRoute},
Expand All @@ -137,7 +145,12 @@ func (s *RouterTestSuite) TestHandleRoutes() {
},
}

emptyPools = []sqsdomain.PoolI{}
singeldRecomputedRoutes = sqsdomain.CandidateRoutes{
Routes: []sqsdomain.CandidateRoute{recomputedRoute},
UniquePoolIDs: map[uint64]struct{}{
defaultPool.GetId() + 1: {},
},
}

emptyRoutes = sqsdomain.CandidateRoutes{}

Expand All @@ -157,7 +170,6 @@ func (s *RouterTestSuite) TestHandleRoutes() {
name string

repositoryRoutes sqsdomain.CandidateRoutes
repositoryPools []sqsdomain.PoolI
takerFeeMap sqsdomain.TakerFeeMap
isCacheDisabled bool
shouldSkipAddToCache bool
Expand All @@ -171,7 +183,6 @@ func (s *RouterTestSuite) TestHandleRoutes() {
name: "routes in cache -> use them",

repositoryRoutes: singleDefaultRoutes,
repositoryPools: emptyPools,

expectedCandidateRoutes: singleDefaultRoutes,
expectedIsCached: true,
Expand All @@ -180,36 +191,32 @@ func (s *RouterTestSuite) TestHandleRoutes() {
name: "cache is disabled in config -> recomputes routes despite having available in cache",

repositoryRoutes: singleDefaultRoutes,
repositoryPools: emptyPools,
isCacheDisabled: true,

expectedCandidateRoutes: emptyRoutes,
expectedCandidateRoutes: singeldRecomputedRoutes,
expectedIsCached: false,
},
{
name: "no routes in cache but relevant pools in store -> recomputes routes & caches them",
name: "no routes in cache -> recomputes routes & caches them",

repositoryRoutes: emptyRoutes,
repositoryPools: defaultSinglePools,
shouldSkipAddToCache: true,

expectedCandidateRoutes: singleDefaultRoutes,
expectedCandidateRoutes: singeldRecomputedRoutes,
expectedIsCached: true,
},
{
name: "empty routes in cache but relevant pools in store -> does not recompute routes",
name: "empty routes in cache-> does not recompute routes",

repositoryRoutes: emptyRoutes,
repositoryPools: defaultSinglePools,

expectedCandidateRoutes: emptyRoutes,
expectedIsCached: true,
},
{
name: "no routes in cache and no relevant pools in store -> returns no routes & caches them",
name: "no routes in cache and -> returns no routes & caches them",

repositoryRoutes: emptyRoutes,
repositoryPools: emptyPools,

expectedCandidateRoutes: emptyRoutes,
expectedIsCached: true,
Expand All @@ -235,31 +242,30 @@ func (s *RouterTestSuite) TestHandleRoutes() {
candidateRouteCache.Set(usecase.FormatCandidateRouteCacheKey(tokenInDenom, tokenOutDenom), tc.repositoryRoutes, time.Hour)
}

poolsUseCaseMock := &mocks.PoolsUsecaseMock{
// These are the pools returned by the call to GetAllPools
Pools: tc.repositoryPools,
}
poolsUseCaseMock := &mocks.PoolsUsecaseMock{}

tokenMetaDataHolder := mocks.TokenMetadataHolderMock{}
candidateRouteFinderMock := mocks.CandidateRouteFinderMock{}
candidateRouteFinderMock := mocks.CandidateRouteFinderMock{
Routes: tc.expectedCandidateRoutes,
}

routerUseCase := usecase.NewRouterUsecase(routerRepositoryMock, poolsUseCaseMock, candidateRouteFinderMock, &tokenMetaDataHolder, domain.RouterConfig{
RouteCacheEnabled: !tc.isCacheDisabled,
}, emptyCosmWasmPoolsRouterConfig, &log.NoOpLogger{}, cache.New(), candidateRouteCache)

// Validate and sort pools
sortedPools := usecase.ValidateAndSortPools(tc.repositoryPools, emptyCosmWasmPoolsRouterConfig, []uint64{}, noOpLogger)

// Filter pools by min liquidity
sortedPools = usecase.FilterPoolsByMinLiquidity(sortedPools, minPoolLiquidityCap)

routerUseCaseImpl, ok := routerUseCase.(*usecase.RouterUseCaseImpl)
s.Require().True(ok)

// System under test
ctx := context.Background()
// TODO: filter pools per router config
actualCandidateRoutes, err := routerUseCaseImpl.HandleRoutes(ctx, sortedPools, sdk.NewCoin(tokenInDenom, one), tokenOutDenom, defaultRouterConfig.MaxRoutes, defaultRouterConfig.MaxPoolsPerRoute)

candidateRouteSearchOptions := domain.CandidateRouteSearchOptions{
MinPoolLiquidityCap: minPoolLiquidityCap,
MaxRoutes: defaultRouterConfig.MaxRoutes,
MaxPoolsPerRoute: defaultRouterConfig.MaxPoolsPerRoute,
}

// System under test
actualCandidateRoutes, err := routerUseCaseImpl.HandleRoutes(ctx, sdk.NewCoin(tokenInDenom, one), tokenOutDenom, candidateRouteSearchOptions)

if tc.expectedError != nil {
s.Require().EqualError(err, tc.expectedError.Error())
Expand All @@ -277,6 +283,14 @@ func (s *RouterTestSuite) TestHandleRoutes() {
}

cachedCandidateRoutes, isCached, err := routerUseCaseImpl.GetCachedCandidateRoutes(ctx, tokenInDenom, tokenOutDenom)

if tc.isCacheDisabled {
s.Require().NoError(err)
s.Require().Empty(cachedCandidateRoutes.Routes)
s.Require().False(isCached)
return
}

// For the case where the cache is disabled, the expected routes in cache
// will be the same as the original routes in the repository.
// Check that router repository was updated
Expand Down Expand Up @@ -612,7 +626,7 @@ func (s *RouterTestSuite) TestGetOptimalQuote_Cache_Overwrites() {

// For the default amount in, we expect this pool to be returned.
// See test description above for details.
expectedRoutePoolID: poolID1400Concentrated,
expectedRoutePoolID: poolID1265Concentrated,
},
"cache is set to balancer - overwrites computed": {
amountIn: defaultAmountInCache,
Expand All @@ -634,7 +648,7 @@ func (s *RouterTestSuite) TestGetOptimalQuote_Cache_Overwrites() {
cacheExpiryDuration: time.Nanosecond,

// We expect this pool because the cache with balancer pool expires.
expectedRoutePoolID: poolID1400Concentrated,
expectedRoutePoolID: poolID1265Concentrated,
},
}

Expand Down Expand Up @@ -694,7 +708,7 @@ func (s *RouterTestSuite) TestGetCandidateRoutes_Chain_FindUnsupportedRoutes() {
const (
// This was selected by looking at the routes and concluding that it's
// probably fine. Might need to re-evaluate in the future.
expectedZeroPoolCount = 38
expectedZeroPoolCount = 39
)

viper.SetConfigFile("../../config.json")
Expand Down Expand Up @@ -722,9 +736,20 @@ func (s *RouterTestSuite) TestGetCandidateRoutes_Chain_FindUnsupportedRoutes() {
s.Require().NotZero(len(tokenMetadata))
for chainDenom, tokenMeta := range tokenMetadata {

routes, err := usecase.GetCandidateRoutes(mainnetState.Pools, sdk.NewCoin(chainDenom, one), USDC, config.Router.MaxRoutes, config.Router.MaxPoolsPerRoute, noOpLogger)
minPoolLiquidityCap, err := mainnetUsecase.Tokens.GetMinPoolLiquidityCap(chainDenom, USDC)
s.Require().NoError(err)

minPoolLiquidityCapFilter := mainnetUsecase.Router.ConvertMinTokensPoolLiquidityCapToFilter(minPoolLiquidityCap)

options := domain.CandidateRouteSearchOptions{
MinPoolLiquidityCap: minPoolLiquidityCapFilter,
MaxRoutes: config.Router.MaxRoutes,
MaxPoolsPerRoute: config.Router.MaxPoolsPerRoute,
}

routes, err := mainnetUsecase.CandidateRouteSearcher.FindCandidateRoutes(sdk.NewCoin(chainDenom, one), USDC, options)
if err != nil {
fmt.Printf("Error for %s -- %s\n", chainDenom, tokenMeta.HumanDenom)
fmt.Printf("Error for %s -- %s -- %v\n", chainDenom, tokenMeta.HumanDenom, err)
errorCounter++
continue
}
Expand Down Expand Up @@ -752,9 +777,15 @@ func (s *RouterTestSuite) TestGetCandidateRoutes_Chain_FindUnsupportedRoutes() {

for chainDenom, tokenMeta := range tokenMetadata {

routes, err := usecase.GetCandidateRoutes(mainnetState.Pools, sdk.NewCoin(chainDenom, one), USDC, config.Router.MaxRoutes, config.Router.MaxPoolsPerRoute, noOpLogger)
options := domain.CandidateRouteSearchOptions{
MinPoolLiquidityCap: 0,
MaxRoutes: config.Router.MaxRoutes,
MaxPoolsPerRoute: config.Router.MaxPoolsPerRoute,
}

routes, err := mainnetUsecase.CandidateRouteSearcher.FindCandidateRoutes(sdk.NewCoin(chainDenom, one), USDC, options)
if err != nil {
fmt.Printf("Error for %s -- %s\n", chainDenom, tokenMeta.HumanDenom)
fmt.Printf("Error for %s -- %s -- %v\n", chainDenom, tokenMeta.HumanDenom, err)
errorCounter++
continue
}
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion router/usecase/routertesting/parsing/pools.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion router/usecase/routertesting/parsing/taker_fees.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion router/usecase/routertesting/parsing/tokens.json

Large diffs are not rendered by default.

Loading

0 comments on commit bd8d5e2

Please sign in to comment.