diff --git a/balanced_backend/alembic/versions/588a77c865e3_v0_7_1_add_head_to_series.py b/balanced_backend/alembic/versions/588a77c865e3_v0_7_1_add_head_to_series.py new file mode 100644 index 0000000..cce6c27 --- /dev/null +++ b/balanced_backend/alembic/versions/588a77c865e3_v0_7_1_add_head_to_series.py @@ -0,0 +1,59 @@ +"""v0.7.1 add head to series + +Revision ID: 588a77c865e3 +Revises: 026c4de9d667 +Create Date: 2024-10-27 14:30:33.557685 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel + + +# revision identifiers, used by Alembic. +revision = '588a77c865e3' +down_revision = '026c4de9d667' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_index(op.f('ix_contract_method_volumes_end_date'), 'contract_method_volumes', ['end_date'], unique=False) + op.create_index(op.f('ix_contract_method_volumes_start_date'), 'contract_method_volumes', ['start_date'], unique=False) + op.add_column('pool_series_15_min', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('pool_series_1_day', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('pool_series_1_hour', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('pool_series_1_month', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('pool_series_1_week', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('pool_series_4_hour', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('pool_series_5_min', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('token_series_15_min', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('token_series_1_day', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('token_series_1_hour', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('token_series_1_month', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('token_series_1_week', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('token_series_4_hour', sa.Column('head', sa.Boolean(), nullable=False)) + op.add_column('token_series_5_min', sa.Column('head', sa.Boolean(), nullable=False)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('token_series_5_min', 'head') + op.drop_column('token_series_4_hour', 'head') + op.drop_column('token_series_1_week', 'head') + op.drop_column('token_series_1_month', 'head') + op.drop_column('token_series_1_hour', 'head') + op.drop_column('token_series_1_day', 'head') + op.drop_column('token_series_15_min', 'head') + op.drop_column('pool_series_5_min', 'head') + op.drop_column('pool_series_4_hour', 'head') + op.drop_column('pool_series_1_week', 'head') + op.drop_column('pool_series_1_month', 'head') + op.drop_column('pool_series_1_hour', 'head') + op.drop_column('pool_series_1_day', 'head') + op.drop_column('pool_series_15_min', 'head') + op.drop_index(op.f('ix_contract_method_volumes_start_date'), table_name='contract_method_volumes') + op.drop_index(op.f('ix_contract_method_volumes_end_date'), table_name='contract_method_volumes') + # ### end Alembic commands ### diff --git a/balanced_backend/cron/pool_series.py b/balanced_backend/cron/pool_series.py index cb1ae09..2e802d4 100644 --- a/balanced_backend/cron/pool_series.py +++ b/balanced_backend/cron/pool_series.py @@ -1,5 +1,7 @@ from datetime import datetime from typing import TYPE_CHECKING + +from sqlalchemy import delete from sqlmodel import select from loguru import logger from pydantic import BaseModel @@ -7,6 +9,7 @@ from balanced_backend.crud.dex import get_dex_swaps, get_last_swap_time from balanced_backend.crud.series import get_pool_series_table_by_timestamp +from balanced_backend.tables.dex import DexSwap from balanced_backend.tables.series import PoolSeriesTableType from balanced_backend.tables.utils import get_pool_series_table from balanced_backend.config import settings @@ -26,32 +29,34 @@ class SeriesTable(BaseModel): delta: int pool_ids: set[int] = set() pool_close: dict[int, float] = {} + skip_modulo: int = 1 # Do run_number % skip_modulo == 0 for skipping long updates TIME_SERIES_TABLES: list[SeriesTable] = [ - SeriesTable( - table_suffix="5Min", - delta=60 * 5, - ), - SeriesTable( - table_suffix="15Min", - delta=60 * 15, - ), - SeriesTable( - table_suffix="1Hour", - delta=60 * 60, - ), - SeriesTable( - table_suffix="4Hour", - delta=60 * 60 * 4, - ), - SeriesTable( - table_suffix="1Day", - delta=60 * 60 * 24, - ), + # SeriesTable( + # table_suffix="5Min", + # delta=60 * 5, + # ), + # SeriesTable( + # table_suffix="15Min", + # delta=60 * 15, + # ), + # SeriesTable( + # table_suffix="1Hour", + # delta=60 * 60, + # ), + # SeriesTable( + # table_suffix="4Hour", + # delta=60 * 60 * 4, + # ), + # SeriesTable( + # table_suffix="1Day", + # delta=60 * 60 * 24, + # ), SeriesTable( table_suffix="1Week", delta=60 * 60 * 24 * 7, + skip_modulo=10, ), # SeriesTable( # table_suffix="1Month", @@ -72,8 +77,26 @@ def get_last_volume_time(session: 'Session', table: PoolSeriesTableType) -> int: return volume_time +RUN_NUMBER_COUNT_DICT: dict[int, int] = {} + + +def _skip_time_series(pool_volume: SeriesTable) -> bool: + if pool_volume.delta not in RUN_NUMBER_COUNT_DICT: + RUN_NUMBER_COUNT_DICT[pool_volume.delta] = 0 + + RUN_NUMBER_COUNT_DICT[pool_volume.delta] += 1 + run_count = RUN_NUMBER_COUNT_DICT[pool_volume.delta] + + if not run_count - 1 % pool_volume.skip_modulo == 0: + return True + return False + + def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable): # Get the table we want to be building the series dynamically since there are many + if _skip_time_series(pool_volume): + return + Table = get_pool_series_table(table_suffix=pool_volume.table_suffix) logger.info(f"Running series {pool_volume.table_suffix}...") @@ -106,21 +129,29 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable): pool_series = [i for i in last_volume_timeseries if i.pool_id == p][0] pool_volume.pool_close[p] = pool_series.open + # TODO: Rm this tmp + volume_time += pool_volume.delta + + current_time = datetime.now().timestamp() + head = False while volume_time < last_swap_time + pool_volume.delta: + + if volume_time > current_time: + head = True + swaps = get_dex_swaps( session=session, start_time=volume_time, end_time=volume_time + pool_volume.delta, columns=[ - "pool_id", - "ending_price_decimal", - "base_token_value_decimal", - "quote_token_value_decimal", - "lp_fees_decimal", - "baln_fees_decimal", + DexSwap.pool_id, + DexSwap.ending_price_decimal, + DexSwap.base_token_value_decimal, + DexSwap.quote_token_value_decimal, + DexSwap.lp_fees_decimal, + DexSwap.baln_fees_decimal, ] ) - num_swaps = len(swaps) # Need extra call here because there may be no swaps in a period. This is needed # because we need to enrich this series with pool stats data to later be able to @@ -153,6 +184,15 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable): ) for p in pool_volume.pool_ids: + # if settings.VERBOSE: + logger.info( + f"Processing pool: {p} in segment: {pool_volume.table_suffix}...") + + if head: + query = delete(Table).where(Table.head).where(Table.pool_id == p) + session.execute(query) + volume_time = datetime.now().timestamp() + total_supply = [i for i in total_supplies if i['pool_id'] == p][0][ 'total_supply'] @@ -191,8 +231,7 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable): i.baln_fees_decimal for i in pool_swaps if i.from_token == get_cached_pool_stats(pool_id=p)['base_address'] ]) - if volume_time > datetime.now().timestamp(): - volume_time = datetime.now().timestamp() + t = Table( chain_id=settings.CHAIN_ID, pool_id=p, @@ -209,6 +248,7 @@ def get_time_series_for_interval(session: 'Session', pool_volume: SeriesTable): base_lp_fees=base_lp_fees, base_baln_fees=bases_baln_fees, total_supply=total_supply, + head=head, ) session.merge(t) diff --git a/balanced_backend/cron/token_price.py b/balanced_backend/cron/token_price.py index 7f9d461..96dbbc1 100644 --- a/balanced_backend/cron/token_price.py +++ b/balanced_backend/cron/token_price.py @@ -39,7 +39,7 @@ def set_previous_prices( icx_price = get_band_price(symbol='ICX', height=block_height) tokens = get_tokens(session=session) - token_prices = [TokenPrice(**i.dict()) for i in tokens] + token_prices = [TokenPrice(**i.model_dump()) for i in tokens] token_prices = get_token_prices( pools=pool_prices, tokens=token_prices, @@ -58,14 +58,18 @@ def run_token_prices(session: 'Session'): tokens = get_tokens(session=session) # Current - pool_prices = [PoolPrice(**i.dict()) for i in pools] - token_prices = [TokenPrice(**i.dict()) for i in tokens] + pool_prices = [PoolPrice(**i.model_dump()) for i in pools] + token_prices = [TokenPrice(**i.model_dump()) for i in tokens] token_prices = get_token_prices(pools=pool_prices, tokens=token_prices) for t in tokens: t.price = [i.price for i in token_prices if i.address == t.address][0] t.path = [i.path for i in token_prices if i.address == t.address][0] + + if t.price is None: + continue + session.merge(t) - session.commit() + session.commit() for period in ['24h', '7d', '30d']: set_previous_prices(session=session, pools=pools, period=period)