From c42f2223d824e7a16ec13f79f1231f5d12fee5ad Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Thu, 30 Apr 2026 23:01:20 +0000 Subject: [PATCH] fix: move backfill logic to standalone script (fixes YAML parse error) Inline Python with f-strings and colons broke Helm YAML parsing. Moved to scripts/backfill_market_data.py and call it directly from the init container command. --- .../stonks-oracle/templates/deployments.yaml | 57 +------- scripts/backfill_market_data.py | 125 ++++++++++++++++++ 2 files changed, 126 insertions(+), 56 deletions(-) create mode 100644 scripts/backfill_market_data.py diff --git a/infra/helm/stonks-oracle/templates/deployments.yaml b/infra/helm/stonks-oracle/templates/deployments.yaml index b0d3fd2..ac046d8 100644 --- a/infra/helm/stonks-oracle/templates/deployments.yaml +++ b/infra/helm/stonks-oracle/templates/deployments.yaml @@ -93,62 +93,7 @@ spec: - name: backfill-market-data image: {{ $root.Values.image.registry }}/{{ $svc.image }}:{{ $root.Values.image.tag }} imagePullPolicy: {{ $root.Values.image.pullPolicy }} - command: ["sh", "-c"] - args: - - | - COUNT=$(PGPASSWORD="$POSTGRES_PASSWORD" psql \ - -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" \ - -U "$POSTGRES_USER" -d "$POSTGRES_DB" \ - -tAc "SELECT count(*) FROM market_snapshots WHERE snapshot_type = 'bar'" 2>/dev/null || echo "0") - if [ "$COUNT" -lt "50" ] && [ -n "$MARKET_DATA_API_KEY" ]; then - echo "Only $COUNT market bars found — backfilling 90 days from Polygon..." - python -c " -import asyncio, asyncpg, hashlib, httpx, json, os -from datetime import date, datetime, timedelta, timezone - -async def backfill(): - api_key = os.environ.get('MARKET_DATA_API_KEY', '') - base_url = os.environ.get('MARKET_DATA_BASE_URL', 'https://api.polygon.io') - dsn = f'postgresql://{os.environ[\"POSTGRES_USER\"]}:{os.environ[\"POSTGRES_PASSWORD\"]}@{os.environ[\"POSTGRES_HOST\"]}:{os.environ[\"POSTGRES_PORT\"]}/{os.environ[\"POSTGRES_DB\"]}' - pool = await asyncpg.create_pool(dsn=dsn) - tickers = [r['ticker'] for r in await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker')] - to_d = date.today().isoformat() - from_d = (date.today() - timedelta(days=90)).isoformat() - total = 0 - async with httpx.AsyncClient(timeout=30) as client: - for ticker in tickers: - url = f'{base_url}/v2/aggs/ticker/{ticker}/range/1/day/{from_d}/{to_d}' - try: - resp = await client.get(url, params={'apiKey': api_key, 'adjusted': 'true', 'sort': 'asc', 'limit': '500'}) - resp.raise_for_status() - bars = resp.json().get('results', []) - except Exception as e: - print(f' {ticker}: fetch failed ({e})') - continue - existing = {r['bar_ts'] for r in await pool.fetch(\"SELECT DISTINCT (data->>'t')::bigint AS bar_ts FROM market_snapshots WHERE ticker = \$1 AND snapshot_type = 'bar'\", ticker) if r['bar_ts']} - co = await pool.fetchrow('SELECT id FROM companies WHERE ticker = \$1', ticker) - cid = co['id'] if co else None - inserted = 0 - for bar in bars: - bar_ts = bar.get('t') - if not bar_ts or bar_ts in existing: - continue - bj = json.dumps(bar) - ch = hashlib.sha256(bj.encode()).hexdigest() - ca = datetime.fromtimestamp(bar_ts / 1000, tz=timezone.utc) - await pool.execute('INSERT INTO market_snapshots (company_id, ticker, snapshot_type, data, source_provider, captured_at, content_hash) VALUES (\$1, \$2, \'bar\', \$3::jsonb, \'polygon_backfill\', \$4, \$5)', cid, ticker, bj, ca, ch) - existing.add(bar_ts) - inserted += 1 - total += inserted - if inserted: - print(f' {ticker}: {inserted} bars') - await pool.close() - print(f'Backfill complete: {total} bars across {len(tickers)} tickers') -asyncio.run(backfill()) -" - else - echo "Market data has $COUNT bars — skipping backfill." - fi + command: ["python", "/app/scripts/backfill_market_data.py"] securityContext: {{- include "stonks.containerSecurityContext" $root | nindent 12 }} envFrom: diff --git a/scripts/backfill_market_data.py b/scripts/backfill_market_data.py new file mode 100644 index 0000000..1603940 --- /dev/null +++ b/scripts/backfill_market_data.py @@ -0,0 +1,125 @@ +"""Backfill 90 days of daily OHLCV bars from Polygon into market_snapshots. + +Run as: python scripts/backfill_market_data.py +Requires env vars: POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, + POSTGRES_PORT, POSTGRES_DB, MARKET_DATA_API_KEY, + MARKET_DATA_BASE_URL (optional, defaults to polygon). + +Skips if market_snapshots already has >= 50 bars. +""" + +import asyncio +import hashlib +import json +import os +import sys +from datetime import date, datetime, timedelta, timezone + +import asyncpg +import httpx + + +async def backfill() -> None: + api_key = os.environ.get("MARKET_DATA_API_KEY", "") + if not api_key: + print("No MARKET_DATA_API_KEY set — skipping backfill.") + return + + base_url = os.environ.get("MARKET_DATA_BASE_URL", "https://api.polygon.io") + dsn = ( + f"postgresql://{os.environ['POSTGRES_USER']}:{os.environ['POSTGRES_PASSWORD']}" + f"@{os.environ['POSTGRES_HOST']}:{os.environ['POSTGRES_PORT']}" + f"/{os.environ['POSTGRES_DB']}" + ) + pool = await asyncpg.create_pool(dsn=dsn) + + # Check if backfill is needed + count = await pool.fetchval( + "SELECT count(*) FROM market_snapshots WHERE snapshot_type = 'bar'" + ) + if count >= 50: + print(f"Market data has {count} bars — skipping backfill.") + await pool.close() + return + + print(f"Only {count} market bars found — backfilling 90 days from Polygon...") + + tickers = [ + r["ticker"] + for r in await pool.fetch( + "SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker" + ) + ] + to_d = date.today().isoformat() + from_d = (date.today() - timedelta(days=90)).isoformat() + total = 0 + + async with httpx.AsyncClient(timeout=30) as client: + for ticker in tickers: + url = f"{base_url}/v2/aggs/ticker/{ticker}/range/1/day/{from_d}/{to_d}" + try: + resp = await client.get( + url, + params={ + "apiKey": api_key, + "adjusted": "true", + "sort": "asc", + "limit": "500", + }, + ) + resp.raise_for_status() + bars = resp.json().get("results", []) + except Exception as e: + print(f" {ticker}: fetch failed ({e})") + continue + + existing = { + r["bar_ts"] + for r in await pool.fetch( + "SELECT DISTINCT (data->>'t')::bigint AS bar_ts " + "FROM market_snapshots WHERE ticker = $1 AND snapshot_type = 'bar'", + ticker, + ) + if r["bar_ts"] + } + co = await pool.fetchrow( + "SELECT id FROM companies WHERE ticker = $1", ticker + ) + cid = co["id"] if co else None + inserted = 0 + + for bar in bars: + bar_ts = bar.get("t") + if not bar_ts or bar_ts in existing: + continue + bj = json.dumps(bar) + ch = hashlib.sha256(bj.encode()).hexdigest() + ca = datetime.fromtimestamp(bar_ts / 1000, tz=timezone.utc) + await pool.execute( + "INSERT INTO market_snapshots " + "(company_id, ticker, snapshot_type, data, source_provider, " + "captured_at, content_hash) " + "VALUES ($1, $2, 'bar', $3::jsonb, 'polygon_backfill', $4, $5)", + cid, + ticker, + bj, + ca, + ch, + ) + existing.add(bar_ts) + inserted += 1 + + total += inserted + if inserted: + print(f" {ticker}: {inserted} bars") + + await pool.close() + print(f"Backfill complete: {total} bars across {len(tickers)} tickers") + + +if __name__ == "__main__": + try: + asyncio.run(backfill()) + except Exception as e: + print(f"Backfill failed: {e}", file=sys.stderr) + sys.exit(0) # Don't block startup on backfill failure