feat: add Polygon grouped daily endpoint for broad market data
Two tiers of market data: 1. Per-ticker prev bars (existing 50 sources, 15-min cadence) for watchlist detail — trading decisions, stop-loss, position sizing 2. Grouped daily (new single source, once per day) for broad market context — correlation analysis, sector rotation, competitive intel Changes: - Add grouped_daily endpoint to PolygonMarketAdapter with auto date calculation (previous trading day, skip weekends) - Add fetch_global_market_sources() to scheduler for sources without company_id, scheduled once daily (86400s cadence) - Update _persist_market_items to use item-level ticker from T field and look up company_id dynamically for grouped daily bars - Migration 020: make company_id nullable on sources and market_snapshots tables, add grouped daily source row - Fix backtest replay to query market_snapshots data->>'c' for prices
This commit is contained in:
@@ -43,6 +43,7 @@ class PolygonMarketAdapter(MarketDataAdapter):
|
||||
|
||||
PREV_BARS = "/v2/aggs/ticker/{ticker}/prev"
|
||||
RANGE_BARS = "/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}"
|
||||
GROUPED_DAILY = "/v2/aggs/grouped/locale/us/market/stocks/{date}"
|
||||
TICKER_DETAILS = "/v3/reference/tickers/{ticker}"
|
||||
|
||||
def __init__(self, api_key: str, base_url: str = "https://api.polygon.io") -> None:
|
||||
@@ -132,6 +133,20 @@ class PolygonMarketAdapter(MarketDataAdapter):
|
||||
params["sort"] = config["sort"]
|
||||
if config.get("limit"):
|
||||
params["limit"] = str(config["limit"])
|
||||
elif endpoint_key == "grouped_daily":
|
||||
# Grouped daily: returns bars for ALL tickers for a given date
|
||||
target_date = config.get("date", "")
|
||||
if not target_date:
|
||||
# Default to previous trading day
|
||||
from datetime import date, timedelta
|
||||
today = date.today()
|
||||
prev = today - timedelta(days=1)
|
||||
# Skip weekends
|
||||
while prev.weekday() > 4:
|
||||
prev -= timedelta(days=1)
|
||||
target_date = prev.isoformat()
|
||||
path = self.GROUPED_DAILY.format(date=target_date)
|
||||
params["adjusted"] = str(config.get("adjusted", True)).lower()
|
||||
elif endpoint_key == "ticker_details":
|
||||
path = self.TICKER_DETAILS.format(ticker=ticker)
|
||||
else:
|
||||
@@ -149,6 +164,7 @@ class PolygonMarketAdapter(MarketDataAdapter):
|
||||
return [results] if isinstance(results, dict) and results else []
|
||||
|
||||
# Aggregate endpoints return results as a list
|
||||
# For grouped_daily, each item has a "T" field with the ticker
|
||||
results = data.get("results", [])
|
||||
if isinstance(results, list):
|
||||
return results
|
||||
|
||||
@@ -225,6 +225,28 @@ async def fetch_macro_sources(pool: asyncpg.Pool) -> list[asyncpg.Record]:
|
||||
)
|
||||
|
||||
|
||||
async def fetch_global_market_sources(pool: asyncpg.Pool) -> list[asyncpg.Record]:
|
||||
"""Fetch active market sources that are not company-specific.
|
||||
|
||||
These are sources like the grouped daily endpoint that fetch data for
|
||||
all tickers in a single API call. They have company_id IS NULL and
|
||||
source_type = 'market_api'.
|
||||
"""
|
||||
return await pool.fetch(
|
||||
"""SELECT s.id AS source_id,
|
||||
s.company_id,
|
||||
s.source_type,
|
||||
s.source_name,
|
||||
s.config,
|
||||
s.credibility_score
|
||||
FROM sources s
|
||||
WHERE s.active = TRUE
|
||||
AND s.source_type = 'market_api'
|
||||
AND s.company_id IS NULL
|
||||
ORDER BY s.source_name"""
|
||||
)
|
||||
|
||||
|
||||
async def fetch_aliases_for_company(pool: asyncpg.Pool, company_id: str) -> list[str]:
|
||||
"""Fetch all aliases for a company."""
|
||||
rows = await pool.fetch(
|
||||
@@ -360,9 +382,58 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
|
||||
"Enqueued macro_news job for %s", src["source_name"],
|
||||
)
|
||||
|
||||
# --- Schedule global market sources (grouped daily, etc.) ---
|
||||
global_market_sources = await fetch_global_market_sources(pool)
|
||||
for src in global_market_sources:
|
||||
source_id = src["source_id"]
|
||||
source_type = src["source_type"]
|
||||
source_config = _ensure_dict(src["config"])
|
||||
|
||||
# Use a longer cadence for grouped daily (once per day = 86400s)
|
||||
endpoint = source_config.get("endpoint", "")
|
||||
if endpoint == "grouped_daily":
|
||||
source_config.setdefault("polling_interval_seconds", 86400)
|
||||
|
||||
last_run = await fetch_last_run(pool, source_id)
|
||||
|
||||
last_completed_at = None
|
||||
last_status = None
|
||||
retry_count = 0
|
||||
next_retry_at = None
|
||||
|
||||
if last_run:
|
||||
last_status = last_run["status"]
|
||||
last_completed_at = last_run["completed_at"] or last_run["started_at"]
|
||||
retry_count = last_run["retry_count"] or 0
|
||||
next_retry_at = last_run["next_retry_at"]
|
||||
|
||||
if not is_source_due(
|
||||
source_type=source_type,
|
||||
source_config=source_config,
|
||||
last_completed_at=last_completed_at,
|
||||
last_status=last_status,
|
||||
retry_count=retry_count,
|
||||
next_retry_at=next_retry_at,
|
||||
now=now,
|
||||
):
|
||||
skipped_not_due += 1
|
||||
continue
|
||||
|
||||
if not await check_rate_limit(rds, source_type, now):
|
||||
skipped_rate_limit += 1
|
||||
continue
|
||||
|
||||
# Build job with ticker="_MARKET" for global sources
|
||||
job = build_job_payload(src, [], now)
|
||||
job["ticker"] = "_MARKET"
|
||||
await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(job))
|
||||
enqueued += 1
|
||||
|
||||
logger.info("Enqueued grouped daily market data job")
|
||||
|
||||
logger.info(
|
||||
"Cycle complete: enqueued=%d skipped_not_due=%d skipped_rate_limit=%d total_sources=%d",
|
||||
enqueued, skipped_not_due, skipped_rate_limit, len(sources) + len(macro_sources),
|
||||
enqueued, skipped_not_due, skipped_rate_limit, len(sources) + len(macro_sources) + len(global_market_sources),
|
||||
)
|
||||
return enqueued
|
||||
|
||||
|
||||
@@ -302,7 +302,11 @@ async def _persist_market_items(
|
||||
provider: str,
|
||||
content_hash: str,
|
||||
) -> tuple[int, list[str]]:
|
||||
"""Persist market data items as market_snapshots rows."""
|
||||
"""Persist market data items as market_snapshots rows.
|
||||
|
||||
For grouped daily responses, each item contains a 'T' field with the
|
||||
ticker. When present, the item's ticker overrides the job-level ticker.
|
||||
"""
|
||||
ids: list[str] = []
|
||||
for item in items:
|
||||
item_hash = content_hash_str(json.dumps(item, sort_keys=True))
|
||||
@@ -313,11 +317,24 @@ async def _persist_market_items(
|
||||
if exists:
|
||||
continue
|
||||
|
||||
# Use item-level ticker if available (grouped daily), else job-level
|
||||
item_ticker = item.get("T", ticker) or ticker
|
||||
snapshot_type = _infer_market_snapshot_type(item)
|
||||
|
||||
# For grouped daily items, look up company_id by ticker
|
||||
item_company_id = company_id
|
||||
if item.get("T") and not company_id:
|
||||
cid = await pool.fetchval(
|
||||
"SELECT id FROM companies WHERE ticker = $1", item_ticker
|
||||
)
|
||||
if cid:
|
||||
item_company_id = str(cid)
|
||||
# If not in our companies table, store with company_id=NULL
|
||||
|
||||
row_id = await persist_market_snapshot(
|
||||
pool,
|
||||
company_id=company_id,
|
||||
ticker=ticker,
|
||||
company_id=item_company_id,
|
||||
ticker=item_ticker,
|
||||
snapshot_type=snapshot_type,
|
||||
data=item,
|
||||
source_provider=provider,
|
||||
|
||||
Reference in New Issue
Block a user