From fc964df8153b033cce402778cd8f836c2dfa2b2e Mon Sep 17 00:00:00 2001 From: Mikko Ohtamaa Date: Thu, 17 Oct 2024 16:13:09 +0200 Subject: [PATCH] Add forward_fill_until to deal with gaps of rarely traded live pairs (#179) - Add: `forward_fill(forward_fill_until)` - The default behavior is to forward fill gaps between first and last candle - However the last candle might not be updated if we load live sparse data and there has been no trades (no candles) - Force the forward fill to go until a certain timestamp --- CHANGELOG.md | 5 +- tests/test_candle_data.py | 82 ++++++++++++++++ tradingstrategy/candle.py | 10 +- tradingstrategy/utils/forward_fill.py | 136 ++++++++++++++++++++++---- tradingstrategy/utils/wrangle.py | 11 +++ 5 files changed, 218 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 204c1b18..f3e8463f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ # Current -- Add: `Client.fetch_top_pairs()` -- +- Add: `Client.fetch_top_pairs()` - create a helper function to create always expanding trading universe for external signal providers +- Add: `forward_fill(forward_fill_until)`. The default behavior is to forward fill gaps between first and last candle. However the last candle might not be updated if we load live sparse data and there has been no trades (no candles). Force the forward fill to go until a certain timestamp. + # 0.24 - Dependencies: Upgrade to Pandas 2.x. [NumPy 2.x is still incompatible](https://stackoverflow.com/questions/78634235/numpy-dtype-size-changed-may-indicate-binary-incompatibility-expected-96-from). diff --git a/tests/test_candle_data.py b/tests/test_candle_data.py index 8e5c62bc..1aaba00b 100644 --- a/tests/test_candle_data.py +++ b/tests/test_candle_data.py @@ -197,3 +197,85 @@ def test_forward_fill_too_early_multiple_pairs(): pair=1, when=pd.Timestamp("2019-12-31"), tolerance=pd.Timedelta(7, "d")) + + +def test_forward_fill_until_single_pair(): + """Forward fill data missing data for a single pair, until a certain date.""" + + data = [ + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-01"), 100.10), + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-02"), 100.50), + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-03"), 101.10), + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-09"), 101.80), + ] + + df = pd.DataFrame(data, columns=Candle.DATAFRAME_FIELDS) + df = df.set_index("timestamp", drop=False) + + assert len(df) == 4 + + # Forward fill until the start of the next month + candles = forward_fill( + df, + TimeBucket.d1.to_frequency(), + forward_fill_until=pd.Timestamp("2020-02-01"), + ) + + assert len(candles) == 32 + + last_entry = candles.iloc[-1] + assert last_entry.open == pytest.approx(101.80) + assert last_entry.high == pytest.approx(101.80) + assert last_entry.low == pytest.approx(101.80) + assert last_entry.close == pytest.approx(101.80) + assert last_entry.volume == 0 + assert last_entry.timestamp == pd.Timestamp("2020-02-01") + + +def test_forward_fill_until_multi_pair(): + """Forward fill data missing data for multiple pairs, until a certain date.""" + + data = [ + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-01"), 100.10, volume=1000), + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-02"), 100.50, volume=1000), + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-03"), 101.10, volume=1000), + Candle.generate_synthetic_sample(1, pd.Timestamp("2020-01-09"), 101.80, volume=2000), + + Candle.generate_synthetic_sample(2, pd.Timestamp("2020-01-01"), 2.5, volume=50), + Candle.generate_synthetic_sample(2, pd.Timestamp("2020-01-03"), 2.2, volume=50), + Candle.generate_synthetic_sample(2, pd.Timestamp("2020-01-05"), 2.1, volume=50), + Candle.generate_synthetic_sample(2, pd.Timestamp("2020-01-18"), 3.8, volume=100), + + ] + + df = pd.DataFrame(data, columns=Candle.DATAFRAME_FIELDS) + df = df.set_index("timestamp", drop=False) + grouped = df.groupby("pair_id") + + # Forward fill until the start of the next month + candles_grouped = forward_fill( + grouped, + TimeBucket.d1.to_frequency(), + forward_fill_until=pd.Timestamp("2020-02-01"), + columns=("open", "high", "low", "close", "volume"), + ) + + candles = candles_grouped.get_group(1) + assert len(candles) == 32 + last_entry = candles.iloc[-1] + assert last_entry.open == pytest.approx(101.80) + assert last_entry.high == pytest.approx(101.80) + assert last_entry.low == pytest.approx(101.80) + assert last_entry.close == pytest.approx(101.80) + assert last_entry.volume == 0 + assert last_entry.timestamp == pd.Timestamp("2020-02-01") + + candles = candles_grouped.get_group(2) + assert len(candles) == 32 + last_entry = candles.iloc[-1] + assert last_entry.open == pytest.approx(3.8) + assert last_entry.high == pytest.approx(3.8) + assert last_entry.low == pytest.approx(3.8) + assert last_entry.close == pytest.approx(3.8) + assert last_entry.volume == 0 + assert last_entry.timestamp == pd.Timestamp("2020-02-01") diff --git a/tradingstrategy/candle.py b/tradingstrategy/candle.py index 8dab6e9a..3c89c2cf 100644 --- a/tradingstrategy/candle.py +++ b/tradingstrategy/candle.py @@ -222,9 +222,11 @@ def to_pyarrow_schema(cls, small_candles=False) -> pa.Schema: @staticmethod def generate_synthetic_sample( - pair_id: int, - timestamp: pd.Timestamp, - price: float) -> dict: + pair_id: int, + timestamp: pd.Timestamp, + price: float, + volume: float | None = None, + ) -> dict: """Generate a candle dataframe. Used in testing when manually fiddled data is needed. @@ -250,7 +252,7 @@ def generate_synthetic_sample( "avg": 0, "start_block": 0, "end_block": 0, - "volume": 0, + "volume": 0 if volume is None else volume, "buy_volume": 0, "sell_volume": 0, } diff --git a/tradingstrategy/utils/forward_fill.py b/tradingstrategy/utils/forward_fill.py index aa433d64..cafa68fd 100644 --- a/tradingstrategy/utils/forward_fill.py +++ b/tradingstrategy/utils/forward_fill.py @@ -22,11 +22,73 @@ from pandas.core.groupby import DataFrameGroupBy +def generate_future_filler_data( + last_valid_row: pd.Series, + timestamp: pd.Timestamp, + columns: Collection[str], +): + """Create a new placeholder OHLCV entry based on the last valid entry.""" + new_row = {} + last_close = last_valid_row["close"] + + for col in columns: + match col: + case "open" | "high" | "low" | "close": + new_row[col] = last_close + case "volume": + new_row[col] = 0 + case "timestamp": + new_row[col] = timestamp + case _: + raise NotImplementedError(f"Unsupported column {col}") + + return new_row + + +def fill_future_gap( + df, + timestamp: pd.Timestamp, + columns: Collection[str], +): + """Add a virtual OHLCV value at the end of the pair OHLCV data series if there is no real value.""" + + assert isinstance(df, pd.DataFrame) + assert isinstance(df.index, pd.DatetimeIndex), f"Expected DatetimeIndex index, got {type(df.index)}" + + if timestamp not in df.index: + # Get the latest valid entry before the timestamp + last_valid_ts = df.index[-1] + last_valid_entry = df.loc[last_valid_ts] + data = generate_future_filler_data( + last_valid_entry, timestamp, columns + ) + # Create a new row with the timestamp and the last valid entry's values] + df.loc[timestamp] = data + + return df + + +def fill_future_gap_multi_pair( + grouped_df, + timestamp: pd.Timestamp, + columns: Collection[str], +): + assert isinstance(grouped_df, DataFrameGroupBy) + + def _apply(df): + df = fill_future_gap(df, timestamp, columns) + return df + + fixed = grouped_df.apply(_apply) + return fixed.reset_index().set_index("timestamp").groupby("pair_id") + + def forward_fill( - df: pd.DataFrame | DataFrameGroupBy, + single_or_multipair_data: pd.DataFrame | DataFrameGroupBy, freq: pd.DateOffset | str, columns: Collection[str] = ("open", "high", "low", "close", "volume", "timestamp"), drop_other_columns=True, + forward_fill_until: pd.Timestamp | None = None, ) -> pd.DataFrame: """Forward-fill OHLCV data for multiple trading pairs. @@ -106,7 +168,7 @@ def forward_fill( flattened_df.to_parquet(fpath) print(f"Wrote {fpath} {os.path.getsize(fpath):,} bytes") - :param df: + :param single_or_multipair_data: Candle data for single or multiple trading pairs - GroupBy DataFrame containing candle data for multiple trading pairs @@ -138,28 +200,55 @@ def forward_fill( columns like `start_block` and `end_block`. It's unlikely we are going to need forward-filled data in these columns. + :param forward_fill_until: + The timestamp which we know the data is valid for. + + If there are price gaps at rarely traded pairs at the end of the (live) OHLCV series, + we will forward fill the data until this timestamp. + + If not given forward fills until the last trade of the pair. + + The timestamp must match the index timestamp frequency . + :return: DataFrame where each timestamp has a value set for columns. + + For multi pair data if input is `DataFrameGroupBy` then a similar `DataFrameGroupBy` is + returned. """ - assert isinstance(df, (pd.DataFrame, DataFrameGroupBy)) + assert isinstance(single_or_multipair_data, (pd.DataFrame, DataFrameGroupBy)) assert isinstance(freq, (pd.DateOffset, str)), f"Expected pd.DateOffset, got: {freq}" - source = df - - grouped = isinstance(df, DataFrameGroupBy) + original = single_or_multipair_data + grouped = isinstance(single_or_multipair_data, DataFrameGroupBy) # https://www.statology.org/pandas-drop-all-columns-except/ if drop_other_columns: - df = df[list(columns)] + single_or_multipair_data = single_or_multipair_data[list(columns)] + + # Set the end marker if we know when the data should end + if forward_fill_until is not None: + assert isinstance(forward_fill_until, pd.Timestamp), f"Got: {type(forward_fill_until)}" + + if grouped: + single_or_multipair_data = fill_future_gap_multi_pair(single_or_multipair_data, forward_fill_until, columns) + else: + single_or_multipair_data = fill_future_gap(single_or_multipair_data, forward_fill_until, columns) # Fill missing timestamps with NaN # https://stackoverflow.com/a/45620300/315168 - df = df.resample(freq).mean(numeric_only=True) + # This will also ungroup the data + single_or_multipair_data = single_or_multipair_data.resample(freq).mean(numeric_only=True) + + if grouped: + # resample() will set pair_id to NaN + # fix here + single_or_multipair_data["pair_id"] = single_or_multipair_data.index.get_level_values('pair_id') columns = set(columns) - # We always need to ffill close first + # We always need to ffill close column first for column in ("close", "open", "high", "low", "volume", "timestamp"): if column in columns: columns.remove(column) @@ -167,22 +256,28 @@ def forward_fill( match column: case "volume": # Sparse volume is 0 - df["volume"] = df["volume"].fillna(0.0) + single_or_multipair_data["volume"] = single_or_multipair_data["volume"].fillna(0.0) case "close": # Sparse close is the previous close - df["close"] = df["close"].fillna(method="ffill") + single_or_multipair_data["close"] = single_or_multipair_data["close"].fillna(method="ffill") case "open" | "high" | "low": # Fill open, high, low from the ffill'ed close. - df[column] = df[column].fillna(df["close"]) + single_or_multipair_data[column] = single_or_multipair_data[column].fillna(single_or_multipair_data["close"]) case "timestamp": - if isinstance(df.index, pd.MultiIndex): - if "timestamp" in source.obj.columns: + + if grouped: + check_columns = original.obj.columns + else: + check_columns = original.columns + + if isinstance(single_or_multipair_data.index, pd.MultiIndex): + if "timestamp" in check_columns: # pair_id, timestamp index - df["timestamp"] = df.index.get_level_values(1) - elif isinstance(df.index, pd.DatetimeIndex): - if "timestamp" in source.columns: + single_or_multipair_data["timestamp"] = single_or_multipair_data.index.get_level_values(1) + elif isinstance(single_or_multipair_data.index, pd.DatetimeIndex): + if "timestamp" in check_columns: # timestamp index - df["timestamp"] = df.index + single_or_multipair_data["timestamp"] = single_or_multipair_data.index else: raise NotImplementedError(f"Unknown column: {column} - forward_fill() does not know how to handle") @@ -192,6 +287,7 @@ def forward_fill( # Regroup by pair, as this was the original data format if grouped: - df = df.groupby("pair_id") + single_or_multipair_data["timestamp"] = single_or_multipair_data.index.get_level_values('timestamp') + single_or_multipair_data = single_or_multipair_data.groupby(level="pair_id") - return df + return single_or_multipair_data diff --git a/tradingstrategy/utils/wrangle.py b/tradingstrategy/utils/wrangle.py index 3d26c79c..c0d8be1b 100644 --- a/tradingstrategy/utils/wrangle.py +++ b/tradingstrategy/utils/wrangle.py @@ -264,6 +264,7 @@ def fix_dex_price_data( min_max_price: tuple | None = DEFAULT_MIN_MAX_RANGE, remove_candles_with_zero: bool = True, pair_id_column="pair_id", + forward_fill_until: datetime.datetime | None = None, ) -> pd.DataFrame: """Wrangle DEX price data for all known issues. @@ -355,6 +356,14 @@ def fix_dex_price_data( Forward-filling data will delete any unknown columns, see :py:func:`tradingstrategy.utils.forward_fill.forward_fill` details. + :param forward_fill_until: + The timestamp which we know the data is valid for. + + If there are price gaps at rarely traded pairs at the end of the (live) OHLCV series, + we will forward fill the data until this timestamp. + + If not given forward fills until the last trade of the pair. + :return: Fixed data frame. @@ -395,6 +404,8 @@ def fix_dex_price_data( # Need to group here # TODO: Make this smarter, but how? Read index data in groupby instance? + assert "timestamp" in raw_df.columns, f"Got {raw_df.columns}" + regrouped = raw_df.set_index("timestamp", drop=False).groupby(pair_id_column, group_keys=True) logger.info("Fixing prices having bad open/close values between timeframes: %s", fix_inbetween_threshold)