API Reference
Core Classes
LiveEngine
Async live trading engine.
Bridges async infrastructure with sync Strategy.on_data().
Key Design Decisions:
1. Strategy runs in thread pool (via asyncio.to_thread)
2. ThreadSafeBrokerWrapper passed to strategy for sync broker calls
3. Graceful shutdown on SIGINT/SIGTERM
4. Configurable error handling

Lifecycle:
1. connect() - Connect to broker and data feed
2. run() - Main loop (blocks until shutdown)
3. stop() - Graceful shutdown
Example

    engine = LiveEngine(strategy, broker, feed)
    await engine.connect()

    try:
        await engine.run()
    except KeyboardInterrupt:
        await engine.stop()

Initialize LiveEngine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| strategy | Strategy | Strategy instance to execute | required |
| broker | AsyncBrokerProtocol | Async broker implementation (IBBroker, AlpacaBroker, etc.) | required |
| feed | DataFeedProtocol | Data feed providing timestamp, data, context tuples | required |
| on_error | Callable[[Exception, datetime, dict], None] \| None | Custom error handler callback. Signature: (error, timestamp, data) -> None | None |
| halt_on_error | bool | If True, stop engine on strategy error. If False, log error and continue. | False |
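
The on_error parameter takes a callable with the signature (error, timestamp, data) -> None. A minimal sketch of such a handler follows; the logger name, the `error_log` list, and the assumption that the keys of `data` are symbols are illustrative and not part of the library.

```python
import logging
from datetime import datetime, timezone

logger = logging.getLogger("live_errors")  # hypothetical logger name

# Hypothetical in-memory record of failures, handy for a post-session review.
error_log: list[tuple[datetime, str]] = []


def log_strategy_error(error: Exception, timestamp: datetime, data: dict) -> None:
    """Record a strategy failure without raising, so the caller can continue."""
    symbols = sorted(data)  # assumption: keys of the data dict are symbols
    logger.error("Strategy failed at %s for %s: %r", timestamp, symbols, error)
    error_log.append((timestamp, type(error).__name__))
```

Going by the parameter table, a handler like this would be passed as `on_error=log_strategy_error`, typically together with `halt_on_error=False` so that the engine logs the failure and keeps running.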
Source code in src/ml4t/live/engine.py
stats (property)
Get engine statistics.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict of engine statistics |
connect (async)
Connect to broker and data feed.
Must be called before run().
Steps:
1. Connect broker (authenticate, subscribe to account updates)
2. Start data feed (subscribe to market data)
3. Create ThreadSafeBrokerWrapper for strategy use
4. Install signal handlers for graceful shutdown
Source code in src/ml4t/live/engine.py
run (async)
Main async loop - receives bars and dispatches to strategy.
Runs until:
1. stop() is called
2. Data feed ends
3. Unrecoverable error (if halt_on_error=True)
4. SIGINT/SIGTERM received

Strategy Execution:
- Strategy.on_data() runs in thread pool (asyncio.to_thread)
- Receives ThreadSafeBrokerWrapper, not raw broker
- Broker calls block the worker thread (not event loop)

Error Handling:
- Strategy exceptions caught and passed to on_error callback
- If halt_on_error=True, engine stops
- If halt_on_error=False, log and continue
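
The dispatch and error-handling behaviour described above can be sketched roughly as follows. This is not the engine's actual code: `dispatch`, `demo_feed`, and `on_data` are hypothetical names, and the callback takes (timestamp, data, context) only, omitting the broker argument for brevity.

```python
import asyncio
from datetime import datetime, timezone


async def dispatch(feed, on_data, on_error=None, halt_on_error=False):
    """Run a sync callback in a worker thread for each item of an async feed."""
    processed = 0
    async for timestamp, data, context in feed:
        try:
            # The sync callback runs in a worker thread; the event loop stays free.
            await asyncio.to_thread(on_data, timestamp, data, context)
            processed += 1
        except Exception as exc:
            if on_error is not None:
                on_error(exc, timestamp, data)
            if halt_on_error:
                break  # stop on the first strategy error
    return processed


async def demo_feed():
    """Hypothetical three-item feed used only for this illustration."""
    for price in (100.0, -1.0, 101.0):
        yield datetime.now(timezone.utc), {"SPY": {"price": price}}, {}


def on_data(timestamp, data, context):
    """Hypothetical sync strategy callback that rejects negative prices."""
    if data["SPY"]["price"] < 0:
        raise ValueError("negative price")
```

With `halt_on_error=False` the loop reports the bad item and moves on to the next one; with `halt_on_error=True` it exits at the first failure.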
Source code in src/ml4t/live/engine.py
stop (async)
Graceful shutdown.
Steps:
1. Set shutdown event (signals main loop to exit)
2. Stop data feed (no more bars)
3. Disconnect broker (cancel subscriptions, close connection)
Safe to call multiple times.
Source code in src/ml4t/live/engine.py
LiveRiskConfig (dataclass)
LiveRiskConfig(
max_position_value=50000.0,
max_position_shares=1000,
max_total_exposure=200000.0,
max_positions=20,
max_order_value=10000.0,
max_order_shares=500,
max_orders_per_minute=10,
max_daily_loss=5000.0,
max_drawdown_pct=0.05,
max_price_deviation_pct=0.05,
max_data_staleness_seconds=60.0,
dedup_window_seconds=1.0,
allowed_assets=set(),
blocked_assets=set(),
shadow_mode=False,
kill_switch_enabled=False,
state_file=".ml4t_risk_state.json",
)
Risk configuration for live trading.
Multiple layers of protection - all limits are optional. Set to inf/large values to disable specific checks.
Example

    # Conservative configuration
    config = LiveRiskConfig(
        max_position_value=25_000.0,
        max_daily_loss=2_000.0,
        shadow_mode=True,  # Always start with shadow mode!
    )

    # Disable specific checks
    config = LiveRiskConfig(
        max_position_value=float('inf'),  # No position limit
        max_daily_loss=10_000.0,  # Only daily loss limit
    )

Safety Recommendations
- Always start with shadow_mode=True
- Graduate to paper trading
- Use small positions when going live
- Set conservative risk limits
__post_init__
Validate configuration parameters.
Source code in src/ml4t/live/safety.py
SafeBroker
Risk-controlled wrapper with state persistence.
Addresses Gemini v1: "If script crashes and restarts, SafeBroker resets max_daily_loss to 0. A losing strategy could burn through the limit again."
Addresses Gemini v2:
- Critical Issue A: VirtualPortfolio for shadow mode
- Memory leaks: _recent_orders pruned even if dedup disabled
- Atomic JSON writes: write to .tmp then os.replace()

Safety Features:
1. Pre-trade validation against all risk limits
2. Order rate limiting
3. Drawdown monitoring with kill switch
4. Fat finger protection (price deviation check)
5. Stale data protection
6. Duplicate order filter
7. Shadow mode with VirtualPortfolio (realistic paper trading)
8. State persistence across restarts
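
The "write to .tmp then os.replace()" pattern mentioned above can be illustrated with a short sketch. The function names and the shape of the state dict are assumptions made for the example; SafeBroker's actual persistence code may differ.

```python
import json
import os


def save_state_atomic(path: str, state: dict) -> None:
    """Write JSON to a sibling .tmp file, then swap it into place."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(state, fh)
        fh.flush()
        os.fsync(fh.fileno())  # push the bytes to disk before the rename
    # os.replace overwrites the destination; on POSIX the rename is atomic.
    os.replace(tmp_path, path)


def load_state(path: str) -> dict:
    """Return the persisted state, or an empty dict if no file exists yet."""
    try:
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    except FileNotFoundError:
        return {}
```

Because the final file is only ever replaced in one step, a crash in the middle of a write leaves the previous state file intact, so a restarted process does not read a half-written file.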
Example

    broker = IBBroker()
    await broker.connect()

    safe = SafeBroker(
        broker=broker,
        config=LiveRiskConfig(
            max_position_value=25000,
            shadow_mode=True,  # Test first!
        ),
    )

    # Use safe in strategy
    engine = LiveEngine(strategy, safe, feed)

Initialize SafeBroker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| broker | AsyncBrokerProtocol | Async broker implementation (IBBroker, AlpacaBroker, etc.) | required |
| config | LiveRiskConfig | Risk configuration | required |
Source code in src/ml4t/live/safety.py
positions (property)
Get current positions.
In shadow mode, returns virtual positions. In live mode, returns broker positions.
get_position
Get position for specific asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol | required |

Returns:
| Type | Description |
|---|---|
| Position \| None | Position object or None |
Source code in src/ml4t/live/safety.py
get_account_value_async (async)
Get total account value (async).
Returns:
| Type | Description |
|---|---|
| float | Total account value in base currency |
Source code in src/ml4t/live/safety.py
get_cash_async (async)
Get available cash (async).
Returns:
| Type | Description |
|---|---|
| float | Available cash in base currency |
cancel_order_async (async)
Cancel pending order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| order_id | str | ID of order to cancel | required |

Returns:
| Type | Description |
|---|---|
| bool | True if cancel request submitted |
Source code in src/ml4t/live/safety.py
close_position_async (async)
Close entire position.
Orders that close a position bypass the normal risk limits (safety feature).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol to close | required |

Returns:
| Type | Description |
|---|---|
| Order \| None | Order object if position exists |
Source code in src/ml4t/live/safety.py
submit_order_async (async)
submit_order_async(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order with full risk validation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol | required |
| quantity | int | Number of shares/contracts | required |
| side | OrderSide \| None | Order side (BUY/SELL), auto-detected from quantity if None | None |
| order_type | OrderType | Type of order | MARKET |
| limit_price | float \| None | Limit price for LIMIT/STOP_LIMIT orders | None |
| stop_price | float \| None | Stop price for STOP/STOP_LIMIT orders | None |
| **kwargs | Any | Additional broker-specific parameters | {} |

Returns:
| Type | Description |
|---|---|
| Order | Order object |

Raises:
| Type | Description |
|---|---|
| RiskLimitError | If order violates any risk limit |
Source code in src/ml4t/live/safety.py
enable_kill_switch
Manually enable kill switch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| reason | str | Reason for activation (default: "Manual") | 'Manual' |
disable_kill_switch
Manually disable kill switch (use with caution!).
Source code in src/ml4t/live/safety.py
close_all_positions (async)
Emergency close all positions.
Returns:
| Type | Description |
|---|---|
| list[Order] | List of close orders |
Source code in src/ml4t/live/safety.py
connect (async)
disconnect (async)
is_connected_async (async)
get_positions_async (async)
Get all positions (async).
Returns:
| Type | Description |
|---|---|
| dict[str, Position] | Dictionary mapping asset symbol to Position |
Source code in src/ml4t/live/safety.py
get_pending_orders_async (async)
Get pending orders (async).
Returns:
| Type | Description |
|---|---|
| list[Order] | List of pending orders |
get_position_async (async)
Get position (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol | required |

Returns:
| Type | Description |
|---|---|
| Position \| None | Position object or None |
Source code in src/ml4t/live/safety.py
ThreadSafeBrokerWrapper
Wraps an async broker for use from sync strategy code.
This wrapper is passed to Strategy.on_data() instead of the raw broker. It bridges the sync/async boundary by scheduling coroutines on the main event loop and blocking the worker thread until they complete.
Thread Safety:
- Strategy runs in worker thread (via asyncio.to_thread)
- Broker methods run on main event loop
- run_coroutine_threadsafe() handles the cross-thread communication

Timeouts (from design review):
- Getters (get_cash, get_account_value): 5s
- Order operations (submit, cancel, close): 30s
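
The bridging mechanism described above can be sketched with the standard library alone. `call_from_worker`, `fake_get_cash`, and the two timeout constants are hypothetical names chosen for the illustration; only `asyncio.run_coroutine_threadsafe` and `asyncio.to_thread` are real APIs, and the wrapper's actual implementation may differ.

```python
import asyncio
import concurrent.futures

GETTER_TIMEOUT_S = 5.0   # getters, per the timeouts listed above
ORDER_TIMEOUT_S = 30.0   # order operations, per the timeouts listed above


def call_from_worker(coro, loop, timeout):
    """Schedule `coro` on `loop` from a worker thread and block for its result."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()  # ask the loop to cancel the still-pending coroutine
        raise TimeoutError(f"broker call exceeded {timeout}s") from None


async def fake_get_cash() -> float:
    """Stand-in for an async broker getter (hypothetical)."""
    await asyncio.sleep(0.01)
    return 10_000.0


async def main() -> float:
    loop = asyncio.get_running_loop()
    # The sync caller runs in a worker thread, as a strategy would.
    return await asyncio.to_thread(
        lambda: call_from_worker(fake_get_cash(), loop, GETTER_TIMEOUT_S)
    )
```

Only the worker thread blocks on `future.result()`; the event loop stays free to run the coroutine and any broker callbacks in the meantime.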
Example

    # LiveEngine creates this wrapper
    loop = asyncio.get_running_loop()
    wrapped = ThreadSafeBrokerWrapper(ib_broker, loop)

    # Strategy uses it like a normal sync broker
    order = wrapped.submit_order('AAPL', 100, OrderSide.BUY)

Note
This class implements BrokerProtocol but does not inherit from it. It provides a sync interface backed by async operations.
Initialize thread-safe wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| async_broker | AsyncBrokerProtocol | Async broker implementation (IBBroker, etc.) | required |
| loop | AbstractEventLoop | Main event loop (from asyncio.get_running_loop()) | required |
Source code in src/ml4t/live/wrappers.py
positions (property)
Get current positions (thread-safe read).
Returns:
| Type | Description |
|---|---|
| dict[str, Position] | Dictionary mapping asset symbol to Position |
pending_orders (property)
Get pending orders (thread-safe read).
Returns:
| Type | Description |
|---|---|
| list[Order] | List of pending Order objects |
is_connected (property)
Check if broker is connected.
Returns:
| Type | Description |
|---|---|
| bool | True if connected and ready to trade |
get_position
Get position for specific asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol (e.g., "AAPL") | required |

Returns:
| Type | Description |
|---|---|
| Position \| None | Position object if holding position, None otherwise |

Raises:
| Type | Description |
|---|---|
| TimeoutError | If operation times out |
| RuntimeError | If broker error occurs |
Source code in src/ml4t/live/wrappers.py
get_account_value
Get total account value (cash + positions).
Returns:
| Type | Description |
|---|---|
| float | Total account value in base currency |

Raises:
| Type | Description |
|---|---|
| TimeoutError | If operation times out (5s) |
| RuntimeError | If broker error occurs |
Source code in src/ml4t/live/wrappers.py
get_cash
Get available cash balance.
Returns:
| Type | Description |
|---|---|
| float | Available cash in base currency |

Raises:
| Type | Description |
|---|---|
| TimeoutError | If operation times out (5s) |
| RuntimeError | If broker error occurs |
Source code in src/ml4t/live/wrappers.py
submit_order
submit_order(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order for execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol (e.g., "AAPL") | required |
| quantity | int | Number of shares/contracts (positive for buy, negative for sell) | required |
| side | OrderSide \| None | Order side (BUY/SELL), auto-detected from quantity if None | None |
| order_type | OrderType | Type of order (MARKET, LIMIT, STOP, etc.) | MARKET |
| limit_price | float \| None | Limit price for LIMIT/STOP_LIMIT orders | None |
| stop_price | float \| None | Stop price for STOP/STOP_LIMIT orders | None |
| **kwargs | Any | Additional broker-specific parameters | {} |
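
The side parameter is described as auto-detected from the sign of quantity when it is None. One plausible reading of that rule is sketched below. The local `OrderSide` enum is a stand-in with assumed values, `resolve_side` is a hypothetical helper, and the handling of a zero quantity is a guess rather than documented behaviour.

```python
from enum import Enum
from typing import Optional, Tuple


class OrderSide(Enum):
    """Local stand-in for the library's OrderSide enum (values assumed)."""
    BUY = "buy"
    SELL = "sell"


def resolve_side(quantity: int, side: Optional[OrderSide] = None) -> Tuple[OrderSide, int]:
    """Return (side, absolute quantity), inferring the side from the sign if needed."""
    if quantity == 0:
        raise ValueError("quantity must be non-zero")  # assumption, not documented
    if side is None:
        side = OrderSide.BUY if quantity > 0 else OrderSide.SELL
    return side, abs(quantity)
```

Under this reading, `resolve_side(-50)` is treated as a sell of 50 units, while an explicit `side` always takes precedence over the sign.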
Returns:
| Type | Description |
|---|---|
| Order | Order object with order_id and initial status |

Raises:
| Type | Description |
|---|---|
| TimeoutError | If operation times out (30s) |
| ValueError | If order parameters are invalid |
| RuntimeError | If broker is not connected or error occurs |
Source code in src/ml4t/live/wrappers.py
cancel_order
Cancel pending order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| order_id | str | ID of order to cancel | required |

Returns:
| Type | Description |
|---|---|
| bool | True if cancel request submitted, False if order not found |

Raises:
| Type | Description |
|---|---|
| TimeoutError | If operation times out (30s) |
| RuntimeError | If broker error occurs |
Source code in src/ml4t/live/wrappers.py
close_position
Close entire position in asset.
Convenience method that submits a closing order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol to close | required |

Returns:
| Type | Description |
|---|---|
| Order \| None | Order object if position exists, None if no position |

Raises:
| Type | Description |
|---|---|
| TimeoutError | If operation times out (30s) |
| RuntimeError | If broker error occurs |
Source code in src/ml4t/live/wrappers.py
Brokers
IBBroker
Bases: AsyncBrokerProtocol
Interactive Brokers implementation.
Design:
- All broker operations are async
- Uses asyncio.Lock for thread safety
- Event handlers use put_nowait() (non-blocking)
- Reconnection handled externally

Connection Ports:
- TWS Paper: 7497
- TWS Live: 7496
- Gateway Paper: 4002
- Gateway Live: 4001
Example

    broker = IBBroker(port=7497)  # Paper trading
    await broker.connect()
    positions = await broker.get_positions_async()
    await broker.disconnect()

Initialize IBBroker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | IB Gateway/TWS host (default: '127.0.0.1') | '127.0.0.1' |
| port | int | IB Gateway/TWS port (default: 7497 for paper) | 7497 |
| client_id | int | Unique client ID (default: 1) | 1 |
| account | str \| None | IB account ID (default: use first account) | None |
Source code in src/ml4t/live/brokers/ib.py
positions (property)
Thread-safe position access (Gemini v2 Critical Issue C).
Note: This is called from worker thread via ThreadSafeBrokerWrapper. The lock prevents RuntimeError during dict iteration if IB callback modifies positions concurrently.
Returns:
| Type | Description |
|---|---|
| dict[str, Position] | Dictionary mapping asset symbols to Position objects |
pending_orders (property)
Get list of pending orders.
Returns:
| Type | Description |
|---|---|
| list[Order] | List of pending Order objects |
connect (async)
Connect to IB Gateway/TWS.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If connection fails |
| TimeoutError | If connection times out |
Source code in src/ml4t/live/brokers/ib.py
disconnect (async)
Disconnect from IB.
Source code in src/ml4t/live/brokers/ib.py
get_position
Thread-safe single position access.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol | required |

Returns:
| Type | Description |
|---|---|
| Position \| None | Position object if exists, None otherwise |
get_positions_async (async)
Async thread-safe position access with lock.
Returns:
| Type | Description |
|---|---|
| dict[str, Position] | Dictionary mapping asset symbols to Position objects |
Source code in src/ml4t/live/brokers/ib.py
get_account_value_async (async)
Get Net Liquidation Value.
Returns:
| Type | Description |
|---|---|
| float | Account net liquidation value in USD |
Source code in src/ml4t/live/brokers/ib.py
get_cash_async (async)
Get available funds.
Returns:
| Type | Description |
|---|---|
| float | Available funds in USD |
Source code in src/ml4t/live/brokers/ib.py
submit_order_async (async)
submit_order_async(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order to IB.
TASK-013: Full order submission implementation with IB order tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol | required |
| quantity | float | Number of shares | required |
| side | OrderSide \| None | BUY or SELL (auto-detected if None) | None |
| order_type | OrderType | Market, limit, stop, or stop-limit | MARKET |
| limit_price | float \| None | Limit price for limit orders | None |
| stop_price | float \| None | Stop price for stop orders | None |

Returns:
| Type | Description |
|---|---|
| Order | Order object |

Raises:
| Type | Description |
|---|---|
| RuntimeError | If not connected |
| ValueError | If order parameters are invalid |
Source code in src/ml4t/live/brokers/ib.py
cancel_order_async (async)
Cancel pending order.
TASK-016: Full order cancellation implementation.
This method finds the IB order ID from our tracking map and cancels the order via the IB API. Handles edge cases like order not found or order already filled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| order_id | str | Order ID to cancel (e.g., 'ML4T-1') | required |

Returns:
| Type | Description |
|---|---|
| bool | True if cancellation request sent successfully, False otherwise |
Note
The actual cancellation is confirmed via _on_order_status callback when IB sends the 'Cancelled' status update.
Source code in src/ml4t/live/brokers/ib.py
close_position_async (async)
Close position in asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol | required |

Returns:
| Type | Description |
|---|---|
| Order \| None | Order object if position exists, None otherwise |

Raises:
| Type | Description |
|---|---|
| NotImplementedError | Depends on TASK-013 |
Source code in src/ml4t/live/brokers/ib.py
AlpacaBroker
Bases: AsyncBrokerProtocol
Alpaca Markets broker implementation.
Design (matching IBBroker patterns):
- All broker operations are async
- Uses asyncio.Lock for thread safety
- WebSocket stream for real-time order updates
- REST API for account/position queries and order submission

Paper vs Live:
- paper=True (default): Uses paper trading endpoint
- paper=False: Uses live trading endpoint (USE WITH CAUTION)
Example

    broker = AlpacaBroker(
        api_key='PKXXXXXXXX',
        secret_key='XXXXXXXXXX',
        paper=True,  # Always start with paper trading!
    )
    await broker.connect()
    positions = await broker.get_positions_async()
    await broker.disconnect()

Initialize AlpacaBroker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key | str | Alpaca API key (from https://app.alpaca.markets) | required |
| secret_key | str | Alpaca secret key | required |
| paper | bool | Use paper trading endpoint (default: True) | True |
Source code in src/ml4t/live/brokers/alpaca.py
positions (property)
Thread-safe position access (shallow copy).
Note: This is called from worker thread via ThreadSafeBrokerWrapper. The shallow copy prevents RuntimeError during dict iteration.
Returns:
| Type | Description |
|---|---|
| dict[str, Position] | Dictionary mapping asset symbols to Position objects |
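
The reason a shallow copy avoids "RuntimeError: dictionary changed size during iteration" can be shown with a small sketch. `PositionBook` is a hypothetical class, positions are plain floats instead of Position objects, and a `threading.Lock` is used here for illustration; it is not a claim about which lock type the brokers use internally.

```python
import threading
from typing import Dict


class PositionBook:
    """Hypothetical container showing snapshot reads of a shared dict."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._positions: Dict[str, float] = {}

    def update(self, asset: str, quantity: float) -> None:
        """Called by the writer, e.g. a broker callback."""
        with self._lock:
            if quantity == 0:
                self._positions.pop(asset, None)  # flat position: drop the entry
            else:
                self._positions[asset] = quantity

    @property
    def positions(self) -> Dict[str, float]:
        """Return a shallow copy so callers iterate over a stable snapshot."""
        with self._lock:
            return dict(self._positions)
```

A caller can loop over `book.positions` while another thread, or the loop body itself, keeps calling `update()`, because the loop iterates over the copy and never over the live dict.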
pending_orders (property)
Get list of pending orders.
Returns:
| Type | Description |
|---|---|
| list[Order] | List of pending Order objects |
connect (async)
Connect to Alpaca and sync initial state.
Steps:
1. Create TradingClient (REST)
2. Create TradingStream (WebSocket)
3. Verify connection by fetching account
4. Register trade update callback
5. Sync positions and open orders
6. Start WebSocket stream for order updates

Raises:
| Type | Description |
|---|---|
| RuntimeError | If connection fails |
Source code in src/ml4t/live/brokers/alpaca.py
disconnect (async)
Disconnect from Alpaca.
Source code in src/ml4t/live/brokers/alpaca.py
get_position
Thread-safe single position access.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol (e.g., 'AAPL' or 'BTC/USD') | required |

Returns:
| Type | Description |
|---|---|
| Position \| None | Position object if exists, None otherwise |
Source code in src/ml4t/live/brokers/alpaca.py
get_positions_async (async)
Async thread-safe position access with lock.
Returns:
| Type | Description |
|---|---|
| dict[str, Position] | Dictionary mapping asset symbols to Position objects |
Source code in src/ml4t/live/brokers/alpaca.py
get_account_value_async (async)
Get portfolio value (equity).
Returns:
| Type | Description |
|---|---|
| float | Total account equity in USD |
Source code in src/ml4t/live/brokers/alpaca.py
get_cash_async (async)
Get available cash.
Returns:
| Type | Description |
|---|---|
| float | Available cash in USD |
Source code in src/ml4t/live/brokers/alpaca.py
submit_order_async (async)
submit_order_async(
asset,
quantity,
side=None,
order_type=MARKET,
limit_price=None,
stop_price=None,
**kwargs,
)
Submit order to Alpaca.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol (e.g., 'AAPL' or 'BTC/USD') | required |
| quantity | float | Number of shares/units | required |
| side | OrderSide \| None | BUY or SELL (auto-detected from quantity sign if None) | None |
| order_type | OrderType | Market, limit, stop, or stop-limit | MARKET |
| limit_price | float \| None | Limit price for limit orders | None |
| stop_price | float \| None | Stop price for stop orders | None |
| **kwargs | Any | Additional parameters (ignored) | {} |

Returns:
| Type | Description |
|---|---|
| Order | Order object |

Raises:
| Type | Description |
|---|---|
| RuntimeError | If not connected |
| ValueError | If order parameters are invalid |
Source code in src/ml4t/live/brokers/alpaca.py
cancel_order_async (async)
Cancel pending order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| order_id | str | Order ID to cancel (e.g., 'ML4T-1') | required |

Returns:
| Type | Description |
|---|---|
| bool | True if cancellation request sent successfully, False otherwise |
Source code in src/ml4t/live/brokers/alpaca.py
close_position_async (async)
Close position in asset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset | str | Asset symbol | required |

Returns:
| Type | Description |
|---|---|
| Order \| None | Order object if position exists, None otherwise |
Source code in src/ml4t/live/brokers/alpaca.py
Data Feeds
IBDataFeed
Bases: DataFeedProtocol
Real-time market data feed from Interactive Brokers.
Subscribes to tick-by-tick market data for specified symbols. Emits data as (timestamp, data, context) tuples.
Data Format
- timestamp: datetime - Tick timestamp
- data: dict[str, dict] - {symbol: {'price': float, 'size': int}}
- context: dict - Additional metadata (bid, ask, etc.)
Note
- IB must be connected before creating feed
- Requires market data subscription for symbols
- Throttles rapid ticks to avoid overwhelming strategy
Example

    ib = IB()
    await ib.connectAsync('127.0.0.1', 7497, clientId=1)

    feed = IBDataFeed(ib, symbols=['SPY', 'QQQ', 'IWM'])
    await feed.start()

    # Use directly or wrap with BarAggregator
    aggregator = BarAggregator(feed, bar_size_minutes=1)

Initialize IB data feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ib | IB | Connected IB instance | required |
| symbols | list[str] | List of symbols to subscribe to | required |
| exchange | str | IB exchange (default: SMART routing) | 'SMART' |
| currency | str | Currency (default: USD) | 'USD' |
| tick_throttle_ms | int | Minimum milliseconds between tick emissions (prevents overwhelming strategy with rapid ticks) | 100 |
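
The effect of tick_throttle_ms can be pictured as a minimum-interval filter. The sketch below is an assumption-laden illustration: `TickThrottle` is a hypothetical class, it throttles per symbol (the feed may throttle globally), and it takes the current time as an argument so the behaviour is easy to check.

```python
from typing import Dict


class TickThrottle:
    """Drop ticks that arrive sooner than `min_interval_ms` after the last emit."""

    def __init__(self, min_interval_ms: int = 100) -> None:
        self._min_interval_s = min_interval_ms / 1000.0
        self._last_emit: Dict[str, float] = {}  # symbol -> time of last emitted tick

    def should_emit(self, symbol: str, now_s: float) -> bool:
        """Return True and record the time if enough time has passed for `symbol`."""
        last = self._last_emit.get(symbol)
        if last is not None and now_s - last < self._min_interval_s:
            return False  # too soon after the previous emission
        self._last_emit[symbol] = now_s
        return True
```

In a live setting `now_s` would come from a monotonic clock such as `time.monotonic()`, so that wall-clock adjustments do not affect the interval.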
Source code in src/ml4t/live/feeds/ib_feed.py
stats (property)
Get feed statistics.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict of feed statistics |
start (async)
Subscribe to market data for all symbols.
Creates contracts and subscribes to real-time tick data.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If IB not connected |
Source code in src/ml4t/live/feeds/ib_feed.py
stop
Unsubscribe from market data.
Cancels all market data subscriptions and stops feed.
Source code in src/ml4t/live/feeds/ib_feed.py
__aiter__ (async)
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[tuple[datetime, dict, dict]] | Tuple of (timestamp, data, context) |

Stops when:
- stop() is called (None sentinel)
- Feed is not running
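
The "None sentinel" stop condition is a common pattern with `asyncio.Queue`. A minimal sketch, assuming a queue-backed feed, is shown below; `QueueFeed`, `publish`, and `collect` are hypothetical names, and the real feed's internals may be organised differently.

```python
import asyncio


class QueueFeed:
    """Hypothetical feed: producers put items on a queue, consumers iterate."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def publish(self, item) -> None:
        """Enqueue one (timestamp, data, context) tuple without blocking."""
        self._queue.put_nowait(item)

    def stop(self) -> None:
        """Enqueue the sentinel that wakes the consumer and ends iteration."""
        self._queue.put_nowait(None)

    async def __aiter__(self):
        while True:
            item = await self._queue.get()
            if item is None:  # sentinel from stop()
                return
            yield item


async def collect() -> list:
    """Publish two items, stop the feed, and gather the timestamps seen."""
    feed = QueueFeed()
    feed.publish(("t1", {"SPY": {"price": 1.0}}, {}))
    feed.publish(("t2", {"SPY": {"price": 2.0}}, {}))
    feed.stop()
    return [ts async for ts, _data, _ctx in feed]
```

Putting the sentinel on the same queue as the data means a consumer blocked in `get()` is woken immediately, and items published before `stop()` are still delivered in order.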
Source code in src/ml4t/live/feeds/ib_feed.py
AlpacaDataFeed
Bases: DataFeedProtocol
Real-time market data feed from Alpaca Markets.
Subscribes to real-time data for specified symbols. Supports both stocks and crypto.
Data Types
- bars: OHLCV minute bars (default, recommended for strategies)
- quotes: Bid/ask quotes (for spread-sensitive strategies)
- trades: Individual trades (highest frequency)

Data Feeds
- iex: Free tier (limited data)
- sip: Premium (full market data, requires subscription)

Data Format
- timestamp: datetime - Bar/quote/trade timestamp
- data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}}
- context: dict - Additional metadata
Example

    # Stocks only
    feed = AlpacaDataFeed(
        api_key='PKXXXXXXXX',
        secret_key='XXXXXXXXXX',
        symbols=['AAPL', 'MSFT'],
    )

    # Mixed stocks and crypto
    feed = AlpacaDataFeed(
        api_key='PKXXXXXXXX',
        secret_key='XXXXXXXXXX',
        symbols=['AAPL', 'BTC/USD', 'ETH/USD'],
    )

    await feed.start()

    async for timestamp, data, context in feed:
        strategy.on_data(timestamp, data, context, broker)

Initialize Alpaca data feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key | str | Alpaca API key | required |
| secret_key | str | Alpaca secret key | required |
| symbols | list[str] | List of symbols (e.g., ['AAPL', 'BTC/USD']) | required |
| data_type | str | Type of data - 'bars' (default), 'quotes', or 'trades' | 'bars' |
| feed | str | Data feed type - 'iex' (free) or 'sip' (premium) | 'iex' |
Source code in src/ml4t/live/feeds/alpaca_feed.py
stats (property)
Get feed statistics.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict of feed statistics |
start (async)
Subscribe to market data for all symbols.
Creates streams and subscribes to real-time data.
Source code in src/ml4t/live/feeds/alpaca_feed.py
stop
Stop data feed.
Closes all streams and signals consumer to exit.
Source code in src/ml4t/live/feeds/alpaca_feed.py
__aiter__ (async)
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[tuple[datetime, dict, dict]] | Tuple of (timestamp, data, context) |

Stops when:
- stop() is called (None sentinel)
- Feed is not running
Source code in src/ml4t/live/feeds/alpaca_feed.py
__anext__ (async)
Get next data item.
Returns:
| Type | Description |
|---|---|
| tuple[datetime, dict[str, Any], dict[str, Any]] | Tuple of (timestamp, data, context) |

Raises:
| Type | Description |
|---|---|
| StopAsyncIteration | When feed is stopped |
Source code in src/ml4t/live/feeds/alpaca_feed.py
DataBentoFeed
Bases: DataFeedProtocol
Market data feed from DataBento.
Supports both historical replay and real-time streaming.
Historical Mode
- Reads from .dbn files (DataBento native format)
- Replays at historical speed or accelerated
- Perfect for strategy validation
Real-time Mode
- Streams live market data
- Supports multiple datasets (GLBX, XNAS, OPRA, etc.)
- Low-latency tick data
Data Format
- timestamp: datetime - Event timestamp
- data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}}
- context: dict - Additional fields (bid, ask, trade_count, etc.)

Example Historical

    feed = DataBentoFeed.from_file(
        'ES_202401.dbn',
        symbols=['ES.FUT'],
        replay_speed=10.0,  # 10x speed
    )

Example Real-time

    feed = DataBentoFeed.from_live(
        api_key=os.getenv('DATABENTO_API_KEY'),
        dataset='GLBX.MDP3',
        schema='ohlcv-1s',
        symbols=['ES.c.0', 'NQ.c.0'],
    )

Initialize DataBento feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | Historical \| Live | DataBento client (Historical or Live) | required |
| symbols | list[str] | List of symbols to subscribe to | required |
| mode | str | 'historical' or 'live' | 'historical' |
| replay_speed | float | Playback speed multiplier (historical only); 1.0 = real-time, 10.0 = 10x speed | 1.0 |
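
The replay_speed multiplier can be understood as dividing the gap between consecutive historical timestamps. The helper below is a hypothetical sketch of that arithmetic, not the feed's actual pacing code.

```python
from datetime import datetime, timedelta
from typing import List


def replay_delays(timestamps: List[datetime], replay_speed: float = 1.0) -> List[float]:
    """Seconds to wait before emitting each event after the first one."""
    if replay_speed <= 0:
        raise ValueError("replay_speed must be positive")
    return [
        (later - earlier).total_seconds() / replay_speed
        for earlier, later in zip(timestamps, timestamps[1:])
    ]
```

For events recorded 10 s and then 30 s apart, a speed of 1.0 gives waits of 10 s and 30 s, while a speed of 10.0 shortens them to 1 s and 3 s; a replay loop would pass each value to `asyncio.sleep()` before emitting the next event.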
Source code in src/ml4t/live/feeds/databento_feed.py
from_file (classmethod)
Create feed from historical .dbn file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str \| Path | Path to .dbn file | required |
| symbols | list[str] | Symbols to filter (or all if empty) | required |
| replay_speed | float | Playback speed (1.0 = real-time) | 1.0 |

Returns:
| Type | Description |
|---|---|
| DataBentoFeed | DataBentoFeed configured for historical replay |
Source code in src/ml4t/live/feeds/databento_feed.py
from_live (classmethod)
Create feed for real-time streaming.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key | str | DataBento API key | required |
| dataset | str | Dataset code (e.g., 'GLBX.MDP3', 'XNAS.ITCH') | required |
| schema | str | Data schema (e.g., 'ohlcv-1s', 'mbp-10', 'trades') | required |
| symbols | list[str] | Symbols to subscribe to | required |

Returns:
| Type | Description |
|---|---|
| DataBentoFeed | DataBentoFeed configured for live streaming |
Source code in src/ml4t/live/feeds/databento_feed.py
start (async)
Start data feed.
- Historical mode: Begins replay task
- Live mode: Starts streaming subscription
Source code in src/ml4t/live/feeds/databento_feed.py
stop
Stop data feed.
Source code in src/ml4t/live/feeds/databento_feed.py
__aiter__ (async)
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[tuple[datetime, dict, dict]] | Tuple of (timestamp, data, context) |
Source code in src/ml4t/live/feeds/databento_feed.py
CryptoFeed
CryptoFeed(
exchange,
symbols,
*,
timeframe="1m",
stream_trades=False,
stream_ohlcv=True,
api_key=None,
api_secret=None,
api_passphrase=None,
)
Bases: DataFeedProtocol
Cryptocurrency market data feed via CCXT.
Provides unified interface to 100+ crypto exchanges. Supports both REST (polling) and WebSocket (streaming).
Data Format
- timestamp: datetime - Candle/trade timestamp
- data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}}
- context: dict - Exchange-specific metadata
Exchange Symbols
- Binance: 'BTC/USDT', 'ETH/USDT'
- Coinbase: 'BTC-USD', 'ETH-USD'
- Kraken: 'BTC/USD', 'ETH/USD'
Timeframes
'1m', '5m', '15m', '1h', '4h', '1d'
Example WebSocket (Real-time)

    feed = CryptoFeed(
        exchange='binance',
        symbols=['BTC/USDT', 'ETH/USDT'],
        stream_trades=True,  # Stream trades (fastest)
    )

Example Authenticated

    feed = CryptoFeed(
        exchange='binance',
        symbols=['BTC/USDT'],
        api_key='your-key',
        api_secret='your-secret',
    )

Initialize crypto feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| exchange | str | Exchange ID (e.g., 'binance', 'coinbasepro', 'kraken') | required |
| symbols | list[str] | Trading pairs (e.g., ['BTC/USDT', 'ETH/USDT']) | required |
| timeframe | str | OHLCV timeframe ('1m', '5m', '1h', etc.) | '1m' |
| stream_trades | bool | Stream trade ticks (faster updates) | False |
| stream_ohlcv | bool | Stream OHLCV candles | True |
| api_key | str \| None | API key (for authenticated endpoints) | None |
| api_secret | str \| None | API secret | None |
| api_passphrase | str \| None | API passphrase (Coinbase only) | None |
Source code in src/ml4t/live/feeds/crypto_feed.py
start
async
¶
Start streaming market data.
Initiates WebSocket subscriptions for all symbols.
Source code in src/ml4t/live/feeds/crypto_feed.py
stop
¶
Stop streaming and close exchange connection.
Source code in src/ml4t/live/feeds/crypto_feed.py
__aiter__
async
¶
Async iterator yielding market data.
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[tuple[datetime, dict, dict]]` | Tuple of (timestamp, data, context) |
Source code in src/ml4t/live/feeds/crypto_feed.py
OKXFundingFeed
¶
Bases: DataFeedProtocol
OKX funding rate feed with OHLCV bars.
Combines price data with funding rate information for ML strategies that trade crypto perpetual futures based on funding rate signals.
Data Flow
- Poll /market/candles for latest OHLCV bar
- Poll /public/funding-rate for current funding rate
- Combine into (timestamp, data, context) tuple
- Emit to strategy
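The four steps above can be sketched as a single polling pass. This is a minimal illustration, not the OKXFundingFeed implementation: the fetcher callables, their return shapes, and the choice to place the funding rate in `context` are all assumptions, and the stubs stand in for the real HTTP calls.

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable

Fetcher = Callable[[str], Awaitable[dict]]


async def poll_once(symbol: str, fetch_candle: Fetcher, fetch_funding: Fetcher):
    """Poll both endpoints once and merge the results into one event."""
    candle = await fetch_candle(symbol)    # step 1: latest OHLCV bar
    funding = await fetch_funding(symbol)  # step 2: current funding rate
    # step 3: combine into (timestamp, data, context)
    timestamp = datetime.fromtimestamp(candle["ts_ms"] / 1000, tz=timezone.utc)
    data = {symbol: {k: candle[k] for k in ("open", "high", "low", "close", "volume")}}
    context = {symbol: {"funding_rate": funding["funding_rate"]}}  # assumed placement
    return timestamp, data, context  # step 4: the caller emits this to the strategy


# Stub fetchers standing in for the /market/candles and /public/funding-rate polls
async def fake_candle(symbol: str) -> dict:
    return {"ts_ms": 1_700_000_000_000, "open": 1.0, "high": 2.0,
            "low": 0.5, "close": 1.5, "volume": 10.0}


async def fake_funding(symbol: str) -> dict:
    return {"funding_rate": 0.0001}


event = asyncio.run(poll_once("BTC-USDT-SWAP", fake_candle, fake_funding))
```

Passing the fetchers in as arguments keeps the combining logic testable without network access.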
Symbol Format
OKX perpetual swaps use format: BTC-USDT-SWAP, ETH-USDT-SWAP
Initialize OKX funding rate feed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `symbols` | `list[str]` | List of perpetual swap symbols (e.g., ['BTC-USDT-SWAP']) | required |
| `timeframe` | `str` | OHLCV bar timeframe ('1m', '1H', '4H', '1D') | `'1H'` |
| `poll_interval_seconds` | `float` | How often to poll for new data | `60.0` |
Source code in src/ml4t/live/feeds/okx_feed.py
start
async
¶
Start the OKX data feed.
Begins polling for OHLCV and funding rate data.
Source code in src/ml4t/live/feeds/okx_feed.py
stop
¶
Stop the data feed.
Source code in src/ml4t/live/feeds/okx_feed.py
close
async
¶
__aiter__
¶
__anext__
async
¶
Get next bar with funding data.
Returns:
| Type | Description |
|---|---|
| `tuple[datetime, dict[str, Any], dict[str, Any]]` | (timestamp, data, context) tuple |
Raises:
| Type | Description |
|---|---|
| `StopAsyncIteration` | When feed stops |
Source code in src/ml4t/live/feeds/okx_feed.py
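A feed that defines `__aiter__` and `__anext__` directly, as listed above, ends an `async for` loop by raising `StopAsyncIteration`. The sketch below shows that protocol with a hypothetical `CountingFeed`; the `_running` flag and the integer payload are illustrative assumptions, not the OKXFundingFeed internals.

```python
import asyncio


class CountingFeed:
    """Hypothetical feed that yields integers until stopped or exhausted."""

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._count = 0
        self._running = True

    def stop(self) -> None:
        self._running = False

    def __aiter__(self) -> "CountingFeed":
        return self  # the feed acts as its own iterator

    async def __anext__(self) -> int:
        if not self._running or self._count >= self._limit:
            raise StopAsyncIteration  # ends any `async for` loop cleanly
        await asyncio.sleep(0)  # stand-in for waiting on the next poll
        self._count += 1
        return self._count


async def collect(feed: CountingFeed) -> list:
    items = []
    async for item in feed:
        items.append(item)
        if item == 2:
            feed.stop()  # the next __anext__ call raises StopAsyncIteration
    return items


items = asyncio.run(collect(CountingFeed(limit=5)))
```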
BarAggregator
¶
Aggregates raw ticks or 5-second bars into minute bars.
Addresses Gemini's concerns:
1. "If IBDataFeed pushes a tick to Strategy.on_data, the strategy might trigger 60x more often than intended." - Buffer incoming data.
2. "The 15:59 bar is never emitted because no 16:00 tick arrives." - Background flush checker emits bars on timeout.

The aggregator buffers incoming data and emits when:
- A bar boundary is crossed (new tick arrives in next minute)
- OR timeout expires (2s past bar end with no new data)
Example

    raw_feed = IBTickFeed(ib, assets=['AAPL'])
    aggregated_feed = BarAggregator(raw_feed, bar_size_minutes=1)

    async for timestamp, data, context in aggregated_feed:
        # data contains completed minute bars only
        strategy.on_data(timestamp, data, context, broker)

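The two emit conditions described for the aggregator (boundary crossing and timeout) can be sketched for a single asset as follows. `MinuteBarBuilder` is a hypothetical, synchronous stand-in written for illustration; the real BarAggregator is asynchronous and multi-asset, and its internals may differ.

```python
from datetime import datetime, timedelta
from typing import Optional


class MinuteBarBuilder:
    """Hypothetical single-asset builder; not the BarAggregator implementation."""

    def __init__(self, flush_timeout_seconds: float = 2.0) -> None:
        self.flush_timeout = timedelta(seconds=flush_timeout_seconds)
        self.bar_start: Optional[datetime] = None
        self.bar: Optional[dict] = None

    def _emit(self):
        done = (self.bar_start, self.bar)
        self.bar_start, self.bar = None, None
        return done

    def add_tick(self, ts: datetime, price: float, size: float):
        """Add a tick; return the completed (start, bar) if a boundary was crossed."""
        minute = ts.replace(second=0, microsecond=0)
        done = None
        if self.bar is not None and minute > self.bar_start:
            done = self._emit()  # condition 1: tick belongs to a later minute
        if self.bar is None:
            self.bar_start = minute
            self.bar = {"open": price, "high": price, "low": price,
                        "close": price, "volume": 0.0}
        self.bar["high"] = max(self.bar["high"], price)
        self.bar["low"] = min(self.bar["low"], price)
        self.bar["close"] = price
        self.bar["volume"] += size
        return done

    def flush_if_stale(self, now: datetime):
        """Condition 2: emit the open bar once `now` passes bar end plus timeout."""
        if self.bar is None:
            return None
        if now >= self.bar_start + timedelta(minutes=1) + self.flush_timeout:
            return self._emit()
        return None


builder = MinuteBarBuilder()
builder.add_tick(datetime(2024, 1, 2, 15, 59, 10), 100.0, 1)
builder.add_tick(datetime(2024, 1, 2, 15, 59, 30), 101.0, 2)
# No 16:00 tick arrives, so the 15:59 bar is flushed by the timeout check
last_bar = builder.flush_if_stale(datetime(2024, 1, 2, 16, 0, 2))
```

In a live setting a background task would call something like `flush_if_stale` periodically, which is what lets the final bar of the session be emitted.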
Initialize BarAggregator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `source_feed` | `DataFeedProtocol` | Raw tick or sub-minute bar feed | required |
| `bar_size_minutes` | `int` | Output bar size in minutes (default: 1) | `1` |
| `assets` | `list[str] \| None` | List of assets to track (default: all from source) | `None` |
| `flush_timeout_seconds` | `float` | Seconds after bar end before forcing emit (default: 2.0) | `2.0` |
Source code in src/ml4t/live/feeds/aggregator.py
start
async
¶
stop
¶
Stop aggregation.
Source code in src/ml4t/live/feeds/aggregator.py
__aiter__
async
¶
Async iterator interface.
Uses None sentinel for shutdown (Gemini fix: avoids busy-wait with 1s timeout).
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[tuple[datetime, dict, dict]]` | Tuple of (timestamp, data, context) where data is {asset: ohlcv_dict} |
Source code in src/ml4t/live/feeds/aggregator.py
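The None-sentinel shutdown mentioned for `__aiter__` can be sketched with a plain `asyncio.Queue`. This is an illustration of the general pattern under the assumption that completed bars are handed over through a queue; the names `drain` and `main` are hypothetical.

```python
import asyncio


async def drain(queue: asyncio.Queue):
    """Yield items until the None sentinel arrives; no timeout polling needed."""
    while True:
        item = await queue.get()  # suspends until something is queued
        if item is None:
            return  # sentinel received: end iteration
        yield item


async def main() -> list:
    queue = asyncio.Queue()
    for bar in ("bar-1", "bar-2"):
        queue.put_nowait(bar)
    queue.put_nowait(None)  # what a stop() method might enqueue
    return [item async for item in drain(queue)]


received = asyncio.run(main())
```

Because the consumer blocks on `queue.get()` rather than waking up on a timer, it uses no CPU while idle and reacts to shutdown as soon as the sentinel is queued.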
Safety Types¶
RiskLimitError
¶
Bases: Exception
Raised when an order violates risk limits.
RiskState
dataclass
¶
RiskState(
date,
daily_loss=0.0,
orders_placed=0,
high_water_mark=0.0,
kill_switch_activated=False,
kill_switch_reason="",
)
Persisted risk state - survives restarts.
This state is saved to disk after every order and on shutdown. Uses atomic JSON writes (write to .tmp then os.replace) to prevent corruption.
Example

    state = RiskState(date="2023-10-15", daily_loss=1500.0)
    state.kill_switch_activated = True
    state.kill_switch_reason = "Max daily loss exceeded"

Note
The kill switch state persists across restarts and must be manually reset by deleting the state file or setting kill_switch_activated=False.
from_dict
classmethod
¶
Create RiskState from dictionary (for JSON loading).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `data` | `dict` | Dictionary with RiskState fields | required |
Returns:
| Type | Description |
|---|---|
| `RiskState` | RiskState instance |
to_dict
¶
Convert to dictionary (for JSON saving).
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary with all fields |
Source code in src/ml4t/live/safety.py
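A dictionary round trip of the kind `to_dict` and `from_dict` provide might look like the sketch below. `RiskStateSketch` is a stand-in that mirrors the RiskState fields listed above; treating `date` as a string follows the documented example, and silently ignoring unknown keys is an assumption rather than documented behaviour.

```python
from dataclasses import asdict, dataclass, fields


@dataclass
class RiskStateSketch:
    """Stand-in mirroring the RiskState fields; not the library class."""

    date: str
    daily_loss: float = 0.0
    orders_placed: int = 0
    high_water_mark: float = 0.0
    kill_switch_activated: bool = False
    kill_switch_reason: str = ""

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "RiskStateSketch":
        known = {f.name for f in fields(cls)}
        # Assumption: drop unknown keys so files from other versions still load
        return cls(**{k: v for k, v in data.items() if k in known})


state = RiskStateSketch(date="2023-10-15", daily_loss=1500.0)
restored = RiskStateSketch.from_dict(state.to_dict())
```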
save_atomic
staticmethod
¶
Save state with atomic write (write to .tmp then os.replace).
This prevents corruption if process dies mid-write.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state` | `RiskState` | RiskState to save | required |
| `filepath` | `str` | Path to save to | required |
Raises:
| Type | Description |
|---|---|
| `OSError` | If write fails |
Source code in src/ml4t/live/safety.py
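The write-to-`.tmp`-then-`os.replace` pattern described above can be sketched as follows. The function name `save_json_atomic` is hypothetical, and the `fsync` call is an extra durability step added here as an assumption; the source only states the temporary file and `os.replace`.

```python
import json
import os
import tempfile


def save_json_atomic(payload: dict, filepath: str) -> None:
    """Write JSON to `filepath + '.tmp'`, then swap it into place."""
    tmp_path = filepath + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh)
        fh.flush()
        os.fsync(fh.fileno())  # assumption: force bytes to disk before the rename
    # os.replace overwrites the destination in one step on the same filesystem,
    # so readers see either the old file or the complete new one
    os.replace(tmp_path, filepath)


path = os.path.join(tempfile.mkdtemp(), "risk_state.json")
save_json_atomic({"date": "2023-10-15", "daily_loss": 1500.0}, path)
```

If the process dies during `json.dump`, only the `.tmp` file is incomplete and the previous state file is left untouched.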
load
staticmethod
¶
Load state from file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `filepath` | `str` | Path to load from | required |
Returns:
| Type | Description |
|---|---|
| `RiskState \| None` | RiskState if file exists and is valid, None otherwise |
Source code in src/ml4t/live/safety.py
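The "None if missing or invalid" contract of `load` can be sketched with a tolerant JSON reader. Which errors the library actually treats as "invalid" is not stated in this reference, so the exception list below is an assumption, and `load_json_or_none` is a hypothetical name.

```python
import json
import os
import tempfile
from typing import Optional


def load_json_or_none(filepath: str) -> Optional[dict]:
    """Return the parsed dict, or None if the file is absent or not valid JSON."""
    try:
        with open(filepath, encoding="utf-8") as fh:
            data = json.load(fh)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
    # A JSON list or scalar is not a usable state mapping either
    return data if isinstance(data, dict) else None


folder = tempfile.mkdtemp()
missing = load_json_or_none(os.path.join(folder, "absent.json"))

corrupt_path = os.path.join(folder, "corrupt.json")
with open(corrupt_path, "w", encoding="utf-8") as fh:
    fh.write("{not json")
corrupt = load_json_or_none(corrupt_path)
```

Returning None instead of raising lets the caller fall back to a fresh state on first run.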
VirtualPortfolio
¶
Manages internal accounting for Shadow Mode (Paper Trading).
Addresses Gemini's Critical Issue A: "The Infinite Buy Loop"
Problem: In shadow mode, returning fake Order objects without updating position state causes strategies to keep buying forever because get_position() always returns None.
Solution: Track shadow positions locally. When shadow_mode=True:
- submit_order() updates this virtual portfolio
- positions/get_position() return from this portfolio
- Strategy sees realistic position state

Handles:
- New positions
- Position increases (weighted avg cost basis)
- Position decreases (partial close)
- Position close (quantity = 0)
- Position flip (long -> short or vice versa)
Example

    portfolio = VirtualPortfolio(initial_cash=100_000.0)

    # Simulate buy order fill
    order = Order(
        asset="AAPL",
        side=OrderSide.BUY,
        quantity=100,
        filled_price=150.0,
        filled_quantity=100,
        ...
    )
    portfolio.process_fill(order)

    # Check position
    pos = portfolio.positions.get("AAPL")
    assert pos.quantity == 100
    assert pos.entry_price == 150.0

    # Simulate sell order fill (close)
    sell_order = Order(
        asset="AAPL",
        side=OrderSide.SELL,
        quantity=100,
        filled_price=155.0,
        filled_quantity=100,
        ...
    )
    portfolio.process_fill(sell_order)
    assert "AAPL" not in portfolio.positions

Initialize virtual portfolio.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `initial_cash` | `float` | Starting cash balance (default: 100,000) | `100000.0` |
Source code in src/ml4t/live/safety.py
positions
property
¶
Get current positions (returns copy for safety).
Returns:
| Type | Description |
|---|---|
| `dict[str, Position]` | Dictionary mapping asset symbol to Position |
account_value
property
¶
Get total account value (cash + position market value).
Returns:
| Type | Description |
|---|---|
| `float` | Total account value |
process_fill
¶
Update state based on filled shadow order.
Handles:
- Weighted average cost basis for position increases
- Position flipping (long -> short or vice versa)
- Partial and full closes
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `order` | `Order` | Filled Order object (must have filled_quantity and filled_price) | required |
Source code in src/ml4t/live/safety.py
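The position cases that `process_fill` handles can be sketched as one pure function over signed quantities. This is an illustration under stated assumptions, not the library code: buys are positive and sells negative, a flipped position takes the fill price as its new entry price, and cash accounting is left out. The name `apply_fill` is hypothetical.

```python
from typing import Optional, Tuple


def apply_fill(
    quantity: float, entry_price: float, fill_qty: float, fill_price: float
) -> Tuple[float, Optional[float]]:
    """Apply a signed fill (+buy, -sell) to a signed position.

    Returns (new_quantity, new_entry_price); the price is None when flat.
    """
    new_qty = quantity + fill_qty
    if new_qty == 0:
        return 0.0, None  # full close
    if quantity == 0 or (quantity > 0) == (fill_qty > 0):
        # New position or increase: weighted average cost basis
        total_cost = abs(quantity) * entry_price + abs(fill_qty) * fill_price
        return new_qty, total_cost / abs(new_qty)
    if (new_qty > 0) == (quantity > 0):
        return new_qty, entry_price  # partial close keeps the original basis
    return new_qty, fill_price  # flip: the remainder opens at the fill price


opened = apply_fill(0, 0.0, 100, 150.0)       # new long position
increased = apply_fill(100, 150.0, 100, 160.0)  # average of 150 and 160
flipped = apply_fill(100, 150.0, -150, 155.0)   # long 100 becomes short 50
```

Keeping the arithmetic in a pure function makes each case easy to unit test separately from order handling.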
update_prices
¶
Update current prices for accurate account value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `prices` | `dict[str, float]` | Dictionary mapping asset symbol to current price | required |
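Updated prices feed into the account value, which is documented as cash plus position market value. A minimal sketch of that calculation follows, assuming signed quantities (negative for shorts) and that every held asset has a current price; the function name and arguments are hypothetical.

```python
def account_value(cash: float, quantities: dict, prices: dict) -> float:
    """Cash plus signed quantity times latest price for each position."""
    return cash + sum(qty * prices[asset] for asset, qty in quantities.items())


# 85,000 cash and 100 AAPL marked at 155.0
value = account_value(85_000.0, {"AAPL": 100}, {"AAPL": 155.0})  # → 100500.0
```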