ML4T Data Documentation
Unified market data acquisition from 19+ providers

API Reference

Complete API documentation for the ml4t-data library, auto-generated from source docstrings via mkdocstrings.


DataManager

The primary entry point for all data operations. DataManager is a facade that delegates to focused manager classes for configuration, fetching, storage, metadata, and batch operations.

from ml4t.data import DataManager

# Fetch-only (no storage)
manager = DataManager()
df = manager.fetch("AAPL", "2024-01-01", "2024-12-31", provider="yahoo")

# With storage for load/update workflows
from ml4t.data.storage import HiveStorage, StorageConfig

storage = HiveStorage(StorageConfig(base_path="./data"))
manager = DataManager(storage=storage, use_transactions=True)
key = manager.load("AAPL", "2024-01-01", "2024-12-31")
key = manager.update("AAPL")

DataManager

DataManager(
    config_path=None,
    output_format="polars",
    providers=None,
    storage=None,
    use_transactions=False,
    enable_validation=True,
    progress_callback=None,
    **kwargs,
)

Unified interface for financial data access and storage.

The DataManager provides a single, consistent API for fetching and managing data from multiple providers. It handles:

Data Fetching:

- Provider selection based on symbol patterns
- Configuration management (YAML, environment, parameters)
- Connection pooling and session management
- Output format conversion (Polars, pandas, lazy)
- Batch fetching with error handling

Storage Operations (when storage configured):

- Initial data loading with validation
- Incremental updates with gap detection and filling
- Transaction support for ACID guarantees
- Progress callbacks for UI integration
- Data validation (OHLCV, cross-validation)
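The symbol-pattern provider routing listed under Data Fetching can be sketched as follows; the patterns and provider names here are illustrative, not the library's actual routing table:

```python
# Illustrative provider routing by symbol pattern. The real rules live in the
# library's routing configuration and may differ.
import re

ROUTES = [
    (re.compile(r"^[A-Z]+-USD$"), "binance"),  # e.g., BTC-USD -> crypto provider
    (re.compile(r"^\^"), "yahoo"),             # e.g., ^GSPC -> index via Yahoo
]

def route(symbol: str, default: str = "yahoo") -> str:
    """Return the provider name whose pattern first matches the symbol."""
    for pattern, provider in ROUTES:
        if pattern.match(symbol):
            return provider
    return default
```

An explicit `provider=` argument to `fetch()` overrides any pattern-based routing.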

Usage:

Fetch only (no storage):

>>> manager = DataManager()
>>> df = manager.fetch("AAPL", "2024-01-01", "2024-12-31", provider="yahoo")

With storage for load/update:

>>> from ml4t.data.storage.hive import HiveStorage
>>> from ml4t.data.storage.backend import StorageConfig
>>> storage = HiveStorage(StorageConfig(base_path="./data"))
>>> manager = DataManager(storage=storage, use_transactions=True)
>>> key = manager.load("AAPL", "2024-01-01", "2024-12-31")
>>> key = manager.update("AAPL")  # Incremental update

Initialize DataManager.

Parameters:

Name Type Description Default
config_path str | None

Path to YAML configuration file

None
output_format str

Output format ('polars', 'pandas', 'lazy')

'polars'
providers dict[str, dict[str, Any]] | None

Provider-specific configuration overrides

None
storage Any | None

Optional storage backend for load/update operations

None
use_transactions bool

Enable transactional storage for ACID guarantees

False
enable_validation bool

Enable data validation during load/update

True
progress_callback Callable[[str, float], None] | None

Optional callback for progress updates (message, progress)

None
**kwargs Any

Additional configuration parameters

{}

config property

config

Get configuration dictionary.

output_format property

output_format

Get output format.

storage property

storage

Get storage backend.

fetch

fetch(
    symbol,
    start,
    end,
    frequency="daily",
    provider=None,
    **kwargs,
)

Fetch data for a symbol.

Parameters:

Name Type Description Default
symbol str

Symbol to fetch

required
start str

Start date (YYYY-MM-DD)

required
end str

End date (YYYY-MM-DD)

required
frequency str

Data frequency (daily, hourly, etc.)

'daily'
provider str | None

Optional provider override

None
**kwargs Any

Additional provider-specific parameters

{}

Returns:

Type Description
DataFrame | LazyFrame | Any

Data in configured output format

Raises:

Type Description
ValueError

If no provider found or data fetch fails

fetch_batch

fetch_batch(
    symbols, start, end, frequency="daily", **kwargs
)

Fetch data for multiple symbols.

Parameters:

Name Type Description Default
symbols list[str]

List of symbols to fetch

required
start str

Start date (YYYY-MM-DD)

required
end str

End date (YYYY-MM-DD)

required
frequency str

Data frequency

'daily'
**kwargs Any

Additional parameters

{}

Returns:

Type Description
dict[str, DataFrame | LazyFrame | Any | None]

Dictionary mapping symbols to data (or None if fetch failed)
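Because failed symbols map to None, callers typically split the result before further processing. A minimal helper (the result values below are stand-ins for DataFrames):

```python
# Split a fetch_batch-style result dict into successes and failures.
# The values here are placeholders; in practice they are DataFrames or None.
def split_results(results: dict):
    """Return (ok, failed): successful symbol->data pairs and failed symbols."""
    ok = {sym: data for sym, data in results.items() if data is not None}
    failed = [sym for sym, data in results.items() if data is None]
    return ok, failed
```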

batch_load

batch_load(
    symbols,
    start,
    end,
    frequency="daily",
    provider=None,
    max_workers=4,
    fail_on_partial=False,
    **kwargs,
)

Fetch data for multiple symbols and return in multi-asset stacked format.

batch_load_universe

batch_load_universe(
    universe,
    start,
    end,
    frequency="daily",
    provider=None,
    max_workers=4,
    fail_on_partial=False,
    **kwargs,
)

Fetch data for all symbols in a pre-defined universe.

batch_load_from_storage

batch_load_from_storage(
    symbols,
    start,
    end,
    frequency="daily",
    asset_class="equities",
    provider=None,
    fetch_missing=True,
    max_workers=4,
    **kwargs,
)

Load multiple symbols from storage with optional fetch fallback.

load

load(
    symbol,
    start,
    end,
    frequency="daily",
    asset_class="equities",
    provider=None,
    bar_type="time",
    bar_threshold=None,
    exchange="UNKNOWN",
    calendar=None,
)

Load data from provider and store it.

import_data

import_data(
    data,
    symbol,
    provider,
    frequency="daily",
    asset_class="equities",
    bar_type="time",
    bar_threshold=None,
    exchange="UNKNOWN",
    calendar=None,
)

Import external data into storage with metadata.

update

update(
    symbol,
    frequency="daily",
    asset_class="equities",
    lookback_days=7,
    fill_gaps=True,
    provider=None,
)

Update existing data with incremental fetch.
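The fill_gaps idea can be sketched as follows; real gap detection uses exchange calendars, so plain weekdays stand in for trading sessions here:

```python
# Illustrative gap detection: weekdays in a window that are missing from
# storage. The library uses exchange calendars rather than raw weekdays.
from datetime import date, timedelta

def find_gaps(stored, start, end):
    """Return weekdays in [start, end] that are absent from `stored` dates."""
    gaps = []
    d = start
    while d <= end:
        if d.weekday() < 5 and d not in stored:
            gaps.append(d)
        d += timedelta(days=1)
    return gaps
```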

list_symbols

list_symbols(
    provider=None,
    asset_class=None,
    exchange=None,
    bar_type=None,
)

List all symbols in storage, optionally filtered by metadata.

get_metadata

get_metadata(
    symbol, asset_class="equities", frequency="daily"
)

Get metadata for a specific symbol.

assign_sessions

assign_sessions(df, exchange=None, calendar=None)

Assign session_date column to DataFrame based on exchange calendar.

complete_sessions

complete_sessions(
    df,
    exchange=None,
    calendar=None,
    fill_gaps=True,
    fill_method="forward",
    zero_volume=True,
)

Complete sessions by filling gaps.
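A minimal sketch of fill_method="forward" with zero_volume=True: missing sessions take the previous bar's close for all OHLC fields and zero volume. The tuple-based bar layout is illustrative, not the library's internal representation:

```python
# Forward-fill missing sessions with flat, zero-volume bars (illustrative).
def forward_fill_sessions(sessions, bars):
    """sessions: ordered keys; bars: session -> (open, high, low, close, volume)."""
    filled = []
    last_close = None
    for s in sessions:
        if s in bars:
            bar = bars[s]
            last_close = bar[3]
        elif last_close is not None:
            # Synthesize a flat bar at the prior close with zero volume.
            bar = (last_close, last_close, last_close, last_close, 0)
        else:
            continue  # no prior data to fill from
        filled.append((s, *bar))
    return filled
```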

update_all

update_all(provider=None, asset_class=None, exchange=None)

Update all stored data matching the filters.

list_providers

list_providers()

List available providers.

get_provider_info

get_provider_info(provider_name)

Get information about a provider.

clear_cache

clear_cache()

Clear routing cache and close provider connections.


Storage

StorageConfig

Dataclass configuring the storage backend. Controls partitioning strategy, compression, locking, and metadata tracking.

from ml4t.data.storage import StorageConfig

# Hive-partitioned storage for minute data
config = StorageConfig(
    base_path="./market_data",
    strategy="hive",
    partition_granularity="day",
    compression="zstd",
)

# Flat storage for small datasets
config = StorageConfig(
    base_path="./data",
    strategy="flat",
    compression="snappy",
)

StorageConfig dataclass

StorageConfig(
    base_path,
    strategy="hive",
    compression="zstd",
    partition_granularity="month",
    partition_cols=None,
    atomic_writes=True,
    enable_locking=True,
    metadata_tracking=True,
    generate_profile=True,
)

Configuration for storage backends.

Attributes:

Name Type Description
base_path Path

Base directory for storage.

strategy str

Storage strategy ("hive" or "flat").

compression str | None

Compression type for Parquet files.

partition_granularity PartitionGranularityType

Time-based partition granularity for Hive storage.

- "year": Best for daily data (~252 rows/partition for stocks)
- "month": Best for hourly data (~720 rows/partition)
- "day": Best for minute data (~1,440 rows/partition)
- "hour": Best for second/tick data (~3,600 rows/partition)

partition_cols list[str] | None

Deprecated. Use partition_granularity instead.

atomic_writes bool

Use atomic writes with temp file rename.

enable_locking bool

Enable file locking for concurrent access.

metadata_tracking bool

Track metadata in manifest files.

__post_init__
__post_init__()

Validate and set defaults.
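How a timestamp could map to partition directories at each granularity can be sketched as follows; the exact path scheme is an assumption, so consult HiveStorage for the real layout:

```python
# Illustrative mapping from a timestamp to Hive partition directories.
# The actual directory naming used by HiveStorage may differ.
from datetime import datetime

def partition_dirs(ts: datetime, granularity: str) -> str:
    """Build a year[/month[/day[/hour]]] partition path for the timestamp."""
    parts = [f"year={ts.year}"]
    if granularity in ("month", "day", "hour"):
        parts.append(f"month={ts.month:02d}")
    if granularity in ("day", "hour"):
        parts.append(f"day={ts.day:02d}")
    if granularity == "hour":
        parts.append(f"hour={ts.hour:02d}")
    return "/".join(parts)
```

Coarser granularity means fewer, larger files; finer granularity improves pruning for narrow time-range queries.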

StorageBackend

Abstract base class defining the storage interface. All backends (Hive, Flat) implement this contract.

StorageBackend

StorageBackend(config)

Bases: ABC

Abstract base class for storage backends.

Initialize storage backend with configuration.

Parameters:

Name Type Description Default
config StorageConfig

Storage configuration

required
write abstractmethod
write(data, key, metadata=None)

Write data to storage.

Parameters:

Name Type Description Default
data LazyFrame

Polars LazyFrame to write

required
key str

Storage key (e.g., "BTC-USD", "SPY")

required
metadata dict[str, Any] | None

Optional metadata to store alongside data

None

Returns:

Type Description
Path

Path to written file

read abstractmethod
read(key, start_date=None, end_date=None, columns=None)

Read data from storage.

Parameters:

Name Type Description Default
key str

Storage key

required
start_date datetime | None

Optional start date filter

None
end_date datetime | None

Optional end date filter

None
columns list[str] | None

Optional columns to select

None

Returns:

Type Description
LazyFrame

Polars LazyFrame with requested data

list_keys abstractmethod
list_keys()

List all available keys in storage.

Returns:

Type Description
list[str]

List of storage keys

exists abstractmethod
exists(key)

Check if a key exists in storage.

Parameters:

Name Type Description Default
key str

Storage key to check

required

Returns:

Type Description
bool

True if key exists

delete abstractmethod
delete(key)

Delete data for a key.

Parameters:

Name Type Description Default
key str

Storage key to delete

required

Returns:

Type Description
bool

True if deletion was successful

get_metadata
get_metadata(key)

Get metadata for a key.

Parameters:

Name Type Description Default
key str

Storage key

required

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None

HiveStorage

Hive-partitioned storage with configurable time-based partitioning. Delivers 7x query performance improvement for time-range queries via partition pruning.

from ml4t.data.storage import HiveStorage, StorageConfig

config = StorageConfig(
    base_path="./data",
    partition_granularity="month",  # year, month, day, or hour
)
storage = HiveStorage(config)

# Write data (partitions by timestamp automatically)
storage.write(df, "equities/daily/AAPL")

# Read with partition pruning
from datetime import datetime
lf = storage.read(
    "equities/daily/AAPL",
    start_date=datetime(2024, 6, 1),
    end_date=datetime(2024, 12, 31),
    columns=["timestamp", "close", "volume"],
)
df = lf.collect()

HiveStorage

HiveStorage(config)

Bases: StorageBackend

Hive partitioned storage with configurable time-based partitioning.

This implementation provides:

- 7x query performance improvement for time-based queries
- Configurable partition granularity (year, month, day, hour)
- Atomic writes with temp file pattern
- Metadata tracking in JSON manifests
- File locking for concurrent access safety
- Polars lazy evaluation throughout

Partition Granularity

Configure via StorageConfig.partition_granularity:

- "year": Best for daily data (~252 rows/partition)
- "month": Best for hourly data (~720 rows/partition) [default]
- "day": Best for minute data (~1,440 rows/partition)
- "hour": Best for second/tick data (~3,600 rows/partition)

Example

from ml4t.data.storage import HiveStorage, StorageConfig

# For minute data, use day-level partitioning
config = StorageConfig(base_path="./data", partition_granularity="day")
storage = HiveStorage(config)

Initialize Hive storage backend.

Parameters:

Name Type Description Default
config StorageConfig

Storage configuration

required
write
write(data, key=None, metadata=None)

Write data using Hive partitioning.

Parameters:

Name Type Description Default
data LazyFrame | DataFrame | DataObject

Data to write (DataFrame, LazyFrame, or DataObject)

required
key str | None

Storage key (e.g., "BTC-USD" or "equities/daily/AAPL"). Optional if data is DataObject.

None
metadata dict[str, Any] | None

Optional metadata dict

None

Returns:

Type Description
Path | str

Path to base directory (old API) or storage key string (new DataObject API)

read
read(key, start_date=None, end_date=None, columns=None)

Read data from Hive partitions.

Parameters:

Name Type Description Default
key str

Storage key

required
start_date datetime | None

Optional start date filter

None
end_date datetime | None

Optional end date filter

None
columns list[str] | None

Optional columns to select

None

Returns:

Type Description
LazyFrame

LazyFrame with requested data

list_keys
list_keys()

List all keys in storage.

Returns:

Type Description
list[str]

List of storage keys

exists
exists(key)

Check if key exists.

Parameters:

Name Type Description Default
key str

Storage key

required

Returns:

Type Description
bool

True if key exists

delete
delete(key)

Delete all data for a key.

Parameters:

Name Type Description Default
key str

Storage key

required

Returns:

Type Description
bool

True if successful

get_latest_timestamp
get_latest_timestamp(symbol, provider)

Get the latest timestamp for a symbol from a provider.

Parameters:

Name Type Description Default
symbol str

Symbol identifier

required
provider str

Data provider name

required

Returns:

Type Description
datetime | None

Latest timestamp in the dataset, or None if no data exists

save_chunk
save_chunk(data, symbol, provider, start_time, end_time)

Save an incremental data chunk.

Parameters:

Name Type Description Default
data DataFrame

DataFrame with OHLCV data

required
symbol str

Symbol identifier

required
provider str

Data provider name

required
start_time datetime

Start time of this chunk

required
end_time datetime

End time of this chunk

required

Returns:

Type Description
Path

Path to the saved chunk file

update_combined_file
update_combined_file(data, symbol, provider)

Update the main combined file with new data.

Parameters:

Name Type Description Default
data DataFrame

New data to append

required
symbol str

Symbol identifier

required
provider str

Data provider name

required

Returns:

Type Description
int

Number of new records added (after deduplication)
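The deduplicating append can be sketched with plain dicts keyed by timestamp; this is an illustration of the counting semantics, not the Parquet-backed implementation:

```python
# Merge new rows into an existing dataset keyed by timestamp, keeping the
# existing row on conflict, and count how many rows were actually new.
def append_dedup(existing, new):
    """existing, new: dicts mapping timestamp -> row. Returns (merged, n_added)."""
    merged = dict(existing)
    added = 0
    for ts, row in new.items():
        if ts not in merged:
            merged[ts] = row
            added += 1
    return merged, added
```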

read_data
read_data(symbol, provider, start_time=None, end_time=None)

Read data for a symbol with optional time filtering.

Parameters:

Name Type Description Default
symbol str

Symbol identifier

required
provider str

Data provider name

required
start_time datetime | None

Optional start time filter

None
end_time datetime | None

Optional end time filter

None

Returns:

Type Description
DataFrame

DataFrame with filtered data

update_metadata
update_metadata(
    symbol, provider, last_update, records_added, chunk_file
)

Update metadata after incremental update.

Parameters:

Name Type Description Default
symbol str

Symbol identifier

required
provider str

Data provider name

required
last_update datetime

Timestamp of this update

required
records_added int

Number of records added

required
chunk_file str

Name of the chunk file saved

required

FlatStorage

Simple single-file-per-key storage. Suitable for smaller datasets or when partition pruning is not beneficial.

from ml4t.data.storage import FlatStorage, StorageConfig

config = StorageConfig(base_path="./data", strategy="flat")
storage = FlatStorage(config)

storage.write(df, "reference/spy")
lf = storage.read("reference/spy")

FlatStorage

FlatStorage(config)

Bases: StorageBackend

Flat file storage without partitioning.

This implementation provides:

- Simple single-file storage per key
- Atomic writes with temp file pattern
- Metadata tracking in JSON manifests
- File locking for concurrent access safety
- Polars lazy evaluation throughout

Initialize flat storage backend.

Parameters:

Name Type Description Default
config StorageConfig

Storage configuration

required
write
write(data, key, metadata=None)

Write data as a single file.

Parameters:

Name Type Description Default
data LazyFrame | DataFrame

Data to write

required
key str

Storage key (e.g., "BTC-USD")

required
metadata dict[str, Any] | None

Optional metadata

None

Returns:

Type Description
Path

Path to written file

read
read(key, start_date=None, end_date=None, columns=None)

Read data from flat file.

Parameters:

Name Type Description Default
key str

Storage key

required
start_date datetime | None

Optional start date filter

None
end_date datetime | None

Optional end date filter

None
columns list[str] | None

Optional columns to select

None

Returns:

Type Description
LazyFrame

LazyFrame with requested data

list_keys
list_keys()

List all keys in storage.

Returns:

Type Description
list[str]

List of storage keys

exists
exists(key)

Check if key exists.

Parameters:

Name Type Description Default
key str

Storage key

required

Returns:

Type Description
bool

True if key exists

delete
delete(key)

Delete data for a key.

Parameters:

Name Type Description Default
key str

Storage key

required

Returns:

Type Description
bool

True if successful

create_storage

Factory function for creating storage backends from a strategy name.

from ml4t.data.storage import create_storage

storage = create_storage("./data", strategy="hive", partition_granularity="day")

create_storage

create_storage(base_path, strategy='hive', **kwargs)

Create a storage backend with the specified strategy.

Parameters:

Name Type Description Default
base_path str | Path

Base directory for storage

required
strategy str

Storage strategy ("hive" or "flat")

'hive'
**kwargs Any

Additional configuration options

{}

Returns:

Type Description
StorageBackend

Configured storage backend

Example

storage = create_storage("/data", strategy="hive")
storage.write(df.lazy(), "BTC-USD")


Book-Facing Managers

These classes power the dataset download workflows used throughout the ML4T book repository. They are useful when you want opinionated, higher-level wrappers around specific canonical datasets instead of the full generality of DataManager.

ETFDataManager

Wrapper around the Yahoo ETF download workflow used by the book datasets.

ETFDataManager

ETFDataManager(config)

Bases: ProfileMixin

Manages ETF data download and storage for ML4T book.

This class provides a simple interface for book readers to:

1. Download initial historical data
2. Update data incrementally
3. Load data for analysis

Data is stored in Hive-partitioned format:

{storage_path}/ohlcv_1d/ticker={SYMBOL}/data.parquet

Inherits from ProfileMixin to provide
  • generate_profile(): Generate column-level statistics
  • load_profile(): Load existing profile

Initialize the ETF data manager.

Parameters:

Name Type Description Default
config ETFConfig

Configuration object with tickers, dates, and storage path

required
provider property
provider

Lazily initialize Yahoo Finance provider.

from_config classmethod
from_config(config_path)

Create manager from YAML configuration file.

Parameters:

Name Type Description Default
config_path str | Path

Path to YAML config file

required

Returns:

Type Description
ETFDataManager

Initialized ETFDataManager

download_all
download_all(force=False)

Download all ETF data.

Parameters:

Name Type Description Default
force bool

If True, re-download even if data exists

False

Returns:

Type Description
dict[str, int]

Dictionary of symbol -> row count

update
update()

Update existing data with latest available.

Detects the last date in existing data and downloads from there to the configured end date.

Returns:

Type Description
dict[str, int]

Dictionary of symbol -> new rows added

load_ohlcv
load_ohlcv(symbol)

Load OHLCV data for a single symbol.

Parameters:

Name Type Description Default
symbol str

Ticker symbol (e.g., "SPY")

required

Returns:

Type Description
DataFrame

DataFrame with OHLCV data (columns: timestamp, symbol, open, high, low, close, volume)

load_symbols
load_symbols(symbols)

Load OHLCV data for multiple symbols.

Parameters:

Name Type Description Default
symbols list[str]

List of ticker symbols

required

Returns:

Type Description
DataFrame

Combined DataFrame with symbol column

load_all
load_all()

Load all ETF data.

Returns:

Type Description
DataFrame

Combined DataFrame with all tickers (columns: timestamp, symbol, open, high, low, close, volume)

load_category
load_category(category)

Load OHLCV data for a category.

Parameters:

Name Type Description Default
category str

Category name (e.g., "us_equity_broad", "fixed_income")

required

Returns:

Type Description
DataFrame

DataFrame with tickers from that category

get_available_symbols
get_available_symbols()

Get list of symbols with downloaded data.

Returns:

Type Description
list[str]

List of ticker symbols that have data files

get_data_summary
get_data_summary()

Get summary of available data.

Returns:

Type Description
DataFrame

DataFrame with symbol, start_date, end_date, row_count

CryptoDataManager

Simplified manager for Binance Bulk spot and futures download workflows.

CryptoDataManager

CryptoDataManager(config)

Bases: ProfileMixin

Manages crypto data download and storage for ML4T book.

This class provides a simple interface for book readers to:

1. Download premium index data from Binance
2. Load data for analysis

Data is stored as:

{storage_path}/premium_index.parquet
{storage_path}/premium_index/symbol={SYMBOL}/data.parquet

Inherits from ProfileMixin to provide
  • generate_profile(): Generate column-level statistics
  • load_profile(): Load existing profile

Initialize the crypto data manager.

Parameters:

Name Type Description Default
config CryptoConfig

Configuration object with symbols and storage path

required
provider property
provider

Lazily initialize Binance bulk provider.

from_config classmethod
from_config(config_path)

Create manager from YAML configuration file.

Parameters:

Name Type Description Default
config_path str | Path

Path to YAML config file

required

Returns:

Type Description
CryptoDataManager

Initialized CryptoDataManager

download_premium_index
download_premium_index(symbols=None)

Download premium index data for perpetual futures.

The premium index measures the basis between perpetual and spot prices, and is the primary driver of funding rates.

Premium Index = (Perpetual Price - Spot Price) / Spot Price

- High premium → Crowded longs → Expected underperformance
- Low/negative premium → Crowded shorts → Expected outperformance
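A worked instance of the formula (prices invented for illustration):

```python
# Premium Index = (Perpetual Price - Spot Price) / Spot Price
def premium_index(perp_price: float, spot_price: float) -> float:
    return (perp_price - spot_price) / spot_price

# A perpetual trading above spot gives a positive premium (crowded longs);
# below spot gives a negative premium (crowded shorts).
```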

Parameters:

Name Type Description Default
symbols list[str] | None

List of symbols to download (default: all from config)

None

Returns:

Type Description
DataFrame

DataFrame with premium index data

download_perps
download_perps(symbols=None)

Download perpetual futures OHLCV data using parallel multi-symbol fetch.

download_all
download_all(symbols=None)

Download premium index and perpetual OHLCV data.

load_premium_index
load_premium_index(symbols=None)

Load premium index data.

Parameters:

Name Type Description Default
symbols list[str] | None

List of symbols to load (default: all available)

None

Returns:

Type Description
DataFrame

DataFrame with premium index data

load_perps
load_perps(symbols=None)

Load perpetual futures OHLCV data.

load_symbol
load_symbol(symbol)

Load premium index data for a single symbol.

Parameters:

Name Type Description Default
symbol str

Symbol to load (e.g., "BTCUSDT")

required

Returns:

Type Description
DataFrame

DataFrame with premium index data

get_available_symbols
get_available_symbols()

Get list of symbols with downloaded data.

Returns:

Type Description
list[str]

List of symbols with data files

get_data_summary
get_data_summary()

Get summary of available data.

Returns:

Type Description
DataFrame

DataFrame with symbol, start_date, end_date, row_count

MacroDataManager

Simplified manager for FRED-backed macroeconomic and rates datasets.

MacroDataManager

MacroDataManager(config)

Manages macro/economic data download and storage for ML4T book.

This class provides a simple interface for book readers to:

1. Download Treasury yield data from FRED
2. Compute derived series (yield curve slope)
3. Load data for analysis

Data is stored as:

{storage_path}/treasury_yields.parquet

Initialize the macro data manager.

Parameters:

Name Type Description Default
config MacroConfig

Configuration object with series and storage path

required
from_config classmethod
from_config(config_path)

Create manager from YAML configuration file.

Parameters:

Name Type Description Default
config_path str | Path

Path to YAML config file

required

Returns:

Type Description
MacroDataManager

Initialized MacroDataManager

download_treasury_yields
download_treasury_yields()

Download Treasury yield data.

Uses FRED API if FRED_API_KEY is set, otherwise falls back to yfinance Treasury yield proxies.

Returns:

Type Description
DataFrame

DataFrame with Treasury yield data

load_treasury_yields
load_treasury_yields()

Load Treasury yield data.

Returns:

Type Description
DataFrame

DataFrame with Treasury yields and derived series

get_yield_curve_slope
get_yield_curve_slope()

Get yield curve slope time series.

The yield curve slope (10Y - 2Y) is a key regime indicator:

- Slope > 0.5%: Risk-on environment
- Slope < 0.5%: Risk-off environment
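The slope rule amounts to a one-line classifier; a sketch with invented values (treating a slope exactly at the threshold as risk-off is an assumption):

```python
# Classify regime from the 10Y-2Y slope, in percentage points.
# A slope exactly at the threshold is treated as risk-off here (assumption).
def classify_regime(slope: float, threshold: float = 0.5) -> str:
    return "risk_on" if slope > threshold else "risk_off"

# Example over a hypothetical monthly slope series:
slopes = {"2024-01-31": 1.2, "2024-02-29": 0.3}
regimes = {d: classify_regime(s) for d, s in slopes.items()}
```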

Returns:

Type Description
DataFrame

DataFrame with timestamp and YIELD_CURVE_SLOPE columns

get_regime
get_regime(threshold=0.5)

Get regime classification based on yield curve slope.

Parameters:

Name Type Description Default
threshold float

Slope threshold in percentage points (default 0.5%)

0.5

Returns:

Type Description
DataFrame

DataFrame with timestamp, slope, and regime columns

FuturesDataManager

Book-facing CME futures downloader built around Databento.

FuturesDataManager

FuturesDataManager(config)

Manages CME futures data download and storage for ML4T book.

This class provides a simple interface for book readers to:

1. Download initial historical data
2. Update data incrementally
3. Load data for analysis

Data is stored in Hive-partitioned format:

{storage_path}/ohlcv_1d/product={PRODUCT}/year={YYYY}/data.parquet
{storage_path}/definitions/product={PRODUCT}/definitions.parquet

Initialize the futures data manager.

Parameters:

Name Type Description Default
config FuturesConfig

Configuration object with products, dates, and storage path

required
from_config classmethod
from_config(config_path)

Create manager from YAML configuration file.

Parameters:

Name Type Description Default
config_path str | Path

Path to YAML config file

required

Returns:

Type Description
FuturesDataManager

Initialized FuturesDataManager

download_product_ohlcv
download_product_ohlcv(
    product, start_date=None, end_date=None
)

Download OHLCV data for a single product.

Parameters:

Name Type Description Default
product str

CME product symbol (e.g., "ES", "CL")

required
start_date str | None

Start date (YYYY-MM-DD), defaults to config start

None
end_date str | None

End date (YYYY-MM-DD), defaults to config end

None

Returns:

Type Description
dict[str, Any]

Dict with download statistics

download_product_definitions
download_product_definitions(product)

Download definition snapshots for a single product.

Uses yearly snapshots to efficiently capture contract definitions without downloading the full daily history.

Parameters:

Name Type Description Default
product str

CME product symbol

required

Returns:

Type Description
dict[str, Any]

Dict with download statistics

download_all
download_all(include_definitions=True, parallel=1)

Download all configured products.

Parameters:

Name Type Description Default
include_definitions bool

Whether to also download definitions

True
parallel int

Number of parallel downloads (1 = sequential)

1

Returns:

Type Description
dict[str, Any]

Summary statistics

update
update(end_date=None)

Update existing data to latest available date.

Finds the latest date in existing data and downloads only new data.

Parameters:

Name Type Description Default
end_date str | None

End date for update (default: today)

None

Returns:

Type Description
dict[str, Any]

Update statistics
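The resume logic described above is date arithmetic: start the day after the latest stored date and stop at the requested end. A sketch (the function name and signature are hypothetical, not the manager's API):

```python
# Compute the incremental fetch window from the latest stored date.
from datetime import date, timedelta

def update_range(latest_stored, end=None):
    """Return (start, end) for the incremental fetch, or None if up to date."""
    end = end or date.today()
    start = latest_stored + timedelta(days=1)
    return None if start > end else (start, end)
```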

load_ohlcv
load_ohlcv(product, start=None, end=None)

Load OHLCV data for a product.

Parameters:

Name Type Description Default
product str

CME product symbol

required
start str | None

Optional start date filter (YYYY-MM-DD)

None
end str | None

Optional end date filter (YYYY-MM-DD)

None

Returns:

Type Description
DataFrame

Polars DataFrame with OHLCV data

load_definitions
load_definitions(product)

Load contract definitions for a product.

Parameters:

Name Type Description Default
product str

CME product symbol

required

Returns:

Type Description
DataFrame

Polars DataFrame with contract definitions

list_products
list_products()

List all configured products by category.

get_data_summary
get_data_summary()

Get summary of downloaded data.

Returns:

Type Description
DataFrame

DataFrame with product, date range, row count, etc.

generate_profile
generate_profile(product)

Generate a data profile for a specific product.

Creates column-level statistics for the product's OHLCV data. Can be called on-demand after download to (re)generate the profile.

Parameters:

Name Type Description Default
product str

CME product symbol (e.g., "ES", "CL")

required

Returns:

Type Description
DatasetProfile

DatasetProfile with column statistics

Example

manager = FuturesDataManager.from_config("config.yaml")
profile = manager.generate_profile("ES")
print(profile.summary())

load_profile
load_profile(product)

Load the existing data profile for a specific product.

Parameters:

Name Type Description Default
product str

CME product symbol (e.g., "ES", "CL")

required

Returns:

Type Description
DatasetProfile | None

DatasetProfile if exists, None otherwise

Example

manager = FuturesDataManager.from_config("config.yaml")
profile = manager.load_profile("ES")
if profile:
    print(f"ES has {profile.total_rows} rows")

generate_all_profiles
generate_all_profiles()

Generate profiles for all downloaded products.

Returns:

Type Description
dict[str, DatasetProfile]

Dictionary of product -> DatasetProfile

Example

manager = FuturesDataManager.from_config("config.yaml")
profiles = manager.generate_all_profiles()
for product, profile in profiles.items():
    print(f"{product}: {profile.total_rows} rows")


Providers

BaseProvider

Abstract base class for all data providers. Composes rate-limiting, circuit-breaker, validation, and HTTP session mixins into a single base.

Concrete providers implement either:

  • _fetch_and_transform_data() for a single-step workflow, or
  • _fetch_raw_data() + _transform_data() for a two-step workflow.
from ml4t.data.providers.base import BaseProvider
import polars as pl

class MyProvider(BaseProvider):
    @property
    def name(self) -> str:
        return "my_provider"

    def _fetch_and_transform_data(self, symbol, start, end, frequency):
        # Fetch from API and return canonical OHLCV DataFrame
        ...

BaseProvider

BaseProvider(
    rate_limit=None,
    session_config=None,
    circuit_breaker_config=None,
)

Bases: RateLimitMixin, CircuitBreakerMixin, ValidationMixin, SessionMixin, ABC

Enhanced base provider composing all mixins.

All providers must return OHLCV data in the canonical schema with columns in standard order: [timestamp, symbol, open, high, low, close, volume].

Each provider must implement either:

- _fetch_and_transform_data() for single-step implementation
- _fetch_raw_data() + _transform_data() for two-step implementation

Class Variables

DEFAULT_RATE_LIMIT: Default (calls, period_seconds) for rate limiting
FREQUENCY_MAP: Mapping of frequency names to provider-specific values
CIRCUIT_BREAKER_CONFIG: Circuit breaker failure threshold and reset timeout
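A subclass typically pins these class variables; the values below are illustrative, not those of any shipped provider:

```python
class ExampleProvider:  # stands in for a BaseProvider subclass
    # 60 calls per 60-second window
    DEFAULT_RATE_LIMIT = (60, 60.0)
    # map canonical frequency names to the vendor's interval strings (hypothetical values)
    FREQUENCY_MAP = {"daily": "1d", "hourly": "1h", "minute": "1m"}
    # open the circuit after 5 consecutive failures; try again after 30 seconds
    CIRCUIT_BREAKER_CONFIG = {"failure_threshold": 5, "reset_timeout": 30.0}
```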

Key Contracts
  • Columns always in order: timestamp, symbol, open, high, low, close, volume
  • Timestamps are Datetime type
  • OHLCV values are Float64
  • Symbol is uppercase String
  • Data sorted by timestamp ascending
  • No duplicate timestamps

Initialize base provider with common infrastructure.

Parameters:

Name Type Description Default
rate_limit tuple[int, float] | None

Tuple of (calls, period_seconds) for rate limiting

None
session_config dict[str, Any] | None

HTTP session configuration

None
circuit_breaker_config dict[str, Any] | None

Circuit breaker configuration

None
name abstractmethod property
name

Return the provider name.

fetch_ohlcv
fetch_ohlcv(symbol, start, end, frequency='daily')

Template method for fetching OHLCV data.

This method implements the common workflow:

  1. Validate inputs
  2. Apply rate limiting
  3. Fetch and transform data (provider-specific)
  4. Validate and normalize data

Providers can implement either:

  • _fetch_and_transform_data() for single-step implementation
  • _fetch_raw_data() + _transform_data() for two-step implementation

Parameters:

Name Type Description Default
symbol str

The symbol to fetch data for

required
start str

Start date in YYYY-MM-DD format (inclusive)

required
end str

End date in YYYY-MM-DD format (see note below)

required
frequency str

Data frequency (daily, minute, etc.)

'daily'

Returns:

Type Description
DataFrame

DataFrame with OHLCV data in canonical schema:

DataFrame

[timestamp, symbol, open, high, low, close, volume]

Note

Date range semantics vary by provider:

  • Most providers: both start and end are INCLUSIVE
  • Yahoo Finance: end is EXCLUSIVE (internally adds 1 day)
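The normalization the library performs for exclusive-end providers can be sketched as follows (the helper name and provider set are ours, for illustration only):

```python
from datetime import date, timedelta

# Providers whose `end` parameter is exclusive (per the note above).
EXCLUSIVE_END_PROVIDERS = {"yahoo"}

def normalize_end_date(end: str, provider: str) -> str:
    """Shift `end` forward one day for exclusive-end providers so the
    caller-facing contract stays inclusive."""
    if provider in EXCLUSIVE_END_PROVIDERS:
        return (date.fromisoformat(end) + timedelta(days=1)).isoformat()
    return end

normalize_end_date("2024-12-31", "yahoo")  # "2025-01-01"
```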

fetch_ohlcv_async async
fetch_ohlcv_async(symbol, start, end, frequency='daily')

Async wrapper around fetch_ohlcv using a thread pool.

Providers with native async support should override this method.
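The default wrapper pattern can be sketched with a stand-in blocking call (the real method delegates to the provider's own fetch_ohlcv):

```python
import asyncio

def fetch_ohlcv_blocking(symbol: str) -> dict:
    # stand-in for a synchronous provider call
    return {"symbol": symbol, "rows": 252}

async def fetch_ohlcv_async(symbol: str) -> dict:
    # run the blocking call on a worker thread so the event loop stays free
    return await asyncio.to_thread(fetch_ohlcv_blocking, symbol)

result = asyncio.run(fetch_ohlcv_async("AAPL"))
```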

capabilities
capabilities()

Return provider capabilities (default implementation).

Override in subclasses to provide accurate capabilities.

close
close()

Clean up resources.

ProviderCapabilities

Frozen dataclass describing what a provider supports (intraday, crypto, forex, futures, authentication requirements, rate limits).

from ml4t.data.providers.protocols import ProviderCapabilities

caps = ProviderCapabilities(
    supports_intraday=True,
    supports_crypto=True,
    requires_api_key=True,
    rate_limit=(120, 60.0),  # 120 calls per 60 seconds
)

ProviderCapabilities dataclass

ProviderCapabilities(
    supports_intraday=False,
    supports_crypto=False,
    supports_forex=False,
    supports_futures=False,
    requires_api_key=False,
    max_history_days=None,
    rate_limit=(60, 60.0),
)

Describes what a provider can do.

Attributes:

Name Type Description
supports_intraday bool

Can fetch minute/hourly data

supports_crypto bool

Handles cryptocurrency symbols

supports_forex bool

Handles forex pairs

supports_futures bool

Handles futures contracts

requires_api_key bool

Needs authentication

max_history_days int | None

Maximum historical data available

rate_limit tuple[int, float]

(calls, period_seconds) tuple

OHLCVProvider (Protocol)

Structural typing protocol for OHLCV providers. Any class implementing name, fetch_ohlcv(), and capabilities() satisfies this protocol without inheriting from BaseProvider.

OHLCVProvider

Bases: Protocol

Protocol for OHLCV data providers.

Any class implementing these methods is considered an OHLCVProvider, regardless of inheritance. This enables duck typing with type safety.

Example

>>> class MyCustomProvider:
...     @property
...     def name(self) -> str:
...         return "custom"
...
...     def fetch_ohlcv(self, symbol, start, end, frequency="daily"):
...         # Custom implementation
...         pass
...
...     def capabilities(self) -> ProviderCapabilities:
...         return ProviderCapabilities()
>>> isinstance(MyCustomProvider(), OHLCVProvider)
True

name property
name

Return the provider name (e.g., 'yahoo', 'binance_api').

fetch_ohlcv
fetch_ohlcv(symbol, start, end, frequency='daily')

Fetch OHLCV data for a symbol.

Parameters:

Name Type Description Default
symbol str

Symbol to fetch (e.g., 'AAPL', 'BTCUSDT')

required
start str

Start date in YYYY-MM-DD format

required
end str

End date in YYYY-MM-DD format

required
frequency str

Data frequency ('daily', 'hourly', 'minute', etc.)

'daily'

Returns:

Type Description
DataFrame

DataFrame with columns: [timestamp, symbol, open, high, low, close, volume]

capabilities
capabilities()

Return provider capabilities.


Validation and Updates

OHLCVValidator

Primary structural validation entry point for OHLCV datasets.

OHLCVValidator

OHLCVValidator(
    check_nulls=True,
    check_price_consistency=True,
    check_negative_prices=True,
    check_negative_volume=True,
    check_duplicate_timestamps=True,
    check_chronological_order=True,
    check_price_staleness=True,
    check_extreme_returns=True,
    max_return_threshold=0.5,
    staleness_threshold=5,
)

Bases: Validator

Validator for OHLCV (Open, High, Low, Close, Volume) data.

Initialize OHLCV validator with configurable checks.

Parameters:

Name Type Description Default
check_nulls bool

Check for null values

True
check_price_consistency bool

Check high >= low, high >= close, etc.

True
check_negative_prices bool

Check for negative prices

True
check_negative_volume bool

Check for negative volume

True
check_duplicate_timestamps bool

Check for duplicate timestamps

True
check_chronological_order bool

Check timestamps are in order

True
check_price_staleness bool

Check for stale (unchanged) prices

True
check_extreme_returns bool

Check for extreme price returns

True
max_return_threshold float

Threshold for extreme returns (as fraction)

0.5
staleness_threshold int

Days of identical prices to flag as stale

5
name
name()

Return validator name.

validate
validate(df, **kwargs)

Validate OHLCV DataFrame.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with OHLCV columns

required
**kwargs Any

Additional parameters

{}

Returns:

Type Description
ValidationResult

ValidationResult with any issues found
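The price-consistency check can be illustrated with a dependency-free sketch over row dicts (the real validator operates on a Polars DataFrame and reports through ValidationResult):

```python
def check_price_consistency(rows):
    """Flag rows where high is below open/close/low, or low is above open/close."""
    issues = []
    for i, r in enumerate(rows):
        if r["high"] < max(r["open"], r["close"], r["low"]):
            issues.append((i, "high below open/close/low"))
        if r["low"] > min(r["open"], r["close"]):
            issues.append((i, "low above open/close"))
    return issues

rows = [
    {"open": 10.0, "high": 11.0, "low": 9.5, "close": 10.5},  # consistent
    {"open": 10.0, "high": 9.0, "low": 9.5, "close": 10.5},   # high < low: flagged
]
```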

AnomalyManager

Coordinates statistical anomaly detectors such as return outliers, volume spikes, and stale prices.

AnomalyManager

AnomalyManager(config=None, custom_detectors=None)

Manages anomaly detection across multiple detectors.

Initialize anomaly manager.

Parameters:

Name Type Description Default
config AnomalyConfig | None

Anomaly detection configuration

None
custom_detectors list[AnomalyDetector] | None

Additional custom detectors

None
analyze
analyze(df, symbol, asset_class=None)

Analyze data for anomalies.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with OHLCV data

required
symbol str

Symbol being analyzed

required
asset_class str | None

Optional asset class for configuration overrides

None

Returns:

Type Description
AnomalyReport

Anomaly detection report

analyze_batch
analyze_batch(datasets, asset_classes=None)

Analyze multiple datasets for anomalies.

Parameters:

Name Type Description Default
datasets dict[str, DataFrame]

Dictionary of symbol -> DataFrame

required
asset_classes dict[str, str] | None

Optional mapping of symbol -> asset class

None

Returns:

Type Description
dict[str, AnomalyReport]

Dictionary of symbol -> AnomalyReport

save_report
save_report(report, output_dir)

Save anomaly report to disk.

Parameters:

Name Type Description Default
report AnomalyReport

Anomaly report to save

required
output_dir Path

Directory to save report

required

Returns:

Type Description
Path

Path to saved report

filter_by_severity
filter_by_severity(report, min_severity)

Filter report to only include anomalies above minimum severity.

Parameters:

Name Type Description Default
report AnomalyReport

Original report

required
min_severity str

Minimum severity level

required

Returns:

Type Description
AnomalyReport

Filtered report
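Filtering by severity amounts to an ordering over levels; a sketch of the idea (the level names and anomaly dicts here are illustrative, not the AnomalyReport schema):

```python
SEVERITY_ORDER = ["info", "low", "medium", "high", "critical"]  # illustrative levels

def filter_by_severity(anomalies, min_severity):
    """Keep anomalies at or above `min_severity`."""
    threshold = SEVERITY_ORDER.index(min_severity)
    return [a for a in anomalies if SEVERITY_ORDER.index(a["severity"]) >= threshold]

anomalies = [
    {"kind": "return_outlier", "severity": "high"},
    {"kind": "stale_price", "severity": "low"},
]
```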

get_statistics
get_statistics(report)

Get statistics from anomaly report.

Parameters:

Name Type Description Default
report AnomalyReport

Anomaly report

required

Returns:

Type Description
dict

Dictionary of statistics

GapDetector

Utility for detecting missing periods in stored or freshly fetched time series.

GapDetector

GapDetector(tolerance=DEFAULT_TOLERANCE)

Detect and analyze gaps in time series data.

Initialize gap detector.

Parameters:

Name Type Description Default
tolerance float

Tolerance factor for gap detection (0.1 = 10%)

DEFAULT_TOLERANCE
detect_gaps
detect_gaps(
    df,
    frequency="daily",
    timestamp_col="timestamp",
    is_crypto=False,
)

Detect gaps in time series data.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with time series data

required
frequency str

Data frequency (minute, hourly, daily, etc.)

'daily'
timestamp_col str

Name of timestamp column

'timestamp'
is_crypto bool

If True, expect 24/7 data; if False, market hours only

False

Returns:

Type Description
list[DataGap]

List of detected gaps
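At its core, gap detection compares consecutive timestamps against the expected spacing with a tolerance; a weekend-naive sketch for daily data (the real detector accounts for market hours unless is_crypto=True):

```python
from datetime import datetime, timedelta

def detect_gaps(timestamps, expected=timedelta(days=1), tolerance=0.1):
    """Return (gap_start, gap_end) pairs where spacing exceeds expected * (1 + tolerance)."""
    limit = expected * (1 + tolerance)
    gaps = []
    for prev, curr in zip(timestamps, timestamps[1:]):
        if curr - prev > limit:
            gaps.append((prev, curr))
    return gaps

ts = [datetime(2024, 1, d) for d in (2, 3, 4, 8)]  # Jan 4 -> Jan 8 spans a gap
```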

summarize_gaps
summarize_gaps(gaps)

Summarize detected gaps.

Parameters:

Name Type Description Default
gaps list[DataGap]

List of detected gaps

required

Returns:

Type Description
dict[str, Any]

Summary statistics

fill_gaps
fill_gaps(
    df, gaps, method="forward", timestamp_col="timestamp"
)

Fill detected gaps in data.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with gaps

required
gaps list[DataGap]

List of detected gaps

required
method str

Fill method ('forward', 'backward', 'interpolate', 'zero')

'forward'
timestamp_col str

Name of timestamp column

'timestamp'

Returns:

Type Description
DataFrame

DataFrame with gaps filled
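Forward filling inserts the missing timestamps and carries the last observation forward; a daily-frequency sketch over (timestamp, close) pairs (the real method fills a full OHLCV DataFrame):

```python
from datetime import datetime, timedelta

def forward_fill_daily(rows):
    """rows: sorted (timestamp, close) pairs. Insert missing days, carrying close forward."""
    filled = [rows[0]]
    for ts, close in rows[1:]:
        prev_ts, prev_close = filled[-1]
        day = prev_ts + timedelta(days=1)
        while day < ts:
            filled.append((day, prev_close))  # carried-forward value
            day += timedelta(days=1)
        filled.append((ts, close))
    return filled

rows = [(datetime(2024, 1, 2), 10.0), (datetime(2024, 1, 5), 11.0)]
```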

MetadataTracker

Tracks dataset metadata and update history under the storage root.

MetadataTracker

MetadataTracker(base_path)

Track metadata and update history for datasets.

Initialize metadata tracker.

Parameters:

Name Type Description Default
base_path Path

Base directory for metadata storage

required
get_metadata
get_metadata(key)

Get metadata for a dataset.

Parameters:

Name Type Description Default
key str

Dataset key (e.g., "equities/daily/AAPL")

required

Returns:

Type Description
DatasetMetadata | None

DatasetMetadata if exists, None otherwise

update_metadata
update_metadata(
    key,
    update_record,
    total_rows,
    date_range_start,
    date_range_end,
)

Update metadata for a dataset.

Parameters:

Name Type Description Default
key str

Dataset key

required
update_record UpdateRecord

Record of the update operation

required
total_rows int

Total rows after update

required
date_range_start datetime

Start of data range

required
date_range_end datetime

End of data range

required

Returns:

Type Description
DatasetMetadata

Updated DatasetMetadata

add_update_record
add_update_record(key, record)

Add an update record to the history.

Parameters:

Name Type Description Default
key str

Dataset key

required
record UpdateRecord

Update record to add

required
get_update_history
get_update_history(key, limit=10)

Get update history for a dataset.

Parameters:

Name Type Description Default
key str

Dataset key

required
limit int

Maximum number of records to return

10

Returns:

Type Description
list[UpdateRecord]

List of UpdateRecord objects (most recent first)

check_health
check_health(key, stale_days=7)

Check health status of a dataset.

Parameters:

Name Type Description Default
key str

Dataset key

required
stale_days int

Number of days before data is considered stale

7

Returns:

Type Description
tuple[str, str]

Tuple of (health_status, health_message)
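The staleness side of the health check compares the last update time against the threshold; a sketch (the status strings here are illustrative, not the tracker's exact vocabulary):

```python
from datetime import datetime, timedelta

def check_staleness(last_updated, stale_days=7, now=None):
    """Return (status, message) based on how old the last update is."""
    now = now or datetime.now()
    age = now - last_updated
    status = "stale" if age > timedelta(days=stale_days) else "healthy"
    return status, f"last updated {age.days} days ago"

status, msg = check_staleness(
    last_updated=datetime(2024, 1, 1),
    stale_days=7,
    now=datetime(2024, 1, 12),
)
```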

get_summary
get_summary()

Get summary of all tracked datasets.

Returns:

Type Description
dict[str, Any]

Dictionary with summary statistics


Universe and Contracts

Universe

Convenience registry for predefined and custom symbol universes.

Universe

Pre-defined symbol lists for common market indices and asset groups.

This class provides convenient access to commonly used symbol universes, eliminating the need to manually maintain symbol lists for standard indices.

Attributes:

Name Type Description
SP500 list[str]

S&P 500 constituents (503 symbols including share classes)

NASDAQ100 list[str]

NASDAQ 100 constituents (100 symbols)

CRYPTO_TOP_100 list[str]

Top 100 cryptocurrencies by market cap

FOREX_MAJORS list[str]

Major currency pairs (28 pairs)

Examples:

Access pre-defined universes:

>>> sp500_symbols = Universe.SP500
>>> len(sp500_symbols)
503
>>> nasdaq_symbols = Universe.NASDAQ100
>>> len(nasdaq_symbols)
100

Case-insensitive retrieval:

>>> symbols = Universe.get("sp500")
>>> symbols == Universe.SP500
True
>>> symbols = Universe.get("NASDAQ100")
>>> len(symbols)
100

List all available universes:

>>> available = Universe.list_universes()
>>> "SP500" in available
True
>>> "NASDAQ100" in available
True
get classmethod
get(universe_name)

Get a universe by name (case-insensitive).

Parameters:

Name Type Description Default
universe_name str

Name of the universe (e.g., "sp500", "NASDAQ100")

required

Returns:

Type Description
list[str]

List of symbols in the universe

Raises:

Type Description
ValueError

If universe name is not recognized

Examples:

>>> symbols = Universe.get("sp500")
>>> len(symbols)
503
>>> symbols = Universe.get("NASDAQ100")
>>> len(symbols)
100
>>> symbols = Universe.get("crypto_top_100")
>>> "BTC" in symbols
True
>>> Universe.get("invalid")
Traceback (most recent call last):
    ...
ValueError: Unknown universe 'invalid'. Available: SP500, NASDAQ100, ...
list_universes classmethod
list_universes()

List all available universe names.

Returns:

Type Description
list[str]

Sorted list of universe names

Examples:

>>> universes = Universe.list_universes()
>>> "SP500" in universes
True
>>> "NASDAQ100" in universes
True
>>> len(universes) >= 4
True
add_custom classmethod
add_custom(name, symbols)

Add a custom universe.

This allows users to register their own symbol lists for convenience.

Parameters:

Name Type Description Default
name str

Universe name (will be converted to uppercase)

required
symbols list[str]

List of symbols

required

Raises:

Type Description
ValueError

If universe name already exists

Examples:

>>> Universe.add_custom("my_portfolio", ["AAPL", "MSFT", "GOOGL"])
>>> symbols = Universe.get("my_portfolio")
>>> len(symbols)
3
>>> Universe.add_custom("sp500", ["AAPL"])  # Duplicate
Traceback (most recent call last):
    ...
ValueError: Universe 'SP500' already exists
remove_custom classmethod
remove_custom(name)

Remove a custom universe.

Built-in universes (SP500, NASDAQ100, etc.) cannot be removed.

Parameters:

Name Type Description Default
name str

Universe name to remove

required

Raises:

Type Description
ValueError

If universe doesn't exist or is a built-in universe

Examples:

>>> Universe.add_custom("temp", ["AAPL"])
>>> Universe.remove_custom("temp")
>>> Universe.get("temp")
Traceback (most recent call last):
    ...
ValueError: Unknown universe 'temp'...
>>> Universe.remove_custom("SP500")  # Built-in
Traceback (most recent call last):
    ...
ValueError: Cannot remove built-in universe 'SP500'

ContractSpec

Contract metadata for futures and other exchange-traded assets available from the package root.

ContractSpec dataclass

ContractSpec(
    symbol,
    asset_class=EQUITY,
    multiplier=1.0,
    tick_size=0.01,
    margin=None,
    exchange=None,
    currency="USD",
    name=None,
)

Contract specification for futures and other derivatives.

Defines characteristics that affect P&L calculation and margin:

  • Futures: multiplier varies (ES=$50, CL=$1000, etc.)
  • Equities: multiplier=1, tick_size=0.01
  • Forex: pip value varies by pair and account currency

Example

E-mini S&P 500 futures:

es_spec = ContractSpec(
    symbol="ES",
    asset_class=AssetClass.FUTURE,
    multiplier=50.0,     # $50 per point
    tick_size=0.25,      # Minimum price move
    margin=15000.0,      # Initial margin per contract
    exchange="CME",
)

Apple stock (default equity spec):

aapl_spec = ContractSpec(
    symbol="AAPL",
    asset_class=AssetClass.EQUITY,
    # multiplier=1.0 (default)
    # tick_size=0.01 (default)
)

tick_value property
tick_value

Dollar value of one tick move.
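Assuming tick_value is tick_size × multiplier (the natural reading of "dollar value of one tick move"), the E-mini S&P 500 spec works out as:

```python
# ES: 0.25 index points per tick, $50 per point
tick_size = 0.25
multiplier = 50.0
tick_value = tick_size * multiplier  # $12.50 per tick
```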

AssetClass

Top-level asset-class enum exported from the package root.

AssetClass

Bases: str, Enum

Supported asset classes.

Canonical enum for all asset class references across ml4t-data. Plural aliases (EQUITIES, FUTURES, OPTIONS) are provided for backward compatibility with config files and serialized data.


Configuration

Config

Pydantic model for top-level library configuration. Reads defaults from environment variables (QLDM_DATA_ROOT, QLDM_LOG_LEVEL).

from ml4t.data import Config

# Use defaults
config = Config()

# Override data root
config = Config(data_root="/mnt/fast/market_data", log_level="DEBUG")

Config

Config(**data)

Bases: BaseModel

Main configuration for QLDM.

Initialize config with environment variables.

data_root class-attribute instance-attribute
data_root = Field(
    default_factory=lambda: home() / ".qldm" / "data"
)
log_level class-attribute instance-attribute
log_level = 'INFO'
storage class-attribute instance-attribute
storage = Field(default_factory=StorageConfig)
retry class-attribute instance-attribute
retry = Field(default_factory=RetryConfig)
cache class-attribute instance-attribute
cache = Field(default_factory=CacheConfig)
validation class-attribute instance-attribute
validation = Field(
    default_factory=lambda: {
        "enabled": True,
        "strict": False,
    }
)
base_dir property
base_dir

Alias for data_root for backward compatibility.

RetryConfig

Configuration for automatic retry with exponential backoff.

RetryConfig

Bases: BaseModel

Retry configuration.

CacheConfig

Configuration for in-memory caching.

CacheConfig

Bases: BaseModel

Cache configuration.


Exceptions

All exceptions inherit from ML4TDataError, which carries an optional details dictionary for structured error context.

ML4TDataError
├── ProviderError
│   ├── NetworkError
│   │   └── RateLimitError
│   ├── AuthenticationError
│   ├── DataValidationError
│   ├── SymbolNotFoundError
│   └── DataNotAvailableError
├── StorageError
│   └── LockError
├── ConfigurationError
└── CircuitBreakerOpenError
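The details dictionary enables structured error handling; a minimal stand-in mirroring the documented ML4TDataError(message, details=None) signature (in practice, import the real classes from ml4t.data):

```python
class ML4TDataError(Exception):
    """Stand-in with the documented (message, details=None) signature."""
    def __init__(self, message, details=None):
        super().__init__(message)
        self.details = details or {}

try:
    raise ML4TDataError("fetch failed", details={"provider": "yahoo", "symbol": "AAPL"})
except ML4TDataError as exc:
    # structured context survives alongside the message
    provider = exc.details.get("provider")
```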

ML4TDataError

ML4TDataError

ML4TDataError(message, details=None)

Bases: Exception

Base exception for all ml4t-data errors.

Initialize ml4t-data error.

Parameters:

Name Type Description Default
message str

Error message

required
details dict[str, Any] | None

Optional dictionary with error details

None

ProviderError

ProviderError

ProviderError(provider, message, details=None)

Bases: ML4TDataError

Base exception for provider-related errors.

Initialize provider error.

Parameters:

Name Type Description Default
provider str

Provider name

required
message str

Error message

required
details dict[str, Any] | None

Optional error details

None

NetworkError

NetworkError

NetworkError(
    provider,
    message="Network error occurred",
    details=None,
    retry_after=None,
)

Bases: ProviderError

Network-related errors (connection, timeout, etc.).

Initialize network error.

Parameters:

Name Type Description Default
provider str

Provider name

required
message str

Error message

'Network error occurred'
details dict[str, Any] | None

Optional error details

None
retry_after float | None

Seconds to wait before retry

None

RateLimitError

RateLimitError

RateLimitError(
    provider, retry_after=None, remaining=None, limit=None
)

Bases: NetworkError

Rate limit exceeded error.

Initialize rate limit error.

Parameters:

Name Type Description Default
provider str

Provider name

required
retry_after float | None

Seconds to wait before retry

None
remaining int | None

Remaining API calls

None
limit int | None

API call limit

None

AuthenticationError

AuthenticationError

AuthenticationError(
    provider, message="Authentication failed", details=None
)

Bases: ProviderError

Authentication/authorization errors.

Initialize authentication error.

DataValidationError

DataValidationError

DataValidationError(
    provider, message, field=None, value=None, details=None
)

Bases: ProviderError

Data validation errors.

Initialize data validation error.

Parameters:

Name Type Description Default
provider str

Provider name

required
message str

Error message

required
field str | None

Field that failed validation

None
value Any | None

Invalid value

None
details dict[str, Any] | None

Optional error details

None

SymbolNotFoundError

SymbolNotFoundError

SymbolNotFoundError(provider, symbol, details=None)

Bases: ProviderError

Symbol not found or invalid.

Initialize symbol not found error.

Parameters:

Name Type Description Default
provider str

Provider name

required
symbol str

The symbol that was not found

required
details dict[str, Any] | None

Optional error details

None

DataNotAvailableError

DataNotAvailableError

DataNotAvailableError(
    provider,
    symbol,
    start=None,
    end=None,
    frequency=None,
    details=None,
)

Bases: ProviderError

Data not available for the requested period.

Initialize data not available error.

Parameters:

Name Type Description Default
provider str

Provider name

required
symbol str

Symbol requested

required
start str | None

Start date

None
end str | None

End date

None
frequency str | None

Data frequency

None
details dict[str, Any] | None

Optional error details

None

StorageError

StorageError

StorageError(message, key=None, details=None)

Bases: ML4TDataError

Storage-related errors.

Initialize storage error.

Parameters:

Name Type Description Default
message str

Error message

required
key str | None

Storage key involved

None
details dict[str, Any] | None

Optional error details

None

LockError

LockError

LockError(key, timeout, details=None)

Bases: StorageError

File locking errors.

Initialize lock error.

Parameters:

Name Type Description Default
key str

Storage key

required
timeout float

Lock timeout that was exceeded

required
details dict[str, Any] | None

Optional error details

None

ConfigurationError

ConfigurationError

ConfigurationError(message, parameter=None, details=None)

Bases: ML4TDataError

Configuration-related errors.

Initialize configuration error.

Parameters:

Name Type Description Default
message str

Error message

required
parameter str | None

Configuration parameter involved

None
details dict[str, Any] | None

Optional error details

None

CircuitBreakerOpenError

CircuitBreakerOpenError

CircuitBreakerOpenError(
    message="Circuit breaker is open",
    failure_count=None,
    details=None,
)

Bases: ML4TDataError

Circuit breaker is open and preventing calls.

Initialize circuit breaker open error.

Parameters:

Name Type Description Default
message str

Error message

'Circuit breaker is open'
failure_count int | None

Number of failures that caused circuit to open

None
details dict[str, Any] | None

Optional error details

None