From d9d8e9af81d5169e7fee30d7764b976642eae770 Mon Sep 17 00:00:00 2001 From: Mikko Ohtamaa Date: Tue, 8 Oct 2024 20:50:20 +0200 Subject: [PATCH] Calculate and load indicators inline (#1056) - Create `calculate_and_load_indicators_inline` which allows easy access to indicator data within the notebook --- deps/trading-strategy | 2 +- tests/backtest/test_indicator.py | 70 ++++++- .../strategy/pandas_trader/indicator.py | 171 +++++++++++++++++- tradeexecutor/utils/binance.py | 2 +- 4 files changed, 233 insertions(+), 12 deletions(-) diff --git a/deps/trading-strategy b/deps/trading-strategy index 1cad5d746..0cfdd826e 160000 --- a/deps/trading-strategy +++ b/deps/trading-strategy @@ -1 +1 @@ -Subproject commit 1cad5d746fa94a3cdcc46bc723a112d495a4dd97 +Subproject commit 0cfdd826e546c1c4dc11af7ed919b65be76f1ff4 diff --git a/tests/backtest/test_indicator.py b/tests/backtest/test_indicator.py index 8f82e0d86..e8e2f9a83 100644 --- a/tests/backtest/test_indicator.py +++ b/tests/backtest/test_indicator.py @@ -7,15 +7,14 @@ import pandas as pd import pandas_ta import pytest -import futureproof -import inspect +from pyasn1_modules.rfc3779 import id_pe_ipAddrBlocks from tradeexecutor.state.identifier import AssetIdentifier, TradingPairIdentifier from tradeexecutor.strategy.execution_context import ExecutionContext, unit_test_execution_context, unit_test_trading_execution_context from tradeexecutor.strategy.pandas_trader.indicator import ( IndicatorSet, DiskIndicatorStorage, IndicatorDefinition, IndicatorFunctionSignatureMismatch, - calculate_and_load_indicators, IndicatorKey, IndicatorSource, IndicatorDependencyResolver, - IndicatorOrderError, IndicatorCalculationFailed, MemoryIndicatorStorage, + calculate_and_load_indicators, IndicatorKey, IndicatorSource, IndicatorDependencyResolver, + IndicatorCalculationFailed, MemoryIndicatorStorage, calculate_and_load_indicators_inline, ) from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInputIndicators from tradeexecutor.strategy.parameters import StrategyParameters @@ -24,10 +23,8 @@ from tradeexecutor.testing.synthetic_ethereum_data import generate_random_ethereum_address from tradeexecutor.testing.synthetic_exchange_data import generate_exchange from tradeexecutor.testing.synthetic_price_data import generate_multi_pair_candles -from tradeexecutor.utils.python_function import hash_function from tradingstrategy.candle import GroupedCandleUniverse from tradingstrategy.chain import ChainId -from tradingstrategy.pair import HumanReadableTradingPairDescription from tradingstrategy.timebucket import TimeBucket from tradingstrategy.universe import Universe @@ -796,9 +793,20 @@ def regime( fast_sma: pd.Series = dependency_resolver.get_indicator_data("fast_sma", pair=pair, parameters={"length": length}) return close > fast_sma + def multipair(universe: TradingStrategyUniverse, dependency_resolver: IndicatorDependencyResolver) -> pd.DataFrame: + # Test multipair data resolution + series = dependency_resolver.get_indicator_data_pairs_combined("regime") + assert isinstance(series.index, pd.MultiIndex) + assert isinstance(series, pd.Series) + # Change from pd.Series to pd.DataFrame with column "value" + df = series.to_frame(name='value') + assert df.columns == ["value"] + return df + def create_indicators(parameters: StrategyParameters, indicators: IndicatorSet, strategy_universe: TradingStrategyUniverse, execution_context: ExecutionContext): indicators.add("fast_sma", pandas_ta.sma, {"length": parameters.fast_sma}, order=1) indicators.add("regime", regime, {"length": parameters.fast_sma}, order=2) + indicators.add("multipair", multipair, {}, IndicatorSource.strategy_universe, order=3) class MyParameters: fast_sma = 20 @@ -818,7 +826,7 @@ class MyParameters: wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, exchange.exchange_slug, "WBTC", "USDC")) keys = list(indicator_result.keys()) - keys = sorted(keys, key=lambda k: (k.pair.internal_id, k.definition.name)) # Ensure we read set in deterministic order + keys = sorted(keys, key=lambda k: (k.pair.internal_id if k.pair else 9_999_999, k.definition.name)) # Ensure we read set in deterministic order # Check our pair x indicator matrix assert keys[0].pair== weth_usdc @@ -834,7 +842,7 @@ class MyParameters: for result in indicator_result.values(): assert not result.cached - assert isinstance(result.data, pd.Series) + assert isinstance(result.data, (pd.Series, pd.DataFrame)) assert len(result.data) > 0 # Run with higher workers count should still work since it should force single thread @@ -851,4 +859,48 @@ class MyParameters: assert logger_mock.call_count == 1 assert logger_mock.call_args[0][0] == "MemoryIndicatorStorage does not support multiprocessing, setting max_workers and max_readers to 1" - \ No newline at end of file + + +def test_calculate_indicators_inline(strategy_universe): + """Test calculate_and_load_indicators_inline() """ + + # Some example parameters we use to calculate indicators. + # E.g. RSI length + class Parameters: + rsi_length = 20 + sma_long = 200 + sma_short = 12 + + # Create indicators. + # Map technical indicator functions to their parameters, like length. + # You can also use hardcoded values, but we recommend passing in parameter dict, + # as this allows later to reuse the code for optimising/grid searches, etc. + def create_indicators( + timestamp, + parameters, + strategy_universe, + execution_context, + ) -> IndicatorSet: + indicator_set = IndicatorSet() + indicator_set.add("rsi", pandas_ta.rsi, {"length": parameters.rsi_length}) + indicator_set.add("sma_long", pandas_ta.sma, {"length": parameters.sma_long}) + indicator_set.add("sma_short", pandas_ta.sma, {"length": parameters.sma_short}) + return indicator_set + + # Calculate indicators - will spawn multiple worker processed, + # or load cached results from the disk + indicators = calculate_and_load_indicators_inline( + strategy_universe=strategy_universe, + parameters=StrategyParameters.from_class(Parameters), + create_indicators=create_indicators, + ) + + # From calculated indicators, read one indicator (RSI for BTC) + wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WBTC", "USDC")) + rsi = indicators.get_indicator_series("rsi", pair=wbtc_usdc) + assert len(rsi) == 214 # We have series data for 214 days + + + + + diff --git a/tradeexecutor/strategy/pandas_trader/indicator.py b/tradeexecutor/strategy/pandas_trader/indicator.py index 5c9ebcd8d..2ad9f372a 100644 --- a/tradeexecutor/strategy/pandas_trader/indicator.py +++ b/tradeexecutor/strategy/pandas_trader/indicator.py @@ -19,6 +19,7 @@ from abc import ABC, abstractmethod from collections import defaultdict +from IPython import get_ipython from skopt.space import Dimension # Enable pickle patch that allows multiprocessing in notebooks @@ -48,11 +49,12 @@ from tqdm_loggable.auto import tqdm from tradeexecutor.state.identifier import TradingPairIdentifier -from tradeexecutor.strategy.execution_context import ExecutionContext +from tradeexecutor.strategy.execution_context import ExecutionContext, notebook_execution_context from tradeexecutor.strategy.parameters import StrategyParameters from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse, UniverseCacheKey from tradeexecutor.utils.cpu import get_safe_max_workers_count from tradeexecutor.utils.python_function import hash_function +from tradingstrategy.client import Client from tradingstrategy.pair import HumanReadableTradingPairDescription from tradingstrategy.utils.groupeduniverse import PairCandlesMissing @@ -1342,6 +1344,72 @@ def match_indicator( return result + def get_indicator_data_pairs_combined( + self, + name: str, + ) -> pd.Series: + """Get a DataFrame that contains indicator data for all pairs combined. + + - Allows to access the indicator data for all pairs as a combined dataframe. + + Example: + + .. code-block:: python + + def regime( + close: pd.Series, + pair: TradingPairIdentifier, + length: int, + dependency_resolver: IndicatorDependencyResolver, + ) -> pd.Series: + fast_sma: pd.Series = dependency_resolver.get_indicator_data("fast_sma", pair=pair, parameters={"length": length}) + return close > fast_sma + + def multipair(universe: TradingStrategyUniverse, dependency_resolver: IndicatorDependencyResolver) -> pd.DataFrame: + # Test multipair data resolution + series = dependency_resolver.get_indicator_data_pairs_combined("regime") + assert isinstance(series.index, pd.MultiIndex) + assert isinstance(series, pd.Series) + return series + # Change from pd.Series to pd.DataFrame with column "value" + # df = series.to_frame(name='value') + # assert df.columns == ["value"] + # return df + + def create_indicators(parameters: StrategyParameters, indicators: IndicatorSet, strategy_universe: TradingStrategyUniverse, execution_context: ExecutionContext): + indicators.add("regime", regime, {"length": parameters.fast_sma}, order=2) + indicators.add("multipair", multipair, {}, IndicatorSource.strategy_universe, order=3) + + Output: + + .. code-block:: text + + pair_id timestamp + 1 2021-06-01 False + 2021-06-02 False + 2021-06-03 False + 2021-06-04 False + 2021-06-05 False + ... + 2 2021-12-27 True + 2021-12-28 True + 2021-12-29 False + 2021-12-30 False + 2021-12-31 False + + :param name: + An indicator that was previously calculated by its `order`. + + :return: + DataFrame with MultiIndex (pair_id, timestamp) + """ + + series_map = {pair.internal_id: self.get_indicator_data(name, pair=pair) for pair in self.strategy_universe.iterate_pairs()} + series_list = list(series_map.values()) + pair_ids = list(series_map.keys()) + combined = pd.concat(series_list, keys=pair_ids, names=['pair_id', 'timestamp']) + return combined + def get_indicator_data( self, name: str, @@ -1683,6 +1751,107 @@ def calculate_and_load_indicators( return result +def calculate_and_load_indicators_inline( + strategy_universe: TradingStrategyUniverse, + parameters: StrategyParameters, + indicator_storage_path=DEFAULT_INDICATOR_STORAGE_PATH, + execution_context: ExecutionContext = notebook_execution_context, + verbose=True, + indicator_set: IndicatorSet | None = None, + create_indicators: CreateIndicatorsProtocol = None, + storage: IndicatorStorage | None = None, +) -> "tradeexecutor.strategy.pandas_trader.strategy_input.StrategyInputIndicators": + """Calculate indicators in the notebook itself, before starting the backtest. + + - To be used within Jupyter Notebooks + + - Useful for single iteration backtests + + - Useful for accessing indicator data if you do not need a backtest + + Example: + + .. code-block:: python + + # Some example parameters we use to calculate indicators. + # E.g. RSI length + class Parameters: + rsi_length = 20 + sma_long = 200 + sma_short = 12 + + # Create indicators. + # Map technical indicator functions to their parameters, like length. + # You can also use hardcoded values, but we recommend passing in parameter dict, + # as this allows later to reuse the code for optimising/grid searches, etc. + def create_indicators( + timestamp, + parameters, + strategy_universe, + execution_context, + ) -> IndicatorSet: + indicator_set = IndicatorSet() + indicator_set.add("rsi", pandas_ta.rsi, {"length": parameters.rsi_length}) + indicator_set.add("sma_long", pandas_ta.sma, {"length": parameters.sma_long}) + indicator_set.add("sma_short", pandas_ta.sma, {"length": parameters.sma_short}) + return indicator_set + + # Calculate indicators - will spawn multiple worker processed, + # or load cached results from the disk + indicators = calculate_and_load_indicators_inline( + strategy_universe=strategy_universe, + parameters=StrategyParameters.from_class(Parameters), + create_indicators=create_indicators, + ) + + # From calculated indicators, read one indicator (RSI for BTC) + wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WBTC", "USDC")) + rsi = indicators.get_indicator_series("rsi", pair=wbtc_usdc) + assert len(rsi) == 214 # We have series data for 214 days + + """ + + # Hack to be able to run notebook with ipython from the command line + # https://stackoverflow.com/a/39662359/315168 + ipython = get_ipython().__class__.__name__ == "TerminalInteractiveShell" + + # TODO: Eliminate circulates + from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInputIndicators + + if storage is None: + storage = DiskIndicatorStorage( + indicator_storage_path, + strategy_universe.get_cache_key() + ) + + if create_indicators: + assert indicator_set is None, f"Cannot give both indicator_set and create_indicators" + indicator_set = prepare_indicators(create_indicators, parameters, strategy_universe, execution_context, timestamp=None) + + if ipython: + # Unable to fork + max_workers = 1 + else: + max_workers = get_safe_max_workers_count() + + indicator_result_map = calculate_and_load_indicators( + strategy_universe=strategy_universe, + storage=storage, + execution_context=execution_context, + parameters=parameters, + verbose=verbose, + indicators=indicator_set, + max_workers=max_workers, + ) + + return StrategyInputIndicators( + strategy_universe=strategy_universe, + indicator_results=indicator_result_map, + available_indicators=indicator_set, + timestamp=None, + ) + + def warm_up_indicator_cache( strategy_universe: TradingStrategyUniverse, storage: DiskIndicatorStorage, diff --git a/tradeexecutor/utils/binance.py b/tradeexecutor/utils/binance.py index b0ee171e2..2b5ce5dad 100644 --- a/tradeexecutor/utils/binance.py +++ b/tradeexecutor/utils/binance.py @@ -107,7 +107,7 @@ def fetch_binance_dataset( df, {symbol: pair for symbol, pair in zip(symbols, pairs)} ) - candle_df["pair_id"].replace(spot_symbol_map, inplace=True) + candle_df["pair_id"] = candle_df["pair_id"].replace(spot_symbol_map) candle_universe, stop_loss_candle_universe = load_candle_universe_from_dataframe( df=candle_df,