From 2360c501e431116296a6cc8c527bda2d72753894 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Fri, 17 Apr 2026 01:13:24 +0000 Subject: [PATCH] feat: intraday hourly price bars via Polygon range endpoint - New 'intraday_bars' endpoint in PolygonMarketAdapter: fetches hourly bars for today using range_bars URL with timespan=hour, sort=asc - Scheduler expands intraday_bars global source into per-ticker jobs for all active companies (every 15 minutes via polling_interval) - Migration 025 inserts the intraday source with 900s cadence - Frontend price matching uses closest-timestamp instead of date-string matching, with 2h tolerance for intraday and 36h for daily windows - Bumped market price fetch limit to 200 for intraday granularity --- frontend/src/pages/CompanyDetail.tsx | 32 +++++++++++++------ .../migrations/025_intraday_market_source.sql | 15 +++++++++ services/adapters/market_adapter.py | 17 ++++++++++ services/scheduler/app.py | 21 +++++++++--- 4 files changed, 70 insertions(+), 15 deletions(-) create mode 100644 infra/migrations/025_intraday_market_source.sql diff --git a/frontend/src/pages/CompanyDetail.tsx b/frontend/src/pages/CompanyDetail.tsx index 8b64bde..6097c04 100644 --- a/frontend/src/pages/CompanyDetail.tsx +++ b/frontend/src/pages/CompanyDetail.tsx @@ -43,7 +43,7 @@ export function CompanyDetailPage() { const { data: decisions } = useCorporateDecisions(company?.ticker); const { data: trends } = useTrends({ ticker: company?.ticker, limit: 200 }); const { data: trendHistory } = useTrendHistory({ ticker: company?.ticker, limit: 500 }); - const { data: marketPrices } = useMarketPrices(company?.ticker); + const { data: marketPrices } = useMarketPrices(company?.ticker, 200); const [tab, setTab] = useState<'trends' | 'sources' | 'aliases' | 'macro' | 'competitors' | 'patterns' | 'signals' | 'decisions'>('trends'); if (isLoading || !company) return ; @@ -595,21 +595,33 @@ function TrendHistoryChart({ trends, latestTrends, ticker, marketPrices }: { tre .filter((t) => t.entity_id === ticker && t.window === selectedWindow) .sort((a, b) => new Date(a.generated_at).getTime() - new Date(b.generated_at).getTime()); - // Build a price lookup by date (closest price per day) - const priceByDay = new Map(); - for (const p of marketPrices ?? []) { - if (p.bar_timestamp && p.close != null) { - const d = new Date(p.bar_timestamp).toISOString().slice(0, 10); - priceByDay.set(d, p.close); + // Build a price lookup — match by closest timestamp to each trend point + const sortedPrices = [...(marketPrices ?? [])] + .filter((p) => p.bar_timestamp != null && p.close != null) + .sort((a, b) => a.bar_timestamp - b.bar_timestamp); + + function findClosestPrice(ts: number): number | undefined { + if (sortedPrices.length === 0) return undefined; + let best = sortedPrices[0]; + let bestDiff = Math.abs(ts - best.bar_timestamp); + for (const p of sortedPrices) { + const diff = Math.abs(ts - p.bar_timestamp); + if (diff < bestDiff) { + best = p; + bestDiff = diff; + } } + // Only match if within 2 hours (for intraday) or 36 hours (for daily) + const maxGap = selectedWindow === 'intraday' ? 2 * 3600_000 : 36 * 3600_000; + return bestDiff <= maxGap ? best.close : undefined; } const chartData: ChartPoint[] = filtered.map((t) => { - const trendDate = new Date(t.generated_at).toISOString().slice(0, 10); - const price = priceByDay.get(trendDate); + const trendTs = new Date(t.generated_at).getTime(); + const price = findClosestPrice(trendTs); return { time: new Date(t.generated_at).toLocaleDateString('en-US', { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }), - timestamp: new Date(t.generated_at).getTime(), + timestamp: trendTs, strength: +(t.trend_strength * 100).toFixed(1), confidence: +(t.confidence * 100).toFixed(1), contradiction: +(t.contradiction_score * 100).toFixed(1), diff --git a/infra/migrations/025_intraday_market_source.sql b/infra/migrations/025_intraday_market_source.sql new file mode 100644 index 0000000..5c5839d --- /dev/null +++ b/infra/migrations/025_intraday_market_source.sql @@ -0,0 +1,15 @@ +-- Add intraday market data source (hourly bars via Polygon range endpoint). +-- The scheduler expands this into per-ticker jobs every 15 minutes. + +INSERT INTO sources (source_type, source_name, config, active, company_id) +SELECT + 'market_api', + 'Polygon Intraday Hourly', + '{"endpoint": "intraday_bars", "provider": "polygon", "timespan": "hour", "multiplier": 1, "adjusted": true, "polling_interval_seconds": 900}'::jsonb, + true, + NULL +WHERE NOT EXISTS ( + SELECT 1 FROM sources + WHERE source_type = 'market_api' + AND config->>'endpoint' = 'intraday_bars' +); diff --git a/services/adapters/market_adapter.py b/services/adapters/market_adapter.py index 3d49a4f..dce07bf 100644 --- a/services/adapters/market_adapter.py +++ b/services/adapters/market_adapter.py @@ -44,6 +44,7 @@ class PolygonMarketAdapter(MarketDataAdapter): PREV_BARS = "/v2/aggs/ticker/{ticker}/prev" RANGE_BARS = "/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}" GROUPED_DAILY = "/v2/aggs/grouped/locale/us/market/stocks/{date}" + INTRADAY_BARS = "/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}" TICKER_DETAILS = "/v3/reference/tickers/{ticker}" def __init__(self, api_key: str, base_url: str = "https://api.polygon.io") -> None: @@ -133,6 +134,22 @@ class PolygonMarketAdapter(MarketDataAdapter): params["sort"] = config["sort"] if config.get("limit"): params["limit"] = str(config["limit"]) + elif endpoint_key == "intraday_bars": + # Intraday: fetch hourly bars for today + from datetime import date as date_cls + today = date_cls.today().isoformat() + multiplier = str(config.get("multiplier", 1)) + timespan = config.get("timespan", "hour") + path = self.INTRADAY_BARS.format( + ticker=ticker, + multiplier=multiplier, + timespan=timespan, + from_date=today, + to_date=today, + ) + params["adjusted"] = str(config.get("adjusted", True)).lower() + params["sort"] = "asc" + params["limit"] = str(config.get("limit", 50)) elif endpoint_key == "grouped_daily": # Grouped daily: returns bars for ALL tickers for a given date target_date = config.get("date", "") diff --git a/services/scheduler/app.py b/services/scheduler/app.py index 2181f1e..b652d50 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -453,11 +453,22 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: # Build job with ticker="_MARKET" for global sources job = build_job_payload(src, [], now) - job["ticker"] = "_MARKET" - await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(job)) - enqueued += 1 - - logger.info("Enqueued grouped daily market data job") + if endpoint == "intraday_bars": + # Expand intraday source into per-ticker jobs for all active companies + tickers = await pool.fetch( + "SELECT ticker FROM companies WHERE active = TRUE" + ) + for t_row in tickers: + ticker_job = dict(job) + ticker_job["ticker"] = t_row["ticker"] + await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(ticker_job)) + enqueued += 1 + logger.info("Enqueued %d intraday bar jobs", len(tickers)) + else: + job["ticker"] = "_MARKET" + await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(job)) + enqueued += 1 + logger.info("Enqueued grouped daily market data job") logger.info( "Cycle complete: enqueued=%d skipped_not_due=%d skipped_rate_limit=%d total_sources=%d",