API Reference¶
Core Classes¶
LiveEngine
¶
LiveEngine(
strategy,
broker,
feed,
*,
on_error=None,
halt_on_error=False,
feed_silence_seconds=None,
watchdog_poll_seconds=1.0,
halt_on_unhealthy=False,
auto_recover=False,
recovery_cooldown_seconds=5.0,
max_recovery_attempts=3,
on_health_change=None,
)
Async live trading engine.
Bridges async infrastructure with sync Strategy.on_data().
Initialize LiveEngine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy
|
Strategy
|
Strategy instance to execute. |
required |
broker
|
AsyncBrokerProtocol
|
Async broker implementation. |
required |
feed
|
DataFeedProtocol
|
Data feed providing timestamp, data, context tuples. |
required |
on_error
|
Callable[[Exception, datetime, dict], None] | None
|
Custom error handler callback. |
None
|
halt_on_error
|
bool
|
Stop engine on strategy exceptions. |
False
|
feed_silence_seconds
|
float | None
|
Optional threshold for degraded feed reporting. |
None
|
watchdog_poll_seconds
|
float
|
Poll interval for runtime health monitoring. |
1.0
|
halt_on_unhealthy
|
bool
|
Stop the engine when watchdog detects a degraded state. |
False
|
auto_recover
|
bool
|
Attempt reconnect/restart when watchdog detects a recoverable state. |
False
|
recovery_cooldown_seconds
|
float
|
Delay between recovery attempts. |
5.0
|
max_recovery_attempts
|
int
|
Maximum recovery attempts before stopping. |
3
|
on_health_change
|
Callable[[str, dict[str, Any]], None] | None
|
Optional callback invoked when runtime health changes. |
None
|
Source code in src/ml4t/live/engine.py
connect
async
¶
Connect to broker and data feed.
Must be called before run().
Source code in src/ml4t/live/engine.py
run
async
¶
Main async loop - receives bars and dispatches to strategy.
Source code in src/ml4t/live/engine.py
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 | |
stop
async
¶
Graceful shutdown.
Source code in src/ml4t/live/engine.py
runtime_status
¶
Return engine runtime health and session context.
Source code in src/ml4t/live/engine.py
LiveRiskConfig
dataclass
¶
LiveRiskConfig(
max_position_value=50000.0,
max_position_shares=1000,
max_total_exposure=200000.0,
max_positions=20,
max_order_value=10000.0,
max_order_shares=500,
max_orders_per_minute=10,
max_daily_loss=5000.0,
max_drawdown_pct=0.05,
max_price_deviation_pct=0.05,
max_data_staleness_seconds=60.0,
dedup_window_seconds=1.0,
allowed_assets=set(),
blocked_assets=set(),
shadow_mode=False,
kill_switch_enabled=False,
fail_on_reconciliation_mismatch=False,
state_file=".ml4t_risk_state.json",
journal_file=None,
)
Risk configuration for live trading.
Multiple layers of protection - all limits are optional. Set to inf/large values to disable specific checks.
Example
Conservative configuration¶
config = LiveRiskConfig( max_position_value=25_000.0, max_daily_loss=2_000.0, shadow_mode=True, # Always start with shadow mode! )
Disable specific checks¶
config = LiveRiskConfig( max_position_value=float('inf'), # No position limit max_daily_loss=10_000.0, # Only daily loss limit )
Safety Recommendations
- Always start with shadow_mode=True
- Graduate to paper trading
- Use small positions when going live
- Set conservative risk limits
__post_init__
¶
Validate configuration parameters.
Source code in src/ml4t/live/safety.py
SafeBroker
¶
Risk-controlled wrapper with state persistence.
Addresses Gemini v1: "If script crashes and restarts, SafeBroker resets max_daily_loss to 0. A losing strategy could burn through the limit again."
Addresses Gemini v2: - Critical Issue A: VirtualPortfolio for shadow mode - Memory leaks: _recent_orders pruned even if dedup disabled - Atomic JSON writes: write to .tmp then os.replace()
Safety Features: 1. Pre-trade validation against all risk limits 2. Order rate limiting 3. Drawdown monitoring with kill switch 4. Fat finger protection (price deviation check) 5. Stale data protection 6. Duplicate order filter 7. Shadow mode with VirtualPortfolio (realistic paper trading) 8. State persistence across restarts
Example
broker = IBBroker() await broker.connect()
safe = SafeBroker( broker=broker, config=LiveRiskConfig( max_position_value=25000, shadow_mode=True, # Test first! ) )
Use safe in strategy¶
engine = LiveEngine(strategy, safe, feed)
Initialize SafeBroker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
broker
|
AsyncBrokerProtocol
|
Async broker implementation (IBBroker, AlpacaBroker, etc.) |
required |
config
|
LiveRiskConfig
|
Risk configuration |
required |
Source code in src/ml4t/live/safety.py
positions
property
¶
Get current positions.
In shadow mode, returns virtual positions. In live mode, returns broker positions.
reconciliation_report
property
¶
Return the latest startup reconciliation report.
get_position
¶
Get position for specific asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol |
required |
Returns:
| Type | Description |
|---|---|
Position | None
|
Position object or None |
Source code in src/ml4t/live/safety.py
get_account_value_async
async
¶
Get total account value (async).
Returns:
| Type | Description |
|---|---|
float
|
Total account value in base currency |
Source code in src/ml4t/live/safety.py
get_cash_async
async
¶
Get available cash (async).
Returns:
| Type | Description |
|---|---|
float
|
Available cash in base currency |
cancel_order_async
async
¶
Cancel pending order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order_id
|
str
|
ID of order to cancel |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if cancel request submitted |
Source code in src/ml4t/live/safety.py
close_position_async
async
¶
Close entire position.
Close positions bypass normal limits (safety feature).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol to close |
required |
Returns:
| Type | Description |
|---|---|
Order | None
|
Order object if position exists |
Source code in src/ml4t/live/safety.py
replace_order_async
async
¶
Replace a pending order via cancel-and-resubmit.
Source code in src/ml4t/live/safety.py
submit_order_async
async
¶
submit_order_async(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order with full risk validation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol |
required |
quantity
|
int
|
Number of shares/contracts |
required |
side
|
OrderSide | None
|
Order side (BUY/SELL), auto-detected from quantity if None |
None
|
order_type
|
OrderType
|
Type of order |
MARKET
|
limit_price
|
float | None
|
Limit price for LIMIT/STOP_LIMIT orders |
None
|
stop_price
|
float | None
|
Stop price for STOP/STOP_LIMIT orders |
None
|
**kwargs
|
Any
|
Additional broker-specific parameters |
{}
|
Returns:
| Type | Description |
|---|---|
Order
|
Order object |
Raises:
| Type | Description |
|---|---|
RiskLimitError
|
If order violates any risk limit |
Source code in src/ml4t/live/safety.py
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 | |
record_market_snapshot
¶
Cache a single price observation for the staleness guard.
The supported way to keep the cache fresh is the streaming path:
a Feed (e.g. IBDataFeed) emits ticks, LiveEngine shuttles
them into _record_market_data on every bar, and the cache stays
current automatically. Use that for any continuous-loop deployment.
This method is the non-streaming escape hatch for one-shot flows that legitimately have no tick stream in front of the broker:
- A CLI flatten tool that takes a position list and submits MOC/MARKET closeouts.
- A REST-only broker adapter that fetches quotes synchronously per request rather than via a streaming feed.
- A test harness setting up controlled state.
It is not the right tool inside a notebook that already runs a
live engine — there the streaming path covers staleness implicitly.
Reaching for record_market_snapshot from inside a tick-driven
flow is a code smell: it usually means the engine isn't actually
wired up, and the snapshot will go stale silently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Symbol to record. |
required |
price
|
float
|
Reference price (e.g. last close, mid quote, snapshot top-of-book mid). Must be > 0. |
required |
timestamp
|
datetime | None
|
Bar/quote timestamp. Defaults to now (UTC). The
|
None
|
Source code in src/ml4t/live/safety.py
enable_kill_switch
¶
Manually enable kill switch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
reason
|
str
|
Reason for activation (default: "Manual") |
'Manual'
|
disable_kill_switch
¶
Manually disable kill switch (use with caution!).
Source code in src/ml4t/live/safety.py
close_all_positions
async
¶
Emergency close all positions.
Returns:
| Type | Description |
|---|---|
list[Order]
|
List of close orders |
Source code in src/ml4t/live/safety.py
record_event
¶
Append a structured runtime event to the execution journal.
Source code in src/ml4t/live/safety.py
preview_reconciliation_async
async
¶
Build the current runtime reconciliation report without mutating state.
Source code in src/ml4t/live/safety.py
preflight_async
async
¶
Probe broker reachability and startup reconciliation without persisting state.
Source code in src/ml4t/live/safety.py
connect
async
¶
Connect to broker and reconcile persisted state.
Source code in src/ml4t/live/safety.py
disconnect
async
¶
Disconnect from broker and save state.
Source code in src/ml4t/live/safety.py
is_connected_async
async
¶
get_positions_async
async
¶
Get all positions (async).
Returns:
| Type | Description |
|---|---|
dict[str, Position]
|
Dictionary mapping asset symbol to Position |
Source code in src/ml4t/live/safety.py
get_pending_orders_async
async
¶
Get pending orders (async).
Returns:
| Type | Description |
|---|---|
list[Order]
|
List of pending orders |
get_position_async
async
¶
Get position (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol |
required |
Returns:
| Type | Description |
|---|---|
Position | None
|
Position object or None |
Source code in src/ml4t/live/safety.py
ThreadSafeBrokerWrapper
¶
Wraps an async broker for use from sync strategy code.
This wrapper is passed to Strategy.on_data() instead of the raw broker. It bridges the sync/async boundary by scheduling coroutines on the main event loop and blocking the worker thread until they complete.
Thread Safety: - Strategy runs in worker thread (via asyncio.to_thread) - Broker methods run on main event loop - run_coroutine_threadsafe() handles the cross-thread communication
Timeouts (from design review): - Getters (get_cash, get_account_value): 5s - Order operations (submit, cancel, close): 30s
Example
LiveEngine creates this wrapper¶
loop = asyncio.get_running_loop() wrapped = ThreadSafeBrokerWrapper(ib_broker, loop)
Strategy uses it like a normal sync broker¶
order = wrapped.submit_order('AAPL', 100, OrderSide.BUY)
Note
This class implements BrokerProtocol but does not inherit from it. It provides a sync interface backed by async operations.
Initialize thread-safe wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
async_broker
|
AsyncBrokerProtocol
|
Async broker implementation (IBBroker, etc.) |
required |
loop
|
AbstractEventLoop
|
Main event loop (from asyncio.get_running_loop()) |
required |
Source code in src/ml4t/live/wrappers.py
positions
property
¶
Get current positions (thread-safe read).
Returns:
| Type | Description |
|---|---|
dict[str, Position]
|
Dictionary mapping asset symbol to Position |
pending_orders
property
¶
Get pending orders (thread-safe read).
Returns:
| Type | Description |
|---|---|
list[Order]
|
List of pending Order objects |
is_connected
property
¶
Check if broker is connected.
Returns:
| Type | Description |
|---|---|
bool
|
True if connected and ready to trade |
get_position
¶
Get position for specific asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol (e.g., "AAPL") |
required |
Returns:
| Type | Description |
|---|---|
Position | None
|
Position object if holding position, None otherwise |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If operation times out |
RuntimeError
|
If broker error occurs |
Source code in src/ml4t/live/wrappers.py
get_account_value
¶
Get total account value (cash + positions).
Returns:
| Type | Description |
|---|---|
float
|
Total account value in base currency |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If operation times out (5s) |
RuntimeError
|
If broker error occurs |
Source code in src/ml4t/live/wrappers.py
get_cash
¶
Get available cash balance.
Returns:
| Type | Description |
|---|---|
float
|
Available cash in base currency |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If operation times out (5s) |
RuntimeError
|
If broker error occurs |
Source code in src/ml4t/live/wrappers.py
submit_order
¶
submit_order(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol (e.g., "AAPL") |
required |
quantity
|
int
|
Number of shares/contracts (positive for buy, negative for sell) |
required |
side
|
OrderSide | None
|
Order side (BUY/SELL), auto-detected from quantity if None |
None
|
order_type
|
OrderType
|
Type of order (MARKET, LIMIT, STOP, etc.) |
MARKET
|
limit_price
|
float | None
|
Limit price for LIMIT/STOP_LIMIT orders |
None
|
stop_price
|
float | None
|
Stop price for STOP/STOP_LIMIT orders |
None
|
**kwargs
|
Any
|
Additional broker-specific parameters |
{}
|
Returns:
| Type | Description |
|---|---|
Order
|
Order object with order_id and initial status |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If operation times out (30s) |
ValueError
|
If order parameters are invalid |
RuntimeError
|
If broker is not connected or error occurs |
Source code in src/ml4t/live/wrappers.py
cancel_order
¶
Cancel pending order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order_id
|
str
|
ID of order to cancel |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if cancel request submitted, False if order not found |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If operation times out (30s) |
RuntimeError
|
If broker error occurs |
Source code in src/ml4t/live/wrappers.py
replace_order
¶
Replace a pending order with updated parameters.
Source code in src/ml4t/live/wrappers.py
close_position
¶
Close entire position in asset.
Convenience method that submits a closing order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol to close |
required |
Returns:
| Type | Description |
|---|---|
Order | None
|
Order object if position exists, None if no position |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If operation times out (30s) |
RuntimeError
|
If broker error occurs |
Source code in src/ml4t/live/wrappers.py
Brokers¶
IBBroker
¶
Bases: AsyncBrokerProtocol
Interactive Brokers implementation.
Design: - All broker operations are async - Uses asyncio.Lock for thread safety - Event handlers use put_nowait() (non-blocking) - Reconnection handled externally
Connection Ports: - TWS Paper: 7497 - TWS Live: 7496 - Gateway Paper: 4002 - Gateway Live: 4001
Example
broker = IBBroker(port=7497) # Paper trading await broker.connect() positions = await broker.get_positions_async() await broker.disconnect()
Initialize IBBroker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str
|
IB Gateway/TWS host (default: '127.0.0.1') |
'127.0.0.1'
|
port
|
int
|
IB Gateway/TWS port (default: 7497 for paper) |
7497
|
client_id
|
int
|
Unique client ID (default: 1) |
1
|
account
|
str | None
|
IB account ID (default: use first account) |
None
|
market_data_type
|
int | None
|
IB market-data type to request after connect.
|
None
|
Source code in src/ml4t/live/brokers/ib.py
positions
property
¶
Thread-safe position access (Gemini v2 Critical Issue C).
Note: This is called from worker thread via ThreadSafeBrokerWrapper. The lock prevents RuntimeError during dict iteration if IB callback modifies positions concurrently.
Returns:
| Type | Description |
|---|---|
dict[str, Position]
|
Dictionary mapping asset symbols to Position objects |
pending_orders
property
¶
Get list of pending orders.
Returns:
| Type | Description |
|---|---|
list[Order]
|
List of pending Order objects |
connect
async
¶
Connect to IB Gateway/TWS.
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If connection fails |
TimeoutError
|
If connection times out |
Source code in src/ml4t/live/brokers/ib.py
disconnect
async
¶
Disconnect from IB.
Source code in src/ml4t/live/brokers/ib.py
get_position
¶
Thread-safe single position access.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol |
required |
Returns:
| Type | Description |
|---|---|
Position | None
|
Position object if exists, None otherwise |
get_positions_async
async
¶
Async thread-safe position access with lock.
Returns:
| Type | Description |
|---|---|
dict[str, Position]
|
Dictionary mapping asset symbols to Position objects |
Source code in src/ml4t/live/brokers/ib.py
get_account_value_async
async
¶
Get Net Liquidation Value.
Returns:
| Type | Description |
|---|---|
float
|
Account net liquidation value in USD |
Source code in src/ml4t/live/brokers/ib.py
get_cash_async
async
¶
Get available funds.
Returns:
| Type | Description |
|---|---|
float
|
Available funds in USD |
Source code in src/ml4t/live/brokers/ib.py
submit_order_async
async
¶
submit_order_async(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order to IB.
TASK-013: Full order submission implementation with IB order tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol |
required |
quantity
|
float
|
Number of shares |
required |
side
|
OrderSide | None
|
BUY or SELL (auto-detected if None) |
None
|
order_type
|
OrderType
|
Market, limit, stop, or stop-limit |
MARKET
|
limit_price
|
float | None
|
Limit price for limit orders |
None
|
stop_price
|
float | None
|
Stop price for stop orders |
None
|
Returns:
| Type | Description |
|---|---|
Order
|
Order object |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If not connected |
ValueError
|
If order parameters are invalid |
Source code in src/ml4t/live/brokers/ib.py
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 | |
cancel_order_async
async
¶
Cancel pending order.
TASK-016: Full order cancellation implementation.
This method finds the IB order ID from our tracking map and cancels the order via the IB API. Handles edge cases like order not found or order already filled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order_id
|
str
|
Order ID to cancel (e.g., 'ML4T-1') |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if cancellation request sent successfully, False otherwise |
Note
The actual cancellation is confirmed via _on_order_status callback when IB sends the 'Cancelled' status update.
Source code in src/ml4t/live/brokers/ib.py
replace_order_async
async
¶
Replace a pending order via cancel-and-resubmit.
Source code in src/ml4t/live/brokers/ib.py
close_position_async
async
¶
Close position in asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol |
required |
Returns:
| Type | Description |
|---|---|
Order | None
|
Order object if position exists, None otherwise |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
Depends on TASK-013 |
Source code in src/ml4t/live/brokers/ib.py
AlpacaBroker
¶
Bases: AsyncBrokerProtocol
Alpaca Markets broker implementation.
Design (matching IBBroker patterns): - All broker operations are async - Uses asyncio.Lock for thread safety - WebSocket stream for real-time order updates - REST API for account/position queries and order submission
Paper vs Live: - paper=True (default): Uses paper trading endpoint - paper=False: Uses live trading endpoint (USE WITH CAUTION)
Example
broker = AlpacaBroker( api_key='PKXXXXXXXX', secret_key='XXXXXXXXXX', paper=True, # Always start with paper trading! ) await broker.connect() positions = await broker.get_positions_async() await broker.disconnect()
Initialize AlpacaBroker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_key
|
str
|
Alpaca API key (from https://app.alpaca.markets) |
required |
secret_key
|
str
|
Alpaca secret key |
required |
paper
|
bool
|
Use paper trading endpoint (default: True) |
True
|
Source code in src/ml4t/live/brokers/alpaca.py
positions
property
¶
Thread-safe position access (shallow copy).
Note: This is called from worker thread via ThreadSafeBrokerWrapper. The shallow copy prevents RuntimeError during dict iteration.
Returns:
| Type | Description |
|---|---|
dict[str, Position]
|
Dictionary mapping asset symbols to Position objects |
pending_orders
property
¶
Get list of pending orders.
Returns:
| Type | Description |
|---|---|
list[Order]
|
List of pending Order objects |
connect
async
¶
Connect to Alpaca and sync initial state.
Steps: 1. Create TradingClient (REST) 2. Create TradingStream (WebSocket) 3. Verify connection by fetching account 4. Register trade update callback 5. Sync positions and open orders 6. Start WebSocket stream for order updates
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If connection fails |
Source code in src/ml4t/live/brokers/alpaca.py
disconnect
async
¶
Disconnect from Alpaca.
Source code in src/ml4t/live/brokers/alpaca.py
get_position
¶
Thread-safe single position access.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol (e.g., 'AAPL' or 'BTC/USD') |
required |
Returns:
| Type | Description |
|---|---|
Position | None
|
Position object if exists, None otherwise |
Source code in src/ml4t/live/brokers/alpaca.py
get_positions_async
async
¶
Async thread-safe position access with lock.
Returns:
| Type | Description |
|---|---|
dict[str, Position]
|
Dictionary mapping asset symbols to Position objects |
Source code in src/ml4t/live/brokers/alpaca.py
get_account_value_async
async
¶
Get portfolio value (equity).
Returns:
| Type | Description |
|---|---|
float
|
Total account equity in USD |
Source code in src/ml4t/live/brokers/alpaca.py
get_cash_async
async
¶
Get available cash.
Returns:
| Type | Description |
|---|---|
float
|
Available cash in USD |
Source code in src/ml4t/live/brokers/alpaca.py
submit_order_async
async
¶
submit_order_async(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order to Alpaca.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol (e.g., 'AAPL' or 'BTC/USD') |
required |
quantity
|
float
|
Number of shares/units |
required |
side
|
OrderSide | None
|
BUY or SELL (auto-detected from quantity sign if None) |
None
|
order_type
|
OrderType
|
Market, limit, stop, or stop-limit |
MARKET
|
limit_price
|
float | None
|
Limit price for limit orders |
None
|
stop_price
|
float | None
|
Stop price for stop orders |
None
|
**kwargs
|
Any
|
Additional parameters (ignored) |
{}
|
Returns:
| Type | Description |
|---|---|
Order
|
Order object |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If not connected |
ValueError
|
If order parameters are invalid |
Source code in src/ml4t/live/brokers/alpaca.py
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 | |
cancel_order_async
async
¶
Cancel pending order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order_id
|
str
|
Order ID to cancel (e.g., 'ML4T-1') |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if cancellation request sent successfully, False otherwise |
Source code in src/ml4t/live/brokers/alpaca.py
replace_order_async
async
¶
Replace a pending order via cancel-and-resubmit.
Source code in src/ml4t/live/brokers/alpaca.py
close_position_async
async
¶
Close position in asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
asset
|
str
|
Asset symbol |
required |
Returns:
| Type | Description |
|---|---|
Order | None
|
Order object if position exists, None otherwise |
Source code in src/ml4t/live/brokers/alpaca.py
Data Feeds¶
IBDataFeed
¶
Bases: DataFeedProtocol
Real-time market data feed from Interactive Brokers.
Subscribes to tick-by-tick market data for specified symbols. Emits data as (timestamp, data, context) tuples.
Data Format
timestamp: datetime - Tick timestamp data: dict[str, dict] - {symbol: {'price': float, 'size': int}} context: dict - Additional metadata (bid, ask, etc.)
Note
- IB must be connected before creating feed
- Requires market data subscription for symbols
- Throttles rapid ticks to avoid overwhelming strategy
Example
ib = IB() await ib.connectAsync('127.0.0.1', 7497, clientId=1)
feed = IBDataFeed(ib, symbols=['SPY', 'QQQ', 'IWM']) await feed.start()
Use directly or wrap with BarAggregator¶
aggregator = BarAggregator(feed, bar_size_minutes=1)
Initialize IB data feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ib
|
IB
|
Connected IB instance |
required |
symbols
|
list[str]
|
List of symbols to subscribe to |
required |
exchange
|
str
|
IB exchange (default: SMART routing) |
'SMART'
|
currency
|
str
|
Currency (default: USD) |
'USD'
|
tick_throttle_ms
|
int
|
Minimum milliseconds between tick emissions (prevents overwhelming strategy with rapid ticks) |
100
|
Source code in src/ml4t/live/feeds/ib_feed.py
stats
property
¶
Get feed statistics.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dict with keys: |
dict[str, Any]
|
|
dict[str, Any]
|
|
dict[str, Any]
|
|
dict[str, Any]
|
|
start
async
¶
Subscribe to market data for all symbols.
Creates contracts and subscribes to real-time tick data.
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If IB not connected |
Source code in src/ml4t/live/feeds/ib_feed.py
stop
¶
Unsubscribe from market data.
Cancels all market data subscriptions and stops feed.
Source code in src/ml4t/live/feeds/ib_feed.py
__aiter__
async
¶
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[datetime, dict, dict]]
|
Tuple of (timestamp, data, context) where: |
AsyncIterator[tuple[datetime, dict, dict]]
|
|
AsyncIterator[tuple[datetime, dict, dict]]
|
|
AsyncIterator[tuple[datetime, dict, dict]]
|
|
Stops when
- stop() is called (None sentinel)
- Feed is not running
Source code in src/ml4t/live/feeds/ib_feed.py
AlpacaDataFeed
¶
Bases: DataFeedProtocol
Real-time market data feed from Alpaca Markets.
Subscribes to real-time data for specified symbols. Supports both stocks and crypto.
Data Types
bars: OHLCV minute bars (default, recommended for strategies) quotes: Bid/ask quotes (for spread-sensitive strategies) trades: Individual trades (highest frequency)
Data Feeds
iex: Free tier (limited data) sip: Premium (full market data, requires subscription)
Data Format
timestamp: datetime - Bar/quote/trade timestamp data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}} context: dict - Additional metadata
Example
Stocks only¶
feed = AlpacaDataFeed( api_key='PKXXXXXXXX', secret_key='XXXXXXXXXX', symbols=['AAPL', 'MSFT'], )
Mixed stocks and crypto¶
feed = AlpacaDataFeed( api_key='PKXXXXXXXX', secret_key='XXXXXXXXXX', symbols=['AAPL', 'BTC/USD', 'ETH/USD'], )
await feed.start()
async for timestamp, data, context in feed: strategy.on_data(timestamp, data, context, broker)
Initialize Alpaca data feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_key
|
str
|
Alpaca API key |
required |
secret_key
|
str
|
Alpaca secret key |
required |
symbols
|
list[str]
|
List of symbols (e.g., ['AAPL', 'BTC/USD']) |
required |
data_type
|
str
|
Type of data - 'bars' (default), 'quotes', or 'trades' |
'bars'
|
feed
|
str
|
Data feed type - 'iex' (free) or 'sip' (premium) |
'iex'
|
Source code in src/ml4t/live/feeds/alpaca_feed.py
stats
property
¶
Get feed statistics.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dict with keys: |
dict[str, Any]
|
|
dict[str, Any]
|
|
dict[str, Any]
|
|
dict[str, Any]
|
|
dict[str, Any]
|
|
dict[str, Any]
|
|
start
async
¶
Subscribe to market data for all symbols.
Creates streams and subscribes to real-time data.
Source code in src/ml4t/live/feeds/alpaca_feed.py
stop
¶
Stop data feed.
Closes all streams and signals consumer to exit.
Source code in src/ml4t/live/feeds/alpaca_feed.py
__aiter__
async
¶
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[datetime, dict, dict]]
|
Tuple of (timestamp, data, context) where: |
AsyncIterator[tuple[datetime, dict, dict]]
|
|
AsyncIterator[tuple[datetime, dict, dict]]
|
|
AsyncIterator[tuple[datetime, dict, dict]]
|
|
Stops when
- stop() is called (None sentinel)
- Feed is not running
Source code in src/ml4t/live/feeds/alpaca_feed.py
__anext__
async
¶
Get next data item.
Returns:
| Type | Description |
|---|---|
tuple[datetime, dict[str, Any], dict[str, Any]]
|
Tuple of (timestamp, data, context) |
Raises:
| Type | Description |
|---|---|
StopAsyncIteration
|
When feed is stopped |
Source code in src/ml4t/live/feeds/alpaca_feed.py
DataBentoFeed
¶
Bases: DataFeedProtocol
Market data feed from DataBento.
Supports both historical replay and real-time streaming.
Historical Mode
- Reads from .dbn files (DataBento native format)
- Replays at historical speed or accelerated
- Perfect for strategy validation
Real-time Mode
- Streams live market data
- Supports multiple datasets (GLBX, XNAS, OPRA, etc.)
- Low-latency tick data
Data Format
timestamp: datetime - Event timestamp data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}} context: dict - Additional fields (bid, ask, trade_count, etc.)
Example Historical
feed = DataBentoFeed.from_file( 'ES_202401.dbn', symbols=['ES.FUT'], replay_speed=10.0, # 10x speed )
Example Real-time
feed = DataBentoFeed.from_live( api_key=os.getenv('DATABENTO_API_KEY'), dataset='GLBX.MDP3', schema='ohlcv-1s', symbols=['ES.c.0', 'NQ.c.0'], )
Initialize DataBento feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
client
|
Historical | Live
|
DataBento client (Historical or Live) |
required |
symbols
|
list[str]
|
List of symbols to subscribe to |
required |
mode
|
str
|
'historical' or 'live' |
'historical'
|
replay_speed
|
float
|
Playback speed multiplier (historical only) 1.0 = real-time, 10.0 = 10x speed |
1.0
|
Source code in src/ml4t/live/feeds/databento_feed.py
from_file
classmethod
¶
Create feed from historical .dbn file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
file_path
|
str | Path
|
Path to .dbn file |
required |
symbols
|
list[str]
|
Symbols to filter (or all if empty) |
required |
replay_speed
|
float
|
Playback speed (1.0 = real-time) |
1.0
|
Returns:
| Type | Description |
|---|---|
DataBentoFeed
|
DataBentoFeed configured for historical replay |
Source code in src/ml4t/live/feeds/databento_feed.py
from_live
classmethod
¶
Create feed for real-time streaming.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
api_key
|
str
|
DataBento API key |
required |
dataset
|
str
|
Dataset code (e.g., 'GLBX.MDP3', 'XNAS.ITCH') |
required |
schema
|
str
|
Data schema (e.g., 'ohlcv-1s', 'mbp-10', 'trades') |
required |
symbols
|
list[str]
|
Symbols to subscribe to |
required |
Returns:
| Type | Description |
|---|---|
DataBentoFeed
|
DataBentoFeed configured for live streaming |
Source code in src/ml4t/live/feeds/databento_feed.py
start
async
¶
Start data feed.
Historical mode: Begins replay task Live mode: Starts streaming subscription
Source code in src/ml4t/live/feeds/databento_feed.py
stop
¶
Stop data feed.
Source code in src/ml4t/live/feeds/databento_feed.py
__aiter__
async
¶
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[datetime, dict, dict]]
|
Tuple of (timestamp, data, context) |
Source code in src/ml4t/live/feeds/databento_feed.py
CryptoFeed
¶
CryptoFeed(
exchange,
symbols,
*,
timeframe="1m",
stream_trades=False,
stream_ohlcv=True,
api_key=None,
api_secret=None,
api_passphrase=None,
)
Bases: DataFeedProtocol
Cryptocurrency market data feed via CCXT.
Provides unified interface to 100+ crypto exchanges. Supports both REST (polling) and WebSocket (streaming).
Data Format
timestamp: datetime - Candle/trade timestamp data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}} context: dict - Exchange-specific metadata
Exchange Symbols
- Binance: 'BTC/USDT', 'ETH/USDT'
- Coinbase: 'BTC-USD', 'ETH-USD'
- Kraken: 'BTC/USD', 'ETH/USD'
Timeframes
'1m', '5m', '15m', '1h', '4h', '1d'
Example WebSocket (Real-time): feed = CryptoFeed( exchange='binance', symbols=['BTC/USDT', 'ETH/USDT'], stream_trades=True, # Stream trades (fastest) )
Example Authenticated
feed = CryptoFeed( exchange='binance', symbols=['BTC/USDT'], api_key='your-key', api_secret='your-secret', )
Initialize crypto feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
exchange
|
str
|
Exchange ID (e.g., 'binance', 'coinbasepro', 'kraken') |
required |
symbols
|
list[str]
|
Trading pairs (e.g., ['BTC/USDT', 'ETH/USDT']) |
required |
timeframe
|
str
|
OHLCV timeframe ('1m', '5m', '1h', etc.) |
'1m'
|
stream_trades
|
bool
|
Stream trade ticks (faster updates) |
False
|
stream_ohlcv
|
bool
|
Stream OHLCV candles |
True
|
api_key
|
str | None
|
API key (for authenticated endpoints) |
None
|
api_secret
|
str | None
|
API secret |
None
|
api_passphrase
|
str | None
|
API passphrase (Coinbase only) |
None
|
Source code in src/ml4t/live/feeds/crypto_feed.py
start
async
¶
Start streaming market data.
Initiates WebSocket subscriptions for all symbols.
Source code in src/ml4t/live/feeds/crypto_feed.py
stop
¶
Stop streaming and close exchange connection.
Source code in src/ml4t/live/feeds/crypto_feed.py
__aiter__
async
¶
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[datetime, dict, dict]]
|
Tuple of (timestamp, data, context) |
Source code in src/ml4t/live/feeds/crypto_feed.py
OKXFundingFeed
¶
Bases: DataFeedProtocol
OKX funding rate feed with OHLCV bars.
Combines price data with funding rate information for ML strategies that trade crypto perpetual futures based on funding rate signals.
Data Flow
- Poll /market/candles for latest OHLCV bar
- Poll /public/funding-rate for current funding rate
- Combine into (timestamp, data, context) tuple
- Emit to strategy
Symbol Format
OKX perpetual swaps use format: BTC-USDT-SWAP, ETH-USDT-SWAP
Initialize OKX funding rate feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
symbols
|
list[str]
|
List of perpetual swap symbols (e.g., ['BTC-USDT-SWAP']) |
required |
timeframe
|
str
|
OHLCV bar timeframe ('1m', '1H', '4H', '1D') |
'1H'
|
poll_interval_seconds
|
float
|
How often to poll for new data |
60.0
|
Source code in src/ml4t/live/feeds/okx_feed.py
start
async
¶
Start the OKX data feed.
Begins polling for OHLCV and funding rate data.
Source code in src/ml4t/live/feeds/okx_feed.py
stop
¶
Stop the data feed.
Source code in src/ml4t/live/feeds/okx_feed.py
close
async
¶
__aiter__
¶
__anext__
async
¶
Get next bar with funding data.
Returns:
| Type | Description |
|---|---|
tuple[datetime, dict[str, Any], dict[str, Any]]
|
(timestamp, data, context) tuple |
Raises:
| Type | Description |
|---|---|
StopAsyncIteration
|
When feed stops |
Source code in src/ml4t/live/feeds/okx_feed.py
BarAggregator
¶
Aggregates raw ticks or 5-second bars into minute bars.
Addresses Gemini's concerns: 1. "If IBDataFeed pushes a tick to Strategy.on_data, the strategy might trigger 60x more often than intended." - Buffer incoming data. 2. "The 15:59 bar is never emitted because no 16:00 tick arrives." - Background flush checker emits bars on timeout.
The aggregator buffers incoming data and emits when: - A bar boundary is crossed (new tick arrives in next minute) - OR timeout expires (2s past bar end with no new data)
Example
raw_feed = IBTickFeed(ib, assets=['AAPL']) aggregated_feed = BarAggregator(raw_feed, bar_size_minutes=1)
async for timestamp, data, context in aggregated_feed: # data contains completed minute bars only strategy.on_data(timestamp, data, context, broker)
Initialize BarAggregator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source_feed
|
DataFeedProtocol
|
Raw tick or sub-minute bar feed |
required |
bar_size_minutes
|
int
|
Output bar size in minutes (default: 1) |
1
|
assets
|
list[str] | None
|
List of assets to track (default: all from source) |
None
|
flush_timeout_seconds
|
float
|
Seconds after bar end before forcing emit (default: 2.0) |
2.0
|
Source code in src/ml4t/live/feeds/aggregator.py
start
async
¶
stop
¶
Stop aggregation.
Source code in src/ml4t/live/feeds/aggregator.py
__aiter__
async
¶
Async iterator interface.
Uses None sentinel for shutdown (Gemini fix: avoids busy-wait with 1s timeout).
Yields:
| Type | Description |
|---|---|
AsyncIterator[tuple[datetime, dict, dict]]
|
Tuple of (timestamp, data, context) where data is {asset: ohlcv_dict} |
Source code in src/ml4t/live/feeds/aggregator.py
Safety Types¶
RiskLimitError
¶
Bases: Exception
Raised when an order violates risk limits.
RiskState
dataclass
¶
RiskState(
date,
daily_loss=0.0,
orders_placed=0,
high_water_mark=0.0,
session_start_equity=None,
persisted_positions=dict(),
persisted_pending_orders=list(),
kill_switch_activated=False,
kill_switch_reason="",
)
Persisted risk state - survives restarts.
This state is saved to disk after every order and on shutdown. Uses atomic JSON writes (write to .tmp then os.replace) to prevent corruption.
Example
state = RiskState(date="2023-10-15", daily_loss=1500.0) state.kill_switch_activated = True state.kill_switch_reason = "Max daily loss exceeded"
Note
The kill switch state persists across restarts and must be manually reset by deleting the state file or setting kill_switch_activated=False.
from_dict
classmethod
¶
Create RiskState from dictionary (for JSON loading).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
dict
|
Dictionary with RiskState fields |
required |
Returns:
| Type | Description |
|---|---|
RiskState
|
RiskState instance |
to_dict
¶
Convert to dictionary (for JSON saving).
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with all fields |
Source code in src/ml4t/live/safety.py
save_atomic
staticmethod
¶
Save state with atomic write (write to .tmp then os.replace).
This prevents corruption if process dies mid-write.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
RiskState
|
RiskState to save |
required |
filepath
|
str
|
Path to save to |
required |
Raises:
| Type | Description |
|---|---|
OSError
|
If write fails |
Source code in src/ml4t/live/safety.py
load
staticmethod
¶
Load state from file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filepath
|
str
|
Path to load from |
required |
Returns:
| Type | Description |
|---|---|
RiskState | None
|
RiskState if file exists and is valid, None otherwise |
Source code in src/ml4t/live/safety.py
VirtualPortfolio
¶
Manages internal accounting for Shadow Mode (Paper Trading).
Addresses Gemini's Critical Issue A: "The Infinite Buy Loop"
Problem: In shadow mode, returning fake Order objects without updating position state causes strategies to keep buying forever because get_position() always returns None.
Solution: Track shadow positions locally. When shadow_mode=True: - submit_order() updates this virtual portfolio - positions/get_position() return from this portfolio - Strategy sees realistic position state
Handles: - New positions - Position increases (weighted avg cost basis) - Position decreases (partial close) - Position close (quantity = 0) - Position flip (long -> short or vice versa)
Example
portfolio = VirtualPortfolio(initial_cash=100_000.0)
Simulate buy order fill¶
order = Order( asset="AAPL", side=OrderSide.BUY, quantity=100, filled_price=150.0, filled_quantity=100, ... ) portfolio.process_fill(order)
Check position¶
pos = portfolio.positions.get("AAPL") assert pos.quantity == 100 assert pos.entry_price == 150.0
Simulate sell order fill (close)¶
sell_order = Order( asset="AAPL", side=OrderSide.SELL, quantity=100, filled_price=155.0, filled_quantity=100, ... ) portfolio.process_fill(sell_order) assert "AAPL" not in portfolio.positions
Initialize virtual portfolio.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
initial_cash
|
float
|
Starting cash balance (default: 100,000) |
100000.0
|
Source code in src/ml4t/live/safety.py
positions
property
¶
Get current positions (returns copy for safety).
Returns:
| Type | Description |
|---|---|
dict[str, Position]
|
Dictionary mapping asset symbol to Position |
account_value
property
¶
Get total account value (cash + position market value).
Returns:
| Type | Description |
|---|---|
float
|
Total account value |
process_fill
¶
Update state based on filled shadow order.
Handles: - Weighted average cost basis for position increases - Position flipping (long -> short or vice versa) - Partial and full closes
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order
|
Order
|
Filled Order object (must have filled_quantity and filled_price) |
required |
Source code in src/ml4t/live/safety.py
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 | |
update_prices
¶
Update current prices for accurate account value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prices
|
dict[str, float]
|
Dictionary mapping asset symbol to current price |
required |