Home / Libraries / ML4T Live / Docs
ML4T Live
ML4T Live Documentation
Production trading with broker integrations
Skip to content

API Reference

Core Classes

LiveEngine

LiveEngine(
    strategy,
    broker,
    feed,
    *,
    on_error=None,
    halt_on_error=False,
)

Async live trading engine.

Bridges async infrastructure with sync Strategy.on_data().

Key Design Decisions: 1. Strategy runs in thread pool (via asyncio.to_thread) 2. ThreadSafeBrokerWrapper passed to strategy for sync broker calls 3. Graceful shutdown on SIGINT/SIGTERM 4. Configurable error handling

Lifecycle: 1. connect() - Connect to broker and data feed 2. run() - Main loop (blocks until shutdown) 3. stop() - Graceful shutdown

Example

engine = LiveEngine(strategy, broker, feed) await engine.connect()

try: await engine.run() except KeyboardInterrupt: await engine.stop()

Initialize LiveEngine.

Parameters:

Name Type Description Default
strategy Strategy

Strategy instance to execute

required
broker AsyncBrokerProtocol

Async broker implementation (IBBroker, AlpacaBroker, etc.)

required
feed DataFeedProtocol

Data feed providing timestamp, data, context tuples

required
on_error Callable[[Exception, datetime, dict], None] | None

Custom error handler callback. Signature: (error, timestamp, data) -> None

None
halt_on_error bool

If True, stop engine on strategy error. If False, log error and continue.

False
Source code in src/ml4t/live/engine.py
def __init__(
    self,
    strategy: Strategy,
    broker: AsyncBrokerProtocol,
    feed: DataFeedProtocol,
    *,
    on_error: Callable[[Exception, datetime, dict], None] | None = None,
    halt_on_error: bool = False,
):
    """Initialize LiveEngine.

    Args:
        strategy: Strategy instance to execute
        broker: Async broker implementation (IBBroker, AlpacaBroker, etc.)
        feed: Data feed providing timestamp, data, context tuples
        on_error: Custom error handler callback. Signature:
            (error, timestamp, data) -> None
        halt_on_error: If True, stop engine on strategy error. If False,
            log error and continue.
    """
    self.strategy = strategy
    self.broker = broker
    self.feed = feed
    self.on_error = on_error or self._default_error_handler
    self.halt_on_error = halt_on_error

    # State
    self._running = False
    self._shutdown_event = asyncio.Event()
    self._loop: asyncio.AbstractEventLoop | None = None
    self._wrapped_broker: ThreadSafeBrokerWrapper | None = None

    # Statistics
    self._bar_count = 0
    self._error_count = 0
    self._last_bar_time: datetime | None = None

stats property

stats

Get engine statistics.

Returns:

Type Description
dict[str, Any]

Dict with keys:

dict[str, Any]
  • running: bool - Is engine running?
dict[str, Any]
  • bar_count: int - Total bars processed
dict[str, Any]
  • error_count: int - Total strategy errors
dict[str, Any]
  • last_bar_time: datetime | None - Timestamp of last bar

connect async

connect()

Connect to broker and data feed.

Must be called before run().

Steps: 1. Connect broker (authenticate, subscribe to account updates) 2. Start data feed (subscribe to market data) 3. Create ThreadSafeBrokerWrapper for strategy use 4. Install signal handlers for graceful shutdown

Source code in src/ml4t/live/engine.py
async def connect(self) -> None:
    """Connect to broker and data feed.

    Must be called before run().

    Steps:
    1. Connect broker (authenticate, subscribe to account updates)
    2. Start data feed (subscribe to market data)
    3. Create ThreadSafeBrokerWrapper for strategy use
    4. Install signal handlers for graceful shutdown
    """
    logger.info("LiveEngine: Connecting...")

    # Connect broker
    await self.broker.connect()

    # Start data feed
    await self.feed.start()

    # Create thread-safe wrapper
    self._loop = asyncio.get_running_loop()
    self._wrapped_broker = ThreadSafeBrokerWrapper(self.broker, self._loop)

    # Install signal handlers
    self._install_signal_handlers()

    logger.info("LiveEngine: Connected and ready")

run async

run()

Main async loop - receives bars and dispatches to strategy.

Runs until: 1. stop() is called 2. Data feed ends 3. Unrecoverable error (if halt_on_error=True) 4. SIGINT/SIGTERM received

Strategy Execution: - Strategy.on_data() runs in thread pool (asyncio.to_thread) - Receives ThreadSafeBrokerWrapper, not raw broker - Broker calls block the worker thread (not event loop)

Error Handling: - Strategy exceptions caught and passed to on_error callback - If halt_on_error=True, engine stops - If halt_on_error=False, log and continue

Source code in src/ml4t/live/engine.py
async def run(self) -> None:
    """Main async loop - receives bars and dispatches to strategy.

    Runs until:
    1. stop() is called
    2. Data feed ends
    3. Unrecoverable error (if halt_on_error=True)
    4. SIGINT/SIGTERM received

    Strategy Execution:
    - Strategy.on_data() runs in thread pool (asyncio.to_thread)
    - Receives ThreadSafeBrokerWrapper, not raw broker
    - Broker calls block the worker thread (not event loop)

    Error Handling:
    - Strategy exceptions caught and passed to on_error callback
    - If halt_on_error=True, engine stops
    - If halt_on_error=False, log and continue
    """
    if self._wrapped_broker is None:
        raise RuntimeError("Call connect() before run()")

    self._running = True
    logger.info("LiveEngine: Starting main loop")

    # Strategy lifecycle callback
    self.strategy.on_start(self._wrapped_broker)

    try:
        async for timestamp, data, context in self.feed:
            # Check for shutdown
            if self._shutdown_event.is_set():
                logger.info("LiveEngine: Shutdown requested")
                break

            self._bar_count += 1
            self._last_bar_time = timestamp

            try:
                # Run strategy in thread pool to avoid blocking event loop
                await asyncio.to_thread(
                    self.strategy.on_data,
                    timestamp,
                    data,
                    context,
                    self._wrapped_broker,
                )
            except Exception as e:
                self._error_count += 1
                self.on_error(e, timestamp, data)

                if self.halt_on_error:
                    logger.error("LiveEngine: Halting due to strategy error")
                    break

    except asyncio.CancelledError:
        logger.info("LiveEngine: Cancelled")
    finally:
        self._running = False
        self.strategy.on_end(self._wrapped_broker)
        logger.info(
            f"LiveEngine: Stopped. Bars: {self._bar_count}, Errors: {self._error_count}"
        )

stop async

stop()

Graceful shutdown.

Steps: 1. Set shutdown event (signals main loop to exit) 2. Stop data feed (no more bars) 3. Disconnect broker (cancel subscriptions, close connection)

Safe to call multiple times.

Source code in src/ml4t/live/engine.py
async def stop(self) -> None:
    """Graceful shutdown.

    Steps:
    1. Set shutdown event (signals main loop to exit)
    2. Stop data feed (no more bars)
    3. Disconnect broker (cancel subscriptions, close connection)

    Safe to call multiple times.
    """
    logger.info("LiveEngine: Stopping...")
    self._shutdown_event.set()

    # Stop data feed
    self.feed.stop()

    # Disconnect broker
    await self.broker.disconnect()

    logger.info("LiveEngine: Stopped")

LiveRiskConfig dataclass

LiveRiskConfig(
    max_position_value=50000.0,
    max_position_shares=1000,
    max_total_exposure=200000.0,
    max_positions=20,
    max_order_value=10000.0,
    max_order_shares=500,
    max_orders_per_minute=10,
    max_daily_loss=5000.0,
    max_drawdown_pct=0.05,
    max_price_deviation_pct=0.05,
    max_data_staleness_seconds=60.0,
    dedup_window_seconds=1.0,
    allowed_assets=set(),
    blocked_assets=set(),
    shadow_mode=False,
    kill_switch_enabled=False,
    state_file=".ml4t_risk_state.json",
)

Risk configuration for live trading.

Multiple layers of protection - all limits are optional. Set to inf/large values to disable specific checks.

Example

Conservative configuration

config = LiveRiskConfig( max_position_value=25_000.0, max_daily_loss=2_000.0, shadow_mode=True, # Always start with shadow mode! )

Disable specific checks

config = LiveRiskConfig( max_position_value=float('inf'), # No position limit max_daily_loss=10_000.0, # Only daily loss limit )

Safety Recommendations
  1. Always start with shadow_mode=True
  2. Graduate to paper trading
  3. Use small positions when going live
  4. Set conservative risk limits

__post_init__

__post_init__()

Validate configuration parameters.

Source code in src/ml4t/live/safety.py
def __post_init__(self):
    """Validate configuration parameters."""
    # Validate position limits
    if self.max_position_value <= 0:
        raise ValueError(f"max_position_value must be positive, got {self.max_position_value}")

    if self.max_position_shares <= 0:
        raise ValueError(
            f"max_position_shares must be positive, got {self.max_position_shares}"
        )

    if self.max_total_exposure <= 0:
        raise ValueError(f"max_total_exposure must be positive, got {self.max_total_exposure}")

    if self.max_positions <= 0:
        raise ValueError(f"max_positions must be positive, got {self.max_positions}")

    # Validate order limits
    if self.max_order_value <= 0:
        raise ValueError(f"max_order_value must be positive, got {self.max_order_value}")

    if self.max_order_shares <= 0:
        raise ValueError(f"max_order_shares must be positive, got {self.max_order_shares}")

    if self.max_orders_per_minute <= 0:
        raise ValueError(
            f"max_orders_per_minute must be positive, got {self.max_orders_per_minute}"
        )

    # Validate loss limits
    if self.max_daily_loss <= 0:
        raise ValueError(f"max_daily_loss must be positive, got {self.max_daily_loss}")

    if not 0 < self.max_drawdown_pct <= 1:
        raise ValueError(
            f"max_drawdown_pct must be between 0 and 1, got {self.max_drawdown_pct}"
        )

    # Validate price protection
    if not 0 < self.max_price_deviation_pct <= 1:
        raise ValueError(
            f"max_price_deviation_pct must be between 0 and 1, "
            f"got {self.max_price_deviation_pct}"
        )

    if self.max_data_staleness_seconds <= 0:
        raise ValueError(
            f"max_data_staleness_seconds must be positive, "
            f"got {self.max_data_staleness_seconds}"
        )

    if self.dedup_window_seconds < 0:
        raise ValueError(
            f"dedup_window_seconds must be non-negative, got {self.dedup_window_seconds}"
        )

    # Validate asset restrictions
    if self.allowed_assets and self.blocked_assets:
        overlap = self.allowed_assets & self.blocked_assets
        if overlap:
            raise ValueError(f"Assets cannot be in both allowed and blocked lists: {overlap}")

    # Validate state file path
    if not self.state_file:
        raise ValueError("state_file cannot be empty")

SafeBroker

SafeBroker(broker, config)

Risk-controlled wrapper with state persistence.

Addresses Gemini v1: "If script crashes and restarts, SafeBroker resets max_daily_loss to 0. A losing strategy could burn through the limit again."

Addresses Gemini v2: - Critical Issue A: VirtualPortfolio for shadow mode - Memory leaks: _recent_orders pruned even if dedup disabled - Atomic JSON writes: write to .tmp then os.replace()

Safety Features: 1. Pre-trade validation against all risk limits 2. Order rate limiting 3. Drawdown monitoring with kill switch 4. Fat finger protection (price deviation check) 5. Stale data protection 6. Duplicate order filter 7. Shadow mode with VirtualPortfolio (realistic paper trading) 8. State persistence across restarts

Example

broker = IBBroker() await broker.connect()

safe = SafeBroker( broker=broker, config=LiveRiskConfig( max_position_value=25000, shadow_mode=True, # Test first! ) )

Use safe in strategy

engine = LiveEngine(strategy, safe, feed)

Initialize SafeBroker.

Parameters:

Name Type Description Default
broker AsyncBrokerProtocol

Async broker implementation (IBBroker, AlpacaBroker, etc.)

required
config LiveRiskConfig

Risk configuration

required
Source code in src/ml4t/live/safety.py
def __init__(self, broker: AsyncBrokerProtocol, config: LiveRiskConfig):
    """Initialize SafeBroker.

    Args:
        broker: Async broker implementation (IBBroker, AlpacaBroker, etc.)
        config: Risk configuration
    """
    self._broker = broker
    self.config = config

    # Load or initialize state
    self._state = self._load_state()

    # Rate limiting
    self._order_timestamps: list[float] = []

    # Duplicate detection
    self._recent_orders: list[tuple[float, str, float]] = []  # (time, asset, qty)

    # NEW: VirtualPortfolio for shadow mode (Gemini v2 fix)
    self._virtual_portfolio = VirtualPortfolio(initial_cash=100_000.0)

    # Initialize high water mark if not set
    if self._state.high_water_mark == 0.0:
        try:
            # Note: This is sync access to async method - we'll fix this
            # by making initialization async if needed
            pass
        except Exception:
            pass

    logger.info(f"SafeBroker initialized. Shadow mode: {config.shadow_mode}")
    if self._state.kill_switch_activated:
        logger.warning(
            f"Kill switch was previously activated: {self._state.kill_switch_reason}"
        )

positions property

positions

Get current positions.

In shadow mode, returns virtual positions. In live mode, returns broker positions.

pending_orders property

pending_orders

Get pending orders.

is_connected property

is_connected

Check if broker is connected.

get_position

get_position(asset)

Get position for specific asset.

Parameters:

Name Type Description Default
asset str

Asset symbol

required

Returns:

Type Description
Position | None

Position object or None

Source code in src/ml4t/live/safety.py
def get_position(self, asset: str) -> Position | None:
    """Get position for specific asset.

    Args:
        asset: Asset symbol

    Returns:
        Position object or None
    """
    if self.config.shadow_mode:
        return self._virtual_portfolio.positions.get(asset)
    return self._broker.get_position(asset)  # type: ignore[attr-defined]

get_account_value_async async

get_account_value_async()

Get total account value (async).

Returns:

Type Description
float

Total account value in base currency

Source code in src/ml4t/live/safety.py
async def get_account_value_async(self) -> float:
    """Get total account value (async).

    Returns:
        Total account value in base currency
    """
    if self.config.shadow_mode:
        return self._virtual_portfolio.account_value
    return await self._broker.get_account_value_async()

get_cash_async async

get_cash_async()

Get available cash (async).

Returns:

Type Description
float

Available cash in base currency

Source code in src/ml4t/live/safety.py
async def get_cash_async(self) -> float:
    """Get available cash (async).

    Returns:
        Available cash in base currency
    """
    if self.config.shadow_mode:
        return self._virtual_portfolio.cash
    return await self._broker.get_cash_async()

cancel_order_async async

cancel_order_async(order_id)

Cancel pending order.

Parameters:

Name Type Description Default
order_id str

ID of order to cancel

required

Returns:

Type Description
bool

True if cancel request submitted

Source code in src/ml4t/live/safety.py
async def cancel_order_async(self, order_id: str) -> bool:
    """Cancel pending order.

    Args:
        order_id: ID of order to cancel

    Returns:
        True if cancel request submitted
    """
    return await self._broker.cancel_order_async(order_id)

close_position_async async

close_position_async(asset)

Close entire position.

Close positions bypass normal limits (safety feature).

Parameters:

Name Type Description Default
asset str

Asset symbol to close

required

Returns:

Type Description
Order | None

Order object if position exists

Source code in src/ml4t/live/safety.py
async def close_position_async(self, asset: str) -> Order | None:
    """Close entire position.

    Close positions bypass normal limits (safety feature).

    Args:
        asset: Asset symbol to close

    Returns:
        Order object if position exists
    """
    # Close positions bypass normal limits (safety feature)
    if self.config.shadow_mode:
        logger.info(f"SHADOW: Would close position in {asset}")
        return None
    return await self._broker.close_position_async(asset)

submit_order_async async

submit_order_async(
    asset,
    quantity,
    side=None,
    order_type=MARKET,
    limit_price=None,
    stop_price=None,
    **kwargs,
)

Submit order with full risk validation.

Parameters:

Name Type Description Default
asset str

Asset symbol

required
quantity int

Number of shares/contracts

required
side OrderSide | None

Order side (BUY/SELL), auto-detected from quantity if None

None
order_type OrderType

Type of order

MARKET
limit_price float | None

Limit price for LIMIT/STOP_LIMIT orders

None
stop_price float | None

Stop price for STOP/STOP_LIMIT orders

None
**kwargs Any

Additional broker-specific parameters

{}

Returns:

Type Description
Order

Order object

Raises:

Type Description
RiskLimitError

If order violates any risk limit

Source code in src/ml4t/live/safety.py
async def submit_order_async(
    self,
    asset: str,
    quantity: int,
    side: OrderSide | None = None,
    order_type: OrderType = OrderType.MARKET,
    limit_price: float | None = None,
    stop_price: float | None = None,
    **kwargs: Any,
) -> Order:
    """Submit order with full risk validation.

    Args:
        asset: Asset symbol
        quantity: Number of shares/contracts
        side: Order side (BUY/SELL), auto-detected from quantity if None
        order_type: Type of order
        limit_price: Limit price for LIMIT/STOP_LIMIT orders
        stop_price: Stop price for STOP/STOP_LIMIT orders
        **kwargs: Additional broker-specific parameters

    Returns:
        Order object

    Raises:
        RiskLimitError: If order violates any risk limit
    """
    # Infer side
    if side is None:
        side = OrderSide.BUY if quantity > 0 else OrderSide.SELL
        quantity = abs(quantity)

    # === Risk Checks ===

    # 1. Kill switch
    if self.config.kill_switch_enabled or self._state.kill_switch_activated:
        raise RiskLimitError(
            f"Kill switch active: {self._state.kill_switch_reason or 'Manual activation'}"
        )

    # 2. Asset check
    self._check_asset(asset)

    # 3. Duplicate check
    self._check_duplicate(asset, float(quantity))

    # 4. Rate limit
    self._check_rate_limit()

    # 5. Order size limits
    price = await self._estimate_price(asset, limit_price)
    order_value = abs(quantity) * price
    self._check_order_limits(quantity, order_value)

    # 6. Position limits
    await self._check_position_limits(asset, quantity, order_value, side)

    # 7. Fat finger check (limit orders)
    if limit_price and order_type in (OrderType.LIMIT, OrderType.STOP_LIMIT):
        await self._check_price_deviation(asset, limit_price)

    # 8. Drawdown check (may activate kill switch)
    await self._check_drawdown()

    # === Shadow Mode (Gemini v2 fix: use VirtualPortfolio) ===
    if self.config.shadow_mode:
        # Create filled order
        order = Order(
            asset=asset,
            side=side,
            quantity=quantity,
            order_type=order_type,
            limit_price=limit_price,
            stop_price=stop_price,
            order_id=f"SHADOW-{int(time.time() * 1000)}",
            status=OrderStatus.FILLED,
            filled_quantity=quantity,
            filled_price=price,
            filled_at=datetime.now(),
        )

        # CRITICAL: Update VirtualPortfolio (fixes infinite buy loop)
        self._virtual_portfolio.process_fill(order)

        logger.info(
            f"SHADOW: {side.value} {quantity} {asset} @ ${price:.2f} "
            f"(value: ${order_value:,.0f})"
        )

        # Update state
        self._state.orders_placed += 1
        self._prune_history()  # Memory leak fix
        self._save_state()

        return order

    # === Execute ===
    logger.info(f"SafeBroker: Submitting {side.value} {quantity} {asset}")
    order = await self._broker.submit_order_async(
        asset, quantity, side, order_type, limit_price, stop_price, **kwargs
    )

    # Update state
    self._state.orders_placed += 1
    self._recent_orders.append((time.time(), asset, float(quantity)))
    self._prune_history()  # Memory leak fix
    self._save_state()

    return order

enable_kill_switch

enable_kill_switch(reason='Manual')

Manually enable kill switch.

Parameters:

Name Type Description Default
reason str

Reason for activation (default: "Manual")

'Manual'
Source code in src/ml4t/live/safety.py
def enable_kill_switch(self, reason: str = "Manual") -> None:
    """Manually enable kill switch.

    Args:
        reason: Reason for activation (default: "Manual")
    """
    self._activate_kill_switch(reason)

disable_kill_switch

disable_kill_switch()

Manually disable kill switch (use with caution!).

Source code in src/ml4t/live/safety.py
def disable_kill_switch(self) -> None:
    """Manually disable kill switch (use with caution!)."""
    logger.warning("Kill switch DISABLED - proceed with caution!")
    self._state.kill_switch_activated = False
    self._state.kill_switch_reason = ""
    self.config.kill_switch_enabled = False
    self._save_state()

close_all_positions async

close_all_positions()

Emergency close all positions.

Returns:

Type Description
list[Order]

List of close orders

Source code in src/ml4t/live/safety.py
async def close_all_positions(self) -> list[Order]:
    """Emergency close all positions.

    Returns:
        List of close orders
    """
    logger.warning("EMERGENCY: Closing ALL positions")
    orders = []
    for asset in list(self.positions.keys()):
        order = await self.close_position_async(asset)
        if order:
            orders.append(order)
    return orders

connect async

connect()

Connect to broker.

Source code in src/ml4t/live/safety.py
async def connect(self) -> None:
    """Connect to broker."""
    await self._broker.connect()

disconnect async

disconnect()

Disconnect from broker and save state.

Source code in src/ml4t/live/safety.py
async def disconnect(self) -> None:
    """Disconnect from broker and save state."""
    self._save_state()
    await self._broker.disconnect()

is_connected_async async

is_connected_async()

Check if connected (async).

Source code in src/ml4t/live/safety.py
async def is_connected_async(self) -> bool:
    """Check if connected (async)."""
    return await self._broker.is_connected_async()

get_positions_async async

get_positions_async()

Get all positions (async).

Returns:

Type Description
dict[str, Position]

Dictionary mapping asset symbol to Position

Source code in src/ml4t/live/safety.py
async def get_positions_async(self) -> dict[str, Position]:
    """Get all positions (async).

    Returns:
        Dictionary mapping asset symbol to Position
    """
    if self.config.shadow_mode:
        return self._virtual_portfolio.positions
    return await self._broker.get_positions_async()

get_pending_orders_async async

get_pending_orders_async()

Get pending orders (async).

Returns:

Type Description
list[Order]

List of pending orders

Source code in src/ml4t/live/safety.py
async def get_pending_orders_async(self) -> list[Order]:
    """Get pending orders (async).

    Returns:
        List of pending orders
    """
    return await self._broker.get_pending_orders_async()

get_position_async async

get_position_async(asset)

Get position (async).

Parameters:

Name Type Description Default
asset str

Asset symbol

required

Returns:

Type Description
Position | None

Position object or None

Source code in src/ml4t/live/safety.py
async def get_position_async(self, asset: str) -> Position | None:
    """Get position (async).

    Args:
        asset: Asset symbol

    Returns:
        Position object or None
    """
    if self.config.shadow_mode:
        return self._virtual_portfolio.positions.get(asset)
    return await self._broker.get_position_async(asset)

ThreadSafeBrokerWrapper

ThreadSafeBrokerWrapper(async_broker, loop)

Wraps an async broker for use from sync strategy code.

This wrapper is passed to Strategy.on_data() instead of the raw broker. It bridges the sync/async boundary by scheduling coroutines on the main event loop and blocking the worker thread until they complete.

Thread Safety: - Strategy runs in worker thread (via asyncio.to_thread) - Broker methods run on main event loop - run_coroutine_threadsafe() handles the cross-thread communication

Timeouts (from design review): - Getters (get_cash, get_account_value): 5s - Order operations (submit, cancel, close): 30s

Example

LiveEngine creates this wrapper

loop = asyncio.get_running_loop() wrapped = ThreadSafeBrokerWrapper(ib_broker, loop)

Strategy uses it like a normal sync broker

order = wrapped.submit_order('AAPL', 100, OrderSide.BUY)

Note

This class implements BrokerProtocol but does not inherit from it. It provides a sync interface backed by async operations.

Initialize thread-safe wrapper.

Parameters:

Name Type Description Default
async_broker AsyncBrokerProtocol

Async broker implementation (IBBroker, etc.)

required
loop AbstractEventLoop

Main event loop (from asyncio.get_running_loop())

required
Source code in src/ml4t/live/wrappers.py
def __init__(self, async_broker: AsyncBrokerProtocol, loop: asyncio.AbstractEventLoop):
    """Initialize thread-safe wrapper.

    Args:
        async_broker: Async broker implementation (IBBroker, etc.)
        loop: Main event loop (from asyncio.get_running_loop())
    """
    self._broker = async_broker
    self._loop = loop
    self._loop_thread_id = threading.get_ident()

positions property

positions

Get current positions (thread-safe read).

Returns:

Type Description
dict[str, Position]

Dictionary mapping asset symbol to Position

pending_orders property

pending_orders

Get pending orders (thread-safe read).

Returns:

Type Description
list[Order]

List of pending Order objects

is_connected property

is_connected

Check if broker is connected.

Returns:

Type Description
bool

True if connected and ready to trade

get_position

get_position(asset)

Get position for specific asset.

Parameters:

Name Type Description Default
asset str

Asset symbol (e.g., "AAPL")

required

Returns:

Type Description
Position | None

Position object if holding position, None otherwise

Raises:

Type Description
TimeoutError

If operation times out

RuntimeError

If broker error occurs

Source code in src/ml4t/live/wrappers.py
def get_position(self, asset: str) -> Position | None:
    """Get position for specific asset.

    Args:
        asset: Asset symbol (e.g., "AAPL")

    Returns:
        Position object if holding position, None otherwise

    Raises:
        TimeoutError: If operation times out
        RuntimeError: If broker error occurs
    """
    # Can use positions property since it returns a copy
    return self.positions.get(asset)

get_account_value

get_account_value()

Get total account value (cash + positions).

Returns:

Type Description
float

Total account value in base currency

Raises:

Type Description
TimeoutError

If operation times out (5s)

RuntimeError

If broker error occurs

Source code in src/ml4t/live/wrappers.py
def get_account_value(self) -> float:
    """Get total account value (cash + positions).

    Returns:
        Total account value in base currency

    Raises:
        TimeoutError: If operation times out (5s)
        RuntimeError: If broker error occurs
    """
    return self._run_sync(self._broker.get_account_value_async(), timeout=5.0)

get_cash

get_cash()

Get available cash balance.

Returns:

Type Description
float

Available cash in base currency

Raises:

Type Description
TimeoutError

If operation times out (5s)

RuntimeError

If broker error occurs

Source code in src/ml4t/live/wrappers.py
def get_cash(self) -> float:
    """Get available cash balance.

    Returns:
        Available cash in base currency

    Raises:
        TimeoutError: If operation times out (5s)
        RuntimeError: If broker error occurs
    """
    return self._run_sync(self._broker.get_cash_async(), timeout=5.0)

submit_order

submit_order(
    asset,
    quantity,
    side=None,
    order_type=MARKET,
    limit_price=None,
    stop_price=None,
    **kwargs,
)

Submit order for execution.

Parameters:

Name Type Description Default
asset str

Asset symbol (e.g., "AAPL")

required
quantity int

Number of shares/contracts (positive for buy, negative for sell)

required
side OrderSide | None

Order side (BUY/SELL), auto-detected from quantity if None

None
order_type OrderType

Type of order (MARKET, LIMIT, STOP, etc.)

MARKET
limit_price float | None

Limit price for LIMIT/STOP_LIMIT orders

None
stop_price float | None

Stop price for STOP/STOP_LIMIT orders

None
**kwargs Any

Additional broker-specific parameters

{}

Returns:

Type Description
Order

Order object with order_id and initial status

Raises:

Type Description
TimeoutError

If operation times out (30s)

ValueError

If order parameters are invalid

RuntimeError

If broker is not connected or error occurs

Source code in src/ml4t/live/wrappers.py
def submit_order(
    self,
    asset: str,
    quantity: int,
    side: OrderSide | None = None,
    order_type: OrderType = OrderType.MARKET,
    limit_price: float | None = None,
    stop_price: float | None = None,
    **kwargs: Any,
) -> Order:
    """Submit order for execution.

    Args:
        asset: Asset symbol (e.g., "AAPL")
        quantity: Number of shares/contracts (positive for buy, negative for sell)
        side: Order side (BUY/SELL), auto-detected from quantity if None
        order_type: Type of order (MARKET, LIMIT, STOP, etc.)
        limit_price: Limit price for LIMIT/STOP_LIMIT orders
        stop_price: Stop price for STOP/STOP_LIMIT orders
        **kwargs: Additional broker-specific parameters

    Returns:
        Order object with order_id and initial status

    Raises:
        TimeoutError: If operation times out (30s)
        ValueError: If order parameters are invalid
        RuntimeError: If broker is not connected or error occurs
    """
    return self._run_sync(
        self._broker.submit_order_async(
            asset, quantity, side, order_type, limit_price, stop_price, **kwargs
        ),
        timeout=30.0,  # Orders need longer timeout
    )

cancel_order

cancel_order(order_id)

Cancel pending order.

Parameters:

Name Type Description Default
order_id str

ID of order to cancel

required

Returns:

Type Description
bool

True if cancel request submitted, False if order not found

Raises:

Type Description
TimeoutError

If operation times out (30s)

RuntimeError

If broker error occurs

Source code in src/ml4t/live/wrappers.py
def cancel_order(self, order_id: str) -> bool:
    """Cancel pending order.

    Args:
        order_id: ID of order to cancel

    Returns:
        True if cancel request submitted, False if order not found

    Raises:
        TimeoutError: If operation times out (30s)
        RuntimeError: If broker error occurs
    """
    return self._run_sync(self._broker.cancel_order_async(order_id), timeout=30.0)

close_position

close_position(asset)

Close entire position in asset.

Convenience method that submits a closing order.

Parameters:

Name Type Description Default
asset str

Asset symbol to close

required

Returns:

Type Description
Order | None

Order object if position exists, None if no position

Raises:

Type Description
TimeoutError

If operation times out (30s)

RuntimeError

If broker error occurs

Source code in src/ml4t/live/wrappers.py
def close_position(self, asset: str) -> Order | None:
    """Close entire position in asset.

    Convenience method that submits a closing order.

    Args:
        asset: Asset symbol to close

    Returns:
        Order object if position exists, None if no position

    Raises:
        TimeoutError: If operation times out (30s)
        RuntimeError: If broker error occurs
    """
    return self._run_sync(self._broker.close_position_async(asset), timeout=30.0)

Brokers

IBBroker

IBBroker(
    host="127.0.0.1", port=7497, client_id=1, account=None
)

Bases: AsyncBrokerProtocol

Interactive Brokers implementation.

Design: - All broker operations are async - Uses asyncio.Lock for thread safety - Event handlers use put_nowait() (non-blocking) - Reconnection handled externally

Connection Ports: - TWS Paper: 7497 - TWS Live: 7496 - Gateway Paper: 4002 - Gateway Live: 4001

Example

broker = IBBroker(port=7497) # Paper trading await broker.connect() positions = await broker.get_positions_async() await broker.disconnect()

Initialize IBBroker.

Parameters:

Name Type Description Default
host str

IB Gateway/TWS host (default: '127.0.0.1')

'127.0.0.1'
port int

IB Gateway/TWS port (default: 7497 for paper)

7497
client_id int

Unique client ID (default: 1)

1
account str | None

IB account ID (default: use first account)

None
Source code in src/ml4t/live/brokers/ib.py
def __init__(
    self,
    host: str = "127.0.0.1",
    port: int = 7497,  # Paper trading default
    client_id: int = 1,
    account: str | None = None,
):
    """Initialize IBBroker.

    Args:
        host: IB Gateway/TWS host (default: '127.0.0.1')
        port: IB Gateway/TWS port (default: 7497 for paper)
        client_id: Unique client ID (default: 1)
        account: IB account ID (default: use first account)
    """
    self._host = host
    self._port = port
    self._client_id = client_id
    self._account = account

    self.ib = IB()
    self.ib.RequestTimeout = 60
    self.ib.RaiseRequestErrors = True
    self._connected = False

    # Thread-safe state with locks
    self._positions: dict[str, Position] = {}
    self._position_lock = asyncio.Lock()
    self._pending_orders: dict[str, Order] = {}
    self._order_lock = asyncio.Lock()

    # Order tracking
    self._order_counter = 0
    self._ib_order_map: dict[int, tuple[str, float]] = {}  # IB orderId -> (our_id, timestamp)

    # Contract cache
    self._contracts: dict[str, Contract] = {}

is_connected property

is_connected

Check if connected to IB.

positions property

positions

Thread-safe position access (Gemini v2 Critical Issue C).

Note: This is called from worker thread via ThreadSafeBrokerWrapper. The lock prevents RuntimeError during dict iteration if IB callback modifies positions concurrently.

Returns:

Type Description
dict[str, Position]

Dictionary mapping asset symbols to Position objects

pending_orders property

pending_orders

Get list of pending orders.

Returns:

Type Description
list[Order]

List of pending Order objects

connect async

connect()

Connect to IB Gateway/TWS.

Raises:

Type Description
RuntimeError

If connection fails

TimeoutError

If connection times out

Source code in src/ml4t/live/brokers/ib.py
async def connect(self) -> None:
    """Connect to IB Gateway/TWS.

    Raises:
        RuntimeError: If connection fails
        asyncio.TimeoutError: If connection times out
    """
    if self._connected:
        logger.info("IBBroker: Already connected")
        return

    logger.info(
        f"IBBroker: Connecting to {self._host}:{self._port} (client_id={self._client_id})"
    )

    try:
        # Use outer timeout wrapper like production code
        await asyncio.wait_for(
            self.ib.connectAsync(
                host=self._host,
                port=self._port,
                clientId=self._client_id,
                account=self._account or "",  # Pass account like production
                timeout=15,
            ),
            timeout=20,  # Outer timeout wrapper
        )
    except (TimeoutError, ConnectionRefusedError) as e:
        logger.error(f"IBBroker: Connection failed: {e}")
        raise
    except Exception:
        logger.exception("IBBroker: Unexpected error during connect")
        raise

    self._connected = True

    # Get account if not specified
    if self._account is None:
        accounts = self.ib.managedAccounts()
        if accounts:
            self._account = accounts[0]
    logger.info(f"IBBroker: Connected successfully, account={self._account}")

    # Subscribe to events
    self.ib.orderStatusEvent += self._on_order_status
    self.ib.positionEvent += self._on_position

    # Initial sync
    await self._sync_positions()
    await self._sync_orders()

disconnect async

disconnect()

Disconnect from IB.

Source code in src/ml4t/live/brokers/ib.py
async def disconnect(self) -> None:
    """Disconnect from IB."""
    if self._connected:
        self.ib.disconnect()
        self._connected = False
        # Give time for socket cleanup to prevent zombie connections
        await asyncio.sleep(0.1)
        logger.info("IBBroker: Disconnected")

get_position

get_position(asset)

Thread-safe single position access.

Parameters:

Name Type Description Default
asset str

Asset symbol

required

Returns:

Type Description
Position | None

Position object if exists, None otherwise

Source code in src/ml4t/live/brokers/ib.py
def get_position(self, asset: str) -> Position | None:
    """Thread-safe single position access.

    Args:
        asset: Asset symbol

    Returns:
        Position object if exists, None otherwise
    """
    return self._positions.get(asset.upper())

get_positions_async async

get_positions_async()

Async thread-safe position access with lock.

Returns:

Type Description
dict[str, Position]

Dictionary mapping asset symbols to Position objects

Source code in src/ml4t/live/brokers/ib.py
async def get_positions_async(self) -> dict[str, Position]:
    """Async thread-safe position access with lock.

    Returns:
        Dictionary mapping asset symbols to Position objects
    """
    async with self._position_lock:
        return dict(self._positions)

get_account_value_async async

get_account_value_async()

Get Net Liquidation Value.

Returns:

Type Description
float

Account net liquidation value in USD

Source code in src/ml4t/live/brokers/ib.py
async def get_account_value_async(self) -> float:
    """Get Net Liquidation Value.

    Returns:
        Account net liquidation value in USD
    """
    for av in self.ib.accountValues():
        if (
            av.tag == "NetLiquidation"
            and av.currency == "USD"
            and (av.account == self._account or self._account is None)
        ):
            return float(av.value)
    return 0.0

get_cash_async async

get_cash_async()

Get available funds.

Returns:

Type Description
float

Available funds in USD

Source code in src/ml4t/live/brokers/ib.py
async def get_cash_async(self) -> float:
    """Get available funds.

    Returns:
        Available funds in USD
    """
    for av in self.ib.accountValues():
        if (
            av.tag == "AvailableFunds"
            and av.currency == "USD"
            and (av.account == self._account or self._account is None)
        ):
            return float(av.value)
    return 0.0

submit_order_async async

submit_order_async(
    asset,
    quantity,
    side=None,
    order_type=MARKET,
    limit_price=None,
    stop_price=None,
    **kwargs,
)

Submit order to IB.

TASK-013: Full order submission implementation with IB order tracking.

Parameters:

Name Type Description Default
asset str

Asset symbol

required
quantity float

Number of shares

required
side OrderSide | None

BUY or SELL (auto-detected if None)

None
order_type OrderType

Market, limit, stop, or stop-limit

MARKET
limit_price float | None

Limit price for limit orders

None
stop_price float | None

Stop price for stop orders

None

Returns:

Type Description
Order

Order object

Raises:

Type Description
RuntimeError

If not connected

ValueError

If order parameters are invalid

Source code in src/ml4t/live/brokers/ib.py
async def submit_order_async(
    self,
    asset: str,
    quantity: float,
    side: OrderSide | None = None,
    order_type: OrderType = OrderType.MARKET,
    limit_price: float | None = None,
    stop_price: float | None = None,
    **kwargs: Any,
) -> Order:
    """Submit order to IB.

    TASK-013: Full order submission implementation with IB order tracking.

    Args:
        asset: Asset symbol
        quantity: Number of shares
        side: BUY or SELL (auto-detected if None)
        order_type: Market, limit, stop, or stop-limit
        limit_price: Limit price for limit orders
        stop_price: Stop price for stop orders

    Returns:
        Order object

    Raises:
        RuntimeError: If not connected
        ValueError: If order parameters are invalid
    """
    if not self.is_connected:
        raise RuntimeError("Not connected to IB")

    # Auto-detect side if not provided
    asset = asset.upper()
    if side is None:
        pos = self.get_position(asset)
        if pos and pos.quantity < 0:
            # Short position, assume closing (buy)
            side = OrderSide.BUY
        else:
            # Long or no position, assume opening/adding (buy)
            side = OrderSide.BUY

    # Get contract
    contract = self._get_contract(asset)

    # Create IB order
    action = "BUY" if side == OrderSide.BUY else "SELL"
    ib_order = self._create_ib_order(action, quantity, order_type, limit_price, stop_price)

    # Submit atomically with lock
    async with self._order_lock:
        self._order_counter += 1
        order_id = f"ML4T-{self._order_counter}"

        # Place order with IB
        trade = self.ib.placeOrder(contract, ib_order)

        # Create our order
        order = Order(
            asset=asset,
            side=side,
            quantity=quantity,
            order_type=order_type,
            limit_price=limit_price,
            stop_price=stop_price,
            order_id=order_id,
            status=OrderStatus.PENDING,
            created_at=datetime.now(),
        )

        # Track order
        self._pending_orders[order_id] = order
        self._ib_order_map[trade.order.orderId] = (order_id, time.time())

    logger.info(f"IBBroker: Order {order_id} submitted: {side.value} {quantity} {asset}")
    return order

cancel_order_async async

cancel_order_async(order_id)

Cancel pending order.

TASK-016: Full order cancellation implementation.

This method finds the IB order ID from our tracking map and cancels the order via the IB API. Handles edge cases like order not found or order already filled.

Parameters:

Name Type Description Default
order_id str

Order ID to cancel (e.g., 'ML4T-1')

required

Returns:

Type Description
bool

True if cancellation request sent successfully, False otherwise

Note

The actual cancellation is confirmed via _on_order_status callback when IB sends the 'Cancelled' status update.

Source code in src/ml4t/live/brokers/ib.py
async def cancel_order_async(self, order_id: str) -> bool:
    """Cancel pending order.

    TASK-016: Full order cancellation implementation.

    This method finds the IB order ID from our tracking map and cancels
    the order via the IB API. Handles edge cases like order not found
    or order already filled.

    Args:
        order_id: Order ID to cancel (e.g., 'ML4T-1')

    Returns:
        True if cancellation request sent successfully, False otherwise

    Note:
        The actual cancellation is confirmed via _on_order_status callback
        when IB sends the 'Cancelled' status update.
    """
    # Find IB order ID from our tracking map
    ib_order_id = None
    for ib_id, (our_id, _) in self._ib_order_map.items():
        if our_id == order_id:
            ib_order_id = ib_id
            break

    if ib_order_id is None:
        logger.warning(f"IBBroker: Order {order_id} not found in tracking map")
        return False

    # Find the trade in open trades and cancel
    for trade in self.ib.openTrades():
        if trade.order.orderId == ib_order_id:
            self.ib.cancelOrder(trade.order)
            logger.info(f"IBBroker: Cancellation requested for order {order_id}")
            return True

    # Order not in open trades (possibly already filled or cancelled)
    logger.warning(f"IBBroker: Order {order_id} not found in open trades")
    return False

close_position_async async

close_position_async(asset)

Close position in asset.

Parameters:

Name Type Description Default
asset str

Asset symbol

required

Returns:

Type Description
Order | None

Order object if position exists, None otherwise

Raises:

Type Description
NotImplementedError

Depends on TASK-013

Source code in src/ml4t/live/brokers/ib.py
async def close_position_async(self, asset: str) -> Order | None:
    """Close position in asset.

    Args:
        asset: Asset symbol

    Returns:
        Order object if position exists, None otherwise

    Raises:
        NotImplementedError: Depends on TASK-013
    """
    pos = self.get_position(asset)
    if not pos or pos.quantity == 0:
        return None

    side = OrderSide.SELL if pos.quantity > 0 else OrderSide.BUY
    return await self.submit_order_async(asset, abs(pos.quantity), side)

AlpacaBroker

AlpacaBroker(api_key, secret_key, paper=True)

Bases: AsyncBrokerProtocol

Alpaca Markets broker implementation.

Design (matching IBBroker patterns): - All broker operations are async - Uses asyncio.Lock for thread safety - WebSocket stream for real-time order updates - REST API for account/position queries and order submission

Paper vs Live: - paper=True (default): Uses paper trading endpoint - paper=False: Uses live trading endpoint (USE WITH CAUTION)

Example

broker = AlpacaBroker( api_key='PKXXXXXXXX', secret_key='XXXXXXXXXX', paper=True, # Always start with paper trading! ) await broker.connect() positions = await broker.get_positions_async() await broker.disconnect()

Initialize AlpacaBroker.

Parameters:

Name Type Description Default
api_key str

Alpaca API key (from https://app.alpaca.markets)

required
secret_key str

Alpaca secret key

required
paper bool

Use paper trading endpoint (default: True)

True
Source code in src/ml4t/live/brokers/alpaca.py
def __init__(
    self,
    api_key: str,
    secret_key: str,
    paper: bool = True,  # Paper trading by default (SAFETY)
):
    """Initialize AlpacaBroker.

    Args:
        api_key: Alpaca API key (from https://app.alpaca.markets)
        secret_key: Alpaca secret key
        paper: Use paper trading endpoint (default: True)
    """
    self._api_key = api_key
    self._secret_key = secret_key
    self._paper = paper

    # Clients (created in connect())
    self._trading_client: TradingClient | None = None
    self._trading_stream: TradingStream | None = None
    self._stream_task: asyncio.Task | None = None

    # Connection state
    self._connected = False

    # Thread-safe state with locks (matching IBBroker pattern)
    self._positions: dict[str, Position] = {}
    self._position_lock = asyncio.Lock()
    self._pending_orders: dict[str, Order] = {}
    self._order_lock = asyncio.Lock()

    # Order tracking (matching IBBroker pattern)
    self._order_counter = 0
    # Alpaca order ID (UUID string) -> (our_id, timestamp)
    self._alpaca_order_map: dict[str, tuple[str, float]] = {}

is_connected property

is_connected

Check if connected to Alpaca.

positions property

positions

Thread-safe position access (shallow copy).

Note: This is called from worker thread via ThreadSafeBrokerWrapper. The shallow copy prevents RuntimeError during dict iteration.

Returns:

Type Description
dict[str, Position]

Dictionary mapping asset symbols to Position objects

pending_orders property

pending_orders

Get list of pending orders.

Returns:

Type Description
list[Order]

List of pending Order objects

connect async

connect()

Connect to Alpaca and sync initial state.

Steps: 1. Create TradingClient (REST) 2. Create TradingStream (WebSocket) 3. Verify connection by fetching account 4. Register trade update callback 5. Sync positions and open orders 6. Start WebSocket stream for order updates

Raises:

Type Description
RuntimeError

If connection fails

Source code in src/ml4t/live/brokers/alpaca.py
async def connect(self) -> None:
    """Connect to Alpaca and sync initial state.

    Steps:
    1. Create TradingClient (REST)
    2. Create TradingStream (WebSocket)
    3. Verify connection by fetching account
    4. Register trade update callback
    5. Sync positions and open orders
    6. Start WebSocket stream for order updates

    Raises:
        RuntimeError: If connection fails
    """
    if self._connected:
        logger.info("AlpacaBroker: Already connected")
        return

    mode = "paper" if self._paper else "LIVE"
    logger.info(f"AlpacaBroker: Connecting ({mode} trading)")

    try:
        # Create REST client
        self._trading_client = TradingClient(
            api_key=self._api_key,
            secret_key=self._secret_key,
            paper=self._paper,
        )

        # Verify connection by fetching account
        account = self._trading_client.get_account()
        equity = float(account.equity) if account.equity else 0.0
        cash = float(account.cash) if account.cash else 0.0
        logger.info(
            f"AlpacaBroker: Account verified - equity=${equity:,.2f}, cash=${cash:,.2f}"
        )

        # Create WebSocket stream for order updates
        self._trading_stream = TradingStream(
            api_key=self._api_key,
            secret_key=self._secret_key,
            paper=self._paper,
        )

        # Subscribe to trade updates BEFORE initial sync (IBBroker pattern)
        self._trading_stream.subscribe_trade_updates(self._on_trade_update)

        # Initial sync
        await self._sync_positions()
        await self._sync_orders()

        # Start stream in background task
        self._stream_task = asyncio.create_task(self._run_trading_stream())

        self._connected = True
        logger.info("AlpacaBroker: Connected successfully")

    except Exception as e:
        logger.error(f"AlpacaBroker: Connection failed: {e}")
        raise RuntimeError(f"Failed to connect to Alpaca: {e}") from e

disconnect async

disconnect()

Disconnect from Alpaca.

Source code in src/ml4t/live/brokers/alpaca.py
async def disconnect(self) -> None:
    """Disconnect from Alpaca."""
    if not self._connected:
        return

    # Cancel stream task
    if self._stream_task and not self._stream_task.done():
        self._stream_task.cancel()
        try:
            await self._stream_task
        except asyncio.CancelledError:
            pass

    # Close stream
    if self._trading_stream:
        try:
            self._trading_stream.stop()
        except Exception as e:
            logger.warning(f"AlpacaBroker: Error stopping stream: {e}")

    self._connected = False
    self._trading_client = None
    self._trading_stream = None
    self._stream_task = None

    logger.info("AlpacaBroker: Disconnected")

get_position

get_position(asset)

Thread-safe single position access.

Parameters:

Name Type Description Default
asset str

Asset symbol (e.g., 'AAPL' or 'BTC/USD')

required

Returns:

Type Description
Position | None

Position object if exists, None otherwise

Source code in src/ml4t/live/brokers/alpaca.py
def get_position(self, asset: str) -> Position | None:
    """Thread-safe single position access.

    Args:
        asset: Asset symbol (e.g., 'AAPL' or 'BTC/USD')

    Returns:
        Position object if exists, None otherwise
    """
    return self._positions.get(asset.upper())

get_positions_async async

get_positions_async()

Async thread-safe position access with lock.

Returns:

Type Description
dict[str, Position]

Dictionary mapping asset symbols to Position objects

Source code in src/ml4t/live/brokers/alpaca.py
async def get_positions_async(self) -> dict[str, Position]:
    """Async thread-safe position access with lock.

    Returns:
        Dictionary mapping asset symbols to Position objects
    """
    async with self._position_lock:
        return dict(self._positions)

get_account_value_async async

get_account_value_async()

Get portfolio value (equity).

Returns:

Type Description
float

Total account equity in USD

Source code in src/ml4t/live/brokers/alpaca.py
async def get_account_value_async(self) -> float:
    """Get portfolio value (equity).

    Returns:
        Total account equity in USD
    """
    if not self._trading_client:
        return 0.0

    account = self._trading_client.get_account()
    return float(account.equity) if account.equity else 0.0

get_cash_async async

get_cash_async()

Get available cash.

Returns:

Type Description
float

Available cash in USD

Source code in src/ml4t/live/brokers/alpaca.py
async def get_cash_async(self) -> float:
    """Get available cash.

    Returns:
        Available cash in USD
    """
    if not self._trading_client:
        return 0.0

    account = self._trading_client.get_account()
    return float(account.cash) if account.cash else 0.0

submit_order_async async

submit_order_async(
    asset,
    quantity,
    side=None,
    order_type=MARKET,
    limit_price=None,
    stop_price=None,
    **kwargs,
)

Submit order to Alpaca.

Parameters:

Name Type Description Default
asset str

Asset symbol (e.g., 'AAPL' or 'BTC/USD')

required
quantity float

Number of shares/units

required
side OrderSide | None

BUY or SELL (auto-detected from quantity sign if None)

None
order_type OrderType

Market, limit, stop, or stop-limit

MARKET
limit_price float | None

Limit price for limit orders

None
stop_price float | None

Stop price for stop orders

None
**kwargs Any

Additional parameters (ignored)

{}

Returns:

Type Description
Order

Order object

Raises:

Type Description
RuntimeError

If not connected

ValueError

If order parameters are invalid

Source code in src/ml4t/live/brokers/alpaca.py
async def submit_order_async(
    self,
    asset: str,
    quantity: float,
    side: OrderSide | None = None,
    order_type: OrderType = OrderType.MARKET,
    limit_price: float | None = None,
    stop_price: float | None = None,
    **kwargs: Any,
) -> Order:
    """Submit order to Alpaca.

    Args:
        asset: Asset symbol (e.g., 'AAPL' or 'BTC/USD')
        quantity: Number of shares/units
        side: BUY or SELL (auto-detected from quantity sign if None)
        order_type: Market, limit, stop, or stop-limit
        limit_price: Limit price for limit orders
        stop_price: Stop price for stop orders
        **kwargs: Additional parameters (ignored)

    Returns:
        Order object

    Raises:
        RuntimeError: If not connected
        ValueError: If order parameters are invalid
    """
    if not self.is_connected or not self._trading_client:
        raise RuntimeError("Not connected to Alpaca")

    # Normalize asset symbol
    asset = asset.upper()

    # Auto-detect side from quantity sign if not provided
    if side is None:
        side = OrderSide.BUY if quantity > 0 else OrderSide.SELL
    qty = abs(quantity)

    # Create order request
    order_request = self._create_order_request(
        asset, qty, side, order_type, limit_price, stop_price
    )

    # Submit atomically with lock (IBBroker pattern)
    async with self._order_lock:
        self._order_counter += 1
        order_id = f"ML4T-{self._order_counter}"

        # Submit to Alpaca
        alpaca_order = self._trading_client.submit_order(order_request)

        # Create our order object
        order = Order(
            asset=asset,
            side=side,
            quantity=qty,
            order_type=order_type,
            limit_price=limit_price,
            stop_price=stop_price,
            order_id=order_id,
            status=self._map_order_status(alpaca_order.status),
            created_at=alpaca_order.created_at or datetime.now(UTC),
        )

        # Track order
        self._pending_orders[order_id] = order
        self._alpaca_order_map[str(alpaca_order.id)] = (order_id, time.time())

    logger.info(f"AlpacaBroker: Order {order_id} submitted: {side.value} {qty} {asset}")
    return order

cancel_order_async async

cancel_order_async(order_id)

Cancel pending order.

Parameters:

Name Type Description Default
order_id str

Order ID to cancel (e.g., 'ML4T-1')

required

Returns:

Type Description
bool

True if cancellation request sent successfully, False otherwise

Source code in src/ml4t/live/brokers/alpaca.py
async def cancel_order_async(self, order_id: str) -> bool:
    """Cancel pending order.

    Args:
        order_id: Order ID to cancel (e.g., 'ML4T-1')

    Returns:
        True if cancellation request sent successfully, False otherwise
    """
    if not self._trading_client:
        return False

    # Find Alpaca order ID from our tracking map
    alpaca_order_id = None
    for alpaca_id, (our_id, _) in self._alpaca_order_map.items():
        if our_id == order_id:
            alpaca_order_id = alpaca_id
            break

    if alpaca_order_id is None:
        logger.warning(f"AlpacaBroker: Order {order_id} not found in tracking map")
        return False

    try:
        self._trading_client.cancel_order_by_id(alpaca_order_id)
        logger.info(f"AlpacaBroker: Cancellation requested for order {order_id}")
        return True
    except Exception as e:
        logger.warning(f"AlpacaBroker: Failed to cancel order {order_id}: {e}")
        return False

close_position_async async

close_position_async(asset)

Close position in asset.

Parameters:

Name Type Description Default
asset str

Asset symbol

required

Returns:

Type Description
Order | None

Order object if position exists, None otherwise

Source code in src/ml4t/live/brokers/alpaca.py
async def close_position_async(self, asset: str) -> Order | None:
    """Close position in asset.

    Args:
        asset: Asset symbol

    Returns:
        Order object if position exists, None otherwise
    """
    pos = self.get_position(asset)
    if not pos or pos.quantity == 0:
        return None

    side = OrderSide.SELL if pos.quantity > 0 else OrderSide.BUY
    return await self.submit_order_async(asset, abs(pos.quantity), side)

Data Feeds

IBDataFeed

IBDataFeed(
    ib,
    symbols,
    *,
    exchange="SMART",
    currency="USD",
    tick_throttle_ms=100,
)

Bases: DataFeedProtocol

Real-time market data feed from Interactive Brokers.

Subscribes to tick-by-tick market data for specified symbols. Emits data as (timestamp, data, context) tuples.

Data Format

timestamp: datetime - Tick timestamp data: dict[str, dict] - {symbol: {'price': float, 'size': int}} context: dict - Additional metadata (bid, ask, etc.)

Note
  • IB must be connected before creating feed
  • Requires market data subscription for symbols
  • Throttles rapid ticks to avoid overwhelming strategy
Example

ib = IB() await ib.connectAsync('127.0.0.1', 7497, clientId=1)

feed = IBDataFeed(ib, symbols=['SPY', 'QQQ', 'IWM']) await feed.start()

Use directly or wrap with BarAggregator

aggregator = BarAggregator(feed, bar_size_minutes=1)

Initialize IB data feed.

Parameters:

Name Type Description Default
ib IB

Connected IB instance

required
symbols list[str]

List of symbols to subscribe to

required
exchange str

IB exchange (default: SMART routing)

'SMART'
currency str

Currency (default: USD)

'USD'
tick_throttle_ms int

Minimum milliseconds between tick emissions (prevents overwhelming strategy with rapid ticks)

100
Source code in src/ml4t/live/feeds/ib_feed.py
def __init__(
    self,
    ib: IB,
    symbols: list[str],
    *,
    exchange: str = "SMART",
    currency: str = "USD",
    tick_throttle_ms: int = 100,  # Min time between emits
):
    """Initialize IB data feed.

    Args:
        ib: Connected IB instance
        symbols: List of symbols to subscribe to
        exchange: IB exchange (default: SMART routing)
        currency: Currency (default: USD)
        tick_throttle_ms: Minimum milliseconds between tick emissions
            (prevents overwhelming strategy with rapid ticks)
    """
    self.ib = ib
    self.symbols = symbols
    self.exchange = exchange
    self.currency = currency
    self.tick_throttle_ms = tick_throttle_ms

    # State
    self._queue: asyncio.Queue = asyncio.Queue()
    self._running = False
    self._contracts: dict[str, Stock] = {}
    self._tickers: dict[str, Ticker] = {}
    self._last_emit_time = 0.0

    # Statistics
    self._tick_count = 0
    self._throttled_count = 0

stats property

stats

Get feed statistics.

Returns:

Type Description
dict[str, Any]

Dict with keys:

dict[str, Any]
  • running: bool
dict[str, Any]
  • tick_count: int - Total ticks received
dict[str, Any]
  • throttled_count: int - Ticks throttled
dict[str, Any]
  • symbols: list[str] - Subscribed symbols

start async

start()

Subscribe to market data for all symbols.

Creates contracts and subscribes to real-time tick data.

Raises:

Type Description
RuntimeError

If IB not connected

Source code in src/ml4t/live/feeds/ib_feed.py
async def start(self) -> None:
    """Subscribe to market data for all symbols.

    Creates contracts and subscribes to real-time tick data.

    Raises:
        RuntimeError: If IB not connected
    """
    if not self.ib.isConnected():
        raise RuntimeError("IB must be connected before starting feed")

    logger.info(f"IBDataFeed: Starting feed for {len(self.symbols)} symbols")
    self._running = True

    # Create contracts
    for symbol in self.symbols:
        contract = Stock(symbol, self.exchange, self.currency)
        self._contracts[symbol] = contract

        # Qualify contract (ensure IB recognizes it)
        qualified = await self.ib.qualifyContractsAsync(contract)
        if not qualified:
            logger.warning(f"IBDataFeed: Could not qualify contract for {symbol}")
            continue

        # Request market data
        ticker = self.ib.reqMktData(contract, "", False, False)
        self._tickers[symbol] = ticker

    # Register callback for ticker updates
    self.ib.pendingTickersEvent += self._on_pending_tickers

    logger.info(f"IBDataFeed: Subscribed to {len(self._tickers)} symbols")

stop

stop()

Unsubscribe from market data.

Cancels all market data subscriptions and stops feed.

Source code in src/ml4t/live/feeds/ib_feed.py
def stop(self) -> None:
    """Unsubscribe from market data.

    Cancels all market data subscriptions and stops feed.
    """
    logger.info("IBDataFeed: Stopping feed")
    self._running = False

    # Unsubscribe from all tickers
    for symbol, contract in self._contracts.items():
        try:
            self.ib.cancelMktData(contract)
        except Exception as e:
            logger.warning(f"IBDataFeed: Error canceling {symbol}: {e}")

    # Remove callback
    self.ib.pendingTickersEvent -= self._on_pending_tickers

    # Signal consumer to exit
    self._queue.put_nowait(None)

    logger.info(
        f"IBDataFeed: Stopped. Ticks: {self._tick_count}, Throttled: {self._throttled_count}"
    )

__aiter__ async

__aiter__()

Async iterator yielding market data.

Yields:

Type Description
AsyncIterator[tuple[datetime, dict, dict]]

Tuple of (timestamp, data, context) where:

AsyncIterator[tuple[datetime, dict, dict]]
  • timestamp: datetime of tick
AsyncIterator[tuple[datetime, dict, dict]]
  • data: {symbol: {'price': float, 'size': int}}
AsyncIterator[tuple[datetime, dict, dict]]
  • context: {symbol: {'bid', 'ask', 'bid_size', 'ask_size', 'volume'}}
Stops when
  • stop() is called (None sentinel)
  • Feed is not running
Source code in src/ml4t/live/feeds/ib_feed.py
async def __aiter__(self) -> AsyncIterator[tuple[datetime, dict, dict]]:
    """Async iterator yielding market data.

    Yields:
        Tuple of (timestamp, data, context) where:
        - timestamp: datetime of tick
        - data: {symbol: {'price': float, 'size': int}}
        - context: {symbol: {'bid', 'ask', 'bid_size', 'ask_size', 'volume'}}

    Stops when:
        - stop() is called (None sentinel)
        - Feed is not running
    """
    while self._running:
        item = await self._queue.get()

        # None sentinel signals shutdown
        if item is None:
            break

        yield item

AlpacaDataFeed

AlpacaDataFeed(
    api_key,
    secret_key,
    symbols,
    *,
    data_type="bars",
    feed="iex",
)

Bases: DataFeedProtocol

Real-time market data feed from Alpaca Markets.

Subscribes to real-time data for specified symbols. Supports both stocks and crypto.

Data Types

bars: OHLCV minute bars (default, recommended for strategies) quotes: Bid/ask quotes (for spread-sensitive strategies) trades: Individual trades (highest frequency)

Data Feeds

iex: Free tier (limited data) sip: Premium (full market data, requires subscription)

Data Format

timestamp: datetime - Bar/quote/trade timestamp data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}} context: dict - Additional metadata

Example

Stocks only

feed = AlpacaDataFeed( api_key='PKXXXXXXXX', secret_key='XXXXXXXXXX', symbols=['AAPL', 'MSFT'], )

Mixed stocks and crypto

feed = AlpacaDataFeed( api_key='PKXXXXXXXX', secret_key='XXXXXXXXXX', symbols=['AAPL', 'BTC/USD', 'ETH/USD'], )

await feed.start()

async for timestamp, data, context in feed: strategy.on_data(timestamp, data, context, broker)

Initialize Alpaca data feed.

Parameters:

Name Type Description Default
api_key str

Alpaca API key

required
secret_key str

Alpaca secret key

required
symbols list[str]

List of symbols (e.g., ['AAPL', 'BTC/USD'])

required
data_type str

Type of data - 'bars' (default), 'quotes', or 'trades'

'bars'
feed str

Data feed type - 'iex' (free) or 'sip' (premium)

'iex'
Source code in src/ml4t/live/feeds/alpaca_feed.py
def __init__(
    self,
    api_key: str,
    secret_key: str,
    symbols: list[str],
    *,
    data_type: str = "bars",  # 'bars', 'quotes', 'trades'
    feed: str = "iex",  # 'iex' (free) or 'sip' (premium)
):
    """Initialize Alpaca data feed.

    Args:
        api_key: Alpaca API key
        secret_key: Alpaca secret key
        symbols: List of symbols (e.g., ['AAPL', 'BTC/USD'])
        data_type: Type of data - 'bars' (default), 'quotes', or 'trades'
        feed: Data feed type - 'iex' (free) or 'sip' (premium)
    """
    self._api_key = api_key
    self._secret_key = secret_key
    self._data_type = data_type
    self._feed = feed

    # Separate stock and crypto symbols
    self._stock_symbols = [s for s in symbols if not self._is_crypto(s)]
    self._crypto_symbols = [s for s in symbols if self._is_crypto(s)]

    # Streams (created in start())
    self._stock_stream: StockDataStream | None = None
    self._crypto_stream: CryptoDataStream | None = None
    self._stream_tasks: list[asyncio.Task] = []

    # State
    self._queue: asyncio.Queue = asyncio.Queue()
    self._running = False

    # Statistics
    self._bar_count = 0
    self._quote_count = 0
    self._trade_count = 0

stats property

stats

Get feed statistics.

Returns:

Type Description
dict[str, Any]

Dict with keys:

dict[str, Any]
  • running: bool
dict[str, Any]
  • bar_count: int
dict[str, Any]
  • quote_count: int
dict[str, Any]
  • trade_count: int
dict[str, Any]
  • stock_symbols: list[str]
dict[str, Any]
  • crypto_symbols: list[str]

start async

start()

Subscribe to market data for all symbols.

Creates streams and subscribes to real-time data.

Source code in src/ml4t/live/feeds/alpaca_feed.py
async def start(self) -> None:
    """Subscribe to market data for all symbols.

    Creates streams and subscribes to real-time data.
    """
    logger.info(
        f"AlpacaDataFeed: Starting feed for "
        f"{len(self._stock_symbols)} stocks, {len(self._crypto_symbols)} crypto"
    )
    self._running = True

    # Create stock stream if we have stock symbols
    if self._stock_symbols:
        # Convert string feed to DataFeed enum
        feed_enum = DataFeed.IEX if self._feed.lower() == "iex" else DataFeed.SIP
        self._stock_stream = StockDataStream(
            api_key=self._api_key,
            secret_key=self._secret_key,
            feed=feed_enum,
        )

        # Subscribe based on data type
        if self._data_type == "bars":
            self._stock_stream.subscribe_bars(self._on_stock_bar, *self._stock_symbols)
        elif self._data_type == "quotes":
            self._stock_stream.subscribe_quotes(self._on_stock_quote, *self._stock_symbols)
        elif self._data_type == "trades":
            self._stock_stream.subscribe_trades(self._on_stock_trade, *self._stock_symbols)

        # Start stream in background
        task = asyncio.create_task(self._run_stock_stream())
        self._stream_tasks.append(task)

    # Create crypto stream if we have crypto symbols
    if self._crypto_symbols:
        self._crypto_stream = CryptoDataStream(
            api_key=self._api_key,
            secret_key=self._secret_key,
        )

        # Subscribe based on data type
        if self._data_type == "bars":
            self._crypto_stream.subscribe_bars(self._on_crypto_bar, *self._crypto_symbols)
        elif self._data_type == "quotes":
            self._crypto_stream.subscribe_quotes(self._on_crypto_quote, *self._crypto_symbols)
        elif self._data_type == "trades":
            self._crypto_stream.subscribe_trades(self._on_crypto_trade, *self._crypto_symbols)

        # Start stream in background
        task = asyncio.create_task(self._run_crypto_stream())
        self._stream_tasks.append(task)

    logger.info("AlpacaDataFeed: Subscriptions started")

stop

stop()

Stop data feed.

Closes all streams and signals consumer to exit.

Source code in src/ml4t/live/feeds/alpaca_feed.py
def stop(self) -> None:
    """Stop data feed.

    Closes all streams and signals consumer to exit.
    """
    logger.info("AlpacaDataFeed: Stopping feed")
    self._running = False

    # Cancel stream tasks
    for task in self._stream_tasks:
        if not task.done():
            task.cancel()

    # Stop streams
    if self._stock_stream:
        try:
            self._stock_stream.stop()
        except Exception as e:
            logger.warning(f"AlpacaDataFeed: Error stopping stock stream: {e}")

    if self._crypto_stream:
        try:
            self._crypto_stream.stop()
        except Exception as e:
            logger.warning(f"AlpacaDataFeed: Error stopping crypto stream: {e}")

    # Signal consumer to exit
    self._queue.put_nowait(None)

    logger.info(
        f"AlpacaDataFeed: Stopped. "
        f"Bars: {self._bar_count}, Quotes: {self._quote_count}, Trades: {self._trade_count}"
    )

__aiter__ async

__aiter__()

Async iterator yielding market data.

Yields:

Type Description
AsyncIterator[tuple[datetime, dict, dict]]

Tuple of (timestamp, data, context) where:

AsyncIterator[tuple[datetime, dict, dict]]
  • timestamp: datetime of bar/quote/trade
AsyncIterator[tuple[datetime, dict, dict]]
  • data: {symbol: {'open', 'high', 'low', 'close', 'volume'}} for bars
AsyncIterator[tuple[datetime, dict, dict]]
  • context: {symbol: additional metadata}
Stops when
  • stop() is called (None sentinel)
  • Feed is not running
Source code in src/ml4t/live/feeds/alpaca_feed.py
async def __aiter__(self) -> AsyncIterator[tuple[datetime, dict, dict]]:
    """Async iterator yielding market data.

    Yields:
        Tuple of (timestamp, data, context) where:
        - timestamp: datetime of bar/quote/trade
        - data: {symbol: {'open', 'high', 'low', 'close', 'volume'}} for bars
        - context: {symbol: additional metadata}

    Stops when:
        - stop() is called (None sentinel)
        - Feed is not running
    """
    while self._running:
        item = await self._queue.get()

        # None sentinel signals shutdown
        if item is None:
            break

        yield item

__anext__ async

__anext__()

Get next data item.

Returns:

Type Description
tuple[datetime, dict[str, Any], dict[str, Any]]

Tuple of (timestamp, data, context)

Raises:

Type Description
StopAsyncIteration

When feed is stopped

Source code in src/ml4t/live/feeds/alpaca_feed.py
async def __anext__(self) -> tuple[datetime, dict[str, Any], dict[str, Any]]:
    """Get next data item.

    Returns:
        Tuple of (timestamp, data, context)

    Raises:
        StopAsyncIteration: When feed is stopped
    """
    if not self._running:
        raise StopAsyncIteration

    item = await self._queue.get()
    if item is None:
        raise StopAsyncIteration

    return item

DataBentoFeed

DataBentoFeed(
    client, symbols, *, mode="historical", replay_speed=1.0
)

Bases: DataFeedProtocol

Market data feed from DataBento.

Supports both historical replay and real-time streaming.

Historical Mode
  • Reads from .dbn files (DataBento native format)
  • Replays at historical speed or accelerated
  • Perfect for strategy validation
Real-time Mode
  • Streams live market data
  • Supports multiple datasets (GLBX, XNAS, OPRA, etc.)
  • Low-latency tick data
Data Format

timestamp: datetime - Event timestamp data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}} context: dict - Additional fields (bid, ask, trade_count, etc.)

Example Historical

feed = DataBentoFeed.from_file( 'ES_202401.dbn', symbols=['ES.FUT'], replay_speed=10.0, # 10x speed )

Example Real-time

feed = DataBentoFeed.from_live( api_key=os.getenv('DATABENTO_API_KEY'), dataset='GLBX.MDP3', schema='ohlcv-1s', symbols=['ES.c.0', 'NQ.c.0'], )

Initialize DataBento feed.

Parameters:

Name Type Description Default
client Historical | Live

DataBento client (Historical or Live)

required
symbols list[str]

List of symbols to subscribe to

required
mode str

'historical' or 'live'

'historical'
replay_speed float

Playback speed multiplier (historical only) 1.0 = real-time, 10.0 = 10x speed

1.0
Source code in src/ml4t/live/feeds/databento_feed.py
def __init__(
    self,
    client: "db.Historical | db.Live",
    symbols: list[str],
    *,
    mode: str = "historical",
    replay_speed: float = 1.0,
):
    """Initialize DataBento feed.

    Args:
        client: DataBento client (Historical or Live)
        symbols: List of symbols to subscribe to
        mode: 'historical' or 'live'
        replay_speed: Playback speed multiplier (historical only)
            1.0 = real-time, 10.0 = 10x speed
    """
    if not DATABENTO_AVAILABLE:
        raise ImportError("databento package required. Install with: pip install databento")

    self.client = client
    self.symbols = symbols
    self.mode = mode
    self.replay_speed = replay_speed

    # State
    self._queue: asyncio.Queue = asyncio.Queue()
    self._running = False
    self._replay_task: asyncio.Task | None = None

    # Statistics
    self._record_count = 0

stats property

stats

Get feed statistics.

from_file classmethod

from_file(file_path, symbols, *, replay_speed=1.0)

Create feed from historical .dbn file.

Parameters:

Name Type Description Default
file_path str | Path

Path to .dbn file

required
symbols list[str]

Symbols to filter (or all if empty)

required
replay_speed float

Playback speed (1.0 = real-time)

1.0

Returns:

Type Description
DataBentoFeed

DataBentoFeed configured for historical replay

Source code in src/ml4t/live/feeds/databento_feed.py
@classmethod
def from_file(
    cls,
    file_path: str | Path,
    symbols: list[str],
    *,
    replay_speed: float = 1.0,
) -> "DataBentoFeed":
    """Create feed from historical .dbn file.

    Args:
        file_path: Path to .dbn file
        symbols: Symbols to filter (or all if empty)
        replay_speed: Playback speed (1.0 = real-time)

    Returns:
        DataBentoFeed configured for historical replay
    """
    if not DATABENTO_AVAILABLE:
        raise ImportError("databento package not installed")

    # Read file
    store = db.DBNStore.from_file(file_path)

    return cls(
        client=store,
        symbols=symbols,
        mode="historical",
        replay_speed=replay_speed,
    )

from_live classmethod

from_live(api_key, dataset, schema, symbols)

Create feed for real-time streaming.

Parameters:

Name Type Description Default
api_key str

DataBento API key

required
dataset str

Dataset code (e.g., 'GLBX.MDP3', 'XNAS.ITCH')

required
schema str

Data schema (e.g., 'ohlcv-1s', 'mbp-10', 'trades')

required
symbols list[str]

Symbols to subscribe to

required

Returns:

Type Description
DataBentoFeed

DataBentoFeed configured for live streaming

Source code in src/ml4t/live/feeds/databento_feed.py
@classmethod
def from_live(
    cls,
    api_key: str,
    dataset: str,
    schema: str,
    symbols: list[str],
) -> "DataBentoFeed":
    """Create feed for real-time streaming.

    Args:
        api_key: DataBento API key
        dataset: Dataset code (e.g., 'GLBX.MDP3', 'XNAS.ITCH')
        schema: Data schema (e.g., 'ohlcv-1s', 'mbp-10', 'trades')
        symbols: Symbols to subscribe to

    Returns:
        DataBentoFeed configured for live streaming
    """
    if not DATABENTO_AVAILABLE:
        raise ImportError("databento package not installed")

    client = db.Live(key=api_key)

    # Configure subscription
    client.subscribe(
        dataset=dataset,
        schema=schema,
        symbols=symbols,
    )

    return cls(
        client=client,
        symbols=symbols,
        mode="live",
    )

start async

start()

Start data feed.

Historical mode: Begins replay task Live mode: Starts streaming subscription

Source code in src/ml4t/live/feeds/databento_feed.py
async def start(self) -> None:
    """Start data feed.

    Historical mode: Begins replay task
    Live mode: Starts streaming subscription
    """
    logger.info(f"DataBentoFeed: Starting {self.mode} feed for {len(self.symbols)} symbols")
    self._running = True

    if self.mode == "historical":
        # Start replay task
        self._replay_task = asyncio.create_task(self._replay_historical())
    elif self.mode == "live":
        # Start live streaming task
        self._replay_task = asyncio.create_task(self._stream_live())

    logger.info("DataBentoFeed: Feed started")

stop

stop()

Stop data feed.

Source code in src/ml4t/live/feeds/databento_feed.py
def stop(self) -> None:
    """Stop data feed."""
    logger.info("DataBentoFeed: Stopping feed")
    self._running = False

    # Cancel replay task
    if self._replay_task:
        self._replay_task.cancel()

    self._signal_stop()

    logger.info(f"DataBentoFeed: Stopped. Records: {self._record_count}")

__aiter__ async

__aiter__()

Async iterator yielding market data.

Yields:

Type Description
AsyncIterator[tuple[datetime, dict, dict]]

Tuple of (timestamp, data, context)

Source code in src/ml4t/live/feeds/databento_feed.py
async def __aiter__(self) -> AsyncIterator[tuple[datetime, dict, dict]]:
    """Async iterator yielding market data.

    Yields:
        Tuple of (timestamp, data, context)
    """
    while True:
        item = await self._queue.get()

        if item is None:  # Shutdown sentinel
            break

        yield item

CryptoFeed

CryptoFeed(
    exchange,
    symbols,
    *,
    timeframe="1m",
    stream_trades=False,
    stream_ohlcv=True,
    api_key=None,
    api_secret=None,
    api_passphrase=None,
)

Bases: DataFeedProtocol

Cryptocurrency market data feed via CCXT.

Provides unified interface to 100+ crypto exchanges. Supports both REST (polling) and WebSocket (streaming).

Data Format

timestamp: datetime - Candle/trade timestamp data: dict[str, dict] - {symbol: {'open', 'high', 'low', 'close', 'volume'}} context: dict - Exchange-specific metadata

Exchange Symbols
  • Binance: 'BTC/USDT', 'ETH/USDT'
  • Coinbase: 'BTC-USD', 'ETH-USD'
  • Kraken: 'BTC/USD', 'ETH/USD'
Timeframes

'1m', '5m', '15m', '1h', '4h', '1d'

Example WebSocket (Real-time): feed = CryptoFeed( exchange='binance', symbols=['BTC/USDT', 'ETH/USDT'], stream_trades=True, # Stream trades (fastest) )

Example OHLCV Bars

feed = CryptoFeed( exchange='binance', symbols=['BTC/USDT'], timeframe='1m', stream_ohlcv=True, )

Example Authenticated

feed = CryptoFeed( exchange='binance', symbols=['BTC/USDT'], api_key='your-key', api_secret='your-secret', )

Initialize crypto feed.

Parameters:

Name Type Description Default
exchange str

Exchange ID (e.g., 'binance', 'coinbasepro', 'kraken')

required
symbols list[str]

Trading pairs (e.g., ['BTC/USDT', 'ETH/USDT'])

required
timeframe str

OHLCV timeframe ('1m', '5m', '1h', etc.)

'1m'
stream_trades bool

Stream trade ticks (faster updates)

False
stream_ohlcv bool

Stream OHLCV candles

True
api_key str | None

API key (for authenticated endpoints)

None
api_secret str | None

API secret

None
api_passphrase str | None

API passphrase (Coinbase only)

None
Source code in src/ml4t/live/feeds/crypto_feed.py
def __init__(
    self,
    exchange: str,
    symbols: list[str],
    *,
    timeframe: str = "1m",
    stream_trades: bool = False,
    stream_ohlcv: bool = True,
    api_key: str | None = None,
    api_secret: str | None = None,
    api_passphrase: str | None = None,
):
    """Initialize crypto feed.

    Args:
        exchange: Exchange ID (e.g., 'binance', 'coinbasepro', 'kraken')
        symbols: Trading pairs (e.g., ['BTC/USDT', 'ETH/USDT'])
        timeframe: OHLCV timeframe ('1m', '5m', '1h', etc.)
        stream_trades: Stream trade ticks (faster updates)
        stream_ohlcv: Stream OHLCV candles
        api_key: API key (for authenticated endpoints)
        api_secret: API secret
        api_passphrase: API passphrase (Coinbase only)
    """
    if not CCXT_AVAILABLE:
        raise ImportError("ccxt package required. Install with: pip install ccxt[asyncio]")

    self.exchange_id = exchange
    self.symbols = symbols
    self.timeframe = timeframe
    self.stream_trades = stream_trades
    self.stream_ohlcv = stream_ohlcv

    # Create exchange instance
    exchange_class = getattr(ccxt, exchange)
    config = {
        "enableRateLimit": True,
    }

    if api_key:
        config["apiKey"] = api_key
    if api_secret:
        config["secret"] = api_secret
    if api_passphrase:
        config["password"] = api_passphrase

    self.exchange = exchange_class(config)

    # State
    self._queue: asyncio.Queue = asyncio.Queue()
    self._running = False
    self._stream_tasks: list[asyncio.Task] = []

    # Statistics
    self._tick_count = 0
    self._trade_count = 0
    self._candle_count = 0

stats property

stats

Get feed statistics.

start async

start()

Start streaming market data.

Initiates WebSocket subscriptions for all symbols.

Source code in src/ml4t/live/feeds/crypto_feed.py
async def start(self) -> None:
    """Start streaming market data.

    Initiates WebSocket subscriptions for all symbols.
    """
    logger.info(f"CryptoFeed: Starting {self.exchange_id} feed for {len(self.symbols)} symbols")
    self._running = True

    # Load markets
    await self.exchange.load_markets()

    # Start streaming tasks
    for symbol in self.symbols:
        if self.stream_trades:
            task = asyncio.create_task(self._stream_trades_for_symbol(symbol))
            self._stream_tasks.append(task)

        if self.stream_ohlcv:
            task = asyncio.create_task(self._stream_ohlcv_for_symbol(symbol))
            self._stream_tasks.append(task)

    logger.info(f"CryptoFeed: Started {len(self._stream_tasks)} stream(s)")

stop

stop()

Stop streaming and close exchange connection.

Source code in src/ml4t/live/feeds/crypto_feed.py
def stop(self) -> None:
    """Stop streaming and close exchange connection."""
    logger.info("CryptoFeed: Stopping feed")
    self._running = False

    # Cancel all streaming tasks
    for task in self._stream_tasks:
        task.cancel()

    # Signal consumer
    self._queue.put_nowait(None)

    logger.info(
        f"CryptoFeed: Stopped. "
        f"Ticks: {self._tick_count}, Trades: {self._trade_count}, "
        f"Candles: {self._candle_count}"
    )

__aiter__ async

__aiter__()

Async iterator yielding market data.

Yields:

Type Description
AsyncIterator[tuple[datetime, dict, dict]]

Tuple of (timestamp, data, context)

Source code in src/ml4t/live/feeds/crypto_feed.py
async def __aiter__(self) -> AsyncIterator[tuple[datetime, dict, dict]]:
    """Async iterator yielding market data.

    Yields:
        Tuple of (timestamp, data, context)
    """
    while self._running:
        item = await self._queue.get()

        if item is None:  # Shutdown sentinel
            break

        yield item

close async

close()

Close exchange connection.

Should be called in finally block.

Source code in src/ml4t/live/feeds/crypto_feed.py
async def close(self) -> None:
    """Close exchange connection.

    Should be called in finally block.
    """
    await self.exchange.close()

OKXFundingFeed

OKXFundingFeed(
    symbols, *, timeframe="1H", poll_interval_seconds=60.0
)

Bases: DataFeedProtocol

OKX funding rate feed with OHLCV bars.

Combines price data with funding rate information for ML strategies that trade crypto perpetual futures based on funding rate signals.

Data Flow
  1. Poll /market/candles for latest OHLCV bar
  2. Poll /public/funding-rate for current funding rate
  3. Combine into (timestamp, data, context) tuple
  4. Emit to strategy
Symbol Format

OKX perpetual swaps use format: BTC-USDT-SWAP, ETH-USDT-SWAP

Initialize OKX funding rate feed.

Parameters:

Name Type Description Default
symbols list[str]

List of perpetual swap symbols (e.g., ['BTC-USDT-SWAP'])

required
timeframe str

OHLCV bar timeframe ('1m', '1H', '4H', '1D')

'1H'
poll_interval_seconds float

How often to poll for new data

60.0
Source code in src/ml4t/live/feeds/okx_feed.py
def __init__(
    self,
    symbols: list[str],
    *,
    timeframe: str = "1H",
    poll_interval_seconds: float = 60.0,
):
    """Initialize OKX funding rate feed.

    Args:
        symbols: List of perpetual swap symbols (e.g., ['BTC-USDT-SWAP'])
        timeframe: OHLCV bar timeframe ('1m', '1H', '4H', '1D')
        poll_interval_seconds: How often to poll for new data
    """
    self.symbols = symbols
    self.timeframe = timeframe
    self.poll_interval = poll_interval_seconds

    # State
    self._queue: asyncio.Queue = asyncio.Queue()
    self._running = False
    self._poll_task: asyncio.Task | None = None
    self._client: httpx.AsyncClient | None = None

    # Track last emitted timestamp per symbol to avoid duplicates
    self._last_timestamps: dict[str, datetime | None] = dict.fromkeys(symbols)

    # Statistics
    self._bar_count = 0
    self._funding_updates = 0

stats property

stats

Get feed statistics.

start async

start()

Start the OKX data feed.

Begins polling for OHLCV and funding rate data.

Source code in src/ml4t/live/feeds/okx_feed.py
async def start(self) -> None:
    """Start the OKX data feed.

    Begins polling for OHLCV and funding rate data.
    """
    logger.info(f"OKXFundingFeed: Starting feed for {len(self.symbols)} symbols")
    self._running = True

    # Create async HTTP client
    self._client = httpx.AsyncClient(timeout=30.0)

    # Start polling task
    self._poll_task = asyncio.create_task(self._poll_loop())

    logger.info(f"OKXFundingFeed: Started polling every {self.poll_interval}s")

stop

stop()

Stop the data feed.

Source code in src/ml4t/live/feeds/okx_feed.py
def stop(self) -> None:
    """Stop the data feed."""
    logger.info("OKXFundingFeed: Stopping feed")
    self._running = False

    if self._poll_task:
        self._poll_task.cancel()

    # Signal consumer
    self._queue.put_nowait(None)

    logger.info(
        f"OKXFundingFeed: Stopped. Bars: {self._bar_count}, "
        f"Funding updates: {self._funding_updates}"
    )

close async

close()

Close HTTP client.

Source code in src/ml4t/live/feeds/okx_feed.py
async def close(self) -> None:
    """Close HTTP client."""
    if self._client:
        await self._client.aclose()

__aiter__

__aiter__()

Return async iterator.

Source code in src/ml4t/live/feeds/okx_feed.py
def __aiter__(self):
    """Return async iterator."""
    return self

__anext__ async

__anext__()

Get next bar with funding data.

Returns:

Type Description
tuple[datetime, dict[str, Any], dict[str, Any]]

(timestamp, data, context) tuple

Raises:

Type Description
StopAsyncIteration

When feed stops

Source code in src/ml4t/live/feeds/okx_feed.py
async def __anext__(self) -> tuple[datetime, dict[str, Any], dict[str, Any]]:
    """Get next bar with funding data.

    Returns:
        (timestamp, data, context) tuple

    Raises:
        StopAsyncIteration: When feed stops
    """
    item = await self._queue.get()

    if item is None:  # Shutdown sentinel
        raise StopAsyncIteration

    return item

BarAggregator

BarAggregator(
    source_feed,
    bar_size_minutes=1,
    assets=None,
    flush_timeout_seconds=2.0,
)

Aggregates raw ticks or 5-second bars into minute bars.

Addresses Gemini's concerns: 1. "If IBDataFeed pushes a tick to Strategy.on_data, the strategy might trigger 60x more often than intended." - Buffer incoming data. 2. "The 15:59 bar is never emitted because no 16:00 tick arrives." - Background flush checker emits bars on timeout.

The aggregator buffers incoming data and emits when: - A bar boundary is crossed (new tick arrives in next minute) - OR timeout expires (2s past bar end with no new data)

Example

raw_feed = IBTickFeed(ib, assets=['AAPL']) aggregated_feed = BarAggregator(raw_feed, bar_size_minutes=1)

async for timestamp, data, context in aggregated_feed: # data contains completed minute bars only strategy.on_data(timestamp, data, context, broker)

Initialize BarAggregator.

Parameters:

Name Type Description Default
source_feed DataFeedProtocol

Raw tick or sub-minute bar feed

required
bar_size_minutes int

Output bar size in minutes (default: 1)

1
assets list[str] | None

List of assets to track (default: all from source)

None
flush_timeout_seconds float

Seconds after bar end before forcing emit (default: 2.0)

2.0
Source code in src/ml4t/live/feeds/aggregator.py
def __init__(
    self,
    source_feed: "DataFeedProtocol",
    bar_size_minutes: int = 1,
    assets: list[str] | None = None,
    flush_timeout_seconds: float = 2.0,
):
    """Initialize BarAggregator.

    Args:
        source_feed: Raw tick or sub-minute bar feed
        bar_size_minutes: Output bar size in minutes (default: 1)
        assets: List of assets to track (default: all from source)
        flush_timeout_seconds: Seconds after bar end before forcing emit (default: 2.0)
    """
    self.source = source_feed
    self.bar_size = timedelta(minutes=bar_size_minutes)
    self.assets = assets or []
    self.flush_timeout = flush_timeout_seconds

    # Per-asset bar buffers
    self._buffers: dict[str, BarBuffer] = {}
    self._current_bar_start: datetime | None = None
    self._last_data_time: float = 0  # Track when we last got data

    # Output queue (use None sentinel for shutdown instead of timeout)
    self._queue: asyncio.Queue = asyncio.Queue()
    self._running = False
    self._aggregate_task: asyncio.Task | None = None
    self._flush_task: asyncio.Task | None = None

start async

start()

Start aggregation.

Source code in src/ml4t/live/feeds/aggregator.py
async def start(self) -> None:
    """Start aggregation."""
    self._running = True
    await self.source.start()

    # Start aggregation task
    self._aggregate_task = asyncio.create_task(self._aggregate_loop())

stop

stop()

Stop aggregation.

Source code in src/ml4t/live/feeds/aggregator.py
def stop(self) -> None:
    """Stop aggregation."""
    self._running = False
    self.source.stop()
    self._signal_stop()
    if self._aggregate_task:
        self._aggregate_task.cancel()
    # Cancel flush task
    if self._flush_task:
        self._flush_task.cancel()

__aiter__ async

__aiter__()

Async iterator interface.

Uses None sentinel for shutdown (Gemini fix: avoids busy-wait with 1s timeout).

Yields:

Type Description
AsyncIterator[tuple[datetime, dict, dict]]

Tuple of (timestamp, data, context) where data is {asset: ohlcv_dict}

Source code in src/ml4t/live/feeds/aggregator.py
async def __aiter__(self) -> AsyncIterator[tuple[datetime, dict, dict]]:
    """Async iterator interface.

    Uses None sentinel for shutdown (Gemini fix: avoids busy-wait with 1s timeout).

    Yields:
        Tuple of (timestamp, data, context) where data is {asset: ohlcv_dict}
    """
    while True:
        item = await self._queue.get()
        if item is None:  # Shutdown sentinel
            break
        yield item

Safety Types

RiskLimitError

Bases: Exception

Raised when an order violates risk limits.

RiskState dataclass

RiskState(
    date,
    daily_loss=0.0,
    orders_placed=0,
    high_water_mark=0.0,
    kill_switch_activated=False,
    kill_switch_reason="",
)

Persisted risk state - survives restarts.

This state is saved to disk after every order and on shutdown. Uses atomic JSON writes (write to .tmp then os.replace) to prevent corruption.

Example

state = RiskState(date="2023-10-15", daily_loss=1500.0) state.kill_switch_activated = True state.kill_switch_reason = "Max daily loss exceeded"

Note

The kill switch state persists across restarts and must be manually reset by deleting the state file or setting kill_switch_activated=False.

from_dict classmethod

from_dict(data)

Create RiskState from dictionary (for JSON loading).

Parameters:

Name Type Description Default
data dict

Dictionary with RiskState fields

required

Returns:

Type Description
RiskState

RiskState instance

Source code in src/ml4t/live/safety.py
@classmethod
def from_dict(cls, data: dict) -> "RiskState":
    """Create RiskState from dictionary (for JSON loading).

    Args:
        data: Dictionary with RiskState fields

    Returns:
        RiskState instance
    """
    return cls(**data)

to_dict

to_dict()

Convert to dictionary (for JSON saving).

Returns:

Type Description
dict[str, Any]

Dictionary with all fields

Source code in src/ml4t/live/safety.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary (for JSON saving).

    Returns:
        Dictionary with all fields
    """
    return {
        "date": self.date,
        "daily_loss": self.daily_loss,
        "orders_placed": self.orders_placed,
        "high_water_mark": self.high_water_mark,
        "kill_switch_activated": self.kill_switch_activated,
        "kill_switch_reason": self.kill_switch_reason,
    }

save_atomic staticmethod

save_atomic(state, filepath)

Save state with atomic write (write to .tmp then os.replace).

This prevents corruption if process dies mid-write.

Parameters:

Name Type Description Default
state RiskState

RiskState to save

required
filepath str

Path to save to

required

Raises:

Type Description
OSError

If write fails

Source code in src/ml4t/live/safety.py
@staticmethod
def save_atomic(state: "RiskState", filepath: str) -> None:
    """Save state with atomic write (write to .tmp then os.replace).

    This prevents corruption if process dies mid-write.

    Args:
        state: RiskState to save
        filepath: Path to save to

    Raises:
        OSError: If write fails
    """
    tmp_file = f"{filepath}.tmp"
    with open(tmp_file, "w") as f:
        json.dump(state.to_dict(), f, indent=2)

    # Atomic rename (POSIX guarantees atomicity)
    os.replace(tmp_file, filepath)

load staticmethod

load(filepath)

Load state from file.

Parameters:

Name Type Description Default
filepath str

Path to load from

required

Returns:

Type Description
RiskState | None

RiskState if file exists and is valid, None otherwise

Source code in src/ml4t/live/safety.py
@staticmethod
def load(filepath: str) -> "RiskState | None":
    """Load state from file.

    Args:
        filepath: Path to load from

    Returns:
        RiskState if file exists and is valid, None otherwise
    """
    path = Path(filepath)
    if not path.exists():
        return None

    try:
        with open(filepath) as f:
            data = json.load(f)
        return RiskState.from_dict(data)
    except (json.JSONDecodeError, TypeError, KeyError) as e:
        # Corrupted file or invalid format
        print(f"Warning: Could not load risk state from {filepath}: {e}")
        return None

create_for_today staticmethod

create_for_today()

Create new state for today's date.

Returns:

Type Description
RiskState

RiskState with today's date and default values

Source code in src/ml4t/live/safety.py
@staticmethod
def create_for_today() -> "RiskState":
    """Create new state for today's date.

    Returns:
        RiskState with today's date and default values
    """
    return RiskState(date=datetime.now().strftime("%Y-%m-%d"))

VirtualPortfolio

VirtualPortfolio(initial_cash=100000.0)

Manages internal accounting for Shadow Mode (Paper Trading).

Addresses Gemini's Critical Issue A: "The Infinite Buy Loop"

Problem: In shadow mode, returning fake Order objects without updating position state causes strategies to keep buying forever because get_position() always returns None.

Solution: Track shadow positions locally. When shadow_mode=True: - submit_order() updates this virtual portfolio - positions/get_position() return from this portfolio - Strategy sees realistic position state

Handles: - New positions - Position increases (weighted avg cost basis) - Position decreases (partial close) - Position close (quantity = 0) - Position flip (long -> short or vice versa)

Example

portfolio = VirtualPortfolio(initial_cash=100_000.0)

Simulate buy order fill

order = Order( asset="AAPL", side=OrderSide.BUY, quantity=100, filled_price=150.0, filled_quantity=100, ... ) portfolio.process_fill(order)

Check position

pos = portfolio.positions.get("AAPL") assert pos.quantity == 100 assert pos.entry_price == 150.0

Simulate sell order fill (close)

sell_order = Order( asset="AAPL", side=OrderSide.SELL, quantity=100, filled_price=155.0, filled_quantity=100, ... ) portfolio.process_fill(sell_order) assert "AAPL" not in portfolio.positions

Initialize virtual portfolio.

Parameters:

Name Type Description Default
initial_cash float

Starting cash balance (default: 100,000)

100000.0
Source code in src/ml4t/live/safety.py
def __init__(self, initial_cash: float = 100_000.0):
    """Initialize virtual portfolio.

    Args:
        initial_cash: Starting cash balance (default: 100,000)
    """
    self._initial_cash = initial_cash
    self._cash = initial_cash
    self._positions: dict[str, Position] = {}

positions property

positions

Get current positions (returns copy for safety).

Returns:

Type Description
dict[str, Position]

Dictionary mapping asset symbol to Position

cash property

cash

Get current cash balance.

Returns:

Type Description
float

Available cash

account_value property

account_value

Get total account value (cash + position market value).

Returns:

Type Description
float

Total account value

process_fill

process_fill(order)

Update state based on filled shadow order.

Handles: - Weighted average cost basis for position increases - Position flipping (long -> short or vice versa) - Partial and full closes

Parameters:

Name Type Description Default
order Order

Filled Order object (must have filled_quantity and filled_price)

required
Source code in src/ml4t/live/safety.py
def process_fill(self, order: Order) -> None:
    """Update state based on filled shadow order.

    Handles:
    - Weighted average cost basis for position increases
    - Position flipping (long -> short or vice versa)
    - Partial and full closes

    Args:
        order: Filled Order object (must have filled_quantity and filled_price)
    """
    if not order.filled_quantity or not order.filled_price:
        logger.warning(f"VirtualPortfolio: Order {order.order_id} has no fill info")
        return

    asset = order.asset
    fill_qty = order.filled_quantity
    fill_price = order.filled_price
    transaction_value = fill_qty * fill_price

    # Cash impact
    if order.side == OrderSide.BUY:
        self._cash -= transaction_value
        signed_qty = fill_qty
    else:
        self._cash += transaction_value
        signed_qty = -fill_qty

    current = self._positions.get(asset)

    if current is None:
        # New position
        self._positions[asset] = Position(
            asset=asset,
            quantity=signed_qty,
            entry_price=fill_price,
            entry_time=datetime.now(),
            current_price=fill_price,
        )
        logger.info(
            f"Shadow: Opened {asset} {'LONG' if signed_qty > 0 else 'SHORT'} {abs(signed_qty)}"
        )

    else:
        old_qty = current.quantity
        new_qty = old_qty + signed_qty

        if new_qty == 0:
            # Position closed
            del self._positions[asset]
            logger.info(f"Shadow: Closed {asset}")

        elif (old_qty > 0 and new_qty < 0) or (old_qty < 0 and new_qty > 0):
            # Position flipped (e.g., Long 100 -> Sell 200 -> Short 100)
            self._positions[asset] = Position(
                asset=asset,
                quantity=new_qty,
                entry_price=fill_price,  # Reset basis on flip
                entry_time=datetime.now(),
                current_price=fill_price,
            )
            logger.info(f"Shadow: Flipped {asset} to {new_qty}")

        elif abs(new_qty) > abs(old_qty):
            # Increasing position - weighted average cost basis
            total_old = old_qty * current.entry_price
            total_new = signed_qty * fill_price
            new_avg = (total_old + total_new) / new_qty
            current.quantity = new_qty
            current.entry_price = abs(new_avg)
            current.current_price = fill_price
            logger.info(
                f"Shadow: Increased {asset} to {new_qty}, basis ${current.entry_price:.2f}"
            )

        else:
            # Decreasing position (partial close) - basis unchanged
            current.quantity = new_qty
            current.current_price = fill_price
            logger.info(f"Shadow: Reduced {asset} to {new_qty}")

update_prices

update_prices(prices)

Update current prices for accurate account value.

Parameters:

Name Type Description Default
prices dict[str, float]

Dictionary mapping asset symbol to current price

required
Source code in src/ml4t/live/safety.py
def update_prices(self, prices: dict[str, float]) -> None:
    """Update current prices for accurate account value.

    Args:
        prices: Dictionary mapping asset symbol to current price
    """
    for asset, price in prices.items():
        if asset in self._positions:
            self._positions[asset].current_price = price