ML4T Data Documentation
Unified market data acquisition from 19+ providers

Data Quality Validation

Target Audience: Quant researchers and production engineers
Time to Read: 15 minutes
Prerequisites: Understanding OHLCV

Why Data Quality Matters

Bad data = Bad strategies = Lost money

A single bad data point can:
- Generate false trading signals
- Corrupt backtest results
- Cause production trading errors
- Lead to incorrect risk calculations

Example of how bad data causes problems:

# Bad data: High < Low (impossible!)
timestamp  | symbol | open  | high  | low   | close
2024-01-15 | AAPL   | 185.0 | 183.0 | 187.0 | 186.0
                             ERROR! High should be >= Low

# This corrupt data could trigger:
# - False "flash crash" signals
# - Invalid volatility calculations
# - Broken technical indicators (RSI, Bollinger Bands)
# - Position sizing errors

ML4T Data's approach: Validate everything, fail loudly, never silently corrupt your data.

OHLCV Invariants

These rules MUST be true for valid OHLCV data:

# The 6 OHLCV Invariants
assert (df["high"] >= df["low"]).all()      # 1. High >= Low
assert (df["high"] >= df["open"]).all()     # 2. High >= Open
assert (df["high"] >= df["close"]).all()    # 3. High >= Close
assert (df["low"] <= df["open"]).all()      # 4. Low <= Open
assert (df["low"] <= df["close"]).all()     # 5. Low <= Close
assert (df["volume"] >= 0).all()            # 6. Volume >= 0

ML4T Data validates these automatically for all provider data.

Example: Catching Bad Data

from ml4t.data.providers import TiingoProvider
# DataValidationError must also be imported; its module path is not shown in this tutorial

provider = TiingoProvider(api_key="key")

try:
    # ML4T Data validates on fetch
    data = provider.fetch_ohlcv("AAPL", "2024-01-01", "2024-01-31")
    print("✅ Data passed validation")

except DataValidationError as e:
    # Bad data caught immediately
    print(f"❌ Invalid data: {e}")
    # Example error:
    # "OHLCV invariant violated at 2024-01-15: High (183.0) < Low (187.0)"

Schema Validation

Beyond OHLCV invariants, ML4T Data validates:

1. Column Presence

# Required columns
REQUIRED_COLUMNS = [
    "timestamp",
    "symbol",
    "open",
    "high",
    "low",
    "close",
    "volume"
]

# ML4T Data checks all providers return these columns
if not all(col in df.columns for col in REQUIRED_COLUMNS):
    raise DataValidationError("Missing required columns")
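
A generic "Missing required columns" message leaves the reader to work out which ones. As a small sketch, assuming only a list of column names (`missing_columns` is a hypothetical helper, not an ML4T Data function), the absent names can be listed explicitly:

```python
REQUIRED_COLUMNS = ["timestamp", "symbol", "open", "high", "low", "close", "volume"]

def missing_columns(columns):
    """Return required column names absent from `columns`, in schema order."""
    present = set(columns)
    return [col for col in REQUIRED_COLUMNS if col not in present]

# Example: a frame that lacks the symbol and volume columns.
missing = missing_columns(["timestamp", "open", "high", "low", "close"])
print(f"Missing required columns: {missing}")
```

With a DataFrame, the same helper can be called as `missing_columns(df.columns)`.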

2. Data Types

import polars as pl

# Expected types
EXPECTED_TYPES = {
    "timestamp": pl.Datetime,
    "symbol": pl.String,
    "open": pl.Float64,
    "high": pl.Float64,
    "low": pl.Float64,
    "close": pl.Float64,
    "volume": pl.Float64,
}

# ML4T Data enforces correct types
for col, expected in EXPECTED_TYPES.items():
    if df[col].dtype != expected:
        raise DataValidationError(f"{col} must be {expected}, got {df[col].dtype}")

3. Null/NaN Checks

# No nulls allowed in critical columns
critical_cols = ["timestamp", "symbol", "close"]

for col in critical_cols:
    if df[col].null_count() > 0:
        raise DataValidationError(f"Null values found in {col}")

# NaN checks (different from null)
if df["close"].is_nan().any():
    raise DataValidationError("NaN values found in close prices")

Anomaly Detection

ML4T Data includes anomaly detection for suspicious (but technically valid) data:

1. Price Spikes

# Detect abnormal price changes
df = df.with_columns(
    ((df["close"] - df["close"].shift(1)) / df["close"].shift(1) * 100)
    .alias("pct_change")
)

# Flag extreme moves (>20% in one day for large-cap stocks)
anomalies = df.filter(df["pct_change"].abs() > 20)

if len(anomalies) > 0:
    print(f"⚠️  Warning: {len(anomalies)} extreme price moves detected")
    print(anomalies.select(["timestamp", "symbol", "close", "pct_change"]))

2. Zero Volume

# Zero volume on trading days is suspicious
zero_volume = df.filter(df["volume"] == 0)

if len(zero_volume) > 0:
    print(f"⚠️  Warning: {len(zero_volume)} days with zero volume")
    # Could indicate:
    # - Trading halts
    # - Data provider issues
    # - Delisted stocks

3. Constant Prices

# Prices don't change for multiple days (suspicious)
df = df.with_columns(
    (df["close"] == df["close"].shift(1)).alias("price_unchanged")
)

consecutive_unchanged = df.filter(
    df["price_unchanged"] & df["price_unchanged"].shift(1)
)

if len(consecutive_unchanged) > 0:
    print(f"⚠️  Warning: {len(consecutive_unchanged)} periods with no price change")
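
The filter above counts rows, not separate episodes, so one long flat stretch and several short ones can produce the same number. As a complementary sketch in plain Python (`longest_flat_run` is a hypothetical helper), the length of the longest run of identical closes can be measured directly:

```python
from itertools import groupby

def longest_flat_run(closes):
    """Length of the longest run of identical consecutive closes."""
    return max((len(list(run)) for _, run in groupby(closes)), default=0)

closes = [10.0, 10.0, 10.0, 10.2, 10.2, 10.5]
longest = longest_flat_run(closes)
print(longest)
```

With a DataFrame, the input can be taken from `df["close"].to_list()`.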

4. Timestamp Gaps

from datetime import timedelta

# Expected 1-day intervals (for daily data)
df = df.with_columns(
    (df["timestamp"] - df["timestamp"].shift(1)).alias("time_gap")
)

# Flag gaps > 7 days (weekends/holidays ok, long gaps suspicious)
large_gaps = df.filter(df["time_gap"] > timedelta(days=7))

if len(large_gaps) > 0:
    print(f"⚠️  Warning: {len(large_gaps)} large time gaps detected")
    print(large_gaps.select(["timestamp", "time_gap"]))
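
A 7-day threshold catches long outages but not a single missing trading day in the middle of a week. The sketch below, in plain Python, lists weekdays that fall between consecutive dates but are absent. It knows nothing about exchange holidays, so a real pipeline would need a trading calendar; `missing_weekdays` is a hypothetical helper.

```python
from datetime import date, timedelta

def missing_weekdays(dates):
    """Return weekdays (Mon-Fri) that fall between consecutive dates but are absent."""
    missing = []
    for prev, curr in zip(dates, dates[1:]):
        day = prev + timedelta(days=1)
        while day < curr:
            if day.weekday() < 5:  # 0-4 are Monday-Friday
                missing.append(day)
            day += timedelta(days=1)
    return missing

# A Friday followed by the next Wednesday: Monday and Tuesday are missing,
# yet the 5-day gap is below the 7-day threshold used above.
dates = [date(2024, 1, 5), date(2024, 1, 10)]
gaps = missing_weekdays(dates)
print(gaps)
```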

Using ML4T Data's Validation Module

from ml4t.data.validation import OHLCVValidator, ValidationConfig

# Configure validation rules
config = ValidationConfig(
    check_ohlcv_invariants=True,
    check_nulls=True,
    check_price_spikes=True,
    max_daily_change_pct=30.0,  # Flag >30% moves
    check_zero_volume=True,
    check_timestamp_gaps=True,
)

# Create validator
validator = OHLCVValidator(config)

# Validate data
result = validator.validate(df)

if result.is_valid:
    print("✅ Data passed all validation checks")
else:
    print(f"❌ Validation failed: {len(result.errors)} errors")
    for error in result.errors:
        print(f"  - {error}")

    # Optionally raise exception
    if result.severity == "critical":
        raise DataValidationError("Critical validation failures")

Handling Bad Data

Option 1: Reject and Alert

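def fetch_validated(provider, validator, symbol, start, end):
    """Return validated data, or None if validation fails."""
    try:
        data = provider.fetch_ohlcv(symbol, start, end)
        result = validator.validate(data)
        if not result.is_valid:
            raise DataValidationError(f"{len(result.errors)} validation errors")
    except DataValidationError as e:
        logger.error(f"Data validation failed: {e}")
        # Send alert to monitoring system
        alert_system.send(f"Bad data from {provider.name()}: {e}")
        # Don't use bad data
        return None
    return data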

Option 2: Clean and Continue

def clean_ohlcv_data(df):
    """Clean data by removing invalid rows."""
    original_len = len(df)

    # Remove rows with invariant violations
    df = df.filter(df["high"] >= df["low"])
    df = df.filter(df["high"] >= df["open"])
    df = df.filter(df["high"] >= df["close"])
    df = df.filter(df["low"] <= df["open"])
    df = df.filter(df["low"] <= df["close"])
    df = df.filter(df["volume"] >= 0)

    # Remove null values
    df = df.drop_nulls(subset=["timestamp", "close"])

    # Remove NaN values
    df = df.filter(~df["close"].is_nan())

    cleaned_len = len(df)
    dropped = original_len - cleaned_len

    if dropped > 0:
        logger.warning(f"Dropped {dropped} invalid rows ({dropped/original_len*100:.1f}%)")

    return df

# Use cleaned data (with caution!)
data = provider.fetch_ohlcv("AAPL", start, end)
data = clean_ohlcv_data(data)

Option 3: Fill Forward (Last Resort)

def fill_missing_data(df):
    """Fill missing values with forward fill (use sparingly)."""
    # Fill nulls with previous valid value
    df = df.with_columns([
        pl.col("open").fill_null(strategy="forward"),
        pl.col("high").fill_null(strategy="forward"),
        pl.col("low").fill_null(strategy="forward"),
        pl.col("close").fill_null(strategy="forward"),
        pl.col("volume").fill_null(0),  # Volume = 0 for missing days
    ])

    logger.warning("Applied forward fill to missing data")
    return df

Provider-Specific Validation

Different providers have different data quality issues:

Yahoo Finance

# Known issues:
# - Adjusted close can be null
# - Volume sometimes zero
# - Splits not always adjusted correctly

def validate_yahoo_data(df):
    # Check for null adjusted close
    if "adj_close" in df.columns:
        null_adj = df.filter(df["adj_close"].is_null())
        if len(null_adj) > 0:
            logger.warning(f"Yahoo: {len(null_adj)} rows with null adj_close")

    # Verify split adjustments
    if "split_factor" in df.columns:
        splits = df.filter(df["split_factor"] != 1.0)
        if len(splits) > 0:
            logger.info(f"Yahoo: {len(splits)} stock splits detected")

CoinGecko

# Known issues:
# - Volume can be zero for low-liquidity coins
# - Prices sometimes lag by 1-2 minutes
# - Historical data can be revised

from datetime import datetime

def validate_coingecko_data(df):
    # Check for zero volume (common for small coins)
    zero_vol_pct = (df["volume"] == 0).sum() / len(df) * 100
    if zero_vol_pct > 10:
        logger.warning(f"CoinGecko: {zero_vol_pct:.1f}% of data has zero volume")

    # Check for price lags (timestamp issues)
    current_time = datetime.now()
    latest_timestamp = df["timestamp"].max()
    lag_hours = (current_time - latest_timestamp).total_seconds() / 3600
    if lag_hours > 2:
        logger.warning(f"CoinGecko: Data lagged by {lag_hours:.1f} hours")
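
`datetime.now()` returns naive local time, so the lag above is off by the local UTC offset when the provider's timestamps are in UTC, and the subtraction raises a `TypeError` when they are timezone-aware. The sketch below computes the lag in UTC. It assumes that naive timestamps are meant as UTC, which should be checked against the provider; `lag_hours` is a hypothetical helper.

```python
from datetime import datetime, timezone

def lag_hours(latest, now=None):
    """Hours between `latest` and `now`, treating naive timestamps as UTC (assumption)."""
    if latest.tzinfo is None:
        latest = latest.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (now - latest).total_seconds() / 3600

latest = datetime(2024, 1, 15, 9, 0)                        # naive, assumed UTC
now = datetime(2024, 1, 15, 12, 30, tzinfo=timezone.utc)
lag = lag_hours(latest, now)
print(f"{lag:.1f} hours")
```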

Cross-Provider Validation

Compare data from multiple providers to catch issues:

def cross_validate_providers(symbol, start, end):
    """Compare data from multiple providers."""
    # Fetch from 2 providers
    tiingo_data = TiingoProvider(api_key="key1").fetch_ohlcv(symbol, start, end)
    iex_data = IEXCloudProvider(api_key="key2").fetch_ohlcv(symbol, start, end)

    # Merge on timestamp
    merged = tiingo_data.join(iex_data, on="timestamp", suffix="_iex")

    # Compare close prices
    merged = merged.with_columns(
        ((merged["close"] - merged["close_iex"]).abs() / merged["close"] * 100)
        .alias("price_diff_pct")
    )

    # Flag large discrepancies (>1%)
    discrepancies = merged.filter(merged["price_diff_pct"] > 1.0)

    if len(discrepancies) > 0:
        logger.warning("Price discrepancies between providers:")
        print(discrepancies.select([
            "timestamp",
            "close",
            "close_iex",
            "price_diff_pct"
        ]))
        return False

    logger.info(f"✅ Providers agree on {symbol} prices")
    return True

Production Validation Checklist

Use this checklist for production pipelines:

from datetime import datetime

class ProductionValidator:
    """Comprehensive validation for production data."""

    def validate_for_production(self, df, symbol, provider_name):
        """Run all validation checks."""
        errors = []
        warnings = []

        # 1. Schema validation
        if not self._validate_schema(df):
            errors.append("Schema validation failed")

        # 2. OHLCV invariants
        if not self._validate_invariants(df):
            errors.append("OHLCV invariant violations")

        # 3. Null checks
        null_count = sum(df.null_count().row(0))  # total nulls across all columns
        if null_count > 0:
            warnings.append(f"{null_count} null values found")

        # 4. Price spike detection
        spikes = self._detect_price_spikes(df)
        if len(spikes) > 0:
            warnings.append(f"{len(spikes)} price spikes detected")

        # 5. Volume checks
        zero_vol = (df["volume"] == 0).sum()
        if zero_vol > len(df) * 0.1:  # >10% zero volume
            warnings.append(f"{zero_vol} days with zero volume ({zero_vol/len(df)*100:.1f}%)")

        # 6. Timestamp continuity
        gaps = self._detect_timestamp_gaps(df)
        if len(gaps) > 0:
            warnings.append(f"{len(gaps)} timestamp gaps detected")

        # 7. Data freshness
        latest = df["timestamp"].max()
        age_hours = (datetime.now() - latest).total_seconds() / 3600
        if age_hours > 48:
            warnings.append(f"Data is {age_hours:.1f} hours old")

        # Log results
        if errors:
            logger.error(f"{symbol} validation FAILED: {errors}")
            return False

        if warnings:
            logger.warning(f"{symbol} validation warnings: {warnings}")

        logger.info(f"✅ {symbol} passed production validation")
        return True

Monitoring Data Quality Over Time

Track data quality metrics:

from datetime import datetime

class DataQualityMonitor:
    """Monitor data quality trends."""

    def __init__(self):
        self.metrics = []

    def record_validation(self, symbol, provider, result):
        """Record validation results."""
        metrics = {
            "timestamp": datetime.now(),
            "symbol": symbol,
            "provider": provider,
            "valid": result.is_valid,
            "error_count": len(result.errors),
            "warning_count": len(result.warnings),
        }
        self.metrics.append(metrics)

    def get_provider_quality_score(self, provider, days=30):
        """Calculate quality score for provider."""
        recent = [m for m in self.metrics
                  if m["provider"] == provider
                  and (datetime.now() - m["timestamp"]).days <= days]

        if not recent:
            return None

        valid_count = sum(1 for m in recent if m["valid"])
        total_count = len(recent)
        quality_score = valid_count / total_count * 100

        return {
            "provider": provider,
            "quality_score": quality_score,
            "total_validations": total_count,
            "valid": valid_count,
            "invalid": total_count - valid_count,
        }
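
One detail of the window filter above: `timedelta.days` drops the fractional part, so a record that is 30 days and 12 hours old still counts as within a 30-day window. A short sketch with fixed, made-up timestamps shows the difference from an exact comparison:

```python
from datetime import datetime, timedelta

now = datetime(2024, 3, 1, 12, 0)
recorded = datetime(2024, 1, 31, 0, 0)   # 30 days and 12 hours earlier

age = now - recorded
whole_day_check = age.days <= 30             # compares whole days only
exact_check = age <= timedelta(days=30)      # compares the full duration
print(whole_day_check, exact_check)
```

Either behavior can be reasonable; the point is to pick one deliberately when defining the scoring window.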

Summary

Key Takeaways:
1. ✅ ML4T Data validates automatically - OHLCV invariants checked on every fetch
2. ✅ Multiple validation layers - Schema, nulls, anomalies, provider-specific
3. ✅ Fail loudly - Never silently accept bad data
4. ✅ Cross-provider validation - Compare multiple sources
5. ✅ Monitor quality - Track data quality metrics over time

Best Practices:
- Always validate before using data in production
- Log validation results for debugging
- Alert on validation failures
- Compare providers when possible
- Monitor data freshness

Next Steps:
- Tutorial 05: Multi-Provider Strategies
- API Reference


Previous Tutorial: 03: Incremental Updates
Next Tutorial: 05: Multi-Provider Strategies