phase 17: fix scheduler config parsing, worker entry points, and seed data for Polygon sources

This commit is contained in:
Celes Renata
2026-04-12 02:45:37 -07:00
parent 1410d324b7
commit f2b9d6c00a
4 changed files with 70 additions and 42 deletions
+3 -3
View File
@@ -174,15 +174,15 @@
## Phase 17 - First Vertical Slice: Live Pipeline End-to-End ## Phase 17 - First Vertical Slice: Live Pipeline End-to-End
- [ ] 17. Activate the full data pipeline for a set of tracked symbols - [-] 17. Activate the full data pipeline for a set of tracked symbols
- [ ] 17.1 Seed initial symbols and configure sources via the dashboard - [x] 17.1 Seed initial symbols and configure sources via the dashboard
- Use the dashboard Companies page to add 5-10 symbols (e.g. AAPL, MSFT, GOOGL, AMZN, TSLA, NVDA, META, JPM, V, UNH) - Use the dashboard Companies page to add 5-10 symbols (e.g. AAPL, MSFT, GOOGL, AMZN, TSLA, NVDA, META, JPM, V, UNH)
- For each company, add sources via the Company Detail → Sources tab: one `market_api` source (Polygon), one `news_api` source, one `filings_api` source - For each company, add sources via the Company Detail → Sources tab: one `market_api` source (Polygon), one `news_api` source, one `filings_api` source
- Configure source `config` JSON with the correct Polygon endpoint patterns per ticker - Configure source `config` JSON with the correct Polygon endpoint patterns per ticker
- Verify companies and sources appear in the dashboard and via `curl https://stonks-registry.celestium.life/companies` - Verify companies and sources appear in the dashboard and via `curl https://stonks-registry.celestium.life/companies`
- _Requirements: 1.1, 1.2, 1.3, 2.1_ - _Requirements: 1.1, 1.2, 1.3, 2.1_
- [ ] 17.2 Wire the scheduler to enqueue ingestion jobs for active sources - [-] 17.2 Wire the scheduler to enqueue ingestion jobs for active sources
- Verify the scheduler service reads active companies and sources from PostgreSQL - Verify the scheduler service reads active companies and sources from PostgreSQL
- Verify it enqueues Redis jobs for each source on its polling interval - Verify it enqueues Redis jobs for each source on its polling interval
- Check scheduler logs: `kubectl logs -n stonks-oracle deployment/scheduler --tail=50` - Check scheduler logs: `kubectl logs -n stonks-oracle deployment/scheduler --tail=50`
+5 -5
View File
@@ -64,7 +64,7 @@ services:
extractor: extractor:
replicas: 1 replicas: 1
image: extractor image: extractor
command: "python -m services.extractor.worker" command: "python -m services.extractor.main"
tier: processing tier: processing
secrets: [stonks-core-secrets] secrets: [stonks-core-secrets]
resources: resources:
@@ -74,7 +74,7 @@ services:
aggregation: aggregation:
replicas: 1 replicas: 1
image: aggregation image: aggregation
command: "python -m services.aggregation.worker" command: "python -m services.aggregation.main"
tier: processing tier: processing
secrets: [stonks-core-secrets] secrets: [stonks-core-secrets]
resources: resources:
@@ -84,7 +84,7 @@ services:
recommendation: recommendation:
replicas: 1 replicas: 1
image: recommendation image: recommendation
command: "python -m services.recommendation.worker" command: "python -m services.recommendation.main"
tier: processing tier: processing
secrets: [stonks-core-secrets] secrets: [stonks-core-secrets]
resources: resources:
@@ -105,7 +105,7 @@ services:
brokerAdapter: brokerAdapter:
replicas: 1 replicas: 1
image: broker-adapter image: broker-adapter
command: "python -m services.adapters.broker_adapter" command: "python -m services.adapters.broker_service"
tier: trading tier: trading
secrets: [stonks-core-secrets, stonks-broker-secrets] secrets: [stonks-core-secrets, stonks-broker-secrets]
resources: resources:
@@ -115,7 +115,7 @@ services:
lakePublisher: lakePublisher:
replicas: 1 replicas: 1
image: lake-publisher image: lake-publisher
command: "python -m services.lake_publisher.worker" command: "python -m services.lake_publisher.jobs"
tier: analytics tier: analytics
secrets: [stonks-core-secrets] secrets: [stonks-core-secrets]
resources: resources:
+17 -2
View File
@@ -12,6 +12,21 @@ import logging
from datetime import datetime from datetime import datetime
from typing import Any, Optional from typing import Any, Optional
def _ensure_dict(val: Any) -> Optional[dict]:
"""Coerce a JSONB value (dict or JSON string) to a Python dict."""
if val is None:
return None
if isinstance(val, dict):
return val
if isinstance(val, str):
try:
parsed = json.loads(val)
return parsed if isinstance(parsed, dict) else None
except (json.JSONDecodeError, TypeError):
return None
return None
import asyncpg import asyncpg
import redis.asyncio as aioredis import redis.asyncio as aioredis
@@ -132,7 +147,7 @@ def build_job_payload(
"aliases": aliases, "aliases": aliases,
"source_type": source["source_type"], "source_type": source["source_type"],
"source_name": source["source_name"], "source_name": source["source_name"],
"config": dict(source["config"]) if source["config"] else {}, "config": _ensure_dict(source["config"]) or {},
"credibility_score": float(source["credibility_score"]) if source["credibility_score"] else 0.5, "credibility_score": float(source["credibility_score"]) if source["credibility_score"] else 0.5,
"scheduled_at": now.isoformat(), "scheduled_at": now.isoformat(),
} }
@@ -223,7 +238,7 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
for src in sources: for src in sources:
source_id = src["source_id"] source_id = src["source_id"]
source_type = src["source_type"] source_type = src["source_type"]
source_config = dict(src["config"]) if src["config"] else None source_config = _ensure_dict(src["config"])
# Check last run status and timing # Check last run status and timing
last_run = await fetch_last_run(pool, source_id) last_run = await fetch_last_run(pool, source_id)
+45 -32
View File
@@ -7,6 +7,7 @@ Usage:
python -m services.symbol_registry.seed python -m services.symbol_registry.seed
""" """
import asyncio import asyncio
import json
import logging import logging
import asyncpg import asyncpg
@@ -47,32 +48,29 @@ ALIASES = {
} }
# --- Source configs per company --- # --- Source configs per company ---
# Alpha Vantage for market data (free: 25 req/day) # Polygon.io for market data and news (matches PolygonMarketAdapter and PolygonNewsAdapter)
# NewsAPI for news (free: 100 req/day) # SEC EDGAR for filings (matches SECEdgarAdapter)
# SEC EDGAR for filings (free, rate-limited by user-agent) # Alpaca for paper trading (matches AlpacaBrokerAdapter)
# Alpaca for paper trading (free unlimited paper)
SOURCES_PER_COMPANY = [ SOURCES_PER_COMPANY = [
{ {
"source_type": "market_api", "source_type": "market_api",
"source_name": "Alpha Vantage", "source_name": "Polygon Market Data",
"credibility_score": 0.9, "credibility_score": 0.9,
"config": { "config": {
"provider": "alpha_vantage", "provider": "polygon",
"base_url": "https://www.alphavantage.co", "endpoint": "prev_bars",
"endpoint": "/query", "adjusted": True,
"functions": ["TIME_SERIES_DAILY", "GLOBAL_QUOTE", "OVERVIEW"],
}, },
}, },
{ {
"source_type": "news_api", "source_type": "news_api",
"source_name": "NewsAPI", "source_name": "Polygon News",
"credibility_score": 0.7, "credibility_score": 0.7,
"config": { "config": {
"provider": "newsapi", "provider": "polygon",
"base_url": "https://newsapi.org", "limit": 20,
"endpoint": "/v2/everything", "order": "desc",
"page_size": 20,
}, },
}, },
{ {
@@ -81,8 +79,7 @@ SOURCES_PER_COMPANY = [
"credibility_score": 1.0, "credibility_score": 1.0,
"config": { "config": {
"provider": "sec_edgar", "provider": "sec_edgar",
"base_url": "https://efts.sec.gov", "forms": "8-K,10-Q,10-K",
"forms": ["8-K", "10-Q", "10-K"],
"user_agent": "StonksOracle/1.0", "user_agent": "StonksOracle/1.0",
}, },
}, },
@@ -95,22 +92,26 @@ BROKER_SOURCE = {
"credibility_score": 1.0, "credibility_score": 1.0,
"config": { "config": {
"provider": "alpaca", "provider": "alpaca",
"base_url": "https://paper-api.alpaca.markets",
"mode": "paper", "mode": "paper",
}, },
} }
async def seed(pool: asyncpg.Pool) -> None: async def seed(pool: asyncpg.Pool) -> None:
"""Insert seed data. Skips existing records.""" """Insert seed data. Uses upsert for companies, skips existing aliases/sources."""
company_ids = {} company_ids = {}
# Companies # Companies — upsert on (ticker, exchange)
for c in COMPANIES: for c in COMPANIES:
row = await pool.fetchrow( row = await pool.fetchrow(
"""INSERT INTO companies (ticker, legal_name, exchange, sector, industry, market_cap_bucket) """INSERT INTO companies (ticker, legal_name, exchange, sector, industry, market_cap_bucket)
VALUES ($1, $2, $3, $4, $5, $6) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (ticker, exchange) DO UPDATE SET legal_name = EXCLUDED.legal_name ON CONFLICT (ticker, exchange) DO UPDATE SET
legal_name = EXCLUDED.legal_name,
sector = EXCLUDED.sector,
industry = EXCLUDED.industry,
market_cap_bucket = EXCLUDED.market_cap_bucket,
updated_at = NOW()
RETURNING id, ticker""", RETURNING id, ticker""",
c["ticker"], c["legal_name"], c["exchange"], c["ticker"], c["legal_name"], c["exchange"],
c["sector"], c["industry"], c["market_cap_bucket"], c["sector"], c["industry"], c["market_cap_bucket"],
@@ -146,29 +147,41 @@ async def seed(pool: asyncpg.Pool) -> None:
) )
logger.info(f"Watchlist 'Starter 10' -> {wl_id}") logger.info(f"Watchlist 'Starter 10' -> {wl_id}")
# Sources per company # Sources per company — check for existing before inserting
for ticker, cid in company_ids.items(): for ticker, cid in company_ids.items():
existing = await pool.fetch(
"SELECT source_type, source_name FROM sources WHERE company_id = $1",
cid,
)
existing_set = {(r["source_type"], r["source_name"]) for r in existing}
for src in SOURCES_PER_COMPANY: for src in SOURCES_PER_COMPANY:
key = (src["source_type"], src["source_name"])
if key in existing_set:
logger.debug(f"Source {key} already exists for {ticker}, skipping")
continue
await pool.execute( await pool.execute(
"""INSERT INTO sources (company_id, source_type, source_name, config, credibility_score) """INSERT INTO sources (company_id, source_type, source_name, config, credibility_score)
VALUES ($1, $2, $3, $4, $5) VALUES ($1, $2, $3, $4::jsonb, $5)""",
ON CONFLICT DO NOTHING""",
cid, src["source_type"], src["source_name"], cid, src["source_type"], src["source_name"],
src["config"], src["credibility_score"], json.dumps(src["config"]), src["credibility_score"],
) )
# Broker source only for the first company (account-level) # Broker source only for the first company (account-level)
if ticker == COMPANIES[0]["ticker"]: if ticker == COMPANIES[0]["ticker"]:
await pool.execute( bkey = (BROKER_SOURCE["source_type"], BROKER_SOURCE["source_name"])
"""INSERT INTO sources (company_id, source_type, source_name, config, credibility_score) if bkey not in existing_set:
VALUES ($1, $2, $3, $4, $5) await pool.execute(
ON CONFLICT DO NOTHING""", """INSERT INTO sources (company_id, source_type, source_name, config, credibility_score)
cid, BROKER_SOURCE["source_type"], BROKER_SOURCE["source_name"], VALUES ($1, $2, $3, $4::jsonb, $5)""",
BROKER_SOURCE["config"], BROKER_SOURCE["credibility_score"], cid, BROKER_SOURCE["source_type"], BROKER_SOURCE["source_name"],
) json.dumps(BROKER_SOURCE["config"]), BROKER_SOURCE["credibility_score"],
)
logger.info("Sources seeded") logger.info("Sources seeded")
total = await pool.fetchval("SELECT count(*) FROM companies") total = await pool.fetchval("SELECT count(*) FROM companies")
logger.info(f"Seed complete: {total} companies, watchlist with {len(company_ids)} members") sources_total = await pool.fetchval("SELECT count(*) FROM sources")
logger.info(f"Seed complete: {total} companies, {sources_total} sources, watchlist with {len(company_ids)} members")
async def main() -> None: async def main() -> None: