Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Liquidity module sync #24

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions database/liquidity.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ VALUES ($1, $2, $3, $4, $5, $6)`
func (db *CyberDb) SaveSwap(
address string,
poolID uint64,
orderPrice sdk.Dec,
swapPrice sdk.Dec,
exchangedOfferCoin sdk.Coin,
exchangedDemandCoin sdk.Coin,
exchangedOfferCoinFee sdk.Coin,
exchangedDemandCoinFee sdk.Coin,
height int64,
timestamp time.Time,
) error {
stmt := `
INSERT INTO swaps (pool_id, address, swap_price, exchanged_offer_coin, exchanged_demand_coin, exchanged_offer_coin_fee, exchanged_demand_coin_fee, height)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`
INSERT INTO swaps (pool_id, address, order_price, swap_price, exchanged_offer_coin, exchanged_demand_coin, exchanged_offer_coin_fee, exchanged_demand_coin_fee, height, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`
exchangedOfferCoinDb := bddbtypes.DbCoin{Amount: exchangedOfferCoin.Amount.String(), Denom: exchangedOfferCoin.Denom}
exchangedOfferCoinValue, err := exchangedOfferCoinDb.Value()
if err != nil {
Expand All @@ -68,12 +70,49 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`
stmt,
poolID,
address,
orderPrice.String(),
swapPrice.String(),
exchangedOfferCoinValue,
exchangedDemandCoinValue,
exchangedOfferCoinFeeValue,
exchangedDemandCoinFeeValue,
height,
timestamp,
)
return err
}

func (db *CyberDb) SaveFailedSwap(
address string,
poolID uint64,
orderPrice sdk.Dec,
exchangedOfferCoin sdk.Coin,
remainingOfferCoin sdk.Coin,
height int64,
timestamp time.Time,
) error {
stmt := `
INSERT INTO failed_swaps (pool_id, address, order_price, exchanged_offer_coin, remaining_offer_coin, height, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7)`
exchangedOfferCoinDb := bddbtypes.DbCoin{Amount: exchangedOfferCoin.Amount.String(), Denom: exchangedOfferCoin.Denom}
exchangedOfferCoinValue, err := exchangedOfferCoinDb.Value()
if err != nil {
return fmt.Errorf("error while converting coin to dbcoin: %s", err)
}
exchangedDemandCoinDb := bddbtypes.DbCoin{Amount: remainingOfferCoin.Amount.String(), Denom: remainingOfferCoin.Denom}
exchangedDemandCoinValue, err := exchangedDemandCoinDb.Value()
if err != nil {
return fmt.Errorf("error while converting coin to dbcoin: %s", err)
}
_, err = db.Sql.Exec(
stmt,
poolID,
address,
orderPrice.String(),
exchangedOfferCoinValue,
exchangedDemandCoinValue,
height,
timestamp,
)
return err
}
Expand Down
37 changes: 26 additions & 11 deletions database/schema/08-liquidity.sql
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
CREATE TABLE swaps
CREATE TABLE pools
(
pool_id BIGINT NOT NULL PRIMARY KEY,
pool_name TEXT NOT NULL,
address TEXT NOT NULL,
a_denom TEXT NOT NULL,
b_denom TEXT NOT NULL,
pool_denom TEXT NOT NULL
);

CREATE TABLE swaps
(
id SERIAL PRIMARY KEY,
pool_id BIGINT NOT NULL REFERENCES pools (pool_id),
address TEXT NOT NULL,
order_price TEXT NOT NULL,
swap_price TEXT NOT NULL,
exchanged_offer_coin COIN NOT NULL,
exchanged_demand_coin COIN NOT NULL,
exchanged_offer_coin_fee COIN NOT NULL,
exchanged_demand_coin_fee COIN NOT NULL,
height BIGINT NOT NULL REFERENCES block (height)
height BIGINT NOT NULL REFERENCES block (height),
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

CREATE TABLE pools
CREATE TABLE failed_swaps
(
pool_id BIGINT NOT NULL PRIMARY KEY,
pool_name TEXT NOT NULL,
id SERIAL PRIMARY KEY,
pool_id BIGINT NOT NULL REFERENCES pools (pool_id),
address TEXT NOT NULL,
a_denom TEXT NOT NULL,
b_denom TEXT NOT NULL,
pool_denom TEXT NOT NULL
order_price TEXT NOT NULL,
exchanged_offer_coin COIN NOT NULL,
remaining_offer_coin COIN NOT NULL,
height BIGINT NOT NULL REFERENCES block (height),
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

CREATE TABLE pools_volumes
(
pool_id BIGINT NOT NULL,
pool_id BIGINT NOT NULL REFERENCES pools (pool_id),
volume_a BIGINT NOT NULL,
volume_b BIGINT NOT NULL,
fee_a BIGINT NOT NULL,
Expand All @@ -32,15 +47,15 @@ CREATE TABLE pools_volumes

CREATE TABLE pools_liquidity
(
pool_id BIGINT NOT NULL,
pool_id BIGINT NOT NULL REFERENCES pools (pool_id),
liquidity_a BIGINT NOT NULL,
liquidity_b BIGINT NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

CREATE TABLE pools_rates
(
pool_id BIGINT NOT NULL,
pool_id BIGINT NOT NULL REFERENCES pools (pool_id),
rate TEXT NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
117 changes: 83 additions & 34 deletions modules/liquidity/handle_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package liquidity
import (
"fmt"
sdk "github.com/cosmos/cosmos-sdk/types"
dbtypes "github.com/cybercongress/cyberindex/database"
"github.com/forbole/juno/v3/types"
"github.com/rs/zerolog/log"
liquiditytypes "github.com/tendermint/liquidity/x/liquidity/types"
Expand All @@ -26,8 +25,9 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve
attrs := eventAttrsFromEvent(event)
status, err := attrs.SwapStatus()
if err != nil {
return err
continue
}

if status {
addr, err := attrs.SwapRequesterAddr()
if err != nil {
Expand All @@ -43,24 +43,44 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve
return err
}

orderPrice, err := attrs.OrderPrice()
if err != nil {
return err
}

exchangedOfferCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueExchangedOfferCoinAmount)
if err != nil {
panic(err)
}
exchangedDemandCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueDemandCoinDenom, liquiditytypes.AttributeValueExchangedDemandCoinAmount)
if err != nil {
panic(err)
}
exchangedOfferCoinFee, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueOfferCoinFeeAmount)
if err != nil {
panic(err)
}
feeDec, err := attrs.DecCoinAttrs(liquiditytypes.AttributeValueDemandCoinDenom, liquiditytypes.AttributeValueExchangedCoinFeeAmount)
if err != nil {
panic(err)
}
exchangedDemandCoinFee := sdk.NewCoin(feeDec.Denom, feeDec.Amount.Ceil().TruncateInt())

err = m.db.SaveSwap(
addr,
poolID,
orderPrice,
swapPrice,
exchangedOfferCoin,
exchangedDemandCoin,
exchangedOfferCoinFee,
exchangedDemandCoinFee,
height,
timestamp,
)
if err != nil {
fmt.Errorf("error while saving swap: %s", err)
panic(err)
}

poolsRateMap[poolID] = swapPrice
Expand Down Expand Up @@ -89,7 +109,9 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve
timestamp,
)
if err != nil {
fmt.Errorf("error while saving volume: %s", err)
log.Error().Str("module", "liquidity").Err(fmt.Errorf("error while saving volume: %s", err))
// TODO fails of pool 20 (ibc/4B322204B4F59D770680FE4D7A565DDC3F37BFF035474B717476C66A4F83DD72 - decimal 18)
//panic(err)
}
}

Expand All @@ -101,46 +123,23 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve
)
if err != nil {
fmt.Errorf("error while saving rate: %s", err)
panic(err)
}
}

for poolID := range poolsRateMap {
// return err nil if not found and default pool row, later will panic on error
pool, err := m.db.GetPoolInfo(poolID)
if err == nil {
// if we sync from non zero and parse current block
// than save pool not from message but with query from application state
if len(pool.Address) == 0 {
poolState, err := m.keeper.GetPool(poolID, height)
if err != nil {
panic(err)
}
pool = dbtypes.PoolRow{
PoolId: int64(poolState.Id),
PoolName: poolState.Name(),
Address: poolState.ReserveAccountAddress,
ADenom: poolState.ReserveCoinDenoms[0],
BDenom: poolState.ReserveCoinDenoms[1],
PoolDenom: poolState.PoolCoinDenom,
}
err = m.db.SavePool(
poolState.Id,
poolState.ReserveAccountAddress,
poolState.Name(),
poolState.ReserveCoinDenoms[0],
poolState.ReserveCoinDenoms[1],
poolState.PoolCoinDenom,
)
if err != nil {
panic(err)
}
}

// NOTE - already updated as transacted, but check the flow anyway
err := m.bankModule.RefreshBalances(height, []string{pool.Address})
if err != nil {
//error while storing up-to-date balances: error while storing up-to-date balances: pq: insert or update on table "account_balance" violates foreign key constraint "account_balance_address_fkey"
log.Debug().Str("module", "liquidity").Err(err)
//panic(err)
}
// if not in db that query application state

// NOTE if not in db than query application state
poolBalances, err := m.db.GetAccountBalance(pool.Address)
if len(poolBalances) == 0 {
balances, err := m.bankModule.GetBalances([]string{pool.Address}, height)
Expand All @@ -166,13 +165,52 @@ func (m *Module) executePoolBatches(height int64, endBlockEvents []abcitypes.Eve
log.Debug().Str("module", "liquidity").Err(err)
}
}
} else {
fmt.Errorf("error while saving swap: %s", "FAILED_SWAP")

addr, err := attrs.SwapRequesterAddr()
if err != nil {
return err
}
poolID, err := attrs.PoolID()
if err != nil {
return err
}

orderPrice, err := attrs.OrderPrice()
if err != nil {
return err
}

exchangedOfferCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueExchangedOfferCoinAmount)
if err != nil {
panic(err)
}
remainingOfferCoin, err := attrs.CoinAttrs(liquiditytypes.AttributeValueOfferCoinDenom, liquiditytypes.AttributeValueRemainingOfferCoinAmount)
if err != nil {
panic(err)
}

err = m.db.SaveFailedSwap(
addr,
poolID,
orderPrice,
exchangedOfferCoin,
remainingOfferCoin,
height,
timestamp,
)
if err != nil {
fmt.Errorf("error while saving swap: %s", err)
panic(err)
}
}
}

return nil
}

// not used, refactor
// not used, to delete before merge
func (m *Module) executePoolCreations(height int64, beginBlockEvents []abcitypes.Event, timestamp time.Time) error {
events := types.FindEventsByType(beginBlockEvents, liquiditytypes.EventTypeCreatePool)
for _, event := range events {
Expand Down Expand Up @@ -311,6 +349,18 @@ func (attrs EventAttributes) SwapPrice() (sdk.Dec, error) {
return d, nil
}

func (attrs EventAttributes) OrderPrice() (sdk.Dec, error) {
v, err := attrs.Attr(liquiditytypes.AttributeValueOrderPrice)
if err != nil {
return sdk.Dec{}, err
}
d, err := sdk.NewDecFromStr(v)
if err != nil {
return sdk.Dec{}, fmt.Errorf("parse swap price: %w", err)
}
return d, nil
}

func (attrs EventAttributes) CoinAttrs(denomKey, amountKey string) (sdk.Coin, error) {
denom, err := attrs.Attr(denomKey)
if err != nil {
Expand Down Expand Up @@ -383,7 +433,6 @@ func (attrs EventAttributes) ReserveAccount() (string, error) {
return v, nil
}

//sdk.NewAttribute(types.AttributeValueDepositCoins, msg.DepositCoins.String()),
func (attrs EventAttributes) PoolDepositCoins() ([]sdk.Coin, error) {
v, err := attrs.Attr(liquiditytypes.AttributeValueDepositCoins)
coins := strings.Split(v, ",")
Expand Down
Loading