"""Metrics Engine — computes calibration, IC, Brier, and benchmark metrics.

Aggregates model quality metrics across configurable lookback windows
and prediction horizons. Stores periodic snapshots for time-series analysis
of model performance trends.

Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 6.1, 6.2, 6.3, 6.4, 6.5,
9.1, 9.2, 9.3, 9.4, 10.1, 10.2, 10.3, 10.4, 10.5
"""

from __future__ import annotations

import json
import logging
import math
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # asyncpg is referenced only in annotations, which are never evaluated at
    # runtime because of ``from __future__ import annotations``. Importing it
    # for type checkers only keeps the pure computation functions importable
    # in environments that do not have the database driver installed.
    import asyncpg

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

CONFIDENCE_BUCKETS: list[tuple[float, float]] = [
    (0.50, 0.60),
    (0.60, 0.70),
    (0.70, 0.80),
    (0.80, 0.90),
    (0.90, 1.00),
]

LOOKBACK_WINDOWS: list[str] = ["7d", "30d", "90d", "all"]

LOOKBACK_DURATIONS: dict[str, timedelta | None] = {
    "7d": timedelta(days=7),
    "30d": timedelta(days=30),
    "90d": timedelta(days=90),
    "all": None,
}

EVALUATION_HORIZONS: list[str] = ["1h", "6h", "1d", "7d", "30d"]

# A bucket is flagged when |avg_confidence - observed_win_rate| exceeds this.
MISCALIBRATION_THRESHOLD: float = 0.15

# Minimum number of (score, return) pairs required to report IC / rank IC.
MIN_IC_SAMPLES: int = 30


# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------


@dataclass
class CalibrationBucket:
    """Calibration metrics for a single confidence bucket."""

    bucket_low: float
    bucket_high: float
    avg_confidence: float
    observed_win_rate: float
    prediction_count: int
    miscalibrated: bool  # |avg_confidence - win_rate| > 0.15


@dataclass
class ModelMetricSnapshot:
    """Aggregate model quality metrics for a lookback/horizon combination."""

    id: str
    generated_at: datetime
    lookback_window: str
    horizon: str
    prediction_count: int
    win_rate: float
    directional_accuracy: float
    information_coefficient: float | None
    rank_information_coefficient: float | None
    avg_return: float
    avg_excess_return_vs_spy: float
    avg_excess_return_vs_sector: float
    calibration_error: float  # ECE
    brier_score: float
    buy_win_rate: float
    sell_win_rate: float
    hold_win_rate: float
    metadata: dict = field(default_factory=dict)


# ---------------------------------------------------------------------------
# Pure computation functions
# ---------------------------------------------------------------------------


def compute_calibration_error(
    confidences: list[float],
    outcomes: list[bool],
) -> tuple[float, list[CalibrationBucket]]:
    """Compute ECE and calibration buckets.

    ECE = Σ (n_b / N) * |avg_conf_b - win_rate_b|

    Predictions are grouped into the buckets listed in ``CONFIDENCE_BUCKETS``.
    Every bucket is half-open ``[low, high)`` except the last one, which is
    closed on the right so that a confidence of exactly 1.0 is counted.
    Buckets whose |avg_confidence - observed_win_rate| exceeds
    ``MISCALIBRATION_THRESHOLD`` are flagged as miscalibrated. Empty buckets
    are still returned (with zeroed fields) but contribute nothing to the ECE.

    ``confidences`` and ``outcomes`` are paired positionally; if their lengths
    differ, the surplus tail of the longer list is ignored and N is the number
    of pairs. Confidences that fall outside every bucket (e.g. below 0.50)
    belong to no bucket but are still counted in N.

    Returns ``(ece, buckets)``. Returns ``(0.0, [])`` when no data is provided.
    """
    pairs = list(zip(confidences, outcomes))
    if not pairs:
        return 0.0, []

    # N is the number of *paired* observations, so a length mismatch between
    # the two inputs cannot skew the bucket weights.
    total = len(pairs)
    buckets: list[CalibrationBucket] = []
    ece = 0.0

    for low, high in CONFIDENCE_BUCKETS:
        # Last bucket is inclusive on the right: [0.90, 1.00]
        if high == 1.00:
            members = [(c, o) for c, o in pairs if low <= c <= high]
        else:
            members = [(c, o) for c, o in pairs if low <= c < high]

        count = len(members)
        if count == 0:
            # Empty bucket — exclude from ECE, still record it
            buckets.append(
                CalibrationBucket(
                    bucket_low=low,
                    bucket_high=high,
                    avg_confidence=0.0,
                    observed_win_rate=0.0,
                    prediction_count=0,
                    miscalibrated=False,
                )
            )
            continue

        avg_conf = sum(c for c, _ in members) / count
        win_rate = sum(1.0 for _, won in members if won) / count
        diff = abs(avg_conf - win_rate)

        buckets.append(
            CalibrationBucket(
                bucket_low=low,
                bucket_high=high,
                avg_confidence=avg_conf,
                observed_win_rate=win_rate,
                prediction_count=count,
                miscalibrated=diff > MISCALIBRATION_THRESHOLD,
            )
        )
        ece += (count / total) * diff

    return ece, buckets


def compute_brier_score(
    p_bulls: list[float],
    outcomes: list[bool],
) -> float:
    """Brier score = mean((p_bull - outcome)^2).

    ``outcome`` is treated as 1.0 when truthy and 0.0 otherwise. The inputs
    are paired positionally; if their lengths differ, the surplus tail of the
    longer list is ignored and the mean is taken over the pairs only.

    Returns a value in [0.0, 1.0] for probabilities in [0.0, 1.0].
    Returns 0.0 for empty input.

    NOTE(review): the caller in this module passes "direction was correct" as
    the outcome. If ``p_bull`` is literally P(price rises), a correct bearish
    call (low p_bull, outcome True) is scored as a large error — confirm the
    intended pairing against requirement 5.4.
    """
    squared_errors = [
        (p - (1.0 if outcome else 0.0)) ** 2
        for p, outcome in zip(p_bulls, outcomes)
    ]
    if not squared_errors:
        return 0.0
    return sum(squared_errors) / len(squared_errors)


def _pearson_correlation(xs: list[float], ys: list[float]) -> float | None:
    """Compute the Pearson correlation coefficient between two lists.

    The lists are paired positionally and truncated to the shorter length.

    Returns None if there are fewer than 2 pairs, if either side has zero
    variance, or if the result is not a finite number. Otherwise returns a
    value clamped to [-1.0, 1.0].
    """
    n = min(len(xs), len(ys))
    if n < 2:
        return None
    xs = xs[:n]
    ys = ys[:n]

    mean_x = sum(xs) / n
    mean_y = sum(ys) / n

    cov = 0.0
    var_x = 0.0
    var_y = 0.0
    for x, y in zip(xs, ys):
        dx = x - mean_x
        dy = y - mean_y
        cov += dx * dy
        var_x += dx * dx
        var_y += dy * dy

    if var_x == 0.0 or var_y == 0.0:
        return None

    # Take the square roots separately: the product var_x * var_y can
    # underflow to 0.0 (ZeroDivisionError) or overflow to inf (a bogus 0.0
    # correlation) even when both variances are ordinary finite numbers.
    denom = math.sqrt(var_x) * math.sqrt(var_y)
    if denom == 0.0 or not math.isfinite(denom):
        return None

    r = cov / denom

    # Guard against NaN/inf propagated from the inputs
    if math.isnan(r) or math.isinf(r):
        return None

    # Clamp to [-1.0, 1.0] to absorb floating-point drift
    return max(-1.0, min(1.0, r))


def _rank_data(values: list[float]) -> list[float]:
    """Compute fractional ranks for a list of values (average tie-breaking).

    Ranks are 1-based; tied values all receive the mean of the ranks they
    span. The result is aligned with ``values`` (same order, same length).
    """
    n = len(values)
    order = sorted(range(n), key=lambda idx: values[idx])
    ranks = [0.0] * n

    start = 0
    while start < n:
        # Find the end of the tie group
        end = start + 1
        while end < n and values[order[end]] == values[order[start]]:
            end += 1
        # Sorted positions start..end-1 hold 1-based ranks start+1..end,
        # whose mean is (start + end + 1) / 2.
        avg_rank = (start + end + 1) / 2.0
        for pos in range(start, end):
            ranks[order[pos]] = avg_rank
        start = end

    return ranks


def compute_information_coefficient(
    scores: list[float],
    returns: list[float],
) -> float | None:
    """Pearson correlation between prediction scores and future returns.

    Returns None when either input has fewer than ``MIN_IC_SAMPLES`` data
    points, or when the correlation is undefined (zero variance).
    Otherwise returns a value in [-1.0, 1.0].
    """
    if len(scores) < MIN_IC_SAMPLES or len(returns) < MIN_IC_SAMPLES:
        return None

    n = min(len(scores), len(returns))
    return _pearson_correlation(scores[:n], returns[:n])


def compute_rank_information_coefficient(
    scores: list[float],
    returns: list[float],
) -> float | None:
    """Spearman rank correlation between prediction scores and future returns.

    Ranks the data (average rank for ties) and computes the Pearson
    correlation on the ranks. Returns None when either input has fewer than
    ``MIN_IC_SAMPLES`` data points, or when the correlation is undefined.
    Otherwise returns a value in [-1.0, 1.0].
    """
    if len(scores) < MIN_IC_SAMPLES or len(returns) < MIN_IC_SAMPLES:
        return None

    n = min(len(scores), len(returns))
    ranked_scores = _rank_data(scores[:n])
    ranked_returns = _rank_data(returns[:n])
    return _pearson_correlation(ranked_scores, ranked_returns)


def compute_contribution_scores(
    weights: list[float],
) -> list[float]:
    """Compute contribution scores from document weights.

    Each score = weight_i / sum(weights), so the scores sum to 1.0. When the
    weights sum to exactly zero, every item receives an equal share 1/n.
    Returns an empty list for empty input.

    Scores are only guaranteed to lie in [0.0, 1.0] for non-negative weights;
    negative weights are not validated here.
    """
    if not weights:
        return []

    total = sum(weights)
    if total == 0.0:
        count = len(weights)
        return [1.0 / count] * count

    return [w / total for w in weights]


def compute_hit_rate_improvement(win_rate: float) -> float:
    """Hit rate improvement over random 50/50 baseline.

    Defined as (system_win_rate - 0.5) / 0.5.
    """
    return (win_rate - 0.5) / 0.5


# ---------------------------------------------------------------------------
# SQL queries for v_prediction_performance view
# ---------------------------------------------------------------------------

_PERFORMANCE_DATA_SQL = """
SELECT ticker, direction, action, confidence, strength, p_bull,
       score_company, score_macro, score_competitive,
       future_return, excess_return_vs_spy, excess_return_vs_sector,
       direction_correct, profitable, horizon, generated_at
FROM v_prediction_performance
WHERE horizon = $1
"""

# Same query with a lower bound on generated_at; derived from the base query
# so the two column lists cannot drift apart.
_PERFORMANCE_DATA_WITH_LOOKBACK_SQL = (
    _PERFORMANCE_DATA_SQL + "  AND generated_at >= $2\n"
)

_INSERT_METRIC_SNAPSHOT_SQL = """
INSERT INTO model_metric_snapshots (
    id, generated_at, lookback_window, horizon, prediction_count,
    win_rate, directional_accuracy, information_coefficient,
    rank_information_coefficient, avg_return,
    avg_excess_return_vs_spy, avg_excess_return_vs_sector,
    calibration_error, brier_score,
    buy_win_rate, sell_win_rate, hold_win_rate, metadata
) VALUES (
    $1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10,
    $11, $12, $13, $14, $15, $16, $17, $18::jsonb
)
"""


# ---------------------------------------------------------------------------
# Metric computation from raw rows
# ---------------------------------------------------------------------------


def _is_win(row: dict) -> bool:
    """Return True only when the row's ``direction_correct`` is exactly True."""
    return row.get("direction_correct") is True


def _win_rate(rows: list[dict]) -> float:
    """Fraction of rows that are wins; 0.0 for an empty list."""
    if not rows:
        return 0.0
    return sum(1 for row in rows if _is_win(row)) / len(rows)


def _mean_or_zero(values: list[float]) -> float:
    """Arithmetic mean of ``values``; 0.0 for an empty list."""
    return sum(values) / len(values) if values else 0.0


def _float_column(rows: list[dict], key: str) -> list[float]:
    """Collect the non-null values of ``key`` from ``rows`` as floats.

    Values are coerced with ``float()`` so that non-float numerics (asyncpg
    returns ``decimal.Decimal`` for NUMERIC columns) can be mixed with float
    literals in the metric formulas without raising ``TypeError``.
    """
    return [float(row[key]) for row in rows if row.get(key) is not None]


def _compute_metrics_from_rows(
    rows: list[dict],
    lookback_window: str,
    horizon: str,
) -> ModelMetricSnapshot:
    """Compute all metrics from a list of prediction performance rows.

    Each row is a mapping that may contain the keys ``action``,
    ``direction_correct``, ``confidence``, ``p_bull``, ``strength``,
    ``future_return``, ``excess_return_vs_spy`` and
    ``excess_return_vs_sector``. Missing or None values are skipped for the
    metric that needs them. A row counts as a win only when
    ``direction_correct`` is exactly True.

    Returns a ModelMetricSnapshot with a fresh UUID and the current local
    time as ``generated_at``. With no rows, every metric is 0.0 (IC values
    are None) and ``metadata`` is empty.
    """
    now = datetime.now().astimezone()
    snapshot_id = str(uuid.uuid4())
    prediction_count = len(rows)

    if prediction_count == 0:
        return ModelMetricSnapshot(
            id=snapshot_id,
            generated_at=now,
            lookback_window=lookback_window,
            horizon=horizon,
            prediction_count=0,
            win_rate=0.0,
            directional_accuracy=0.0,
            information_coefficient=None,
            rank_information_coefficient=None,
            avg_return=0.0,
            avg_excess_return_vs_spy=0.0,
            avg_excess_return_vs_sector=0.0,
            calibration_error=0.0,
            brier_score=0.0,
            buy_win_rate=0.0,
            sell_win_rate=0.0,
            hold_win_rate=0.0,
            metadata={},
        )

    # --- Win rate and directional accuracy ---
    win_rate = _win_rate(rows)
    directional_accuracy = win_rate  # Same metric, different name

    # --- Per-action win rates ---
    # Group once, case-insensitively; rows with any other action are ignored.
    by_action: dict[str, list[dict]] = {"buy": [], "sell": [], "hold": []}
    for row in rows:
        group = by_action.get((row.get("action") or "").lower())
        if group is not None:
            group.append(row)
    buy_rows = by_action["buy"]
    sell_rows = by_action["sell"]
    hold_rows = by_action["hold"]

    # --- Average return and excess returns (Requirements 9.1, 9.2) ---
    returns_list = _float_column(rows, "future_return")
    excess_spy_list = _float_column(rows, "excess_return_vs_spy")
    excess_sector_list = _float_column(rows, "excess_return_vs_sector")

    # --- Calibration error (ECE) (Requirements 5.1, 5.2, 5.3, 5.5) ---
    calibration_rows = [r for r in rows if r.get("confidence") is not None]
    ece, _buckets = compute_calibration_error(
        [float(r["confidence"]) for r in calibration_rows],
        [_is_win(r) for r in calibration_rows],
    )

    # --- Brier score (Requirement 5.4) ---
    brier_rows = [r for r in rows if r.get("p_bull") is not None]
    brier = compute_brier_score(
        [float(r["p_bull"]) for r in brier_rows],
        [_is_win(r) for r in brier_rows],
    )

    # --- Information Coefficient (Requirements 6.1, 6.2, 6.5) ---
    # Only rows that have both a strength score and a realised return.
    ic_rows = [
        r
        for r in rows
        if r.get("strength") is not None and r.get("future_return") is not None
    ]
    ic_scores = [float(r["strength"]) for r in ic_rows]
    ic_returns = [float(r["future_return"]) for r in ic_rows]
    ic = compute_information_coefficient(ic_scores, ic_returns)
    rank_ic = compute_rank_information_coefficient(ic_scores, ic_returns)

    # --- Metadata (Requirements 9.4, 10.5) ---
    metadata: dict = {
        "hit_rate_improvement": compute_hit_rate_improvement(win_rate),
        "buy_count": len(buy_rows),
        "sell_count": len(sell_rows),
        "hold_count": len(hold_rows),
        "returns_count": len(returns_list),
        "excess_spy_count": len(excess_spy_list),
        "excess_sector_count": len(excess_sector_list),
    }

    return ModelMetricSnapshot(
        id=snapshot_id,
        generated_at=now,
        lookback_window=lookback_window,
        horizon=horizon,
        prediction_count=prediction_count,
        win_rate=win_rate,
        directional_accuracy=directional_accuracy,
        information_coefficient=ic,
        rank_information_coefficient=rank_ic,
        avg_return=_mean_or_zero(returns_list),
        avg_excess_return_vs_spy=_mean_or_zero(excess_spy_list),
        avg_excess_return_vs_sector=_mean_or_zero(excess_sector_list),
        calibration_error=ece,
        brier_score=brier,
        buy_win_rate=_win_rate(buy_rows),
        sell_win_rate=_win_rate(sell_rows),
        hold_win_rate=_win_rate(hold_rows),
        metadata=metadata,
    )


def _snapshot_insert_args(snapshot: ModelMetricSnapshot) -> tuple:
    """Positional arguments for ``_INSERT_METRIC_SNAPSHOT_SQL``, in $1..$18 order."""
    return (
        snapshot.id,
        snapshot.generated_at,
        snapshot.lookback_window,
        snapshot.horizon,
        snapshot.prediction_count,
        snapshot.win_rate,
        snapshot.directional_accuracy,
        snapshot.information_coefficient,
        snapshot.rank_information_coefficient,
        snapshot.avg_return,
        snapshot.avg_excess_return_vs_spy,
        snapshot.avg_excess_return_vs_sector,
        snapshot.calibration_error,
        snapshot.brier_score,
        snapshot.buy_win_rate,
        snapshot.sell_win_rate,
        snapshot.hold_win_rate,
        json.dumps(snapshot.metadata),
    )


# ---------------------------------------------------------------------------
# Main entry point (Requirements 10.1, 10.2, 10.3, 10.4, 10.5)
# ---------------------------------------------------------------------------


async def compute_and_store_metric_snapshots(
    pool: asyncpg.Pool,
) -> list[ModelMetricSnapshot]:
    """Compute metric snapshots for all lookback/horizon combinations.

    Lookback windows: 7d, 30d, 90d, all-time.
    Horizons: 1h, 6h, 1d, 7d, 30d.

    For each of the 4 lookbacks × 5 horizons = 20 combinations, queries
    the v_prediction_performance view, computes all metrics, and persists
    the result to model_metric_snapshots.

    A failure in one combination (query, computation, or insert) is logged
    with its traceback and that combination is skipped; the remaining
    combinations are still processed.

    Returns the list of snapshots that were computed and stored.
    """
    snapshots: list[ModelMetricSnapshot] = []
    # One reference time for the whole run so every lookback cutoff is
    # measured from the same instant.
    now = datetime.now().astimezone()

    for lookback in LOOKBACK_WINDOWS:
        duration = LOOKBACK_DURATIONS[lookback]

        for horizon in EVALUATION_HORIZONS:
            try:
                # Query performance data
                if duration is None:
                    rows = await pool.fetch(_PERFORMANCE_DATA_SQL, horizon)
                else:
                    rows = await pool.fetch(
                        _PERFORMANCE_DATA_WITH_LOOKBACK_SQL,
                        horizon,
                        now - duration,
                    )

                # Convert asyncpg Records to dicts and compute metrics
                snapshot = _compute_metrics_from_rows(
                    [dict(record) for record in rows], lookback, horizon
                )

                # Persist
                await pool.execute(
                    _INSERT_METRIC_SNAPSHOT_SQL,
                    *_snapshot_insert_args(snapshot),
                )
            except Exception:
                # Deliberate best-effort: one bad combination must not
                # prevent the others from being computed and stored.
                logger.exception(
                    "Failed to compute metrics for lookback=%s horizon=%s",
                    lookback,
                    horizon,
                )
            else:
                snapshots.append(snapshot)

    logger.info(
        "Computed %d metric snapshots across %d lookback/horizon combinations",
        len(snapshots),
        len(LOOKBACK_WINDOWS) * len(EVALUATION_HORIZONS),
    )
    return snapshots