ML4T Data Documentation
Unified market data acquisition from 19+ providers

Incremental Updates: The Smart Way to Update Data

Target Audience: Developers building production data pipelines
Time to Read: 20 minutes
Prerequisites: Understanding OHLCV, Rate Limiting

The Problem: Naive Data Updates

Let's say you want to maintain a database of stock prices for 100 symbols. The naive approach:

# ❌ NAIVE APPROACH - Don't do this!
for symbol in symbols:
    # Fetch entire history every day
    data = provider.fetch_ohlcv(symbol, "2020-01-01", "2024-12-31")
    storage.write(data, symbol, provider_name)

# Problems:
# 1. Wastes 1000+ API calls per symbol (refetching old data)
# 2. Slow (re-downloads history you already have)
# 3. Expensive (hits rate limits quickly)
# 4. Fragile (fails if provider rate-limits you)

The math:
- 100 symbols
- 1000 days of history each
- = 100,000 OHLCV records fetched
- = 100,000+ API calls per day! 🔥

Most free tiers allow 500-1000 calls/day. You'd burn through your quota in seconds.

The Solution: Incremental Updates

Incremental updates only fetch NEW data since your last update:

# ✅ SMART APPROACH - Incremental updates
from ml4t.data.providers import TiingoProvider, TiingoUpdater
from ml4t.data.storage.hive import HiveStorage
from ml4t.data.storage.backend import StorageConfig

# Setup storage and updater
storage = HiveStorage(StorageConfig(base_path="./data"))
provider = TiingoProvider(api_key="your_key")
updater = TiingoUpdater(provider, storage)

# Update symbols incrementally
for symbol in symbols:
    result = updater.update_symbol(symbol, incremental=True)
    # A skipped ("already up to date") result has no 'records_added' key
    print(f"{symbol}: {result.get('records_added', 0)} new records")

# Benefits:
# 1. Only fetches 1 day of new data per symbol (100 API calls total)
# 2. Fast (downloads only what's new)
# 3. Efficient (respects rate limits)
# 4. Robust (handles gaps and missing data automatically)

The math:
- 100 symbols
- 1 day of new data each
- = 100 OHLCV records fetched
- = 100 API calls per day ✅
- 1000x reduction in API calls!
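
The arithmetic above can be checked in a few lines. This is a minimal sketch that follows the tutorial's own accounting, in which every fetched record counts as one API call. That is an assumption: many providers return a whole date range in a single request, so the naive total can be lower in practice. The function name and the 500-call quota are illustrative only.

```python
def daily_api_calls(n_symbols: int, records_per_symbol: int) -> int:
    """API calls per day, assuming one call per fetched record (an assumption)."""
    return n_symbols * records_per_symbol


naive = daily_api_calls(100, 1000)     # refetch 1000 days for each of 100 symbols
incremental = daily_api_calls(100, 1)  # fetch 1 new day for each of 100 symbols
free_tier_quota = 500                  # illustrative daily quota

print(naive, incremental, naive // incremental)
print(naive <= free_tier_quota, incremental <= free_tier_quota)
```

Under these assumptions the naive loop exceeds the quota and the incremental loop fits inside it.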

How Incremental Updates Work

Step 1: Check What You Have

# Updater checks storage for existing data
metadata = storage.get_metadata(symbol, provider_name)

if metadata:
    last_date = metadata['end_date']  # e.g., "2024-01-14"
    print(f"Last update: {last_date}")
else:
    last_date = None  # Nothing stored yet
    print("No existing data - will fetch default history")

Step 2: Calculate Gap

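# Calculate what data is missing (dates are "YYYY-MM-DD" strings)
from datetime import date, datetime

today = date.today()

if last_date:
    start_date = last_date  # Start from last known date; the overlap is deduplicated on write
    print(f"Resuming from {last_date}")
else:
    start_date = default_start  # e.g., 90 days ago for crypto
    print("First fetch - getting default history")

# Subtract date objects, not strings
gap_days = (today - datetime.strptime(start_date, "%Y-%m-%d").date()).days
print(f"Gap: {gap_days} days")

end_date = today.strftime("%Y-%m-%d")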

Step 3: Fetch Only Gap

# Only fetch the missing data
# (this check runs inside the updater's update method, hence the bare return)
if gap_days > 0:
    new_data = provider.fetch_ohlcv(symbol, start_date, end_date)
    print(f"Fetched {len(new_data)} new records")
else:
    print("Already up to date!")
    return

Step 4: Merge and Store

# Append new data to existing data
# ML4T Data handles deduplication automatically
storage.write(new_data, symbol, provider_name)
print("Stored successfully")
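
The four steps can be exercised end to end without a network connection. The sketch below is not the library's implementation: `fake_fetch` is a hypothetical stand-in for a provider, the store is a plain dict, and deduplication is done by keying rows on their date, which is one simple way to get the behavior described in Step 4.

```python
from datetime import date, datetime, timedelta


def fake_fetch(symbol, start, end):
    """Hypothetical provider: one row per calendar day from start to end inclusive."""
    d0 = datetime.strptime(start, "%Y-%m-%d").date()
    d1 = datetime.strptime(end, "%Y-%m-%d").date()
    return [
        {"date": (d0 + timedelta(days=i)).isoformat(), "close": 100.0 + i}
        for i in range((d1 - d0).days + 1)
    ]


def incremental_update(store, symbol, today, default_lookback_days=30):
    """Run the four steps against `store` (dict of symbol -> sorted rows).

    Returns the number of rows that were actually added.
    """
    rows = store.get(symbol, [])
    # Step 1: check what we have
    last_date = rows[-1]["date"] if rows else None
    # Step 2: calculate the gap
    if last_date:
        start = last_date
    else:
        start = (today - timedelta(days=default_lookback_days)).isoformat()
    gap_days = (today - datetime.strptime(start, "%Y-%m-%d").date()).days
    if gap_days <= 0:
        return 0
    # Step 3: fetch only the gap (the overlap on `start` is intentional)
    new_rows = fake_fetch(symbol, start, today.isoformat())
    # Step 4: merge keyed by date, so the overlapping row is not duplicated
    merged = {r["date"]: r for r in rows}
    before = len(merged)
    merged.update({r["date"]: r for r in new_rows})
    store[symbol] = [merged[d] for d in sorted(merged)]
    return len(merged) - before


store = {}
first = incremental_update(store, "AAPL", date(2024, 1, 10))   # bootstrap
second = incremental_update(store, "AAPL", date(2024, 1, 15))  # five days later
third = incremental_update(store, "AAPL", date(2024, 1, 15))   # same day again
print(first, second, third)
```

The second run refetches the last stored day plus five new ones, but only the five new rows are counted as added because the merge is keyed by date.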

The ProviderUpdater Pattern

Every ML4T Data provider has a corresponding ProviderUpdater class:

| Provider | Updater Class |
|---|---|
| TiingoProvider | TiingoUpdater |
| CoinGeckoProvider | CoinGeckoUpdater |
| IEXCloudProvider | IEXCloudUpdater |
| EODHDProvider | EODHDUpdater |
| AlphaVantageProvider | AlphaVantageUpdater |
| FinnhubProvider | FinnhubUpdater |

All updaters share the same interface:

class ProviderUpdater:
    def __init__(self, provider, storage):
        self.provider = provider
        self.storage = storage

    def update_symbol(
        self,
        symbol: str,
        start_time: str | None = None,
        end_time: str | None = None,
        frequency: str = "daily",
        incremental: bool = True,
        dry_run: bool = False,
    ) -> dict:
        """Update symbol data incrementally or forcefully."""
        ...
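
Because the updaters share one method signature, pipeline code can depend on that signature alone and be tested without network access. The sketch below shows one way to do this with `typing.Protocol`. `UpdaterLike`, `FakeUpdater`, and `update_all` are hypothetical names introduced for illustration; they are not part of ML4T Data.

```python
from typing import Optional, Protocol


class UpdaterLike(Protocol):
    """Structural type mirroring the update_symbol signature shown above."""

    def update_symbol(
        self,
        symbol: str,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        frequency: str = "daily",
        incremental: bool = True,
        dry_run: bool = False,
    ) -> dict: ...


class FakeUpdater:
    """Hypothetical test double: returns canned results and records calls."""

    def __init__(self, new_records):
        self.new_records = new_records  # symbol -> number of new rows
        self.calls = []

    def update_symbol(self, symbol, start_time=None, end_time=None,
                      frequency="daily", incremental=True, dry_run=False):
        self.calls.append(symbol)
        n = self.new_records.get(symbol, 0)
        return {
            "success": True,
            "symbol": symbol,
            "records_fetched": n,
            "records_added": 0 if dry_run else n,
            "dry_run": dry_run,
        }


def update_all(updater: UpdaterLike, symbols) -> int:
    """Pipeline code that depends only on the shared interface."""
    return sum(updater.update_symbol(s)["records_added"] for s in symbols)


fake = FakeUpdater({"AAPL": 1, "MSFT": 2})
total = update_all(fake, ["AAPL", "MSFT", "TSLA"])
print(total, fake.calls)
```

Any real updater with the same method can be passed to `update_all` in place of the fake.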

Basic Usage

First Update (Bootstrap)

from ml4t.data.providers import TiingoProvider, TiingoUpdater
from ml4t.data.storage.hive import HiveStorage
from ml4t.data.storage.backend import StorageConfig

# Setup
storage = HiveStorage(StorageConfig(base_path="./data"))
provider = TiingoProvider(api_key="your_key")
updater = TiingoUpdater(provider, storage)

# First update: Downloads default history (30 days for stocks)
result = updater.update_symbol("AAPL", incremental=True)

print(result)
# {
#     'success': True,
#     'symbol': 'AAPL',
#     'records_fetched': 30,
#     'records_added': 30,
#     'start_date': '2024-12-15',
#     'end_date': '2025-01-14',
#     'message': 'Successfully updated'
# }

Daily Updates

# Second update (next day): Only fetches NEW data
result = updater.update_symbol("AAPL", incremental=True)

print(result)
# {
#     'success': True,
#     'symbol': 'AAPL',
#     'records_fetched': 1,
#     'records_added': 1,
#     'start_date': '2025-01-15',
#     'end_date': '2025-01-15',
#     'message': 'Added 1 new record'
# }

Already Up to Date

# Same day, run again: Skips fetch
result = updater.update_symbol("AAPL", incremental=True)

print(result)
# {
#     'success': True,
#     'symbol': 'AAPL',
#     'skip_reason': 'already_up_to_date',
#     'last_date': '2025-01-15',
#     'message': 'Data already current'
# }
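
The result dictionaries shown in this tutorial do not all carry the same keys. The skipped result, for example, has no 'records_added' entry, so indexing it directly raises `KeyError`. A small helper that reads keys defensively avoids this. The sketch below uses only key names that appear in this tutorial's sample outputs; the helper name `summarize` is hypothetical.

```python
def summarize(result: dict) -> str:
    """One-line status for any of the result shapes shown in this tutorial."""
    symbol = result.get("symbol", "?")
    if not result.get("success", False):
        return f"{symbol}: FAILED ({result.get('error', 'unknown error')})"
    if "skip_reason" in result:
        return f"{symbol}: skipped ({result['skip_reason']})"
    if result.get("dry_run"):
        return f"{symbol}: would add {result.get('records_fetched', 0)} (dry run)"
    return f"{symbol}: +{result.get('records_added', 0)} records"


added = {"success": True, "symbol": "AAPL", "records_fetched": 1, "records_added": 1}
skipped = {"success": True, "symbol": "AAPL", "skip_reason": "already_up_to_date"}
failed = {"success": False, "symbol": "AAPL", "error": "timeout"}

for r in (added, skipped, failed):
    print(summarize(r))
```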

Advanced Usage

Custom Date Ranges

# Override default range for bootstrap
result = updater.update_symbol(
    "AAPL",
    start_time="2020-01-01",  # Get 5 years of history
    end_time="2024-12-31",
    incremental=False,  # Force fetch (ignore existing data)
)

Dry Run Mode

# Preview what would be fetched without actually storing
result = updater.update_symbol("AAPL", incremental=True, dry_run=True)

print(result)
# {
#     'success': True,
#     'symbol': 'AAPL',
#     'records_fetched': 5,
#     'dry_run': True,
#     'message': 'Would add 5 new records (dry run - not stored)'
# }

Multiple Frequencies

# Update daily, weekly, and monthly data
for frequency in ["daily", "weekly", "monthly"]:
    result = updater.update_symbol(
        "AAPL",
        frequency=frequency,
        incremental=True
    )
    # .get() avoids a KeyError when the result is a skip
    print(f"{frequency}: {result.get('records_added', 0)} records")

Handling Gaps

The Gap Problem

What if you miss a few days of updates?

Stored data:  [Jan 1] [Jan 2] [Jan 3] ... [Jan 10]
                                              ↑ Last update
Today:        Jan 15
                                         Gap!
Missing:      [Jan 11] [Jan 12] [Jan 13] [Jan 14]

Solution: Incremental updates automatically detect and fill gaps!

# Even after 5 days without updates, incremental mode works
result = updater.update_symbol("AAPL", incremental=True)

print(result)
# {
#     'success': True,
#     'records_fetched': 5,  # Fetched Jan 11-15
#     'records_added': 5,
#     'gap_detected': True,
#     'gap_days': 5,
#     'message': 'Filled 5-day gap'
# }
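
For daily equity data, a gap measured in calendar days is usually larger than the number of records that come back, because weekends produce no bars. The sketch below counts Monday to Friday dates in a gap as a rough sanity check on a fill. It ignores exchange holidays, which is a simplifying assumption, and the function name is hypothetical.

```python
from datetime import date, timedelta


def weekdays_between(last_date: date, today: date) -> int:
    """Count Mon-Fri dates after last_date, up to and including today."""
    count = 0
    d = last_date + timedelta(days=1)
    while d <= today:
        if d.weekday() < 5:  # 0 = Monday ... 4 = Friday
            count += 1
        d += timedelta(days=1)
    return count


last_stored = date(2025, 1, 10)  # a Friday
today = date(2025, 1, 15)        # the following Wednesday
calendar_gap = (today - last_stored).days
expected_bars = weekdays_between(last_stored, today)
print(calendar_gap, expected_bars)
```

Here a 5-day calendar gap corresponds to at most 3 new daily bars. Markets that trade every day, such as crypto, would instead yield one bar per calendar day.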

Handling Provider Outages

If a provider is down, the update raises an error. Catch it and retry later; the next incremental run fills in whatever was missed:

try:
    result = updater.update_symbol("AAPL", incremental=True)
except NetworkError as e:
    logger.error(f"Provider unavailable: {e}")
    # Try again later - incremental mode will catch up

Batch Updates

Update Multiple Symbols

symbols = ["AAPL", "MSFT", "GOOGL", "TSLA", "AMZN"]

for symbol in symbols:
    try:
        result = updater.update_symbol(symbol, incremental=True)
        # .get() avoids a KeyError when the symbol was skipped as already current
        logger.info(f"{symbol}: +{result.get('records_added', 0)} records")
    except Exception as e:
        logger.error(f"{symbol}: Failed - {e}")
        continue  # Skip to next symbol

Parallel Updates (Careful!)

from concurrent.futures import ThreadPoolExecutor, as_completed

# One storage instance is shared by all threads, so it must be thread-safe
storage = HiveStorage(StorageConfig(base_path="./data"))

def update_symbol_safe(symbol):
    """Update one symbol with its own updater instance."""
    # Each task creates its own updater instance
    thread_updater = TiingoUpdater(provider, storage)
    return thread_updater.update_symbol(symbol, incremental=True)

# Update in parallel (max 5 concurrent to respect rate limits)
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(update_symbol_safe, s): s for s in symbols}

    # as_completed yields each future as soon as it finishes
    for future in as_completed(futures):
        symbol = futures[future]
        try:
            result = future.result()
            print(f"{symbol}: +{result.get('records_added', 0)}")
        except Exception as e:
            print(f"{symbol}: ERROR - {e}")

⚠️ Warning: Parallel updates can hit rate limits faster. Use conservatively.
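
One way to keep parallel workers inside a provider's limit is to share a single limiter across threads. The sketch below spaces calls at least `interval` seconds apart by handing each caller a reserved time slot. `MinIntervalLimiter` is a hypothetical helper, not part of ML4T Data, and the clock and sleep functions are injectable so the demo is deterministic.

```python
import threading
import time


class MinIntervalLimiter:
    """Space calls at least `interval` seconds apart across all threads."""

    def __init__(self, interval, clock=time.monotonic, sleep=time.sleep):
        self.interval = interval
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_allowed = 0.0

    def wait(self):
        """Reserve the next free slot, then sleep until it arrives."""
        with self._lock:
            now = self._clock()
            slot = max(now, self._next_allowed)
            self._next_allowed = slot + self.interval
        delay = slot - now
        if delay > 0:
            self._sleep(delay)  # sleep outside the lock so others can reserve


# Deterministic demo: frozen fake clock, three back-to-back calls
sleeps = []
limiter = MinIntervalLimiter(0.5, clock=lambda: 100.0, sleep=sleeps.append)
for _ in range(3):
    limiter.wait()
print(sleeps)
```

In the parallel example, each worker would call `limiter.wait()` just before `update_symbol`, with `interval` chosen from the provider's documented quota.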

Production Patterns

Pattern 1: Scheduled Daily Updates

import schedule
import time
from datetime import datetime

def daily_update_job():
    """Run at market close + 30 minutes (16:30 ET)."""
    logger.info("Starting daily update...")

    updated = 0
    failed = 0

    for symbol in portfolio_symbols:
        try:
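            result = updater.update_symbol(symbol, incremental=True)
            if result['success']:
                updated += 1
                logger.info(f"{symbol}: +{result.get('records_added', 0)} records")
            else:
                # Count unsuccessful results too, not only raised exceptions
                failed += 1
                logger.error(f"{symbol}: {result.get('error')}")
        except Exception as e:
            failed += 1
            logger.error(f"{symbol}: {e}")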

    logger.info(f"Update complete: {updated} success, {failed} failed")

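# Schedule for 16:30 ET every weekday. Without a timezone argument, schedule
# uses the machine's local time; passing one needs schedule >= 1.2.0 and pytz.
schedule.every().monday.at("16:30", "America/New_York").do(daily_update_job)
schedule.every().tuesday.at("16:30", "America/New_York").do(daily_update_job)
schedule.every().wednesday.at("16:30", "America/New_York").do(daily_update_job)
schedule.every().thursday.at("16:30", "America/New_York").do(daily_update_job)
schedule.every().friday.at("16:30", "America/New_York").do(daily_update_job)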

while True:
    schedule.run_pending()
    time.sleep(60)
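
A fixed UTC time does not track 16:30 New York time all year, because the offset changes with daylight saving time. The sketch below uses the standard library's `zoneinfo` to show the conversion for one winter and one summer date. The function name is hypothetical, and the example assumes the system has time zone data available.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def run_time_utc(year, month, day, hour=16, minute=30):
    """Return the UTC clock time of hour:minute New York time on the given date."""
    local = datetime(year, month, day, hour, minute,
                     tzinfo=ZoneInfo("America/New_York"))
    return local.astimezone(timezone.utc).strftime("%H:%M")


winter = run_time_utc(2025, 1, 15)  # EST, UTC-5
summer = run_time_utc(2025, 7, 15)  # EDT, UTC-4
print(winter, summer)
```

The winter date maps to 21:30 UTC and the summer date to 20:30 UTC, so a job pinned to 21:30 UTC runs an hour late for roughly eight months of the year.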

Pattern 2: Catch-Up After Downtime

def catch_up_all_symbols():
    """Catch up all symbols after downtime."""
    logger.info("Starting catch-up update...")

    for symbol in all_symbols:
        result = updater.update_symbol(symbol, incremental=True)

        if result.get('gap_detected'):
            logger.warning(
                f"{symbol}: Filled {result['gap_days']}-day gap "
                f"({result['records_added']} records)"
            )
        elif result.get('skip_reason') == 'already_up_to_date':
            logger.info(f"{symbol}: Already current")
        else:
            logger.info(f"{symbol}: +{result['records_added']} records")

catch_up_all_symbols()

Pattern 3: Prioritized Updates with Rate Limit Handling

from ml4t.data.core.exceptions import RateLimitError

def prioritized_update(symbols_priority_order):
    """Update high-priority symbols first, stop at rate limit."""
    updated_symbols = []

    for symbol in symbols_priority_order:
        try:
            result = updater.update_symbol(symbol, incremental=True)
            updated_symbols.append(symbol)
            logger.info(f"{symbol}: Updated ({len(updated_symbols)}/{len(symbols_priority_order)})")

        except RateLimitError:
            logger.warning(f"Rate limit reached at symbol {symbol}")
            logger.info(f"Successfully updated {len(updated_symbols)} symbols")
            logger.info("Remaining symbols will be updated in next run")
            break

        except Exception as e:
            logger.error(f"{symbol}: Failed - {e}")
            continue

    return updated_symbols

# Run with priority order (high market cap first)
symbols_by_market_cap = ["AAPL", "MSFT", "GOOGL", ...]
updated = prioritized_update(symbols_by_market_cap)
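
The function above returns only the symbols that were updated, so the caller has to work out which ones are left. A variant that also returns the remainder makes the next run easier to plan. The sketch below is self-contained: it defines a local `RateLimitError` stand-in and a fake update function with a two-call budget, and all names are hypothetical.

```python
class RateLimitError(Exception):
    """Local stand-in so the sketch runs without the library installed."""


def update_until_limited(symbols, update_fn):
    """Return (done, remaining); remaining starts at the symbol that hit the limit."""
    done = []
    for i, symbol in enumerate(symbols):
        try:
            update_fn(symbol)
        except RateLimitError:
            return done, list(symbols[i:])
        done.append(symbol)
    return done, []


# Fake update function that allows two calls before the quota runs out
budget = {"left": 2}


def fake_update(symbol):
    if budget["left"] == 0:
        raise RateLimitError("quota exhausted")
    budget["left"] -= 1


done, remaining = update_until_limited(["AAPL", "MSFT", "GOOGL", "TSLA"], fake_update)
print(done, remaining)

# One possible policy for the next run: leftovers first, so they are not starved
next_order = remaining + done
print(next_order)
```

Whether leftovers should jump the queue or strict priority should be kept is a design choice that depends on how stale the low-priority symbols are allowed to get.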

Monitoring and Debugging

Check Update Status

# Check when symbol was last updated
metadata = storage.get_metadata("AAPL", "tiingo")

print(f"Last update: {metadata['end_date']}")
print(f"Total records: {metadata['record_count']}")
print(f"Date range: {metadata['start_date']} to {metadata['end_date']}")

Validate Data Freshness

from datetime import datetime, timedelta

def check_data_freshness(symbol, max_age_days=2):
    """Alert if data is stale."""
    metadata = storage.get_metadata(symbol, "tiingo")

    if not metadata:
        logger.warning(f"{symbol}: No data found!")
        return False

    last_date = datetime.strptime(metadata['end_date'], "%Y-%m-%d")
    age_days = (datetime.now() - last_date).days

    if age_days > max_age_days:
        logger.warning(f"{symbol}: Data is {age_days} days old!")
        return False

    logger.info(f"{symbol}: Fresh (updated {age_days} days ago)")
    return True

# Check all symbols
stale_symbols = []
for symbol in portfolio_symbols:
    if not check_data_freshness(symbol, max_age_days=2):
        stale_symbols.append(symbol)

if stale_symbols:
    logger.error(f"Stale data for {len(stale_symbols)} symbols: {stale_symbols}")
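
A fixed age threshold in calendar days can raise false alarms around weekends: on a Monday, Friday's bar is already 3 days old and fails a `max_age_days=2` check. The sketch below compares against the last expected weekday instead. It ignores exchange holidays (an assumption), and the function names are hypothetical.

```python
from datetime import date, timedelta


def last_expected_trading_day(today: date) -> date:
    """Most recent Mon-Fri date strictly before today (holidays ignored)."""
    d = today - timedelta(days=1)
    while d.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        d -= timedelta(days=1)
    return d


def is_fresh(end_date: date, today: date) -> bool:
    """True if stored data reaches the last expected trading day."""
    return end_date >= last_expected_trading_day(today)


monday = date(2025, 1, 13)
friday = date(2025, 1, 10)
thursday = date(2025, 1, 9)

print((monday - friday).days)      # calendar age of Friday's bar on Monday
print(is_fresh(friday, monday))    # data through Friday
print(is_fresh(thursday, monday))  # data through Thursday only
```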

Common Pitfalls

Pitfall 1: Not Using Incremental Updates

# ❌ BAD: Refetching everything
data = provider.fetch_ohlcv("AAPL", "2020-01-01", "2024-12-31")
# Wastes 1000+ API calls every time

# ✅ GOOD: Incremental updates
result = updater.update_symbol("AAPL", incremental=True)
# Only 1 API call per day

Pitfall 2: Ignoring Return Values

# ❌ BAD: Not checking result
updater.update_symbol("AAPL", incremental=True)
# Did it work? Did it skip? Did it fill a gap? Who knows!

# ✅ GOOD: Check result and log
result = updater.update_symbol("AAPL", incremental=True)
if result['success']:
    logger.info(f"Updated: +{result.get('records_added', 0)} records")
else:
    logger.error(f"Failed: {result.get('error')}")

Pitfall 3: Running Too Frequently

# ❌ BAD: Updating every minute
while True:
    updater.update_symbol("AAPL", incremental=True)
    time.sleep(60)
# Daily bars change only once per trading day, so most of these calls are wasted.

# ✅ GOOD: Update once per day after market close
schedule.every().day.at("16:30").do(update_all_symbols)

Pitfall 4: Not Handling Provider-Specific Limits

# ❌ BAD: Updating 1000 symbols with Alpha Vantage (25/day limit)
for symbol in symbols[:1000]:  # Won't work!
    updater.update_symbol(symbol, incremental=True)

# ✅ GOOD: Respect provider limits
daily_limit = 25  # Alpha Vantage free tier
for symbol in symbols[:daily_limit]:  # The remaining symbols wait for a later run
    updater.update_symbol(symbol, incremental=True)
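
Slicing the first `daily_limit` symbols every day means the rest are never updated. One simple alternative is to rotate through fixed-size batches, one batch per day, and let incremental gap filling catch each symbol up when its turn comes. The sketch below shows the batching arithmetic only; `batch_for_day` is a hypothetical helper.

```python
def batch_for_day(symbols, daily_limit, day_index):
    """Return the slice of symbols to update on day number day_index (0-based)."""
    if daily_limit <= 0:
        raise ValueError("daily_limit must be positive")
    n_batches = -(-len(symbols) // daily_limit)  # ceiling division
    if n_batches == 0:
        return []
    start = (day_index % n_batches) * daily_limit
    return symbols[start:start + daily_limit]


symbols = [f"SYM{i:02d}" for i in range(60)]  # 60 placeholder tickers

for day in range(4):
    batch = batch_for_day(symbols, 25, day)
    print(day, len(batch), batch[0], batch[-1])
```

With 60 symbols and a limit of 25, the cycle is three days long: two full batches, one batch of 10, then back to the start.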

Performance Optimization

Benchmark: Naive vs. Incremental

import time

# Naive approach
start = time.time()
for symbol in symbols[:100]:
    data = provider.fetch_ohlcv(symbol, "2020-01-01", "2024-12-31")
    storage.write(data, symbol, "provider")
naive_time = time.time() - start
print(f"Naive: {naive_time:.1f}s")
# Naive: 342.7s (5.7 minutes)

# Incremental approach
start = time.time()
for symbol in symbols[:100]:
    updater.update_symbol(symbol, incremental=True)
incremental_time = time.time() - start
print(f"Incremental: {incremental_time:.1f}s")
# Incremental: 34.2s (34 seconds)

print(f"Speedup: {naive_time / incremental_time:.1f}x faster")
# Speedup: 10.0x faster

Summary

Key Takeaways:
1. ✅ Always use incremental updates - 100-1000x more efficient
2. ✅ ProviderUpdater pattern - Consistent across all providers
3. ✅ Automatic gap filling - Handles missing days gracefully
4. ✅ Dry run mode - Test before storing
5. ✅ Check return values - Monitor success and debug failures

When to Use Incremental:
- Daily/weekly portfolio updates
- Production data pipelines
- Any scenario where you update the same symbols repeatedly

When to Force Full Fetch:
- First-time bootstrap
- Data quality issues (provider had bad data, now fixed)
- Switching providers
- Backfilling historical data

Next Steps:
- Tutorial 04: Data Quality Validation
- Tutorial 05: Multi-Provider Strategies


Previous Tutorial: 02: Rate Limiting Best Practices
Next Tutorial: 04: Data Quality Validation