Skip to content

Commit

Permalink
fix: add head attribute to series table and delete all head entries before adding a new one
Browse files Browse the repository at this point in the history
  • Loading branch information
robcxyz committed Oct 28, 2024
1 parent e3fc2cc commit 53fe55c
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""v0.7.1 add head to series
Revision ID: 588a77c865e3
Revises: 026c4de9d667
Create Date: 2024-10-27 14:30:33.557685
"""
from alembic import op
import sqlalchemy as sa
import sqlmodel


# revision identifiers, used by Alembic.
revision = '588a77c865e3'
down_revision = '026c4de9d667'
branch_labels = None
depends_on = None


def upgrade():
    """Apply v0.7.1: index contract_method_volumes dates and add ``head``
    boolean to every pool/token series table.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_index(op.f('ix_contract_method_volumes_end_date'), 'contract_method_volumes', ['end_date'], unique=False)
    op.create_index(op.f('ix_contract_method_volumes_start_date'), 'contract_method_volumes', ['start_date'], unique=False)
    # Fix: adding a NOT NULL column to a table that already has rows fails
    # without a default. server_default=sa.false() backfills existing rows
    # with FALSE so the migration succeeds on populated databases.
    for table in (
        'pool_series_15_min', 'pool_series_1_day', 'pool_series_1_hour',
        'pool_series_1_month', 'pool_series_1_week', 'pool_series_4_hour',
        'pool_series_5_min',
        'token_series_15_min', 'token_series_1_day', 'token_series_1_hour',
        'token_series_1_month', 'token_series_1_week', 'token_series_4_hour',
        'token_series_5_min',
    ):
        op.add_column(
            table,
            sa.Column('head', sa.Boolean(), nullable=False, server_default=sa.false()),
        )
    # ### end Alembic commands ###


def downgrade():
    """Revert v0.7.1: drop the ``head`` column from every series table and
    remove the contract_method_volumes date indexes.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # Drop in the reverse order of upgrade(): token tables first, then pools.
    series_tables = (
        'token_series_5_min', 'token_series_4_hour', 'token_series_1_week',
        'token_series_1_month', 'token_series_1_hour', 'token_series_1_day',
        'token_series_15_min',
        'pool_series_5_min', 'pool_series_4_hour', 'pool_series_1_week',
        'pool_series_1_month', 'pool_series_1_hour', 'pool_series_1_day',
        'pool_series_15_min',
    )
    for table_name in series_tables:
        op.drop_column(table_name, 'head')
    op.drop_index(op.f('ix_contract_method_volumes_start_date'), table_name='contract_method_volumes')
    op.drop_index(op.f('ix_contract_method_volumes_end_date'), table_name='contract_method_volumes')
    # ### end Alembic commands ###
98 changes: 69 additions & 29 deletions balanced_backend/cron/pool_series.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from datetime import datetime
from typing import TYPE_CHECKING

from sqlalchemy import delete
from sqlmodel import select
from loguru import logger
from pydantic import BaseModel
import asyncio

from balanced_backend.crud.dex import get_dex_swaps, get_last_swap_time
from balanced_backend.crud.series import get_pool_series_table_by_timestamp
from balanced_backend.tables.dex import DexSwap
from balanced_backend.tables.series import PoolSeriesTableType
from balanced_backend.tables.utils import get_pool_series_table
from balanced_backend.config import settings
Expand All @@ -26,32 +29,34 @@ class SeriesTable(BaseModel):
delta: int
pool_ids: set[int] = set()
pool_close: dict[int, float] = {}
skip_modulo: int = 1 # Do run_number % skip_modulo == 0 for skipping long updates


TIME_SERIES_TABLES: list[SeriesTable] = [
SeriesTable(
table_suffix="5Min",
delta=60 * 5,
),
SeriesTable(
table_suffix="15Min",
delta=60 * 15,
),
SeriesTable(
table_suffix="1Hour",
delta=60 * 60,
),
SeriesTable(
table_suffix="4Hour",
delta=60 * 60 * 4,
),
SeriesTable(
table_suffix="1Day",
delta=60 * 60 * 24,
),
# SeriesTable(
# table_suffix="5Min",
# delta=60 * 5,
# ),
# SeriesTable(
# table_suffix="15Min",
# delta=60 * 15,
# ),
# SeriesTable(
# table_suffix="1Hour",
# delta=60 * 60,
# ),
# SeriesTable(
# table_suffix="4Hour",
# delta=60 * 60 * 4,
# ),
# SeriesTable(
# table_suffix="1Day",
# delta=60 * 60 * 24,
# ),
SeriesTable(
table_suffix="1Week",
delta=60 * 60 * 24 * 7,
skip_modulo=10,
),
# SeriesTable(
# table_suffix="1Month",
Expand All @@ -72,8 +77,26 @@ def get_last_volume_time(session: 'Session', table: PoolSeriesTableType) -> int:
return volume_time


# Per-interval run counters, keyed by the interval's delta (seconds).
RUN_NUMBER_COUNT_DICT: dict[int, int] = {}


def _skip_time_series(pool_volume: "SeriesTable") -> bool:
    """Return True when this interval's update should be skipped this run.

    Each interval keeps a run counter keyed by its ``delta``. The update is
    executed on the first run and then once every ``skip_modulo`` runs, so
    expensive long-window series (e.g. 1 week) are rebuilt less frequently.
    """
    if pool_volume.delta not in RUN_NUMBER_COUNT_DICT:
        RUN_NUMBER_COUNT_DICT[pool_volume.delta] = 0

    RUN_NUMBER_COUNT_DICT[pool_volume.delta] += 1
    run_count = RUN_NUMBER_COUNT_DICT[pool_volume.delta]

    # Bug fix: `%` binds tighter than `-`, so the original
    # `not run_count - 1 % pool_volume.skip_modulo == 0` evaluated as
    # `run_count - (1 % skip_modulo) != 0`, which with the default
    # skip_modulo == 1 skipped EVERY run. Parenthesize the subtraction.
    return (run_count - 1) % pool_volume.skip_modulo != 0


def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable):
# Get the table we want to be building the series dynamically since there are many
if _skip_time_series(pool_volume):
return

Table = get_pool_series_table(table_suffix=pool_volume.table_suffix)

logger.info(f"Running series {pool_volume.table_suffix}...")
Expand Down Expand Up @@ -106,21 +129,29 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable):
pool_series = [i for i in last_volume_timeseries if i.pool_id == p][0]
pool_volume.pool_close[p] = pool_series.open

# TODO: Rm this tmp
volume_time += pool_volume.delta

current_time = datetime.now().timestamp()
head = False
while volume_time < last_swap_time + pool_volume.delta:

if volume_time > current_time:
head = True

swaps = get_dex_swaps(
session=session,
start_time=volume_time,
end_time=volume_time + pool_volume.delta,
columns=[
"pool_id",
"ending_price_decimal",
"base_token_value_decimal",
"quote_token_value_decimal",
"lp_fees_decimal",
"baln_fees_decimal",
DexSwap.pool_id,
DexSwap.ending_price_decimal,
DexSwap.base_token_value_decimal,
DexSwap.quote_token_value_decimal,
DexSwap.lp_fees_decimal,
DexSwap.baln_fees_decimal,
]
)
num_swaps = len(swaps)

# Need extra call here because there may be no swaps in a period. This is needed
# because we need to enrich this series with pool stats data to later be able to
Expand Down Expand Up @@ -153,6 +184,15 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable):
)

for p in pool_volume.pool_ids:
# if settings.VERBOSE:
logger.info(
f"Processing pool: {p} in segment: {pool_volume.table_suffix}...")

if head:
query = delete(Table).where(Table.head).where(Table.pool_id == p)
session.execute(query)
volume_time = datetime.now().timestamp()

total_supply = [i for i in total_supplies if i['pool_id'] == p][0][
'total_supply']

Expand Down Expand Up @@ -191,8 +231,7 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable):
i.baln_fees_decimal for i in pool_swaps
if i.from_token == get_cached_pool_stats(pool_id=p)['base_address']
])
if volume_time > datetime.now().timestamp():
volume_time = datetime.now().timestamp()

t = Table(
chain_id=settings.CHAIN_ID,
pool_id=p,
Expand All @@ -209,6 +248,7 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable):
base_lp_fees=base_lp_fees,
base_baln_fees=bases_baln_fees,
total_supply=total_supply,
head=head,
)

session.merge(t)
Expand Down
12 changes: 8 additions & 4 deletions balanced_backend/cron/token_price.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def set_previous_prices(
icx_price = get_band_price(symbol='ICX', height=block_height)

tokens = get_tokens(session=session)
token_prices = [TokenPrice(**i.dict()) for i in tokens]
token_prices = [TokenPrice(**i.model_dump()) for i in tokens]
token_prices = get_token_prices(
pools=pool_prices,
tokens=token_prices,
Expand All @@ -58,14 +58,18 @@ def run_token_prices(session: 'Session'):
tokens = get_tokens(session=session)

# Current
pool_prices = [PoolPrice(**i.dict()) for i in pools]
token_prices = [TokenPrice(**i.dict()) for i in tokens]
pool_prices = [PoolPrice(**i.model_dump()) for i in pools]
token_prices = [TokenPrice(**i.model_dump()) for i in tokens]
token_prices = get_token_prices(pools=pool_prices, tokens=token_prices)
for t in tokens:
t.price = [i.price for i in token_prices if i.address == t.address][0]
t.path = [i.path for i in token_prices if i.address == t.address][0]

if t.price is None:
continue

session.merge(t)
session.commit()
session.commit()

for period in ['24h', '7d', '30d']:
set_previous_prices(session=session, pools=pools, period=period)
Expand Down

0 comments on commit 53fe55c

Please sign in to comment.