API Reference

This reference is organized by stable import surface and by conceptual family.

| Use case | Import surface |
| --- | --- |
| Notebook and exploratory work | ml4t.models |
| Structural model protocols | ml4t.models.api |
| Batch and result contracts | ml4t.models.types |
| Config objects | ml4t.models.configs |
| Cross-library handoff | ml4t.models.integration |

Package Root

The package root re-exports the main model classes, configs, batches, results, and integration helpers.

models

Public package surface for ml4t-models.

AssetMapper

Bases: Protocol

Protocol for mapping factor forecasts back to asset forecasts.

AssetPredictionModel

Bases: Protocol

Protocol for direct asset-level predictive models.

FactorForecaster

Bases: Protocol

Protocol for factor-premium forecasters.

LatentFactorModel

Bases: Protocol

Protocol for structural latent-factor estimators.

PortfolioModel

Bases: Protocol

Protocol for end-to-end portfolio learners.

PortfolioPostprocessor

Bases: Protocol

Protocol for portfolio-weight post-processing hooks.

StochasticDiscountFactorEstimator

Bases: Protocol

Protocol for stochastic discount factor models with weight-native outputs.
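
The classes above are typing protocols, so conformance is structural: an object qualifies by exposing the right methods, not by inheriting from the protocol. This reference does not list the protocol methods, so the sketch below uses a hypothetical `ForecasterLike` protocol with an assumed `forecast` method purely to illustrate the mechanism.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ForecasterLike(Protocol):
    """Hypothetical protocol; the method name is an assumption, not the library's."""

    def forecast(self, history: list[float]) -> float: ...


class LastValueForecaster:
    """Satisfies ForecasterLike by shape alone; it does not inherit from it."""

    def forecast(self, history: list[float]) -> float:
        return history[-1]


def run(forecaster: ForecasterLike, history: list[float]) -> float:
    # Any object exposing a compatible ``forecast`` method is accepted here.
    return forecaster.forecast(history)


result = run(LastValueForecaster(), [0.01, 0.02, 0.03])
is_match = isinstance(LastValueForecaster(), ForecasterLike)
```

Note that `runtime_checkable` only checks that the method exists, not its signature, so static type checking remains the stronger guarantee.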

SAEModel

SAEModel(config)

Bases: BaseAssetPredictionModel[SAEConfig]

Checkpointed supervised autoencoder for direct asset prediction.

Source code in src/ml4t/models/asset_prediction/sae.py
def __init__(self, config: SAEConfig) -> None:
    super().__init__(config)
    self._checkpoint_states: dict[int, dict[str, Any]] = {}
    self._n_features: int | None = None
    self._asset_ids: tuple[str, ...] = ()
    self._history: tuple[dict[str, float | str], ...] = ()

AR1ForecasterConfig dataclass

AR1ForecasterConfig(
    seed=42, device="cpu", dtype="float64", model_name="ar1"
)

Bases: BaseModelConfig

Config for per-factor AR(1) forecasts.

AssetPredictionConfig dataclass

AssetPredictionConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="asset_prediction",
    task_type="regression",
)

Bases: BaseModelConfig

Shared configuration for direct asset-prediction models.

CAEConfig dataclass

CAEConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="cae",
    n_factors=5,
    persistent_entities=False,
    task_type="regression",
    hidden_units=(32,),
    n_ensemble=1,
    n_epochs=50,
    checkpoint_interval=5,
    checkpoint_epochs=(),
    default_checkpoint=None,
    lr=0.001,
    lambda_l1=0.0001,
)

Bases: LatentFactorConfig

Config for conditional autoencoders.

DeepPortfolioConfig dataclass

DeepPortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="deep_portfolio",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
    d_model=64,
    n_heads=2,
    lstm_layers=1,
    temporal_mha_layers=1,
    cross_attention_heads=2,
    cross_attention_lag=1,
    macro_gnn_heads=2,
    adapter_hidden_mult=2,
)

Bases: PortfolioConfig

Config for DeePM-style end-to-end portfolio learners.

EWMABaseForecasterConfig dataclass

EWMABaseForecasterConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="ewma",
    half_life=12.0,
)

Bases: BaseModelConfig

Config for EWMA factor-premium forecasts.

ExpandingMeanForecasterConfig dataclass

ExpandingMeanForecasterConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="expanding_mean",
)

Bases: BaseModelConfig

Config for the historical-mean factor-premium baseline.

IPCAConfig dataclass

IPCAConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="ipca",
    n_factors=5,
    persistent_entities=False,
    max_iter=100,
    tol=1e-06,
    factor_ridge=1e-06,
    gamma_ridge=1e-06,
)

Bases: LatentFactorConfig

Config for IPCA.

LatentFactorConfig dataclass

LatentFactorConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="latent_factor",
    n_factors=5,
    persistent_entities=False,
)

Bases: BaseModelConfig

Shared latent-factor configuration.

LinearPortfolioConfig dataclass

LinearPortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="linear_portfolio",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
    ridge_alpha=0.0001,
    fit_intercept=True,
    gross_exposure=1.0,
    net_exposure=0.0,
    max_abs_weight=None,
)

Bases: PortfolioConfig

Config for a pooled linear feature portfolio baseline.

LSTMPortfolioConfig dataclass

LSTMPortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="lstm_portfolio",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
    hidden_size=64,
    n_layers=1,
)

Bases: PortfolioConfig

Starter config for a sequence-based portfolio learner.

MapperConfig dataclass

MapperConfig(model_name="beta_lambda")

Config for asset-return or weight mappers.

PCAConfig dataclass

PCAConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="pca",
    n_factors=5,
    persistent_entities=True,
)

Bases: LatentFactorConfig

Config for PCA and related persistent-panel baselines.

PipelineConfig dataclass

PipelineConfig(
    latent_factor_model,
    factor_forecaster,
    asset_mapper="beta_lambda",
)

Declarative description of a latent-factor forecast pipeline.

PortfolioConfig dataclass

PortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="portfolio_model",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
)

Bases: BaseModelConfig

Base config for portfolio-learning models.
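
The `annualization_factor` and `sharpe_eps` fields suggest a Sharpe-style training objective. How the library combines them is not shown in this reference, so the following is only a sketch under the assumption that the epsilon guards the denominator and the factor scales by its square root.

```python
import math
import statistics


def annualized_sharpe(returns, annualization_factor=252.0, sharpe_eps=1e-8):
    """Assumed form: mean / (std + eps) * sqrt(annualization_factor)."""
    mean = statistics.fmean(returns)
    std = statistics.pstdev(returns)
    # The epsilon keeps the ratio finite when the return series is constant.
    return mean / (std + sharpe_eps) * math.sqrt(annualization_factor)


value = annualized_sharpe([0.01, -0.01, 0.02, 0.0])
flat_value = annualized_sharpe([0.001, 0.001, 0.001])
```

With daily returns the default factor of 252.0 corresponds to trading days per year; other data frequencies would need a different value.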

RPPCAConfig dataclass

RPPCAConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="rp_pca",
    n_factors=5,
    persistent_entities=True,
    gamma=0.0,
    base_moment="covariance",
    scale_by_asset_volatility=False,
    normalize_loadings="unit_length",
    orthogonalize_factors=False,
)

Bases: LatentFactorConfig

Config for risk-premium-aware PCA.

SAEConfig dataclass

SAEConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="sae",
    task_type="regression",
    bottleneck_dim=96,
    aux_hidden_dim=96,
    main_hidden_units=(896, 448, 448, 256),
    dropout_rates=None,
    noise_std=0.035,
    alpha=1.0,
    aux_weight=1.0,
    n_epochs=50,
    batch_size=None,
    checkpoint_interval=5,
    checkpoint_epochs=(),
    default_checkpoint=None,
    lr=0.0001,
)

Bases: AssetPredictionConfig

Config for supervised autoencoder predictors.

StochasticDiscountFactorConfig dataclass

StochasticDiscountFactorConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="stochastic_discount_factor",
    output_mode="weights",
    state_dim_sdf=4,
    state_dim_moment=32,
    hidden_dim=64,
    n_instruments=8,
    dropout=0.05,
    n_epochs_unc=256,
    n_epochs_moment=64,
    n_epochs_cond=1024,
    checkpoint_interval=None,
    checkpoint_epochs=(),
    default_checkpoint=None,
    expected_return_mapper="linear",
    beta_state_dim=4,
    beta_hidden_dim=64,
    beta_n_epochs=256,
    beta_checkpoint_interval=None,
    beta_checkpoint_epochs=(),
    beta_default_checkpoint=None,
    beta_lr=0.001,
    burn_in_epochs=0,
    lr=0.001,
    weight_decay=0.0,
)

Bases: BaseModelConfig

Config for stochastic discount factor networks.

AR1FactorForecaster

AR1FactorForecaster(config=None)

Bases: BaseFactorForecaster[AR1ForecasterConfig]

Forecast factor premia with independent AR(1) models.

Source code in src/ml4t/models/forecasters/ar.py
def __init__(self, config: AR1ForecasterConfig | None = None) -> None:
    super().__init__(config or AR1ForecasterConfig())
    self._intercepts: np.ndarray | None = None
    self._slopes: np.ndarray | None = None
    self._last_values: np.ndarray | None = None
    self._fallback_mean: np.ndarray | None = None
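
The stored attributes (intercepts, slopes, last values, fallback mean) map naturally onto a per-factor AR(1) fit. The sketch below is a pure-Python illustration of that idea, not the library's implementation; in particular, falling back to the sample mean for a constant series is an assumption.

```python
def fit_ar1(series):
    """OLS fit of x[t] = intercept + slope * x[t-1]; returns (intercept, slope)."""
    x_prev, x_next = series[:-1], series[1:]
    n = len(x_prev)
    mean_prev = sum(x_prev) / n
    mean_next = sum(x_next) / n
    var_prev = sum((x - mean_prev) ** 2 for x in x_prev)
    if var_prev == 0.0:
        # Degenerate (constant) series: fall back to the mean with zero slope.
        return mean_next, 0.0
    cov = sum((a - mean_prev) * (b - mean_next) for a, b in zip(x_prev, x_next))
    slope = cov / var_prev
    return mean_next - slope * mean_prev, slope


def forecast_ar1(factor_history):
    """One-step-ahead forecast per factor; each column is fitted independently."""
    forecasts = []
    for column in zip(*factor_history):  # rows are dates, columns are factors
        intercept, slope = fit_ar1(list(column))
        forecasts.append(intercept + slope * column[-1])
    return forecasts


history = [[1.0, 0.5], [2.0, 0.5], [3.0, 0.5], [4.0, 0.5]]
forecasts = forecast_ar1(history)
```

Each factor needs at least two observations for the lagged regression to be defined.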

EWMABaseFactorForecaster

EWMABaseFactorForecaster(config=None)

Bases: BaseFactorForecaster[EWMABaseForecasterConfig]

Forecast factor premia with exponentially weighted moving averages.

Source code in src/ml4t/models/forecasters/ewma.py
def __init__(self, config: EWMABaseForecasterConfig | None = None) -> None:
    super().__init__(config or EWMABaseForecasterConfig())
    self._ewma_level: np.ndarray | None = None
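
A half-life parameter such as `half_life=12.0` is commonly converted to a smoothing weight via alpha = 1 - 0.5 ** (1 / half_life). Whether the library uses exactly this convention or this initialization is not stated here, so treat the following as a sketch under that assumption.

```python
def ewma_level(values, half_life=12.0):
    """Exponentially weighted level of a series, parameterized by half-life."""
    # After ``half_life`` steps, an observation's weight has decayed by half.
    alpha = 1.0 - 0.5 ** (1.0 / half_life)
    level = values[0]  # assumption: initialize at the first observation
    for value in values[1:]:
        level = alpha * value + (1.0 - alpha) * level
    return level


level = ewma_level([0.0, 1.0, 1.0], half_life=1.0)
```

With `half_life=1.0` the weight is 0.5, so each new observation moves the level halfway toward itself.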

ExpandingMeanFactorForecaster

ExpandingMeanFactorForecaster(config=None)

Bases: BaseFactorForecaster[ExpandingMeanForecasterConfig]

Forecast factor premia with the training-sample mean.

Source code in src/ml4t/models/forecasters/mean.py
def __init__(self, config: ExpandingMeanForecasterConfig | None = None) -> None:
    super().__init__(config or ExpandingMeanForecasterConfig())
    self._mean_factor_premium: np.ndarray | None = None

BacktestDataFeedInputs dataclass

BacktestDataFeedInputs(
    feed_spec,
    prices_frame=None,
    prices_path=None,
    signals=None,
    context=None,
    metadata=dict(),
)

Structured handoff payload for ml4t.backtest.DataFeed.

to_datafeed_kwargs

to_datafeed_kwargs()

Return kwargs compatible with ml4t.backtest.DataFeed.

Source code in src/ml4t/models/integration/backtest.py
def to_datafeed_kwargs(self) -> dict[str, Any]:
    """Return kwargs compatible with ``ml4t.backtest.DataFeed``."""

    kwargs: dict[str, Any] = {"feed_spec": dict(self.feed_spec)}
    if self.prices_frame is not None:
        kwargs["prices_df"] = self.prices_frame
    if self.prices_path is not None:
        kwargs["prices_path"] = str(self.prices_path)
    if self.signals is not None:
        kwargs["signals_df"] = self.signals.to_polars()
    if self.context is not None:
        kwargs["context_df"] = self.context.to_polars()
    return kwargs

ContextFrame dataclass

ContextFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Wide context features for backtest handoff.

PredictionsFrame dataclass

PredictionsFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Long-format asset prediction results.

ResolvedDatasetSchema dataclass

ResolvedDatasetSchema(
    timestamp_col, entity_col, metadata=dict()
)

Resolved timestamp/entity column contract for a tabular dataset.

ResultsFrame dataclass

ResultsFrame(columns, rows, metadata=dict())

Tabular model results with optional export helpers.

to_dicts

to_dicts()

Return the frame as a list of row dictionaries.

Source code in src/ml4t/models/integration/surfaces.py
def to_dicts(self) -> list[dict[str, Any]]:
    """Return the frame as a list of row dictionaries."""

    return [dict(zip(self.columns, row, strict=True)) for row in self.rows]

to_columnar

to_columnar()

Return the frame as columnar Python lists.

Source code in src/ml4t/models/integration/surfaces.py
def to_columnar(self) -> dict[str, list[Any]]:
    """Return the frame as columnar Python lists."""

    data = {column: [] for column in self.columns}
    for row in self.rows:
        for column, value in zip(self.columns, row, strict=True):
            data[column].append(value)
    return data

to_polars

to_polars()

Return the frame as a Polars DataFrame when Polars is installed.

Source code in src/ml4t/models/integration/surfaces.py
def to_polars(self) -> Any:
    """Return the frame as a Polars DataFrame when Polars is installed."""

    pl = _import_polars()
    return pl.DataFrame(self.to_dicts())

write_parquet

write_parquet(path, *, compression="zstd")

Write the frame to parquet when Polars is installed.

Source code in src/ml4t/models/integration/surfaces.py
def write_parquet(self, path: str | Path, *, compression: str = "zstd") -> Path:
    """Write the frame to parquet when Polars is installed."""

    output_path = Path(path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    self.to_polars().write_parquet(output_path, compression=compression)
    return output_path

SignalsFrame dataclass

SignalsFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Long-format asset or portfolio signal results.

WeightsFrame dataclass

WeightsFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Long-format target-weight results.

CAEModel

CAEModel(config)

Bases: BaseLatentFactorModel[CAEConfig]

Conditional autoencoder with checkpoint-aware structural extraction.

Source code in src/ml4t/models/latent_factors/cae.py
def __init__(self, config: CAEConfig) -> None:
    super().__init__(config)
    self._checkpoint_states: dict[int, list[dict[str, Any]]] = {}
    self._asset_ids: tuple[str, ...] = ()
    self._n_characteristics: int | None = None
    self._n_instruments: int | None = None
    self._history: tuple[dict[str, float | str], ...] = ()

IPCAModel

IPCAModel(config)

Bases: BaseLatentFactorModel[IPCAConfig]

Instrumented PCA structural extractor for ragged cross-sections.

Source code in src/ml4t/models/latent_factors/ipca.py
def __init__(self, config: IPCAConfig) -> None:
    super().__init__(config)
    self._gamma: np.ndarray | None = None
    self._train_factor_returns: np.ndarray | None = None
    self._asset_ids: tuple[str, ...] = ()
    self._n_features: int | None = None
    self._fit_iterations = 0
    self._fit_converged = False

PCAModel

PCAModel(config)

Bases: BaseLatentFactorModel[PCAConfig]

Persistent-panel PCA structural extractor.

Source code in src/ml4t/models/latent_factors/pca.py
def __init__(self, config: PCAConfig) -> None:
    super().__init__(config)
    self._asset_mean: np.ndarray | None = None
    self._loadings: np.ndarray | None = None
    self._train_factor_returns: np.ndarray | None = None
    self._asset_ids: tuple[str, ...] = ()

RPPCAModel

RPPCAModel(config)

Bases: BaseLatentFactorModel[RPPCAConfig]

Persistent-panel RP-PCA structural extractor.

Source code in src/ml4t/models/latent_factors/rp_pca.py
def __init__(self, config: RPPCAConfig) -> None:
    super().__init__(config)
    self._asset_betas: np.ndarray | None = None
    self._factor_weights: np.ndarray | None = None
    self._train_factor_returns: np.ndarray | None = None
    self._eigenvalues: np.ndarray | None = None
    self._asset_ids: tuple[str, ...] = ()

BetaLambdaMapper

BetaLambdaMapper(config=None)

Bases: BaseAssetMapper

Map factor forecasts to asset returns via beta times factor premium.

Source code in src/ml4t/models/mappers/base.py
def __init__(self, config: MapperConfig | None = None) -> None:
    self.config = config or MapperConfig()
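
"Beta times factor premium" means each asset's expected return is the dot product of its factor loadings with the forecast premia. The function name and sample numbers below are illustrative, and the sketch uses plain lists rather than the library's result types.

```python
def beta_lambda_forecast(asset_betas, factor_premia):
    """Expected return per asset: sum over factors of beta[i][k] * premium[k]."""
    n_factors = len(factor_premia)
    forecasts = []
    for betas in asset_betas:
        if len(betas) != n_factors:
            raise ValueError("each beta row needs one entry per factor")
        forecasts.append(sum(b * lam for b, lam in zip(betas, factor_premia)))
    return forecasts


asset_betas = [[1.0, 0.5], [0.0, 2.0]]  # two assets, two factors
factor_premia = [0.02, 0.01]            # forecast premium per factor
expected_returns = beta_lambda_forecast(asset_betas, factor_premia)
```

Here the first asset earns 1.0 * 0.02 + 0.5 * 0.01 and the second earns 2.0 * 0.01.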

LatentFactorForecastPipeline

LatentFactorForecastPipeline(model, forecaster, mapper)

Compose structural extraction, factor forecasting, and asset mapping.

Source code in src/ml4t/models/pipelines.py
def __init__(
    self,
    model: LatentFactorModel,
    forecaster: FactorForecaster,
    mapper: AssetMapper,
) -> None:
    self.model = model
    self.forecaster = forecaster
    self.mapper = mapper
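
The three constructor arguments correspond to three stages run in sequence. The sketch below mirrors that flow with toy functions; the function names, the call order inside `run_pipeline`, and the list-based data shapes are assumptions for illustration, since the pipeline's own methods are not shown in this reference.

```python
def equal_weight_extract(returns):
    """Toy structural step: one factor, the cross-sectional mean return."""
    n_assets = len(returns[0])
    factor_returns = [[sum(row) / n_assets] for row in returns]
    asset_betas = [[1.0] for _ in range(n_assets)]  # unit loading on the factor
    return asset_betas, factor_returns


def mean_forecast(factor_returns):
    """Toy forecaster: historical mean of each factor."""
    n_periods = len(factor_returns)
    return [sum(column) / n_periods for column in zip(*factor_returns)]


def beta_times_premium(asset_betas, factor_premia):
    """Toy mapper: dot product of loadings and premia per asset."""
    return [sum(b * lam for b, lam in zip(row, factor_premia)) for row in asset_betas]


def run_pipeline(returns, extract, forecast, map_assets):
    asset_betas, factor_returns = extract(returns)  # 1. structural extraction
    factor_premia = forecast(factor_returns)        # 2. factor forecasting
    return map_assets(asset_betas, factor_premia)   # 3. asset mapping


returns = [[0.01, 0.03], [0.03, 0.01]]  # rows are dates, columns are assets
asset_forecast = run_pipeline(
    returns, equal_weight_extract, mean_forecast, beta_times_premium
)
```

Keeping the stages separate is what lets a different forecaster or mapper be swapped in without touching the structural model.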

PortfolioAllocationPipeline

PortfolioAllocationPipeline(model, *, postprocessors=())

Compose a portfolio model with optional weight post-processing hooks.

Source code in src/ml4t/models/pipelines.py
def __init__(
    self,
    model: PortfolioModel,
    *,
    postprocessors: tuple[PortfolioPostprocessor, ...] = (),
) -> None:
    self.model = model
    self.postprocessors = postprocessors
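
Post-processing hooks are held as an ordered tuple, which suggests they are applied one after another to the model's raw weights. The sketch below assumes that ordering and uses plain callables; the real hooks follow the `PortfolioPostprocessor` protocol, whose methods are not shown here, and the two hooks are hypothetical examples.

```python
def clip_abs(max_abs):
    """Hypothetical hook factory: cap each weight's absolute value."""
    def hook(weights):
        return [max(-max_abs, min(max_abs, w)) for w in weights]
    return hook


def scale_to_gross(gross):
    """Hypothetical hook factory: rescale so absolute weights sum to ``gross``."""
    def hook(weights):
        total = sum(abs(w) for w in weights)
        if total == 0.0:
            return list(weights)
        return [w * gross / total for w in weights]
    return hook


def apply_postprocessors(raw_weights, postprocessors=()):
    processed = list(raw_weights)  # copy so the raw weights stay untouched
    for hook in postprocessors:    # hooks run in the order given
        processed = hook(processed)
    return raw_weights, processed


raw, processed = apply_postprocessors(
    [0.8, -0.1, 0.1], (clip_abs(0.5), scale_to_gross(1.0))
)
```

Returning both the raw and the processed weights matches the shape of `PortfolioPrediction(raw_weights, processed_weights)`.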

PortfolioPipelineFitResult dataclass

PortfolioPipelineFitResult(model_fit)

Fit summaries for a portfolio-allocation pipeline.

LinearStochasticDiscountFactorReturnMapper

LinearStochasticDiscountFactorReturnMapper()

Map stochastic discount factor weights to expected returns via a fitted linear projection.

Source code in src/ml4t/models/stochastic_discount_factor/mapper.py
def __init__(self) -> None:
    self._intercept = 0.0
    self._slope = 0.0
    self._is_fitted = False

StochasticDiscountFactorBetaNetworkHead

StochasticDiscountFactorBetaNetworkHead(config)

Paper-faithful beta-network predictive head for stochastic discount factor models.

Source code in src/ml4t/models/stochastic_discount_factor/mapper.py
def __init__(self, config: StochasticDiscountFactorConfig) -> None:
    self.config = config
    self._checkpoint_states: dict[int, dict[str, Any]] = {}
    self._n_asset_features: int | None = None
    self._n_context_features: int = 0
    self._asset_ids: tuple[str, ...] = ()
    self._f_hat_scale: float = 1.0
    self._history: tuple[dict[str, float | str], ...] = ()

StochasticDiscountFactorModel

StochasticDiscountFactorModel(config)

Bases: BaseStochasticDiscountFactorModel

Stochastic discount factor model with weight-native structural outputs.

Source code in src/ml4t/models/stochastic_discount_factor/model.py
def __init__(self, config: StochasticDiscountFactorConfig) -> None:
    super().__init__(config)
    self._checkpoint_states: dict[int, dict[str, dict[str, Any]]] = {}
    self._asset_ids: tuple[str, ...] = ()
    self._n_characteristics: int | None = None
    self._n_context_features: int = 0
    self._history: tuple[dict[str, float | str], ...] = ()

AssetForecastResult dataclass

AssetForecastResult(
    expected_returns,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Asset-level expected-return forecasts.

AssetSignalResult dataclass

AssetSignalResult(
    signal_values,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Asset-level predictive signals.

AssetWeightsResult dataclass

AssetWeightsResult(
    weights, timestamps=(), asset_ids=(), metadata=dict()
)

Cross-sectional asset-weight output indexed by date and asset.

CrossSectionBatch dataclass

CrossSectionBatch(
    characteristics,
    returns=None,
    factor_returns=None,
    context_features=None,
    timestamps=(),
    asset_ids=(),
    mask=None,
    metadata=dict(),
)

Dated observed cross-sections with a date-local slot axis.

FactorForecastResult dataclass

FactorForecastResult(
    factor_premia, timestamps=(), metadata=dict()
)

Forecast of latent factor premia.

FitSummary dataclass

FitSummary(
    converged,
    train_metrics=dict(),
    val_metrics=dict(),
    best_epoch=None,
    history=(),
    notes=(),
)

Fit outcome for a model or forecaster.

LatentFactorPrediction dataclass

LatentFactorPrediction(
    state, factor_forecast, asset_forecast
)

Full prediction bundle from a latent-factor pipeline.

LatentFactorState dataclass

LatentFactorState(
    asset_betas,
    factor_returns=None,
    checkpoint_epoch=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Structural latent-factor state extracted from a batch.

PersistentPanelBatch dataclass

PersistentPanelBatch(
    returns=None,
    characteristics=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Stable-entity panel for models such as PCA and RP-PCA.

PortfolioPrediction dataclass

PortfolioPrediction(raw_weights, processed_weights)

Full prediction bundle from a portfolio-allocation pipeline.

PortfolioSequenceBatch dataclass

PortfolioSequenceBatch(
    features,
    returns=None,
    vol_scale=None,
    prev_weights=None,
    mask=None,
    group_ids=None,
    costs=None,
    adjacency_mask=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Sequence batch for end-to-end portfolio learners.

PortfolioWeightsResult dataclass

PortfolioWeightsResult(
    weights,
    checkpoint_step=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Portfolio-weight output for end-to-end allocators.

StochasticDiscountFactorState dataclass

StochasticDiscountFactorState(
    asset_weights,
    sdf_values=None,
    checkpoint_epoch=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Structural state extracted from a stochastic discount factor model.

backtest_datafeed_inputs

backtest_datafeed_inputs(
    *,
    prices_frame=None,
    prices_path=None,
    signals=None,
    context=None,
    schema=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    metadata=None,
)

Build a structured DataFeed handoff from model outputs and market-data metadata.

Source code in src/ml4t/models/integration/backtest.py
def backtest_datafeed_inputs(
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    signals: PredictionsFrame | SignalsFrame | WeightsFrame | None = None,
    context: ContextFrame | None = None,
    schema: Any | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build a structured ``DataFeed`` handoff from model outputs and market-data metadata."""

    if prices_frame is None and prices_path is None:
        raise ValueError("Provide either prices_frame or prices_path")
    if prices_frame is not None and prices_path is not None:
        raise ValueError("Provide prices_frame or prices_path, not both")

    feed_spec = resolve_feed_spec_mapping(
        prices_frame,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
    )
    combined_metadata = dict(metadata or {})
    if signals is not None:
        combined_metadata.setdefault("signal_frame_type", signals.metadata.get("frame_type"))
    return BacktestDataFeedInputs(
        feed_spec=feed_spec,
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=signals,
        context=context,
        metadata=combined_metadata,
    )
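
Two behaviours in the source above are easy to miss: exactly one price source must be supplied, and a caller-provided `signal_frame_type` in `metadata` takes precedence because `setdefault` is used. The sketch below isolates both rules in stand-alone helpers; the helper names are hypothetical and the functions are not part of the library.

```python
def resolve_price_source(prices_frame=None, prices_path=None):
    """Mirror the exactly-one-of check on the two price arguments."""
    if prices_frame is None and prices_path is None:
        raise ValueError("Provide either prices_frame or prices_path")
    if prices_frame is not None and prices_path is not None:
        raise ValueError("Provide prices_frame or prices_path, not both")
    return "frame" if prices_frame is not None else "path"


def merge_metadata(metadata=None, signal_metadata=None):
    """Copy caller metadata, then fill in the signal frame type if absent."""
    combined = dict(metadata or {})
    if signal_metadata is not None:
        # setdefault never overwrites a key the caller already supplied.
        combined.setdefault("signal_frame_type", signal_metadata.get("frame_type"))
    return combined


source = resolve_price_source(prices_path="prices.parquet")
filled = merge_metadata({"run": "demo"}, {"frame_type": "predictions"})
kept = merge_metadata({"signal_frame_type": "custom"}, {"frame_type": "predictions"})
```

If the signal frame carries no `frame_type`, the key is still added with a value of `None`.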

backtest_inputs_from_asset_forecast

backtest_inputs_from_asset_forecast(
    forecast,
    *,
    prices_frame=None,
    prices_path=None,
    schema=None,
    context=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    constants=None,
    metadata=None,
)

Build DataFeed inputs directly from an asset-forecast result.

Source code in src/ml4t/models/integration/backtest.py
def backtest_inputs_from_asset_forecast(
    forecast: AssetForecastResult,
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    schema: Any | None = None,
    context: ContextFrame | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    constants: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build ``DataFeed`` inputs directly from an asset-forecast result."""

    return backtest_datafeed_inputs(
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=predictions_frame_from_asset_forecast(forecast, constants=constants),
        context=context,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
        metadata=metadata,
    )

backtest_inputs_from_asset_signal

backtest_inputs_from_asset_signal(
    signal,
    *,
    prices_frame=None,
    prices_path=None,
    schema=None,
    context=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    constants=None,
    metadata=None,
)

Build DataFeed inputs directly from an asset-signal result.

Source code in src/ml4t/models/integration/backtest.py
def backtest_inputs_from_asset_signal(
    signal: AssetSignalResult,
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    schema: Any | None = None,
    context: ContextFrame | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    constants: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build ``DataFeed`` inputs directly from an asset-signal result."""

    return backtest_datafeed_inputs(
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=predictions_frame_from_asset_signal(signal, constants=constants),
        context=context,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
        metadata=metadata,
    )

backtest_inputs_from_weights

backtest_inputs_from_weights(
    weights,
    *,
    prices_frame=None,
    prices_path=None,
    schema=None,
    as_context=False,
    context_prefix="w_",
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    constants=None,
    metadata=None,
)

Build DataFeed inputs directly from target-weight outputs.

Source code in src/ml4t/models/integration/backtest.py
def backtest_inputs_from_weights(
    weights: AssetWeightsResult | PortfolioWeightsResult,
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    schema: Any | None = None,
    as_context: bool = False,
    context_prefix: str = "w_",
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    constants: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build ``DataFeed`` inputs directly from target-weight outputs."""

    signals = None if as_context else _weights_frame(weights, constants=constants)
    context = context_frame_from_weights(weights, prefix=context_prefix, constants=constants)
    return backtest_datafeed_inputs(
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=signals,
        context=context if as_context else None,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
        metadata=metadata,
    )

context_frame_from_weights

context_frame_from_weights(
    weights, *, prefix="w_", constants=None
)

Convert asset weights to a wide context frame for backtest strategies.

Source code in src/ml4t/models/integration/surfaces.py
def context_frame_from_weights(
    weights: AssetWeightsResult | PortfolioWeightsResult,
    *,
    prefix: str = "w_",
    constants: dict[str, Any] | None = None,
) -> ContextFrame:
    """Convert asset weights to a wide context frame for backtest strategies."""

    weight_matrix, timestamps, assets = _resolve_weight_matrix(weights)
    constant_columns = tuple((constants or {}).keys())
    columns = ("timestamp", *(f"{prefix}{asset}" for asset in assets), *constant_columns)
    rows: list[tuple[Any, ...]] = []

    for t_idx, timestamp in enumerate(timestamps):
        values = [
            float(weight_matrix[t_idx, a_idx]) if np.isfinite(weight_matrix[t_idx, a_idx]) else 0.0
            for a_idx in range(len(assets))
        ]
        rows.append((timestamp, *values, *tuple((constants or {}).values())))

    metadata = {"frame_type": "context", **weights.metadata}
    if isinstance(weights, PortfolioWeightsResult) and weights.checkpoint_step is not None:
        metadata["checkpoint_step"] = weights.checkpoint_step

    return ContextFrame(columns=columns, rows=tuple(rows), metadata=metadata)
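
As a rough illustration of the wide layout, the following self-contained sketch rebuilds the column and row logic on a toy weight matrix. The helper wide_context_rows and the sample data are hypothetical stand-ins, not part of ml4t-models; only the column prefixing and the 0.0 substitution for non-finite weights are taken from the listing above.

```python
import numpy as np


def wide_context_rows(weight_matrix, timestamps, assets, prefix="w_"):
    """Hypothetical stand-in: return (columns, rows) for a wide weights frame."""
    columns = ("timestamp", *(f"{prefix}{asset}" for asset in assets))
    rows = []
    for t_idx, timestamp in enumerate(timestamps):
        values = [
            float(w) if np.isfinite(w) else 0.0  # non-finite weights become 0.0
            for w in weight_matrix[t_idx]
        ]
        rows.append((timestamp, *values))
    return columns, tuple(rows)


# Toy data (made up): two dates, two assets, one missing weight.
weight_matrix = np.array([[0.6, 0.4], [np.nan, 1.0]])
columns, rows = wide_context_rows(
    weight_matrix, ("2024-01-31", "2024-02-29"), ("AAA", "BBB")
)
```

Each row carries one timestamp and one value per asset, so a strategy can read all target weights for a date from a single record.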

cross_section_batch_from_long_frame

cross_section_batch_from_long_frame(
    frame,
    *,
    schema=None,
    feature_cols,
    return_col=None,
    context_cols=(),
    timestamp_col=None,
    entity_col=None,
    metadata=None,
)

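Build a ragged cross-sectional batch from a long-format frame. The asset axis is padded to the largest cross-section: padded slots hold NaN with mask set to False, and asset_ids are positional slot_{idx} labels rather than entity names. Context columns must be constant within each timestamp.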

Source code in src/ml4t/models/integration/data.py
def cross_section_batch_from_long_frame(
    frame: Any,
    *,
    schema: Any | None = None,
    feature_cols: Sequence[str],
    return_col: str | None = None,
    context_cols: Sequence[str] = (),
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> CrossSectionBatch:
    """Build a ragged cross-sectional batch from a long-format frame."""

    resolved = resolve_dataset_schema(
        frame,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
    )
    records = _sorted_records(
        frame, timestamp_col=resolved.timestamp_col, entity_col=resolved.entity_col
    )
    timestamps = tuple(_ordered_unique(record[resolved.timestamp_col] for record in records))
    grouped_assets = {
        timestamp: [record for record in records if record[resolved.timestamp_col] == timestamp]
        for timestamp in timestamps
    }
    max_assets = max((len(group) for group in grouped_assets.values()), default=0)
    characteristics = np.full(
        (len(timestamps), max_assets, len(feature_cols)),
        np.nan,
        dtype=np.float64,
    )
    returns = (
        np.full((len(timestamps), max_assets), np.nan, dtype=np.float64)
        if return_col is not None
        else None
    )
    mask = np.zeros((len(timestamps), max_assets), dtype=bool)
    asset_ids = tuple(f"slot_{idx}" for idx in range(max_assets))

    context_features = None
    if context_cols:
        context_features = np.full((len(timestamps), len(context_cols)), np.nan, dtype=np.float64)

    for t_idx, timestamp in enumerate(timestamps):
        records_t = grouped_assets[timestamp]
        for slot_idx, record in enumerate(records_t):
            mask[t_idx, slot_idx] = True
            for f_idx, feature_col in enumerate(feature_cols):
                value = record[feature_col]
                characteristics[t_idx, slot_idx, f_idx] = (
                    float(value) if _is_finite(value) else np.nan
                )
            if returns is not None and return_col is not None:
                value = record[return_col]
                returns[t_idx, slot_idx] = float(value) if _is_finite(value) else np.nan

        if context_features is not None and records_t:
            for c_idx, context_col in enumerate(context_cols):
                values = np.asarray([record[context_col] for record in records_t], dtype=object)
                finite_values = [value for value in values if _is_finite(value)]
                if not finite_values:
                    context_features[t_idx, c_idx] = np.nan
                    continue
                first_value = float(finite_values[0])
                if any(abs(float(value) - first_value) > 1e-12 for value in finite_values[1:]):
                    raise ValueError(
                        f"context column {context_col!r} must be constant within timestamp "
                        f"{timestamp!r}"
                    )
                context_features[t_idx, c_idx] = first_value

    combined_metadata = dict(metadata or {})
    combined_metadata.update(resolved.metadata)
    return CrossSectionBatch(
        characteristics=characteristics,
        returns=returns,
        context_features=context_features,
        timestamps=timestamps,
        asset_ids=asset_ids,
        mask=mask,
        metadata=combined_metadata,
    )
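
One consequence of the date-local slot axis is that the same slot index can hold different entities on different dates. The sketch below shows this on toy records using plain NumPy. The record layout and the slot_assets bookkeeping are illustrative assumptions (the library function does not return per-slot entity names), and the records are assumed to be already sorted by timestamp and entity.

```python
import numpy as np

# Hypothetical long-format records: (timestamp, asset, feature value).
records = [
    ("2024-01", "AAA", 1.0),
    ("2024-01", "BBB", 2.0),
    ("2024-01", "CCC", 3.0),
    ("2024-02", "AAA", 4.0),
    ("2024-02", "CCC", 5.0),
]

timestamps = tuple(dict.fromkeys(ts for ts, _, _ in records))  # ordered unique
groups = {ts: [r for r in records if r[0] == ts] for ts in timestamps}
max_assets = max(len(group) for group in groups.values())

characteristics = np.full((len(timestamps), max_assets, 1), np.nan)
mask = np.zeros((len(timestamps), max_assets), dtype=bool)
# Extra bookkeeping for this illustration only: which entity sits in which slot.
slot_assets = [[None] * max_assets for _ in timestamps]

for t_idx, ts in enumerate(timestamps):
    for slot_idx, (_, asset, value) in enumerate(groups[ts]):
        characteristics[t_idx, slot_idx, 0] = value
        mask[t_idx, slot_idx] = True
        slot_assets[t_idx][slot_idx] = asset
```

Here slot 1 holds BBB on the first date and CCC on the second, so slot indices should not be read as stable entity identifiers.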

persistent_panel_batch_from_long_frame

persistent_panel_batch_from_long_frame(
    frame,
    *,
    schema=None,
    return_col=None,
    feature_cols=(),
    timestamp_col=None,
    entity_col=None,
    metadata=None,
)

Build a persistent panel batch from a long-format frame.

Source code in src/ml4t/models/integration/data.py
def persistent_panel_batch_from_long_frame(
    frame: Any,
    *,
    schema: Any | None = None,
    return_col: str | None = None,
    feature_cols: Sequence[str] = (),
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> PersistentPanelBatch:
    """Build a persistent panel batch from a long-format frame."""

    resolved = resolve_dataset_schema(
        frame,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
    )
    records = _sorted_records(
        frame, timestamp_col=resolved.timestamp_col, entity_col=resolved.entity_col
    )
    timestamps = tuple(_ordered_unique(record[resolved.timestamp_col] for record in records))
    asset_ids = tuple(
        str(asset) for asset in _ordered_unique(record[resolved.entity_col] for record in records)
    )
    time_index = {timestamp: idx for idx, timestamp in enumerate(timestamps)}
    asset_index = {asset: idx for idx, asset in enumerate(asset_ids)}

    returns = None
    if return_col is not None:
        returns = np.full((len(timestamps), len(asset_ids)), np.nan, dtype=np.float64)

    characteristics = None
    if feature_cols:
        characteristics = np.full(
            (len(timestamps), len(asset_ids), len(feature_cols)),
            np.nan,
            dtype=np.float64,
        )

    seen: set[tuple[Any, str]] = set()
    for record in records:
        key = (record[resolved.timestamp_col], str(record[resolved.entity_col]))
        if key in seen:
            raise ValueError(
                f"Duplicate (timestamp, entity) row encountered in long-format panel data: {key}"
            )
        seen.add(key)
        t_idx = time_index[record[resolved.timestamp_col]]
        a_idx = asset_index[str(record[resolved.entity_col])]
        if returns is not None and return_col is not None:
            return_value = record[return_col]
            returns[t_idx, a_idx] = float(return_value) if _is_finite(return_value) else np.nan
        if characteristics is not None:
            for f_idx, feature_col in enumerate(feature_cols):
                value = record[feature_col]
                characteristics[t_idx, a_idx, f_idx] = float(value) if _is_finite(value) else np.nan

    combined_metadata = dict(metadata or {})
    combined_metadata.update(resolved.metadata)
    return PersistentPanelBatch(
        returns=returns,
        characteristics=characteristics,
        timestamps=timestamps,
        asset_ids=asset_ids,
        metadata=combined_metadata,
    )
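
The pivot can be tried on toy data with the following minimal sketch. The helper pivot_returns is hypothetical: it mirrors the indexing and duplicate check of the listing above on plain tuples, but it is not the library function and it skips schema resolution and sorting.

```python
import numpy as np


def pivot_returns(records):
    """Pivot (timestamp, asset, return) triples into a dense (T, N) matrix."""
    timestamps = tuple(dict.fromkeys(ts for ts, _, _ in records))
    asset_ids = tuple(dict.fromkeys(str(asset) for _, asset, _ in records))
    time_index = {ts: idx for idx, ts in enumerate(timestamps)}
    asset_index = {asset: idx for idx, asset in enumerate(asset_ids)}

    # Cells without a matching row stay NaN.
    returns = np.full((len(timestamps), len(asset_ids)), np.nan)
    seen = set()
    for ts, asset, value in records:
        key = (ts, str(asset))
        if key in seen:
            raise ValueError(f"duplicate (timestamp, entity) row: {key}")
        seen.add(key)
        returns[time_index[ts], asset_index[str(asset)]] = value
    return returns, timestamps, asset_ids


# Made-up data: BBB has no row for the second date.
returns, timestamps, asset_ids = pivot_returns(
    [("2024-01", "AAA", 0.01), ("2024-01", "BBB", -0.02), ("2024-02", "AAA", 0.03)]
)
```

Unlike the ragged cross-section batch, the asset axis here is the same for every date, which is what stable-entity models rely on.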

predictions_frame_from_asset_forecast

predictions_frame_from_asset_forecast(
    forecast, *, constants=None
)

Convert asset expected returns to a diagnostic-ready predictions frame.

Source code in src/ml4t/models/integration/surfaces.py
def predictions_frame_from_asset_forecast(
    forecast: AssetForecastResult,
    *,
    constants: dict[str, Any] | None = None,
) -> PredictionsFrame:
    """Convert asset expected returns to a diagnostic-ready predictions frame."""

    expected_returns = np.asarray(forecast.expected_returns, dtype=np.float64)
    timestamps = _resolve_timestamps(expected_returns.shape[0], forecast.timestamps)
    assets = _resolve_assets(expected_returns.shape[1], forecast.asset_ids)
    constant_columns = tuple((constants or {}).keys())
    rows: list[tuple[Any, ...]] = []

    for t_idx, timestamp in enumerate(timestamps):
        for a_idx, asset in enumerate(assets):
            value = expected_returns[t_idx, a_idx]
            if not np.isfinite(value):
                continue
            rows.append(
                (
                    timestamp,
                    asset,
                    float(value),
                    *tuple((constants or {}).values()),
                )
            )

    return PredictionsFrame(
        columns=("timestamp", "asset", "prediction_value", *constant_columns),
        rows=tuple(rows),
        metadata={"frame_type": "prediction", **forecast.metadata},
    )
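
The converter flattens a (time, asset) matrix into long rows and drops non-finite cells. A minimal sketch of that flattening, using a hypothetical long_rows helper and made-up values rather than the library's result and frame types:

```python
import numpy as np


def long_rows(values, timestamps, assets):
    """Flatten a (T, N) matrix into (timestamp, asset, value) rows."""
    rows = []
    for t_idx, timestamp in enumerate(timestamps):
        for a_idx, asset in enumerate(assets):
            value = values[t_idx, a_idx]
            if not np.isfinite(value):
                continue  # NaN and inf cells produce no row at all
            rows.append((timestamp, asset, float(value)))
    return tuple(rows)


expected_returns = np.array([[0.01, np.nan], [0.02, -0.01]])
rows = long_rows(expected_returns, ("2024-01", "2024-02"), ("AAA", "BBB"))
```

The missing forecast for BBB on the first date is simply absent from the output, so downstream consumers should not assume one row per (timestamp, asset) pair.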

predictions_frame_from_asset_signal

predictions_frame_from_asset_signal(
    signal, *, constants=None
)

Convert asset-level signals to a diagnostic-ready predictions frame.

Source code in src/ml4t/models/integration/surfaces.py
def predictions_frame_from_asset_signal(
    signal: AssetSignalResult,
    *,
    constants: dict[str, Any] | None = None,
) -> PredictionsFrame:
    """Convert asset-level signals to a diagnostic-ready predictions frame."""

    signal_values = np.asarray(signal.signal_values, dtype=np.float64)
    timestamps = _resolve_timestamps(signal_values.shape[0], signal.timestamps)
    assets = _resolve_assets(signal_values.shape[1], signal.asset_ids)
    constant_columns = tuple((constants or {}).keys())
    rows: list[tuple[Any, ...]] = []

    for t_idx, timestamp in enumerate(timestamps):
        for a_idx, asset in enumerate(assets):
            value = signal_values[t_idx, a_idx]
            if not np.isfinite(value):
                continue
            rows.append(
                (
                    timestamp,
                    asset,
                    float(value),
                    *tuple((constants or {}).values()),
                )
            )

    return PredictionsFrame(
        columns=("timestamp", "asset", "prediction_value", *constant_columns),
        rows=tuple(rows),
        metadata={"frame_type": "prediction", **signal.metadata},
    )

resolve_dataset_schema

resolve_dataset_schema(
    frame,
    *,
    schema=None,
    timestamp_col=None,
    entity_col=None,
    timestamp_candidates=(
        "timestamp",
        "datetime",
        "date",
        "time",
    ),
    entity_candidates=(
        "asset",
        "symbol",
        "ticker",
        "instrument",
        "security",
    ),
)

Resolve timestamp and entity columns from explicit names or ML4T-style metadata.

Source code in src/ml4t/models/integration/data.py
def resolve_dataset_schema(
    frame: Any,
    *,
    schema: Any | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    timestamp_candidates: Sequence[str] = ("timestamp", "datetime", "date", "time"),
    entity_candidates: Sequence[str] = ("asset", "symbol", "ticker", "instrument", "security"),
) -> ResolvedDatasetSchema:
    """Resolve timestamp and entity columns from explicit names or ML4T-style metadata."""

    columns = tuple(_frame_columns(frame))
    inferred_schema = _coerce_schema(schema)

    resolved_timestamp = (
        timestamp_col
        or inferred_schema.get("timestamp_col")
        or _first_present(columns, timestamp_candidates)
    )
    if resolved_timestamp is None:
        raise ValueError(
            f"Could not resolve a timestamp column from columns {list(columns)}. "
            f"Expected one of {tuple(timestamp_candidates)} or explicit schema metadata."
        )
    if resolved_timestamp not in columns:
        raise ValueError(
            f"Resolved timestamp column {resolved_timestamp!r} not found in columns {list(columns)}"
        )

    resolved_entity = (
        entity_col
        or inferred_schema.get("entity_col")
        or _first_present(columns, entity_candidates)
    )
    if resolved_entity is None:
        raise ValueError(
            f"Could not resolve an entity column from columns {list(columns)}. "
            f"Expected one of {tuple(entity_candidates)} or explicit schema metadata."
        )
    if resolved_entity not in columns:
        raise ValueError(
            f"Resolved entity column {resolved_entity!r} not found in columns {list(columns)}"
        )

    return ResolvedDatasetSchema(
        timestamp_col=resolved_timestamp,
        entity_col=resolved_entity,
        metadata=inferred_schema,
    )
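
The resolution order (explicit argument, then schema metadata, then the first matching candidate name) can be sketched as follows. The function resolve_column is a hypothetical simplification for a single column. It assumes that the private _first_present helper returns the first candidate that appears in the frame's columns, which the listing does not show.

```python
def resolve_column(
    columns,
    *,
    explicit=None,
    schema=None,
    key="timestamp_col",
    candidates=("timestamp", "datetime", "date", "time"),
):
    """Hypothetical single-column version of the precedence rule."""
    schema = schema or {}
    # Assumed behaviour of the private helper: first candidate found in columns.
    first_present = next((name for name in candidates if name in columns), None)
    resolved = explicit or schema.get(key) or first_present
    if resolved is None:
        raise ValueError(f"could not resolve {key} from columns {list(columns)}")
    if resolved not in columns:
        raise ValueError(f"resolved column {resolved!r} not found in {list(columns)}")
    return resolved


by_candidate = resolve_column(("date", "ticker", "ret"))
by_schema = resolve_column(("ts", "date", "ticker"), schema={"timestamp_col": "ts"})
by_argument = resolve_column(
    ("ts", "date", "ticker"), explicit="date", schema={"timestamp_col": "ts"}
)
```

Passing the column name explicitly is the most predictable option when a frame contains several plausible timestamp columns.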

resolve_feed_spec_mapping

resolve_feed_spec_mapping(
    frame=None,
    *,
    schema=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
)

Resolve a FeedSpec-compatible mapping from schema metadata and overrides.

Source code in src/ml4t/models/integration/backtest.py
def resolve_feed_spec_mapping(
    frame: Any | None = None,
    *,
    schema: Any | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
) -> dict[str, Any]:
    """Resolve a ``FeedSpec``-compatible mapping from schema metadata and overrides."""

    spec_mapping = _coerce_feed_spec_mapping(schema)

    if frame is not None and _supports_schema_resolution(frame):
        resolved_schema = resolve_dataset_schema(
            frame,
            schema=schema,
            timestamp_col=timestamp_col,
            entity_col=entity_col,
        )
        spec_mapping["timestamp_col"] = resolved_schema.timestamp_col
        spec_mapping["entity_col"] = resolved_schema.entity_col
    else:
        if timestamp_col is not None:
            spec_mapping["timestamp_col"] = timestamp_col
        else:
            spec_mapping.setdefault("timestamp_col", "timestamp")
        if entity_col is not None:
            spec_mapping["entity_col"] = entity_col
        else:
            spec_mapping.setdefault("entity_col", "asset")

    overrides = {
        "price_col": price_col,
        "open_col": open_col,
        "high_col": high_col,
        "low_col": low_col,
        "close_col": close_col,
        "volume_col": volume_col,
        "bid_col": bid_col,
        "ask_col": ask_col,
        "mid_col": mid_col,
        "bid_size_col": bid_size_col,
        "ask_size_col": ask_size_col,
        "calendar": calendar,
        "timezone": timezone,
        "data_frequency": data_frequency,
        "bar_type": bar_type,
        "timestamp_semantics": timestamp_semantics,
        "session_start_time": session_start_time,
    }
    for key, value in overrides.items():
        if value is not None:
            spec_mapping[key] = value

    if "close_col" in spec_mapping and "price_col" not in spec_mapping:
        spec_mapping["price_col"] = spec_mapping["close_col"]
    spec_mapping.setdefault("price_col", "close")
    spec_mapping.setdefault("open_col", "open")
    spec_mapping.setdefault("high_col", "high")
    spec_mapping.setdefault("low_col", "low")
    spec_mapping.setdefault("close_col", spec_mapping["price_col"])
    spec_mapping.setdefault("volume_col", "volume")
    return spec_mapping
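
The defaulting rules at the end of the function interact in a way that is easy to miss: a close_col without a price_col also becomes the price_col, and a price_col without a close_col also becomes the close_col. The sketch below copies only those rules into a hypothetical apply_price_defaults helper so the behaviour can be checked in isolation.

```python
def apply_price_defaults(spec):
    """Hypothetical helper reproducing only the final defaulting rules."""
    spec = dict(spec)  # work on a copy
    if "close_col" in spec and "price_col" not in spec:
        spec["price_col"] = spec["close_col"]
    spec.setdefault("price_col", "close")
    spec.setdefault("open_col", "open")
    spec.setdefault("high_col", "high")
    spec.setdefault("low_col", "low")
    spec.setdefault("close_col", spec["price_col"])
    spec.setdefault("volume_col", "volume")
    return spec


empty = apply_price_defaults({})
from_close = apply_price_defaults({"close_col": "adj_close"})
from_price = apply_price_defaults({"price_col": "mid"})
```

With no input the mapping falls back to the conventional OHLCV names, while either of the two price-like keys is enough to set both.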

signals_frame_from_asset_weights

signals_frame_from_asset_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert cross-sectional asset weights to a diagnostic-ready signals frame.

Source code in src/ml4t/models/integration/surfaces.py
def signals_frame_from_asset_weights(
    weights: AssetWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> SignalsFrame:
    """Convert cross-sectional asset weights to a diagnostic-ready signals frame."""

    return _frame_from_asset_weights(
        weights,
        value_column="signal_value",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="signal",
        frame_cls=SignalsFrame,
    )

signals_frame_from_portfolio_weights

signals_frame_from_portfolio_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert portfolio weights to a diagnostic-ready signals frame.

Source code in src/ml4t/models/integration/surfaces.py
def signals_frame_from_portfolio_weights(
    weights: PortfolioWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> SignalsFrame:
    """Convert portfolio weights to a diagnostic-ready signals frame."""

    return _frame_from_portfolio_weights(
        weights,
        value_column="signal_value",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="signal",
        frame_cls=SignalsFrame,
    )

weights_frame_from_asset_weights

weights_frame_from_asset_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert cross-sectional asset weights to a backtest-ready weights frame.

Source code in src/ml4t/models/integration/surfaces.py
def weights_frame_from_asset_weights(
    weights: AssetWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> WeightsFrame:
    """Convert cross-sectional asset weights to a backtest-ready weights frame."""

    return _frame_from_asset_weights(
        weights,
        value_column="weight",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="weight",
        frame_cls=WeightsFrame,
    )

weights_frame_from_portfolio_weights

weights_frame_from_portfolio_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert portfolio weights to a backtest-ready weights frame.

Source code in src/ml4t/models/integration/surfaces.py
def weights_frame_from_portfolio_weights(
    weights: PortfolioWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> WeightsFrame:
    """Convert portfolio weights to a backtest-ready weights frame."""

    return _frame_from_portfolio_weights(
        weights,
        value_column="weight",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="weight",
        frame_cls=WeightsFrame,
    )

write_backtest_frames

write_backtest_frames(
    artifact_dir,
    *,
    predictions=None,
    weights=None,
    compression="zstd",
)

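Write standardized prediction and weight artifacts for downstream ML4T tooling. Each frame that is provided is written to predictions.parquet or weights.parquet under artifact_dir (created if missing); the returned dict maps "predictions" and "weights" to the written paths.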

Source code in src/ml4t/models/integration/surfaces.py
def write_backtest_frames(
    artifact_dir: str | Path,
    *,
    predictions: PredictionsFrame | None = None,
    weights: WeightsFrame | None = None,
    compression: str = "zstd",
) -> dict[str, Path]:
    """Write standardized prediction and weight artifacts for downstream ML4T tooling."""

    output_dir = Path(artifact_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    written: dict[str, Path] = {}
    if predictions is not None:
        written["predictions"] = predictions.write_parquet(
            output_dir / "predictions.parquet",
            compression=compression,
        )
    if weights is not None:
        written["weights"] = weights.write_parquet(
            output_dir / "weights.parquet",
            compression=compression,
        )
    return written

Protocols

api

Public protocols for ml4t-models.

LatentFactorModel

Bases: Protocol

Protocol for structural latent-factor estimators.

FactorForecaster

Bases: Protocol

Protocol for factor-premium forecasters.

AssetMapper

Bases: Protocol

Protocol for mapping factor forecasts back to asset forecasts.

AssetPredictionModel

Bases: Protocol

Protocol for direct asset-level predictive models.

StochasticDiscountFactorEstimator

Bases: Protocol

Protocol for stochastic discount factor models with weight-native outputs.

PortfolioModel

Bases: Protocol

Protocol for end-to-end portfolio learners.

PortfolioPostprocessor

Bases: Protocol

Protocol for portfolio-weight post-processing hooks.
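
Because these are typing.Protocol classes, a model satisfies them structurally, without inheriting from them. The sketch below illustrates the mechanism only: the Forecaster protocol and its forecast method are hypothetical and do not reflect the method names actually required by the ml4t-models protocols, which this page does not list.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Forecaster(Protocol):
    """Hypothetical protocol, not the library's FactorForecaster."""

    def forecast(self, history: list[float]) -> float: ...


class LastValue:
    """Satisfies Forecaster by shape alone; note there is no inheritance."""

    def forecast(self, history: list[float]) -> float:
        return history[-1]


model = LastValue()
# runtime_checkable only checks that the method exists, not its signature.
is_forecaster = isinstance(model, Forecaster)
prediction = model.forecast([0.01, 0.02])
```

Static type checkers perform the stricter signature check, so custom models can be passed wherever a protocol is expected without registering or subclassing anything.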

Typed Contracts

types

Typed batches and result objects for ml4t-models.

PersistentPanelBatch dataclass

PersistentPanelBatch(
    returns=None,
    characteristics=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Stable-entity panel for models such as PCA and RP-PCA.

CrossSectionBatch dataclass

CrossSectionBatch(
    characteristics,
    returns=None,
    factor_returns=None,
    context_features=None,
    timestamps=(),
    asset_ids=(),
    mask=None,
    metadata=dict(),
)

Dated observed cross-sections with a date-local slot axis.

PortfolioSequenceBatch dataclass

PortfolioSequenceBatch(
    features,
    returns=None,
    vol_scale=None,
    prev_weights=None,
    mask=None,
    group_ids=None,
    costs=None,
    adjacency_mask=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Sequence batch for end-to-end portfolio learners.

FitSummary dataclass

FitSummary(
    converged,
    train_metrics=dict(),
    val_metrics=dict(),
    best_epoch=None,
    history=(),
    notes=(),
)

Fit outcome for a model or forecaster.

LatentFactorState dataclass

LatentFactorState(
    asset_betas,
    factor_returns=None,
    checkpoint_epoch=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Structural latent-factor state extracted from a batch.

FactorForecastResult dataclass

FactorForecastResult(
    factor_premia, timestamps=(), metadata=dict()
)

Forecast of latent factor premia.

AssetForecastResult dataclass

AssetForecastResult(
    expected_returns,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Asset-level expected-return forecasts.

AssetSignalResult dataclass

AssetSignalResult(
    signal_values,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Asset-level predictive signals.

AssetWeightsResult dataclass

AssetWeightsResult(
    weights, timestamps=(), asset_ids=(), metadata=dict()
)

Cross-sectional asset-weight output indexed by date and asset.

StochasticDiscountFactorState dataclass

StochasticDiscountFactorState(
    asset_weights,
    sdf_values=None,
    checkpoint_epoch=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Structural state extracted from a stochastic discount factor model.

PortfolioWeightsResult dataclass

PortfolioWeightsResult(
    weights,
    checkpoint_step=None,
    timestamps=(),
    asset_ids=(),
    metadata=dict(),
)

Portfolio-weight output for end-to-end allocators.

LatentFactorPrediction dataclass

LatentFactorPrediction(
    state, factor_forecast, asset_forecast
)

Full prediction bundle from a latent-factor pipeline.

PortfolioPrediction dataclass

PortfolioPrediction(raw_weights, processed_weights)

Full prediction bundle from a portfolio-allocation pipeline.

Configs

configs

Public config dataclasses.

AssetPredictionConfig dataclass

AssetPredictionConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="asset_prediction",
    task_type="regression",
)

Bases: BaseModelConfig

Shared configuration for direct asset-prediction models.

SAEConfig dataclass

SAEConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="sae",
    task_type="regression",
    bottleneck_dim=96,
    aux_hidden_dim=96,
    main_hidden_units=(896, 448, 448, 256),
    dropout_rates=None,
    noise_std=0.035,
    alpha=1.0,
    aux_weight=1.0,
    n_epochs=50,
    batch_size=None,
    checkpoint_interval=5,
    checkpoint_epochs=(),
    default_checkpoint=None,
    lr=0.0001,
)

Bases: AssetPredictionConfig

Config for supervised autoencoder predictors.

BaseModelConfig dataclass

BaseModelConfig(seed=42, device="cpu", dtype="float64")

Common configuration for ML4T models.

AR1ForecasterConfig dataclass

AR1ForecasterConfig(
    seed=42, device="cpu", dtype="float64", model_name="ar1"
)

Bases: BaseModelConfig

Config for per-factor AR(1) forecasts.
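
For orientation, a per-factor AR(1) forecast fits x[t] = c + phi * x[t-1] to each factor series and projects one step ahead. The sketch below is a generic least-squares version written for illustration. Whether the library estimates the coefficients this way is an assumption, and fit_ar1 is a hypothetical name.

```python
import numpy as np


def fit_ar1(series):
    """OLS fit of x[t] = c + phi * x[t-1]; returns (c, phi)."""
    x_prev, x_next = series[:-1], series[1:]
    design = np.column_stack([np.ones_like(x_prev), x_prev])
    (c, phi), *_ = np.linalg.lstsq(design, x_next, rcond=None)
    return float(c), float(phi)


# Noise-free toy series generated with c = 0.1 and phi = 0.5.
series = np.empty(20)
series[0] = 1.0
for t in range(1, 20):
    series[t] = 0.1 + 0.5 * series[t - 1]

c, phi = fit_ar1(series)
next_value = c + phi * series[-1]  # one-step-ahead forecast
```

With several factors the same fit would be repeated column by column, since each factor gets its own intercept and persistence coefficient.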

EWMABaseForecasterConfig dataclass

EWMABaseForecasterConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="ewma",
    half_life=12.0,
)

Bases: BaseModelConfig

Config for EWMA factor-premium forecasts.
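
A common reading of half_life is the lag at which an observation's weight has fallen to half that of the most recent one, which gives a per-period decay of 0.5 ** (1 / half_life). The sketch below assumes that convention; the library's exact weighting scheme is not shown on this page, and ewma_weights is a hypothetical helper.

```python
import numpy as np


def ewma_weights(n_obs, half_life):
    """Normalized exponential weights, oldest observation first."""
    decay = 0.5 ** (1.0 / half_life)
    lags = np.arange(n_obs - 1, -1, -1)  # lag 0 is the most recent observation
    raw = decay ** lags
    return raw / raw.sum()


weights = ewma_weights(36, half_life=12.0)

# Made-up factor-return history; the forecast is the weighted mean.
rng = np.random.default_rng(0)
factor_history = rng.normal(0.005, 0.02, size=36)
ewma_forecast = float(weights @ factor_history)
```

Under this convention the default half_life=12.0 means that an observation twelve periods old counts half as much as the latest one.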

ExpandingMeanForecasterConfig dataclass

ExpandingMeanForecasterConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="expanding_mean",
)

Bases: BaseModelConfig

Config for the historical-mean factor-premium baseline.
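
The historical-mean baseline forecasts each factor premium as the average of all returns observed so far. A minimal NumPy sketch of an expanding mean on made-up factor returns (the array layout, time by factor, is an assumption):

```python
import numpy as np

# Made-up factor returns: 3 periods, 2 factors.
factor_returns = np.array(
    [
        [0.02, -0.01],
        [0.04, 0.01],
        [0.00, 0.03],
    ]
)

counts = np.arange(1, len(factor_returns) + 1)[:, None]
# Row t is the mean of rows 0..t, i.e. the forecast available after period t.
expanding_mean = np.cumsum(factor_returns, axis=0) / counts
```

Because row t only uses data up to and including period t, using it as the forecast for period t + 1 avoids look-ahead.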

CAEConfig dataclass

CAEConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="cae",
    n_factors=5,
    persistent_entities=False,
    task_type="regression",
    hidden_units=(32,),
    n_ensemble=1,
    n_epochs=50,
    checkpoint_interval=5,
    checkpoint_epochs=(),
    default_checkpoint=None,
    lr=0.001,
    lambda_l1=0.0001,
)

Bases: LatentFactorConfig

Config for conditional autoencoders.

IPCAConfig dataclass

IPCAConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="ipca",
    n_factors=5,
    persistent_entities=False,
    max_iter=100,
    tol=1e-06,
    factor_ridge=1e-06,
    gamma_ridge=1e-06,
)

Bases: LatentFactorConfig

Config for IPCA.

LatentFactorConfig dataclass

LatentFactorConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="latent_factor",
    n_factors=5,
    persistent_entities=False,
)

Bases: BaseModelConfig

Shared latent-factor configuration.

PCAConfig dataclass

PCAConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="pca",
    n_factors=5,
    persistent_entities=True,
)

Bases: LatentFactorConfig

Config for PCA and related persistent-panel baselines.
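
PCA needs a dense time-by-asset return matrix, which is why this config defaults to persistent_entities=True. The following sketch extracts n_factors principal components from a complete panel via SVD. It is a textbook illustration on random data, not the library's estimator, and it assumes the panel has no missing values.

```python
import numpy as np


def pca_factors(returns, n_factors):
    """Return (loadings (N, K), factor returns (T, K)) from a complete panel."""
    demeaned = returns - returns.mean(axis=0, keepdims=True)
    _, _, vt = np.linalg.svd(demeaned, full_matrices=False)
    loadings = vt[:n_factors].T      # top-K right singular vectors
    factors = demeaned @ loadings    # project returns onto the loadings
    return loadings, factors


rng = np.random.default_rng(42)
returns = rng.normal(size=(60, 8))  # 60 periods, 8 assets (synthetic)
loadings, factors = pca_factors(returns, n_factors=5)
```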

RPPCAConfig dataclass

RPPCAConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="rp_pca",
    n_factors=5,
    persistent_entities=True,
    gamma=0.0,
    base_moment="covariance",
    scale_by_asset_volatility=False,
    normalize_loadings="unit_length",
    orthogonalize_factors=False,
)

Bases: LatentFactorConfig

Config for risk-premium-aware PCA.
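
Risk-premium PCA augments the second-moment matrix with a penalty on mean returns so that factors with high average returns are favoured. The sketch below assumes the moment matrix is the sample covariance plus gamma times the outer product of mean returns, which would reduce to ordinary PCA at the default gamma=0.0. This reading of gamma and base_moment is an assumption, not something this page confirms, and rp_pca_loadings is a hypothetical name.

```python
import numpy as np


def rp_pca_loadings(returns, n_factors, gamma=0.0):
    """Top eigenvectors of cov + gamma * mean mean' (assumed formulation)."""
    mean = returns.mean(axis=0)
    cov = np.cov(returns, rowvar=False, bias=True)
    moment = cov + gamma * np.outer(mean, mean)
    eigvals, eigvecs = np.linalg.eigh(moment)       # ascending eigenvalues
    order = np.argsort(eigvals)[::-1][:n_factors]   # largest first
    return eigvecs[:, order]


rng = np.random.default_rng(7)
returns = rng.normal(0.01, 0.05, size=(120, 6))  # synthetic panel
loadings = rp_pca_loadings(returns, n_factors=3, gamma=10.0)
```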

StochasticDiscountFactorConfig dataclass

StochasticDiscountFactorConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="stochastic_discount_factor",
    output_mode="weights",
    state_dim_sdf=4,
    state_dim_moment=32,
    hidden_dim=64,
    n_instruments=8,
    dropout=0.05,
    n_epochs_unc=256,
    n_epochs_moment=64,
    n_epochs_cond=1024,
    checkpoint_interval=None,
    checkpoint_epochs=(),
    default_checkpoint=None,
    expected_return_mapper="linear",
    beta_state_dim=4,
    beta_hidden_dim=64,
    beta_n_epochs=256,
    beta_checkpoint_interval=None,
    beta_checkpoint_epochs=(),
    beta_default_checkpoint=None,
    beta_lr=0.001,
    burn_in_epochs=0,
    lr=0.001,
    weight_decay=0.0,
)

Bases: BaseModelConfig

Config for stochastic discount factor networks.

MapperConfig dataclass

MapperConfig(model_name="beta_lambda")

Config for asset-return or weight mappers.

PipelineConfig dataclass

PipelineConfig(
    latent_factor_model,
    factor_forecaster,
    asset_mapper="beta_lambda",
)

Declarative description of a latent-factor forecast pipeline.
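
The default asset_mapper name suggests the standard beta-lambda relation, in which an asset's expected return is its factor betas times the forecast factor premia. A minimal sketch of that final mapping step on made-up numbers, under the assumption that the mapper works this way:

```python
import numpy as np

# Made-up inputs: 3 assets, 2 factors.
asset_betas = np.array(
    [
        [1.0, 0.5],
        [0.8, -0.2],
        [1.2, 0.0],
    ]
)                                         # shape (N, K)
factor_premia = np.array([0.01, 0.02])    # shape (K,), forecast premia

# Expected return per asset: beta_i . lambda
expected_returns = asset_betas @ factor_premia
```

In a full pipeline the betas would come from the latent-factor model and the premia from the factor forecaster.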

DeepPortfolioConfig dataclass

DeepPortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="deep_portfolio",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
    d_model=64,
    n_heads=2,
    lstm_layers=1,
    temporal_mha_layers=1,
    cross_attention_heads=2,
    cross_attention_lag=1,
    macro_gnn_heads=2,
    adapter_hidden_mult=2,
)

Bases: PortfolioConfig

Config for DeePM-style end-to-end portfolio learners.

LinearPortfolioConfig dataclass

LinearPortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="linear_portfolio",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
    ridge_alpha=0.0001,
    fit_intercept=True,
    gross_exposure=1.0,
    net_exposure=0.0,
    max_abs_weight=None,
)

Bases: PortfolioConfig

Config for a pooled linear feature portfolio baseline.
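
The defaults gross_exposure=1.0 and net_exposure=0.0 describe a dollar-neutral book whose absolute weights sum to one. One simple way to reach those targets from raw scores is sketched below. The helper dollar_neutral_weights is hypothetical, handles only the zero-net case, and is not necessarily how the library normalizes its weights.

```python
import numpy as np


def dollar_neutral_weights(scores, gross_exposure=1.0):
    """Center scores to zero net exposure, then scale to the gross target."""
    centered = scores - scores.mean()
    scale = np.abs(centered).sum()
    if scale == 0.0:
        return np.zeros_like(centered)  # all scores equal: no position
    return gross_exposure * centered / scale


scores = np.array([0.3, -0.1, 0.2, -0.4])  # made-up model scores
weights = dollar_neutral_weights(scores)
```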

LSTMPortfolioConfig dataclass

LSTMPortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="lstm_portfolio",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
    hidden_size=64,
    n_layers=1,
)

Bases: PortfolioConfig

Starter config for a sequence-based portfolio learner.

PortfolioConfig dataclass

PortfolioConfig(
    seed=42,
    device="cpu",
    dtype="float64",
    model_name="portfolio_model",
    turnover_penalty=0.0,
    dropout=0.1,
    asset_embedding_dim=8,
    group_embedding_dim=4,
    use_group_embedding=True,
    use_cost_in_context=True,
    vvsn_hidden_dim=64,
    batch_size=16,
    learning_rate=0.0001,
    weight_decay=0.0001,
    max_grad_norm=1.0,
    annualization_factor=252.0,
    sharpe_eps=1e-08,
    gamma_cost=0.5,
    softmin_tau=0.2,
    softmin_lambda=0.1,
    burn_in=0,
    max_iters=200,
    eval_every=10,
    metric_ema_alpha=0.45,
    metric_min_delta=0.001,
    early_stopping_patience=20,
    early_stopping_burn_in_iters=20,
    checkpoint_every=10,
    checkpoint_steps=(),
    default_checkpoint=None,
)

Bases: BaseModelConfig

Base config for portfolio-learning models.
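
The portfolio configs listed above repeat every PortfolioConfig field and differ only in model_name and a few trailing, model-specific fields. A minimal sketch of that layering with stand-in dataclasses follows. The class names ending in Sketch are hypothetical, only a handful of fields are reproduced, and whether the real configs are frozen is not stated in this reference.

```python
from dataclasses import dataclass, fields, replace


@dataclass
class PortfolioConfigSketch:
    # Stand-in for PortfolioConfig: a small subset of the shared fields.
    seed: int = 42
    device: str = "cpu"
    model_name: str = "portfolio_model"
    learning_rate: float = 0.0001
    max_iters: int = 200


@dataclass
class LSTMPortfolioConfigSketch(PortfolioConfigSketch):
    # The subclass overrides one default and appends its own fields.
    model_name: str = "lstm_portfolio"
    hidden_size: int = 64
    n_layers: int = 1


base = PortfolioConfigSketch()
lstm = LSTMPortfolioConfigSketch()

# Derive a variant without mutating the original config.
tuned = replace(lstm, hidden_size=128, max_iters=500)

# Fields that exist only on the subclass.
shared = {f.name for f in fields(base)}
extra = [f.name for f in fields(lstm) if f.name not in shared]
print(extra)  # ['hidden_size', 'n_layers']
```

Using dataclasses.replace keeps the defaults in one place and makes each experiment's overrides explicit.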

Pipelines

pipelines

Composable pipelines for finance-native model workflows.

LatentFactorForecastPipeline

LatentFactorForecastPipeline(model, forecaster, mapper)

Compose structural extraction, factor forecasting, and asset mapping.

Source code in src/ml4t/models/pipelines.py
def __init__(
    self,
    model: LatentFactorModel,
    forecaster: FactorForecaster,
    mapper: AssetMapper,
) -> None:
    self.model = model
    self.forecaster = forecaster
    self.mapper = mapper

PipelineFitResult dataclass

PipelineFitResult(structural_fit, factor_forecast_fit)

Fit summaries for each stage of a latent-factor pipeline.

PortfolioAllocationPipeline

PortfolioAllocationPipeline(model, *, postprocessors=())

Compose a portfolio model with optional weight post-processing hooks.

Source code in src/ml4t/models/pipelines.py
def __init__(
    self,
    model: PortfolioModel,
    *,
    postprocessors: tuple[PortfolioPostprocessor, ...] = (),
) -> None:
    self.model = model
    self.postprocessors = postprocessors
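
The constructor only stores the model and the tuple of hooks; how the hooks are invoked is not shown in this reference. One plausible reading is that they run in tuple order on the model's weights. The sketch below illustrates that idea with plain callables. The Hook shape and the functions clip_weights, normalize_gross, and apply_hooks are hypothetical and are not the PortfolioPostprocessor protocol.

```python
from typing import Callable, Sequence

Weights = dict[str, float]
# Hypothetical hook shape: the real protocol's method name and
# signature are not documented in this section.
Hook = Callable[[Weights], Weights]


def clip_weights(limit: float) -> Hook:
    """Build a hook that caps each absolute weight at `limit`."""

    def hook(weights: Weights) -> Weights:
        return {a: max(-limit, min(limit, w)) for a, w in weights.items()}

    return hook


def normalize_gross(weights: Weights) -> Weights:
    """Rescale weights so absolute values sum to 1 (no-op if all zero)."""
    gross = sum(abs(w) for w in weights.values())
    if gross == 0.0:
        return dict(weights)
    return {a: w / gross for a, w in weights.items()}


def apply_hooks(weights: Weights, hooks: Sequence[Hook] = ()) -> Weights:
    # Each hook receives the output of the previous one.
    for hook in hooks:
        weights = hook(weights)
    return weights


raw = {"AAA": 0.9, "BBB": -0.3, "CCC": 0.3}
final = apply_hooks(raw, (clip_weights(0.5), normalize_gross))
```

Order matters in such a chain: clipping before normalizing gives a different result than normalizing before clipping.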

PortfolioPipelineFitResult dataclass

PortfolioPipelineFitResult(model_fit)

Fit summaries for a portfolio-allocation pipeline.

Integration

integration

Integration helpers for cross-library data contracts.

BacktestDataFeedInputs dataclass

BacktestDataFeedInputs(
    feed_spec,
    prices_frame=None,
    prices_path=None,
    signals=None,
    context=None,
    metadata=dict(),
)

Structured handoff payload for ml4t.backtest.DataFeed.

to_datafeed_kwargs

to_datafeed_kwargs()

Return kwargs compatible with ml4t.backtest.DataFeed.

Source code in src/ml4t/models/integration/backtest.py
def to_datafeed_kwargs(self) -> dict[str, Any]:
    """Return kwargs compatible with ``ml4t.backtest.DataFeed``."""

    kwargs: dict[str, Any] = {"feed_spec": dict(self.feed_spec)}
    if self.prices_frame is not None:
        kwargs["prices_df"] = self.prices_frame
    if self.prices_path is not None:
        kwargs["prices_path"] = str(self.prices_path)
    if self.signals is not None:
        kwargs["signals_df"] = self.signals.to_polars()
    if self.context is not None:
        kwargs["context_df"] = self.context.to_polars()
    return kwargs
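
Two details of the method above are easy to miss: optional fields that are None produce no key at all, and prices_frame is renamed to prices_df on the way out. The stand-in below reproduces just that logic so it can be run without the library. InputsSketch is a hypothetical class, and the signals and context branches are left out because they depend on Polars.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional, Union


@dataclass
class InputsSketch:
    # Stand-in mirroring a subset of BacktestDataFeedInputs fields.
    feed_spec: dict[str, Any]
    prices_frame: Optional[Any] = None
    prices_path: Optional[Union[str, Path]] = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_datafeed_kwargs(self) -> dict[str, Any]:
        # feed_spec is copied so callers cannot mutate the payload.
        kwargs: dict[str, Any] = {"feed_spec": dict(self.feed_spec)}
        if self.prices_frame is not None:
            kwargs["prices_df"] = self.prices_frame
        if self.prices_path is not None:
            kwargs["prices_path"] = str(self.prices_path)
        return kwargs


inputs = InputsSketch(
    feed_spec={"timestamp_col": "timestamp", "entity_col": "asset"},
    prices_path=Path("data") / "prices.parquet",
)
kwargs = inputs.to_datafeed_kwargs()
print(sorted(kwargs))  # ['feed_spec', 'prices_path']
```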

ResolvedDatasetSchema dataclass

ResolvedDatasetSchema(
    timestamp_col, entity_col, metadata=dict()
)

Resolved timestamp/entity column contract for a tabular dataset.

ContextFrame dataclass

ContextFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Wide context features for backtest handoff.

PredictionsFrame dataclass

PredictionsFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Long-format asset prediction results.

ResultsFrame dataclass

ResultsFrame(columns, rows, metadata=dict())

Tabular model results with optional export helpers.

to_dicts

to_dicts()

Return the frame as a list of row dictionaries.

Source code in src/ml4t/models/integration/surfaces.py
def to_dicts(self) -> list[dict[str, Any]]:
    """Return the frame as a list of row dictionaries."""

    return [dict(zip(self.columns, row, strict=True)) for row in self.rows]

to_columnar

to_columnar()

Return the frame as columnar Python lists.

Source code in src/ml4t/models/integration/surfaces.py
def to_columnar(self) -> dict[str, list[Any]]:
    """Return the frame as columnar Python lists."""

    data = {column: [] for column in self.columns}
    for row in self.rows:
        for column, value in zip(self.columns, row, strict=True):
            data[column].append(value)
    return data

to_polars

to_polars()

Return the frame as a Polars DataFrame when Polars is installed.

Source code in src/ml4t/models/integration/surfaces.py
def to_polars(self) -> Any:
    """Return the frame as a Polars DataFrame when Polars is installed."""

    pl = _import_polars()
    return pl.DataFrame(self.to_dicts())

write_parquet

write_parquet(path, *, compression='zstd')

Write the frame to a Parquet file and return the output path. Requires Polars; missing parent directories are created.

Source code in src/ml4t/models/integration/surfaces.py
def write_parquet(self, path: str | Path, *, compression: str = "zstd") -> Path:
    """Write the frame to parquet when Polars is installed."""

    output_path = Path(path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    self.to_polars().write_parquet(output_path, compression=compression)
    return output_path

SignalsFrame dataclass

SignalsFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Long-format asset or portfolio signal results.

WeightsFrame dataclass

WeightsFrame(columns, rows, metadata=dict())

Bases: ResultsFrame

Long-format target-weight results.

backtest_datafeed_inputs

backtest_datafeed_inputs(
    *,
    prices_frame=None,
    prices_path=None,
    signals=None,
    context=None,
    schema=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    metadata=None,
)

Build a structured DataFeed handoff from model outputs and market-data metadata.

Source code in src/ml4t/models/integration/backtest.py
def backtest_datafeed_inputs(
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    signals: PredictionsFrame | SignalsFrame | WeightsFrame | None = None,
    context: ContextFrame | None = None,
    schema: Any | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build a structured ``DataFeed`` handoff from model outputs and market-data metadata."""

    if prices_frame is None and prices_path is None:
        raise ValueError("Provide either prices_frame or prices_path")
    if prices_frame is not None and prices_path is not None:
        raise ValueError("Provide prices_frame or prices_path, not both")

    feed_spec = resolve_feed_spec_mapping(
        prices_frame,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
    )
    combined_metadata = dict(metadata or {})
    if signals is not None:
        combined_metadata.setdefault("signal_frame_type", signals.metadata.get("frame_type"))
    return BacktestDataFeedInputs(
        feed_spec=feed_spec,
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=signals,
        context=context,
        metadata=combined_metadata,
    )
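
Two pieces of logic in the function above are worth isolating: exactly one price source must be supplied, and signal_frame_type is only filled in when the caller has not already set it. The helper names check_price_source and merge_metadata below are hypothetical. They restate the same checks in a form that runs without the library.

```python
from typing import Any, Optional


def check_price_source(prices_frame: Any = None, prices_path: Any = None) -> str:
    # Exactly one of the two sources is accepted.
    if prices_frame is None and prices_path is None:
        raise ValueError("Provide either prices_frame or prices_path")
    if prices_frame is not None and prices_path is not None:
        raise ValueError("Provide prices_frame or prices_path, not both")
    return "frame" if prices_frame is not None else "path"


def merge_metadata(
    metadata: Optional[dict[str, Any]],
    signal_metadata: Optional[dict[str, Any]],
) -> dict[str, Any]:
    combined = dict(metadata or {})
    if signal_metadata is not None:
        # setdefault keeps a caller-supplied value if one exists.
        combined.setdefault("signal_frame_type", signal_metadata.get("frame_type"))
    return combined


print(check_price_source(prices_path="prices.parquet"))        # path
print(merge_metadata(None, {"frame_type": "prediction"}))      # {'signal_frame_type': 'prediction'}
print(merge_metadata({"signal_frame_type": "custom"}, {"frame_type": "prediction"}))
# {'signal_frame_type': 'custom'}
```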

backtest_inputs_from_asset_forecast

backtest_inputs_from_asset_forecast(
    forecast,
    *,
    prices_frame=None,
    prices_path=None,
    schema=None,
    context=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    constants=None,
    metadata=None,
)

Build DataFeed inputs directly from an asset-forecast result.

Source code in src/ml4t/models/integration/backtest.py
def backtest_inputs_from_asset_forecast(
    forecast: AssetForecastResult,
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    schema: Any | None = None,
    context: ContextFrame | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    constants: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build ``DataFeed`` inputs directly from an asset-forecast result."""

    return backtest_datafeed_inputs(
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=predictions_frame_from_asset_forecast(forecast, constants=constants),
        context=context,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
        metadata=metadata,
    )

backtest_inputs_from_asset_signal

backtest_inputs_from_asset_signal(
    signal,
    *,
    prices_frame=None,
    prices_path=None,
    schema=None,
    context=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    constants=None,
    metadata=None,
)

Build DataFeed inputs directly from an asset-signal result.

Source code in src/ml4t/models/integration/backtest.py
def backtest_inputs_from_asset_signal(
    signal: AssetSignalResult,
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    schema: Any | None = None,
    context: ContextFrame | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    constants: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build ``DataFeed`` inputs directly from an asset-signal result."""

    return backtest_datafeed_inputs(
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=predictions_frame_from_asset_signal(signal, constants=constants),
        context=context,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
        metadata=metadata,
    )

backtest_inputs_from_weights

backtest_inputs_from_weights(
    weights,
    *,
    prices_frame=None,
    prices_path=None,
    schema=None,
    as_context=False,
    context_prefix="w_",
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
    constants=None,
    metadata=None,
)

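Build DataFeed inputs directly from target-weight outputs. By default the weights are passed as signals; with as_context=True they are passed as a wide context frame instead, with column names prefixed by context_prefix.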

Source code in src/ml4t/models/integration/backtest.py
def backtest_inputs_from_weights(
    weights: AssetWeightsResult | PortfolioWeightsResult,
    *,
    prices_frame: Any | None = None,
    prices_path: str | Path | None = None,
    schema: Any | None = None,
    as_context: bool = False,
    context_prefix: str = "w_",
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
    constants: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> BacktestDataFeedInputs:
    """Build ``DataFeed`` inputs directly from target-weight outputs."""

    signals = None if as_context else _weights_frame(weights, constants=constants)
    context = context_frame_from_weights(weights, prefix=context_prefix, constants=constants)
    return backtest_datafeed_inputs(
        prices_frame=prices_frame,
        prices_path=prices_path,
        signals=signals,
        context=context if as_context else None,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
        price_col=price_col,
        open_col=open_col,
        high_col=high_col,
        low_col=low_col,
        close_col=close_col,
        volume_col=volume_col,
        bid_col=bid_col,
        ask_col=ask_col,
        mid_col=mid_col,
        bid_size_col=bid_size_col,
        ask_size_col=ask_size_col,
        calendar=calendar,
        timezone=timezone,
        data_frequency=data_frequency,
        bar_type=bar_type,
        timestamp_semantics=timestamp_semantics,
        session_start_time=session_start_time,
        metadata=metadata,
    )

resolve_feed_spec_mapping

resolve_feed_spec_mapping(
    frame=None,
    *,
    schema=None,
    timestamp_col=None,
    entity_col=None,
    price_col=None,
    open_col=None,
    high_col=None,
    low_col=None,
    close_col=None,
    volume_col=None,
    bid_col=None,
    ask_col=None,
    mid_col=None,
    bid_size_col=None,
    ask_size_col=None,
    calendar=None,
    timezone=None,
    data_frequency=None,
    bar_type=None,
    timestamp_semantics=None,
    session_start_time=None,
)

Resolve a FeedSpec-compatible mapping from schema metadata and overrides.

Source code in src/ml4t/models/integration/backtest.py
def resolve_feed_spec_mapping(
    frame: Any | None = None,
    *,
    schema: Any | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    price_col: str | None = None,
    open_col: str | None = None,
    high_col: str | None = None,
    low_col: str | None = None,
    close_col: str | None = None,
    volume_col: str | None = None,
    bid_col: str | None = None,
    ask_col: str | None = None,
    mid_col: str | None = None,
    bid_size_col: str | None = None,
    ask_size_col: str | None = None,
    calendar: str | None = None,
    timezone: str | None = None,
    data_frequency: Any | None = None,
    bar_type: str | None = None,
    timestamp_semantics: str | None = None,
    session_start_time: str | None = None,
) -> dict[str, Any]:
    """Resolve a ``FeedSpec``-compatible mapping from schema metadata and overrides."""

    spec_mapping = _coerce_feed_spec_mapping(schema)

    if frame is not None and _supports_schema_resolution(frame):
        resolved_schema = resolve_dataset_schema(
            frame,
            schema=schema,
            timestamp_col=timestamp_col,
            entity_col=entity_col,
        )
        spec_mapping["timestamp_col"] = resolved_schema.timestamp_col
        spec_mapping["entity_col"] = resolved_schema.entity_col
    else:
        if timestamp_col is not None:
            spec_mapping["timestamp_col"] = timestamp_col
        else:
            spec_mapping.setdefault("timestamp_col", spec_mapping.get("timestamp_col", "timestamp"))
        if entity_col is not None:
            spec_mapping["entity_col"] = entity_col
        else:
            spec_mapping.setdefault("entity_col", spec_mapping.get("entity_col", "asset"))

    overrides = {
        "price_col": price_col,
        "open_col": open_col,
        "high_col": high_col,
        "low_col": low_col,
        "close_col": close_col,
        "volume_col": volume_col,
        "bid_col": bid_col,
        "ask_col": ask_col,
        "mid_col": mid_col,
        "bid_size_col": bid_size_col,
        "ask_size_col": ask_size_col,
        "calendar": calendar,
        "timezone": timezone,
        "data_frequency": data_frequency,
        "bar_type": bar_type,
        "timestamp_semantics": timestamp_semantics,
        "session_start_time": session_start_time,
    }
    for key, value in overrides.items():
        if value is not None:
            spec_mapping[key] = value

    if "close_col" in spec_mapping and "price_col" not in spec_mapping:
        spec_mapping["price_col"] = spec_mapping["close_col"]
    spec_mapping.setdefault("price_col", "close")
    spec_mapping.setdefault("open_col", "open")
    spec_mapping.setdefault("high_col", "high")
    spec_mapping.setdefault("low_col", "low")
    spec_mapping.setdefault("close_col", spec_mapping["price_col"])
    spec_mapping.setdefault("volume_col", "volume")
    return spec_mapping
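
The tail of the function above fills price-column defaults in a specific order, so close_col and price_col end up mirroring each other when only one is given. The sketch below isolates that tail as a hypothetical helper named fill_price_defaults and runs it on three small inputs.

```python
def fill_price_defaults(spec: dict[str, str]) -> dict[str, str]:
    spec = dict(spec)  # work on a copy
    # A known close column doubles as the price column.
    if "close_col" in spec and "price_col" not in spec:
        spec["price_col"] = spec["close_col"]
    spec.setdefault("price_col", "close")
    spec.setdefault("open_col", "open")
    spec.setdefault("high_col", "high")
    spec.setdefault("low_col", "low")
    # A known price column doubles as the close column.
    spec.setdefault("close_col", spec["price_col"])
    spec.setdefault("volume_col", "volume")
    return spec


empty = fill_price_defaults({})
from_close = fill_price_defaults({"close_col": "adj_close"})
from_price = fill_price_defaults({"price_col": "mid"})

print(empty["price_col"], empty["close_col"])            # close close
print(from_close["price_col"], from_close["close_col"])  # adj_close adj_close
print(from_price["price_col"], from_price["close_col"])  # mid mid
```

Passing both price_col and close_col keeps them independent; the mirroring only applies when one of the two is missing.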

cross_section_batch_from_long_frame

cross_section_batch_from_long_frame(
    frame,
    *,
    schema=None,
    feature_cols,
    return_col=None,
    context_cols=(),
    timestamp_col=None,
    entity_col=None,
    metadata=None,
)

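Build a ragged cross-sectional batch from a long-format frame. Each timestamp's rows are packed into slots padded to the widest cross-section, and mask marks the occupied slots.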

Source code in src/ml4t/models/integration/data.py
def cross_section_batch_from_long_frame(
    frame: Any,
    *,
    schema: Any | None = None,
    feature_cols: Sequence[str],
    return_col: str | None = None,
    context_cols: Sequence[str] = (),
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> CrossSectionBatch:
    """Build a ragged cross-sectional batch from a long-format frame."""

    resolved = resolve_dataset_schema(
        frame,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
    )
    records = _sorted_records(
        frame, timestamp_col=resolved.timestamp_col, entity_col=resolved.entity_col
    )
    timestamps = tuple(_ordered_unique(record[resolved.timestamp_col] for record in records))
    grouped_assets = {
        timestamp: [record for record in records if record[resolved.timestamp_col] == timestamp]
        for timestamp in timestamps
    }
    max_assets = max((len(group) for group in grouped_assets.values()), default=0)
    characteristics = np.full(
        (len(timestamps), max_assets, len(feature_cols)),
        np.nan,
        dtype=np.float64,
    )
    returns = (
        np.full((len(timestamps), max_assets), np.nan, dtype=np.float64)
        if return_col is not None
        else None
    )
    mask = np.zeros((len(timestamps), max_assets), dtype=bool)
    asset_ids = tuple(f"slot_{idx}" for idx in range(max_assets))

    context_features = None
    if context_cols:
        context_features = np.full((len(timestamps), len(context_cols)), np.nan, dtype=np.float64)

    for t_idx, timestamp in enumerate(timestamps):
        records_t = grouped_assets[timestamp]
        for slot_idx, record in enumerate(records_t):
            mask[t_idx, slot_idx] = True
            for f_idx, feature_col in enumerate(feature_cols):
                value = record[feature_col]
                characteristics[t_idx, slot_idx, f_idx] = (
                    float(value) if _is_finite(value) else np.nan
                )
            if returns is not None and return_col is not None:
                value = record[return_col]
                returns[t_idx, slot_idx] = float(value) if _is_finite(value) else np.nan

        if context_features is not None and records_t:
            for c_idx, context_col in enumerate(context_cols):
                values = np.asarray([record[context_col] for record in records_t], dtype=object)
                finite_values = [value for value in values if _is_finite(value)]
                if not finite_values:
                    context_features[t_idx, c_idx] = np.nan
                    continue
                first_value = float(finite_values[0])
                if any(abs(float(value) - first_value) > 1e-12 for value in finite_values[1:]):
                    raise ValueError(
                        f"context column {context_col!r} must be constant within timestamp "
                        f"{timestamp!r}"
                    )
                context_features[t_idx, c_idx] = first_value

    combined_metadata = dict(metadata or {})
    combined_metadata.update(resolved.metadata)
    return CrossSectionBatch(
        characteristics=characteristics,
        returns=returns,
        context_features=context_features,
        timestamps=timestamps,
        asset_ids=asset_ids,
        mask=mask,
        metadata=combined_metadata,
    )
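
In the ragged layout above, the asset axis holds anonymous slots (slot_0, slot_1, ...) rather than stable asset identities, so the same asset can land in different slots on different dates. The sketch below shows the packing on a toy input using plain Python lists instead of NumPy arrays. The pack_ragged helper and the sample records are hypothetical, and the records are assumed to be pre-sorted by timestamp and asset.

```python
import math

records = [
    {"timestamp": "2024-01-02", "asset": "AAA", "momentum": 0.4},
    {"timestamp": "2024-01-02", "asset": "BBB", "momentum": -0.1},
    {"timestamp": "2024-01-02", "asset": "CCC", "momentum": 0.2},
    {"timestamp": "2024-01-03", "asset": "BBB", "momentum": 0.3},
]


def pack_ragged(records, feature_col):
    # Unique timestamps in first-seen order.
    timestamps = list(dict.fromkeys(r["timestamp"] for r in records))
    groups = {t: [r for r in records if r["timestamp"] == t] for t in timestamps}
    # Pad every cross-section to the widest one.
    width = max((len(g) for g in groups.values()), default=0)
    values = [[math.nan] * width for _ in timestamps]
    mask = [[False] * width for _ in timestamps]
    for t_idx, t in enumerate(timestamps):
        for slot, record in enumerate(groups[t]):
            values[t_idx][slot] = float(record[feature_col])
            mask[t_idx][slot] = True
    asset_ids = tuple(f"slot_{i}" for i in range(width))
    return values, mask, asset_ids


values, mask, asset_ids = pack_ragged(records, "momentum")
print(mask)       # [[True, True, True], [True, False, False]]
print(asset_ids)  # ('slot_0', 'slot_1', 'slot_2')
```

Here BBB sits in slot_1 on the first date and slot_0 on the second, which is why downstream code should rely on mask and per-date ordering rather than on slot position.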

persistent_panel_batch_from_long_frame

persistent_panel_batch_from_long_frame(
    frame,
    *,
    schema=None,
    return_col=None,
    feature_cols=(),
    timestamp_col=None,
    entity_col=None,
    metadata=None,
)

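Build a persistent panel batch from a long-format frame. Each asset keeps a fixed column across timestamps, missing observations are NaN, and duplicate (timestamp, entity) rows raise ValueError.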

Source code in src/ml4t/models/integration/data.py
def persistent_panel_batch_from_long_frame(
    frame: Any,
    *,
    schema: Any | None = None,
    return_col: str | None = None,
    feature_cols: Sequence[str] = (),
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> PersistentPanelBatch:
    """Build a persistent panel batch from a long-format frame."""

    resolved = resolve_dataset_schema(
        frame,
        schema=schema,
        timestamp_col=timestamp_col,
        entity_col=entity_col,
    )
    records = _sorted_records(
        frame, timestamp_col=resolved.timestamp_col, entity_col=resolved.entity_col
    )
    timestamps = tuple(_ordered_unique(record[resolved.timestamp_col] for record in records))
    asset_ids = tuple(
        str(asset) for asset in _ordered_unique(record[resolved.entity_col] for record in records)
    )
    time_index = {timestamp: idx for idx, timestamp in enumerate(timestamps)}
    asset_index = {asset: idx for idx, asset in enumerate(asset_ids)}

    returns = None
    if return_col is not None:
        returns = np.full((len(timestamps), len(asset_ids)), np.nan, dtype=np.float64)

    characteristics = None
    if feature_cols:
        characteristics = np.full(
            (len(timestamps), len(asset_ids), len(feature_cols)),
            np.nan,
            dtype=np.float64,
        )

    seen: set[tuple[Any, str]] = set()
    for record in records:
        key = (record[resolved.timestamp_col], str(record[resolved.entity_col]))
        if key in seen:
            raise ValueError(
                f"Duplicate (timestamp, entity) row encountered in long-format panel data: {key}"
            )
        seen.add(key)
        t_idx = time_index[record[resolved.timestamp_col]]
        a_idx = asset_index[str(record[resolved.entity_col])]
        if returns is not None and return_col is not None:
            return_value = record[return_col]
            returns[t_idx, a_idx] = float(return_value) if _is_finite(return_value) else np.nan
        if characteristics is not None:
            for f_idx, feature_col in enumerate(feature_cols):
                value = record[feature_col]
                characteristics[t_idx, a_idx, f_idx] = float(value) if _is_finite(value) else np.nan

    combined_metadata = dict(metadata or {})
    combined_metadata.update(resolved.metadata)
    return PersistentPanelBatch(
        returns=returns,
        characteristics=characteristics,
        timestamps=timestamps,
        asset_ids=asset_ids,
        metadata=combined_metadata,
    )
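
The persistent panel above gives every asset a fixed column, leaves missing observations as NaN, and rejects duplicate (timestamp, entity) rows. A small sketch of the same indexing scheme with plain lists follows. The pack_panel helper and the sample records are hypothetical, and asset order here is simply first-seen order in the input.

```python
import math


def pack_panel(records, value_col):
    timestamps = list(dict.fromkeys(r["timestamp"] for r in records))
    assets = list(dict.fromkeys(str(r["asset"]) for r in records))
    t_index = {t: i for i, t in enumerate(timestamps)}
    a_index = {a: i for i, a in enumerate(assets)}

    # Dense (time x asset) grid, NaN where nothing was observed.
    panel = [[math.nan] * len(assets) for _ in timestamps]
    seen = set()
    for r in records:
        key = (r["timestamp"], str(r["asset"]))
        if key in seen:
            raise ValueError(f"Duplicate (timestamp, entity) row: {key}")
        seen.add(key)
        panel[t_index[key[0]]][a_index[key[1]]] = float(r[value_col])
    return panel, tuple(timestamps), tuple(assets)


records = [
    {"timestamp": "2024-01-02", "asset": "AAA", "ret": 0.01},
    {"timestamp": "2024-01-02", "asset": "BBB", "ret": -0.02},
    {"timestamp": "2024-01-03", "asset": "BBB", "ret": 0.03},
]
panel, timestamps, assets = pack_panel(records, "ret")
print(assets)    # ('AAA', 'BBB')
print(panel[0])  # [0.01, -0.02]
```

Unlike the ragged cross-section layout, column 1 always refers to BBB here, at the cost of NaN padding for assets that are absent on a given date.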

resolve_dataset_schema

resolve_dataset_schema(
    frame,
    *,
    schema=None,
    timestamp_col=None,
    entity_col=None,
    timestamp_candidates=(
        "timestamp",
        "datetime",
        "date",
        "time",
    ),
    entity_candidates=(
        "asset",
        "symbol",
        "ticker",
        "instrument",
        "security",
    ),
)

Resolve timestamp and entity columns from explicit names, ML4T-style schema metadata, or common column-name candidates, in that order.

Source code in src/ml4t/models/integration/data.py
def resolve_dataset_schema(
    frame: Any,
    *,
    schema: Any | None = None,
    timestamp_col: str | None = None,
    entity_col: str | None = None,
    timestamp_candidates: Sequence[str] = ("timestamp", "datetime", "date", "time"),
    entity_candidates: Sequence[str] = ("asset", "symbol", "ticker", "instrument", "security"),
) -> ResolvedDatasetSchema:
    """Resolve timestamp and entity columns from explicit names or ML4T-style metadata."""

    columns = tuple(_frame_columns(frame))
    inferred_schema = _coerce_schema(schema)

    resolved_timestamp = (
        timestamp_col
        or inferred_schema.get("timestamp_col")
        or _first_present(columns, timestamp_candidates)
    )
    if resolved_timestamp is None:
        raise ValueError(
            f"Could not resolve a timestamp column from columns {list(columns)}. "
            f"Expected one of {tuple(timestamp_candidates)} or explicit schema metadata."
        )
    if resolved_timestamp not in columns:
        raise ValueError(
            f"Resolved timestamp column {resolved_timestamp!r} not found in columns {list(columns)}"
        )

    resolved_entity = (
        entity_col
        or inferred_schema.get("entity_col")
        or _first_present(columns, entity_candidates)
    )
    if resolved_entity is None:
        raise ValueError(
            f"Could not resolve an entity column from columns {list(columns)}. "
            f"Expected one of {tuple(entity_candidates)} or explicit schema metadata."
        )
    if resolved_entity not in columns:
        raise ValueError(
            f"Resolved entity column {resolved_entity!r} not found in columns {list(columns)}"
        )

    return ResolvedDatasetSchema(
        timestamp_col=resolved_timestamp,
        entity_col=resolved_entity,
        metadata=inferred_schema,
    )
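
Resolution above follows a fixed precedence: an explicit argument wins, then schema metadata, then the first matching candidate name. The sketch below captures that chain for a single column. resolve_column is a hypothetical helper, and it assumes the private _first_present helper returns the first candidate that appears in the frame's columns.

```python
from typing import Optional, Sequence


def resolve_column(
    columns: Sequence[str],
    explicit: Optional[str] = None,
    from_schema: Optional[str] = None,
    candidates: Sequence[str] = (),
) -> str:
    # Assumed behaviour of _first_present: first candidate found in columns.
    guessed = next((c for c in candidates if c in columns), None)
    resolved = explicit or from_schema or guessed
    if resolved is None:
        raise ValueError(f"Could not resolve a column from {list(columns)}")
    if resolved not in columns:
        raise ValueError(f"Resolved column {resolved!r} not found in {list(columns)}")
    return resolved


columns = ("date", "ticker", "close")
ts_candidates = ("timestamp", "datetime", "date", "time")
entity_candidates = ("asset", "symbol", "ticker", "instrument", "security")

print(resolve_column(columns, candidates=ts_candidates))      # date
print(resolve_column(columns, candidates=entity_candidates))  # ticker
print(resolve_column(columns, explicit="close", candidates=ts_candidates))  # close
```

Note that an explicit name is still validated against the frame, so a typo fails loudly instead of falling through to a guess.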

context_frame_from_weights

context_frame_from_weights(
    weights, *, prefix="w_", constants=None
)

Convert asset weights to a wide context frame for backtest strategies.

Source code in src/ml4t/models/integration/surfaces.py
def context_frame_from_weights(
    weights: AssetWeightsResult | PortfolioWeightsResult,
    *,
    prefix: str = "w_",
    constants: dict[str, Any] | None = None,
) -> ContextFrame:
    """Convert asset weights to a wide context frame for backtest strategies."""

    weight_matrix, timestamps, assets = _resolve_weight_matrix(weights)
    constant_columns = tuple((constants or {}).keys())
    columns = ("timestamp", *(f"{prefix}{asset}" for asset in assets), *constant_columns)
    rows: list[tuple[Any, ...]] = []

    for t_idx, timestamp in enumerate(timestamps):
        values = [
            float(weight_matrix[t_idx, a_idx]) if np.isfinite(weight_matrix[t_idx, a_idx]) else 0.0
            for a_idx in range(len(assets))
        ]
        rows.append((timestamp, *values, *tuple((constants or {}).values())))

    metadata = {"frame_type": "context", **weights.metadata}
    if isinstance(weights, PortfolioWeightsResult) and weights.checkpoint_step is not None:
        metadata["checkpoint_step"] = weights.checkpoint_step

    return ContextFrame(columns=columns, rows=tuple(rows), metadata=metadata)
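
The wide layout above produces one row per timestamp and one prefixed column per asset, with non-finite weights written as 0.0 and any constants appended as trailing columns. The sketch below reproduces that shaping with nested lists. wide_context_rows is a hypothetical helper and the weights are made-up values.

```python
import math
from typing import Any, Optional


def wide_context_rows(
    weight_matrix,
    timestamps,
    assets,
    prefix: str = "w_",
    constants: Optional[dict[str, Any]] = None,
):
    constants = constants or {}
    columns = ("timestamp", *(f"{prefix}{a}" for a in assets), *constants.keys())
    rows = []
    for t_idx, timestamp in enumerate(timestamps):
        # Non-finite weights (NaN, inf) become a flat 0.0 position.
        values = [float(w) if math.isfinite(w) else 0.0 for w in weight_matrix[t_idx]]
        rows.append((timestamp, *values, *constants.values()))
    return columns, tuple(rows)


columns, rows = wide_context_rows(
    [[0.6, math.nan], [0.5, 0.5]],
    timestamps=("2024-01-02", "2024-01-03"),
    assets=("AAA", "BBB"),
    constants={"model": "demo"},
)
print(columns)  # ('timestamp', 'w_AAA', 'w_BBB', 'model')
print(rows[0])  # ('2024-01-02', 0.6, 0.0, 'demo')
```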

predictions_frame_from_asset_forecast

predictions_frame_from_asset_forecast(
    forecast, *, constants=None
)

Convert asset expected returns to a diagnostic-ready predictions frame.

Source code in src/ml4t/models/integration/surfaces.py
def predictions_frame_from_asset_forecast(
    forecast: AssetForecastResult,
    *,
    constants: dict[str, Any] | None = None,
) -> PredictionsFrame:
    """Convert asset expected returns to a diagnostic-ready predictions frame."""

    expected_returns = np.asarray(forecast.expected_returns, dtype=np.float64)
    timestamps = _resolve_timestamps(expected_returns.shape[0], forecast.timestamps)
    assets = _resolve_assets(expected_returns.shape[1], forecast.asset_ids)
    constant_columns = tuple((constants or {}).keys())
    rows: list[tuple[Any, ...]] = []

    for t_idx, timestamp in enumerate(timestamps):
        for a_idx, asset in enumerate(assets):
            value = expected_returns[t_idx, a_idx]
            if not np.isfinite(value):
                continue
            rows.append(
                (
                    timestamp,
                    asset,
                    float(value),
                    *tuple((constants or {}).values()),
                )
            )

    return PredictionsFrame(
        columns=("timestamp", "asset", "prediction_value", *constant_columns),
        rows=tuple(rows),
        metadata={"frame_type": "prediction", **forecast.metadata},
    )

predictions_frame_from_asset_signal

predictions_frame_from_asset_signal(
    signal, *, constants=None
)

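Convert asset-level signals to a diagnostic-ready predictions frame in long format, with one row per timestamp and asset. Non-finite values are skipped rather than written.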

Source code in src/ml4t/models/integration/surfaces.py
def predictions_frame_from_asset_signal(
    signal: AssetSignalResult,
    *,
    constants: dict[str, Any] | None = None,
) -> PredictionsFrame:
    """Convert asset-level signals to a diagnostic-ready predictions frame."""

    signal_values = np.asarray(signal.signal_values, dtype=np.float64)
    timestamps = _resolve_timestamps(signal_values.shape[0], signal.timestamps)
    assets = _resolve_assets(signal_values.shape[1], signal.asset_ids)
    constant_columns = tuple((constants or {}).keys())
    rows: list[tuple[Any, ...]] = []

    for t_idx, timestamp in enumerate(timestamps):
        for a_idx, asset in enumerate(assets):
            value = signal_values[t_idx, a_idx]
            if not np.isfinite(value):
                continue
            rows.append(
                (
                    timestamp,
                    asset,
                    float(value),
                    *tuple((constants or {}).values()),
                )
            )

    return PredictionsFrame(
        columns=("timestamp", "asset", "prediction_value", *constant_columns),
        rows=tuple(rows),
        metadata={"frame_type": "prediction", **signal.metadata},
    )

signals_frame_from_asset_weights

signals_frame_from_asset_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert cross-sectional asset weights to a diagnostic-ready signals frame.

Source code in src/ml4t/models/integration/surfaces.py
def signals_frame_from_asset_weights(
    weights: AssetWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> SignalsFrame:
    """Convert cross-sectional asset weights to a diagnostic-ready signals frame."""

    return _frame_from_asset_weights(
        weights,
        value_column="signal_value",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="signal",
        frame_cls=SignalsFrame,
    )

signals_frame_from_portfolio_weights

signals_frame_from_portfolio_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert portfolio weights to a diagnostic-ready signals frame.

Source code in src/ml4t/models/integration/surfaces.py
def signals_frame_from_portfolio_weights(
    weights: PortfolioWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> SignalsFrame:
    """Convert portfolio weights to a diagnostic-ready signals frame."""

    return _frame_from_portfolio_weights(
        weights,
        value_column="signal_value",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="signal",
        frame_cls=SignalsFrame,
    )

weights_frame_from_asset_weights

weights_frame_from_asset_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert cross-sectional asset weights to a backtest-ready weights frame.

Source code in src/ml4t/models/integration/surfaces.py
def weights_frame_from_asset_weights(
    weights: AssetWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> WeightsFrame:
    """Convert cross-sectional asset weights to a backtest-ready weights frame."""

    return _frame_from_asset_weights(
        weights,
        value_column="weight",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="weight",
        frame_cls=WeightsFrame,
    )

weights_frame_from_portfolio_weights

weights_frame_from_portfolio_weights(
    weights, *, constants=None, selected_threshold=1e-09
)

Convert portfolio weights to a backtest-ready weights frame.

Source code in src/ml4t/models/integration/surfaces.py
def weights_frame_from_portfolio_weights(
    weights: PortfolioWeightsResult,
    *,
    constants: dict[str, Any] | None = None,
    selected_threshold: float = 1e-9,
) -> WeightsFrame:
    """Convert portfolio weights to a backtest-ready weights frame."""

    return _frame_from_portfolio_weights(
        weights,
        value_column="weight",
        include_selected=True,
        selected_threshold=selected_threshold,
        constants=constants,
        frame_type="weight",
        frame_cls=WeightsFrame,
    )
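
The four wrappers above differ only in the private helper they delegate to and in the `value_column`, `frame_type`, and `frame_cls` arguments. Those helpers are not reproduced in this section, so the sketch below is a guess at the behaviour rather than a description of it. It assumes a long layout, assumes that `include_selected=True` adds a boolean `selected` column, and assumes that a row counts as selected when the absolute weight exceeds `selected_threshold`. `long_weight_rows` is a hypothetical name.

```python
import math
from typing import Any


def long_weight_rows(
    weight_matrix: list[list[float]],
    timestamps: list[str],
    assets: list[str],
    *,
    value_column: str,
    selected_threshold: float = 1e-9,
) -> tuple[tuple[str, ...], tuple[tuple[Any, ...], ...]]:
    """Hypothetical stand-in for the private frame helpers (behaviour assumed)."""
    columns = ("timestamp", "asset", value_column, "selected")
    rows = []
    for t_idx, timestamp in enumerate(timestamps):
        for a_idx, asset in enumerate(assets):
            weight = weight_matrix[t_idx][a_idx]
            if not math.isfinite(weight):
                continue  # ASSUMPTION: mirrors the predictions-frame converters
            # ASSUMPTION: "selected" means the weight is materially non-zero.
            rows.append((timestamp, asset, float(weight), abs(weight) > selected_threshold))
    return columns, tuple(rows)


data = [[0.5, 0.0, -0.5]]
stamps = ["2024-01-02"]
tickers = ["AAPL", "MSFT", "NVDA"]

# Same data, two surfaces: only the name of the value column changes.
sig_columns, sig_rows = long_weight_rows(data, stamps, tickers, value_column="signal_value")
wgt_columns, wgt_rows = long_weight_rows(data, stamps, tickers, value_column="weight")
print(sig_columns, [row[3] for row in sig_rows])
print(wgt_columns, [row[3] for row in wgt_rows])
```

If the real helpers define `selected` differently, only the comparison on the marked line would need to change.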

write_backtest_frames

write_backtest_frames(
    artifact_dir,
    *,
    predictions=None,
    weights=None,
    compression="zstd",
)

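Write standardized prediction and weight artifacts for downstream ML4T tooling. Only the frames that are passed are written, as `predictions.parquet` and `weights.parquet` under `artifact_dir` (created if missing); the returned dict maps each written name to its path.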

Source code in src/ml4t/models/integration/surfaces.py
def write_backtest_frames(
    artifact_dir: str | Path,
    *,
    predictions: PredictionsFrame | None = None,
    weights: WeightsFrame | None = None,
    compression: str = "zstd",
) -> dict[str, Path]:
    """Write standardized prediction and weight artifacts for downstream ML4T tooling."""

    output_dir = Path(artifact_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    written: dict[str, Path] = {}
    if predictions is not None:
        written["predictions"] = predictions.write_parquet(
            output_dir / "predictions.parquet",
            compression=compression,
        )
    if weights is not None:
        written["weights"] = weights.write_parquet(
            output_dir / "weights.parquet",
            compression=compression,
        )
    return written
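
The return contract is easiest to see with a stub. The sketch below reimplements the same control flow against `StubFrame`, a hypothetical stand-in that writes raw bytes instead of real Parquet, so it runs without the library or a Parquet engine. `write_frames` and the `run_001` directory name are likewise illustrative.

```python
import tempfile
from pathlib import Path


class StubFrame:
    """Hypothetical stand-in for PredictionsFrame / WeightsFrame."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload

    def write_parquet(self, path: Path, *, compression: str = "zstd") -> Path:
        # A real frame would serialize to Parquet; the stub only writes bytes.
        path.write_bytes(self.payload)
        return path


def write_frames(artifact_dir, *, predictions=None, weights=None, compression="zstd"):
    """Same shape as the listing above: write only the frames that were passed."""
    output_dir = Path(artifact_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    written: dict[str, Path] = {}
    if predictions is not None:
        written["predictions"] = predictions.write_parquet(
            output_dir / "predictions.parquet", compression=compression
        )
    if weights is not None:
        written["weights"] = weights.write_parquet(
            output_dir / "weights.parquet", compression=compression
        )
    return written


with tempfile.TemporaryDirectory() as tmp:
    # Only weights are supplied, so only one artifact is written.
    written = write_frames(Path(tmp) / "run_001", weights=StubFrame(b"demo"))
    written_keys = sorted(written)
    weights_name = written["weights"].name
    weights_exists = written["weights"].exists()

print(written_keys, weights_name, weights_exists)
```

Because omitted frames leave no key in the result, callers can iterate over the returned dict to log or upload exactly the files that were produced.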

Family Namespaces

| Namespace | Purpose |
| --- | --- |
| ml4t.models.latent_factors | structural latent-factor estimators |
| ml4t.models.forecasters | factor-premium forecasters |
| ml4t.models.mappers | asset-level mapping from factor forecasts |
| ml4t.models.stochastic_discount_factor | weight-native SDF estimation and return projections |
| ml4t.models.asset_prediction | direct asset-level predictors |
| ml4t.models.portfolio | end-to-end portfolio learners |