Files
Celes Renata 7fcc8a6c07
ci/woodpecker/push/test Pipeline failed
ci/woodpecker/push/build-1 unknown status
ci/woodpecker/push/build-3 unknown status
ci/woodpecker/push/build-2 unknown status
ci/woodpecker/push/finalize unknown status
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: model validation, calibration, and signal quality layer
- Migration 035: prediction_snapshots, prediction_outcomes, signal_evidence_links, model_metric_snapshots tables + SQL views
- Prediction snapshot writer with canonical evidence keys, duplicate detection, contribution scores
- Outcome evaluator across 5 horizons (1h, 6h, 1d, 7d, 30d)
- Metrics engine: ECE, Brier score, IC, Rank IC, benchmark comparison
- Attribution engine: per-source, per-catalyst, per-layer performance
- Calibration engine: Bayesian shrinkage source reliability
- Quality gate for live trading eligibility with configurable thresholds
- 7 new /api/validation/* endpoints
- Upgraded OpsModel dashboard with validation tab
- Enhanced recommendation display with calibration context
- Backtest replay validation mode
- 86 Python tests (unit + property-based), 179 frontend tests passing
2026-05-01 03:04:58 +00:00

638 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Metrics Engine — computes calibration, IC, Brier, and benchmark metrics.
Aggregates model quality metrics across configurable lookback windows and
prediction horizons. Stores periodic snapshots for time-series analysis
of model performance trends.
Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 6.1, 6.2, 6.3, 6.4, 6.5,
9.1, 9.2, 9.3, 9.4, 10.1, 10.2, 10.3, 10.4, 10.5
"""
from __future__ import annotations
import json
import logging
import math
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncpg
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Confidence ranges used for calibration (ECE). Each range is [low, high),
# except the one ending at 1.00, which is treated as closed on the right so a
# confidence of exactly 1.0 is still counted.
CONFIDENCE_BUCKETS: list[tuple[float, float]] = [
    (0.50, 0.60),
    (0.60, 0.70),
    (0.70, 0.80),
    (0.80, 0.90),
    (0.90, 1.00),
]
# Lookback label -> how far back from "now" predictions are included.
# None means no lower bound (all-time).
LOOKBACK_DURATIONS: dict[str, timedelta | None] = {
    "7d": timedelta(days=7),
    "30d": timedelta(days=30),
    "90d": timedelta(days=90),
    "all": None,
}
# Derived from LOOKBACK_DURATIONS (dicts preserve insertion order) so the two
# can never drift apart: every listed window is guaranteed to have a duration.
LOOKBACK_WINDOWS: list[str] = list(LOOKBACK_DURATIONS)
# Outcome horizons for which metric snapshots are computed.
EVALUATION_HORIZONS: list[str] = ["1h", "6h", "1d", "7d", "30d"]
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class CalibrationBucket:
    """Calibration metrics for a single confidence bucket.

    Built by compute_calibration_error, which records one instance per
    CONFIDENCE_BUCKETS entry (empty buckets get zero values and are not
    flagged as miscalibrated).
    """
    # Lower bound of the bucket's confidence range (inclusive).
    bucket_low: float
    # Upper bound of the range (exclusive, except 1.00, which is inclusive).
    bucket_high: float
    # Mean confidence of the predictions in this bucket (0.0 when empty).
    avg_confidence: float
    # Fraction of this bucket's predictions whose outcome was True (0.0 when empty).
    observed_win_rate: float
    # Number of predictions that fell into this bucket.
    prediction_count: int
    miscalibrated: bool  # |avg_confidence - win_rate| exceeds the threshold (0.15)
@dataclass
class ModelMetricSnapshot:
    """Aggregate model quality metrics for a lookback/horizon combination.

    One row of model_metric_snapshots: the fields are inserted in declaration
    order by _INSERT_METRIC_SNAPSHOT_SQL, with metadata serialized to JSON.
    """
    id: str  # UUID4 string; cast to uuid on insert
    generated_at: datetime  # timezone-aware time the snapshot was computed
    lookback_window: str  # a LOOKBACK_WINDOWS label, e.g. "30d" or "all"
    horizon: str  # an EVALUATION_HORIZONS label, e.g. "1d"
    prediction_count: int  # number of performance rows aggregated
    win_rate: float  # fraction of rows whose direction_correct is True
    directional_accuracy: float  # currently computed identically to win_rate
    information_coefficient: float | None  # Pearson(strength, future_return); None if not computable
    rank_information_coefficient: float | None  # Spearman counterpart; None if not computable
    avg_return: float  # mean of non-null future_return values (0.0 if none)
    avg_excess_return_vs_spy: float  # mean non-null excess_return_vs_spy (0.0 if none)
    avg_excess_return_vs_sector: float  # mean non-null excess_return_vs_sector (0.0 if none)
    calibration_error: float # ECE
    brier_score: float  # mean squared error of p_bull against the outcome
    buy_win_rate: float  # win rate over rows with action "buy" (0.0 if none)
    sell_win_rate: float  # win rate over rows with action "sell" (0.0 if none)
    hold_win_rate: float  # win rate over rows with action "hold" (0.0 if none)
    # Extra diagnostics (per-action counts, non-null counts, hit-rate
    # improvement); left empty for snapshots with no predictions.
    metadata: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Pure computation functions
# ---------------------------------------------------------------------------
def compute_calibration_error(
    confidences: list[float],
    outcomes: list[bool],
    *,
    miscalibration_threshold: float = 0.15,
) -> tuple[float, list[CalibrationBucket]]:
    """Compute Expected Calibration Error (ECE) and per-bucket statistics.

    ECE = Σ (n_b / N) * |avg_conf_b - win_rate_b|

    Predictions are grouped into the CONFIDENCE_BUCKETS ranges. Each range is
    half-open [low, high), except a range ending at 1.00, which is closed so
    that a confidence of exactly 1.0 is counted.

    N is the number of predictions that actually land in a bucket, so the
    weights n_b / N sum to 1 and ECE is a true weighted average.
    Confidences outside every bucket (e.g. below 0.50) are ignored instead of
    silently diluting the result. If the two lists differ in length, only the
    positionally paired prefix is used.

    Args:
        confidences: model confidence for each prediction.
        outcomes: True when the corresponding prediction was correct.
        miscalibration_threshold: a non-empty bucket whose
            |avg_confidence - observed_win_rate| exceeds this value is flagged
            as miscalibrated. Defaults to 0.15.

    Returns:
        (ece, buckets), with one CalibrationBucket per CONFIDENCE_BUCKETS
        entry (empty buckets are recorded with zero values). Returns
        (0.0, []) when either input is empty.
    """
    if not confidences or not outcomes:
        return 0.0, []

    pairs = list(zip(confidences, outcomes))

    # First pass: assign each paired prediction to its bucket.
    members: list[list[tuple[float, bool]]] = []
    for low, high in CONFIDENCE_BUCKETS:
        if high == 1.00:
            # Closed on the right so that confidence == 1.0 is included.
            members.append([(c, o) for c, o in pairs if low <= c <= high])
        else:
            members.append([(c, o) for c, o in pairs if low <= c < high])

    # Normalize by the predictions that were actually bucketed.
    bucketed_total = sum(len(m) for m in members)

    buckets: list[CalibrationBucket] = []
    ece = 0.0
    for (low, high), bucket_pairs in zip(CONFIDENCE_BUCKETS, members):
        count = len(bucket_pairs)
        if count == 0:
            # Empty bucket: excluded from ECE, but still recorded.
            buckets.append(
                CalibrationBucket(
                    bucket_low=low,
                    bucket_high=high,
                    avg_confidence=0.0,
                    observed_win_rate=0.0,
                    prediction_count=0,
                    miscalibrated=False,
                )
            )
            continue
        avg_conf = sum(c for c, _ in bucket_pairs) / count
        win_rate = sum(1.0 for _, o in bucket_pairs if o) / count
        diff = abs(avg_conf - win_rate)
        buckets.append(
            CalibrationBucket(
                bucket_low=low,
                bucket_high=high,
                avg_confidence=avg_conf,
                observed_win_rate=win_rate,
                prediction_count=count,
                miscalibrated=diff > miscalibration_threshold,
            )
        )
        ece += (count / bucketed_total) * diff
    return ece, buckets
def compute_brier_score(
    p_bulls: list[float],
    outcomes: list[bool],
) -> float:
    """Return the Brier score: the mean of (p - outcome)^2 over paired inputs.

    Each outcome is mapped to 1.0 (True) or 0.0 (False) and compared with the
    probability at the same position. The lists are paired positionally. If
    their lengths differ, only the paired prefix is scored, and the mean is
    taken over that prefix. This matches how the IC helpers treat unequal
    inputs.

    Returns a value in [0.0, 1.0] when every probability is in [0.0, 1.0]
    (lower is better). Returns 0.0 for empty input.
    """
    pairs = list(zip(p_bulls, outcomes))
    if not pairs:
        return 0.0
    squared_errors = [(p - (1.0 if o else 0.0)) ** 2 for p, o in pairs]
    return sum(squared_errors) / len(pairs)
def _pearson_correlation(xs: list[float], ys: list[float]) -> float | None:
"""Compute Pearson correlation coefficient between two lists.
Returns None if the lists have fewer than 2 elements or if either
has zero variance. Guards against NaN/infinity.
"""
n = len(xs)
if n < 2:
return None
mean_x = sum(xs) / n
mean_y = sum(ys) / n
cov = 0.0
var_x = 0.0
var_y = 0.0
for x, y in zip(xs, ys):
dx = x - mean_x
dy = y - mean_y
cov += dx * dy
var_x += dx * dx
var_y += dy * dy
if var_x == 0.0 or var_y == 0.0:
return None
r = cov / math.sqrt(var_x * var_y)
# Guard against floating-point drift
if math.isnan(r) or math.isinf(r):
return None
# Clamp to [-1.0, 1.0]
return max(-1.0, min(1.0, r))
def _rank_data(values: list[float]) -> list[float]:
"""Compute fractional ranks for a list of values (average tie-breaking)."""
n = len(values)
indexed = sorted(range(n), key=lambda i: values[i])
ranks = [0.0] * n
i = 0
while i < n:
# Find the end of the tie group
j = i + 1
while j < n and values[indexed[j]] == values[indexed[i]]:
j += 1
# Average rank for the tie group (1-based)
avg_rank = (i + j + 1) / 2.0
for k in range(i, j):
ranks[indexed[k]] = avg_rank
i = j
return ranks
def compute_information_coefficient(
    scores: list[float],
    returns: list[float],
    *,
    min_samples: int = 30,
) -> float | None:
    """Pearson correlation between prediction scores and future returns.

    Args:
        scores: prediction scores.
        returns: realized future returns, paired positionally with *scores*.
            If the lengths differ, extra trailing elements are ignored.
        min_samples: minimum number of paired observations required.
            Defaults to 30.

    Returns:
        A value in [-1.0, 1.0], or None when there are fewer than
        *min_samples* pairs or either series has zero variance.
    """
    n = min(len(scores), len(returns))
    if n < min_samples:
        return None
    return _pearson_correlation(scores[:n], returns[:n])
def compute_rank_information_coefficient(
    scores: list[float],
    returns: list[float],
    *,
    min_samples: int = 30,
) -> float | None:
    """Spearman rank correlation between prediction scores and future returns.

    Ranks both series (tied values get their average rank) and computes the
    Pearson correlation of the ranks.

    Args:
        scores: prediction scores.
        returns: realized future returns, paired positionally with *scores*.
            If the lengths differ, extra trailing elements are ignored.
        min_samples: minimum number of paired observations required.
            Defaults to 30.

    Returns:
        A value in [-1.0, 1.0], or None when there are fewer than
        *min_samples* pairs or either ranked series has zero variance.
    """
    n = min(len(scores), len(returns))
    if n < min_samples:
        return None
    return _pearson_correlation(_rank_data(scores[:n]), _rank_data(returns[:n]))
def compute_contribution_scores(
    weights: list[float],
) -> list[float]:
    """Normalize document weights into contribution scores.

    Each score is max(weight_i, 0) / sum(max(weight_j, 0)). Negative weights
    are treated as contributing nothing. Without this, a negative weight
    would produce a negative score and push the others above 1.0, breaking
    the contract that every score lies in [0.0, 1.0] and that the scores
    sum to 1.0 (up to rounding).

    When no weight is positive, the contribution is split evenly (1/n each).
    Returns an empty list for empty input.
    """
    if not weights:
        return []
    clipped = [max(w, 0.0) for w in weights]
    total = sum(clipped)
    if total == 0.0:
        n = len(weights)
        return [1.0 / n] * n
    return [w / total for w in clipped]
def compute_hit_rate_improvement(win_rate: float) -> float:
    """Relative improvement of *win_rate* over a coin-flip (50/50) baseline.

    Computed as (win_rate - 0.5) / 0.5, so 0.5 maps to 0.0, 1.0 to 1.0 and
    0.0 to -1.0.
    """
    baseline = 0.5
    return (win_rate - baseline) / baseline
# ---------------------------------------------------------------------------
# SQL queries for v_prediction_performance view
# ---------------------------------------------------------------------------
_PERFORMANCE_DATA_SQL = """
SELECT
ticker,
direction,
action,
confidence,
strength,
p_bull,
score_company,
score_macro,
score_competitive,
future_return,
excess_return_vs_spy,
excess_return_vs_sector,
direction_correct,
profitable,
horizon,
generated_at
FROM v_prediction_performance
WHERE horizon = $1
"""
_PERFORMANCE_DATA_WITH_LOOKBACK_SQL = """
SELECT
ticker,
direction,
action,
confidence,
strength,
p_bull,
score_company,
score_macro,
score_competitive,
future_return,
excess_return_vs_spy,
excess_return_vs_sector,
direction_correct,
profitable,
horizon,
generated_at
FROM v_prediction_performance
WHERE horizon = $1
AND generated_at >= $2
"""
_INSERT_METRIC_SNAPSHOT_SQL = """
INSERT INTO model_metric_snapshots (
id, generated_at, lookback_window, horizon,
prediction_count, win_rate, directional_accuracy,
information_coefficient, rank_information_coefficient,
avg_return, avg_excess_return_vs_spy, avg_excess_return_vs_sector,
calibration_error, brier_score,
buy_win_rate, sell_win_rate, hold_win_rate,
metadata
) VALUES (
$1::uuid, $2, $3, $4,
$5, $6, $7,
$8, $9,
$10, $11, $12,
$13, $14,
$15, $16, $17,
$18::jsonb
)
"""
# ---------------------------------------------------------------------------
# Metric computation from raw rows
# ---------------------------------------------------------------------------
def _win_rate(rows: list[dict]) -> float:
    """Return the share of *rows* whose ``direction_correct`` is exactly True (0.0 if empty)."""
    if not rows:
        return 0.0
    correct = sum(1 for r in rows if r.get("direction_correct") is True)
    return correct / len(rows)


def _rows_for_action(rows: list[dict], action: str) -> list[dict]:
    """Return the rows whose ``action`` (lower-cased, None treated as "") equals *action*."""
    return [r for r in rows if (r.get("action") or "").lower() == action]


def _mean_of(rows: list[dict], key: str) -> tuple[float, int]:
    """Return (mean, count) of the non-null values of *key* in *rows*; (0.0, 0) if none."""
    values = [r[key] for r in rows if r.get(key) is not None]
    if not values:
        return 0.0, 0
    return sum(values) / len(values), len(values)


def _compute_metrics_from_rows(
    rows: list[dict],
    lookback_window: str,
    horizon: str,
) -> ModelMetricSnapshot:
    """Compute all metrics from a list of prediction performance rows.

    Args:
        rows: dicts keyed by the v_prediction_performance columns (this
            function reads direction_correct, action, future_return,
            excess_return_vs_spy, excess_return_vs_sector, confidence,
            p_bull and strength; missing or None values are skipped per
            metric).
        lookback_window: lookback label stored on the snapshot (e.g. "30d").
        horizon: horizon label stored on the snapshot (e.g. "1d").

    Returns:
        A ModelMetricSnapshot with a fresh UUID4 id and the current local,
        timezone-aware time. With no rows, every metric is 0.0, both ICs are
        None and metadata is empty.
    """
    now = datetime.now().astimezone()
    snapshot_id = str(uuid.uuid4())
    prediction_count = len(rows)

    if prediction_count == 0:
        return ModelMetricSnapshot(
            id=snapshot_id,
            generated_at=now,
            lookback_window=lookback_window,
            horizon=horizon,
            prediction_count=0,
            win_rate=0.0,
            directional_accuracy=0.0,
            information_coefficient=None,
            rank_information_coefficient=None,
            avg_return=0.0,
            avg_excess_return_vs_spy=0.0,
            avg_excess_return_vs_sector=0.0,
            calibration_error=0.0,
            brier_score=0.0,
            buy_win_rate=0.0,
            sell_win_rate=0.0,
            hold_win_rate=0.0,
            metadata={},
        )

    # --- Win rate and directional accuracy ---
    # Rows whose direction_correct is False or None both count as misses.
    win_rate = _win_rate(rows)
    directional_accuracy = win_rate  # Same metric, different name

    # --- Per-action subsets ---
    buy_rows = _rows_for_action(rows, "buy")
    sell_rows = _rows_for_action(rows, "sell")
    hold_rows = _rows_for_action(rows, "hold")

    # --- Average returns (Requirements 9.1, 9.2) ---
    avg_return, returns_count = _mean_of(rows, "future_return")
    avg_excess_return_vs_spy, excess_spy_count = _mean_of(
        rows, "excess_return_vs_spy"
    )
    avg_excess_return_vs_sector, excess_sector_count = _mean_of(
        rows, "excess_return_vs_sector"
    )

    # --- Calibration error (ECE) (Requirements 5.1, 5.2, 5.3, 5.5) ---
    calibration_rows = [r for r in rows if r.get("confidence") is not None]
    ece, _buckets = compute_calibration_error(
        [r["confidence"] for r in calibration_rows],
        [r.get("direction_correct") is True for r in calibration_rows],
    )

    # --- Brier score (Requirement 5.4) ---
    # NOTE(review): p_bull is scored against direction_correct. If p_bull is
    # the probability of an *up* move (as its name suggests), a correct
    # bearish call with a low p_bull is penalized as if it were a miss.
    # Presumably the outcome should be "price rose", or the probability
    # should be the one assigned to the predicted direction.
    # TODO confirm against the v_prediction_performance definition.
    brier_rows = [r for r in rows if r.get("p_bull") is not None]
    brier = compute_brier_score(
        [r["p_bull"] for r in brier_rows],
        [r.get("direction_correct") is True for r in brier_rows],
    )

    # --- Information Coefficient / Rank IC (Requirements 6.1, 6.2, 6.5) ---
    # Only rows with both a score and a realized return are paired.
    # NOTE(review): if "strength" is an unsigned magnitude rather than a
    # signed directional score, IC for sell predictions is sign-inverted.
    # Confirm.
    ic_rows = [
        r
        for r in rows
        if r.get("strength") is not None and r.get("future_return") is not None
    ]
    ic_scores = [r["strength"] for r in ic_rows]
    ic_returns = [r["future_return"] for r in ic_rows]
    ic = compute_information_coefficient(ic_scores, ic_returns)
    rank_ic = compute_rank_information_coefficient(ic_scores, ic_returns)

    # --- Hit rate improvement (Requirement 9.4) ---
    hit_rate_improvement = compute_hit_rate_improvement(win_rate)

    # --- Metadata (Requirement 10.5) ---
    metadata: dict = {
        "hit_rate_improvement": hit_rate_improvement,
        "buy_count": len(buy_rows),
        "sell_count": len(sell_rows),
        "hold_count": len(hold_rows),
        "returns_count": returns_count,
        "excess_spy_count": excess_spy_count,
        "excess_sector_count": excess_sector_count,
    }

    return ModelMetricSnapshot(
        id=snapshot_id,
        generated_at=now,
        lookback_window=lookback_window,
        horizon=horizon,
        prediction_count=prediction_count,
        win_rate=win_rate,
        directional_accuracy=directional_accuracy,
        information_coefficient=ic,
        rank_information_coefficient=rank_ic,
        avg_return=avg_return,
        avg_excess_return_vs_spy=avg_excess_return_vs_spy,
        avg_excess_return_vs_sector=avg_excess_return_vs_sector,
        calibration_error=ece,
        brier_score=brier,
        buy_win_rate=_win_rate(buy_rows),
        sell_win_rate=_win_rate(sell_rows),
        hold_win_rate=_win_rate(hold_rows),
        metadata=metadata,
    )
# ---------------------------------------------------------------------------
# Main entry point (Requirements 10.1, 10.2, 10.3, 10.4, 10.5)
# ---------------------------------------------------------------------------
async def _fetch_performance_rows(
    pool: asyncpg.Pool,
    horizon: str,
    cutoff: datetime | None,
) -> list[dict]:
    """Fetch v_prediction_performance rows for *horizon*, restricted to generated_at >= *cutoff* when given."""
    if cutoff is None:
        records = await pool.fetch(_PERFORMANCE_DATA_SQL, horizon)
    else:
        records = await pool.fetch(
            _PERFORMANCE_DATA_WITH_LOOKBACK_SQL, horizon, cutoff
        )
    # Convert asyncpg Records to plain dicts for _compute_metrics_from_rows.
    return [dict(r) for r in records]


async def _insert_metric_snapshot(
    pool: asyncpg.Pool,
    snapshot: ModelMetricSnapshot,
) -> None:
    """Persist one snapshot to model_metric_snapshots (metadata serialized as JSON)."""
    await pool.execute(
        _INSERT_METRIC_SNAPSHOT_SQL,
        snapshot.id,
        snapshot.generated_at,
        snapshot.lookback_window,
        snapshot.horizon,
        snapshot.prediction_count,
        snapshot.win_rate,
        snapshot.directional_accuracy,
        snapshot.information_coefficient,
        snapshot.rank_information_coefficient,
        snapshot.avg_return,
        snapshot.avg_excess_return_vs_spy,
        snapshot.avg_excess_return_vs_sector,
        snapshot.calibration_error,
        snapshot.brier_score,
        snapshot.buy_win_rate,
        snapshot.sell_win_rate,
        snapshot.hold_win_rate,
        json.dumps(snapshot.metadata),
    )


async def compute_and_store_metric_snapshots(
    pool: asyncpg.Pool,
) -> list[ModelMetricSnapshot]:
    """Compute metric snapshots for all lookback/horizon combinations.

    Lookback windows: 7d, 30d, 90d, all-time.
    Horizons: 1h, 6h, 1d, 7d, 30d.

    For each lookback x horizon combination, this queries the
    v_prediction_performance view, computes all metrics, and persists the
    result to model_metric_snapshots. All snapshots from one call share a
    single reference time, which is both the lookback cutoff anchor and the
    stored generated_at, so a run's snapshots can be grouped by timestamp.

    Failures are handled per combination: the exception is logged and that
    combination is skipped, while the others still run.

    Returns:
        The snapshots that were successfully computed and persisted.
    """
    snapshots: list[ModelMetricSnapshot] = []
    now = datetime.now().astimezone()
    for lookback in LOOKBACK_WINDOWS:
        duration = LOOKBACK_DURATIONS[lookback]
        # None means all-time: no lower bound on generated_at.
        cutoff = now - duration if duration is not None else None
        for horizon in EVALUATION_HORIZONS:
            try:
                rows = await _fetch_performance_rows(pool, horizon, cutoff)
                snapshot = _compute_metrics_from_rows(rows, lookback, horizon)
                # Stamp every snapshot of this run with the same reference time.
                snapshot.generated_at = now
                await _insert_metric_snapshot(pool, snapshot)
            except Exception:
                # Deliberate best-effort boundary: log and move on to the next
                # combination.
                logger.exception(
                    "Failed to compute metrics for lookback=%s horizon=%s",
                    lookback,
                    horizon,
                )
            else:
                snapshots.append(snapshot)
    logger.info(
        "Computed %d metric snapshots across %d lookback/horizon combinations",
        len(snapshots),
        len(LOOKBACK_WINDOWS) * len(EVALUATION_HORIZONS),
    )
    return snapshots