Switch to FastParquet as the default Parquet file reading backend #123

Status: Open · wants to merge 6 commits into base: master
Binary file added .pymon
Binary file not shown.
1,421 changes: 824 additions & 597 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -18,7 +18,6 @@ packages = [
python = ">=3.10,<3.11"
dataclasses-json = "^0.5.4"
pandas = "^1.3.5"
pyarrow = "10.0.1"
tqdm = "^4.61.2"
plotly = "^5.1.0"
jsonlines = "^3.1.0"
@@ -37,6 +36,7 @@ trading-strategy-backtrader = {version="^0.1", optional = true}
coloredlogs = {version = "^15.0.1", optional = true}
filelock = "^3.12.4"
pytest-xdist = {version="^3.3.1", optional = true}
fastparquet = "^2023.8.0"

[tool.poetry.dev-dependencies]
pytest = "7.1.3"
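Reviewer note (not part of the diff): with fastparquet now the pinned engine, a quick sanity check of the dependency swap is pandas' own `read_parquet`, which dispatches to whichever backend you name. A minimal sketch; the file path is a placeholder:

```python
import pandas as pd

# engine="fastparquet" forces the newly pinned backend; engine="pyarrow"
# would require pyarrow, which this PR drops from the dependencies.
df = pd.read_parquet("/tmp/candles-30d.parquet", engine="fastparquet")
print(df.shape)
```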
6 changes: 3 additions & 3 deletions tests/test_candle_universe.py
@@ -1,21 +1,22 @@
"""Candle universe loading tests."""

import datetime
import sys

import pandas
import pandas as pd
import pytest
from tradingstrategy.candle import GroupedCandleUniverse, is_candle_green, is_candle_red
from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.pair import LegacyPairUniverse, PandasPairUniverse
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.reader import read_parquet
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.transport.jsonl import JSONLMaxResponseSizeExceeded
from tradingstrategy.utils.groupeduniverse import resample_candles


def test_grouped_candles(persistent_test_client: Client):
def test_grouped_candles(persistent_test_client: Client, logger):
"""Group downloaded candles by a trading pair."""

client = persistent_test_client
@@ -33,7 +34,6 @@ def test_grouped_candles(persistent_test_client: Client):
assert sushi_usdt.get_trading_pair_page_url() == "https://tradingstrategy.ai/trading-view/ethereum/sushi/sushi-usdt"
sushi_usdt_candles = candle_universe.get_candles_by_pair(sushi_usdt.pair_id)

# Get max and min weekly candle of SUSHI-USDT on SushiSwap
high_price = sushi_usdt_candles["high"]
max_price = high_price.max()

34 changes: 2 additions & 32 deletions tests/test_client.py
@@ -8,8 +8,6 @@
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.client import Client
from tradingstrategy.chain import ChainId
from tradingstrategy.pair import LegacyPairUniverse


logger = logging.getLogger(__name__)

@@ -64,45 +62,17 @@ def test_client_download_exchange_universe(client: Client, cache_path: str):
assert exchange.exchange_slug == "shiba-swap"


def test_client_download_pair_universe(client: Client, cache_path: str):
"""Download pair mapping data"""

logger.info("Starting test_client_download_pair_universe")

exchange_universe = client.fetch_exchange_universe()

pairs = client.fetch_pair_universe()
# Check we cached the file correctly
assert os.path.exists(f"{cache_path}/pair-universe.parquet")
# Check universe has data
assert len(pairs) > 50_000

pair_universe = LegacyPairUniverse.create_from_pyarrow_table(pairs)

exchange = exchange_universe.get_by_chain_and_slug(ChainId.ethereum, "uniswap-v2")
assert exchange, "Uniswap v2 not found"

# Uniswap v2 has more than 2k eligible trading pairs
pairs = list(pair_universe.get_all_pairs_on_exchange(exchange.exchange_id))
assert len(pairs) > 2000

pair = pair_universe.get_pair_by_ticker_by_exchange(exchange.exchange_id, "WETH", "DAI")
assert pair, "WETH-DAI not found"
assert pair.base_token_symbol == "WETH"
assert pair.quote_token_symbol == "DAI"


def test_client_download_all_pairs(client: Client, cache_path: str):
"""Download all candles for a specific candle width."""
df = client.fetch_all_candles(TimeBucket.d30)
df = client.fetch_all_candles(TimeBucket.d30).to_pandas()
# Check we cached the file correctly
assert os.path.exists(f"{cache_path}/candles-30d.parquet")
assert len(df) > 100


def test_client_download_all_liquidity_samples(client: Client, cache_path: str):
"""Download all liquidity samples for a specific candle width."""
df = client.fetch_all_liquidity_samples(TimeBucket.d30)
df = client.fetch_all_liquidity_samples(TimeBucket.d30).to_pandas()
# Check we cached the file correctly
assert os.path.exists(f"{cache_path}/liquidity-samples-30d.parquet")
assert len(df) > 100
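The `.to_pandas()` calls added above are the load-bearing bit of the backend swap: the fetchers no longer hand back a materialised pyarrow.Table, so the tests materialise explicitly. Both pyarrow.Table and fastparquet.ParquetFile expose a `to_pandas()` method, which keeps calling code backend-agnostic; a minimal sketch of that normalisation (the helper name is hypothetical, the tests simply call `.to_pandas()` inline):

```python
import pandas as pd

def as_dataframe(blob) -> pd.DataFrame:
    # blob is either a pyarrow.Table or a fastparquet.ParquetFile;
    # both implement to_pandas(), so the caller never needs to branch.
    return blob.to_pandas()
```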
2 changes: 1 addition & 1 deletion tests/test_lending.py
@@ -20,7 +20,7 @@

def test_client_download_lending_reserves_all_time(client: Client, cache_path: str):
"""Download all available data on lending protocol reserves."""
df = client.fetch_lending_reserves_all_time()
df = client.fetch_lending_reserves_all_time().to_pandas()
# Check we cached the file correctly
assert Path(f"{cache_path}/lending-reserves-all.parquet").exists()
assert len(df) > 100
24 changes: 1 addition & 23 deletions tests/test_liquidity_universe.py
@@ -4,7 +4,7 @@
from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.liquidity import GroupedLiquidityUniverse, LiquidityDataUnavailable
from tradingstrategy.pair import DEXPair, LegacyPairUniverse, PandasPairUniverse
from tradingstrategy.pair import DEXPair, PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket


@@ -98,28 +98,6 @@ def test_combined_candles_and_liquidity(persistent_test_client: Client):
assert abs((liq_pair_count - candle_pair_count) / liq_pair_count) < 0.15


def test_liquidity_index_is_datetime(persistent_test_client: Client):
"""Any liquidity samples use datetime index by default.

Avoid raw running counter indexes. This makes manipulating data much easier.
"""
client = persistent_test_client

exchange_universe = client.fetch_exchange_universe()
exchange = exchange_universe.get_by_chain_and_slug(ChainId.ethereum, "uniswap-v2")
pairs = client.fetch_pair_universe()
pair_universe = LegacyPairUniverse.create_from_pyarrow_table(pairs)
pair = pair_universe.get_pair_by_ticker_by_exchange(exchange.exchange_id, "WETH", "DAI")

exchange = exchange_universe.get_by_chain_and_slug(ChainId.ethereum, "uniswap-v2")
assert exchange, "Uniswap v2 not found"

raw_liquidity_samples = client.fetch_all_liquidity_samples(TimeBucket.d7).to_pandas()
liquidity_universe = GroupedLiquidityUniverse(raw_liquidity_samples)
liq1 = liquidity_universe.get_liquidity_samples_by_pair(pair.pair_id)
assert isinstance(liq1.index, pd.DatetimeIndex)


def test_merge_liquidity_samples(persistent_test_client: Client):
"""Merging two liquidity graphs using Pandas should work.

6 changes: 3 additions & 3 deletions tests/test_resampled_liquidity_universe.py
@@ -1,11 +1,11 @@
"""Optimised liquidity universe tests."""
import pandas as pd
import pytest
from tradingstrategy.candle import GroupedCandleUniverse

from tradingstrategy.chain import ChainId
from tradingstrategy.client import Client
from tradingstrategy.liquidity import GroupedLiquidityUniverse, LiquidityDataUnavailable, ResampledLiquidityUniverse
from tradingstrategy.pair import DEXPair, LegacyPairUniverse, PandasPairUniverse
from tradingstrategy.liquidity import ResampledLiquidityUniverse
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket


32 changes: 32 additions & 0 deletions testscript.py
@@ -0,0 +1,32 @@
"""Compare pyarrow and fastparquet files.

- Use the sample Parquet file (54MB): https://drive.google.com/file/d/1LtiL-n50cNx1JMBC0QE-Ztvf8ENSb-OE/view?usp=sharing
- Save it as "/tmp/candles-30d.parquet"

- We found out that the rows and the contents of the pair_id column are corrupted
  (there is other corruption visible in a row-by-row comparison, but this is the easiest to spot)

- One potential cause is the use of row groups in the file

- The file has been written with pyarrow / Python


"""
from pyarrow import parquet as pq # pyarrow 10
from fastparquet import ParquetFile # fastparquet 2023.1.0

path = "/tmp/candles-30d.parquet"

pf1 = ParquetFile(path)
pf2 = pq.read_table(path)

df1 = pf1.to_pandas()
df2 = pf2.to_pandas()

print("Rows ", len(df2))
assert len(df1) == len(df2) # Passes, looks like row count matches

# fastparquet only sees 1280 pair_ids out of 150903
print("Unique pairs fastparquet", len(df1.pair_id.unique()))
print("Unique pairs pyarrow", len(df2.pair_id.unique()))
assert len(df1.pair_id.unique()) == len(df2.pair_id.unique())
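Since row groups are the suspected cause, a natural follow-up (not included in this PR) is to compare the two readers row group by row group and localise where fastparquet's view of pair_id diverges. A diagnostic sketch against the same sample file:

```python
from pyarrow import parquet as pq
from fastparquet import ParquetFile

path = "/tmp/candles-30d.parquet"
arrow_file = pq.ParquetFile(path)

# iter_row_groups() yields one DataFrame per row group in file order,
# so enumerate() lines it up with pyarrow's read_row_group(i).
for i, fp_chunk in enumerate(ParquetFile(path).iter_row_groups()):
    pa_chunk = arrow_file.read_row_group(i).to_pandas()
    fp_ids = set(fp_chunk.pair_id.unique())
    pa_ids = set(pa_chunk.pair_id.unique())
    if fp_ids != pa_ids:
        print(f"Row group {i}: fastparquet sees {len(fp_ids)} pair ids, pyarrow {len(pa_ids)}")
```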
17 changes: 9 additions & 8 deletions tradingstrategy/client.py
@@ -41,10 +41,6 @@
warnings.simplefilter("ignore")
import dataclasses_json # Trigger marshmallow import to suppress the warning

import pyarrow
import pyarrow as pa
from pyarrow import Table

from tradingstrategy.chain import ChainId
from tradingstrategy.environment.base import Environment, download_with_progress_plain
from tradingstrategy.environment.config import Configuration
@@ -163,12 +159,17 @@ def clear_caches(self, filename: Optional[Union[str, Path]] = None):
self.transport.purge_cache(filename)

@_retry_corrupted_parquet_fetch
def fetch_pair_universe(self) -> pa.Table:
def fetch_pair_universe(self) -> "pyarrow.Table | fastparquet.ParquetFile":
"""Fetch pair universe from local cache or the candle server.

The compressed file size is around 5 megabytes.

If the downloaded file seems to be corrupted, the fetch is retried up to 3 times.

:return:
    TODO: Change the return type to DataFrame.

"""
path = self.transport.fetch_pair_universe()
return read_parquet(path)
@@ -184,7 +185,7 @@ def fetch_exchange_universe(self) -> ExchangeUniverse:
raise RuntimeError(f"Could not read JSON file {path}") from e

@_retry_corrupted_parquet_fetch
def fetch_all_candles(self, bucket: TimeBucket) -> pyarrow.Table:
def fetch_all_candles(self, bucket: TimeBucket) -> "pyarrow.Table | fastparquet.ParquetFile":
"""Get cached blob of candle data of a certain candle width.

The returned data can range from several hundred megabytes to several gigabytes
@@ -457,7 +458,7 @@ def fetch_lending_candles_for_universe(
return result

@_retry_corrupted_parquet_fetch
def fetch_all_liquidity_samples(self, bucket: TimeBucket) -> Table:
def fetch_all_liquidity_samples(self, bucket: TimeBucket) -> "pyarrow.Table | fastparquet.ParquetFile":
"""Get cached blob of liquidity events of a certain time window.

The returned data can be between several hundreds of megabytes to several gigabytes
@@ -484,7 +485,7 @@ def fetch_lending_reserve_universe(self) -> LendingReserveUniverse:
raise RuntimeError(f"Could not read JSON file {path}") from e

@_retry_corrupted_parquet_fetch
def fetch_lending_reserves_all_time(self) -> Table:
def fetch_lending_reserves_all_time(self) -> "pyarrow.Table | fastparquet.ParquetFile":
"""Get a cached blob of lending protocol reserve events and precomupted stats.

The returned data can range from several hundred megabytes to several
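For context on what `read_parquet` now returns: the reader module is not shown in this diff, but a fastparquet-backed version would look roughly like the sketch below (an assumption, not the PR's actual implementation). `ParquetFile` parses only the Parquet footer metadata up front, which is why the fetchers can return it cheaply and defer materialisation to the `.to_pandas()` calls seen in the tests.

```python
from pathlib import Path

from fastparquet import ParquetFile

def read_parquet(path: Path) -> ParquetFile:
    # Opens the file and parses only the footer/metadata;
    # row data is materialised later via .to_pandas().
    return ParquetFile(str(path))
```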