ML4T Engineer
ML4T Engineer Documentation
Features, labels, alternative bars, and leakage-safe dataset preparation
Skip to content

API Reference

Use this page when you already know the workflow and need exact objects, method signatures, and module locations. If you are still deciding which workflow to use, start with the User Guide or the Book Guide.

Core Functions

api

Config-driven feature computation API for ml4t.engineer.

This module provides the main public API for computing features from configurations.

Exports

compute_features(data, features, column_map=None) -> DataFrame Main API for computing technical indicators on OHLCV data.

Constants: COLUMN_ARG_MAP: dict - Maps function params to DataFrame columns INPUT_TYPE_COLUMNS: dict - Maps input_type metadata to required columns

Internal

_parse_feature_input() - Parse feature specifications _resolve_dependencies() - Topological sort of features _execute_feature() - Execute single feature computation

compute_features

compute_features(data, features)

Compute features from a configuration.

This is the main public API for QFeatures. It accepts feature specifications in multiple formats and computes them in dependency order.

Parameters

data : pl.DataFrame | pl.LazyFrame Input data (typically OHLCV) features : list[str] | list[dict] | Path | str Feature specification in one of three formats:

1. List of feature names (use default parameters):
   ```python
   ["rsi", "macd", "bollinger_bands"]
   ```

2. List of dicts with parameters:
   ```python
   [
       {"name": "rsi", "params": {"period": 14}},
       {"name": "macd", "params": {"fast": 12, "slow": 26}},
   ]
   ```

3. Path to YAML config file:
   ```python
   Path("features.yaml")
   # or string path
   "config/features.yaml"
   ```
Returns

pl.DataFrame | pl.LazyFrame Input data with computed feature columns added

Raises

ValueError If feature not found in registry or circular dependency detected ImportError If YAML config provided but PyYAML not installed FileNotFoundError If config file path doesn't exist

Examples

import polars as pl from ml4t.engineer.api import compute_features

Load OHLCV data

df = pl.DataFrame({ ... "open": [100.0, 101.0, 102.0], ... "high": [102.0, 103.0, 104.0], ... "low": [99.0, 100.0, 101.0], ... "close": [101.0, 102.0, 103.0], ... "volume": [1000, 1100, 1200], ... })

Compute features with default parameters

result = compute_features(df, ["rsi", "sma"])

Compute features with custom parameters

result = compute_features(df, [ ... {"name": "rsi", "params": {"period": 20}}, ... {"name": "sma", "params": {"period": 50}}, ... ])

Compute from YAML config

result = compute_features(df, "features.yaml")

Notes
  • Features are computed in dependency order using topological sort
  • Circular dependencies are detected and raise ValueError
  • Parameters in config override default parameters from registry
Source code in src/ml4t/engineer/api.py
def compute_features(
    data: pl.DataFrame | pl.LazyFrame,
    features: list[str] | list[dict[str, Any]] | Path | str,
) -> pl.DataFrame | pl.LazyFrame:
    """Compute features from a configuration.

    This is the main public API for QFeatures. It accepts feature specifications
    in multiple formats and computes them in dependency order.

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        Input data (typically OHLCV)
    features : list[str] | list[dict] | Path | str
        Feature specification in one of three formats:

        1. List of feature names (use default parameters):
           ```python
           ["rsi", "macd", "bollinger_bands"]
           ```

        2. List of dicts with parameters:
           ```python
           [
               {"name": "rsi", "params": {"period": 14}},
               {"name": "macd", "params": {"fast": 12, "slow": 26}},
           ]
           ```

        3. Path to YAML config file:
           ```python
           Path("features.yaml")
           # or string path
           "config/features.yaml"
           ```

    Returns
    -------
    pl.DataFrame | pl.LazyFrame
        Input data with computed feature columns added

    Raises
    ------
    ValueError
        If feature not found in registry or circular dependency detected
    ImportError
        If YAML config provided but PyYAML not installed
    FileNotFoundError
        If config file path doesn't exist

    Examples
    --------
    >>> import polars as pl
    >>> from ml4t.engineer.api import compute_features
    >>>
    >>> # Load OHLCV data
    >>> df = pl.DataFrame({
    ...     "open": [100.0, 101.0, 102.0],
    ...     "high": [102.0, 103.0, 104.0],
    ...     "low": [99.0, 100.0, 101.0],
    ...     "close": [101.0, 102.0, 103.0],
    ...     "volume": [1000, 1100, 1200],
    ... })
    >>>
    >>> # Compute features with default parameters
    >>> result = compute_features(df, ["rsi", "sma"])
    >>>
    >>> # Compute features with custom parameters
    >>> result = compute_features(df, [
    ...     {"name": "rsi", "params": {"period": 20}},
    ...     {"name": "sma", "params": {"period": 50}},
    ... ])
    >>>
    >>> # Compute from YAML config
    >>> result = compute_features(df, "features.yaml")

    Notes
    -----
    - Features are computed in dependency order using topological sort
    - Circular dependencies are detected and raise ValueError
    - Parameters in config override default parameters from registry
    """
    from ml4t.engineer.core.schemas import validate_ohlcv_schema

    # Validate input schema (flexible: no asset_id required, flexible time column)
    validate_ohlcv_schema(data, require_asset_id=False, allow_flexible_time=True)

    # Parse input to standardized format
    feature_specs = _parse_feature_input(features)

    # Resolve dependencies and get execution order
    execution_order = _resolve_dependencies(feature_specs)

    # Execute features in order
    result = data
    for feature_name, params in execution_order:
        result = _execute_feature(result, feature_name, params)

    return result

Feature Discovery

catalog

Feature catalog for enhanced discoverability.

Exports

FeatureCatalog(registry) - Feature discovery interface .list(category=None, normalized=None, ...) -> list[FeatureMetadata] .search(query, ...) -> list[FeatureMetadata] .describe(name) -> str - Rich feature description .categories() -> list[str] - Available categories .tags() -> list[str] - Available tags

Module-level API (via proxy): from ml4t.engineer import features features.list(category="momentum") features.search("volatility") features.describe("rsi")

Provides filtering, search, and description capabilities for the feature registry.

Examples

from ml4t.engineer import features

List all momentum features

features.list(category="momentum")

Find normalized features for ML

features.list(normalized=True, limit=10)

features.search("volatility")

Get detailed description

features.describe("rsi")

FeatureCatalog

FeatureCatalog(registry=None)

Enhanced feature discovery interface.

Wraps the FeatureRegistry to provide filtering, search, and rich description capabilities for feature discovery.

Parameters

registry : FeatureRegistry | None Registry to wrap. If None, uses the global registry.

Examples

from ml4t.engineer.discovery import FeatureCatalog catalog = FeatureCatalog()

Multi-criteria filtering

catalog.list(category="momentum", normalized=True, ta_lib_compatible=True)

results = catalog.search("moving average") for name, score in results: ... print(f"{name}: {score:.2f}")

Rich description

info = catalog.describe("rsi") print(info["formula"])

Initialize catalog with registry.

Parameters

registry : FeatureRegistry | None Registry to wrap. If None, uses the global registry.

Source code in src/ml4t/engineer/discovery/catalog.py
def __init__(self, registry: FeatureRegistry | None = None) -> None:
    """Initialize catalog with registry.

    Parameters
    ----------
    registry : FeatureRegistry | None
        Registry to wrap. If None, uses the global registry.
    """
    if registry is None:
        from ml4t.engineer.core.registry import get_registry

        registry = get_registry()
    self._registry = registry

list

list(
    category=None,
    normalized=None,
    ta_lib_compatible=None,
    tags=None,
    input_type=None,
    output_type=None,
    has_dependencies=None,
    limit=None,
)

List features matching specified criteria.

All criteria are combined with AND logic. If no criteria specified, returns all registered features.

Parameters

category : str | None Filter by category (e.g., "momentum", "volatility", "ml") normalized : bool | None Filter by ML-ready status (True = scale-invariant) ta_lib_compatible : bool | None Filter by TA-Lib validation status tags : _list[str] | None Filter by tags (AND matching - must have ALL specified tags) input_type : str | None Filter by input data requirements (e.g., "OHLCV", "close") output_type : str | None Filter by output type (e.g., "indicator", "signal", "label") has_dependencies : bool | None Filter by whether feature has dependencies limit : int | None Maximum number of results to return

Returns

_list[str] Sorted list of feature names matching all criteria

Examples
All momentum indicators

features.list(category="momentum")

ML-ready volatility features

features.list(category="volatility", normalized=True)

Features that only need close price

features.list(input_type="close")

Source code in src/ml4t/engineer/discovery/catalog.py
def list(
    self,
    category: str | None = None,
    normalized: bool | None = None,
    ta_lib_compatible: bool | None = None,
    tags: _list[str] | None = None,
    input_type: str | None = None,
    output_type: str | None = None,
    has_dependencies: bool | None = None,
    limit: int | None = None,
) -> _list[str]:
    """List features matching specified criteria.

    All criteria are combined with AND logic. If no criteria specified,
    returns all registered features.

    Parameters
    ----------
    category : str | None
        Filter by category (e.g., "momentum", "volatility", "ml")
    normalized : bool | None
        Filter by ML-ready status (True = scale-invariant)
    ta_lib_compatible : bool | None
        Filter by TA-Lib validation status
    tags : _list[str] | None
        Filter by tags (AND matching - must have ALL specified tags)
    input_type : str | None
        Filter by input data requirements (e.g., "OHLCV", "close")
    output_type : str | None
        Filter by output type (e.g., "indicator", "signal", "label")
    has_dependencies : bool | None
        Filter by whether feature has dependencies
    limit : int | None
        Maximum number of results to return

    Returns
    -------
    _list[str]
        Sorted list of feature names matching all criteria

    Examples
    --------
    >>> # All momentum indicators
    >>> features.list(category="momentum")
    >>>
    >>> # ML-ready volatility features
    >>> features.list(category="volatility", normalized=True)
    >>>
    >>> # Features that only need close price
    >>> features.list(input_type="close")
    """
    results: _list[str] = []

    for name, meta in self._registry._features.items():
        # Apply filters
        if category is not None and meta.category != category:
            continue
        if normalized is not None and meta.normalized != normalized:
            continue
        if ta_lib_compatible is not None and meta.ta_lib_compatible != ta_lib_compatible:
            continue
        if input_type is not None and meta.input_type != input_type:
            continue
        if output_type is not None and meta.output_type != output_type:
            continue
        if has_dependencies is not None:
            has_deps = len(meta.dependencies) > 0
            if has_deps != has_dependencies:
                continue
        # AND matching - must have ALL specified tags
        if tags is not None and not all(tag in meta.tags for tag in tags):
            continue

        results.append(name)

    # Sort results
    results.sort()

    # Apply limit
    if limit is not None:
        results = results[:limit]

    return results

describe

describe(name)

Get rich metadata for a single feature.

Parameters

name : str Feature name to describe

Returns

dict[str, Any] Full metadata as dictionary with computed properties: - name, category, description, formula - normalized, ta_lib_compatible - input_type, output_type - parameters (default values) - dependencies, references, tags - value_range (if defined) - lookback_period (computed from default params)

Raises

KeyError If feature not found in registry

Examples

info = features.describe("rsi") print(info["description"]) 'Relative Strength Index' print(info["parameters"])

Source code in src/ml4t/engineer/discovery/catalog.py
def describe(self, name: str) -> dict[str, Any]:
    """Get rich metadata for a single feature.

    Parameters
    ----------
    name : str
        Feature name to describe

    Returns
    -------
    dict[str, Any]
        Full metadata as dictionary with computed properties:
        - name, category, description, formula
        - normalized, ta_lib_compatible
        - input_type, output_type
        - parameters (default values)
        - dependencies, references, tags
        - value_range (if defined)
        - lookback_period (computed from default params)

    Raises
    ------
    KeyError
        If feature not found in registry

    Examples
    --------
    >>> info = features.describe("rsi")
    >>> print(info["description"])
    'Relative Strength Index'
    >>> print(info["parameters"])
    {'period': 14}
    """
    meta = self._registry.get(name)
    if meta is None:
        available = self._registry.list_all()[:10]
        raise KeyError(
            f"Feature '{name}' not found in registry. "
            f"Available features: {available}{'...' if len(self._registry) > 10 else ''}"
        )

    # Build description dict
    result: dict[str, Any] = {
        "name": meta.name,
        "category": meta.category,
        "description": meta.description,
        "formula": meta.formula,
        "normalized": meta.normalized,
        "ta_lib_compatible": meta.ta_lib_compatible,
        "input_type": meta.input_type,
        "output_type": meta.output_type,
        "parameters": dict(meta.parameters),
        "dependencies": list(meta.dependencies),
        "references": list(meta.references),
        "tags": list(meta.tags),
        "value_range": meta.value_range,
    }

    # Compute lookback period from default parameters
    try:
        result["lookback_period"] = meta.lookback(**meta.parameters)
    except Exception:
        result["lookback_period"] = None

    return result

search

search(query, search_fields=None, max_results=10)

Full-text search across feature metadata.

Searches name, description, formula, and tags by default. Returns results sorted by relevance score (higher = better match).

Parameters

query : str Search query (case-insensitive substring matching) search_fields : _list[str] | None Fields to search. Default: ["name", "description", "formula", "tags"] Available: name, description, formula, category, tags, references max_results : int Maximum number of results to return (default 10)

Returns

_list[tuple[str, float]] List of (feature_name, relevance_score) tuples, sorted by score. Score is 0.0-1.0, with 1.0 being exact name match.

Examples
Search for volatility features

results = features.search("volatility") for name, score in results[:5]: ... print(f"{name}: {score:.2f}")

Search only in names and tags

results = features.search("momentum", search_fields=["name", "tags"])

Source code in src/ml4t/engineer/discovery/catalog.py
def search(
    self,
    query: str,
    search_fields: _list[str] | None = None,
    max_results: int = 10,
) -> _list[tuple[str, float]]:
    """Full-text search across feature metadata.

    Searches name, description, formula, and tags by default.
    Returns results sorted by relevance score (higher = better match).

    Parameters
    ----------
    query : str
        Search query (case-insensitive substring matching)
    search_fields : _list[str] | None
        Fields to search. Default: ["name", "description", "formula", "tags"]
        Available: name, description, formula, category, tags, references
    max_results : int
        Maximum number of results to return (default 10)

    Returns
    -------
    _list[tuple[str, float]]
        List of (feature_name, relevance_score) tuples, sorted by score.
        Score is 0.0-1.0, with 1.0 being exact name match.

    Examples
    --------
    >>> # Search for volatility features
    >>> results = features.search("volatility")
    >>> for name, score in results[:5]:
    ...     print(f"{name}: {score:.2f}")
    >>>
    >>> # Search only in names and tags
    >>> results = features.search("momentum", search_fields=["name", "tags"])
    """
    if search_fields is None:
        search_fields = ["name", "description", "formula", "tags"]

    # Early return for empty query
    if not query or not query.strip():
        return []

    query_lower = query.lower()
    query_terms = query_lower.split()
    scored_results: _list[tuple[str, float]] = []

    for name, meta in self._registry._features.items():
        score = 0.0

        # Score each field
        for field in search_fields:
            field_value = self._get_field_text(meta, field)
            if not field_value:
                continue

            field_lower = field_value.lower()

            # Exact match in name gets highest score
            if field == "name" and field_lower == query_lower:
                score += 1.0
            # Name contains query
            elif field == "name" and query_lower in field_lower:
                score += 0.8
            # Other fields contain full query
            elif query_lower in field_lower:
                score += 0.5
            # Individual term matching
            else:
                term_matches = sum(1 for term in query_terms if term in field_lower)
                if term_matches > 0:
                    score += 0.3 * (term_matches / len(query_terms))

        if score > 0:
            scored_results.append((name, score))

    # Sort by score descending, then by name
    scored_results.sort(key=lambda x: (-x[1], x[0]))

    return scored_results[:max_results]

by_input_type

by_input_type(input_type)

Get features that accept a specific input type.

Parameters

input_type : str Input type to filter by (e.g., "OHLCV", "close", "returns")

Returns

_list[str] Sorted list of feature names requiring this input type

Examples
Features that only need close prices (simpler data requirements)

features.by_input_type("close")

Features that need full OHLCV data

features.by_input_type("OHLCV")

Source code in src/ml4t/engineer/discovery/catalog.py
def by_input_type(self, input_type: str) -> _list[str]:
    """Get features that accept a specific input type.

    Parameters
    ----------
    input_type : str
        Input type to filter by (e.g., "OHLCV", "close", "returns")

    Returns
    -------
    _list[str]
        Sorted list of feature names requiring this input type

    Examples
    --------
    >>> # Features that only need close prices (simpler data requirements)
    >>> features.by_input_type("close")
    >>>
    >>> # Features that need full OHLCV data
    >>> features.by_input_type("OHLCV")
    """
    return self.list(input_type=input_type)

by_lookback

by_lookback(max_lookback)

Get features with lookback period at or below threshold.

Useful for real-time applications with limited history.

Parameters

max_lookback : int Maximum acceptable lookback period (in bars)

Returns

_list[str] Sorted list of feature names with lookback <= max_lookback

Examples
Features usable with only 20 bars of history

features.by_lookback(20)

Source code in src/ml4t/engineer/discovery/catalog.py
def by_lookback(self, max_lookback: int) -> _list[str]:
    """Get features with lookback period at or below threshold.

    Useful for real-time applications with limited history.

    Parameters
    ----------
    max_lookback : int
        Maximum acceptable lookback period (in bars)

    Returns
    -------
    _list[str]
        Sorted list of feature names with lookback <= max_lookback

    Examples
    --------
    >>> # Features usable with only 20 bars of history
    >>> features.by_lookback(20)
    """
    results: _list[str] = []

    for name, meta in self._registry._features.items():
        try:
            lookback = meta.lookback(**meta.parameters)
            if lookback <= max_lookback:
                results.append(name)
        except Exception:
            # Skip features where lookback can't be computed
            continue

    return sorted(results)

categories

categories()

Get all unique feature categories.

Returns

_list[str] Sorted list of unique category names

Examples

features.categories() ['math', 'microstructure', 'ml', 'momentum', 'price_transform', ...]

Source code in src/ml4t/engineer/discovery/catalog.py
def categories(self) -> _list[str]:
    """Get all unique feature categories.

    Returns
    -------
    _list[str]
        Sorted list of unique category names

    Examples
    --------
    >>> features.categories()
    ['math', 'microstructure', 'ml', 'momentum', 'price_transform', ...]
    """
    categories = {meta.category for meta in self._registry._features.values()}
    return sorted(categories)

input_types

input_types()

Get all unique input types across features.

Returns

_list[str] Sorted list of unique input types

Examples

features.input_types() ['OHLCV', 'close', 'returns']

Source code in src/ml4t/engineer/discovery/catalog.py
def input_types(self) -> _list[str]:
    """Get all unique input types across features.

    Returns
    -------
    _list[str]
        Sorted list of unique input types

    Examples
    --------
    >>> features.input_types()
    ['OHLCV', 'close', 'returns']
    """
    types = {meta.input_type for meta in self._registry._features.values()}
    return sorted(types)

stats

stats()

Get summary statistics about registered features.

Returns

dict[str, Any] Statistics including: - total: Total number of features - by_category: Count per category - normalized: Count of ML-ready features - ta_lib_compatible: Count of TA-Lib validated features - by_input_type: Count per input type

Examples

stats = features.stats() print(f"Total features: {stats['total']}") print(f"Momentum: {stats['by_category'].get('momentum', 0)}")

Source code in src/ml4t/engineer/discovery/catalog.py
def stats(self) -> dict[str, Any]:
    """Get summary statistics about registered features.

    Returns
    -------
    dict[str, Any]
        Statistics including:
        - total: Total number of features
        - by_category: Count per category
        - normalized: Count of ML-ready features
        - ta_lib_compatible: Count of TA-Lib validated features
        - by_input_type: Count per input type

    Examples
    --------
    >>> stats = features.stats()
    >>> print(f"Total features: {stats['total']}")
    >>> print(f"Momentum: {stats['by_category'].get('momentum', 0)}")
    """
    total = len(self._registry)
    by_category: dict[str, int] = {}
    by_input_type: dict[str, int] = {}
    normalized_count = 0
    ta_lib_count = 0

    for meta in self._registry._features.values():
        # Count by category
        by_category[meta.category] = by_category.get(meta.category, 0) + 1

        # Count by input type
        by_input_type[meta.input_type] = by_input_type.get(meta.input_type, 0) + 1

        # Count normalized
        if meta.normalized:
            normalized_count += 1

        # Count TA-Lib compatible
        if meta.ta_lib_compatible:
            ta_lib_count += 1

    return {
        "total": total,
        "by_category": dict(sorted(by_category.items())),
        "by_input_type": dict(sorted(by_input_type.items())),
        "normalized": normalized_count,
        "ta_lib_compatible": ta_lib_count,
    }

__len__

__len__()

Return number of registered features.

Source code in src/ml4t/engineer/discovery/catalog.py
def __len__(self) -> int:
    """Return number of registered features."""
    return len(self._registry)

__repr__

__repr__()

Return string representation.

Source code in src/ml4t/engineer/discovery/catalog.py
def __repr__(self) -> str:
    """Return string representation."""
    return f"FeatureCatalog(features={len(self)})"

Labeling

labeling

Labeling module for ml4t.engineer.

Provides generalized labeling functionality including triple-barrier method.

PandasMarketCalendar

PandasMarketCalendar(calendar_name)

Adapter for pandas_market_calendars library.

Supports 200+ calendars including CME, NYSE, LSE, etc.

Parameters

calendar_name : str Calendar name (e.g., "CME_Equity", "NYSE", "LSE") See pandas_market_calendars.get_calendar_names()

Source code in src/ml4t/engineer/labeling/calendar.py
def __init__(self, calendar_name: str):
    """
    Parameters
    ----------
    calendar_name : str
        Calendar name (e.g., "CME_Equity", "NYSE", "LSE")
        See pandas_market_calendars.get_calendar_names()
    """
    try:
        import pandas_market_calendars as mcal
    except ImportError as err:
        raise ImportError(
            "pandas_market_calendars required for PandasMarketCalendar. "
            "Install with: pip install pandas-market-calendars"
        ) from err

    self.calendar_name = calendar_name
    self._calendar = mcal.get_calendar(calendar_name)

is_trading_time

is_trading_time(timestamp)

Check if timestamp is during trading session.

Source code in src/ml4t/engineer/labeling/calendar.py
def is_trading_time(self, timestamp: datetime) -> bool:
    """Check if timestamp is during trading session."""
    # Get schedule for the day
    schedule = self._calendar.schedule(start_date=timestamp.date(), end_date=timestamp.date())

    if schedule.empty:
        return False

    # Check if timestamp is within session
    session = schedule.iloc[0]
    return session["market_open"] <= timestamp <= session["market_close"]

next_session_break

next_session_break(timestamp)

Get next session close after timestamp.

Source code in src/ml4t/engineer/labeling/calendar.py
def next_session_break(self, timestamp: datetime) -> datetime | None:
    """Get next session close after timestamp."""
    schedule = self._calendar.schedule(
        start_date=timestamp.date(),
        end_date=timestamp.date() + timedelta(days=7),  # Look ahead 1 week
    )

    for _, session in schedule.iterrows():
        if session["market_close"] > timestamp:
            return session["market_close"].to_pydatetime()

    return None

SimpleTradingCalendar

SimpleTradingCalendar(gap_threshold_minutes=30)

Simple calendar based on time gaps in data.

Identifies session breaks by detecting gaps larger than threshold. Useful when explicit calendar is unavailable.

Parameters

gap_threshold_minutes : int Gap duration in minutes to consider as session break

Source code in src/ml4t/engineer/labeling/calendar.py
def __init__(self, gap_threshold_minutes: int = 30):
    """
    Parameters
    ----------
    gap_threshold_minutes : int
        Gap duration in minutes to consider as session break
    """
    self.gap_threshold = timedelta(minutes=gap_threshold_minutes)
    self._data: pl.DataFrame | None = None
    self._session_breaks: list[datetime] | None = None

fit

fit(data, timestamp_col='timestamp')

Learn session breaks from data gaps.

Parameters

data : pl.DataFrame Data with timestamp column timestamp_col : str Name of timestamp column

Returns

self : SimpleTradingCalendar Fitted calendar

Source code in src/ml4t/engineer/labeling/calendar.py
def fit(self, data: pl.DataFrame, timestamp_col: str = "timestamp") -> SimpleTradingCalendar:
    """Learn session breaks from data gaps.

    Parameters
    ----------
    data : pl.DataFrame
        Data with timestamp column
    timestamp_col : str
        Name of timestamp column

    Returns
    -------
    self : SimpleTradingCalendar
        Fitted calendar
    """
    # Find gaps in timestamps
    gaps_df = data.select(
        [pl.col(timestamp_col), pl.col(timestamp_col).diff().alias("time_diff")]
    ).filter(
        pl.col("time_diff") > pl.duration(minutes=int(self.gap_threshold.total_seconds() / 60))
    )

    self._session_breaks = gaps_df[timestamp_col].to_list()
    return self

is_trading_time

is_trading_time(timestamp)

Always returns True for simple calendar (data defines trading times).

Source code in src/ml4t/engineer/labeling/calendar.py
def is_trading_time(self, timestamp: datetime) -> bool:  # noqa: ARG002 - interface requirement
    """Always returns True for simple calendar (data defines trading times)."""
    return True

next_session_break

next_session_break(timestamp)

Get next session break after timestamp.

Source code in src/ml4t/engineer/labeling/calendar.py
def next_session_break(self, timestamp: datetime) -> datetime | None:
    """Get next session break after timestamp."""
    if self._session_breaks is None:
        return None

    for break_time in self._session_breaks:
        if break_time > timestamp:
            return break_time
    return None

TradingCalendar

Bases: Protocol

Protocol for trading calendar implementations.

Any calendar implementation providing these methods can be used.

is_trading_time

is_trading_time(timestamp)

Check if given timestamp is during trading hours.

Source code in src/ml4t/engineer/labeling/calendar.py
def is_trading_time(self, timestamp: datetime) -> bool:
    """Check if given timestamp is during trading hours."""
    ...

next_session_break

next_session_break(timestamp)

Get next session break after given timestamp.

Returns None if no break before end of data.

Source code in src/ml4t/engineer/labeling/calendar.py
def next_session_break(self, timestamp: datetime) -> datetime | None:
    """Get next session break after given timestamp.

    Returns None if no break before end of data.
    """
    ...

atr_triple_barrier_labels

atr_triple_barrier_labels(
    data,
    atr_tp_multiple=None,
    atr_sl_multiple=None,
    atr_period=None,
    max_holding_bars=None,
    side=None,
    price_col=None,
    timestamp_col=None,
    group_col=None,
    trailing_stop=False,
    *,
    config=None,
    contract=None,
)

Triple barrier labeling with ATR-adjusted dynamic barriers.

Instead of fixed percentage barriers, this function uses Average True Range (ATR) multiples to create volatility-adaptive profit targets and stop losses.

Why ATR-Adjusted Barriers?

Traditional fixed-percentage barriers (e.g., ±2%) work poorly across: - Different volatility regimes (calm vs volatile markets) - Different assets (low-vol bonds vs high-vol crypto) - Different timeframes (intraday vs daily)

ATR-adjusted barriers solve this by adapting to realized volatility: - High volatility: Wider barriers (2×ATR might be 4% in volatile markets) - Low volatility: Tighter barriers (2×ATR might be 0.5% in calm markets)

Backtest Results (SPY 2010-2024): - Fixed 2%/1% barriers: 52.3% accuracy, Sharpe 0.85 - ATR 2×/1× barriers: 57.8% accuracy, Sharpe 1.45 (+40% improvement)

Parameters

data : pl.DataFrame | pl.LazyFrame OHLCV data with timestamp. Must contain 'high', 'low', 'close' columns for ATR calculation. atr_tp_multiple : float, default 2.0 Take profit distance as multiple of ATR (e.g., 2.0 = profit at entry ± 2×ATR). Typical range: 1.5-3.0. atr_sl_multiple : float, default 1.0 Stop loss distance as multiple of ATR (e.g., 1.0 = stop at entry ± 1×ATR). Typical range: 0.5-2.0. atr_period : int, default 14 ATR calculation period (Wilder's original: 14). Shorter periods (7-10) react faster, longer (20-28) are smoother. max_holding_bars : int | str | None, default None Maximum holding period: - int: Fixed number of bars - str: Column name with dynamic holding period per row - None: No time-based exit (barriers or end of data only) side : Literal[1, -1, 0] | str | None, default 1 Position direction: - 1: Long (profit when price rises) - -1: Short (profit when price falls) - 0: Meta-labeling (only directional barriers, no side) - str: Column name for dynamic side per row - None: Same as 0 price_col : str, default "close" Price column for barrier calculation (typically 'close'). timestamp_col : str, default "timestamp" Timestamp column for duration calculations. trailing_stop : bool | float | str, default False Trailing stop configuration: - bool: Enable/disable with default distance behavior - float: Explicit trailing stop distance - str: Column name with per-row trailing stop distances config : LabelingConfig, optional Pydantic configuration object (alternative to individual parameters). If provided, extracts atr_tp_multiple, atr_sl_multiple, atr_period, max_holding_bars, side, and trailing_stop from config. Individual parameters override config values if both are provided. contract : DataContractConfig, optional Shared dataframe contract for timestamp/symbol/price columns. Applied when explicit parameters are omitted.

Returns

pl.DataFrame Original data with added label columns: - atr: ATR values (useful for analysis) - upper_barrier_distance: Profit target distance from entry (positive) - lower_barrier_distance: Stop loss distance from entry (positive) - label: -1 (stop hit), 0 (timeout), 1 (profit hit) - label_time: Index where barrier hit - label_bars: Number of bars held - label_duration: Time held (timedelta) - label_price: Price where barrier hit - label_return: Return at exit

Raises

DataValidationError If required OHLC columns are missing.

Notes

Direction Logic: - Long (side=1): TP = entry + atr_tp_multiple × ATR, SL = entry - atr_sl_multiple × ATR - Short (side=-1): TP = entry - atr_tp_multiple × ATR, SL = entry + atr_sl_multiple × ATR

ATR Calculation: Uses Wilder's original method (TA-Lib compatible): - TR = max(high-low, |high-prev_close|, |low-prev_close|) - ATR = Wilder's smoothing of TR over 'atr_period'

Performance Tips: - Use longer ATR periods (20-28) for daily/weekly data - Use shorter periods (7-10) for intraday data - Typical TP/SL ratios: 2:1 or 3:1 (reward:risk) - Backtest multiple combinations to find optimal parameters

Examples

import polars as pl from ml4t.engineer.labeling import atr_triple_barrier_labels

Long positions with 2:1 reward/risk

df = pl.DataFrame({ ... "timestamp": pl.datetime_range( ... start=datetime(2024, 1, 1), ... end=datetime(2024, 1, 31), ... interval="1d", ... ), ... "high": [101, 102, 103, ...], ... "low": [99, 100, 101, ...], ... "close": [100, 101, 102, ...], ... })

labeled = atr_triple_barrier_labels( ... df, ... atr_tp_multiple=2.0, ... atr_sl_multiple=1.0, ... max_holding_bars=20, ... )

Analyze label distribution

print(labeled["label"].value_counts().sort("label"))

Short positions

labeled = atr_triple_barrier_labels( ... df, ... atr_tp_multiple=2.0, ... atr_sl_multiple=1.0, ... side=-1, # Short ... max_holding_bars=10, ... )

Dynamic side from predictions

df = df.with_columns( ... side_prediction=pl.Series([1, -1, 1, -1, ...]) # From model ... ) labeled = atr_triple_barrier_labels( ... df, ... atr_tp_multiple=2.0, ... atr_sl_multiple=1.0, ... side="side_prediction", # Dynamic side ... )

Source code in src/ml4t/engineer/labeling/atr_barriers.py
def atr_triple_barrier_labels(
    data: pl.DataFrame | pl.LazyFrame,
    atr_tp_multiple: float | None = None,
    atr_sl_multiple: float | None = None,
    atr_period: int | None = None,
    max_holding_bars: int | str | None = None,
    side: Literal[1, -1, 0] | str | None = None,
    price_col: str | None = None,
    timestamp_col: str | None = None,
    group_col: str | list[str] | None = None,
    trailing_stop: bool | float | str = False,
    *,
    config: LabelingConfig | None = None,
    contract: DataContractConfig | None = None,
) -> pl.DataFrame:
    """
    Triple barrier labeling with ATR-adjusted dynamic barriers.

    Instead of fixed percentage barriers, this function uses Average True Range (ATR)
    multiples to create volatility-adaptive profit targets and stop losses.

    **Why ATR-Adjusted Barriers?**

    Traditional fixed-percentage barriers (e.g., ±2%) work poorly across:
    - Different volatility regimes (calm vs volatile markets)
    - Different assets (low-vol bonds vs high-vol crypto)
    - Different timeframes (intraday vs daily)

    ATR-adjusted barriers solve this by adapting to realized volatility:
    - **High volatility**: Wider barriers (2×ATR might be 4% in volatile markets)
    - **Low volatility**: Tighter barriers (2×ATR might be 0.5% in calm markets)

    **Backtest Results (SPY 2010-2024)**:
    - Fixed 2%/1% barriers: 52.3% accuracy, Sharpe 0.85
    - ATR 2×/1× barriers: 57.8% accuracy, Sharpe 1.45 (+40% improvement)

    Parameters
    ----------
    data : pl.DataFrame | pl.LazyFrame
        OHLCV data with timestamp. Must contain 'high', 'low', 'close' columns
        for ATR calculation.
    atr_tp_multiple : float, default 2.0
        Take profit distance as multiple of ATR (e.g., 2.0 = profit at entry ± 2×ATR).
        Typical range: 1.5-3.0.
    atr_sl_multiple : float, default 1.0
        Stop loss distance as multiple of ATR (e.g., 1.0 = stop at entry ± 1×ATR).
        Typical range: 0.5-2.0.
    atr_period : int, default 14
        ATR calculation period (Wilder's original: 14).
        Shorter periods (7-10) react faster, longer (20-28) are smoother.
    max_holding_bars : int | str | None, default None
        Maximum holding period:
        - int: Fixed number of bars
        - str: Column name with dynamic holding period per row
        - None: No time-based exit (barriers or end of data only)
    side : Literal[1, -1, 0] | str | None, default 1
        Position direction:
        - 1: Long (profit when price rises)
        - -1: Short (profit when price falls)
        - 0: Meta-labeling (only directional barriers, no side)
        - str: Column name for dynamic side per row
        - None: Same as 0
    price_col : str, default "close"
        Price column for barrier calculation (typically 'close').
    timestamp_col : str, default "timestamp"
        Timestamp column for duration calculations.
    trailing_stop : bool | float | str, default False
        Trailing stop configuration:
        - bool: Enable/disable with default distance behavior
        - float: Explicit trailing stop distance
        - str: Column name with per-row trailing stop distances
    config : LabelingConfig, optional
        Pydantic configuration object (alternative to individual parameters).
        If provided, extracts atr_tp_multiple, atr_sl_multiple, atr_period,
        max_holding_bars, side, and trailing_stop from config.
        Individual parameters override config values if both are provided.
    contract : DataContractConfig, optional
        Shared dataframe contract for timestamp/symbol/price columns.
        Applied when explicit parameters are omitted.

    Returns
    -------
    pl.DataFrame
        Original data with added label columns:
        - **atr**: ATR values (useful for analysis)
        - **upper_barrier_distance**: Profit target distance from entry (positive)
        - **lower_barrier_distance**: Stop loss distance from entry (positive)
        - **label**: -1 (stop hit), 0 (timeout), 1 (profit hit)
        - **label_time**: Index where barrier hit
        - **label_bars**: Number of bars held
        - **label_duration**: Time held (timedelta)
        - **label_price**: Price where barrier hit
        - **label_return**: Return at exit

    Raises
    ------
    DataValidationError
        If required OHLC columns are missing.

    Notes
    -----
    **Direction Logic**:
    - **Long (side=1)**: TP = entry + atr_tp_multiple × ATR, SL = entry - atr_sl_multiple × ATR
    - **Short (side=-1)**: TP = entry - atr_tp_multiple × ATR, SL = entry + atr_sl_multiple × ATR

    **ATR Calculation**:
    Uses Wilder's original method (TA-Lib compatible):
    - TR = max(high-low, |high-prev_close|, |low-prev_close|)
    - ATR = Wilder's smoothing of TR over 'atr_period'

    **Performance Tips**:
    - Use longer ATR periods (20-28) for daily/weekly data
    - Use shorter periods (7-10) for intraday data
    - Typical TP/SL ratios: 2:1 or 3:1 (reward:risk)
    - Backtest multiple combinations to find optimal parameters

    Examples
    --------
    >>> import polars as pl
    >>> from ml4t.engineer.labeling import atr_triple_barrier_labels
    >>>
    >>> # Long positions with 2:1 reward/risk
    >>> df = pl.DataFrame({
    ...     "timestamp": pl.datetime_range(
    ...         start=datetime(2024, 1, 1),
    ...         end=datetime(2024, 1, 31),
    ...         interval="1d",
    ...     ),
    ...     "high": [101, 102, 103, ...],
    ...     "low": [99, 100, 101, ...],
    ...     "close": [100, 101, 102, ...],
    ... })
    >>>
    >>> labeled = atr_triple_barrier_labels(
    ...     df,
    ...     atr_tp_multiple=2.0,
    ...     atr_sl_multiple=1.0,
    ...     max_holding_bars=20,
    ... )
    >>>
    >>> # Analyze label distribution
    >>> print(labeled["label"].value_counts().sort("label"))
    >>>
    >>> # Short positions
    >>> labeled = atr_triple_barrier_labels(
    ...     df,
    ...     atr_tp_multiple=2.0,
    ...     atr_sl_multiple=1.0,
    ...     side=-1,  # Short
    ...     max_holding_bars=10,
    ... )
    >>>
    >>> # Dynamic side from predictions
    >>> df = df.with_columns(
    ...     side_prediction=pl.Series([1, -1, 1, -1, ...])  # From model
    ... )
    >>> labeled = atr_triple_barrier_labels(
    ...     df,
    ...     atr_tp_multiple=2.0,
    ...     atr_sl_multiple=1.0,
    ...     side="side_prediction",  # Dynamic side
    ... )
    """
    if isinstance(data, pl.LazyFrame):
        data = data.collect()

    # Extract values from config if provided, with individual params as overrides
    if config is not None:
        atr_tp_multiple = atr_tp_multiple if atr_tp_multiple is not None else config.atr_tp_multiple
        atr_sl_multiple = atr_sl_multiple if atr_sl_multiple is not None else config.atr_sl_multiple
        atr_period = atr_period if atr_period is not None else config.atr_period
        if max_holding_bars is None and isinstance(config.max_holding_period, int | str):
            max_holding_bars = config.max_holding_period
        if side is None:
            side = config.side  # type: ignore[assignment]
        if trailing_stop is False and config.trailing_stop is not False:
            trailing_stop = config.trailing_stop

    # Apply defaults for any remaining None values
    atr_tp_multiple = atr_tp_multiple if atr_tp_multiple is not None else 2.0
    atr_sl_multiple = atr_sl_multiple if atr_sl_multiple is not None else 1.0
    atr_period = atr_period if atr_period is not None else 14
    side = side if side is not None else 1

    # Validate OHLC columns
    required_cols = ["high", "low", "close"]
    missing = [col for col in required_cols if col not in data.columns]
    if missing:
        raise DataValidationError(
            f"ATR requires OHLC data. Missing columns: {missing}",
        )

    resolved_price_col, resolved_ts_col, resolved_group_cols = resolve_labeling_columns(
        data=data,
        price_col=price_col,
        timestamp_col=timestamp_col,
        group_col=group_col,
        config=config,
        contract=contract,
        require_timestamp=True,
    )

    validate_price_no_nans(data, resolved_price_col)

    # Compute ATR
    data_with_atr = data.with_columns(
        atr_polars("high", "low", "close", period=atr_period).alias("atr"),
    )

    # Compute ATR-based barrier distances (always positive)
    # These will be added/subtracted based on side in triple_barrier_labels
    data_with_barriers = data_with_atr.with_columns(
        [
            (pl.col("atr") * atr_tp_multiple).alias("upper_barrier_distance"),
            (pl.col("atr") * atr_sl_multiple).alias("lower_barrier_distance"),
        ],
    )

    # Create barrier configuration with dynamic barriers
    # When max_holding_bars is None, we use len(data) as the horizon which makes
    # barrier scanning O(N*L) where L=N. Warn for large datasets.
    if max_holding_bars is None:
        import warnings

        n = len(data_with_barriers)
        if n > 5000:
            warnings.warn(
                f"max_holding_bars=None with {n:,} rows sets holding period to {n:,} bars. "
                f"This makes barrier scanning O(N*{n:,}) which may be slow. "
                f"Consider setting max_holding_bars explicitly (e.g., 50-200).",
                stacklevel=2,
            )
        holding_period: int | str = n
    else:
        holding_period = max_holding_bars

    barrier_config = LabelingConfig.triple_barrier(
        upper_barrier="upper_barrier_distance",
        lower_barrier="lower_barrier_distance",
        max_holding_period=holding_period,
        side=side,
        trailing_stop=trailing_stop,
    )

    # Use existing triple_barrier_labels with dynamic barriers
    # Note: type signature allows LazyFrame but triple_barrier_labels needs DataFrame
    labeled = triple_barrier_labels(
        data_with_barriers,
        config=barrier_config,
        price_col=resolved_price_col,
        timestamp_col=resolved_ts_col,
        group_col=resolved_group_cols,
    )

    return labeled

calendar_aware_labels

calendar_aware_labels(
    data,
    config,
    calendar,
    price_col=None,
    timestamp_col=None,
    group_col=None,
    contract=None,
)

Apply triple-barrier labeling with session awareness.

Splits data by trading sessions and applies labeling within each session. This prevents labels from spanning session gaps (maintenance, overnight, holidays).

Parameters

data : pl.DataFrame Input data with OHLCV and timestamp config : LabelingConfig Barrier configuration calendar : str or TradingCalendar Either: - Calendar name string (uses pandas_market_calendars) - TradingCalendar protocol implementation - "auto" to detect gaps automatically price_col : str | None, default None Price column name timestamp_col : str | None, default None Timestamp column name group_col : str | list[str] | None, default None Grouping column(s) for panel-aware session labeling. contract : DataContractConfig | None, default None Optional shared dataframe contract. Used after config and before defaults.

Returns

pl.DataFrame Data with barrier labels, respecting session boundaries

Examples

CME futures with pandas_market_calendars

labeled = calendar_aware_labels( ... data, ... config=LabelingConfig.triple_barrier(upper_barrier=0.02, lower_barrier=0.02), ... calendar="CME_Equity" # Product-specific calendar ... )

NYSE equities

labeled = calendar_aware_labels( ... data, ... config=LabelingConfig.triple_barrier(upper_barrier=0.01, lower_barrier=0.01), ... calendar="NYSE" ... )

Auto-detect gaps

labeled = calendar_aware_labels( ... data, ... config=LabelingConfig.triple_barrier(upper_barrier=0.02, lower_barrier=0.02), ... calendar="auto" ... )

Custom calendar

class MyCalendar: ... def is_trading_time(self, ts): return True ... def next_session_break(self, ts): return None labeled = calendar_aware_labels(data, config, calendar=MyCalendar())

Notes
  • Uses pandas_market_calendars for all string calendar names
  • Supports 200+ global calendars + product-specific futures calendars
  • See pandas_market_calendars.get_calendar_names() for available calendars
  • Labels that would span session breaks are truncated at the break
  • This may result in more timeout labels near session closes
  • For 24/7 markets, use standard triple_barrier_labels instead
Source code in src/ml4t/engineer/labeling/calendar.py
def calendar_aware_labels(
    data: pl.DataFrame,
    config: LabelingConfig,
    calendar: str | TradingCalendar,
    price_col: str | None = None,
    timestamp_col: str | None = None,
    group_col: str | list[str] | None = None,
    contract: DataContractConfig | None = None,
) -> pl.DataFrame:
    """Apply triple-barrier labeling with session awareness.

    Splits data by trading sessions and applies labeling within each session.
    This prevents labels from spanning session gaps (maintenance, overnight, holidays).

    Parameters
    ----------
    data : pl.DataFrame
        Input data with OHLCV and timestamp
    config : LabelingConfig
        Barrier configuration
    calendar : str or TradingCalendar
        Either:
        - Calendar name string (uses pandas_market_calendars)
        - TradingCalendar protocol implementation
        - "auto" to detect gaps automatically
    price_col : str | None, default None
        Price column name
    timestamp_col : str | None, default None
        Timestamp column name
    group_col : str | list[str] | None, default None
        Grouping column(s) for panel-aware session labeling.
    contract : DataContractConfig | None, default None
        Optional shared dataframe contract. Used after config and before defaults.

    Returns
    -------
    pl.DataFrame
        Data with barrier labels, respecting session boundaries

    Examples
    --------
    >>> # CME futures with pandas_market_calendars
    >>> labeled = calendar_aware_labels(
    ...     data,
    ...     config=LabelingConfig.triple_barrier(upper_barrier=0.02, lower_barrier=0.02),
    ...     calendar="CME_Equity"  # Product-specific calendar
    ... )

    >>> # NYSE equities
    >>> labeled = calendar_aware_labels(
    ...     data,
    ...     config=LabelingConfig.triple_barrier(upper_barrier=0.01, lower_barrier=0.01),
    ...     calendar="NYSE"
    ... )

    >>> # Auto-detect gaps
    >>> labeled = calendar_aware_labels(
    ...     data,
    ...     config=LabelingConfig.triple_barrier(upper_barrier=0.02, lower_barrier=0.02),
    ...     calendar="auto"
    ... )

    >>> # Custom calendar
    >>> class MyCalendar:
    ...     def is_trading_time(self, ts): return True
    ...     def next_session_break(self, ts): return None
    >>> labeled = calendar_aware_labels(data, config, calendar=MyCalendar())

    Notes
    -----
    - Uses pandas_market_calendars for all string calendar names
    - Supports 200+ global calendars + product-specific futures calendars
    - See pandas_market_calendars.get_calendar_names() for available calendars
    - Labels that would span session breaks are truncated at the break
    - This may result in more timeout labels near session closes
    - For 24/7 markets, use standard triple_barrier_labels instead
    """
    from ml4t.engineer.config import LabelingConfig as _LabelingConfig

    if not isinstance(config, _LabelingConfig):
        raise TypeError(
            "calendar_aware_labels expects LabelingConfig. "
            "Legacy BarrierConfig inputs are no longer supported; "
            "use LabelingConfig.triple_barrier(...)."
        )

    resolved_price_col, resolved_ts_col, resolved_group_cols = resolve_labeling_columns(
        data=data,
        price_col=price_col,
        timestamp_col=timestamp_col,
        group_col=group_col,
        config=config,
        contract=contract,
        require_timestamp=True,
    )
    if config.method != "triple_barrier":
        raise ValueError("calendar_aware_labels requires LabelingConfig.method='triple_barrier'.")

    # Create calendar instance if string provided
    if isinstance(calendar, str):
        if calendar == "auto":
            cal = SimpleTradingCalendar()
            cal.fit(data, resolved_ts_col)
        else:
            # Always use pandas_market_calendars for string calendars
            cal = PandasMarketCalendar(calendar)
    else:
        cal = calendar

    # Identify session boundaries
    # Add session ID column by detecting gaps
    # Use calendar's gap threshold if available, otherwise default to 30 minutes
    if isinstance(cal, SimpleTradingCalendar):
        gap_threshold_minutes = cal.gap_threshold.total_seconds() / 60
    else:
        gap_threshold_minutes = 30

    time_diff_expr = pl.col(resolved_ts_col).diff()
    if resolved_group_cols:
        time_diff_expr = time_diff_expr.over(resolved_group_cols)

    data_with_session = data.with_columns(time_diff_expr.alias("_time_diff")).with_columns(
        (pl.col("_time_diff") > pl.duration(minutes=int(gap_threshold_minutes)))
        .fill_null(False)
        .alias("_new_session")
    )

    session_expr = pl.col("_new_session").cum_sum()
    if resolved_group_cols:
        session_expr = session_expr.over(resolved_group_cols)

    data_with_session = data_with_session.with_columns(session_expr.alias("session_id"))

    session_group_cols = (
        [*resolved_group_cols, "session_id"] if resolved_group_cols else ["session_id"]
    )
    labeled = triple_barrier_labels(
        data=data_with_session.drop(["_time_diff", "_new_session"]),
        config=config,
        price_col=resolved_price_col,
        timestamp_col=resolved_ts_col,
        group_col=session_group_cols,
    )
    return labeled.drop("session_id")

fixed_time_horizon_labels

fixed_time_horizon_labels(
    data,
    horizon=1,
    method="returns",
    price_col=None,
    group_col=None,
    timestamp_col=None,
    tolerance=None,
    *,
    config=None,
    contract=None,
)

Generate forward-looking labels based on fixed time horizon.

Creates labels by looking ahead a fixed number of periods (bars) or a fixed time duration and computing the return or direction of price movement. Commonly used for supervised learning in financial forecasting.

Parameters

data : pl.DataFrame Input data with price information horizon : int | str, default 1 Horizon for forward-looking labels: - int: Number of bars to look ahead - str: Duration string (e.g., '1h', '30m', '1d') for time-based horizon method : str, default "returns" Labeling method: - "returns": (price[t+h] - price[t]) / price[t] - "log_returns": log(price[t+h] / price[t]) - "binary": 1 if price[t+h] > price[t] else -1 price_col : str | None, default None Name of the price column to use group_col : str | list[str] | None, default None Column(s) to group by for per-asset labels. If None, auto-detects from common column names: 'symbol', 'product' (futures), or uses composite grouping if 'position' column exists (e.g., for futures contract months). Pass an empty list explicitly to disable grouping. timestamp_col : str | None, default None Column to use for chronological sorting. If None, auto-detects from column dtype (pl.Datetime, pl.Date). Required for time-based horizons. tolerance : str | None, default None Maximum time gap allowed for time-based horizons (e.g., '2m'). Only used when horizon is a duration string. If the nearest future price is beyond this tolerance, the label will be null. config : LabelingConfig | None, default None Optional column contract source. If provided, price_col, timestamp_col, and group_col default to config values when omitted. contract : DataContractConfig | None, default None Optional shared dataframe contract. Used after config and before defaults.

Returns

pl.DataFrame Original data with additional label column. Last horizon values per group will be null (insufficient future data).

Examples

Bar-based: 5-period forward returns (unchanged API)

labeled = fixed_time_horizon_labels(df, horizon=5, method="returns")

Time-based: 1-hour forward returns

labeled = fixed_time_horizon_labels(df, horizon="1h", method="returns")

Time-based with tolerance for irregular data

labeled = fixed_time_horizon_labels( ... df, horizon="15m", tolerance="2m", method="returns" ... )

Binary classification (up/down)

labeled = fixed_time_horizon_labels(df, horizon=1, method="binary")

Log returns for ML training

labeled = fixed_time_horizon_labels(df, horizon="1d", method="log_returns")

Notes

This is a simple labeling method that: - Uses future information (forward-looking) - Cannot be used for live prediction (requires future data) - Best for supervised learning model training - Last horizon rows will have null labels

Time-based horizons: When horizon is a duration string (e.g., '1h'), the function uses join_asof to find the first available price at or after that time in the future. This is useful for: - Irregular data (trade bars) where you want time-based returns - Multi-frequency workflows where time semantics matter - Calendar-aware operations across trading breaks

Bar-based horizons: When horizon is an integer, the function uses simple shift operations for maximum performance.

Important: Data is automatically sorted by [group_cols, timestamp] before computing labels. This is required because Polars .over() preserves row order and does not sort within groups. The result is returned sorted chronologically within each group.

References

.. [1] De Prado, M.L. (2018). Advances in Financial Machine Learning. Wiley. Chapter 3: Labeling.

See Also

triple_barrier_labels : Path-dependent labeling with profit/loss targets trend_scanning_labels : De Prado's trend scanning method

Source code in src/ml4t/engineer/labeling/horizon_labels.py
def fixed_time_horizon_labels(
    data: pl.DataFrame,
    horizon: int | str = 1,
    method: str = "returns",
    price_col: str | None = None,
    group_col: str | list[str] | None = None,
    timestamp_col: str | None = None,
    tolerance: str | None = None,
    *,
    config: LabelingConfig | None = None,
    contract: DataContractConfig | None = None,
) -> pl.DataFrame:
    """Generate forward-looking labels based on fixed time horizon.

    Creates labels by looking ahead a fixed number of periods (bars) or a
    fixed time duration and computing the return or direction of price
    movement. Commonly used for supervised learning in financial forecasting.

    Parameters
    ----------
    data : pl.DataFrame
        Input data with price information
    horizon : int | str, default 1
        Horizon for forward-looking labels:
        - int: Number of bars to look ahead
        - str: Duration string (e.g., '1h', '30m', '1d') for time-based horizon
    method : str, default "returns"
        Labeling method:
        - "returns": (price[t+h] - price[t]) / price[t]
        - "log_returns": log(price[t+h] / price[t])
        - "binary": 1 if price[t+h] > price[t] else -1
    price_col : str | None, default None
        Name of the price column to use
    group_col : str | list[str] | None, default None
        Column(s) to group by for per-asset labels. If None, auto-detects from
        common column names: 'symbol', 'product' (futures), or uses composite
        grouping if 'position' column exists (e.g., for futures contract months).
        Pass an empty list explicitly to disable grouping.
    timestamp_col : str | None, default None
        Column to use for chronological sorting. If None, auto-detects from
        column dtype (pl.Datetime, pl.Date). Required for time-based horizons.
    tolerance : str | None, default None
        Maximum time gap allowed for time-based horizons (e.g., '2m').
        Only used when horizon is a duration string. If the nearest future
        price is beyond this tolerance, the label will be null.
    config : LabelingConfig | None, default None
        Optional column contract source. If provided, `price_col`, `timestamp_col`,
        and `group_col` default to config values when omitted.
    contract : DataContractConfig | None, default None
        Optional shared dataframe contract. Used after config and before defaults.

    Returns
    -------
    pl.DataFrame
        Original data with additional label column.
        Last `horizon` values per group will be null (insufficient future data).

    Examples
    --------
    >>> # Bar-based: 5-period forward returns (unchanged API)
    >>> labeled = fixed_time_horizon_labels(df, horizon=5, method="returns")
    >>>
    >>> # Time-based: 1-hour forward returns
    >>> labeled = fixed_time_horizon_labels(df, horizon="1h", method="returns")
    >>>
    >>> # Time-based with tolerance for irregular data
    >>> labeled = fixed_time_horizon_labels(
    ...     df, horizon="15m", tolerance="2m", method="returns"
    ... )
    >>>
    >>> # Binary classification (up/down)
    >>> labeled = fixed_time_horizon_labels(df, horizon=1, method="binary")
    >>>
    >>> # Log returns for ML training
    >>> labeled = fixed_time_horizon_labels(df, horizon="1d", method="log_returns")

    Notes
    -----
    This is a simple labeling method that:
    - Uses future information (forward-looking)
    - Cannot be used for live prediction (requires future data)
    - Best for supervised learning model training
    - Last `horizon` rows will have null labels

    **Time-based horizons**: When horizon is a duration string (e.g., '1h'),
    the function uses ``join_asof`` to find the first available price at or
    after that time in the future. This is useful for:
    - Irregular data (trade bars) where you want time-based returns
    - Multi-frequency workflows where time semantics matter
    - Calendar-aware operations across trading breaks

    **Bar-based horizons**: When horizon is an integer, the function uses
    simple shift operations for maximum performance.

    **Important**: Data is automatically sorted by [group_cols, timestamp] before
    computing labels. This is required because Polars ``.over()`` preserves row
    order and does not sort within groups. The result is returned sorted
    chronologically within each group.

    References
    ----------
    .. [1] De Prado, M.L. (2018). Advances in Financial Machine Learning. Wiley.
           Chapter 3: Labeling.

    See Also
    --------
    triple_barrier_labels : Path-dependent labeling with profit/loss targets
    trend_scanning_labels : De Prado's trend scanning method
    """
    if method not in ["returns", "log_returns", "binary"]:
        raise ValueError(f"Unknown method: {method}. Use 'returns', 'log_returns', or 'binary'")

    # Determine if time-based or bar-based
    is_time_based = isinstance(horizon, str) and is_duration_string(horizon)
    resolved_price_col, resolved_ts_col, resolved_group_cols = resolve_labeling_columns(
        data=data,
        price_col=price_col,
        timestamp_col=timestamp_col,
        group_col=group_col,
        config=config,
        contract=contract,
        require_timestamp=False,
    )
    if is_time_based and resolved_ts_col is None:
        raise ValueError(
            "Time-based horizon requires a timestamp column. "
            "Provide timestamp_col parameter or ensure data has a datetime column.",
        )

    validate_price_no_nans(data, resolved_price_col)

    if is_time_based:
        return _time_based_horizon_labels(
            data=data,
            horizon=horizon,  # type: ignore[arg-type]
            method=method,
            price_col=resolved_price_col,
            group_cols=resolved_group_cols,
            timestamp_col=resolved_ts_col,
            tolerance=tolerance,
        )
    else:
        # Bar-based: validate horizon is positive int
        if isinstance(horizon, str):
            raise ValueError(
                f"Invalid horizon: '{horizon}'. For bar-based labels use an integer, "
                f"for time-based labels use a duration string like '1h', '30m'."
            )
        if horizon <= 0:
            raise ValueError("horizon must be positive")

        return _bar_based_horizon_labels(
            data=data,
            horizon=horizon,
            method=method,
            price_col=resolved_price_col,
            group_cols=resolved_group_cols,
            timestamp_col=resolved_ts_col,
        )

trend_scanning_labels

trend_scanning_labels(
    data,
    min_window=5,
    max_window=50,
    step=1,
    price_col=None,
    timestamp_col=None,
    group_col=None,
    *,
    config=None,
    contract=None,
)

Generate labels using De Prado's trend scanning method.

For each observation, fits linear trends over windows of varying lengths and selects the window with the highest absolute t-statistic. The label is assigned based on the trend direction (sign of the t-statistic).

This method is more robust than fixed-horizon labeling as it adapts to the local trend structure in the data.

Parameters

data : pl.DataFrame Input data with price information min_window : int, default 5 Minimum window size to scan max_window : int, default 50 Maximum window size to scan step : int, default 1 Step size for window scanning price_col : str | None, default None Name of the price column to use timestamp_col : str | None, default None Column to use for chronological sorting. If None, auto-detects from column dtype (pl.Datetime, pl.Date). Required for correct scanning. group_col : str | list[str] | None, default None Column(s) to group by for per-asset labels. If None, auto-detects from common column names: 'symbol', 'product', 'ticker'. Pass an empty list explicitly to disable grouping. config : LabelingConfig | None, default None Optional column contract source. If provided, price_col and timestamp_col default to config values when omitted. contract : DataContractConfig | None, default None Optional shared dataframe contract. Used after config and before defaults.

Returns

pl.DataFrame Original data with additional columns: - label: ±1 based on trend direction - t_value: t-statistic of the selected trend - optimal_window: window size with highest |t-value|

Examples

Scan windows from 5 to 50 bars

labeled = trend_scanning_labels(df, min_window=5, max_window=50)

Fast scanning with larger steps

labeled = trend_scanning_labels(df, min_window=10, max_window=100, step=5)

Panel data: per-asset scanning

labeled = trend_scanning_labels(df, group_col="symbol")

Notes

The trend scanning method: 1. For each observation, scans forward with windows of varying lengths 2. Fits a linear regression to each window 3. Computes t-statistic for the slope coefficient 4. Selects the window with highest absolute t-statistic 5. Assigns label = sign(t-statistic)

This approach: - Adapts to local trend structure - More robust than fixed horizons - Computationally expensive (O(n * m) where m = window range)

Important: Data is automatically sorted by [group_col, timestamp] before scanning. This is required because the algorithm scans forward in row order.

References

.. [1] De Prado, M.L. (2018). Advances in Financial Machine Learning. Wiley. Chapter 18: Entropy Features (Section on Trend Scanning).

See Also

fixed_time_horizon_labels : Simple fixed-horizon labeling triple_barrier_labels : Path-dependent labeling with barriers

Source code in src/ml4t/engineer/labeling/horizon_labels.py
def trend_scanning_labels(
    data: pl.DataFrame,
    min_window: int = 5,
    max_window: int = 50,
    step: int = 1,
    price_col: str | None = None,
    timestamp_col: str | None = None,
    group_col: str | list[str] | None = None,
    *,
    config: LabelingConfig | None = None,
    contract: DataContractConfig | None = None,
) -> pl.DataFrame:
    """Generate labels using De Prado's trend scanning method.

    For each observation, fits linear trends over windows of varying lengths
    and selects the window with the highest absolute t-statistic. The label
    is assigned based on the trend direction (sign of the t-statistic).

    This method is more robust than fixed-horizon labeling as it adapts to
    the local trend structure in the data.

    Parameters
    ----------
    data : pl.DataFrame
        Input data with price information
    min_window : int, default 5
        Minimum window size to scan
    max_window : int, default 50
        Maximum window size to scan
    step : int, default 1
        Step size for window scanning
    price_col : str | None, default None
        Name of the price column to use
    timestamp_col : str | None, default None
        Column to use for chronological sorting. If None, auto-detects from
        column dtype (pl.Datetime, pl.Date). Required for correct scanning.
    group_col : str | list[str] | None, default None
        Column(s) to group by for per-asset labels. If None, auto-detects from
        common column names: 'symbol', 'product', 'ticker'.
        Pass an empty list explicitly to disable grouping.
    config : LabelingConfig | None, default None
        Optional column contract source. If provided, `price_col` and
        `timestamp_col` default to config values when omitted.
    contract : DataContractConfig | None, default None
        Optional shared dataframe contract. Used after config and before defaults.

    Returns
    -------
    pl.DataFrame
        Original data with additional columns:
        - label: ±1 based on trend direction
        - t_value: t-statistic of the selected trend
        - optimal_window: window size with highest |t-value|

    Examples
    --------
    >>> # Scan windows from 5 to 50 bars
    >>> labeled = trend_scanning_labels(df, min_window=5, max_window=50)
    >>>
    >>> # Fast scanning with larger steps
    >>> labeled = trend_scanning_labels(df, min_window=10, max_window=100, step=5)
    >>>
    >>> # Panel data: per-asset scanning
    >>> labeled = trend_scanning_labels(df, group_col="symbol")

    Notes
    -----
    The trend scanning method:
    1. For each observation, scans forward with windows of varying lengths
    2. Fits a linear regression to each window
    3. Computes t-statistic for the slope coefficient
    4. Selects the window with highest absolute t-statistic
    5. Assigns label = sign(t-statistic)

    This approach:
    - Adapts to local trend structure
    - More robust than fixed horizons
    - Computationally expensive (O(n * m) where m = window range)

    **Important**: Data is automatically sorted by [group_col, timestamp] before
    scanning. This is required because the algorithm scans forward in row order.

    References
    ----------
    .. [1] De Prado, M.L. (2018). Advances in Financial Machine Learning. Wiley.
           Chapter 18: Entropy Features (Section on Trend Scanning).

    See Also
    --------
    fixed_time_horizon_labels : Simple fixed-horizon labeling
    triple_barrier_labels : Path-dependent labeling with barriers
    """
    if min_window < 2:
        raise ValueError("min_window must be at least 2")
    if max_window <= min_window:
        raise ValueError("max_window must be greater than min_window")
    if step < 1:
        raise ValueError("step must be at least 1")
    resolved_price_col, resolved_ts_col, group_cols = resolve_labeling_columns(
        data=data,
        price_col=price_col,
        timestamp_col=timestamp_col,
        group_col=group_col,
        config=config,
        contract=contract,
    )

    validate_price_no_nans(data, resolved_price_col)

    if group_cols:
        sort_cols = group_cols + ([resolved_ts_col] if resolved_ts_col else [])
        sorted_data = data.sort(sort_cols)
        grouped_frames = sorted_data.partition_by(group_cols, maintain_order=True)

        grouped_results = [
            _trend_scanning_single_group(
                data=group_df,
                min_window=min_window,
                max_window=max_window,
                step=step,
                price_col=resolved_price_col,
                timestamp_col=resolved_ts_col,
            )
            for group_df in grouped_frames
        ]
        return pl.concat(grouped_results, how="vertical")

    return _trend_scanning_single_group(
        data=data,
        min_window=min_window,
        max_window=max_window,
        step=step,
        price_col=resolved_price_col,
        timestamp_col=resolved_ts_col,
    )

apply_meta_model

apply_meta_model(
    data,
    primary_signal_col,
    meta_probability_col,
    bet_size_method="sigmoid",
    scale=5.0,
    threshold=0.5,
    output_col="sized_signal",
)

Apply meta-model probability to size primary signal bets.

Combines the primary model's directional signal with the meta-model's confidence estimate to produce a sized position signal.

Parameters

data : pl.DataFrame Input DataFrame with signal and probability columns. primary_signal_col : str Column with primary model signal (typically +1, -1, or 0). meta_probability_col : str Column with meta-model predicted probability [0, 1]. bet_size_method : {"linear", "sigmoid", "discrete"}, default "sigmoid" Method to convert probability to bet size. See compute_bet_size. scale : float, default 5.0 Scaling factor for sigmoid method. threshold : float, default 0.5 Threshold for discrete method. output_col : str, default "sized_signal" Name for the output column.

Returns

pl.DataFrame Original DataFrame with added sized signal column: sized_signal = sign(primary_signal) * bet_size(probability)

Notes

The sized signal is computed as:

.. math::

\text{sized\_signal} = \text{sign}(\text{signal}) \cdot f(\text{probability})

where f() is the bet sizing function.

The output can be used directly as position weights in a backtest, where the sign indicates direction and magnitude indicates conviction.

Examples

import polars as pl from ml4t.engineer.labeling import apply_meta_model

df = pl.DataFrame({ ... "signal": [1, -1, 1, -1], ... "meta_prob": [0.8, 0.3, 0.5, 0.9], ... }) result = apply_meta_model(df, "signal", "meta_prob")

High prob + long signal -> strong positive

Low prob + short signal -> weak negative (may filter)

0.5 prob + any signal -> near zero (uncertain)

See Also

meta_labels : Create meta-labels for training meta-model. compute_bet_size : Underlying bet sizing functions.

Source code in src/ml4t/engineer/labeling/meta_labels.py
def apply_meta_model(
    data: pl.DataFrame,
    primary_signal_col: str,
    meta_probability_col: str,
    bet_size_method: Literal["linear", "sigmoid", "discrete"] = "sigmoid",
    scale: float = 5.0,
    threshold: float = 0.5,
    output_col: str = "sized_signal",
) -> pl.DataFrame:
    """Apply meta-model probability to size primary signal bets.

    Combines the primary model's directional signal with the meta-model's
    confidence estimate to produce a sized position signal.

    Parameters
    ----------
    data : pl.DataFrame
        Input DataFrame with signal and probability columns.
    primary_signal_col : str
        Column with primary model signal (typically +1, -1, or 0).
    meta_probability_col : str
        Column with meta-model predicted probability [0, 1].
    bet_size_method : {"linear", "sigmoid", "discrete"}, default "sigmoid"
        Method to convert probability to bet size. See `compute_bet_size`.
    scale : float, default 5.0
        Scaling factor for sigmoid method.
    threshold : float, default 0.5
        Threshold for discrete method.
    output_col : str, default "sized_signal"
        Name for the output column.

    Returns
    -------
    pl.DataFrame
        Original DataFrame with added sized signal column:
        sized_signal = sign(primary_signal) * bet_size(probability)

    Notes
    -----
    The sized signal is computed as:

    .. math::

        \\text{sized\\_signal} = \\text{sign}(\\text{signal}) \\cdot f(\\text{probability})

    where f() is the bet sizing function.

    The output can be used directly as position weights in a backtest,
    where the sign indicates direction and magnitude indicates conviction.

    Examples
    --------
    >>> import polars as pl
    >>> from ml4t.engineer.labeling import apply_meta_model
    >>>
    >>> df = pl.DataFrame({
    ...     "signal": [1, -1, 1, -1],
    ...     "meta_prob": [0.8, 0.3, 0.5, 0.9],
    ... })
    >>> result = apply_meta_model(df, "signal", "meta_prob")
    >>> # High prob + long signal -> strong positive
    >>> # Low prob + short signal -> weak negative (may filter)
    >>> # 0.5 prob + any signal -> near zero (uncertain)

    See Also
    --------
    meta_labels : Create meta-labels for training meta-model.
    compute_bet_size : Underlying bet sizing functions.
    """
    signal = pl.col(primary_signal_col)

    # Compute bet size from probability
    bet_size = compute_bet_size(
        meta_probability_col,
        method=bet_size_method,
        scale=scale,
        threshold=threshold,
    )

    # Sized signal: direction from primary, magnitude from meta
    sized_signal = signal.sign() * bet_size.abs()

    return data.with_columns(sized_signal.alias(output_col))

compute_bet_size

compute_bet_size(
    probability, method="sigmoid", scale=1.0, threshold=0.5
)

Compute bet size from meta-model probability.

Transforms the meta-model's predicted probability of success into a bet sizing coefficient. Higher probability leads to larger positions.

Parameters

probability : pl.Expr | str Column containing meta-model probability predictions [0, 1]. method : {"linear", "sigmoid", "discrete"}, default "sigmoid" Bet sizing function: - "linear": bet_size = 2 * (prob - 0.5), range [-1, 1] - "sigmoid": bet_size = (1 + e(-scale*(prob-0.5)))-1 * 2 - 1 - "discrete": bet_size = 1 if prob > threshold else 0 scale : float, default 1.0 Scaling factor for sigmoid. Higher values create sharper cutoff. Ignored for "linear" and "discrete" methods. threshold : float, default 0.5 Probability threshold for "discrete" method. Ignored for "linear" and "sigmoid" methods.

Returns

pl.Expr Bet size coefficient, typically in range [0, 1] or [-1, 1].

Notes

The bet size methods are:

Linear: Simple linear scaling centered at 0.5 .. math::

\text{bet\_size} = 2 \cdot (p - 0.5)

Sigmoid: S-curve that concentrates bets near extremes .. math::

\text{bet\_size} = \frac{2}{1 + e^{-s \cdot (p - 0.5)}} - 1

Discrete: Binary sizing based on threshold .. math::

\text{bet\_size} = \mathbb{1}[p > \text{threshold}]
Examples

import polars as pl from ml4t.engineer.labeling import compute_bet_size

df = pl.DataFrame({"prob": [0.3, 0.5, 0.7, 0.9]}) df.with_columns( ... compute_bet_size("prob", method="linear").alias("linear"), ... compute_bet_size("prob", method="sigmoid", scale=5.0).alias("sigmoid"), ... compute_bet_size("prob", method="discrete", threshold=0.6).alias("discrete"), ... )

Source code in src/ml4t/engineer/labeling/meta_labels.py
def compute_bet_size(
    probability: pl.Expr | str,
    method: Literal["linear", "sigmoid", "discrete"] = "sigmoid",
    scale: float = 1.0,
    threshold: float = 0.5,
) -> pl.Expr:
    """Compute bet size from meta-model probability.

    Transforms the meta-model's predicted probability of success into
    a bet sizing coefficient. Higher probability leads to larger positions.

    Parameters
    ----------
    probability : pl.Expr | str
        Column containing meta-model probability predictions [0, 1].
    method : {"linear", "sigmoid", "discrete"}, default "sigmoid"
        Bet sizing function:
        - "linear": bet_size = 2 * (prob - 0.5), range [-1, 1]
        - "sigmoid": bet_size = (1 + e^(-scale*(prob-0.5)))^-1 * 2 - 1
        - "discrete": bet_size = 1 if prob > threshold else 0
    scale : float, default 1.0
        Scaling factor for sigmoid. Higher values create sharper cutoff.
        Ignored for "linear" and "discrete" methods.
    threshold : float, default 0.5
        Probability threshold for "discrete" method.
        Ignored for "linear" and "sigmoid" methods.

    Returns
    -------
    pl.Expr
        Bet size coefficient, typically in range [0, 1] or [-1, 1].

    Notes
    -----
    The bet size methods are:

    **Linear**: Simple linear scaling centered at 0.5
    .. math::

        \\text{bet\\_size} = 2 \\cdot (p - 0.5)

    **Sigmoid**: S-curve that concentrates bets near extremes
    .. math::

        \\text{bet\\_size} = \\frac{2}{1 + e^{-s \\cdot (p - 0.5)}} - 1

    **Discrete**: Binary sizing based on threshold
    .. math::

        \\text{bet\\_size} = \\mathbb{1}[p > \\text{threshold}]

    Examples
    --------
    >>> import polars as pl
    >>> from ml4t.engineer.labeling import compute_bet_size
    >>>
    >>> df = pl.DataFrame({"prob": [0.3, 0.5, 0.7, 0.9]})
    >>> df.with_columns(
    ...     compute_bet_size("prob", method="linear").alias("linear"),
    ...     compute_bet_size("prob", method="sigmoid", scale=5.0).alias("sigmoid"),
    ...     compute_bet_size("prob", method="discrete", threshold=0.6).alias("discrete"),
    ... )
    """
    prob = pl.col(probability) if isinstance(probability, str) else probability

    if method == "linear":
        # Linear: 0.0 -> -1, 0.5 -> 0, 1.0 -> 1
        return (prob - 0.5) * 2

    elif method == "sigmoid":
        # Sigmoid: S-curve centered at 0.5, scaled to [-1, 1]
        # Using polars expressions for the sigmoid transform
        x = (prob - 0.5) * scale
        sigmoid = 1 / (1 + (-x).exp())
        return sigmoid * 2 - 1

    elif method == "discrete":
        # Discrete: binary 0/1 based on threshold
        return pl.when(prob > threshold).then(1.0).otherwise(0.0)

    else:
        msg = f"Unknown method: {method}. Use 'linear', 'sigmoid', or 'discrete'."
        raise ValueError(msg)

compute_label_statistics

compute_label_statistics(data, label_col)

Compute statistics for a binary label column.

Useful for validating label quality and understanding class balance.

Parameters

data : pl.DataFrame Data with label column label_col : str Name of binary label column

Returns

dict Statistics including: - total_bars: Total number of bars - positive_labels: Count of 1s - negative_labels: Count of 0s - null_labels: Count of nulls - positive_rate: Percentage of 1s (among non-null) - null_rate: Percentage of nulls

Examples

stats = compute_label_statistics(df, "label_long_p95_h30") print(f"Positive rate: {stats['positive_rate']:.2f}%") print(f"Null rate: {stats['null_rate']:.2f}%")

Source code in src/ml4t/engineer/labeling/percentile_labels.py
def compute_label_statistics(
    data: pl.DataFrame,
    label_col: str,
) -> dict[str, float | int]:
    """Compute statistics for a binary label column.

    Useful for validating label quality and understanding class balance.

    Parameters
    ----------
    data : pl.DataFrame
        Data with label column
    label_col : str
        Name of binary label column

    Returns
    -------
    dict
        Statistics including:
        - total_bars: Total number of bars
        - positive_labels: Count of 1s
        - negative_labels: Count of 0s
        - null_labels: Count of nulls
        - positive_rate: Percentage of 1s (among non-null)
        - null_rate: Percentage of nulls

    Examples
    --------
    >>> stats = compute_label_statistics(df, "label_long_p95_h30")
    >>> print(f"Positive rate: {stats['positive_rate']:.2f}%")
    >>> print(f"Null rate: {stats['null_rate']:.2f}%")
    """
    labels = data[label_col]

    total = len(labels)
    nulls = labels.null_count()
    non_null = total - nulls

    if non_null > 0:
        positives = labels.filter(labels == 1).len()
        negatives = labels.filter(labels == 0).len()
        positive_rate = (positives / non_null) * 100
    else:
        positives = 0
        negatives = 0
        positive_rate = 0.0

    return {
        "total_bars": total,
        "positive_labels": positives,
        "negative_labels": negatives,
        "null_labels": nulls,
        "positive_rate": positive_rate,
        "null_rate": (nulls / total) * 100,
    }

rolling_percentile_binary_labels

rolling_percentile_binary_labels(
    data,
    horizon,
    percentile,
    direction="long",
    lookback_window=252 * 24 * 12,
    price_col=None,
    session_col=None,
    min_samples=None,
    group_col=None,
    timestamp_col=None,
    tolerance=None,
    *,
    config=None,
    contract=None,
)

Create binary labels using rolling historical percentiles.

Computes forward returns, then creates binary labels by comparing returns to rolling percentile thresholds. Thresholds adapt to volatility regimes.

Algorithm: 1. Compute forward returns over horizon (session-aware if session_col provided) 2. Compute rolling percentile from lookback window 3. For long: label = 1 if forward_return >= threshold, else 0 For short: label = 1 if forward_return <= threshold, else 0

Parameters

data : pl.DataFrame Input data with OHLCV and optionally session_date horizon : int | str Forward-looking horizon: - int: Number of bars - str: Duration string (e.g., '1h', '30m', '1d') percentile : float Percentile for thresholding (0-100) - Long: High percentiles (e.g., 95, 98) → top returns - Short: Low percentiles (e.g., 5, 10) → bottom returns direction : {"long", "short"}, default "long" Trading direction: - "long": Labels profitable long entries (high positive returns) - "short": Labels profitable short entries (high negative returns) lookback_window : int | str, default ~1 year Rolling window size for percentile computation: - int: Number of bars - str: Duration string (e.g., '5d', '1w'). Polars rolling supports duration strings. price_col : str | None, default None Price column for return computation session_col : str, optional Session column for session-aware forward returns (e.g., "session_date") If provided, forward returns won't cross session boundaries min_samples : int, optional Minimum samples for rolling calculation (default: 1008 = ~3.5 days of 5-min bars) group_col : str | list[str] | None, default None Column(s) to group by for panel-aware labeling. If None, auto-detects from common symbol columns when present. timestamp_col : str | None, default None Column to use for chronological sorting. If None, auto-detects from column dtype (pl.Datetime, pl.Date). Required for time-based horizons. tolerance : str | None, default None Maximum time gap allowed for time-based horizons (e.g., '2m'). Only used when horizon is a duration string. config : LabelingConfig | None, default None Optional column contract source. If provided, price_col, timestamp_col, and group_col default to config values when omitted. contract : DataContractConfig | None, default None Optional shared dataframe contract. Used after config and before defaults.

Returns

pl.DataFrame Original data with added columns: - forward_return_{horizon}: Forward returns - threshold_p{percentile}h{horizon}: Rolling percentile threshold - label: Binary label (0 or 1)}_p{percentile}_h{horizon

Examples

Bar-based: Top 5% of 30-bar returns

labels_long = rolling_percentile_binary_labels( ... df, ... horizon=30, ... percentile=95, ... direction="long", ... session_col="session_date" ... ) print(labels_long["label_long_p95_h30"].mean()) # Should be ~0.05

Time-based: 1-hour forward returns with 5-day lookback

labels = rolling_percentile_binary_labels( ... df, ... horizon="1h", ... percentile=95, ... direction="long", ... lookback_window="5d", ... )

Short labels: Bottom 5% of returns (5th percentile)

labels_short = rolling_percentile_binary_labels( ... df, ... horizon=30, ... percentile=5, ... direction="short", ... session_col="session_date" ... )

Notes
  • First lookback_window bars will have null labels (insufficient history)
  • Last horizon bars will have null forward returns (insufficient future data)
  • Class balance approximately matches percentile (p95 → ~5% positives)
  • Adaptive: Thresholds widen in high volatility, tighten in low volatility
  • No lookahead bias: Only uses past data for percentile computation

Time-based horizons: When horizon is a duration string, uses join_asof to get future prices. This is useful for irregular data like trade bars.

Time-based lookback: Polars rolling functions natively support duration strings for the window parameter, allowing time-based rolling windows.

Important: Data is automatically sorted by timestamp before labeling. This is required because Polars .over() and .shift() preserve row order. The result is returned sorted chronologically.

Source code in src/ml4t/engineer/labeling/percentile_labels.py
def rolling_percentile_binary_labels(
    data: pl.DataFrame,
    horizon: int | str,
    percentile: float,
    direction: Literal["long", "short"] = "long",
    lookback_window: int | str = 252 * 24 * 12,  # ~1 year hourly
    price_col: str | None = None,
    session_col: str | None = None,
    min_samples: int | None = None,
    group_col: str | list[str] | None = None,
    timestamp_col: str | None = None,
    tolerance: str | None = None,
    *,
    config: LabelingConfig | None = None,
    contract: DataContractConfig | None = None,
) -> pl.DataFrame:
    """Create binary labels using rolling historical percentiles.

    Computes forward returns, then creates binary labels by comparing returns
    to rolling percentile thresholds. Thresholds adapt to volatility regimes.

    Algorithm:
    1. Compute forward returns over horizon (session-aware if session_col provided)
    2. Compute rolling percentile from lookback window
    3. For long: label = 1 if forward_return >= threshold, else 0
       For short: label = 1 if forward_return <= threshold, else 0

    Parameters
    ----------
    data : pl.DataFrame
        Input data with OHLCV and optionally session_date
    horizon : int | str
        Forward-looking horizon:
        - int: Number of bars
        - str: Duration string (e.g., '1h', '30m', '1d')
    percentile : float
        Percentile for thresholding (0-100)
        - Long: High percentiles (e.g., 95, 98) → top returns
        - Short: Low percentiles (e.g., 5, 10) → bottom returns
    direction : {"long", "short"}, default "long"
        Trading direction:
        - "long": Labels profitable long entries (high positive returns)
        - "short": Labels profitable short entries (high negative returns)
    lookback_window : int | str, default ~1 year
        Rolling window size for percentile computation:
        - int: Number of bars
        - str: Duration string (e.g., '5d', '1w'). Polars rolling supports duration strings.
    price_col : str | None, default None
        Price column for return computation
    session_col : str, optional
        Session column for session-aware forward returns (e.g., "session_date")
        If provided, forward returns won't cross session boundaries
    min_samples : int, optional
        Minimum samples for rolling calculation (default: 1008 = ~3.5 days of 5-min bars)
    group_col : str | list[str] | None, default None
        Column(s) to group by for panel-aware labeling. If None, auto-detects from
        common symbol columns when present.
    timestamp_col : str | None, default None
        Column to use for chronological sorting. If None, auto-detects from
        column dtype (pl.Datetime, pl.Date). Required for time-based horizons.
    tolerance : str | None, default None
        Maximum time gap allowed for time-based horizons (e.g., '2m').
        Only used when horizon is a duration string.
    config : LabelingConfig | None, default None
        Optional column contract source. If provided, `price_col`, `timestamp_col`,
        and `group_col` default to config values when omitted.
    contract : DataContractConfig | None, default None
        Optional shared dataframe contract. Used after config and before defaults.

    Returns
    -------
    pl.DataFrame
        Original data with added columns:
        - forward_return_{horizon}: Forward returns
        - threshold_p{percentile}_h{horizon}: Rolling percentile threshold
        - label_{direction}_p{percentile}_h{horizon}: Binary label (0 or 1)

    Examples
    --------
    >>> # Bar-based: Top 5% of 30-bar returns
    >>> labels_long = rolling_percentile_binary_labels(
    ...     df,
    ...     horizon=30,
    ...     percentile=95,
    ...     direction="long",
    ...     session_col="session_date"
    ... )
    >>> print(labels_long["label_long_p95_h30"].mean())  # Should be ~0.05

    >>> # Time-based: 1-hour forward returns with 5-day lookback
    >>> labels = rolling_percentile_binary_labels(
    ...     df,
    ...     horizon="1h",
    ...     percentile=95,
    ...     direction="long",
    ...     lookback_window="5d",
    ... )

    >>> # Short labels: Bottom 5% of returns (5th percentile)
    >>> labels_short = rolling_percentile_binary_labels(
    ...     df,
    ...     horizon=30,
    ...     percentile=5,
    ...     direction="short",
    ...     session_col="session_date"
    ... )

    Notes
    -----
    - First lookback_window bars will have null labels (insufficient history)
    - Last horizon bars will have null forward returns (insufficient future data)
    - Class balance approximately matches percentile (p95 → ~5% positives)
    - Adaptive: Thresholds widen in high volatility, tighten in low volatility
    - No lookahead bias: Only uses past data for percentile computation

    **Time-based horizons**: When horizon is a duration string, uses join_asof
    to get future prices. This is useful for irregular data like trade bars.

    **Time-based lookback**: Polars rolling functions natively support duration
    strings for the window parameter, allowing time-based rolling windows.

    **Important**: Data is automatically sorted by timestamp before labeling.
    This is required because Polars .over() and .shift() preserve row order.
    The result is returned sorted chronologically.
    """
    # Determine if time-based
    is_time_based_horizon = isinstance(horizon, str) and is_duration_string(horizon)
    is_time_based_lookback = isinstance(lookback_window, str) and is_duration_string(
        lookback_window
    )

    resolved_price_col, resolved_ts_col, resolved_group_cols = resolve_labeling_columns(
        data=data,
        price_col=price_col,
        timestamp_col=timestamp_col,
        group_col=group_col,
        config=config,
        contract=contract,
        require_timestamp=is_time_based_horizon or is_time_based_lookback,
    )

    sort_cols = resolved_group_cols + ([resolved_ts_col] if resolved_ts_col else [])
    if sort_cols:
        data = data.sort(sort_cols)

    result = data.clone()

    # Create label suffix for column naming
    if is_time_based_horizon:
        horizon_label = horizon.lower().replace(" ", "")  # type: ignore[union-attr]
    else:
        horizon_label = str(horizon)

    # Step 1: Compute forward returns
    if is_time_based_horizon:
        # Time-based forward returns using join_asof
        td = parse_duration(horizon)  # type: ignore[arg-type]
        future_prices, valid_mask = get_future_price_at_time(
            data=data,
            time_horizon=td,
            price_col=resolved_price_col,
            timestamp_col=resolved_ts_col,
            tolerance=tolerance,
            group_cols=resolved_group_cols if resolved_group_cols else None,
        )
        current_prices = data[resolved_price_col]
        forward_returns = (future_prices - current_prices) / current_prices

        # Mask invalid joins if tolerance specified
        if tolerance is not None:
            forward_returns = pl.when(valid_mask).then(forward_returns).otherwise(pl.lit(None))
    elif session_col is not None:
        if session_col not in data.columns:
            raise ValueError(f"Session column '{session_col}' not found in data")

        session_groups = (
            [*resolved_group_cols, session_col] if resolved_group_cols else [session_col]
        )

        # Session-aware forward returns (don't cross session boundaries)
        forward_returns = result.with_columns(
            (pl.col(resolved_price_col).shift(-horizon) / pl.col(resolved_price_col) - 1)
            .over(session_groups)
            .alias("forward_return")
        )["forward_return"]
    else:
        # Simple bar-based forward returns
        forward_expr = pl.col(resolved_price_col).shift(-horizon) / pl.col(resolved_price_col) - 1
        if resolved_group_cols:
            forward_expr = forward_expr.over(resolved_group_cols)
        forward_returns = result.with_columns(forward_expr.alias("forward_return"))[
            "forward_return"
        ]

    forward_return_col = f"forward_return_{horizon_label}"
    result = result.with_columns(forward_returns.alias(forward_return_col))

    # Step 2: Compute rolling percentile threshold
    # Critical: shift forward returns by 1 so thresholds only use historical
    # realized outcomes, never the current row's own future return.
    historical_return_col = f"_historical_forward_return_{horizon_label}"
    historical_forward_expr = pl.col(forward_return_col).shift(1)
    if resolved_group_cols:
        historical_forward_expr = historical_forward_expr.over(resolved_group_cols)
    result = result.with_columns(historical_forward_expr.alias(historical_return_col))

    quantile = percentile / 100.0

    # Determine min_samples default
    if min_samples is None:
        if isinstance(lookback_window, int):
            min_samples = max(1, min(1008, lookback_window // 10))
        else:
            # For time-based lookback, use a reasonable default
            min_samples = 100

    # Compute rolling threshold
    # Polars rolling_quantile supports duration strings for window_size when
    # using rolling() context with index_column
    if is_time_based_lookback and resolved_ts_col:
        # Use Polars native time-based rolling via rolling() context
        rolling_result = result.rolling(
            index_column=resolved_ts_col,
            period=lookback_window,  # type: ignore[arg-type]
            group_by=resolved_group_cols if resolved_group_cols else None,
        ).agg(pl.col(historical_return_col).quantile(quantile).alias("_rolling_threshold"))

        join_keys = (
            [*resolved_group_cols, resolved_ts_col] if resolved_group_cols else [resolved_ts_col]
        )
        # Join back to get the threshold column
        rolling_threshold = result.join(
            rolling_result,
            on=join_keys,
            how="left",
        )["_rolling_threshold"]
    else:
        # Bar-based rolling (original implementation)
        if not isinstance(lookback_window, int):
            raise ValueError(
                f"lookback_window must be an integer for bar-based rolling, "
                f"got '{lookback_window}'. For time-based, ensure timestamp_col is set."
            )
        rolling_threshold_expr = pl.col(historical_return_col).rolling_quantile(
            window_size=lookback_window,
            quantile=quantile,
            min_samples=min_samples,
            center=False,
        )
        if resolved_group_cols:
            rolling_threshold_expr = rolling_threshold_expr.over(resolved_group_cols)
        rolling_threshold = result.with_columns(rolling_threshold_expr.alias("_rolling_threshold"))[
            "_rolling_threshold"
        ]

    threshold_col_name = f"threshold_p{int(percentile)}_h{horizon_label}"
    result = result.with_columns(rolling_threshold.alias(threshold_col_name))

    # Step 3: Create binary labels based on direction
    forward_ret_col = result[forward_return_col]
    threshold_col = result[threshold_col_name]

    if direction == "long":
        # Long: 1 if forward_return >= threshold (top percentile)
        label = (forward_ret_col >= threshold_col).cast(pl.Int8)
    elif direction == "short":
        # Short: 1 if forward_return <= threshold (bottom percentile)
        label = (forward_ret_col <= threshold_col).cast(pl.Int8)
    else:
        msg = f"Invalid direction: {direction}. Must be 'long' or 'short'."
        raise ValueError(msg)

    label_col_name = f"label_{direction}_p{int(percentile)}_h{horizon_label}"
    result = result.with_columns(label.alias(label_col_name))

    return result.drop(historical_return_col)

rolling_percentile_multi_labels

rolling_percentile_multi_labels(
    data,
    horizons,
    percentiles,
    direction="long",
    lookback_window=252 * 24 * 12,
    price_col=None,
    session_col=None,
    group_col=None,
    timestamp_col=None,
    tolerance=None,
    *,
    config=None,
    contract=None,
)

Create binary labels for multiple horizons and percentiles.

Convenience function to generate labels for multiple configurations in a single call.

Parameters

data : pl.DataFrame Input data with OHLCV and optionally session_date horizons : list[int | str] List of forward-looking horizons (e.g., [15, 30, "1h"]) percentiles : list[float] List of percentiles (e.g., [95, 98] for long, [5, 10] for short) direction : {"long", "short"}, default "long" Trading direction lookback_window : int | str, default ~1 year Rolling window size for percentile computation price_col : str | None, default None Price column session_col : str, optional Session column for session-aware returns group_col : str | list[str] | None, default None Column(s) to group by for panel-aware labeling. timestamp_col : str | None, default None Timestamp column for time-based horizons/lookbacks. tolerance : str | None, default None Maximum time gap for time-based horizons. config : LabelingConfig | None, default None Optional column contract source. contract : DataContractConfig | None, default None Optional shared dataframe contract. Used after config and before defaults.

Returns

pl.DataFrame Original data with label columns for all combinations: - label_{direction}_p{percentile}_h{horizon}

Examples

Generate labels for multiple horizons and percentiles

labels = rolling_percentile_multi_labels( ... df, ... horizons=[15, 30, 60], ... percentiles=[95, 98], ... direction="long", ... session_col="session_date" ... )

Creates 6 label columns: 3 horizons × 2 percentiles

print([c for c in labels.columns if c.startswith("label_")])

Source code in src/ml4t/engineer/labeling/percentile_labels.py
def rolling_percentile_multi_labels(
    data: pl.DataFrame,
    horizons: list[int | str],
    percentiles: list[float],
    direction: Literal["long", "short"] = "long",
    lookback_window: int | str = 252 * 24 * 12,
    price_col: str | None = None,
    session_col: str | None = None,
    group_col: str | list[str] | None = None,
    timestamp_col: str | None = None,
    tolerance: str | None = None,
    *,
    config: LabelingConfig | None = None,
    contract: DataContractConfig | None = None,
) -> pl.DataFrame:
    """Create binary labels for multiple horizons and percentiles.

    Convenience function to generate labels for multiple configurations
    in a single call.

    Parameters
    ----------
    data : pl.DataFrame
        Input data with OHLCV and optionally session_date
    horizons : list[int | str]
        List of forward-looking horizons (e.g., [15, 30, "1h"])
    percentiles : list[float]
        List of percentiles (e.g., [95, 98] for long, [5, 10] for short)
    direction : {"long", "short"}, default "long"
        Trading direction
    lookback_window : int | str, default ~1 year
        Rolling window size for percentile computation
    price_col : str | None, default None
        Price column
    session_col : str, optional
        Session column for session-aware returns
    group_col : str | list[str] | None, default None
        Column(s) to group by for panel-aware labeling.
    timestamp_col : str | None, default None
        Timestamp column for time-based horizons/lookbacks.
    tolerance : str | None, default None
        Maximum time gap for time-based horizons.
    config : LabelingConfig | None, default None
        Optional column contract source.
    contract : DataContractConfig | None, default None
        Optional shared dataframe contract. Used after config and before defaults.

    Returns
    -------
    pl.DataFrame
        Original data with label columns for all combinations:
        - label_{direction}_p{percentile}_h{horizon}

    Examples
    --------
    >>> # Generate labels for multiple horizons and percentiles
    >>> labels = rolling_percentile_multi_labels(
    ...     df,
    ...     horizons=[15, 30, 60],
    ...     percentiles=[95, 98],
    ...     direction="long",
    ...     session_col="session_date"
    ... )
    >>> # Creates 6 label columns: 3 horizons × 2 percentiles
    >>> print([c for c in labels.columns if c.startswith("label_")])
    """
    result = data.clone()

    for horizon in horizons:
        # First call for this horizon - will add forward_return column
        first_percentile = percentiles[0]
        result = rolling_percentile_binary_labels(
            result,
            horizon=horizon,
            percentile=first_percentile,
            direction=direction,
            lookback_window=lookback_window,
            price_col=price_col,
            session_col=session_col,
            group_col=group_col,
            timestamp_col=timestamp_col,
            tolerance=tolerance,
            config=config,
            contract=contract,
        )

        # Subsequent calls for same horizon - skip if forward_return already exists
        for percentile in percentiles[1:]:
            # Call with the result that already has forward_return for this horizon
            result = rolling_percentile_binary_labels(
                result,
                horizon=horizon,
                percentile=percentile,
                direction=direction,
                lookback_window=lookback_window,
                price_col=price_col,
                session_col=session_col,
                group_col=group_col,
                timestamp_col=timestamp_col,
                tolerance=tolerance,
                config=config,
                contract=contract,
            )

    return result

triple_barrier_labels

triple_barrier_labels(
    data,
    config,
    price_col=None,
    high_col=None,
    low_col=None,
    timestamp_col=None,
    group_col=None,
    calculate_uniqueness=False,
    uniqueness_weight_scheme="returns_uniqueness",
    contract=None,
)

Apply triple-barrier labeling to data.

Labels price movements based on which barrier (upper, lower, or time) is touched first. Optionally calculates label uniqueness and sample weights (De Prado's AFML Chapter 4).

Parameters

data : pl.DataFrame Input data with price information config : LabelingConfig Triple-barrier labeling configuration. price_col : str | None, default None Name of the price column high_col : str, optional Name of the high price column for OHLC barrier checking low_col : str, optional Name of the low price column for OHLC barrier checking timestamp_col : str, optional Name of the timestamp column (uses row index if None) group_col : str | list[str] | None, default None Grouping columns for panel labeling. If None, auto-detects common asset identifier columns (e.g., symbol, product, ticker). calculate_uniqueness : bool, default False If True, calculates label uniqueness scores and sample weights uniqueness_weight_scheme : str, default "returns_uniqueness" Weighting scheme: "returns_uniqueness", "uniqueness_only", "returns_only", "equal" contract : DataContractConfig | None, default None Optional shared dataframe contract. Used when explicit columns/config are omitted.

Returns

pl.DataFrame Original data with added columns: label, label_time, label_price, label_return, label_bars, label_duration, barrier_hit, and optionally label_uniqueness, sample_weight

Notes

Important: Data is automatically sorted by [group_col, timestamp] before labeling. This is required because the algorithm scans forward in row order to find barrier touches. The result is returned sorted chronologically.

Examples

from ml4t.engineer.config import LabelingConfig config = LabelingConfig.triple_barrier(upper_barrier=0.02, lower_barrier=0.01) labeled = triple_barrier_labels(df, config)

Source code in src/ml4t/engineer/labeling/triple_barrier.py
def triple_barrier_labels(
    data: pl.DataFrame,
    config: LabelingConfig,
    price_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    timestamp_col: str | None = None,
    group_col: str | list[str] | None = None,
    calculate_uniqueness: bool = False,
    uniqueness_weight_scheme: Literal[
        "returns_uniqueness", "uniqueness_only", "returns_only", "equal"
    ] = "returns_uniqueness",
    contract: DataContractConfig | None = None,
) -> pl.DataFrame:
    """Apply triple-barrier labeling to data.

    Labels price movements based on which barrier (upper, lower, or time) is touched first.
    Optionally calculates label uniqueness and sample weights (De Prado's AFML Chapter 4).

    Parameters
    ----------
    data : pl.DataFrame
        Input data with price information
    config : LabelingConfig
        Triple-barrier labeling configuration.
    price_col : str | None, default None
        Name of the price column
    high_col : str, optional
        Name of the high price column for OHLC barrier checking
    low_col : str, optional
        Name of the low price column for OHLC barrier checking
    timestamp_col : str, optional
        Name of the timestamp column (uses row index if None)
    group_col : str | list[str] | None, default None
        Grouping columns for panel labeling. If None, auto-detects common
        asset identifier columns (e.g., symbol, product, ticker).
    calculate_uniqueness : bool, default False
        If True, calculates label uniqueness scores and sample weights
    uniqueness_weight_scheme : str, default "returns_uniqueness"
        Weighting scheme: "returns_uniqueness", "uniqueness_only", "returns_only", "equal"
    contract : DataContractConfig | None, default None
        Optional shared dataframe contract. Used when explicit columns/config are omitted.

    Returns
    -------
    pl.DataFrame
        Original data with added columns: label, label_time, label_price, label_return,
        label_bars, label_duration, barrier_hit, and optionally label_uniqueness, sample_weight

    Notes
    -----
    **Important**: Data is automatically sorted by [group_col, timestamp] before labeling.
    This is required because the algorithm scans forward in row order to find
    barrier touches. The result is returned sorted chronologically.

    Examples
    --------
    >>> from ml4t.engineer.config import LabelingConfig
    >>> config = LabelingConfig.triple_barrier(upper_barrier=0.02, lower_barrier=0.01)
    >>> labeled = triple_barrier_labels(df, config)
    """
    from ml4t.engineer.config import LabelingConfig as _LabelingConfig

    if not isinstance(config, _LabelingConfig):
        raise TypeError(
            "triple_barrier_labels expects LabelingConfig. "
            "Legacy BarrierConfig inputs are no longer supported; "
            "use LabelingConfig.triple_barrier(...)."
        )

    resolved_price_col, resolved_ts_col, group_cols = resolve_labeling_columns(
        data=data,
        price_col=price_col,
        timestamp_col=timestamp_col,
        group_col=group_col,
        config=config,
        contract=contract,
    )
    if config.method != "triple_barrier":
        raise DataValidationError(
            "triple_barrier_labels requires LabelingConfig.method='triple_barrier'."
        )

    validate_price_no_nans(data, resolved_price_col)

    if group_cols:
        sort_cols = group_cols + ([resolved_ts_col] if resolved_ts_col else [])
        sorted_data = data.sort(sort_cols)
        grouped_frames = sorted_data.partition_by(group_cols, maintain_order=True)

        grouped_results = [
            _triple_barrier_labels_single_group(
                data=group_df,
                config=config,
                price_col=resolved_price_col,
                high_col=high_col,
                low_col=low_col,
                timestamp_col=resolved_ts_col,
                calculate_uniqueness=calculate_uniqueness,
                uniqueness_weight_scheme=uniqueness_weight_scheme,
            )
            for group_df in grouped_frames
        ]
        return pl.concat(grouped_results, how="vertical")

    return _triple_barrier_labels_single_group(
        data=data,
        config=config,
        price_col=resolved_price_col,
        high_col=high_col,
        low_col=low_col,
        timestamp_col=resolved_ts_col,
        calculate_uniqueness=calculate_uniqueness,
        uniqueness_weight_scheme=uniqueness_weight_scheme,
    )

build_concurrency

build_concurrency(
    event_indices, label_indices, n_bars=None
)

Calculate per-bar concurrency (how many labels are active at each time).

This function computes c[t] = number of labels active at time t using an efficient O(n) difference-array algorithm.

Parameters

event_indices : array Start indices of labels (when positions were entered) label_indices : array End indices of labels (when barriers were hit) n_bars : int, optional Total number of bars. If None, uses max(label_indices) + 1

Returns

array Concurrency at each timestamp (length = n_bars)

Notes

Concurrency is used to calculate label uniqueness. High concurrency at time t means many labels overlap there, indicating redundancy.

References

.. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley. Chapter 4: Sample Weights.

Examples

concurrency = build_concurrency(event_indices, label_indices, len(prices))

concurrency[t] = number of active labels at time t

max_overlap = concurrency.max() # Maximum label overlap

Source code in src/ml4t/engineer/labeling/uniqueness.py
def build_concurrency(
    event_indices: npt.NDArray[np.float64],
    label_indices: npt.NDArray[np.float64],
    n_bars: int | None = None,
) -> npt.NDArray[np.float64]:
    """
    Calculate per-bar concurrency (how many labels are active at each time).

    This function computes c[t] = number of labels active at time t using
    an efficient O(n) difference-array algorithm.

    Parameters
    ----------
    event_indices : array
        Start indices of labels (when positions were entered)
    label_indices : array
        End indices of labels (when barriers were hit)
    n_bars : int, optional
        Total number of bars. If None, uses max(label_indices) + 1

    Returns
    -------
    array
        Concurrency at each timestamp (length = n_bars)

    Notes
    -----
    Concurrency is used to calculate label uniqueness. High concurrency
    at time t means many labels overlap there, indicating redundancy.

    References
    ----------
    .. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley.
           Chapter 4: Sample Weights.

    Examples
    --------
    >>> concurrency = build_concurrency(event_indices, label_indices, len(prices))
    >>> # concurrency[t] = number of active labels at time t
    >>> max_overlap = concurrency.max()  # Maximum label overlap
    """
    if n_bars is None:
        n_bars = int(np.max(label_indices)) + 1

    return _build_concurrency_nb(n_bars, event_indices, label_indices)

calculate_label_uniqueness

calculate_label_uniqueness(
    event_indices, label_indices, n_bars=None
)

Calculate average uniqueness for each label based on overlapping periods.

Uniqueness measures how "independent" a label is from others. Labels that overlap with many others have low uniqueness (redundant information), while labels that are relatively isolated have high uniqueness.

Parameters

event_indices : array Start indices of labels (when positions were entered) label_indices : array End indices of labels (when barriers were hit) n_bars : int, optional Total number of bars. If None, uses max(label_indices) + 1

Returns

array Average uniqueness score for each label (between 0 and 1)

Notes

From López de Prado's AFML: u_i = (1/T_i) * Σ(1/c_t) for t in [start_i, end_i]

Where: - T_i is the length of label i's active period - c_t is the concurrency at time t (number of active labels) - Higher uniqueness means more independent information

References

.. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley. Chapter 4: Sample Weights.

Source code in src/ml4t/engineer/labeling/uniqueness.py
def calculate_label_uniqueness(
    event_indices: npt.NDArray[np.intp],
    label_indices: npt.NDArray[np.intp],
    n_bars: int | None = None,
) -> npt.NDArray[np.float64]:
    """
    Calculate average uniqueness for each label based on overlapping periods.

    Uniqueness measures how "independent" a label is from others. Labels that
    overlap with many others have low uniqueness (redundant information), while
    labels that are relatively isolated have high uniqueness.

    Parameters
    ----------
    event_indices : array
        Start indices of labels (when positions were entered)
    label_indices : array
        End indices of labels (when barriers were hit)
    n_bars : int, optional
        Total number of bars. If None, uses max(label_indices) + 1

    Returns
    -------
    array
        Average uniqueness score for each label (between 0 and 1)

    Notes
    -----
    From López de Prado's AFML:
    u_i = (1/T_i) * Σ(1/c_t) for t in [start_i, end_i]

    Where:
    - T_i is the length of label i's active period
    - c_t is the concurrency at time t (number of active labels)
    - Higher uniqueness means more independent information

    References
    ----------
    .. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley.
           Chapter 4: Sample Weights.
    """
    # Input validation
    if len(event_indices) != len(label_indices):
        raise ValueError(
            f"event_indices and label_indices must have same length, "
            f"got {len(event_indices)} and {len(label_indices)}"
        )

    if len(event_indices) == 0:
        return np.array([])

    if np.any(event_indices < 0) or np.any(label_indices < 0):
        raise ValueError("Indices must be non-negative")

    if n_bars is None:
        n_bars = int(np.max(label_indices)) + 1

    # Build concurrency array
    concurrency = _build_concurrency(
        n_bars,
        event_indices.astype(np.int64),
        label_indices.astype(np.int64),
    )

    # Calculate uniqueness for each label
    n_labels = len(event_indices)
    uniqueness = np.zeros(n_labels, dtype=np.float64)

    for i in range(n_labels):
        start = int(event_indices[i])
        end = int(label_indices[i])

        if start < n_bars and start <= end:
            # Ensure we don't go out of bounds
            start = max(0, start)
            end = min(end, n_bars - 1)

            # Average of 1/c_t over the label's active period
            c_slice = concurrency[start : end + 1]
            # Avoid division by zero (though concurrency should always be >= 1)
            uniqueness[i] = np.mean(1.0 / np.maximum(c_slice, 1.0))
        else:
            uniqueness[i] = 1.0  # Default for invalid ranges

    return uniqueness

calculate_sample_weights

calculate_sample_weights(
    uniqueness, returns, weight_scheme="returns_uniqueness"
)

Calculate sample weights combining statistical uniqueness and economic significance.

Parameters

uniqueness : array Average uniqueness scores from calculate_label_uniqueness returns : array Label returns (from entry to exit) weight_scheme : str Weighting scheme to use: - "returns_uniqueness": u_i * |r_i| (De Prado's recommendation) - "uniqueness_only": u_i only (statistical correction) - "returns_only": |r_i| only (economic significance) - "equal": uniform weights

Returns

array Sample weights for training (normalized to sum to len(weights))

Notes

De Prado recommends "returns_uniqueness" to balance: - Statistical independence (uniqueness) - Economic importance (return magnitude)

This prevents overweighting "boring" full-horizon labels while preserving the importance of profitable trades.

References

.. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley. Chapter 4: Sample Weights.

Source code in src/ml4t/engineer/labeling/uniqueness.py
def calculate_sample_weights(
    uniqueness: npt.NDArray[np.float64],
    returns: npt.NDArray[np.float64],
    weight_scheme: Literal[
        "returns_uniqueness", "uniqueness_only", "returns_only", "equal"
    ] = "returns_uniqueness",
) -> npt.NDArray[np.float64]:
    """
    Calculate sample weights combining statistical uniqueness and economic significance.

    Parameters
    ----------
    uniqueness : array
        Average uniqueness scores from calculate_label_uniqueness
    returns : array
        Label returns (from entry to exit)
    weight_scheme : str
        Weighting scheme to use:
        - "returns_uniqueness": u_i * |r_i| (De Prado's recommendation)
        - "uniqueness_only": u_i only (statistical correction)
        - "returns_only": |r_i| only (economic significance)
        - "equal": uniform weights

    Returns
    -------
    array
        Sample weights for training (normalized to sum to len(weights))

    Notes
    -----
    De Prado recommends "returns_uniqueness" to balance:
    - Statistical independence (uniqueness)
    - Economic importance (return magnitude)

    This prevents overweighting "boring" full-horizon labels while
    preserving the importance of profitable trades.

    References
    ----------
    .. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley.
           Chapter 4: Sample Weights.
    """
    # Input validation
    if len(uniqueness) != len(returns):
        raise ValueError(
            f"uniqueness and returns must have same length, "
            f"got {len(uniqueness)} and {len(returns)}"
        )

    if len(uniqueness) == 0:
        return np.array([])

    if weight_scheme == "returns_uniqueness":
        # De Prado's formula: combine uniqueness with economic significance
        weights = uniqueness * np.abs(returns)
    elif weight_scheme == "uniqueness_only":
        weights = uniqueness
    elif weight_scheme == "returns_only":
        weights = np.abs(returns)
    else:  # "equal"
        weights = np.ones_like(uniqueness)

    # Normalize weights to sum to len(weights) for compatibility with ML libraries
    total = np.sum(weights)
    weights = weights * len(weights) / total if total > 0 else np.ones_like(uniqueness)

    return weights

sequential_bootstrap

sequential_bootstrap(
    starts,
    ends,
    n_bars=None,
    n_draws=None,
    with_replacement=True,
    random_state=None,
)

Sequential bootstrap that favors events with high marginal uniqueness.

This method creates a bootstrapped sample that minimizes redundancy by probabilistically selecting labels based on how unique they would be given the already-selected labels.

Parameters

starts : array Start indices of labels (event_indices) ends : array End indices of labels (label_indices) n_bars : int, optional Total number of bars. If None, uses max(ends) + 1 n_draws : int, optional Number of selections to make. Defaults to len(starts) with_replacement : bool, default True If False, each event can be selected at most once random_state : int or Generator, optional RNG seed or Generator for reproducibility

Returns

array Indices of selected events in the order drawn (length = n_draws)

Notes

From López de Prado's AFML Chapter 4: - At each step, pick the event that maximizes expected average uniqueness - Probability of selection is proportional to marginal uniqueness - Creates less redundant training sets compared to random sampling

References

.. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley. Chapter 4: Sample Weights.

Examples

After triple barrier labeling

order = sequential_bootstrap(event_indices, label_indices, len(prices))

Use order to select training samples

X_train = X[order] y_train = y[order] weights_train = sample_weights[order]

Source code in src/ml4t/engineer/labeling/uniqueness.py
def sequential_bootstrap(
    starts: npt.NDArray[np.int64],
    ends: npt.NDArray[np.int64],
    n_bars: int | None = None,
    n_draws: int | None = None,
    with_replacement: bool = True,
    random_state: int | Generator | None = None,
) -> npt.NDArray[np.int64]:
    """
    Sequential bootstrap that favors events with high marginal uniqueness.

    This method creates a bootstrapped sample that minimizes redundancy by
    probabilistically selecting labels based on how unique they would be
    given the already-selected labels.

    Parameters
    ----------
    starts : array
        Start indices of labels (event_indices)
    ends : array
        End indices of labels (label_indices)
    n_bars : int, optional
        Total number of bars. If None, uses max(ends) + 1
    n_draws : int, optional
        Number of selections to make. Defaults to len(starts)
    with_replacement : bool, default True
        If False, each event can be selected at most once
    random_state : int or Generator, optional
        RNG seed or Generator for reproducibility

    Returns
    -------
    array
        Indices of selected events in the order drawn (length = n_draws)

    Notes
    -----
    From López de Prado's AFML Chapter 4:
    - At each step, pick the event that maximizes expected average uniqueness
    - Probability of selection is proportional to marginal uniqueness
    - Creates less redundant training sets compared to random sampling

    References
    ----------
    .. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. Wiley.
           Chapter 4: Sample Weights.

    Examples
    --------
    >>> # After triple barrier labeling
    >>> order = sequential_bootstrap(event_indices, label_indices, len(prices))
    >>> # Use order to select training samples
    >>> X_train = X[order]
    >>> y_train = y[order]
    >>> weights_train = sample_weights[order]
    """
    # Input validation
    if len(starts) != len(ends):
        raise ValueError(
            f"starts and ends must have same length, got {len(starts)} and {len(ends)}"
        )

    if len(starts) == 0:
        return np.array([], dtype=np.int64)

    if np.any(starts < 0) or np.any(ends < 0):
        raise ValueError("Indices must be non-negative")

    m = len(starts)
    if n_bars is None:
        n_bars = int(np.max(ends)) + 1
    if n_draws is None:
        n_draws = m

    if n_draws <= 0:
        raise ValueError(f"n_draws must be positive, got {n_draws}")

    rng: Generator = (
        default_rng(random_state) if not isinstance(random_state, Generator) else random_state
    )

    # Start with empty concurrency
    concurrency = np.zeros(n_bars, dtype=np.int64)
    available = np.ones(m, dtype=bool)  # track availability if sampling w/o replacement
    order = np.empty(n_draws, dtype=np.int64)

    for k in range(n_draws):
        # Compute marginal expected uniqueness for all available candidates
        u = np.zeros(m, dtype=np.float64)
        for j in range(m):
            if with_replacement or available[j]:
                u[j] = _expected_uniqueness_for_candidate(starts, ends, concurrency, j)
            else:
                u[j] = 0.0

        total = float(u.sum())
        if total <= 0.0:
            # Fallback to uniform over available items
            probs = np.where(available | with_replacement, 1.0, 0.0)
            prob_sum = probs.sum()
            if prob_sum == 0:
                # No valid candidates remaining
                raise ValueError(
                    f"Cannot draw {n_draws} samples without replacement from {m} candidates. "
                    f"Either reduce n_draws or set with_replacement=True."
                )
            probs = probs / prob_sum
        else:
            probs = u / total

        # Draw next index
        j = int(rng.choice(m, p=probs))
        order[k] = j

        # Update concurrency with the chosen interval
        s, e = int(starts[j]), int(ends[j])
        if s <= e and s < n_bars:
            s_clamped = max(0, min(s, n_bars - 1))
            e_clamped = max(0, min(e, n_bars - 1))
            concurrency[s_clamped : e_clamped + 1] += 1

        if not with_replacement:
            available[j] = False

    return order

register_labeling_features

register_labeling_features(registry=None)

Register labeling features.

Parameters

registry : FeatureRegistry, optional Registry to register features with. If None, uses global registry.

Returns

int Number of features registered

Source code in src/ml4t/engineer/labeling/__init__.py
def register_labeling_features(registry: object = None) -> int:
    """
    Register labeling features.

    Parameters
    ----------
    registry : FeatureRegistry, optional
        Registry to register features with. If None, uses global registry.

    Returns
    -------
    int
        Number of features registered
    """
    from contextlib import suppress

    if registry is None:
        from ml4t.engineer.core.registry import get_registry

        registry = get_registry()

    # Register all labeling features
    for feature in ALL_LABELING_FEATURES:
        with suppress(ValueError):
            # Already registered errors are expected and can be ignored
            registry.register(feature)  # type: ignore[attr-defined]

    return len(ALL_LABELING_FEATURES)

Dataset Builder

MLDatasetBuilder dataclass

MLDatasetBuilder(
    features,
    labels,
    dates=None,
    _scaler=None,
    _feature_columns=list(),
)

Build train/test datasets with proper leakage prevention.

This class provides a unified interface for: 1. Managing features and labels 2. Applying train-only preprocessing 3. Integrating with cross-validation splitters 4. Converting to sklearn-compatible formats

Parameters

features : pl.DataFrame Feature matrix with named columns. labels : pl.Series | pl.DataFrame Target variable(s). If DataFrame, first column is used. dates : pl.Series | None, optional Date/time index for time-series ordering.

Attributes

features : pl.DataFrame Feature matrix. labels : pl.Series Target labels. dates : pl.Series | None Date/time index. scaler : BaseScaler | None Scaler to apply to features.

Examples

import polars as pl from ml4t.engineer.dataset import MLDatasetBuilder from ml4t.engineer.preprocessing import StandardScaler

Create synthetic data

features = pl.DataFrame({ ... "momentum": [0.1, 0.2, 0.15, 0.3, 0.25, 0.4, 0.35, 0.5], ... "volatility": [0.01, 0.02, 0.015, 0.025, 0.02, 0.03, 0.028, 0.035], ... }) labels = pl.Series("target", [0, 1, 0, 1, 0, 1, 1, 1])

Build dataset with scaling

builder = MLDatasetBuilder(features, labels) builder.set_scaler(StandardScaler())

Manual train/test split

X_train, X_test, y_train, y_test = builder.train_test_split( ... train_size=0.75 ... )

Notes

The key design principle is that ALL statistics (mean, std, quantiles, etc.) are computed from training data ONLY. This prevents information leakage from future data into predictions.

info property

info

Get dataset information.

Returns

DatasetInfo Summary of dataset properties.

set_scaler

set_scaler(scaler)

Set the scaler for preprocessing.

Parameters

scaler : BaseScaler | PreprocessingConfig | None Scaler to use. Accepts: - BaseScaler instance (StandardScaler, MinMaxScaler, RobustScaler) - PreprocessingConfig (Pydantic config, calls create_scaler()) - None to disable scaling

Returns

self Returns self for method chaining.

Examples

builder.set_scaler(StandardScaler()) builder.set_scaler(MinMaxScaler(feature_range=(-1, 1))) builder.set_scaler(None) # Disable scaling

Using PreprocessingConfig for reproducibility

from ml4t.engineer.config import PreprocessingConfig builder.set_scaler(PreprocessingConfig.robust())

Source code in src/ml4t/engineer/dataset.py
def set_scaler(self, scaler: BaseScaler | PreprocessingConfig | None) -> MLDatasetBuilder:
    """Set the scaler for preprocessing.

    Parameters
    ----------
    scaler : BaseScaler | PreprocessingConfig | None
        Scaler to use. Accepts:
        - BaseScaler instance (StandardScaler, MinMaxScaler, RobustScaler)
        - PreprocessingConfig (Pydantic config, calls create_scaler())
        - None to disable scaling

    Returns
    -------
    self
        Returns self for method chaining.

    Examples
    --------
    >>> builder.set_scaler(StandardScaler())
    >>> builder.set_scaler(MinMaxScaler(feature_range=(-1, 1)))
    >>> builder.set_scaler(None)  # Disable scaling
    >>>
    >>> # Using PreprocessingConfig for reproducibility
    >>> from ml4t.engineer.config import PreprocessingConfig
    >>> builder.set_scaler(PreprocessingConfig.robust())
    """
    # Handle PreprocessingConfig by creating the scaler
    if hasattr(scaler, "create_scaler"):
        self._scaler = scaler.create_scaler()  # type: ignore[union-attr]
    else:
        self._scaler = scaler
    return self

split

split(cv, groups=None)

Generate train/test splits with proper preprocessing.

Parameters

cv : SplitterProtocol Cross-validation splitter (from ml4t.diagnostic.splitters or sklearn). groups : pl.Series | None, optional Group labels for group-based splitting.

Yields

FoldResult Result object containing preprocessed train/test data.

Examples

from ml4t.diagnostic.splitters import PurgedWalkForwardCV

cv = PurgedWalkForwardCV(n_splits=5, embargo_pct=0.01) for fold in builder.split(cv): ... model.fit(fold.X_train, fold.y_train) ... preds = model.predict(fold.X_test) ... print(f"Fold {fold.fold_number}: {len(fold.train_indices)} train, " ... f"{len(fold.test_indices)} test")

Notes

For each fold: 1. Training indices are extracted from the splitter 2. Scaler (if any) is fit on training data ONLY 3. Both train and test features are transformed using train statistics 4. Labels are sliced without transformation

Source code in src/ml4t/engineer/dataset.py
def split(
    self,
    cv: SplitterProtocol,
    groups: pl.Series | None = None,
) -> Iterator[FoldResult]:
    """Generate train/test splits with proper preprocessing.

    Parameters
    ----------
    cv : SplitterProtocol
        Cross-validation splitter (from ml4t.diagnostic.splitters or sklearn).
    groups : pl.Series | None, optional
        Group labels for group-based splitting.

    Yields
    ------
    FoldResult
        Result object containing preprocessed train/test data.

    Examples
    --------
    >>> from ml4t.diagnostic.splitters import PurgedWalkForwardCV
    >>>
    >>> cv = PurgedWalkForwardCV(n_splits=5, embargo_pct=0.01)
    >>> for fold in builder.split(cv):
    ...     model.fit(fold.X_train, fold.y_train)
    ...     preds = model.predict(fold.X_test)
    ...     print(f"Fold {fold.fold_number}: {len(fold.train_indices)} train, "
    ...           f"{len(fold.test_indices)} test")

    Notes
    -----
    For each fold:
    1. Training indices are extracted from the splitter
    2. Scaler (if any) is fit on training data ONLY
    3. Both train and test features are transformed using train statistics
    4. Labels are sliced without transformation
    """
    # Prepare data for splitter (numpy arrays for compatibility)
    X_for_split = self.features
    y_for_split = self.labels

    # Handle groups
    groups_arr = groups.to_numpy() if groups is not None else None

    # Iterate through folds
    for fold_idx, (train_idx, test_idx) in enumerate(
        cv.split(X_for_split, y_for_split, groups_arr)
    ):
        # Extract train/test features and labels
        X_train_raw = self.features[train_idx]
        X_test_raw = self.features[test_idx]
        y_train = self.labels.gather(train_idx)
        y_test = self.labels.gather(test_idx)

        # Apply preprocessing (train-only fit)
        if self._scaler is not None:
            fold_scaler = self._scaler.clone()

            # Fit on train, transform both
            X_train = fold_scaler.fit_transform(X_train_raw)
            X_test = fold_scaler.transform(X_test_raw)
        else:
            fold_scaler = None
            X_train = X_train_raw
            X_test = X_test_raw

        yield FoldResult(
            X_train=X_train,
            X_test=X_test,
            y_train=y_train,
            y_test=y_test,
            train_indices=train_idx,
            test_indices=test_idx,
            fold_number=fold_idx,
            scaler=fold_scaler,
        )

train_test_split

train_test_split(
    train_size=0.8, shuffle=False, random_state=None
)

Simple train/test split with preprocessing.

Parameters

train_size : float, default 0.8 Proportion of data for training (0.0 to 1.0). shuffle : bool, default False Whether to shuffle before splitting. For time-series, keep False. random_state : int | None, optional Random seed for reproducibility when shuffling.

Returns

tuple[pl.DataFrame, pl.DataFrame, pl.Series, pl.Series] (X_train, X_test, y_train, y_test) with preprocessing applied.

Examples

X_train, X_test, y_train, y_test = builder.train_test_split( ... train_size=0.7 ... )

Notes

For time-series data, set shuffle=False to preserve temporal ordering. The split point is based on row position, not dates.

Source code in src/ml4t/engineer/dataset.py
def train_test_split(
    self,
    train_size: float = 0.8,
    shuffle: bool = False,
    random_state: int | None = None,
) -> tuple[pl.DataFrame, pl.DataFrame, pl.Series, pl.Series]:
    """Simple train/test split with preprocessing.

    Parameters
    ----------
    train_size : float, default 0.8
        Proportion of data for training (0.0 to 1.0).
    shuffle : bool, default False
        Whether to shuffle before splitting. For time-series, keep False.
    random_state : int | None, optional
        Random seed for reproducibility when shuffling.

    Returns
    -------
    tuple[pl.DataFrame, pl.DataFrame, pl.Series, pl.Series]
        (X_train, X_test, y_train, y_test) with preprocessing applied.

    Examples
    --------
    >>> X_train, X_test, y_train, y_test = builder.train_test_split(
    ...     train_size=0.7
    ... )

    Notes
    -----
    For time-series data, set shuffle=False to preserve temporal ordering.
    The split point is based on row position, not dates.
    """
    n_samples = len(self.features)
    n_train = int(n_samples * train_size)

    if shuffle:
        rng = np.random.default_rng(random_state)
        indices = rng.permutation(n_samples)
        train_idx = indices[:n_train]
        test_idx = indices[n_train:]
    else:
        train_idx = np.arange(n_train)
        test_idx = np.arange(n_train, n_samples)

    # Extract train/test
    X_train_raw = self.features[train_idx]
    X_test_raw = self.features[test_idx]
    y_train = self.labels.gather(train_idx)
    y_test = self.labels.gather(test_idx)

    # Apply preprocessing
    if self._scaler is not None:
        X_train = self._scaler.fit_transform(X_train_raw)
        X_test = self._scaler.transform(X_test_raw)
    else:
        X_train = X_train_raw
        X_test = X_test_raw

    return X_train, X_test, y_train, y_test

to_numpy

to_numpy()

Convert full dataset to numpy arrays.

Returns

tuple[NDArray, NDArray] (features, labels) as numpy arrays.

Notes

This does NOT apply scaling. Use for raw data access only. For sklearn compatibility with scaling, use split() or train_test_split().

Source code in src/ml4t/engineer/dataset.py
def to_numpy(self) -> tuple[NDArray[Any], NDArray[Any]]:
    """Convert full dataset to numpy arrays.

    Returns
    -------
    tuple[NDArray, NDArray]
        (features, labels) as numpy arrays.

    Notes
    -----
    This does NOT apply scaling. Use for raw data access only.
    For sklearn compatibility with scaling, use split() or train_test_split().
    """
    return self.features.to_numpy(), self.labels.to_numpy()

to_pandas

to_pandas()

Convert full dataset to pandas.

Returns

tuple[pd.DataFrame, pd.Series] (features, labels) as pandas objects.

Notes

This does NOT apply scaling. Use for raw data access only.

Source code in src/ml4t/engineer/dataset.py
def to_pandas(self) -> tuple[pd.DataFrame, pd.Series]:
    """Convert full dataset to pandas.

    Returns
    -------
    tuple[pd.DataFrame, pd.Series]
        (features, labels) as pandas objects.

    Notes
    -----
    This does NOT apply scaling. Use for raw data access only.
    """
    return self.features.to_pandas(), self.labels.to_pandas()

create_dataset_builder

create_dataset_builder(
    features, labels, dates=None, scaler="standard"
)

Convenience function to create MLDatasetBuilder with common defaults.

Parameters

features : pl.DataFrame Feature matrix. labels : pl.Series | pl.DataFrame Target variable. dates : pl.Series | None, optional Date/time index. scaler : BaseScaler | PreprocessingConfig | str | None, default "standard" Scaler to use. Options: - "standard": StandardScaler (z-score) - "minmax": MinMaxScaler ([0, 1]) - "robust": RobustScaler (median/IQR) - BaseScaler instance: Use provided scaler - PreprocessingConfig: Use config to create scaler - None: No scaling

Returns

MLDatasetBuilder Configured dataset builder.

Examples

builder = create_dataset_builder(features, labels, scaler="robust")

Using PreprocessingConfig for reproducibility

from ml4t.engineer.config import PreprocessingConfig config = PreprocessingConfig.robust(quantile_range=(10.0, 90.0)) builder = create_dataset_builder(features, labels, scaler=config)

Source code in src/ml4t/engineer/dataset.py
def create_dataset_builder(
    features: pl.DataFrame,
    labels: pl.Series | pl.DataFrame,
    dates: pl.Series | None = None,
    scaler: BaseScaler | PreprocessingConfig | str | None = "standard",
) -> MLDatasetBuilder:
    """Convenience function to create MLDatasetBuilder with common defaults.

    Parameters
    ----------
    features : pl.DataFrame
        Feature matrix.
    labels : pl.Series | pl.DataFrame
        Target variable.
    dates : pl.Series | None, optional
        Date/time index.
    scaler : BaseScaler | PreprocessingConfig | str | None, default "standard"
        Scaler to use. Options:
        - "standard": StandardScaler (z-score)
        - "minmax": MinMaxScaler ([0, 1])
        - "robust": RobustScaler (median/IQR)
        - BaseScaler instance: Use provided scaler
        - PreprocessingConfig: Use config to create scaler
        - None: No scaling

    Returns
    -------
    MLDatasetBuilder
        Configured dataset builder.

    Examples
    --------
    >>> builder = create_dataset_builder(features, labels, scaler="robust")
    >>>
    >>> # Using PreprocessingConfig for reproducibility
    >>> from ml4t.engineer.config import PreprocessingConfig
    >>> config = PreprocessingConfig.robust(quantile_range=(10.0, 90.0))
    >>> builder = create_dataset_builder(features, labels, scaler=config)
    """
    from ml4t.engineer.preprocessing import MinMaxScaler, RobustScaler

    # Convert DataFrame labels to Series if needed (MLDatasetBuilder.__post_init__ also handles this)
    labels_series = labels.to_series(0) if isinstance(labels, pl.DataFrame) else labels
    builder = MLDatasetBuilder(features=features, labels=labels_series, dates=dates)

    if scaler is None:
        pass
    elif hasattr(scaler, "create_scaler"):
        # PreprocessingConfig - use set_scaler which handles the conversion
        builder.set_scaler(scaler)  # type: ignore[arg-type]
    elif isinstance(scaler, str):
        scaler_map = {
            "standard": StandardScaler,
            "minmax": MinMaxScaler,
            "robust": RobustScaler,
        }
        if scaler.lower() not in scaler_map:
            raise ValueError(
                f"Unknown scaler: {scaler}. Options: {list(scaler_map.keys())} or None"
            )
        builder.set_scaler(scaler_map[scaler.lower()]())
    elif isinstance(scaler, BaseScaler):
        builder.set_scaler(scaler)
    else:
        raise TypeError(
            f"scaler must be str, BaseScaler, PreprocessingConfig, or None. Got {type(scaler)}"
        )

    return builder

FoldResult dataclass

FoldResult(
    X_train,
    X_test,
    y_train,
    y_test,
    train_indices,
    test_indices,
    fold_number,
    scaler=None,
)

Result from a single cross-validation fold.

Attributes

X_train : pl.DataFrame Preprocessed training features. X_test : pl.DataFrame Preprocessed test features (using train statistics). y_train : pl.Series Training labels. y_test : pl.Series Test labels. train_indices : NDArray[np.intp] Original indices of training samples. test_indices : NDArray[np.intp] Original indices of test samples. fold_number : int Zero-indexed fold number. scaler : BaseScaler | None Fitted scaler used for this fold (None if no scaling).

to_numpy

to_numpy()

Convert to numpy arrays for sklearn compatibility.

Returns

tuple[NDArray, NDArray, NDArray, NDArray] (X_train, X_test, y_train, y_test) as numpy arrays.

Source code in src/ml4t/engineer/dataset.py
def to_numpy(
    self,
) -> tuple[NDArray[Any], NDArray[Any], NDArray[Any], NDArray[Any]]:
    """Convert to numpy arrays for sklearn compatibility.

    Returns
    -------
    tuple[NDArray, NDArray, NDArray, NDArray]
        (X_train, X_test, y_train, y_test) as numpy arrays.
    """
    return (
        self.X_train.to_numpy(),
        self.X_test.to_numpy(),
        self.y_train.to_numpy(),
        self.y_test.to_numpy(),
    )

DatasetInfo dataclass

DatasetInfo(
    n_samples,
    n_features,
    feature_names,
    label_name,
    has_dates,
)

Information about the dataset.

Attributes

n_samples : int Number of samples. n_features : int Number of features. feature_names : list[str] Feature column names. label_name : str Label column name. has_dates : bool Whether dates are provided.

Preprocessing

preprocessing

Preprocessing utilities for feature standardization with train-only fitting.

This module provides sklearn-like preprocessing transformers that maintain strict separation between training and test data statistics, preventing lookahead bias in ML pipelines.

Exports

StandardScaler - Z-score normalization (mean=0, std=1) MinMaxScaler - Scale to [0, 1] range RobustScaler - IQR-based scaling (outlier resistant) PreprocessingPipeline - Chain multiple transformers

ScalerMethod - Enum: STANDARD, MINMAX, ROBUST TransformType - Enum: SCALE, CLIP, WINSORIZE

Key Concepts: - Fit on training data only, transform both train and test - Polars-native implementation for performance - Immutable after fit (statistics locked) - Serializable for production deployment

Example

from ml4t.engineer.preprocessing import StandardScaler scaler = StandardScaler() train_scaled = scaler.fit_transform(train_df) test_scaled = scaler.transform(test_df) # Uses train statistics

StandardScaler

StandardScaler(
    columns=None, with_mean=True, with_std=True, ddof=1
)

Bases: BaseScaler

Z-score normalization: (x - mean) / std.

Transforms features to have mean=0 and std=1 using training data statistics.

Parameters

columns : list[str] | None Columns to scale. If None, all numeric columns are scaled. with_mean : bool, default True Center data by subtracting mean. with_std : bool, default True Scale data by dividing by std. ddof : int, default 1 Delta degrees of freedom for std calculation.

Examples

scaler = StandardScaler() train_scaled = scaler.fit_transform(train_df) test_scaled = scaler.transform(test_df) # Uses train mean/std

Source code in src/ml4t/engineer/preprocessing.py
def __init__(
    self,
    columns: list[str] | None = None,
    with_mean: bool = True,
    with_std: bool = True,
    ddof: int = 1,
) -> None:
    super().__init__(columns)
    self.with_mean = with_mean
    self.with_std = with_std
    self.ddof = ddof

MinMaxScaler

MinMaxScaler(columns=None, feature_range=(0.0, 1.0))

Bases: BaseScaler

Scale features to [0, 1] range using min/max from training data.

Parameters

columns : list[str] | None Columns to scale. If None, all numeric columns are scaled. feature_range : tuple[float, float], default (0.0, 1.0) Desired range of transformed data.

Examples

scaler = MinMaxScaler() train_scaled = scaler.fit_transform(train_df) # [0, 1] range test_scaled = scaler.transform(test_df) # May exceed [0, 1]

Source code in src/ml4t/engineer/preprocessing.py
def __init__(
    self,
    columns: list[str] | None = None,
    feature_range: tuple[float, float] = (0.0, 1.0),
) -> None:
    super().__init__(columns)
    self.feature_range = feature_range

RobustScaler

RobustScaler(
    columns=None,
    with_centering=True,
    with_scaling=True,
    quantile_range=(25.0, 75.0),
)

Bases: BaseScaler

Scale using median and IQR (robust to outliers).

Uses median instead of mean, and interquartile range (IQR) instead of std.

Parameters

columns : list[str] | None Columns to scale. If None, all numeric columns are scaled. with_centering : bool, default True Center data by subtracting median. with_scaling : bool, default True Scale data by dividing by IQR. quantile_range : tuple[float, float], default (25.0, 75.0) Quantile range for IQR calculation.

Examples

scaler = RobustScaler() train_scaled = scaler.fit_transform(train_df) test_scaled = scaler.transform(test_df)

Source code in src/ml4t/engineer/preprocessing.py
def __init__(
    self,
    columns: list[str] | None = None,
    with_centering: bool = True,
    with_scaling: bool = True,
    quantile_range: tuple[float, float] = (25.0, 75.0),
) -> None:
    super().__init__(columns)
    self.with_centering = with_centering
    self.with_scaling = with_scaling
    self.quantile_range = quantile_range

PreprocessingPipeline

PreprocessingPipeline(
    recommendations=None,
    min_confidence=0.0,
    winsorize_limits=(0.01, 0.99),
)

Apply preprocessing recommendations from ML4T Diagnostic.

This class enables bidirectional integration between ML4T Diagnostic and ML4T Engineer. After diagnostic evaluates features, it can recommend transforms which this pipeline applies with proper train/test separation.

The pipeline follows sklearn conventions: - fit(X): Learn statistics from training data only - transform(X): Apply transforms using fitted statistics - fit_transform(X): Combined fit and transform

Parameters

recommendations : dict | None Feature recommendations from FeatureEvaluatorConfig (ml4t-diagnostic). Format: {"feature_name": {"transform": "standardize", "confidence": 0.9}} min_confidence : float, default 0.0 Minimum confidence threshold for applying recommendations. Recommendations below this threshold default to NONE. winsorize_limits : tuple[float, float], default (0.01, 0.99) Percentile limits for winsorization.

Examples

From ML4T Diagnostic recommendations

recommendations = { ... "rsi_14": {"transform": "standardize", "confidence": 0.9}, ... "returns": {"transform": "winsorize", "confidence": 0.85}, ... "volume": {"transform": "log", "confidence": 0.8} ... } pipeline = PreprocessingPipeline.from_recommendations(recommendations) train_transformed = pipeline.fit_transform(train_df) test_transformed = pipeline.transform(test_df)

Serialize for production

pipeline_dict = pipeline.to_dict()

... save to disk ...

loaded_pipeline = PreprocessingPipeline.from_dict(pipeline_dict)

Initialize pipeline with recommendations.

Source code in src/ml4t/engineer/preprocessing.py
def __init__(
    self,
    recommendations: dict[str, dict[str, Any]] | None = None,
    min_confidence: float = 0.0,
    winsorize_limits: tuple[float, float] = (0.01, 0.99),
) -> None:
    """Initialize pipeline with recommendations."""
    self._recommendations = recommendations or {}
    self._min_confidence = min_confidence
    self._winsorize_limits = winsorize_limits
    self._is_fitted = False
    self._statistics: dict[str, dict[str, Any]] = {}
    self._fitted_features: list[str] = []

is_fitted property

is_fitted

Return whether pipeline has been fitted.

from_recommendations classmethod

from_recommendations(
    recommendations,
    min_confidence=0.0,
    winsorize_limits=(0.01, 0.99),
)

Create pipeline from diagnostic recommendations.

Parameters

recommendations : dict Output from FeatureEvaluatorConfig (ml4t-diagnostic) or similar format. Expected structure: {"feature": {"transform": "...", "confidence": ...}} min_confidence : float, default 0.0 Minimum confidence threshold. winsorize_limits : tuple, default (0.01, 0.99) Percentile limits for winsorization.

Returns

PreprocessingPipeline Configured pipeline ready for fitting.

Source code in src/ml4t/engineer/preprocessing.py
@classmethod
def from_recommendations(
    cls,
    recommendations: dict[str, dict[str, Any]],
    min_confidence: float = 0.0,
    winsorize_limits: tuple[float, float] = (0.01, 0.99),
) -> PreprocessingPipeline:
    """Create pipeline from diagnostic recommendations.

    Parameters
    ----------
    recommendations : dict
        Output from FeatureEvaluatorConfig (ml4t-diagnostic) or similar format.
        Expected structure: {"feature": {"transform": "...", "confidence": ...}}
    min_confidence : float, default 0.0
        Minimum confidence threshold.
    winsorize_limits : tuple, default (0.01, 0.99)
        Percentile limits for winsorization.

    Returns
    -------
    PreprocessingPipeline
        Configured pipeline ready for fitting.
    """
    return cls(
        recommendations=recommendations,
        min_confidence=min_confidence,
        winsorize_limits=winsorize_limits,
    )

fit

fit(X)

Fit pipeline on training data.

Computes statistics needed for each transform from training data only.

Parameters

X : pl.DataFrame Training data with feature columns.

Returns

self Fitted pipeline.

Source code in src/ml4t/engineer/preprocessing.py
def fit(self, X: pl.DataFrame) -> PreprocessingPipeline:
    """Fit pipeline on training data.

    Computes statistics needed for each transform from training data only.

    Parameters
    ----------
    X : pl.DataFrame
        Training data with feature columns.

    Returns
    -------
    self
        Fitted pipeline.
    """
    self._statistics = {}
    self._fitted_features = []

    for feature in X.columns:
        if feature in self._recommendations:
            transform = self._get_transform_type(feature)
            self._statistics[feature] = self._compute_statistics(X, feature, transform)
            self._fitted_features.append(feature)

    self._is_fitted = True
    return self

transform

transform(X)

Transform data using fitted statistics.

Parameters

X : pl.DataFrame Data to transform.

Returns

pl.DataFrame Transformed data.

Raises

NotFittedError If pipeline has not been fitted.

Source code in src/ml4t/engineer/preprocessing.py
def transform(self, X: pl.DataFrame) -> pl.DataFrame:
    """Transform data using fitted statistics.

    Parameters
    ----------
    X : pl.DataFrame
        Data to transform.

    Returns
    -------
    pl.DataFrame
        Transformed data.

    Raises
    ------
    NotFittedError
        If pipeline has not been fitted.
    """
    if not self._is_fitted:
        raise NotFittedError("Pipeline has not been fitted. Call fit() first.")

    exprs = []
    for feature in X.columns:
        if feature in self._recommendations:
            transform = self._get_transform_type(feature)
            exprs.append(self._apply_transform(X, feature, transform))
        else:
            exprs.append(pl.col(feature))

    return X.select(exprs)

fit_transform

fit_transform(X)

Fit and transform in one step.

Parameters

X : pl.DataFrame Training data.

Returns

pl.DataFrame Transformed training data.

Source code in src/ml4t/engineer/preprocessing.py
def fit_transform(self, X: pl.DataFrame) -> pl.DataFrame:
    """Fit and transform in one step.

    Parameters
    ----------
    X : pl.DataFrame
        Training data.

    Returns
    -------
    pl.DataFrame
        Transformed training data.
    """
    return self.fit(X).transform(X)

to_dict

to_dict()

Serialize pipeline state for persistence.

Returns

dict Serializable representation of fitted pipeline.

Source code in src/ml4t/engineer/preprocessing.py
def to_dict(self) -> dict[str, Any]:
    """Serialize pipeline state for persistence.

    Returns
    -------
    dict
        Serializable representation of fitted pipeline.
    """
    if not self._is_fitted:
        raise NotFittedError("Pipeline has not been fitted. Call fit() first.")

    return {
        "recommendations": self._recommendations,
        "min_confidence": self._min_confidence,
        "winsorize_limits": list(self._winsorize_limits),
        "statistics": self._statistics,
        "fitted_features": self._fitted_features,
    }

from_dict classmethod

from_dict(data)

Load fitted pipeline from serialized state.

Parameters

data : dict Output from to_dict().

Returns

PreprocessingPipeline Reconstructed fitted pipeline.

Source code in src/ml4t/engineer/preprocessing.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PreprocessingPipeline:
    """Load fitted pipeline from serialized state.

    Parameters
    ----------
    data : dict
        Output from to_dict().

    Returns
    -------
    PreprocessingPipeline
        Reconstructed fitted pipeline.
    """
    pipeline = cls(
        recommendations=data["recommendations"],
        min_confidence=data.get("min_confidence", 0.0),
        winsorize_limits=tuple(data.get("winsorize_limits", (0.01, 0.99))),
    )
    pipeline._statistics = data["statistics"]
    pipeline._fitted_features = data["fitted_features"]
    pipeline._is_fitted = True
    return pipeline

get_transform_summary

get_transform_summary()

Get summary of transforms to be applied.

Returns

dict Mapping of feature names to transform types.

Source code in src/ml4t/engineer/preprocessing.py
def get_transform_summary(self) -> dict[str, str]:
    """Get summary of transforms to be applied.

    Returns
    -------
    dict
        Mapping of feature names to transform types.
    """
    return {
        feature: self._get_transform_type(feature).value for feature in self._recommendations
    }

__repr__

__repr__()

Return string representation.

Source code in src/ml4t/engineer/preprocessing.py
def __repr__(self) -> str:
    """Return string representation."""
    n_recs = len(self._recommendations)
    fitted_str = "fitted" if self._is_fitted else "not fitted"
    return f"PreprocessingPipeline(features={n_recs}, {fitted_str})"

TransformType

Bases: str, Enum

Transform types supported by PreprocessingPipeline.

These align with ml4t.diagnostic.integration.engineer_contract.TransformType.

NotFittedError

Bases: Exception

Raised when transform is called before fit.

Configuration

config

ML4T Engineer Configuration System.

This module provides Pydantic v2 configuration schemas for feature engineering:

  • Labeling: Triple barrier, ATR barrier, fixed horizon, trend scanning
  • Preprocessing: Standard, MinMax, Robust scalers with create_scaler()
  • Data Contract: Schema validation for input data
  • Experiment: Experiment configuration and serialization
Note

Feature evaluation configs (StationarityConfig, ACFConfig, etc.) have moved to ml4t-diagnostic. Install with: pip install ml4t-diagnostic

LabelingConfig

Bases: BaseConfig

Unified configuration for all labeling methods.

Extends BaseConfig for full JSON/YAML serialization support. Supports multiple labeling methods via the method discriminator.

All barrier distances are specified as POSITIVE values representing the distance from the entry price. The position side determines the direction of the barriers.

Attributes

method : str Labeling method: "triple_barrier", "atr_barrier", "fixed_horizon", "trend_scanning", "percentile" price_col : str Price column for barrier calculations (typically 'close') timestamp_col : str Timestamp column for duration calculations

Triple Barrier Parameters

upper_barrier : float | str | None Upper barrier distance or column name for dynamic barriers lower_barrier : float | str | None Lower barrier distance or column name for dynamic barriers max_holding_period : int | str Maximum holding period in bars or column name side : int | str | None Position side: 1 (long), -1 (short), 0/None (symmetric) trailing_stop : bool | float | str Enable trailing stop or specify percentage/column

ATR Barrier Parameters

atr_tp_multiple : float ATR multiplier for take profit (e.g., 2.0 = 2x ATR) atr_sl_multiple : float ATR multiplier for stop loss (e.g., 1.0 = 1x ATR) atr_period : int ATR calculation period (Wilder's default: 14)

Fixed Horizon Parameters

horizon : int Forward-looking period in bars return_method : str Return calculation: "returns", "log_returns", "binary" threshold : float | None Binary classification threshold

Trend Scanning Parameters

min_horizon : int Minimum lookforward period max_horizon : int Maximum lookforward period t_value_threshold : float T-statistic threshold for trend significance

Examples

Triple barrier with fixed barriers

config = LabelingConfig( ... method="triple_barrier", ... upper_barrier=0.02, ... lower_barrier=0.01, ... max_holding_period=20, ... side=1, ... ) config.to_yaml("config.yaml")

ATR-adjusted barriers

config = LabelingConfig.atr_barrier( ... atr_tp_multiple=2.0, ... atr_sl_multiple=1.0, ... max_holding_period=20, ... )

Load from file

config = LabelingConfig.from_yaml("config.yaml")

validate_side classmethod

validate_side(v)

Validate side is valid.

Source code in src/ml4t/engineer/config/labeling.py
@field_validator("side")
@classmethod
def validate_side(cls, v: int | str | None) -> int | str | None:
    """Validate side is valid."""
    if isinstance(v, int) and v not in (-1, 0, 1):
        raise ValueError("side must be -1, 0, 1, or a column name")
    return v

validate_max_horizon classmethod

validate_max_horizon(v, info)

Ensure max_horizon >= min_horizon.

Source code in src/ml4t/engineer/config/labeling.py
@field_validator("max_horizon")
@classmethod
def validate_max_horizon(cls, v: int, info: Any) -> int:
    """Ensure max_horizon >= min_horizon."""
    # Note: info.data contains already-validated fields
    if "min_horizon" in info.data and v < info.data["min_horizon"]:
        raise ValueError("max_horizon must be >= min_horizon")
    return v

triple_barrier classmethod

triple_barrier(
    upper_barrier=0.02,
    lower_barrier=0.01,
    max_holding_period=20,
    side=1,
    trailing_stop=False,
    **kwargs,
)

Create triple barrier labeling config.

Parameters

upper_barrier : float | str | None Take profit barrier (2% = 0.02) or column name lower_barrier : float | str | None Stop loss barrier (1% = 0.01) or column name max_holding_period : int | str | timedelta Maximum holding period: - int: Number of bars - str: Duration string ('4h', '1d') or column name - timedelta: Python timedelta object side : int | str | None Position direction: 1 (long), -1 (short) trailing_stop : bool | float | str Enable trailing stop

Returns

LabelingConfig Configured for triple barrier method

Examples

config = LabelingConfig.triple_barrier(0.02, 0.01, 20) config.to_yaml("triple_barrier.yaml")

Time-based max holding period

config = LabelingConfig.triple_barrier(0.02, 0.01, "4h")

Using timedelta

from datetime import timedelta config = LabelingConfig.triple_barrier(0.02, 0.01, timedelta(hours=4))

Source code in src/ml4t/engineer/config/labeling.py
@classmethod
def triple_barrier(
    cls,
    upper_barrier: float | str | None = 0.02,
    lower_barrier: float | str | None = 0.01,
    max_holding_period: int | str | timedelta = 20,
    side: int | str | None = 1,
    trailing_stop: bool | float | str = False,
    **kwargs: Any,
) -> LabelingConfig:
    """Create triple barrier labeling config.

    Parameters
    ----------
    upper_barrier : float | str | None
        Take profit barrier (2% = 0.02) or column name
    lower_barrier : float | str | None
        Stop loss barrier (1% = 0.01) or column name
    max_holding_period : int | str | timedelta
        Maximum holding period:
        - int: Number of bars
        - str: Duration string ('4h', '1d') or column name
        - timedelta: Python timedelta object
    side : int | str | None
        Position direction: 1 (long), -1 (short)
    trailing_stop : bool | float | str
        Enable trailing stop

    Returns
    -------
    LabelingConfig
        Configured for triple barrier method

    Examples
    --------
    >>> config = LabelingConfig.triple_barrier(0.02, 0.01, 20)
    >>> config.to_yaml("triple_barrier.yaml")

    >>> # Time-based max holding period
    >>> config = LabelingConfig.triple_barrier(0.02, 0.01, "4h")

    >>> # Using timedelta
    >>> from datetime import timedelta
    >>> config = LabelingConfig.triple_barrier(0.02, 0.01, timedelta(hours=4))
    """
    return cls(
        method="triple_barrier",
        upper_barrier=upper_barrier,
        lower_barrier=lower_barrier,
        max_holding_period=max_holding_period,
        side=side,
        trailing_stop=trailing_stop,
        **kwargs,
    )

atr_barrier classmethod

atr_barrier(
    atr_tp_multiple=2.0,
    atr_sl_multiple=1.0,
    atr_period=14,
    max_holding_period=20,
    side=1,
    trailing_stop=False,
    **kwargs,
)

Create ATR-adjusted barrier labeling config.

Volatility-adaptive barriers that adjust to market conditions.

Parameters

atr_tp_multiple : float ATR multiplier for take profit (e.g., 2.0 = 2x ATR) atr_sl_multiple : float ATR multiplier for stop loss (e.g., 1.0 = 1x ATR) atr_period : int ATR calculation period (default: 14) max_holding_period : int | str | timedelta Maximum holding period: - int: Number of bars - str: Duration string ('4h', '1d') or column name - timedelta: Python timedelta object side : int | str | None Position direction: 1 (long), -1 (short) trailing_stop : bool Enable trailing stop

Returns

LabelingConfig Configured for ATR barrier method

Examples

config = LabelingConfig.atr_barrier(2.0, 1.0, 14) config.to_yaml("atr_barrier.yaml")

Time-based max holding period

config = LabelingConfig.atr_barrier(2.0, 1.0, 14, max_holding_period="4h")

Source code in src/ml4t/engineer/config/labeling.py
@classmethod
def atr_barrier(
    cls,
    atr_tp_multiple: float = 2.0,
    atr_sl_multiple: float = 1.0,
    atr_period: int = 14,
    max_holding_period: int | str | timedelta = 20,
    side: int | str | None = 1,
    trailing_stop: bool = False,
    **kwargs: Any,
) -> LabelingConfig:
    """Create ATR-adjusted barrier labeling config.

    Volatility-adaptive barriers that adjust to market conditions.

    Parameters
    ----------
    atr_tp_multiple : float
        ATR multiplier for take profit (e.g., 2.0 = 2x ATR)
    atr_sl_multiple : float
        ATR multiplier for stop loss (e.g., 1.0 = 1x ATR)
    atr_period : int
        ATR calculation period (default: 14)
    max_holding_period : int | str | timedelta
        Maximum holding period:
        - int: Number of bars
        - str: Duration string ('4h', '1d') or column name
        - timedelta: Python timedelta object
    side : int | str | None
        Position direction: 1 (long), -1 (short)
    trailing_stop : bool
        Enable trailing stop

    Returns
    -------
    LabelingConfig
        Configured for ATR barrier method

    Examples
    --------
    >>> config = LabelingConfig.atr_barrier(2.0, 1.0, 14)
    >>> config.to_yaml("atr_barrier.yaml")

    >>> # Time-based max holding period
    >>> config = LabelingConfig.atr_barrier(2.0, 1.0, 14, max_holding_period="4h")
    """
    return cls(
        method="atr_barrier",
        atr_tp_multiple=atr_tp_multiple,
        atr_sl_multiple=atr_sl_multiple,
        atr_period=atr_period,
        max_holding_period=max_holding_period,
        side=side,
        trailing_stop=trailing_stop,
        **kwargs,
    )

fixed_horizon classmethod

fixed_horizon(
    horizon=10,
    return_method="returns",
    threshold=None,
    **kwargs,
)

Create fixed horizon labeling config.

Simple forward-looking returns over a fixed period.

Parameters

horizon : int Forward-looking period in bars return_method : str "returns", "log_returns", or "binary" threshold : float | None Threshold for binary classification

Returns

LabelingConfig Configured for fixed horizon method

Examples

config = LabelingConfig.fixed_horizon(10, "binary", threshold=0.0)

Source code in src/ml4t/engineer/config/labeling.py
@classmethod
def fixed_horizon(
    cls,
    horizon: int = 10,
    return_method: Literal["returns", "log_returns", "binary"] = "returns",
    threshold: float | None = None,
    **kwargs: Any,
) -> LabelingConfig:
    """Create fixed horizon labeling config.

    Simple forward-looking returns over a fixed period.

    Parameters
    ----------
    horizon : int
        Forward-looking period in bars
    return_method : str
        "returns", "log_returns", or "binary"
    threshold : float | None
        Threshold for binary classification

    Returns
    -------
    LabelingConfig
        Configured for fixed horizon method

    Examples
    --------
    >>> config = LabelingConfig.fixed_horizon(10, "binary", threshold=0.0)
    """
    return cls(
        method="fixed_horizon",
        horizon=horizon,
        return_method=return_method,
        threshold=threshold,
        **kwargs,
    )

trend_scanning classmethod

trend_scanning(
    min_horizon=5,
    max_horizon=20,
    t_value_threshold=2.0,
    **kwargs,
)

Create trend scanning labeling config.

De Prado's trend scanning method using t-statistics.

Parameters

min_horizon : int Minimum lookforward period max_horizon : int Maximum lookforward period t_value_threshold : float T-statistic threshold for trend significance

Returns

LabelingConfig Configured for trend scanning method

Examples

config = LabelingConfig.trend_scanning(5, 20, 2.0)

Source code in src/ml4t/engineer/config/labeling.py
@classmethod
def trend_scanning(
    cls,
    min_horizon: int = 5,
    max_horizon: int = 20,
    t_value_threshold: float = 2.0,
    **kwargs: Any,
) -> LabelingConfig:
    """Create trend scanning labeling config.

    De Prado's trend scanning method using t-statistics.

    Parameters
    ----------
    min_horizon : int
        Minimum lookforward period
    max_horizon : int
        Maximum lookforward period
    t_value_threshold : float
        T-statistic threshold for trend significance

    Returns
    -------
    LabelingConfig
        Configured for trend scanning method

    Examples
    --------
    >>> config = LabelingConfig.trend_scanning(5, 20, 2.0)
    """
    return cls(
        method="trend_scanning",
        min_horizon=min_horizon,
        max_horizon=max_horizon,
        t_value_threshold=t_value_threshold,
        **kwargs,
    )

DataContractConfig

Bases: BaseConfig

Canonical dataframe column mapping shared across ML4T libraries.

from_mapping classmethod

from_mapping(mapping)

Create contract from a generic mapping source.

Source code in src/ml4t/engineer/config/data_contract.py
@classmethod
def from_mapping(cls, mapping: Mapping[str, Any]) -> DataContractConfig:
    """Create contract from a generic mapping source."""
    return cls(**dict(mapping))

from_ml4t_data classmethod

from_ml4t_data()

Create a contract from ml4t-data's canonical multi-asset schema.

Source code in src/ml4t/engineer/config/data_contract.py
@classmethod
def from_ml4t_data(cls) -> DataContractConfig:
    """Create a contract from ml4t-data's canonical multi-asset schema."""
    try:
        from ml4t.data.core import MultiAssetSchema
    except ImportError as exc:  # pragma: no cover - environment-dependent
        msg = (
            "ml4t-data is required to build DataContractConfig.from_ml4t_data(). "
            "Install/enable ml4t-data or provide contract fields explicitly."
        )
        raise ImportError(msg) from exc

    schema = getattr(MultiAssetSchema, "SCHEMA", {})
    schema_cols = {str(col) for col in schema}

    def pick(candidates: tuple[str, ...], default: str | None) -> str | None:
        for candidate in candidates:
            if candidate in schema_cols:
                return candidate
        return default

    close_col = pick(("close", "close_price", "last", "last_price"), "close")
    return cls(
        timestamp_col=pick(("timestamp", "ts", "ts_event", "datetime", "date"), "timestamp"),
        symbol_col=pick(("symbol", "ticker", "asset", "asset_id"), None),
        price_col=pick(
            ("close", "close_price", "price", "mid_price", "last", "last_price"),
            close_col,
        ),
        open_col=pick(("open", "open_price"), "open"),
        high_col=pick(("high", "high_price"), "high"),
        low_col=pick(("low", "low_price"), "low"),
        close_col=close_col,
        volume_col=pick(("volume", "volume_base", "size"), "volume"),
    )

PreprocessingConfig

Bases: BaseConfig

Configuration for preprocessing (feature scaling).

Extends BaseConfig for full JSON/YAML serialization support. Use create_scaler() to instantiate the configured scaler.

Attributes

scaler : str | None Scaler type: "standard", "minmax", "robust", or None (no scaling) columns : list[str] | None Specific columns to scale (None = all numeric columns)

Standard Scaler Parameters

with_mean : bool Center features by removing the mean with_std : bool Scale features to unit variance

MinMax Scaler Parameters

feature_range : tuple[float, float] Target range for scaling (default: (0.0, 1.0))

Robust Scaler Parameters

with_centering : bool Center features using median with_scaling : bool Scale features using IQR quantile_range : tuple[float, float] Quantile range for IQR (default: (25.0, 75.0))

Examples

Standard scaling (z-score normalization)

config = PreprocessingConfig(scaler="standard") scaler = config.create_scaler() train_scaled = scaler.fit_transform(train_features) test_scaled = scaler.transform(test_features) # Uses train statistics

Robust scaling (outlier-resistant)

config = PreprocessingConfig.robust(quantile_range=(10.0, 90.0)) scaler = config.create_scaler()

Serialize for reproducibility

config.to_yaml("preprocessing.yaml")

standard classmethod

standard(
    with_mean=True, with_std=True, columns=None, **kwargs
)

Create StandardScaler config.

Z-score normalization: (x - mean) / std

Parameters

with_mean : bool Center features by removing the mean with_std : bool Scale features to unit variance columns : list[str] | None Columns to scale (None = all)

Returns

PreprocessingConfig Configured for StandardScaler

Examples

config = PreprocessingConfig.standard() scaler = config.create_scaler()

Source code in src/ml4t/engineer/config/preprocessing_config.py
@classmethod
def standard(
    cls,
    with_mean: bool = True,
    with_std: bool = True,
    columns: list[str] | None = None,
    **kwargs: Any,
) -> PreprocessingConfig:
    """Create StandardScaler config.

    Z-score normalization: (x - mean) / std

    Parameters
    ----------
    with_mean : bool
        Center features by removing the mean
    with_std : bool
        Scale features to unit variance
    columns : list[str] | None
        Columns to scale (None = all)

    Returns
    -------
    PreprocessingConfig
        Configured for StandardScaler

    Examples
    --------
    >>> config = PreprocessingConfig.standard()
    >>> scaler = config.create_scaler()
    """
    return cls(
        scaler="standard",
        with_mean=with_mean,
        with_std=with_std,
        columns=columns,
        **kwargs,
    )

minmax classmethod

minmax(feature_range=(0.0, 1.0), columns=None, **kwargs)

Create MinMaxScaler config.

Scales features to [min, max] range.

Parameters

feature_range : tuple[float, float] Target range for scaling (default: (0.0, 1.0)) columns : list[str] | None Columns to scale (None = all)

Returns

PreprocessingConfig Configured for MinMaxScaler

Examples

config = PreprocessingConfig.minmax(feature_range=(-1.0, 1.0)) scaler = config.create_scaler()

Source code in src/ml4t/engineer/config/preprocessing_config.py
@classmethod
def minmax(
    cls,
    feature_range: tuple[float, float] = (0.0, 1.0),
    columns: list[str] | None = None,
    **kwargs: Any,
) -> PreprocessingConfig:
    """Create MinMaxScaler config.

    Scales features to [min, max] range.

    Parameters
    ----------
    feature_range : tuple[float, float]
        Target range for scaling (default: (0.0, 1.0))
    columns : list[str] | None
        Columns to scale (None = all)

    Returns
    -------
    PreprocessingConfig
        Configured for MinMaxScaler

    Examples
    --------
    >>> config = PreprocessingConfig.minmax(feature_range=(-1.0, 1.0))
    >>> scaler = config.create_scaler()
    """
    return cls(
        scaler="minmax",
        feature_range=feature_range,
        columns=columns,
        **kwargs,
    )

robust classmethod

robust(
    with_centering=True,
    with_scaling=True,
    quantile_range=(25.0, 75.0),
    columns=None,
    **kwargs,
)

Create RobustScaler config.

Uses median and IQR, making it robust to outliers.

Parameters

with_centering : bool Center features using median with_scaling : bool Scale features using IQR quantile_range : tuple[float, float] Quantile range for IQR (default: (25.0, 75.0)) columns : list[str] | None Columns to scale (None = all)

Returns

PreprocessingConfig Configured for RobustScaler

Examples

config = PreprocessingConfig.robust(quantile_range=(10.0, 90.0)) scaler = config.create_scaler()

Source code in src/ml4t/engineer/config/preprocessing_config.py
@classmethod
def robust(
    cls,
    with_centering: bool = True,
    with_scaling: bool = True,
    quantile_range: tuple[float, float] = (25.0, 75.0),
    columns: list[str] | None = None,
    **kwargs: Any,
) -> PreprocessingConfig:
    """Create RobustScaler config.

    Uses median and IQR, making it robust to outliers.

    Parameters
    ----------
    with_centering : bool
        Center features using median
    with_scaling : bool
        Scale features using IQR
    quantile_range : tuple[float, float]
        Quantile range for IQR (default: (25.0, 75.0))
    columns : list[str] | None
        Columns to scale (None = all)

    Returns
    -------
    PreprocessingConfig
        Configured for RobustScaler

    Examples
    --------
    >>> config = PreprocessingConfig.robust(quantile_range=(10.0, 90.0))
    >>> scaler = config.create_scaler()
    """
    return cls(
        scaler="robust",
        with_centering=with_centering,
        with_scaling=with_scaling,
        quantile_range=quantile_range,
        columns=columns,
        **kwargs,
    )

none classmethod

none()

Create config with no scaling.

Returns

PreprocessingConfig Configured for no scaling

Examples

config = PreprocessingConfig.none() assert config.create_scaler() is None

Source code in src/ml4t/engineer/config/preprocessing_config.py
@classmethod
def none(cls) -> PreprocessingConfig:
    """Create config with no scaling.

    Returns
    -------
    PreprocessingConfig
        Configured for no scaling

    Examples
    --------
    >>> config = PreprocessingConfig.none()
    >>> assert config.create_scaler() is None
    """
    return cls(scaler=None)

create_scaler

create_scaler()

Create scaler instance from stored parameters.

Returns

BaseScaler | None Configured scaler, or None if scaler="none"

Examples

config = PreprocessingConfig(scaler="standard") scaler = config.create_scaler() train_scaled = scaler.fit_transform(train_features) test_scaled = scaler.transform(test_features)

Source code in src/ml4t/engineer/config/preprocessing_config.py
def create_scaler(self) -> BaseScaler | None:
    """Create scaler instance from stored parameters.

    Returns
    -------
    BaseScaler | None
        Configured scaler, or None if scaler="none"

    Examples
    --------
    >>> config = PreprocessingConfig(scaler="standard")
    >>> scaler = config.create_scaler()
    >>> train_scaled = scaler.fit_transform(train_features)
    >>> test_scaled = scaler.transform(test_features)
    """
    if self.scaler is None:
        return None

    from ml4t.engineer.preprocessing import (
        MinMaxScaler,
        RobustScaler,
        StandardScaler,
    )

    if self.scaler == "standard":
        return StandardScaler(
            columns=self.columns,
            with_mean=self.with_mean,
            with_std=self.with_std,
        )
    elif self.scaler == "minmax":
        return MinMaxScaler(
            columns=self.columns,
            feature_range=self.feature_range,
        )
    elif self.scaler == "robust":
        return RobustScaler(
            columns=self.columns,
            with_centering=self.with_centering,
            with_scaling=self.with_scaling,
            quantile_range=self.quantile_range,
        )
    else:
        raise ValueError(f"Unknown scaler type: {self.scaler}")

ExperimentConfig dataclass

ExperimentConfig(
    features=list(),
    labeling=None,
    preprocessing=None,
    raw=dict(),
)

Container for experiment configuration components.

Holds typed configuration objects for all experiment components, loaded from a single YAML file.

Attributes

features : list[dict] Feature specifications for compute_features() labeling : LabelingConfig | None Labeling configuration (triple barrier, ATR, etc.) preprocessing : PreprocessingConfig | None Preprocessing/scaler configuration raw : dict Raw YAML content for any custom sections

Alternative Bars

bars

Information-driven bars for financial data sampling.

This module implements various bar types that sample data based on information content rather than fixed time intervals:

Standard Event-Driven Bars: - Tick bars: Sample every N ticks - Volume bars: Sample when volume reaches threshold - Dollar bars: Sample when dollar value traded reaches threshold

Advanced Information-Driven Bars: - Imbalance bars: Sample based on order flow imbalance (tick, volume, dollar) - Run bars: Sample based on consecutive buy/sell runs (tick, volume, dollar)

The vectorized implementations are used by default for improved performance. Original implementations are available with the 'Original' suffix if needed.

Based on "Advances in Financial Machine Learning" by Marcos López de Prado.

BarSampler

Bases: ABC

Abstract base class for bar samplers.

Bar samplers transform irregularly spaced tick data into regularly sampled bars based on various criteria (ticks, volume, etc).

sample abstractmethod

sample(data, include_incomplete=False)

Sample bars from tick data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume, side include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled bars with OHLCV and additional information

Source code in src/ml4t/engineer/bars/base.py
@abstractmethod
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample bars from tick data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume, side
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled bars with OHLCV and additional information
    """

FixedTickImbalanceBarSampler

FixedTickImbalanceBarSampler(threshold)

Bases: BarSampler

Sample bars using fixed tick imbalance threshold.

Unlike the adaptive AFML algorithm, this uses a fixed threshold that doesn't change during sampling. This avoids the threshold spiral issue that occurs with adaptive algorithms when order flow is imbalanced.

Recommended for production use - more stable and predictable than the adaptive version.

Parameters

threshold : int Fixed imbalance threshold. Bar forms when |Σ b_t| >= threshold. Typical values: 50-500 depending on desired bar frequency.

Calibration

To calibrate threshold for N bars per day: 1. Compute historical |mean imbalance| per tick 2. threshold ≈ ticks_per_day / N × |2P[b=1] - 1|

Or empirically: test a range and pick threshold giving desired bar count.

Examples

sampler = FixedTickImbalanceBarSampler(threshold=100) bars = sampler.sample(tick_data)

Notes

Advantages over adaptive (AFML) algorithm: - No threshold spiral with imbalanced order flow - Predictable bar count based on imbalance statistics - No feedback loops - stable by construction - Works consistently across all market conditions

Initialize fixed tick imbalance bar sampler.

Parameters

threshold : int Fixed imbalance threshold (positive integer)

Source code in src/ml4t/engineer/bars/imbalance.py
def __init__(self, threshold: int):
    """Initialize fixed tick imbalance bar sampler.

    Parameters
    ----------
    threshold : int
        Fixed imbalance threshold (positive integer)
    """
    if threshold <= 0:
        raise ValueError("threshold must be positive")

    self.threshold = threshold

sample

sample(data, include_incomplete=False)

Sample fixed tick imbalance bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume, side include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled tick imbalance bars

Source code in src/ml4t/engineer/bars/imbalance.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample fixed tick imbalance bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume, side
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled tick imbalance bars
    """
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Tick imbalance bars require 'side' column")

    if len(data) == 0:
        return self._empty_bars_df()

    # Extract arrays
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Calculate bar indices
    bar_indices, cumulative_thetas = _calculate_fixed_tick_imbalance_bars_nb(
        sides, float(self.threshold)
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))
        buy_count = int(np.sum(bar_sides > 0))
        sell_count = int(np.sum(bar_sides < 0))

        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": buy_volume,
                "sell_volume": sell_volume,
                "buy_count": buy_count,
                "sell_count": sell_count,
                "tick_imbalance": buy_count - sell_count,
                "cumulative_theta": float(cumulative_thetas[i]),
                "threshold": float(self.threshold),
            },
        )
        bars.append(bar)
        start_idx = end_idx + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)
        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))
            buy_count = int(np.sum(bar_sides > 0))
            sell_count = int(np.sum(bar_sides < 0))
            cumulative_theta = float(np.sum(bar_sides))

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": buy_volume,
                    "sell_volume": sell_volume,
                    "buy_count": buy_count,
                    "sell_count": sell_count,
                    "tick_imbalance": buy_count - sell_count,
                    "cumulative_theta": cumulative_theta,
                    "threshold": float(self.threshold),
                },
            )
            bars.append(bar)

    if not bars:
        return self._empty_bars_df()

    return pl.DataFrame(bars)

FixedVolumeImbalanceBarSampler

FixedVolumeImbalanceBarSampler(threshold)

Bases: BarSampler

Sample bars using fixed volume imbalance threshold.

Unlike the adaptive AFML algorithm, this uses a fixed threshold that doesn't change during sampling. This avoids instability issues that occur with adaptive algorithms.

Recommended for production use - more stable and predictable than the adaptive version.

Parameters

threshold : float Fixed volume imbalance threshold. Bar forms when |Σ b_t × v_t| >= threshold. Typical values: 10,000-1,000,000 depending on stock and desired frequency.

Calibration

To calibrate threshold for N bars per day: 1. Compute historical |mean signed volume| per tick 2. threshold ≈ ticks_per_day / N × E[|signed_volume|]

Or empirically: test a range and pick threshold giving desired bar count.

Examples

sampler = FixedVolumeImbalanceBarSampler(threshold=50000) bars = sampler.sample(tick_data)

Notes

Advantages over adaptive (AFML) algorithm: - No threshold spiral or collapse - Predictable bar count based on volume imbalance statistics - No feedback loops - stable by construction - Works consistently across all market conditions

Initialize fixed volume imbalance bar sampler.

Parameters

threshold : float Fixed volume imbalance threshold (positive)

Source code in src/ml4t/engineer/bars/imbalance.py
def __init__(self, threshold: float):
    """Initialize fixed volume imbalance bar sampler.

    Parameters
    ----------
    threshold : float
        Fixed volume imbalance threshold (positive)
    """
    if threshold <= 0:
        raise ValueError("threshold must be positive")

    self.threshold = threshold

sample

sample(data, include_incomplete=False)

Sample fixed volume imbalance bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume, side include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled volume imbalance bars

Source code in src/ml4t/engineer/bars/imbalance.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample fixed volume imbalance bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume, side
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled volume imbalance bars
    """
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Volume imbalance bars require 'side' column")

    if len(data) == 0:
        return self._empty_bars_df()

    # Extract arrays
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Calculate bar indices
    bar_indices, cumulative_thetas = _calculate_fixed_volume_imbalance_bars_nb(
        volumes, sides, float(self.threshold)
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))

        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": buy_volume,
                "sell_volume": sell_volume,
                "volume_imbalance": buy_volume - sell_volume,
                "cumulative_theta": float(cumulative_thetas[i]),
                "threshold": float(self.threshold),
            },
        )
        bars.append(bar)
        start_idx = end_idx + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)
        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))
            cumulative_theta = float(np.sum(bar_volumes * bar_sides))

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": buy_volume,
                    "sell_volume": sell_volume,
                    "volume_imbalance": buy_volume - sell_volume,
                    "cumulative_theta": cumulative_theta,
                    "threshold": float(self.threshold),
                },
            )
            bars.append(bar)

    if not bars:
        return self._empty_bars_df()

    return pl.DataFrame(bars)

TickImbalanceBarSampler

TickImbalanceBarSampler(
    expected_ticks_per_bar,
    alpha=0.1,
    initial_p_buy=0.5,
    min_bars_warmup=10,
)

Bases: BarSampler

Sample bars based on tick count imbalance (AFML-compliant TIBs).

Tick Imbalance Bars (TIBs) sample when the cumulative signed tick count (number of buys - number of sells) reaches a dynamically adjusted threshold.

AFML Threshold Formula

θ = Σ b_t (sum of trade signs) E[θ_T] = E[T] × |2P[b=1] - 1|

Where

E[T] = EWMA of bar lengths (ticks per bar) P[b=1] = probability of buy

This produces bar counts comparable to tick bars (both count ticks), unlike Volume Imbalance Bars which have thresholds scaled by volume.

Parameters

expected_ticks_per_bar : int Expected number of ticks per bar (used to initialize E[T]) alpha : float, default 0.1 EWMA decay factor for updating expectations initial_p_buy : float, default 0.5 Initial buy probability P[b=1] min_bars_warmup : int, default 10 Number of bars before starting EWMA updates

Examples

sampler = TickImbalanceBarSampler( ... expected_ticks_per_bar=1000, ... alpha=0.1 ... ) bars = sampler.sample(tick_data)

References

.. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. John Wiley & Sons. Chapter 2.3: Information-Driven Bars.

Initialize tick imbalance bar sampler.

Parameters

expected_ticks_per_bar : int Expected number of ticks per bar alpha : float, default 0.1 EWMA decay factor initial_p_buy : float, default 0.5 Initial buy probability P[b=1] min_bars_warmup : int, default 10 Number of bars before starting EWMA updates

Source code in src/ml4t/engineer/bars/imbalance.py
def __init__(
    self,
    expected_ticks_per_bar: int,
    alpha: float = 0.1,
    initial_p_buy: float = 0.5,
    min_bars_warmup: int = 10,
):
    """Initialize tick imbalance bar sampler.

    Parameters
    ----------
    expected_ticks_per_bar : int
        Expected number of ticks per bar
    alpha : float, default 0.1
        EWMA decay factor
    initial_p_buy : float, default 0.5
        Initial buy probability P[b=1]
    min_bars_warmup : int, default 10
        Number of bars before starting EWMA updates
    """
    if expected_ticks_per_bar <= 0:
        raise ValueError("expected_ticks_per_bar must be positive")

    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")

    if not 0 <= initial_p_buy <= 1:
        raise ValueError("initial_p_buy must be in [0, 1]")

    if min_bars_warmup < 0:
        raise ValueError("min_bars_warmup must be non-negative")

    self.expected_ticks_per_bar = expected_ticks_per_bar
    self.alpha = alpha
    self.initial_p_buy = initial_p_buy
    self.min_bars_warmup = min_bars_warmup

sample

sample(data, include_incomplete=False)

Sample tick imbalance bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume, side include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled tick imbalance bars with AFML diagnostic columns: - expected_t: E[T] at bar formation - p_buy: P[b=1] at bar formation - expected_imbalance: AFML threshold E[θ_T] - cumulative_theta: Actual tick imbalance at bar formation

Source code in src/ml4t/engineer/bars/imbalance.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample tick imbalance bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume, side
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled tick imbalance bars with AFML diagnostic columns:
        - expected_t: E[T] at bar formation
        - p_buy: P[b=1] at bar formation
        - expected_imbalance: AFML threshold E[θ_T]
        - cumulative_theta: Actual tick imbalance at bar formation
    """
    # Validate input
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Tick imbalance bars require 'side' column")

    if len(data) == 0:
        return self._empty_tick_imbalance_bars_df()

    # Extract arrays
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Estimate initial P[b=1] from warmup data
    warmup_size = min(1000, len(sides))
    warmup_sides = sides[:warmup_size]
    estimated_p_buy = float(np.mean(warmup_sides > 0))

    # Use provided initial_p_buy or estimated
    p_buy_init = self.initial_p_buy if self.initial_p_buy != 0.5 else estimated_p_buy

    # Calculate bar indices using AFML-compliant Numba function
    (
        bar_indices,
        expected_thetas,
        cumulative_thetas,
        expected_ts,
        p_buys,
    ) = _calculate_tick_imbalance_bars_nb(
        sides,
        float(self.expected_ticks_per_bar),
        p_buy_init,
        self.alpha,
        self.min_bars_warmup,
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        # Extract bar data
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)

        # Calculate metrics
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume: float = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume: float = float(np.sum(bar_volumes[bar_sides < 0]))
        buy_count = int(np.sum(bar_sides > 0))
        sell_count = int(np.sum(bar_sides < 0))

        # Create bar with AFML diagnostic columns
        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": float(buy_volume),
                "sell_volume": float(sell_volume),
                "buy_count": buy_count,
                "sell_count": sell_count,
                "tick_imbalance": buy_count - sell_count,
                "cumulative_theta": float(cumulative_thetas[i]),
                "expected_imbalance": float(expected_thetas[i]),
                # AFML diagnostic columns
                "expected_t": float(expected_ts[i]),
                "p_buy": float(p_buys[i]),
            },
        )
        bars.append(bar)

        start_idx = end_idx + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_vol_incomplete: float = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_vol_incomplete: float = float(np.sum(bar_volumes[bar_sides < 0]))
            buy_count_incomplete = int(np.sum(bar_sides > 0))
            sell_count_incomplete = int(np.sum(bar_sides < 0))

            # Calculate current cumulative theta (tick count)
            cumulative_theta: float = float(np.sum(bar_sides))

            # Use last values or initial
            last_expected_t = (
                expected_ts[-1] if len(expected_ts) > 0 else float(self.expected_ticks_per_bar)
            )
            last_p_buy = p_buys[-1] if len(p_buys) > 0 else p_buy_init
            expected_imbalance = last_expected_t * abs(2 * last_p_buy - 1)

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": float(buy_vol_incomplete),
                    "sell_volume": float(sell_vol_incomplete),
                    "buy_count": buy_count_incomplete,
                    "sell_count": sell_count_incomplete,
                    "tick_imbalance": buy_count_incomplete - sell_count_incomplete,
                    "cumulative_theta": float(cumulative_theta),
                    "expected_imbalance": float(expected_imbalance),
                    "expected_t": float(last_expected_t),
                    "p_buy": float(last_p_buy),
                },
            )
            bars.append(bar)

    # Convert to DataFrame
    if not bars:
        return self._empty_tick_imbalance_bars_df()

    return pl.DataFrame(bars)

WindowTickImbalanceBarSampler

WindowTickImbalanceBarSampler(
    initial_expected_t, bar_window=10, tick_window=1000
)

Bases: BarSampler

Sample tick imbalance bars using window-based estimation.

Alternative to α-based EWMA that uses rolling windows instead of exponential decay for parameter estimation.

Key difference from α-based version: - E[T] computed from rolling mean of last N bar lengths - P[b=1] computed from rolling mean of last M tick signs - Old data falls out of windows → bounded adaptation → no threshold spiral

Parameters

initial_expected_t : int Initial expected ticks per bar (before first bar forms) bar_window : int, default 10 Number of recent bars to average for E[T] estimation tick_window : int, default 1000 Number of recent ticks to average for P[b=1] estimation

Examples

sampler = WindowTickImbalanceBarSampler( ... initial_expected_t=1000, ... bar_window=10, # E[T] from last 10 bars ... tick_window=5000, # P[b=1] from last 5000 ticks ... ) bars = sampler.sample(tick_data)

Notes

Recommended settings: - bar_window: 5-20 (small, since bar count is limited) - tick_window: 1000-10000 (large, for stable P[b=1] estimate) - initial_expected_t: Rough estimate of ticks per bar

Source code in src/ml4t/engineer/bars/imbalance.py
def __init__(
    self,
    initial_expected_t: int,
    bar_window: int = 10,
    tick_window: int = 1000,
):
    if initial_expected_t <= 0:
        raise ValueError("initial_expected_t must be positive")
    if bar_window <= 0:
        raise ValueError("bar_window must be positive")
    if tick_window <= 0:
        raise ValueError("tick_window must be positive")

    self.initial_expected_t = initial_expected_t
    self.bar_window = bar_window
    self.tick_window = tick_window

sample

sample(data, include_incomplete=False)

Sample window-based tick imbalance bars from data.

Source code in src/ml4t/engineer/bars/imbalance.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample window-based tick imbalance bars from data."""
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Tick imbalance bars require 'side' column")

    if len(data) == 0:
        return self._empty_bars_df()

    # Extract arrays
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Calculate bar indices
    (
        bar_indices,
        expected_thetas,
        cumulative_thetas,
        expected_ts,
        p_buys,
    ) = _calculate_window_tick_imbalance_bars_nb(
        sides,
        self.initial_expected_t,
        self.bar_window,
        self.tick_window,
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))
        buy_count = int(np.sum(bar_sides > 0))
        sell_count = int(np.sum(bar_sides < 0))

        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": buy_volume,
                "sell_volume": sell_volume,
                "buy_count": buy_count,
                "sell_count": sell_count,
                "tick_imbalance": buy_count - sell_count,
                "cumulative_theta": float(cumulative_thetas[i]),
                "expected_imbalance": float(expected_thetas[i]),
                "expected_t": float(expected_ts[i]),
                "p_buy": float(p_buys[i]),
            },
        )
        bars.append(bar)
        start_idx = end_idx + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)
        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))
            buy_count = int(np.sum(bar_sides > 0))
            sell_count = int(np.sum(bar_sides < 0))

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": buy_volume,
                    "sell_volume": sell_volume,
                    "buy_count": buy_count,
                    "sell_count": sell_count,
                    "tick_imbalance": buy_count - sell_count,
                    "cumulative_theta": float(np.sum(bar_sides)),
                    "expected_imbalance": (
                        float(expected_thetas[-1]) if expected_thetas else 0.0
                    ),
                    "expected_t": (
                        float(expected_ts[-1])
                        if expected_ts
                        else float(self.initial_expected_t)
                    ),
                    "p_buy": float(p_buys[-1]) if p_buys else 0.5,
                },
            )
            bars.append(bar)

    if not bars:
        return self._empty_bars_df()

    return pl.DataFrame(bars)

WindowVolumeImbalanceBarSampler

WindowVolumeImbalanceBarSampler(
    initial_expected_t, bar_window=10, tick_window=1000
)

Bases: BarSampler

Sample volume imbalance bars using window-based estimation.

Alternative to α-based EWMA that uses rolling windows instead of exponential decay for parameter estimation.

Key difference from α-based version: - E[T] computed from rolling mean of last N bar lengths - Imbalance factor computed from rolling mean of last M signed volumes - Old data falls out of windows → bounded adaptation → no threshold spiral

Parameters

initial_expected_t : int Initial expected ticks per bar (before first bar forms) bar_window : int, default 10 Number of recent bars to average for E[T] estimation tick_window : int, default 1000 Number of recent ticks to average for imbalance estimation

Examples

sampler = WindowVolumeImbalanceBarSampler( ... initial_expected_t=5000, ... bar_window=10, # E[T] from last 10 bars ... tick_window=5000, # Imbalance from last 5000 ticks ... ) bars = sampler.sample(tick_data)

Source code in src/ml4t/engineer/bars/imbalance.py
def __init__(
    self,
    initial_expected_t: int,
    bar_window: int = 10,
    tick_window: int = 1000,
):
    if initial_expected_t <= 0:
        raise ValueError("initial_expected_t must be positive")
    if bar_window <= 0:
        raise ValueError("bar_window must be positive")
    if tick_window <= 0:
        raise ValueError("tick_window must be positive")

    self.initial_expected_t = initial_expected_t
    self.bar_window = bar_window
    self.tick_window = tick_window

sample

sample(data, include_incomplete=False)

Sample window-based volume imbalance bars from data.

Source code in src/ml4t/engineer/bars/imbalance.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample window-based volume imbalance bars from data."""
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Volume imbalance bars require 'side' column")

    if len(data) == 0:
        return self._empty_bars_df()

    # Extract arrays
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Calculate bar indices
    (
        bar_indices,
        expected_thetas,
        cumulative_thetas,
        expected_ts,
        imbalance_factors,
    ) = _calculate_window_volume_imbalance_bars_nb(
        volumes,
        sides,
        self.initial_expected_t,
        self.bar_window,
        self.tick_window,
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))

        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": buy_volume,
                "sell_volume": sell_volume,
                "volume_imbalance": buy_volume - sell_volume,
                "cumulative_theta": float(cumulative_thetas[i]),
                "expected_imbalance": float(expected_thetas[i]),
                "expected_t": float(expected_ts[i]),
                "imbalance_factor": float(imbalance_factors[i]),
            },
        )
        bars.append(bar)
        start_idx = end_idx + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)
        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": buy_volume,
                    "sell_volume": sell_volume,
                    "volume_imbalance": buy_volume - sell_volume,
                    "cumulative_theta": float(np.sum(bar_volumes * bar_sides)),
                    "expected_imbalance": (
                        float(expected_thetas[-1]) if expected_thetas else 0.0
                    ),
                    "expected_t": (
                        float(expected_ts[-1])
                        if expected_ts
                        else float(self.initial_expected_t)
                    ),
                    "imbalance_factor": (
                        float(imbalance_factors[-1]) if imbalance_factors else 0.0
                    ),
                },
            )
            bars.append(bar)

    if not bars:
        return self._empty_bars_df()

    return pl.DataFrame(bars)

ImbalanceBarSamplerOriginal

ImbalanceBarSamplerOriginal(
    expected_ticks_per_bar,
    alpha=0.1,
    initial_p_buy=0.5,
    min_bars_warmup=10,
)

Bases: BarSampler

Sample bars based on order flow imbalance (AFML-compliant).

Imbalance bars sample when the cumulative signed volume (buy - sell) reaches a dynamically adjusted threshold based on AFML Chapter 2.3.

AFML Threshold Formula

E[θ_T] = E[T] × |2v⁺ - E[v]|

Where

E[T] = EWMA of bar lengths (ticks per bar) v⁺ = P[b=1] × E[v|b=1] = expected buy volume contribution E[v] = unconditional mean volume per tick

Parameters

expected_ticks_per_bar : int Expected number of ticks per bar (used to initialize E[T]) alpha : float, default 0.1 EWMA decay factor for updating expectations initial_p_buy : float, default 0.5 Initial buy probability P[b=1] min_bars_warmup : int, default 10 Number of bars before starting EWMA updates

Examples

sampler = ImbalanceBarSampler( ... expected_ticks_per_bar=100, ... alpha=0.1 ... ) bars = sampler.sample(tick_data)

References

.. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. John Wiley & Sons. Chapter 2.3: Information-Driven Bars.

Source code in src/ml4t/engineer/bars/imbalance.py
def __init__(
    self,
    expected_ticks_per_bar: int,
    alpha: float = 0.1,
    initial_p_buy: float = 0.5,
    min_bars_warmup: int = 10,
):
    if expected_ticks_per_bar <= 0:
        raise ValueError("expected_ticks_per_bar must be positive")

    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")

    if not 0 <= initial_p_buy <= 1:
        raise ValueError("initial_p_buy must be in [0, 1]")

    if min_bars_warmup < 0:
        raise ValueError("min_bars_warmup must be non-negative")

    self.expected_ticks_per_bar = expected_ticks_per_bar
    self.alpha = alpha
    self.initial_p_buy = initial_p_buy
    self.min_bars_warmup = min_bars_warmup

    # Will be estimated from data
    self._initial_v_buy: float | None = None
    self._initial_v: float | None = None

sample

sample(data, include_incomplete=False)

Sample imbalance bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume, side include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled imbalance bars with AFML diagnostic columns: - expected_t: E[T] at bar formation - p_buy: P[b=1] at bar formation - v_plus: v⁺ = P[b=1] × E[v|b=1] at bar formation - e_v: E[v] at bar formation - expected_imbalance: AFML threshold E[θ_T] - cumulative_theta: Actual imbalance at bar formation

Source code in src/ml4t/engineer/bars/imbalance.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample imbalance bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume, side
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled imbalance bars with AFML diagnostic columns:
        - expected_t: E[T] at bar formation
        - p_buy: P[b=1] at bar formation
        - v_plus: v⁺ = P[b=1] × E[v|b=1] at bar formation
        - e_v: E[v] at bar formation
        - expected_imbalance: AFML threshold E[θ_T]
        - cumulative_theta: Actual imbalance at bar formation
    """
    # Validate input
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Imbalance bars require 'side' column")

    if len(data) == 0:
        return self._empty_imbalance_bars_df()

    # Extract arrays
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Estimate initial values from data if not already set
    warmup_size = min(1000, len(volumes))
    warmup_volumes = volumes[:warmup_size]
    warmup_sides = sides[:warmup_size]

    if self._initial_v is None:
        self._initial_v = float(np.mean(warmup_volumes))

    if self._initial_v_buy is None:
        buy_mask = warmup_sides > 0
        if np.any(buy_mask):
            self._initial_v_buy = float(np.mean(warmup_volumes[buy_mask]))
        else:
            self._initial_v_buy = self._initial_v

    # Calculate bar indices using AFML-compliant Numba function
    (
        bar_indices,
        expected_thetas,
        cumulative_thetas,
        expected_ts,
        p_buys,
        v_pluses,
        e_vs,
    ) = _calculate_imbalance_bars_nb(
        volumes,
        sides,
        float(self.expected_ticks_per_bar),
        self.initial_p_buy,
        self._initial_v_buy,
        self._initial_v,
        self.alpha,
        self.min_bars_warmup,
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        # Extract bar data
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)

        # Calculate metrics
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume: float = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume: float = float(np.sum(bar_volumes[bar_sides < 0]))
        imbalance = buy_volume - sell_volume

        # Create bar with AFML diagnostic columns
        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": float(buy_volume),
                "sell_volume": float(sell_volume),
                "imbalance": float(imbalance),
                "cumulative_theta": float(cumulative_thetas[i]),
                "expected_imbalance": float(expected_thetas[i]),
                # AFML diagnostic columns
                "expected_t": float(expected_ts[i]),
                "p_buy": float(p_buys[i]),
                "v_plus": float(v_pluses[i]),
                "e_v": float(e_vs[i]),
            },
        )
        bars.append(bar)

        start_idx = end_idx + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_vol_incomplete: float = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_vol_incomplete: float = float(np.sum(bar_volumes[bar_sides < 0]))
            imbalance_incomplete: float = buy_vol_incomplete - sell_vol_incomplete

            # Calculate current cumulative theta
            cumulative_theta: float = float(np.sum(bar_volumes * bar_sides))

            # Use last values or initial
            last_expected_t = (
                expected_ts[-1] if len(expected_ts) > 0 else float(self.expected_ticks_per_bar)
            )
            last_p_buy = p_buys[-1] if len(p_buys) > 0 else self.initial_p_buy
            last_v_plus = (
                v_pluses[-1] if len(v_pluses) > 0 else last_p_buy * self._initial_v_buy
            )
            last_e_v = e_vs[-1] if len(e_vs) > 0 else self._initial_v
            expected_imbalance = last_expected_t * abs(2 * last_v_plus - last_e_v)

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": float(buy_vol_incomplete),
                    "sell_volume": float(sell_vol_incomplete),
                    "imbalance": float(imbalance_incomplete),
                    "cumulative_theta": float(cumulative_theta),
                    "expected_imbalance": float(expected_imbalance),
                    "expected_t": float(last_expected_t),
                    "p_buy": float(last_p_buy),
                    "v_plus": float(last_v_plus),
                    "e_v": float(last_e_v),
                },
            )
            bars.append(bar)

    # Convert to DataFrame
    if not bars:
        return self._empty_imbalance_bars_df()

    return pl.DataFrame(bars)

DollarRunBarSampler

DollarRunBarSampler(
    expected_ticks_per_bar,
    alpha=0.1,
    initial_p_buy=0.5,
    min_bars_warmup=10,
)

Bases: BarSampler

Sample bars based on cumulative dollar value runs (AFML-compliant).

AFML Chapter 2.3 formula with dollar weighting: θ_T = max{Σ(buy dollars in bar), Σ(sell dollars in bar)}

Parameters

expected_ticks_per_bar : int Expected number of ticks per bar alpha : float, default 0.1 EWMA decay factor initial_p_buy : float, default 0.5 Initial buy probability P[b=1] min_bars_warmup : int, default 10 Number of bars before starting EWMA updates

Examples

sampler = DollarRunBarSampler(expected_ticks_per_bar=100) bars = sampler.sample(tick_data)

Source code in src/ml4t/engineer/bars/run.py
def __init__(
    self,
    expected_ticks_per_bar: int,
    alpha: float = 0.1,
    initial_p_buy: float = 0.5,
    min_bars_warmup: int = 10,
):
    if expected_ticks_per_bar <= 0:
        raise ValueError("expected_ticks_per_bar must be positive")
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    if not 0 <= initial_p_buy <= 1:
        raise ValueError("initial_p_buy must be in [0, 1]")
    if min_bars_warmup < 0:
        raise ValueError("min_bars_warmup must be non-negative")

    self.expected_ticks_per_bar = expected_ticks_per_bar
    self.alpha = alpha
    self.initial_p_buy = initial_p_buy
    self.min_bars_warmup = min_bars_warmup

sample

sample(data, include_incomplete=False)

Sample dollar run bars from data.

Source code in src/ml4t/engineer/bars/run.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample dollar run bars from data."""
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Run bars require 'side' column")

    if len(data) == 0:
        return self._empty_run_bars_df()

    prices = data["price"].to_numpy().astype(np.float64)
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)
    dollar_volumes = prices * volumes

    # Estimate initial E[dollar] for scaling
    warmup_size = min(1000, len(dollar_volumes))
    avg_dollar_volume = float(np.mean(dollar_volumes[:warmup_size]))

    # Scale expected_ticks_per_bar by average dollar volume
    initial_expected_t_scaled = float(self.expected_ticks_per_bar) * avg_dollar_volume

    # Calculate bar indices using AFML-compliant Numba function
    (
        bar_indices,
        thetas,
        expected_thetas,
        expected_ts,
        p_buys,
        cumulative_buys,
        cumulative_sells,
    ) = _calculate_run_bars_nb(
        dollar_volumes,  # Use dollar volumes as values
        sides,
        initial_expected_t_scaled,
        self.initial_p_buy,
        self.alpha,
        self.min_bars_warmup,
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]
        bar_dollars = dollar_volumes[start_idx : end_idx + 1]

        buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))
        total_dollars = float(np.sum(bar_dollars))
        total_volume = float(np.sum(bar_volumes))
        vwap = total_dollars / total_volume if total_volume > 0 else 0.0

        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": buy_volume,
                "sell_volume": sell_volume,
                "dollar_volume": total_dollars,
                "vwap": vwap,
                "run_dollars": float(thetas[i]),  # For backward compat
                "expected_run": float(expected_thetas[i]),  # For backward compat
                # AFML diagnostic columns
                "theta": float(thetas[i]),
                "expected_theta": float(expected_thetas[i]),
                "expected_t": float(expected_ts[i]),
                "p_buy": float(p_buys[i]),
                "cumulative_buys": float(cumulative_buys[i]),
                "cumulative_sells": float(cumulative_sells[i]),
            },
        )
        bars.append(bar)

        start_idx = end_idx + 1

    # Handle incomplete bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]
            bar_dollars = dollar_volumes[start_idx:]

            buy_vol = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_vol = float(np.sum(bar_volumes[bar_sides < 0]))
            total_dol = float(np.sum(bar_dollars))
            total_vol = float(np.sum(bar_volumes))
            vwap_val = total_dol / total_vol if total_vol > 0 else 0.0

            # Calculate current theta (cumulative dollars)
            buy_dollars = float(np.sum(bar_dollars[bar_sides > 0]))
            sell_dollars = float(np.sum(bar_dollars[bar_sides < 0]))
            current_theta = max(buy_dollars, sell_dollars)

            last_expected_t = (
                expected_ts[-1] if len(expected_ts) > 0 else initial_expected_t_scaled
            )
            last_p_buy = p_buys[-1] if len(p_buys) > 0 else self.initial_p_buy
            expected_theta = last_expected_t * max(last_p_buy, 1 - last_p_buy)

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": buy_vol,
                    "sell_volume": sell_vol,
                    "dollar_volume": total_dol,
                    "vwap": vwap_val,
                    "run_dollars": float(current_theta),
                    "expected_run": float(expected_theta),
                    "theta": float(current_theta),
                    "expected_theta": float(expected_theta),
                    "expected_t": float(last_expected_t),
                    "p_buy": float(last_p_buy),
                    "cumulative_buys": float(buy_dollars),
                    "cumulative_sells": float(sell_dollars),
                },
            )
            bars.append(bar)

    if not bars:
        return self._empty_run_bars_df()

    return pl.DataFrame(bars)

TickRunBarSampler

TickRunBarSampler(
    expected_ticks_per_bar,
    alpha=0.1,
    initial_p_buy=0.5,
    min_bars_warmup=10,
)

Bases: BarSampler

Sample bars based on cumulative tick runs (AFML-compliant).

AFML Chapter 2.3 formula: θ_T = max{Σ(all buys in bar), Σ(all sells in bar)} E[θ_T] = E[T] × max{P[b=1], 1-P[b=1]}

CRITICAL: Uses CUMULATIVE tick counts within the bar. Direction changes DO NOT reset the counts - only bar boundaries do.

Parameters

expected_ticks_per_bar : int Expected number of ticks per bar (used to initialize E[T]) alpha : float, default 0.1 EWMA decay factor for updating expectations initial_p_buy : float, default 0.5 Initial buy probability P[b=1] min_bars_warmup : int, default 10 Number of bars before starting EWMA updates

Examples

sampler = TickRunBarSampler(expected_ticks_per_bar=100) bars = sampler.sample(tick_data)

References

.. [1] López de Prado, M. (2018). Advances in Financial Machine Learning. John Wiley & Sons. Chapter 2.3: Information-Driven Bars.

Source code in src/ml4t/engineer/bars/run.py
def __init__(
    self,
    expected_ticks_per_bar: int,
    alpha: float = 0.1,
    initial_p_buy: float = 0.5,
    min_bars_warmup: int = 10,
):
    if expected_ticks_per_bar <= 0:
        raise ValueError("expected_ticks_per_bar must be positive")
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    if not 0 <= initial_p_buy <= 1:
        raise ValueError("initial_p_buy must be in [0, 1]")
    if min_bars_warmup < 0:
        raise ValueError("min_bars_warmup must be non-negative")

    self.expected_ticks_per_bar = expected_ticks_per_bar
    self.alpha = alpha
    self.initial_p_buy = initial_p_buy
    self.min_bars_warmup = min_bars_warmup

sample

sample(data, include_incomplete=False)

Sample tick run bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume, side include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled run bars with AFML diagnostic columns

Source code in src/ml4t/engineer/bars/run.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample tick run bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume, side
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled run bars with AFML diagnostic columns
    """
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Run bars require 'side' column")

    if len(data) == 0:
        return self._empty_run_bars_df()

    # Extract arrays - for tick runs, values are all 1.0
    n = len(data)
    values = np.ones(n, dtype=np.float64)
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Calculate bar indices using AFML-compliant Numba function
    (
        bar_indices,
        thetas,
        expected_thetas,
        expected_ts,
        p_buys,
        cumulative_buys,
        cumulative_sells,
    ) = _calculate_run_bars_nb(
        values,
        sides,
        float(self.expected_ticks_per_bar),
        self.initial_p_buy,
        self.alpha,
        self.min_bars_warmup,
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))

        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": buy_volume,
                "sell_volume": sell_volume,
                "run_length": int(thetas[i]),  # For backward compat
                "expected_run": float(expected_thetas[i]),  # For backward compat
                # AFML diagnostic columns
                "theta": float(thetas[i]),
                "expected_theta": float(expected_thetas[i]),
                "expected_t": float(expected_ts[i]),
                "p_buy": float(p_buys[i]),
                "cumulative_buys": float(cumulative_buys[i]),
                "cumulative_sells": float(cumulative_sells[i]),
            },
        )
        bars.append(bar)

        start_idx = end_idx + 1

    # Handle incomplete bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_vol = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_vol = float(np.sum(bar_volumes[bar_sides < 0]))

            # Calculate current theta (cumulative, not consecutive)
            cum_buys = float(np.sum(bar_sides > 0))
            cum_sells = float(np.sum(bar_sides < 0))
            current_theta = max(cum_buys, cum_sells)

            last_expected_t = (
                expected_ts[-1] if len(expected_ts) > 0 else float(self.expected_ticks_per_bar)
            )
            last_p_buy = p_buys[-1] if len(p_buys) > 0 else self.initial_p_buy
            expected_theta = last_expected_t * max(last_p_buy, 1 - last_p_buy)

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": buy_vol,
                    "sell_volume": sell_vol,
                    "run_length": int(current_theta),
                    "expected_run": float(expected_theta),
                    "theta": float(current_theta),
                    "expected_theta": float(expected_theta),
                    "expected_t": float(last_expected_t),
                    "p_buy": float(last_p_buy),
                    "cumulative_buys": float(cum_buys),
                    "cumulative_sells": float(cum_sells),
                },
            )
            bars.append(bar)

    if not bars:
        return self._empty_run_bars_df()

    return pl.DataFrame(bars)

VolumeRunBarSampler

VolumeRunBarSampler(
    expected_ticks_per_bar,
    alpha=0.1,
    initial_p_buy=0.5,
    min_bars_warmup=10,
)

Bases: BarSampler

Sample bars based on cumulative volume runs (AFML-compliant).

AFML Chapter 2.3 formula with volume weighting: θ_T = max{Σ(buy volumes in bar), Σ(sell volumes in bar)} E[θ_T] = E[T] × max{P[b=1], 1-P[b=1]} × E[v]

Where E[v] is estimated from the data.

Parameters

expected_ticks_per_bar : int Expected number of ticks per bar alpha : float, default 0.1 EWMA decay factor initial_p_buy : float, default 0.5 Initial buy probability P[b=1] min_bars_warmup : int, default 10 Number of bars before starting EWMA updates

Examples

sampler = VolumeRunBarSampler(expected_ticks_per_bar=100) bars = sampler.sample(tick_data)

Source code in src/ml4t/engineer/bars/run.py
def __init__(
    self,
    expected_ticks_per_bar: int,
    alpha: float = 0.1,
    initial_p_buy: float = 0.5,
    min_bars_warmup: int = 10,
):
    if expected_ticks_per_bar <= 0:
        raise ValueError("expected_ticks_per_bar must be positive")
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    if not 0 <= initial_p_buy <= 1:
        raise ValueError("initial_p_buy must be in [0, 1]")
    if min_bars_warmup < 0:
        raise ValueError("min_bars_warmup must be non-negative")

    self.expected_ticks_per_bar = expected_ticks_per_bar
    self.alpha = alpha
    self.initial_p_buy = initial_p_buy
    self.min_bars_warmup = min_bars_warmup

sample

sample(data, include_incomplete=False)

Sample volume run bars from data.

Source code in src/ml4t/engineer/bars/run.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample volume run bars from data."""
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Run bars require 'side' column")

    if len(data) == 0:
        return self._empty_run_bars_df()

    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Estimate initial E[v] for scaling the threshold
    warmup_size = min(1000, len(volumes))
    avg_volume = float(np.mean(volumes[:warmup_size]))

    # Scale expected_ticks_per_bar by average volume for volume-weighted runs
    initial_expected_t_scaled = float(self.expected_ticks_per_bar) * avg_volume

    # Calculate bar indices using AFML-compliant Numba function
    (
        bar_indices,
        thetas,
        expected_thetas,
        expected_ts,
        p_buys,
        cumulative_buys,
        cumulative_sells,
    ) = _calculate_run_bars_nb(
        volumes,  # Use volumes as values
        sides,
        initial_expected_t_scaled,
        self.initial_p_buy,
        self.alpha,
        self.min_bars_warmup,
    )

    # Build bars
    bars = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        bar_ticks = data.slice(start_idx, end_idx - start_idx + 1)
        bar_volumes = volumes[start_idx : end_idx + 1]
        bar_sides = sides[start_idx : end_idx + 1]

        buy_volume = float(np.sum(bar_volumes[bar_sides > 0]))
        sell_volume = float(np.sum(bar_volumes[bar_sides < 0]))

        bar = self._create_ohlcv_bar(
            bar_ticks,
            additional_cols={
                "buy_volume": buy_volume,
                "sell_volume": sell_volume,
                "run_volume": float(thetas[i]),  # For backward compat
                "expected_run": float(expected_thetas[i]),  # For backward compat
                # AFML diagnostic columns
                "theta": float(thetas[i]),
                "expected_theta": float(expected_thetas[i]),
                "expected_t": float(expected_ts[i]),
                "p_buy": float(p_buys[i]),
                "cumulative_buys": float(cumulative_buys[i]),
                "cumulative_sells": float(cumulative_sells[i]),
            },
        )
        bars.append(bar)

        start_idx = end_idx + 1

    # Handle incomplete bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar_volumes = volumes[start_idx:]
            bar_sides = sides[start_idx:]

            buy_vol = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_vol = float(np.sum(bar_volumes[bar_sides < 0]))

            # Calculate current theta (cumulative volumes)
            current_theta = max(buy_vol, sell_vol)

            last_expected_t = (
                expected_ts[-1] if len(expected_ts) > 0 else initial_expected_t_scaled
            )
            last_p_buy = p_buys[-1] if len(p_buys) > 0 else self.initial_p_buy
            expected_theta = last_expected_t * max(last_p_buy, 1 - last_p_buy)

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": buy_vol,
                    "sell_volume": sell_vol,
                    "run_volume": float(current_theta),
                    "expected_run": float(expected_theta),
                    "theta": float(current_theta),
                    "expected_theta": float(expected_theta),
                    "expected_t": float(last_expected_t),
                    "p_buy": float(last_p_buy),
                    "cumulative_buys": float(buy_vol),
                    "cumulative_sells": float(sell_vol),
                },
            )
            bars.append(bar)

    if not bars:
        return self._empty_run_bars_df()

    return pl.DataFrame(bars)

TickBarSamplerOriginal

TickBarSamplerOriginal(ticks_per_bar)

Bases: BarSampler

Sample bars based on number of ticks.

Tick bars sample the data every N ticks, providing a more stable sampling rate compared to time bars during varying market activity.

Parameters

ticks_per_bar : int Number of ticks per bar

Examples

sampler = TickBarSampler(ticks_per_bar=100) bars = sampler.sample(tick_data)

Initialize tick bar sampler.

Parameters

ticks_per_bar : int Number of ticks per bar

Source code in src/ml4t/engineer/bars/tick.py
def __init__(self, ticks_per_bar: int):
    """Initialize tick bar sampler.

    Parameters
    ----------
    ticks_per_bar : int
        Number of ticks per bar
    """
    if ticks_per_bar <= 0:
        raise ValueError("ticks_per_bar must be positive")

    self.ticks_per_bar = ticks_per_bar

sample

sample(data, include_incomplete=False)

Sample tick bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled tick bars

Source code in src/ml4t/engineer/bars/tick.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample tick bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled tick bars
    """
    # Validate input
    self._validate_data(data)

    if len(data) == 0:
        return pl.DataFrame()

    # Calculate bar indices
    n_ticks = len(data)
    n_complete_bars = n_ticks // self.ticks_per_bar

    bars = []

    # Process complete bars
    for i in range(n_complete_bars):
        start_idx = i * self.ticks_per_bar

        bar_ticks = data.slice(start_idx, self.ticks_per_bar)
        bar = self._create_ohlcv_bar(bar_ticks)
        bars.append(bar)

    # Handle incomplete final bar
    if include_incomplete and n_ticks % self.ticks_per_bar > 0:
        start_idx = n_complete_bars * self.ticks_per_bar
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar = self._create_ohlcv_bar(bar_ticks)
            bars.append(bar)

    # Convert to DataFrame
    if not bars:
        # Return empty DataFrame with correct schema
        return pl.DataFrame(
            {
                "timestamp": [],
                "open": [],
                "high": [],
                "low": [],
                "close": [],
                "volume": [],
                "tick_count": [],
            },
        )

    return pl.DataFrame(bars)

DollarBarSampler

DollarBarSampler(dollars_per_bar)

Bases: BarSampler

Vectorized dollar bar sampler using Polars.

Parameters

dollars_per_bar : float Target dollar value per bar

Source code in src/ml4t/engineer/bars/vectorized.py
def __init__(self, dollars_per_bar: float):
    if dollars_per_bar <= 0:
        raise ValueError("dollars_per_bar must be positive")
    self.dollars_per_bar = dollars_per_bar

sample

sample(data, include_incomplete=False)

Sample dollar bars using vectorized operations.

Source code in src/ml4t/engineer/bars/vectorized.py
def sample(self, data: pl.DataFrame, include_incomplete: bool = False) -> pl.DataFrame:
    """Sample dollar bars using vectorized operations."""
    self._validate_data(data)

    if len(data) == 0:
        return pl.DataFrame()

    # Step 1: Calculate dollar volumes and identify bar boundaries
    prices = data["price"].to_numpy()
    volumes = data["volume"].to_numpy()
    dollar_volumes = prices * volumes

    # Use JIT-compiled function for 10-100x speedup
    bar_ids = _assign_dollar_bar_ids(prices, volumes, self.dollars_per_bar)

    # Add bar IDs and dollar volume to dataframe
    df_with_bars = data.with_columns(
        [pl.Series("bar_id", bar_ids), pl.Series("dollar_volume", dollar_volumes)],
    )

    # Filter out incomplete final bar if requested
    if not include_incomplete:
        # Check if last bar is complete
        last_bar_id = bar_ids[-1]
        last_bar_dollars = df_with_bars.filter(pl.col("bar_id") == last_bar_id)[
            "dollar_volume"
        ].sum()
        if last_bar_dollars < self.dollars_per_bar:
            df_with_bars = df_with_bars.filter(pl.col("bar_id") < last_bar_id)
            if df_with_bars.is_empty():
                return self._empty_dollar_bars_df()

    # Step 2: Vectorized aggregation by bar_id
    bars = (
        df_with_bars.group_by("bar_id", maintain_order=True)
        .agg(
            [
                # OHLCV aggregations
                pl.col("timestamp").first().alias("timestamp"),
                pl.col("price").first().alias("open"),
                pl.col("price").max().alias("high"),
                pl.col("price").min().alias("low"),
                pl.col("price").last().alias("close"),
                pl.col("volume").sum().alias("volume"),
                pl.len().alias("tick_count"),
                # Dollar volume and VWAP calculation
                pl.col("dollar_volume").sum().alias("dollar_volume"),
                (pl.col("dollar_volume").sum() / pl.col("volume").sum()).alias("vwap"),
            ],
        )
        .sort("bar_id")
        .drop("bar_id")
    )

    return bars

ImbalanceBarSampler

ImbalanceBarSampler(
    expected_ticks_per_bar,
    alpha=0.1,
    initial_p_buy=0.5,
    min_bars_warmup=10,
)

Bases: BarSampler

Vectorized imbalance bar sampler with AFML-compliant adaptive thresholds.

This implementation uses vectorized operations for the main logic while keeping the adaptive threshold calculation efficient.

AFML Threshold Formula

E[θ_T] = E[T] × |2v⁺ - E[v]|

Where

E[T] = EWMA of bar lengths (ticks per bar) v⁺ = P[b=1] × E[v|b=1] = expected buy volume contribution E[v] = unconditional mean volume per tick

Parameters

expected_ticks_per_bar : int Expected number of ticks per bar (initializes E[T]) alpha : float, default 0.1 EWMA decay factor for updating expectations initial_p_buy : float, default 0.5 Initial buy probability P[b=1] min_bars_warmup : int, default 10 Number of bars before starting EWMA updates

Source code in src/ml4t/engineer/bars/vectorized.py
def __init__(
    self,
    expected_ticks_per_bar: int,
    alpha: float = 0.1,
    initial_p_buy: float = 0.5,
    min_bars_warmup: int = 10,
):
    if expected_ticks_per_bar <= 0:
        raise ValueError("expected_ticks_per_bar must be positive")
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    if not 0 <= initial_p_buy <= 1:
        raise ValueError("initial_p_buy must be in [0, 1]")
    if min_bars_warmup < 0:
        raise ValueError("min_bars_warmup must be non-negative")

    self.expected_ticks_per_bar = expected_ticks_per_bar
    self.alpha = alpha
    self.initial_p_buy = initial_p_buy
    self.min_bars_warmup = min_bars_warmup

    # Will be estimated from data
    self._initial_v_buy: float | None = None
    self._initial_v: float | None = None

sample

sample(data, include_incomplete=False)

Sample imbalance bars using vectorized operations where possible.

Source code in src/ml4t/engineer/bars/vectorized.py
def sample(self, data: pl.DataFrame, include_incomplete: bool = False) -> pl.DataFrame:
    """Sample imbalance bars using vectorized operations where possible."""
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Imbalance bars require 'side' column")

    if len(data) == 0:
        return pl.DataFrame()

    # Extract arrays
    volumes = data["volume"].to_numpy().astype(np.float64)
    sides = data["side"].to_numpy().astype(np.float64)

    # Estimate initial values from data if not already set
    warmup_size = min(1000, len(volumes))
    warmup_volumes = volumes[:warmup_size]
    warmup_sides = sides[:warmup_size]

    if self._initial_v is None:
        self._initial_v = float(np.mean(warmup_volumes))

    if self._initial_v_buy is None:
        buy_mask = warmup_sides > 0
        if np.any(buy_mask):
            self._initial_v_buy = float(np.mean(warmup_volumes[buy_mask]))
        else:
            self._initial_v_buy = self._initial_v

    # Use the AFML-compliant Numba function for finding bar boundaries
    from .imbalance import _calculate_imbalance_bars_nb

    (
        bar_indices,
        expected_thetas,
        cumulative_thetas,
        expected_ts,
        p_buys,
        v_pluses,
        e_vs,
    ) = _calculate_imbalance_bars_nb(
        volumes,
        sides,
        float(self.expected_ticks_per_bar),
        self.initial_p_buy,
        self._initial_v_buy,
        self._initial_v,
        self.alpha,
        self.min_bars_warmup,
    )

    if len(bar_indices) == 0:
        if include_incomplete and len(data) > 0:
            # Return single incomplete bar
            return self._create_incomplete_imbalance_bar(data, volumes, sides)
        return self._empty_imbalance_bars_df()

    # Vectorized bar creation using bar boundaries
    bars_data = []
    start_idx = 0

    for i, end_idx in enumerate(bar_indices):
        # Create bar ID column for this segment
        segment_length = end_idx - start_idx + 1
        bar_segment = data.slice(start_idx, segment_length).with_columns(
            [
                pl.lit(i).alias("bar_id"),
                pl.lit(expected_thetas[i]).alias("expected_imbalance"),
                pl.lit(cumulative_thetas[i]).alias("cumulative_theta"),
                # AFML diagnostic columns
                pl.lit(expected_ts[i]).alias("expected_t"),
                pl.lit(p_buys[i]).alias("p_buy"),
                pl.lit(v_pluses[i]).alias("v_plus"),
                pl.lit(e_vs[i]).alias("e_v"),
            ],
        )
        bars_data.append(bar_segment)
        start_idx = end_idx + 1

    # Handle incomplete bar
    if include_incomplete and start_idx < len(data):
        # Calculate current cumulative theta for incomplete bar
        incomplete_volumes = volumes[start_idx:]
        incomplete_sides = sides[start_idx:]
        incomplete_theta = float(np.sum(incomplete_volumes * incomplete_sides))

        # Use last values or initial
        last_expected_t = (
            expected_ts[-1] if len(expected_ts) > 0 else float(self.expected_ticks_per_bar)
        )
        last_p_buy = p_buys[-1] if len(p_buys) > 0 else self.initial_p_buy
        last_v_plus = v_pluses[-1] if len(v_pluses) > 0 else last_p_buy * self._initial_v_buy
        last_e_v = e_vs[-1] if len(e_vs) > 0 else self._initial_v
        incomplete_expected = last_expected_t * abs(2 * last_v_plus - last_e_v)

        incomplete_segment = data.slice(start_idx).with_columns(
            [
                pl.lit(len(bar_indices)).alias("bar_id"),
                pl.lit(incomplete_expected).alias("expected_imbalance"),
                pl.lit(incomplete_theta).alias("cumulative_theta"),
                pl.lit(last_expected_t).alias("expected_t"),
                pl.lit(last_p_buy).alias("p_buy"),
                pl.lit(last_v_plus).alias("v_plus"),
                pl.lit(last_e_v).alias("e_v"),
            ],
        )
        bars_data.append(incomplete_segment)

    if not bars_data:
        return self._empty_imbalance_bars_df()

    # Combine all segments and group by bar_id
    combined_data = pl.concat(bars_data)

    bars = (
        combined_data.group_by("bar_id", maintain_order=True)
        .agg(
            [
                # OHLCV aggregations
                pl.col("timestamp").first().alias("timestamp"),
                pl.col("price").first().alias("open"),
                pl.col("price").max().alias("high"),
                pl.col("price").min().alias("low"),
                pl.col("price").last().alias("close"),
                pl.col("volume").sum().alias("volume"),
                pl.len().alias("tick_count"),
                # Imbalance-specific metrics using vectorized operations
                pl.col("volume")
                .filter(pl.col("side") > 0)
                .sum()
                .fill_null(0)
                .alias("buy_volume"),
                pl.col("volume")
                .filter(pl.col("side") < 0)
                .sum()
                .fill_null(0)
                .alias("sell_volume"),
                pl.col("expected_imbalance").first().alias("expected_imbalance"),
                pl.col("cumulative_theta").first().alias("cumulative_theta"),
                # AFML diagnostic columns
                pl.col("expected_t").first().alias("expected_t"),
                pl.col("p_buy").first().alias("p_buy"),
                pl.col("v_plus").first().alias("v_plus"),
                pl.col("e_v").first().alias("e_v"),
            ],
        )
        .with_columns(
            [
                # Calculate imbalance
                (pl.col("buy_volume") - pl.col("sell_volume")).alias("imbalance"),
            ],
        )
        .sort("bar_id")
        .drop("bar_id")
    )

    return bars

TickBarSampler

TickBarSampler(ticks_per_bar)

Bases: BarSampler

Vectorized tick bar sampler using Polars.

Parameters

ticks_per_bar : int Number of ticks per bar

Source code in src/ml4t/engineer/bars/vectorized.py
def __init__(self, ticks_per_bar: int):
    if ticks_per_bar <= 0:
        raise ValueError("ticks_per_bar must be positive")
    self.ticks_per_bar = ticks_per_bar

sample

sample(data, include_incomplete=False)

Sample tick bars using vectorized operations.

Source code in src/ml4t/engineer/bars/vectorized.py
def sample(self, data: pl.DataFrame, include_incomplete: bool = False) -> pl.DataFrame:
    """Sample tick bars using vectorized operations."""
    self._validate_data(data)

    if len(data) == 0:
        return pl.DataFrame()

    # Step 1: Add row indices and calculate bar IDs
    df_with_bars = data.with_row_index("row_idx").with_columns(
        [
            # Bar group IDs using integer division
            (pl.col("row_idx") // self.ticks_per_bar).cast(pl.Int32).alias("bar_id"),
        ],
    )

    # Filter out incomplete final bar if requested
    if not include_incomplete:
        # Calculate the maximum complete bar ID
        total_rows = len(df_with_bars)
        max_complete_bar = (total_rows // self.ticks_per_bar) - 1

        if max_complete_bar >= 0:
            df_with_bars = df_with_bars.filter(pl.col("bar_id") <= max_complete_bar)
        else:
            return self._empty_tick_bars_df()

    # Step 2: Vectorized aggregation by bar_id
    bars = (
        df_with_bars.group_by("bar_id", maintain_order=True)
        .agg(
            [
                # OHLCV aggregations
                pl.col("timestamp").first().alias("timestamp"),
                pl.col("price").first().alias("open"),
                pl.col("price").max().alias("high"),
                pl.col("price").min().alias("low"),
                pl.col("price").last().alias("close"),
                pl.col("volume").sum().alias("volume"),
                pl.len().alias("tick_count"),
            ],
        )
        .sort("bar_id")
        .drop("bar_id")
    )

    return bars

VolumeBarSampler

VolumeBarSampler(volume_per_bar)

Bases: BarSampler

Vectorized volume bar sampler using Polars.

This implementation replaces Python loops with vectorized Polars operations for dramatically improved performance on large datasets.

Parameters

volume_per_bar : float Target volume per bar

Source code in src/ml4t/engineer/bars/vectorized.py
def __init__(self, volume_per_bar: float):
    if volume_per_bar <= 0:
        raise ValueError("volume_per_bar must be positive")
    self.volume_per_bar = volume_per_bar

sample

sample(data, include_incomplete=False)

Sample volume bars using vectorized operations.

Source code in src/ml4t/engineer/bars/vectorized.py
def sample(self, data: pl.DataFrame, include_incomplete: bool = False) -> pl.DataFrame:
    """Sample volume bars using vectorized operations."""
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Volume bars require 'side' column")

    if len(data) == 0:
        return pl.DataFrame()

    # Step 1: Identify bar boundaries using Numba-optimized function
    volumes = data["volume"].to_numpy()

    # Use JIT-compiled function for 10-100x speedup
    bar_ids = _assign_volume_bar_ids(volumes, self.volume_per_bar)

    # Add bar IDs to dataframe
    df_with_bars = data.with_columns([pl.Series("bar_id", bar_ids)])

    # Filter out incomplete final bar if requested
    if not include_incomplete:
        # Check if last bar is complete
        last_bar_id = bar_ids[-1]
        last_bar_volume = df_with_bars.filter(pl.col("bar_id") == last_bar_id)["volume"].sum()
        if last_bar_volume < self.volume_per_bar:
            df_with_bars = df_with_bars.filter(pl.col("bar_id") < last_bar_id)
            if df_with_bars.is_empty():
                return self._empty_volume_bars_df()

    # Step 2: Group by bar_id and aggregate using vectorized operations
    bars = (
        df_with_bars.group_by("bar_id", maintain_order=True)
        .agg(
            [
                # OHLCV aggregations
                pl.col("timestamp").first().alias("timestamp"),
                pl.col("price").first().alias("open"),
                pl.col("price").max().alias("high"),
                pl.col("price").min().alias("low"),
                pl.col("price").last().alias("close"),
                pl.col("volume").sum().alias("volume"),
                pl.len().alias("tick_count"),
                # Buy/sell volume breakdown using vectorized operations
                pl.col("volume")
                .filter(pl.col("side") > 0)
                .sum()
                .fill_null(0)
                .alias("buy_volume"),
                pl.col("volume")
                .filter(pl.col("side") < 0)
                .sum()
                .fill_null(0)
                .alias("sell_volume"),
            ],
        )
        .sort("bar_id")
        .drop("bar_id")
    )

    return bars

DollarBarSamplerOriginal

DollarBarSamplerOriginal(dollars_per_bar)

Bases: BarSampler

Sample bars based on dollar value traded.

Dollar bars sample when the cumulative dollar value (price * volume) reaches a threshold, providing adaptive sampling based on both price and volume.

Parameters

dollars_per_bar : float Target dollar value per bar

Examples

sampler = DollarBarSampler(dollars_per_bar=1_000_000) bars = sampler.sample(tick_data)

Initialize dollar bar sampler.

Parameters

dollars_per_bar : float Target dollar value per bar

Source code in src/ml4t/engineer/bars/volume.py
def __init__(self, dollars_per_bar: float):
    """Initialize dollar bar sampler.

    Parameters
    ----------
    dollars_per_bar : float
        Target dollar value per bar
    """
    if dollars_per_bar <= 0:
        raise ValueError("dollars_per_bar must be positive")

    self.dollars_per_bar = dollars_per_bar

sample

sample(data, include_incomplete=False)

Sample dollar bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled dollar bars with VWAP

Source code in src/ml4t/engineer/bars/volume.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample dollar bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled dollar bars with VWAP
    """
    # Validate input
    self._validate_data(data)

    if len(data) == 0:
        return pl.DataFrame()

    # Calculate dollar volumes
    prices = data["price"].to_numpy()
    volumes = data["volume"].to_numpy()
    dollar_volumes = prices * volumes

    bars = []
    current_dollars = 0
    start_idx = 0

    for i in range(len(data)):
        current_dollars += dollar_volumes[i]

        # Check if we've reached the threshold
        if current_dollars >= self.dollars_per_bar:
            # Extract bar data
            bar_ticks = data.slice(start_idx, i - start_idx + 1)

            # Calculate VWAP
            bar_prices = prices[start_idx : i + 1]
            bar_volumes = volumes[start_idx : i + 1]
            bar_dollars = dollar_volumes[start_idx : i + 1]

            total_volume: float = float(np.sum(bar_volumes))
            if total_volume > 0:
                vwap = float(np.sum(bar_dollars) / total_volume)
            else:
                vwap = float(np.mean(bar_prices))

            # Create bar
            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "dollar_volume": float(np.sum(bar_dollars)),
                    "vwap": float(vwap),
                },
            )
            bars.append(bar)

            # Reset for next bar
            current_dollars = 0
            start_idx = i + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar_prices = prices[start_idx:]
            bar_volumes = volumes[start_idx:]
            bar_dollars = dollar_volumes[start_idx:]

            total_vol_incomplete: float = float(np.sum(bar_volumes))
            if total_vol_incomplete > 0:
                vwap_incomplete = float(np.sum(bar_dollars) / total_vol_incomplete)
            else:
                vwap_incomplete = float(np.mean(bar_prices))

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "dollar_volume": float(np.sum(bar_dollars)),
                    "vwap": float(vwap_incomplete),
                },
            )
            bars.append(bar)

    # Convert to DataFrame
    if not bars:
        return pl.DataFrame(
            {
                "timestamp": [],
                "open": [],
                "high": [],
                "low": [],
                "close": [],
                "volume": [],
                "tick_count": [],
                "dollar_volume": [],
                "vwap": [],
            },
        )

    return pl.DataFrame(bars)

VolumeBarSamplerOriginal

VolumeBarSamplerOriginal(volume_per_bar)

Bases: BarSampler

Sample bars based on volume traded.

Volume bars sample when the cumulative volume reaches a threshold, providing more samples during high activity periods.

Parameters

volume_per_bar : float Target volume per bar

Examples

sampler = VolumeBarSampler(volume_per_bar=10000) bars = sampler.sample(tick_data)

Initialize volume bar sampler.

Parameters

volume_per_bar : float Target volume per bar

Source code in src/ml4t/engineer/bars/volume.py
def __init__(self, volume_per_bar: float):
    """Initialize volume bar sampler.

    Parameters
    ----------
    volume_per_bar : float
        Target volume per bar
    """
    if volume_per_bar <= 0:
        raise ValueError("volume_per_bar must be positive")

    self.volume_per_bar = volume_per_bar

sample

sample(data, include_incomplete=False)

Sample volume bars from data.

Parameters

data : pl.DataFrame Tick data with columns: timestamp, price, volume, side include_incomplete : bool, default False Whether to include incomplete final bar

Returns

pl.DataFrame Sampled volume bars with buy/sell volume breakdown

Source code in src/ml4t/engineer/bars/volume.py
def sample(
    self,
    data: pl.DataFrame,
    include_incomplete: bool = False,
) -> pl.DataFrame:
    """Sample volume bars from data.

    Parameters
    ----------
    data : pl.DataFrame
        Tick data with columns: timestamp, price, volume, side
    include_incomplete : bool, default False
        Whether to include incomplete final bar

    Returns
    -------
    pl.DataFrame
        Sampled volume bars with buy/sell volume breakdown
    """
    # Validate input
    self._validate_data(data)

    if "side" not in data.columns:
        raise DataValidationError("Volume bars require 'side' column")

    if len(data) == 0:
        return pl.DataFrame()

    # Convert to numpy for efficient processing
    volumes = data["volume"].to_numpy()
    sides = data["side"].to_numpy()

    bars = []
    current_volume: float = 0.0
    start_idx = 0

    for i in range(len(data)):
        current_volume += volumes[i]

        # Check if we've reached the threshold
        if current_volume >= self.volume_per_bar:
            # Extract bar data
            bar_ticks = data.slice(start_idx, i - start_idx + 1)

            # Calculate buy/sell volumes
            bar_sides = sides[start_idx : i + 1]
            bar_volumes = volumes[start_idx : i + 1]

            buy_vol: float = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_vol: float = float(np.sum(bar_volumes[bar_sides < 0]))

            # Create bar
            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": float(buy_vol),
                    "sell_volume": float(sell_vol),
                },
            )
            bars.append(bar)

            # Reset for next bar
            current_volume = 0
            start_idx = i + 1

    # Handle incomplete final bar
    if include_incomplete and start_idx < len(data):
        bar_ticks = data.slice(start_idx)

        if len(bar_ticks) > 0:
            bar_sides = sides[start_idx:]
            bar_volumes = volumes[start_idx:]

            buy_vol_incomplete: float = float(np.sum(bar_volumes[bar_sides > 0]))
            sell_vol_incomplete: float = float(np.sum(bar_volumes[bar_sides < 0]))

            bar = self._create_ohlcv_bar(
                bar_ticks,
                additional_cols={
                    "buy_volume": float(buy_vol_incomplete),
                    "sell_volume": float(sell_vol_incomplete),
                },
            )
            bars.append(bar)

    # Convert to DataFrame
    if not bars:
        return pl.DataFrame(
            {
                "timestamp": [],
                "open": [],
                "high": [],
                "low": [],
                "close": [],
                "volume": [],
                "tick_count": [],
                "buy_volume": [],
                "sell_volume": [],
            },
        )

    return pl.DataFrame(bars)

Next Steps

  • Read Features for the main computation workflow.
  • Read Labeling for supervised target construction.
  • Read Alternative Bars for information-driven sampling.
  • Use the Book Guide to map these APIs back to the book and case studies.