Quickstart
The recommended first run is shadow mode: your strategy executes through the live engine and risk checks, but no real orders are sent to the broker.
First Strategy
import asyncio

from ml4t.backtest import Strategy
from ml4t.backtest.types import OrderSide
from ml4t.live import AlpacaBroker, AlpacaDataFeed, LiveEngine, LiveRiskConfig, SafeBroker


class BuyOnceStrategy(Strategy):
    def on_data(self, timestamp, data, context, broker):
        bar = data.get("SPY")
        if bar is None:
            # No SPY bar in this update; nothing to do.
            return

        # Buy 10 shares only if there is no existing SPY position.
        if broker.get_position("SPY") is None:
            broker.submit_order("SPY", 10, side=OrderSide.BUY)


async def main():
    broker = AlpacaBroker(api_key="...", secret_key="...", paper=True)
    feed = AlpacaDataFeed(
        api_key="...",
        secret_key="...",
        symbols=["SPY"],
        data_type="bars",
    )

    # Wrap the broker so risk limits apply; shadow mode sends no real orders.
    safe_broker = SafeBroker(
        broker,
        LiveRiskConfig(
            shadow_mode=True,
            max_position_value=25_000,
            max_order_value=5_000,
        ),
    )

    engine = LiveEngine(BuyOnceStrategy(), safe_broker, feed)
    await engine.connect()
    try:
        await engine.run()
    finally:
        # Always stop the engine, even if run() raises.
        await engine.stop()


asyncio.run(main())
Why This Works
- Your strategy stays synchronous, just like in ml4t-backtest
- LiveEngine runs broker/feed I/O asynchronously
- ThreadSafeBrokerWrapper lets strategy code call broker.submit_order(...) safely
- SafeBroker enforces limits before any live order can be placed
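The general pattern behind a synchronous strategy calling into an asynchronous broker can be sketched with the standard library alone. This is not the ThreadSafeBrokerWrapper implementation, which this page does not show. AsyncBroker and SyncBrokerFacade are hypothetical stand-ins, and the sketch assumes the strategy callback runs on a worker thread while the event loop keeps handling I/O.

```python
import asyncio


class AsyncBroker:
    """Hypothetical stand-in for an async broker client."""

    def __init__(self):
        self.orders = []

    async def submit_order(self, symbol, qty):
        await asyncio.sleep(0)  # pretend to do network I/O
        self.orders.append((symbol, qty))
        return len(self.orders)  # fake order id


class SyncBrokerFacade:
    """Hypothetical blocking facade a synchronous strategy can call from a worker thread."""

    def __init__(self, async_broker, loop):
        self._broker = async_broker
        self._loop = loop

    def submit_order(self, symbol, qty, timeout=5.0):
        # Schedule the coroutine on the event loop's thread, then block for its result.
        future = asyncio.run_coroutine_threadsafe(
            self._broker.submit_order(symbol, qty), self._loop
        )
        return future.result(timeout)


def on_data(broker):
    """Synchronous strategy callback: a plain call, no await."""
    return broker.submit_order("SPY", 10)


async def demo():
    loop = asyncio.get_running_loop()
    async_broker = AsyncBroker()
    facade = SyncBrokerFacade(async_broker, loop)
    # Run the callback off the event loop so its blocking wait does not stall I/O.
    order_id = await asyncio.to_thread(on_data, facade)
    return order_id, async_broker.orders


result = asyncio.run(demo())
```

The key design point is that the blocking wait happens on the worker thread, never on the event loop thread. Calling future.result() from the loop's own thread would deadlock.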
Deployment Progression
- Shadow mode: shadow_mode=True
- Paper trading: shadow_mode=False with paper broker credentials
- Small live size: conservative limits and low notional exposure
- Gradual scale-up only after observing stable behavior
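To make the stages concrete, here is a minimal sketch of a pre-trade gate. RiskLimits is a hypothetical stand-in that mirrors the three LiveRiskConfig fields used in the example above. The check order and the stage values are illustrative assumptions, not SafeBroker's actual logic or recommended limits. Paper and live stages differ in broker credentials, which this sketch does not model.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RiskLimits:
    """Hypothetical stand-in mirroring the LiveRiskConfig fields shown earlier."""

    shadow_mode: bool
    max_position_value: float
    max_order_value: float


def check_order(limits, price, qty, current_position_value=0.0):
    """Return 'rejected', 'shadow', or 'sent' for a proposed buy order."""
    order_value = price * qty
    if order_value > limits.max_order_value:
        return "rejected"
    if current_position_value + order_value > limits.max_position_value:
        return "rejected"
    # Limits passed: in shadow mode the order is recorded but not forwarded.
    return "shadow" if limits.shadow_mode else "sent"


# Illustrative stage values only (assumptions, not recommendations).
STAGES = {
    "shadow": RiskLimits(shadow_mode=True, max_position_value=25_000, max_order_value=5_000),
    "paper": RiskLimits(shadow_mode=False, max_position_value=25_000, max_order_value=5_000),
    "small_live": RiskLimits(shadow_mode=False, max_position_value=5_000, max_order_value=1_000),
}

# The same 10-share order at $450 (notional $4,500) evaluated at each stage.
outcomes = {name: check_order(limits, price=450.0, qty=10) for name, limits in STAGES.items()}
```

With these values the order is only logged in the shadow stage, forwarded in the paper stage, and rejected in the small live stage because it exceeds the tighter per-order limit.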
Common Variations
Interactive Brokers Feed
IBDataFeed needs a connected IB session object:
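# Run inside an async function, such as main() in the first example.
# Assumption: IBBroker and IBDataFeed are importable from ml4t.live like the Alpaca classes.
broker = IBBroker(port=7497)  # 7497 is the default TWS paper-trading port
await broker.connect()
feed = IBDataFeed(broker.ib, symbols=["SPY", "QQQ"])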
Aggregate Ticks Into Bars
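# `broker` is a connected IBBroker, as in the Interactive Brokers example.
raw_feed = IBDataFeed(broker.ib, symbols=["SPY"])
# Wrap the raw feed so the strategy receives 1-minute bars.
feed = BarAggregator(raw_feed, bar_size_minutes=1, flush_timeout_seconds=2.0)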
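For intuition about what aggregating ticks into bars involves, the following is a minimal, self-contained sketch of time-bucketed OHLCV aggregation. It is not BarAggregator's implementation. The aggregate_ticks function and the (timestamp, price, size) tick shape are assumptions made for illustration. The flush_timeout_seconds behavior is not modeled, since its exact semantics are not described here.

```python
from datetime import datetime, timezone


def aggregate_ticks(ticks, bar_size_minutes=1):
    """Group (timestamp, price, size) ticks into OHLCV bars keyed by bar start time."""
    bar_seconds = bar_size_minutes * 60
    bars = {}
    for ts, price, size in sorted(ticks, key=lambda t: t[0]):
        epoch = int(ts.timestamp())
        # Floor the timestamp to the start of its bar interval.
        start = datetime.fromtimestamp(epoch - epoch % bar_seconds, tz=timezone.utc)
        bar = bars.get(start)
        if bar is None:
            # First tick of the interval opens a new bar.
            bars[start] = {"open": price, "high": price, "low": price,
                           "close": price, "volume": size}
        else:
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price  # the latest tick so far is the close
            bar["volume"] += size
    return bars


# Made-up sample ticks spanning two one-minute intervals.
ticks = [
    (datetime(2024, 1, 2, 14, 30, 5, tzinfo=timezone.utc), 470.10, 100),
    (datetime(2024, 1, 2, 14, 30, 40, tzinfo=timezone.utc), 470.50, 50),
    (datetime(2024, 1, 2, 14, 30, 59, tzinfo=timezone.utc), 470.00, 25),
    (datetime(2024, 1, 2, 14, 31, 2, tzinfo=timezone.utc), 470.20, 10),
]
bars = aggregate_ticks(ticks)
```

A live aggregator also has to decide when a bar is complete. This batch version sidesteps that by seeing all ticks up front, which is presumably where a flush timeout comes in for a streaming feed.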