diff --git a/frontend/src/pages/CompanyDetail.tsx b/frontend/src/pages/CompanyDetail.tsx
index 8b64bde..6097c04 100644
--- a/frontend/src/pages/CompanyDetail.tsx
+++ b/frontend/src/pages/CompanyDetail.tsx
@@ -43,7 +43,7 @@ export function CompanyDetailPage() {
const { data: decisions } = useCorporateDecisions(company?.ticker);
const { data: trends } = useTrends({ ticker: company?.ticker, limit: 200 });
const { data: trendHistory } = useTrendHistory({ ticker: company?.ticker, limit: 500 });
- const { data: marketPrices } = useMarketPrices(company?.ticker);
+ const { data: marketPrices } = useMarketPrices(company?.ticker, 200);
const [tab, setTab] = useState<'trends' | 'sources' | 'aliases' | 'macro' | 'competitors' | 'patterns' | 'signals' | 'decisions'>('trends');
if (isLoading || !company) return ;
@@ -595,21 +595,33 @@ function TrendHistoryChart({ trends, latestTrends, ticker, marketPrices }: { tre
.filter((t) => t.entity_id === ticker && t.window === selectedWindow)
.sort((a, b) => new Date(a.generated_at).getTime() - new Date(b.generated_at).getTime());
- // Build a price lookup by date (closest price per day)
- const priceByDay = new Map();
- for (const p of marketPrices ?? []) {
- if (p.bar_timestamp && p.close != null) {
- const d = new Date(p.bar_timestamp).toISOString().slice(0, 10);
- priceByDay.set(d, p.close);
+ // Build a price lookup — match by closest timestamp to each trend point
+ const sortedPrices = [...(marketPrices ?? [])]
+ .filter((p) => p.bar_timestamp != null && p.close != null)
+ .sort((a, b) => a.bar_timestamp - b.bar_timestamp);
+
+ function findClosestPrice(ts: number): number | undefined {
+ if (sortedPrices.length === 0) return undefined;
+ let best = sortedPrices[0];
+ let bestDiff = Math.abs(ts - best.bar_timestamp);
+ for (const p of sortedPrices) {
+ const diff = Math.abs(ts - p.bar_timestamp);
+ if (diff < bestDiff) {
+ best = p;
+ bestDiff = diff;
+ }
}
+ // Only match if within 2 hours (for intraday) or 36 hours (for daily)
+ const maxGap = selectedWindow === 'intraday' ? 2 * 3600_000 : 36 * 3600_000;
+ return bestDiff <= maxGap ? best.close : undefined;
}
const chartData: ChartPoint[] = filtered.map((t) => {
- const trendDate = new Date(t.generated_at).toISOString().slice(0, 10);
- const price = priceByDay.get(trendDate);
+ const trendTs = new Date(t.generated_at).getTime();
+ const price = findClosestPrice(trendTs);
return {
time: new Date(t.generated_at).toLocaleDateString('en-US', { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }),
- timestamp: new Date(t.generated_at).getTime(),
+ timestamp: trendTs,
strength: +(t.trend_strength * 100).toFixed(1),
confidence: +(t.confidence * 100).toFixed(1),
contradiction: +(t.contradiction_score * 100).toFixed(1),
diff --git a/infra/migrations/025_intraday_market_source.sql b/infra/migrations/025_intraday_market_source.sql
new file mode 100644
index 0000000..5c5839d
--- /dev/null
+++ b/infra/migrations/025_intraday_market_source.sql
@@ -0,0 +1,15 @@
+-- Add intraday market data source (hourly bars via Polygon range endpoint).
+-- The scheduler expands this into per-ticker jobs every 15 minutes.
+
+INSERT INTO sources (source_type, source_name, config, active, company_id)
+SELECT
+ 'market_api',
+ 'Polygon Intraday Hourly',
+ '{"endpoint": "intraday_bars", "provider": "polygon", "timespan": "hour", "multiplier": 1, "adjusted": true, "polling_interval_seconds": 900}'::jsonb,
+ true,
+ NULL
+WHERE NOT EXISTS (
+ SELECT 1 FROM sources
+ WHERE source_type = 'market_api'
+ AND config->>'endpoint' = 'intraday_bars'
+);
diff --git a/services/adapters/market_adapter.py b/services/adapters/market_adapter.py
index 3d49a4f..dce07bf 100644
--- a/services/adapters/market_adapter.py
+++ b/services/adapters/market_adapter.py
@@ -44,6 +44,7 @@ class PolygonMarketAdapter(MarketDataAdapter):
PREV_BARS = "/v2/aggs/ticker/{ticker}/prev"
RANGE_BARS = "/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}"
GROUPED_DAILY = "/v2/aggs/grouped/locale/us/market/stocks/{date}"
+ INTRADAY_BARS = "/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{from_date}/{to_date}"
TICKER_DETAILS = "/v3/reference/tickers/{ticker}"
def __init__(self, api_key: str, base_url: str = "https://api.polygon.io") -> None:
@@ -133,6 +134,22 @@ class PolygonMarketAdapter(MarketDataAdapter):
params["sort"] = config["sort"]
if config.get("limit"):
params["limit"] = str(config["limit"])
+ elif endpoint_key == "intraday_bars":
+ # Intraday: fetch hourly bars for today
+ from datetime import date as date_cls
+ today = date_cls.today().isoformat()
+ multiplier = str(config.get("multiplier", 1))
+ timespan = config.get("timespan", "hour")
+ path = self.INTRADAY_BARS.format(
+ ticker=ticker,
+ multiplier=multiplier,
+ timespan=timespan,
+ from_date=today,
+ to_date=today,
+ )
+ params["adjusted"] = str(config.get("adjusted", True)).lower()
+ params["sort"] = "asc"
+ params["limit"] = str(config.get("limit", 50))
elif endpoint_key == "grouped_daily":
# Grouped daily: returns bars for ALL tickers for a given date
target_date = config.get("date", "")
diff --git a/services/scheduler/app.py b/services/scheduler/app.py
index 2181f1e..b652d50 100644
--- a/services/scheduler/app.py
+++ b/services/scheduler/app.py
@@ -453,11 +453,22 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
# Build job with ticker="_MARKET" for global sources
job = build_job_payload(src, [], now)
- job["ticker"] = "_MARKET"
- await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(job))
- enqueued += 1
-
- logger.info("Enqueued grouped daily market data job")
+ if endpoint == "intraday_bars":
+ # Expand intraday source into per-ticker jobs for all active companies
+ tickers = await pool.fetch(
+ "SELECT ticker FROM companies WHERE active = TRUE"
+ )
+ for t_row in tickers:
+ ticker_job = dict(job)
+ ticker_job["ticker"] = t_row["ticker"]
+ await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(ticker_job))
+ enqueued += 1
+ logger.info("Enqueued %d intraday bar jobs", len(tickers))
+ else:
+ job["ticker"] = "_MARKET"
+ await rds.rpush(queue_key(QUEUE_INGESTION), json.dumps(job))
+ enqueued += 1
+ logger.info("Enqueued grouped daily market data job")
logger.info(
"Cycle complete: enqueued=%d skipped_not_due=%d skipped_rate_limit=%d total_sources=%d",