From f2b9d6c00af8a9d5f23433a093ed086574a33c15 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Sun, 12 Apr 2026 02:45:37 -0700 Subject: [PATCH] phase 17: fix scheduler config parsing, worker entry points, and seed data for Polygon sources --- .kiro/specs/stonks-oracle/tasks.md | 6 +-- infra/helm/stonks-oracle/values.yaml | 10 ++-- services/scheduler/app.py | 19 ++++++- services/symbol_registry/seed.py | 77 ++++++++++++++++------------ 4 files changed, 70 insertions(+), 42 deletions(-) diff --git a/.kiro/specs/stonks-oracle/tasks.md b/.kiro/specs/stonks-oracle/tasks.md index a4db06b..9dafa18 100644 --- a/.kiro/specs/stonks-oracle/tasks.md +++ b/.kiro/specs/stonks-oracle/tasks.md @@ -174,15 +174,15 @@ ## Phase 17 - First Vertical Slice: Live Pipeline End-to-End -- [ ] 17. Activate the full data pipeline for a set of tracked symbols -- [ ] 17.1 Seed initial symbols and configure sources via the dashboard +- [-] 17. Activate the full data pipeline for a set of tracked symbols +- [x] 17.1 Seed initial symbols and configure sources via the dashboard - Use the dashboard Companies page to add 5-10 symbols (e.g. AAPL, MSFT, GOOGL, AMZN, TSLA, NVDA, META, JPM, V, UNH) - For each company, add sources via the Company Detail → Sources tab: one `market_api` source (Polygon), one `news_api` source, one `filings_api` source - Configure source `config` JSON with the correct Polygon endpoint patterns per ticker - Verify companies and sources appear in the dashboard and via `curl https://stonks-registry.celestium.life/companies` - _Requirements: 1.1, 1.2, 1.3, 2.1_ -- [ ] 17.2 Wire the scheduler to enqueue ingestion jobs for active sources +- [-] 17.2 Wire the scheduler to enqueue ingestion jobs for active sources - Verify the scheduler service reads active companies and sources from PostgreSQL - Verify it enqueues Redis jobs for each source on its polling interval - Check scheduler logs: `kubectl logs -n stonks-oracle deployment/scheduler --tail=50` diff --git a/infra/helm/stonks-oracle/values.yaml b/infra/helm/stonks-oracle/values.yaml index 92996ca..21de3c5 100644 --- a/infra/helm/stonks-oracle/values.yaml +++ b/infra/helm/stonks-oracle/values.yaml @@ -64,7 +64,7 @@ services: extractor: replicas: 1 image: extractor - command: "python -m services.extractor.worker" + command: "python -m services.extractor.main" tier: processing secrets: [stonks-core-secrets] resources: @@ -74,7 +74,7 @@ services: aggregation: replicas: 1 image: aggregation - command: "python -m services.aggregation.worker" + command: "python -m services.aggregation.main" tier: processing secrets: [stonks-core-secrets] resources: @@ -84,7 +84,7 @@ services: recommendation: replicas: 1 image: recommendation - command: "python -m services.recommendation.worker" + command: "python -m services.recommendation.main" tier: processing secrets: [stonks-core-secrets] resources: @@ -105,7 +105,7 @@ services: brokerAdapter: replicas: 1 image: broker-adapter - command: "python -m services.adapters.broker_adapter" + command: "python -m services.adapters.broker_service" tier: trading secrets: [stonks-core-secrets, stonks-broker-secrets] resources: @@ -115,7 +115,7 @@ services: lakePublisher: replicas: 1 image: lake-publisher - command: "python -m services.lake_publisher.worker" + command: "python -m services.lake_publisher.jobs" tier: analytics secrets: [stonks-core-secrets] resources: diff --git a/services/scheduler/app.py b/services/scheduler/app.py index cee0ba1..318483b 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -12,6 +12,21 @@ import logging from datetime import datetime from typing import Any, Optional + +def _ensure_dict(val: Any) -> Optional[dict]: + """Coerce a JSONB value (dict or JSON string) to a Python dict.""" + if val is None: + return None + if isinstance(val, dict): + return val + if isinstance(val, str): + try: + parsed = json.loads(val) + return parsed if isinstance(parsed, dict) else None + except (json.JSONDecodeError, TypeError): + return None + return None + import asyncpg import redis.asyncio as aioredis @@ -132,7 +147,7 @@ def build_job_payload( "aliases": aliases, "source_type": source["source_type"], "source_name": source["source_name"], - "config": dict(source["config"]) if source["config"] else {}, + "config": _ensure_dict(source["config"]) or {}, "credibility_score": float(source["credibility_score"]) if source["credibility_score"] else 0.5, "scheduled_at": now.isoformat(), } @@ -223,7 +238,7 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: for src in sources: source_id = src["source_id"] source_type = src["source_type"] - source_config = dict(src["config"]) if src["config"] else None + source_config = _ensure_dict(src["config"]) # Check last run status and timing last_run = await fetch_last_run(pool, source_id) diff --git a/services/symbol_registry/seed.py b/services/symbol_registry/seed.py index 2d173ce..b0be219 100644 --- a/services/symbol_registry/seed.py +++ b/services/symbol_registry/seed.py @@ -7,6 +7,7 @@ Usage: python -m services.symbol_registry.seed """ import asyncio +import json import logging import asyncpg @@ -47,32 +48,29 @@ ALIASES = { } # --- Source configs per company --- -# Alpha Vantage for market data (free: 25 req/day) -# NewsAPI for news (free: 100 req/day) -# SEC EDGAR for filings (free, rate-limited by user-agent) -# Alpaca for paper trading (free unlimited paper) +# Polygon.io for market data and news (matches PolygonMarketAdapter and PolygonNewsAdapter) +# SEC EDGAR for filings (matches SECEdgarAdapter) +# Alpaca for paper trading (matches AlpacaBrokerAdapter) SOURCES_PER_COMPANY = [ { "source_type": "market_api", - "source_name": "Alpha Vantage", + "source_name": "Polygon Market Data", "credibility_score": 0.9, "config": { - "provider": "alpha_vantage", - "base_url": "https://www.alphavantage.co", - "endpoint": "/query", - "functions": ["TIME_SERIES_DAILY", "GLOBAL_QUOTE", "OVERVIEW"], + "provider": "polygon", + "endpoint": "prev_bars", + "adjusted": True, }, }, { "source_type": "news_api", - "source_name": "NewsAPI", + "source_name": "Polygon News", "credibility_score": 0.7, "config": { - "provider": "newsapi", - "base_url": "https://newsapi.org", - "endpoint": "/v2/everything", - "page_size": 20, + "provider": "polygon", + "limit": 20, + "order": "desc", }, }, { @@ -81,8 +79,7 @@ SOURCES_PER_COMPANY = [ "credibility_score": 1.0, "config": { "provider": "sec_edgar", - "base_url": "https://efts.sec.gov", - "forms": ["8-K", "10-Q", "10-K"], + "forms": "8-K,10-Q,10-K", "user_agent": "StonksOracle/1.0", }, }, @@ -95,22 +92,26 @@ BROKER_SOURCE = { "credibility_score": 1.0, "config": { "provider": "alpaca", - "base_url": "https://paper-api.alpaca.markets", "mode": "paper", }, } async def seed(pool: asyncpg.Pool) -> None: - """Insert seed data. Skips existing records.""" + """Insert seed data. Uses upsert for companies, skips existing aliases/sources.""" company_ids = {} - # Companies + # Companies — upsert on (ticker, exchange) for c in COMPANIES: row = await pool.fetchrow( """INSERT INTO companies (ticker, legal_name, exchange, sector, industry, market_cap_bucket) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (ticker, exchange) DO UPDATE SET legal_name = EXCLUDED.legal_name + ON CONFLICT (ticker, exchange) DO UPDATE SET + legal_name = EXCLUDED.legal_name, + sector = EXCLUDED.sector, + industry = EXCLUDED.industry, + market_cap_bucket = EXCLUDED.market_cap_bucket, + updated_at = NOW() RETURNING id, ticker""", c["ticker"], c["legal_name"], c["exchange"], c["sector"], c["industry"], c["market_cap_bucket"], @@ -146,29 +147,41 @@ async def seed(pool: asyncpg.Pool) -> None: ) logger.info(f"Watchlist 'Starter 10' -> {wl_id}") - # Sources per company + # Sources per company — check for existing before inserting for ticker, cid in company_ids.items(): + existing = await pool.fetch( + "SELECT source_type, source_name FROM sources WHERE company_id = $1", + cid, + ) + existing_set = {(r["source_type"], r["source_name"]) for r in existing} + for src in SOURCES_PER_COMPANY: + key = (src["source_type"], src["source_name"]) + if key in existing_set: + logger.debug(f"Source {key} already exists for {ticker}, skipping") + continue await pool.execute( """INSERT INTO sources (company_id, source_type, source_name, config, credibility_score) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT DO NOTHING""", + VALUES ($1, $2, $3, $4::jsonb, $5)""", cid, src["source_type"], src["source_name"], - src["config"], src["credibility_score"], + json.dumps(src["config"]), src["credibility_score"], ) + # Broker source only for the first company (account-level) if ticker == COMPANIES[0]["ticker"]: - await pool.execute( - """INSERT INTO sources (company_id, source_type, source_name, config, credibility_score) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT DO NOTHING""", - cid, BROKER_SOURCE["source_type"], BROKER_SOURCE["source_name"], - BROKER_SOURCE["config"], BROKER_SOURCE["credibility_score"], - ) + bkey = (BROKER_SOURCE["source_type"], BROKER_SOURCE["source_name"]) + if bkey not in existing_set: + await pool.execute( + """INSERT INTO sources (company_id, source_type, source_name, config, credibility_score) + VALUES ($1, $2, $3, $4::jsonb, $5)""", + cid, BROKER_SOURCE["source_type"], BROKER_SOURCE["source_name"], + json.dumps(BROKER_SOURCE["config"]), BROKER_SOURCE["credibility_score"], + ) logger.info("Sources seeded") total = await pool.fetchval("SELECT count(*) FROM companies") - logger.info(f"Seed complete: {total} companies, watchlist with {len(company_ids)} members") + sources_total = await pool.fetchval("SELECT count(*) FROM sources") + logger.info(f"Seed complete: {total} companies, {sources_total} sources, watchlist with {len(company_ids)} members") async def main() -> None: