Files

151 lines
4.6 KiB
Python

"""Market context feature computation for aggregation windows.
Fetches recent market snapshots from PostgreSQL and computes context
features (price change, volume trend, volatility) that enrich trend
summaries and modulate signal weighting.
Requirements: 6.1, 6.2
"""
from __future__ import annotations
import math
from datetime import datetime, timedelta, timezone
from typing import Any
import asyncpg
from services.shared.schemas import MarketContext, TrendWindow
# Lookback duration (in calendar days) for each trend window, keyed by the
# window's string ``.value``. Each lookback is at least as long as its window
# and usually somewhat longer (e.g. 8 days for SEVEN_DAY) -- presumably slack
# for weekends/holidays without bars; NOTE(review): confirm with owners.
WINDOW_LOOKBACK_DAYS: dict[str, int] = {
    trend_window.value: days
    for trend_window, days in (
        (TrendWindow.INTRADAY, 1),
        (TrendWindow.ONE_DAY, 2),
        (TrendWindow.SEVEN_DAY, 8),
        (TrendWindow.THIRTY_DAY, 35),
        (TrendWindow.NINETY_DAY, 95),
    )
}
async def fetch_market_context(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
    reference_time: datetime | None = None,
) -> MarketContext:
    """Build a MarketContext for *ticker* over the given trend *window*.

    Queries the ``market_snapshots`` table for snapshots of *ticker* whose
    ``captured_at`` lies in ``[reference_time - lookback, reference_time]``
    (oldest first). The lookback comes from ``WINDOW_LOOKBACK_DAYS``, or is
    8 days for an unrecognised window. From those bars it computes:

    - price_change_pct: (last_close - first_close) / first_close * 100
      (a percentage, not a ratio)
    - avg_volume: mean volume across bars
    - volume_change_pct: percent change of the second-half average volume
      over the first-half average volume
    - volatility: population std-dev of close prices
    - latest_close / latest_bar_at

    Args:
        pool: asyncpg pool used to run the query.
        ticker: Symbol to look up.
        window: A ``TrendWindow`` value string. A ``TrendWindow`` member is
            also accepted and normalised to its ``.value``.
        reference_time: Inclusive end of the lookback range. Defaults to the
            current time in UTC.

    Returns:
        The computed MarketContext. When no rows or no usable bars exist,
        returns ``MarketContext(ticker=ticker)`` (``bars_available == 0``
        via the schema defaults).
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    # WINDOW_LOOKBACK_DAYS is keyed by the enum's string value; a plain Enum
    # member would not hash-match those keys and would silently get the
    # default lookback, so unwrap members to their value first.
    window_key = getattr(window, "value", window)
    lookback_days = WINDOW_LOOKBACK_DAYS.get(window_key, 8)
    start = reference_time - timedelta(days=lookback_days)

    rows = await pool.fetch(
        """
        SELECT data, captured_at
        FROM market_snapshots
        WHERE ticker = $1
          AND captured_at >= $2
          AND captured_at <= $3
        ORDER BY captured_at ASC
        """,
        ticker,
        start,
        reference_time,
    )
    if not rows:
        return MarketContext(ticker=ticker)

    bars = _extract_bars(rows)
    if not bars:
        return MarketContext(ticker=ticker)

    return _compute_context(ticker, bars)
def _extract_bars(rows: list[Any]) -> list[dict[str, Any]]:
"""Extract OHLCV bar dicts from market_snapshot rows.
The ``data`` column is JSONB. Polygon prev-day bars store fields like
``o``, ``h``, ``l``, ``c``, ``v``, ``t``. We normalise to a common
dict with ``close``, ``volume``, ``captured_at``.
"""
bars: list[dict[str, Any]] = []
for row in rows:
data = row["data"]
if isinstance(data, str):
import json
data = json.loads(data)
# Polygon-style single bar or list of bars
items = data if isinstance(data, list) else [data]
for item in items:
close = item.get("c") or item.get("close")
volume = item.get("v") or item.get("volume")
if close is not None:
bars.append({
"close": float(close),
"volume": float(volume) if volume is not None else 0.0,
"captured_at": row["captured_at"],
})
return bars
def _compute_context(ticker: str, bars: list[dict[str, Any]]) -> MarketContext:
    """Derive market context features from a time-ordered list of bar dicts.

    Args:
        ticker: Symbol the bars belong to; copied onto the result.
        bars: Dicts with ``close``, ``volume`` and ``captured_at`` keys,
            ordered oldest first (the shape produced by ``_extract_bars``).

    Returns:
        A MarketContext with:

        - ``price_change_pct``: percent change from the first to the last
          close, rounded to 4 places; 0.0 when the first close is 0.
        - ``avg_volume``: mean volume, rounded to 2 places.
        - ``volume_change_pct``: percent change of the second-half average
          volume over the first-half average, rounded to 4 places. It is 0.0
          with fewer than two bars or a non-positive first-half average.
          With an odd bar count the extra bar falls in the second half.
        - ``volatility``: population standard deviation of closes (divides
          by N, not N - 1), rounded to 6 places; 0.0 for a single bar.
        - ``latest_close`` / ``latest_bar_at``: taken from the last bar.
        - ``bars_available``: number of bars.

        An empty *bars* list yields ``MarketContext(ticker=ticker)``, the
        same "no data" result ``fetch_market_context`` returns, instead of
        raising ``IndexError``.
    """
    if not bars:
        return MarketContext(ticker=ticker)

    closes = [b["close"] for b in bars]
    volumes = [b["volume"] for b in bars]

    first_close = closes[0]
    last_close = closes[-1]
    price_change_pct = (
        ((last_close - first_close) / first_close * 100.0)
        if first_close != 0
        else 0.0
    )

    avg_volume = sum(volumes) / len(volumes)

    # Volume trend: compare second half to first half.
    mid = len(volumes) // 2
    if mid > 0:
        first_half_avg = sum(volumes[:mid]) / mid
        second_half_avg = sum(volumes[mid:]) / len(volumes[mid:])
        volume_change_pct = (
            ((second_half_avg - first_half_avg) / first_half_avg * 100.0)
            if first_half_avg > 0
            else 0.0
        )
    else:
        volume_change_pct = 0.0

    # Volatility: population std dev of closes (in price units, not %).
    if len(closes) > 1:
        mean_close = sum(closes) / len(closes)
        variance = sum((c - mean_close) ** 2 for c in closes) / len(closes)
        volatility = math.sqrt(variance)
    else:
        volatility = 0.0

    return MarketContext(
        ticker=ticker,
        price_change_pct=round(price_change_pct, 4),
        avg_volume=round(avg_volume, 2),
        volume_change_pct=round(volume_change_pct, 4),
        volatility=round(volatility, 6),
        latest_close=last_close,
        latest_bar_at=bars[-1]["captured_at"],
        bars_available=len(bars),
    )