Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example messaging with actor & data #2407

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/backtest/example_10_messaging_with_actor_data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Example - Messaging with Actor Data

This example demonstrates how to work with custom data classes
and the Actor's publish/subscribe mechanism in NautilusTrader.

## What You'll Learn

- How to create custom data classes (both serializable and non-serializable)
- How to publish and subscribe to custom data using Actor methods
- How to handle custom data events in your strategy

## Implementation Details

The strategy showcases two approaches to custom data classes:
- `Last10BarsStats`: A simple non-serializable data
- `Last10BarsStatsSerializable`: A serializable data showing proper setup for data persistence and transfer between nodes
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3
# -------------------------------------------------------------------------------------------------
# Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
# https://nautechsystems.io
#
# Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -------------------------------------------------------------------------------------------------

from decimal import Decimal

from strategy import DemoStrategy
from strategy import DemoStrategyConfig

from examples.utils.data_provider import prepare_demo_data_eurusd_futures_1min
from nautilus_trader.backtest.engine import BacktestEngine
from nautilus_trader.config import BacktestEngineConfig
from nautilus_trader.config import LoggingConfig
from nautilus_trader.model import Bar
from nautilus_trader.model import TraderId
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.enums import AccountType
from nautilus_trader.model.enums import OmsType
from nautilus_trader.model.identifiers import Venue
from nautilus_trader.model.instruments.base import Instrument
from nautilus_trader.model.objects import Money


if __name__ == "__main__":

# ----------------------------------------------------------------------------------
# 1. Configure and create backtest engine
# ----------------------------------------------------------------------------------

engine_config = BacktestEngineConfig(
trader_id=TraderId("BACKTEST-DATA-PUB-001"), # Unique identifier for this backtest
logging=LoggingConfig(log_level="INFO"),
)
engine = BacktestEngine(config=engine_config)

# ----------------------------------------------------------------------------------
# 2. Prepare market data
# ----------------------------------------------------------------------------------

prepared_data: dict = prepare_demo_data_eurusd_futures_1min()
venue_name: str = prepared_data["venue_name"]
eurusd_instrument: Instrument = prepared_data["instrument"]
eurusd_1min_bartype = prepared_data["bar_type"]
eurusd_1min_bars: list[Bar] = prepared_data["bars_list"]

# ----------------------------------------------------------------------------------
# 3. Configure trading environment
# ----------------------------------------------------------------------------------

# Set up the trading venue with a margin account
engine.add_venue(
venue=Venue(venue_name),
oms_type=OmsType.NETTING, # Use a netting order management system
account_type=AccountType.MARGIN, # Use a margin trading account
starting_balances=[Money(1_000_000, USD)], # Set initial capital
base_currency=USD, # Account currency
default_leverage=Decimal(1), # No leverage (1:1)
)

# Register the trading instrument
engine.add_instrument(eurusd_instrument)

# Load historical market data
engine.add_data(eurusd_1min_bars)

# ----------------------------------------------------------------------------------
# 4. Configure and run strategy
# ----------------------------------------------------------------------------------

# Create strategy configuration
strategy_config = DemoStrategyConfig(
instrument=eurusd_instrument,
bar_type=eurusd_1min_bartype,
)

# Create and register the strategy
strategy = DemoStrategy(config=strategy_config)
engine.add_strategy(strategy)

# Execute the backtest
engine.run()

# Clean up resources
engine.dispose()
147 changes: 147 additions & 0 deletions examples/backtest/example_10_messaging_with_actor_data/strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# -------------------------------------------------------------------------------------------------
# Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
# https://nautechsystems.io
#
# Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -------------------------------------------------------------------------------------------------

from nautilus_trader.common.enums import LogColor
from nautilus_trader.config import StrategyConfig
from nautilus_trader.core.data import Data
from nautilus_trader.core.datetime import unix_nanos_to_dt
from nautilus_trader.model import InstrumentId
from nautilus_trader.model.custom import customdataclass
from nautilus_trader.model.data import Bar
from nautilus_trader.model.data import BarType
from nautilus_trader.model.data import DataType
from nautilus_trader.model.instruments import Instrument
from nautilus_trader.trading.strategy import Strategy


class Last10BarsStats(Data):
"""
A data class for storing statistics of the last 10 bars.

This class inherits from Data class which is required for using Actor/Strategy publish/subscribe methods.
The Data class inheritance automatically provides:
- `ts_event` attribute: Used for proper data ordering in backtests
- `ts_init` attribute: Used for initialization time tracking

Since this class doesn't use @customdataclass decorator, it can have attributes of any Python types
(like the complex BarType attribute). This is suitable for strategies where data
doesn't need to be serialized, transferred between nodes or persisted.

"""

bar_type: BarType
last_bar_index: int = 0
max_price: float = 0.0
min_price: float = 0.0
volume_total: float = 0.0


# Just an example of a serializable data class, we don't use it in this strategy
@customdataclass
class Last10BarsStatsSerializable(Data):
"""
A serializable data class for storing statistics of the last 10 bars.

This class uses the @customdataclass decorator which adds serialization capabilities
required for:
- Data persistence in the catalog system
- Data transfer between different nodes
- Automatic serialization methods: to_dict(), from_dict(), to_bytes(), to_arrow()

Note: When using @customdataclass, attributes must be of supported types only:
- InstrumentId
- Basic types: str, bool, float, int, bytes, ndarray

This example demonstrates proper usage of @customdataclass, though in this simple
strategy we use `Last10BarsStats` instead as we don't need serialization capabilities.

"""

instrument_id: InstrumentId
last_bar_index: int = 0
max_price: float = 0.0
min_price: float = 0.0
volume_total: float = 0.0


class DemoStrategyConfig(StrategyConfig, frozen=True):
instrument: Instrument
bar_type: BarType


class DemoStrategy(Strategy):
"""
A demonstration strategy showing how to publish and subscribe to custom data.
"""

def __init__(self, config: DemoStrategyConfig):
super().__init__(config)

# Counter for processed bars
self.bars_processed = 0

def on_start(self):
# Subscribe to market data
self.subscribe_bars(self.config.bar_type)
self.log.info(f"Subscribed to {self.config.bar_type}", color=LogColor.YELLOW)

# Subscribe to our custom data type
self.subscribe_data(DataType(Last10BarsStats))
self.log.info("Subscribed to data of type: Last10BarsStatistics", color=LogColor.YELLOW)

def on_bar(self, bar: Bar):
# Count processed bars
self.bars_processed += 1
self.log.info(
f"Bar #{self.bars_processed} | "
f"Bar: {bar} | "
f"Time={unix_nanos_to_dt(bar.ts_event)}",
)

# Every 10th bar, publish our custom data
if self.bars_processed % 10 == 0:
# Log our plans
self.log.info(
"Going to publish data of type: Last10BarsStatistics",
color=LogColor.GREEN,
)

# Get the last 10 bars from the cache
last_10_bars_list = self.cache.bars(self.config.bar_type)[:10]
# Create data object
data = Last10BarsStats(
bar_type=bar.bar_type,
last_bar_index=self.bars_processed - 1,
max_price=max(bar.high for bar in last_10_bars_list),
min_price=min(bar.low for bar in last_10_bars_list),
volume_total=sum(bar.volume for bar in last_10_bars_list),
ts_event=bar.ts_event, # This field was added by @customdataclass decorator
ts_init=bar.ts_init, # This field was added by @customdataclass decorator
)
# Publish the data
self.publish_data(DataType(Last10BarsStats), data)

def on_data(self, data: Data):
"""
Process received data from subscribed data sources.
"""
if isinstance(data, Last10BarsStats):
self.log.info(
f"Received Last10BarsStatistics data: {data}",
color=LogColor.RED,
)

def on_stop(self):
self.log.info(f"Strategy stopped. Processed {self.bars_processed} bars.")
Loading