Skip to content

Commit

Permalink
feat: 5153 router provider activity over time (#5224)
Browse files Browse the repository at this point in the history
* feat: update subgraph

* feat: add migrations

* chore: rebuild graph client for testnet subgraph redeploy

* feat: add adopted decimals to asset table

* fix: unit tests for db client

* feat: add adopted decimal to router balances view

* fix: unit tests for subgraph adapter

* feat: router liquidity events carto

* test: add unit tests for carto

* test: add unit tests for subgraph adapter

* fix: lint

* fix: lint for sdk-server
  • Loading branch information
liu-zhipeng authored Nov 29, 2023
1 parent bd8ae51 commit ab4251b
Show file tree
Hide file tree
Showing 67 changed files with 140,791 additions and 139,270 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- migrate:up
ALTER TABLE public.assets
ADD COLUMN IF NOT EXISTS adopted_decimal numeric DEFAULT 0;


-- migrate:down
ALTER TABLE assets
DROP COLUMN adopted_decimal;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- migrate:up
CREATE TYPE event_type AS ENUM (
'Add',
'Remove'
);

CREATE TABLE IF NOT EXISTS public.router_liquidity_events (
id character varying(255) NOT NULL UNIQUE,
domain character varying (255) NOT NULL,
router character (42) NOT NULL,
event event_type DEFAULT 'Add'::event_type NOT NULL,
asset character (42) NOT NULL,
amount numeric DEFAULT 0,
balance numeric DEFAULT 0,
block_number integer NOT NULL,
transaction_hash character(66) NOT NULL,
timestamp integer NOT NULL,
nonce numeric DEFAULT 0 NOT NULL,
PRIMARY KEY(id)
);

GRANT SELECT ON public.router_liquidity_events to query;
GRANT SELECT ON public.router_liquidity_events to reader;


-- migrate:down
DROP TABLE IF EXISTS public.router_liquidity_events;
DROP TYPE event_type;

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
-- migrate:up
DROP VIEW IF EXISTS public.router_tvl;
DROP VIEW IF EXISTS public.router_liquidity;
DROP VIEW IF EXISTS public.routers_with_balances;

CREATE OR REPLACE VIEW public.routers_with_balances AS
SELECT routers.address,
asset_balances.asset_canonical_id,
asset_balances.asset_domain,
asset_balances.router_address,
asset_balances.balance,
assets.local,
assets.adopted,
assets.canonical_id,
assets.canonical_domain,
assets.domain,
assets.key,
assets.id,
asset_balances.fees_earned,
asset_balances.locked,
asset_balances.supplied,
asset_balances.removed,
assets.decimal,
assets.adopted_decimal,
COALESCE (asset_prices.price,0) AS asset_usd_price,
(asset_prices.price * (asset_balances.balance::numeric / 10 ^ assets.decimal)) AS balance_usd,
(asset_prices.price * (asset_balances.fees_earned::numeric / 10 ^ assets.decimal)) AS fee_earned_usd,
(asset_prices.price * (asset_balances.locked::numeric / 10 ^ assets.decimal)) AS locked_usd,
(asset_prices.price * (asset_balances.supplied::numeric / 10 ^ assets.decimal)) AS supplied_usd,
(asset_prices.price * (asset_balances.removed::numeric / 10 ^ assets.decimal)) AS removed_usd
FROM (
routers
LEFT JOIN asset_balances ON routers.address = asset_balances.router_address
LEFT JOIN assets ON asset_balances.asset_canonical_id = assets.canonical_id AND asset_balances.asset_domain::text = assets.domain::text
LEFT JOIN asset_prices ON assets.canonical_id = asset_prices.canonical_id AND asset_prices.timestamp = (SELECT MAX(timestamp) FROM public.asset_prices)
);

CREATE OR REPLACE VIEW public.router_liquidity AS (
SELECT r.domain,
r.local,
r.adopted,
SUM(r.balance) As total_balance,
SUM(r.locked) As total_locked,
SUM(r.supplied) As total_supplied,
SUM(r.removed) As total_removed,
AVG(r.asset_usd_price) As avg_usd_price,
SUM((r.asset_usd_price * (r.balance::numeric / 10 ^ r.decimal))) As total_balance_usd,
SUM((r.asset_usd_price * (r.locked::numeric / 10 ^ r.decimal))) As total_locked_usd,
SUM((r.asset_usd_price * (r.supplied::numeric / 10 ^ r.decimal))) As total_supplied_usd,
SUM((r.asset_usd_price * (r.removed::numeric / 10 ^ r.decimal))) As total_removed_usd
FROM public.routers_with_balances r
GROUP BY 1,
2,
3
ORDER BY 1
);

CREATE OR REPLACE VIEW public.router_tvl AS
SELECT latest_transfer.latest_transfer_day,
router_tvl.asset,
router_tvl.tvl,
router_tvl.price,
router_tvl.tvl_usd
FROM (( SELECT rb.local AS asset,
sum(rb.balance) AS tvl,
avg(rb.asset_usd_price) AS price,
sum(rb.asset_usd_price * (rb.balance::numeric / 10 ^ rb.decimal)) AS tvl_usd
FROM public.routers_with_balances rb
GROUP BY rb.local) router_tvl
CROSS JOIN ( SELECT max((date_trunc('day'::text, to_timestamp((tf.xcall_timestamp)::double precision)))::date) AS latest_transfer_day
FROM public.transfers tf) latest_transfer);


-- migrate:down

31 changes: 30 additions & 1 deletion packages/adapters/database/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
StableSwapTransfer,
StableSwapLpBalance,
RootMessageStatus,
RouterLiquidityEvent,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
import * as db from "zapatos/db";
Expand Down Expand Up @@ -268,6 +269,21 @@ const convertToDbRouterDailyTVL = (tvl: RouterDailyTVL): s.daily_router_tvl.Inse
};
};

const convertToDbRouterLiquidityEvent = (event: RouterLiquidityEvent): s.router_liquidity_events.Insertable => {
return {
id: event.id,
domain: event.domain,
asset: event.asset,
router: event.router,
amount: event.amount,
balance: event.balance,
block_number: event.blockNumber,
transaction_hash: event.transactionHash,
timestamp: event.timestamp,
nonce: event.nonce,
};
};

const sanitizeNull = (obj: { [s: string]: any }): any => {
return Object.fromEntries(Object.entries(obj).filter(([_, v]) => v != null));
};
Expand Down Expand Up @@ -659,6 +675,7 @@ export const saveRouterBalances = async (
key: b.key,
id: b.id,
decimal: b.decimal as any,
adopted_decimal: b.adoptedDecimal as any,
local: b.localAsset,
adopted: b.adoptedAsset,
canonical_id: b.canonicalId,
Expand All @@ -667,7 +684,6 @@ export const saveRouterBalances = async (
},
};
});

await db
.upsert(
"assets",
Expand All @@ -693,6 +709,7 @@ export const saveAssets = async (assets: Asset[], _pool?: Pool | db.TxnClientFor
key: asset.key,
id: asset.id,
decimal: asset.decimal as any,
adopted_decimal: asset.adoptedDecimal as any,
local: asset.localAsset,
adopted: asset.adoptedAsset,
canonical_id: asset.canonicalId,
Expand Down Expand Up @@ -1239,6 +1256,18 @@ export const saveRouterDailyTVL = async (
await db.upsert("daily_router_tvl", tvls, ["id"]).run(poolToUse);
};

export const saveRouterLiquidityEvents = async (
_events: RouterLiquidityEvent[],
_pool?: Pool | db.TxnClientForRepeatableRead,
): Promise<void> => {
const poolToUse = _pool ?? pool;
const events: s.router_liquidity_events.Insertable[] = _events
.map((m) => convertToDbRouterLiquidityEvent(m))
.map(sanitizeNull);

await db.upsert("router_liquidity_events", events, ["id"]).run(poolToUse);
};

export const updateSlippage = async (
_slippageUpdates: SlippageUpdate[],
_pool?: Pool | db.TxnClientForRepeatableRead,
Expand Down
8 changes: 8 additions & 0 deletions packages/adapters/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
StableSwapTransfer,
StableSwapLpBalance,
RootMessageStatus,
RouterLiquidityEvent,
} from "@connext/nxtp-utils";
import { Pool } from "pg";
import { TxnClientForRepeatableRead } from "zapatos/db";
Expand Down Expand Up @@ -71,6 +72,7 @@ import {
resetBackoffs,
updateErrorStatus,
saveRouterDailyTVL,
saveRouterLiquidityEvents,
updateSlippage,
markRootMessagesProcessed,
updateExecuteSimulationData,
Expand Down Expand Up @@ -264,6 +266,10 @@ export type Database = {
) => Promise<void>;
markRootMessagesProcessed: (rootMessages: RootMessage[], _pool?: Pool | TxnClientForRepeatableRead) => Promise<void>;
saveRouterDailyTVL: (_tvls: RouterDailyTVL[], _pool?: Pool | TxnClientForRepeatableRead) => Promise<void>;
saveRouterLiquidityEvents: (
_events: RouterLiquidityEvent[],
_pool?: Pool | TxnClientForRepeatableRead,
) => Promise<void>;
updateSlippage: (_slippageUpdates: SlippageUpdate[], _pool?: Pool | TxnClientForRepeatableRead) => Promise<void>;
updateExecuteSimulationData: (
transferId: string,
Expand Down Expand Up @@ -352,6 +358,7 @@ export const getDatabase = async (databaseUrl: string, logger: Logger): Promise<
saveStableSwapPoolEvent,
markRootMessagesProcessed,
saveRouterDailyTVL,
saveRouterLiquidityEvents,
updateSlippage,
updateExecuteSimulationData,
getPendingTransfersByMessageStatus,
Expand Down Expand Up @@ -428,6 +435,7 @@ export const getDatabaseAndPool = async (
saveStableSwapPoolEvent,
markRootMessagesProcessed,
saveRouterDailyTVL,
saveRouterLiquidityEvents,
updateSlippage,
updateExecuteSimulationData,
getPendingTransfersByMessageStatus,
Expand Down
Loading

0 comments on commit ab4251b

Please sign in to comment.