Skip to content

Commit

Permalink
Calculate and load indicators inline (#1056)
Browse files Browse the repository at this point in the history
- Create `calculate_and_load_indicators_inline` which allows easy access to indicator data within the notebook
  • Loading branch information
miohtama authored Oct 8, 2024
1 parent 681baf1 commit d9d8e9a
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 12 deletions.
2 changes: 1 addition & 1 deletion deps/trading-strategy
70 changes: 61 additions & 9 deletions tests/backtest/test_indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
import pandas as pd
import pandas_ta
import pytest
import futureproof
import inspect
from pyasn1_modules.rfc3779 import id_pe_ipAddrBlocks

from tradeexecutor.state.identifier import AssetIdentifier, TradingPairIdentifier
from tradeexecutor.strategy.execution_context import ExecutionContext, unit_test_execution_context, unit_test_trading_execution_context
from tradeexecutor.strategy.pandas_trader.indicator import (
IndicatorSet, DiskIndicatorStorage, IndicatorDefinition, IndicatorFunctionSignatureMismatch,
calculate_and_load_indicators, IndicatorKey, IndicatorSource, IndicatorDependencyResolver,
IndicatorOrderError, IndicatorCalculationFailed, MemoryIndicatorStorage,
calculate_and_load_indicators, IndicatorKey, IndicatorSource, IndicatorDependencyResolver,
IndicatorCalculationFailed, MemoryIndicatorStorage, calculate_and_load_indicators_inline,
)
from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInputIndicators
from tradeexecutor.strategy.parameters import StrategyParameters
Expand All @@ -24,10 +23,8 @@
from tradeexecutor.testing.synthetic_ethereum_data import generate_random_ethereum_address
from tradeexecutor.testing.synthetic_exchange_data import generate_exchange
from tradeexecutor.testing.synthetic_price_data import generate_multi_pair_candles
from tradeexecutor.utils.python_function import hash_function
from tradingstrategy.candle import GroupedCandleUniverse
from tradingstrategy.chain import ChainId
from tradingstrategy.pair import HumanReadableTradingPairDescription
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.universe import Universe

Expand Down Expand Up @@ -796,9 +793,20 @@ def regime(
fast_sma: pd.Series = dependency_resolver.get_indicator_data("fast_sma", pair=pair, parameters={"length": length})
return close > fast_sma

def multipair(universe: TradingStrategyUniverse, dependency_resolver: IndicatorDependencyResolver) -> pd.DataFrame:
# Test multipair data resolution
series = dependency_resolver.get_indicator_data_pairs_combined("regime")
assert isinstance(series.index, pd.MultiIndex)
assert isinstance(series, pd.Series)
# Change from pd.Series to pd.DataFrame with column "value"
df = series.to_frame(name='value')
assert df.columns == ["value"]
return df

def create_indicators(parameters: StrategyParameters, indicators: IndicatorSet, strategy_universe: TradingStrategyUniverse, execution_context: ExecutionContext):
indicators.add("fast_sma", pandas_ta.sma, {"length": parameters.fast_sma}, order=1)
indicators.add("regime", regime, {"length": parameters.fast_sma}, order=2)
indicators.add("multipair", multipair, {}, IndicatorSource.strategy_universe, order=3)

class MyParameters:
fast_sma = 20
Expand All @@ -818,7 +826,7 @@ class MyParameters:
wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, exchange.exchange_slug, "WBTC", "USDC"))

keys = list(indicator_result.keys())
keys = sorted(keys, key=lambda k: (k.pair.internal_id, k.definition.name)) # Ensure we read set in deterministic order
keys = sorted(keys, key=lambda k: (k.pair.internal_id if k.pair else 9_999_999, k.definition.name)) # Ensure we read set in deterministic order

# Check our pair x indicator matrix
assert keys[0].pair== weth_usdc
Expand All @@ -834,7 +842,7 @@ class MyParameters:

for result in indicator_result.values():
assert not result.cached
assert isinstance(result.data, pd.Series)
assert isinstance(result.data, (pd.Series, pd.DataFrame))
assert len(result.data) > 0

# Run with higher workers count should still work since it should force single thread
Expand All @@ -851,4 +859,48 @@ class MyParameters:

assert logger_mock.call_count == 1
assert logger_mock.call_args[0][0] == "MemoryIndicatorStorage does not support multiprocessing, setting max_workers and max_readers to 1"



def test_calculate_indicators_inline(strategy_universe):
"""Test calculate_and_load_indicators_inline() """

# Some example parameters we use to calculate indicators.
# E.g. RSI length
class Parameters:
rsi_length = 20
sma_long = 200
sma_short = 12

# Create indicators.
# Map technical indicator functions to their parameters, like length.
# You can also use hardcoded values, but we recommend passing in parameter dict,
# as this allows later to reuse the code for optimising/grid searches, etc.
def create_indicators(
timestamp,
parameters,
strategy_universe,
execution_context,
) -> IndicatorSet:
indicator_set = IndicatorSet()
indicator_set.add("rsi", pandas_ta.rsi, {"length": parameters.rsi_length})
indicator_set.add("sma_long", pandas_ta.sma, {"length": parameters.sma_long})
indicator_set.add("sma_short", pandas_ta.sma, {"length": parameters.sma_short})
return indicator_set

# Calculate indicators - will spawn multiple worker processed,
# or load cached results from the disk
indicators = calculate_and_load_indicators_inline(
strategy_universe=strategy_universe,
parameters=StrategyParameters.from_class(Parameters),
create_indicators=create_indicators,
)

# From calculated indicators, read one indicator (RSI for BTC)
wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WBTC", "USDC"))
rsi = indicators.get_indicator_series("rsi", pair=wbtc_usdc)
assert len(rsi) == 214 # We have series data for 214 days





171 changes: 170 additions & 1 deletion tradeexecutor/strategy/pandas_trader/indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict

from IPython import get_ipython
from skopt.space import Dimension

# Enable pickle patch that allows multiprocessing in notebooks
Expand Down Expand Up @@ -48,11 +49,12 @@
from tqdm_loggable.auto import tqdm

from tradeexecutor.state.identifier import TradingPairIdentifier
from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.execution_context import ExecutionContext, notebook_execution_context
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse, UniverseCacheKey
from tradeexecutor.utils.cpu import get_safe_max_workers_count
from tradeexecutor.utils.python_function import hash_function
from tradingstrategy.client import Client
from tradingstrategy.pair import HumanReadableTradingPairDescription
from tradingstrategy.utils.groupeduniverse import PairCandlesMissing

Expand Down Expand Up @@ -1342,6 +1344,72 @@ def match_indicator(

return result

def get_indicator_data_pairs_combined(
self,
name: str,
) -> pd.Series:
"""Get a DataFrame that contains indicator data for all pairs combined.
- Allows to access the indicator data for all pairs as a combined dataframe.
Example:
.. code-block:: python
def regime(
close: pd.Series,
pair: TradingPairIdentifier,
length: int,
dependency_resolver: IndicatorDependencyResolver,
) -> pd.Series:
fast_sma: pd.Series = dependency_resolver.get_indicator_data("fast_sma", pair=pair, parameters={"length": length})
return close > fast_sma
def multipair(universe: TradingStrategyUniverse, dependency_resolver: IndicatorDependencyResolver) -> pd.DataFrame:
# Test multipair data resolution
series = dependency_resolver.get_indicator_data_pairs_combined("regime")
assert isinstance(series.index, pd.MultiIndex)
assert isinstance(series, pd.Series)
return series
# Change from pd.Series to pd.DataFrame with column "value"
# df = series.to_frame(name='value')
# assert df.columns == ["value"]
# return df
def create_indicators(parameters: StrategyParameters, indicators: IndicatorSet, strategy_universe: TradingStrategyUniverse, execution_context: ExecutionContext):
indicators.add("regime", regime, {"length": parameters.fast_sma}, order=2)
indicators.add("multipair", multipair, {}, IndicatorSource.strategy_universe, order=3)
Output:
.. code-block:: text
pair_id timestamp
1 2021-06-01 False
2021-06-02 False
2021-06-03 False
2021-06-04 False
2021-06-05 False
...
2 2021-12-27 True
2021-12-28 True
2021-12-29 False
2021-12-30 False
2021-12-31 False
:param name:
An indicator that was previously calculated by its `order`.
:return:
DataFrame with MultiIndex (pair_id, timestamp)
"""

series_map = {pair.internal_id: self.get_indicator_data(name, pair=pair) for pair in self.strategy_universe.iterate_pairs()}
series_list = list(series_map.values())
pair_ids = list(series_map.keys())
combined = pd.concat(series_list, keys=pair_ids, names=['pair_id', 'timestamp'])
return combined

def get_indicator_data(
self,
name: str,
Expand Down Expand Up @@ -1683,6 +1751,107 @@ def calculate_and_load_indicators(
return result


def calculate_and_load_indicators_inline(
strategy_universe: TradingStrategyUniverse,
parameters: StrategyParameters,
indicator_storage_path=DEFAULT_INDICATOR_STORAGE_PATH,
execution_context: ExecutionContext = notebook_execution_context,
verbose=True,
indicator_set: IndicatorSet | None = None,
create_indicators: CreateIndicatorsProtocol = None,
storage: IndicatorStorage | None = None,
) -> "tradeexecutor.strategy.pandas_trader.strategy_input.StrategyInputIndicators":
"""Calculate indicators in the notebook itself, before starting the backtest.
- To be used within Jupyter Notebooks
- Useful for single iteration backtests
- Useful for accessing indicator data if you do not need a backtest
Example:
.. code-block:: python
# Some example parameters we use to calculate indicators.
# E.g. RSI length
class Parameters:
rsi_length = 20
sma_long = 200
sma_short = 12
# Create indicators.
# Map technical indicator functions to their parameters, like length.
# You can also use hardcoded values, but we recommend passing in parameter dict,
# as this allows later to reuse the code for optimising/grid searches, etc.
def create_indicators(
timestamp,
parameters,
strategy_universe,
execution_context,
) -> IndicatorSet:
indicator_set = IndicatorSet()
indicator_set.add("rsi", pandas_ta.rsi, {"length": parameters.rsi_length})
indicator_set.add("sma_long", pandas_ta.sma, {"length": parameters.sma_long})
indicator_set.add("sma_short", pandas_ta.sma, {"length": parameters.sma_short})
return indicator_set
# Calculate indicators - will spawn multiple worker processed,
# or load cached results from the disk
indicators = calculate_and_load_indicators_inline(
strategy_universe=strategy_universe,
parameters=StrategyParameters.from_class(Parameters),
create_indicators=create_indicators,
)
# From calculated indicators, read one indicator (RSI for BTC)
wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WBTC", "USDC"))
rsi = indicators.get_indicator_series("rsi", pair=wbtc_usdc)
assert len(rsi) == 214 # We have series data for 214 days
"""

# Hack to be able to run notebook with ipython from the command line
# https://stackoverflow.com/a/39662359/315168
ipython = get_ipython().__class__.__name__ == "TerminalInteractiveShell"

# TODO: Eliminate circulates
from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInputIndicators

if storage is None:
storage = DiskIndicatorStorage(
indicator_storage_path,
strategy_universe.get_cache_key()
)

if create_indicators:
assert indicator_set is None, f"Cannot give both indicator_set and create_indicators"
indicator_set = prepare_indicators(create_indicators, parameters, strategy_universe, execution_context, timestamp=None)

if ipython:
# Unable to fork
max_workers = 1
else:
max_workers = get_safe_max_workers_count()

indicator_result_map = calculate_and_load_indicators(
strategy_universe=strategy_universe,
storage=storage,
execution_context=execution_context,
parameters=parameters,
verbose=verbose,
indicators=indicator_set,
max_workers=max_workers,
)

return StrategyInputIndicators(
strategy_universe=strategy_universe,
indicator_results=indicator_result_map,
available_indicators=indicator_set,
timestamp=None,
)


def warm_up_indicator_cache(
strategy_universe: TradingStrategyUniverse,
storage: DiskIndicatorStorage,
Expand Down
2 changes: 1 addition & 1 deletion tradeexecutor/utils/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def fetch_binance_dataset(
df, {symbol: pair for symbol, pair in zip(symbols, pairs)}
)

candle_df["pair_id"].replace(spot_symbol_map, inplace=True)
candle_df["pair_id"] = candle_df["pair_id"].replace(spot_symbol_map)

candle_universe, stop_loss_candle_universe = load_candle_universe_from_dataframe(
df=candle_df,
Expand Down

0 comments on commit d9d8e9a

Please sign in to comment.