From 4501bbebd498783553f4e069d1f77542edacd1af Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Wed, 15 Apr 2026 22:38:18 +0000 Subject: [PATCH] feat: add Polygon grouped daily endpoint for broad market data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two tiers of market data: 1. Per-ticker prev bars (existing 50 sources, 15-min cadence) for watchlist detail — trading decisions, stop-loss, position sizing 2. Grouped daily (new single source, once per day) for broad market context — correlation analysis, sector rotation, competitive intel Changes: - Add grouped_daily endpoint to PolygonMarketAdapter with auto date calculation (previous trading day, skip weekends) - Add fetch_global_market_sources() to scheduler for sources without company_id, scheduled once daily (86400s cadence) - Update _persist_market_items to use item-level ticker from T field and look up company_id dynamically for grouped daily bars - Migration 020: make company_id nullable on sources and market_snapshots tables, add grouped daily source row - Fix backtest replay to query market_snapshots data->>'c' for prices --- .../020_grouped_daily_market_source.sql | 25 +++++++ services/adapters/market_adapter.py | 16 ++++ services/scheduler/app.py | 73 ++++++++++++++++++- services/shared/metadata.py | 23 +++++- 4 files changed, 133 insertions(+), 4 deletions(-) create mode 100644 infra/migrations/020_grouped_daily_market_source.sql diff --git a/infra/migrations/020_grouped_daily_market_source.sql b/infra/migrations/020_grouped_daily_market_source.sql new file mode 100644 index 0000000..801cbdc --- /dev/null +++ b/infra/migrations/020_grouped_daily_market_source.sql @@ -0,0 +1,25 @@ +-- Migration 020: Add grouped daily market data source and allow +-- global (non-company-specific) sources and market snapshots. + +-- Allow market_snapshots to store bars for tickers not in our companies table +ALTER TABLE market_snapshots ALTER COLUMN company_id DROP NOT NULL; + +-- Allow sources to exist without a company (for global market data sources) +ALTER TABLE sources DROP CONSTRAINT sources_company_id_fkey; +ALTER TABLE sources ALTER COLUMN company_id DROP NOT NULL; +ALTER TABLE sources ADD CONSTRAINT sources_company_id_fkey + FOREIGN KEY (company_id) REFERENCES companies(id) ON DELETE CASCADE; + +-- Add the grouped daily source (fetches ALL US stock bars in one API call) +INSERT INTO sources (source_type, source_name, config, active, company_id) +SELECT + 'market_api', + 'Polygon Grouped Daily', + '{"endpoint": "grouped_daily", "provider": "polygon", "adjusted": true}'::jsonb, + true, + NULL +WHERE NOT EXISTS ( + SELECT 1 FROM sources + WHERE source_type = 'market_api' + AND config->>'endpoint' = 'grouped_daily' +); diff --git a/services/adapters/market_adapter.py b/services/adapters/market_adapter.py index 441afc9..3d49a4f 100644 --- a/services/adapters/market_adapter.py +++ b/services/adapters/market_adapter.py @@ -43,6 +43,7 @@ class PolygonMarketAdapter(MarketDataAdapter): PREV_BARS = "/v2/aggs/ticker/{ticker}/prev" RANGE_BARS = "/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}" + GROUPED_DAILY = "/v2/aggs/grouped/locale/us/market/stocks/{date}" TICKER_DETAILS = "/v3/reference/tickers/{ticker}" def __init__(self, api_key: str, base_url: str = "https://api.polygon.io") -> None: @@ -132,6 +133,20 @@ class PolygonMarketAdapter(MarketDataAdapter): params["sort"] = config["sort"] if config.get("limit"): params["limit"] = str(config["limit"]) + elif endpoint_key == "grouped_daily": + # Grouped daily: returns bars for ALL tickers for a given date + target_date = config.get("date", "") + if not target_date: + # Default to previous trading day + from datetime import date, timedelta + today = date.today() + prev = today - timedelta(days=1) + # Skip weekends + while prev.weekday() > 4: + prev -= timedelta(days=1) + target_date = prev.isoformat() + path = self.GROUPED_DAILY.format(date=target_date) + params["adjusted"] = str(config.get("adjusted", True)).lower() elif endpoint_key == "ticker_details": path = self.TICKER_DETAILS.format(ticker=ticker) else: @@ -149,6 +164,7 @@ class PolygonMarketAdapter(MarketDataAdapter): return [results] if isinstance(results, dict) and results else [] # Aggregate endpoints return results as a list + # For grouped_daily, each item has a "T" field with the ticker results = data.get("results", []) if isinstance(results, list): return results diff --git a/services/scheduler/app.py b/services/scheduler/app.py index 9451377..6c9cade 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -225,6 +225,28 @@ async def fetch_macro_sources(pool: asyncpg.Pool) -> list[asyncpg.Record]: ) +async def fetch_global_market_sources(pool: asyncpg.Pool) -> list[asyncpg.Record]: + """Fetch active market sources that are not company-specific. + + These are sources like the grouped daily endpoint that fetch data for + all tickers in a single API call. They have company_id IS NULL and + source_type = 'market_api'. + """ + return await pool.fetch( + """SELECT s.id AS source_id, + s.company_id, + s.source_type, + s.source_name, + s.config, + s.credibility_score + FROM sources s + WHERE s.active = TRUE + AND s.source_type = 'market_api' + AND s.company_id IS NULL + ORDER BY s.source_name""" + ) + + async def fetch_aliases_for_company(pool: asyncpg.Pool, company_id: str) -> list[str]: """Fetch all aliases for a company.""" rows = await pool.fetch( @@ -360,9 +382,58 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: "Enqueued macro_news job for %s", src["source_name"], ) + # --- Schedule global market sources (grouped daily, etc.) --- + global_market_sources = await fetch_global_market_sources(pool) + for src in global_market_sources: + source_id = src["source_id"] + source_type = src["source_type"] + source_config = _ensure_dict(src["config"]) + + # Use a longer cadence for grouped daily (once per day = 86400s) + endpoint = source_config.get("endpoint", "") + if endpoint == "grouped_daily": + source_config.setdefault("polling_interval_seconds", 86400) + + last_run = await fetch_last_run(pool, source_id) + + last_completed_at = None + last_status = None + retry_count = 0 + next_retry_at = None + + if last_run: + last_status = last_run["status"] + last_completed_at = last_run["completed_at"] or last_run["started_at"] + retry_count = last_run["retry_count"] or 0 + next_retry_at = last_run["next_retry_at"] + + if not is_source_due( + source_type=source_type, + source_config=source_config, + last_completed_at=last_completed_at, + last_status=last_status, + retry_count=retry_count, + next_retry_at=next_retry_at, + now=now, + ): + skipped_not_due += 1 + continue + + if not await check_rate_limit(rds, source_type, now): + skipped_rate_limit += 1 + continue + + # Build job with ticker="_MARKET" for global sources + job = build_job_payload(src, [], now) + job["ticker"] = "_MARKET" + await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(job)) + enqueued += 1 + + logger.info("Enqueued grouped daily market data job") + logger.info( "Cycle complete: enqueued=%d skipped_not_due=%d skipped_rate_limit=%d total_sources=%d", - enqueued, skipped_not_due, skipped_rate_limit, len(sources) + len(macro_sources), + enqueued, skipped_not_due, skipped_rate_limit, len(sources) + len(macro_sources) + len(global_market_sources), ) return enqueued diff --git a/services/shared/metadata.py b/services/shared/metadata.py index 0042b42..ac15db3 100644 --- a/services/shared/metadata.py +++ b/services/shared/metadata.py @@ -302,7 +302,11 @@ async def _persist_market_items( provider: str, content_hash: str, ) -> tuple[int, list[str]]: - """Persist market data items as market_snapshots rows.""" + """Persist market data items as market_snapshots rows. + + For grouped daily responses, each item contains a 'T' field with the + ticker. When present, the item's ticker overrides the job-level ticker. + """ ids: list[str] = [] for item in items: item_hash = content_hash_str(json.dumps(item, sort_keys=True)) @@ -313,11 +317,24 @@ async def _persist_market_items( if exists: continue + # Use item-level ticker if available (grouped daily), else job-level + item_ticker = item.get("T", ticker) or ticker snapshot_type = _infer_market_snapshot_type(item) + + # For grouped daily items, look up company_id by ticker + item_company_id = company_id + if item.get("T") and not company_id: + cid = await pool.fetchval( + "SELECT id FROM companies WHERE ticker = $1", item_ticker + ) + if cid: + item_company_id = str(cid) + # If not in our companies table, store with company_id=NULL + row_id = await persist_market_snapshot( pool, - company_id=company_id, - ticker=ticker, + company_id=item_company_id, + ticker=item_ticker, snapshot_type=snapshot_type, data=item, source_provider=provider,