Files
stonks-oracle/tests/test_pbt_model_validation.py
T
Celes Renata 7fcc8a6c07
ci/woodpecker/push/test Pipeline failed
ci/woodpecker/push/build-1 unknown status
ci/woodpecker/push/build-3 unknown status
ci/woodpecker/push/build-2 unknown status
ci/woodpecker/push/finalize unknown status
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: model validation, calibration, and signal quality layer
- Migration 035: prediction_snapshots, prediction_outcomes, signal_evidence_links, model_metric_snapshots tables + SQL views
- Prediction snapshot writer with canonical evidence keys, duplicate detection, contribution scores
- Outcome evaluator across 5 horizons (1h, 6h, 1d, 7d, 30d)
- Metrics engine: ECE, Brier score, IC, Rank IC, benchmark comparison
- Attribution engine: per-source, per-catalyst, per-layer performance
- Calibration engine: Bayesian shrinkage source reliability
- Quality gate for live trading eligibility with configurable thresholds
- 7 new /api/validation/* endpoints
- Upgraded OpsModel dashboard with validation tab
- Enhanced recommendation display with calibration context
- Backtest replay validation mode
- 86 Python tests (unit + property-based), 179 frontend tests passing
2026-05-01 03:04:58 +00:00

663 lines
24 KiB
Python

"""Property-based tests for model validation, calibration, and signal quality.
Feature: model-validation-calibration
Tests correctness properties from the design specification covering
canonical evidence key determinism/idempotence, contribution score
invariants, calibration error bounds, Brier score bounds, information
coefficient bounds, source reliability shrinkage, and quality gate
determinism.
"""
from __future__ import annotations
import urllib.parse
from hypothesis import given, settings
from hypothesis import strategies as st
from services.validation.prediction_snapshot import (
compute_canonical_evidence_key,
compute_contribution_scores,
)
# ---------------------------------------------------------------------------
# Strategies
# ---------------------------------------------------------------------------
# Free-form titles of up to 200 characters (may be empty, and may contain
# whitespace or non-ASCII text).
title_strategy = st.text(min_size=0, max_size=200)


def _assemble_url(scheme: str, host: str, path: str, query: str) -> str:
    """Join URL components into one string; params and fragment stay empty."""
    return urllib.parse.urlunparse((scheme, host, path, "", query, ""))


# URLs: http/https scheme, a lowercase "name.tld" host, zero to four path
# segments, and an optional query string of one to four key=value pairs.
url_strategy = st.builds(
    _assemble_url,
    scheme=st.sampled_from(["http", "https"]),
    host=st.from_regex(r"[a-z0-9]{1,20}\.[a-z]{2,6}", fullmatch=True),
    path=st.from_regex(r"(/[a-z0-9\-]{0,15}){0,4}", fullmatch=True),
    query=st.from_regex(
        r"([a-z]{1,8}=[a-z0-9]{1,8}(&[a-z]{1,8}=[a-z0-9]{1,8}){0,3})?",
        fullmatch=True,
    ),
)
# ---------------------------------------------------------------------------
# Property 4: Canonical Evidence Key Determinism and Normalization Idempotence
# Validates: Requirements 2.3, 17.4
# ---------------------------------------------------------------------------
@given(title=title_strategy, url=url_strategy)
@settings(max_examples=100)
def test_canonical_evidence_key_determinism(title: str, url: str) -> None:
    """**Validates: Requirements 2.3, 17.4**

    Hashing the same (title, url) pair twice SHALL give the same key
    (determinism). The key is also checked for the expected digest shape:
    64 characters, all lowercase hexadecimal.
    """
    first_key = compute_canonical_evidence_key(title, url)
    second_key = compute_canonical_evidence_key(title, url)
    assert first_key == second_key, (
        f"Determinism violated: same inputs produced different keys: "
        f"{first_key!r} != {second_key!r}"
    )

    # A SHA256 hex digest is 64 characters long ...
    assert len(first_key) == 64, f"Expected 64-char hex digest, got {len(first_key)}"

    # ... and uses only the lowercase hex alphabet.
    hex_alphabet = "0123456789abcdef"
    stray_chars = [ch for ch in first_key if ch not in hex_alphabet]
    assert not stray_chars, (
        f"Key contains non-hex characters: {first_key!r}"
    )
@given(title=title_strategy, url=url_strategy)
@settings(max_examples=100)
def test_canonical_evidence_key_normalization_idempotence(title: str, url: str) -> None:
    """**Validates: Requirements 2.3, 17.4**

    Feeding pre-normalized inputs to the key function SHALL yield the same
    key as feeding the raw inputs (idempotence of normalization).

    The normalization mirrored here:
    - Title: strip leading/trailing whitespace, then lowercase
    - URL: lowercase, then keep only scheme, netloc and path (params, query
      and fragment are dropped)
    """
    # Key for the raw, un-normalized inputs.
    raw_key = compute_canonical_evidence_key(title, url)

    # Apply the normalization by hand before calling the function again.
    clean_title = title.strip().lower()
    url_parts = urllib.parse.urlparse(url.lower())
    clean_url = urllib.parse.urlunparse(tuple(url_parts[:3]) + ("", "", ""))

    # Key for inputs that are already in normalized form.
    normalized_key = compute_canonical_evidence_key(clean_title, clean_url)

    assert raw_key == normalized_key, (
        f"Idempotence violated: key from original inputs ({raw_key!r}) "
        f"differs from key from pre-normalized inputs ({normalized_key!r}). "
        f"title={title!r}, url={url!r}"
    )
# ---------------------------------------------------------------------------
# Strategies for contribution score tests
# ---------------------------------------------------------------------------
# A single finite document weight in [0.01, 1000.0].
_positive_weight = st.floats(
    min_value=0.01,
    max_value=1000.0,
    allow_nan=False,
    allow_infinity=False,
)

# Non-empty lists of 1 to 50 such weights.
positive_weights_strategy = st.lists(_positive_weight, min_size=1, max_size=50)
# ---------------------------------------------------------------------------
# Property 7: Contribution Score Sum-to-One and Range
# Validates: Requirements 2.5, 17.7
# ---------------------------------------------------------------------------
@given(weights=positive_weights_strategy)
@settings(max_examples=100)
def test_contribution_scores_sum_to_one_and_range(weights: list[float]) -> None:
    """**Validates: Requirements 2.5, 17.7**

    Given a non-empty list of positive document weights, the contribution
    scores SHALL come back one-per-weight, each inside [0.0, 1.0], and
    together SHALL add up to 1.0 within a tolerance of 1e-9.
    """
    scores = compute_contribution_scores(weights)

    # One score per input weight.
    assert len(scores) == len(weights), (
        f"Expected {len(weights)} scores, got {len(scores)}"
    )

    # Every individual score is a valid fraction.
    for position, value in enumerate(scores):
        assert 0.0 <= value <= 1.0, (
            f"Score at index {position} is {value}, expected in [0.0, 1.0]. "
            f"weights={weights}"
        )

    # The fractions partition the whole.
    total = sum(scores)
    deviation = abs(total - 1.0)
    assert deviation < 1e-9, (
        f"Scores sum to {total}, expected 1.0 within 1e-9 tolerance. "
        f"weights={weights}"
    )
def test_contribution_scores_empty_input() -> None:
    """**Validates: Requirements 2.5, 17.7**

    With no weights supplied, the result SHALL be an empty list.
    """
    no_weights: list[float] = []
    scores = compute_contribution_scores(no_weights)
    assert scores == [], f"Expected empty list for empty input, got {scores}"
# ---------------------------------------------------------------------------
# Strategies for calibration error tests
# ---------------------------------------------------------------------------
# Finite prediction confidences in [0.50, 1.00].
confidence_strategy = st.floats(
    min_value=0.50,
    max_value=1.00,
    allow_nan=False,
    allow_infinity=False,
)

# Whether the prediction turned out to be correct.
outcome_strategy = st.booleans()

# One (confidence, outcome) observation.
_prediction_pair = st.tuples(confidence_strategy, outcome_strategy)

# Between 1 and 100 observations per generated example.
prediction_pairs_strategy = st.lists(_prediction_pair, min_size=1, max_size=100)
# Import metric functions
from services.validation.metrics import (
compute_brier_score,
compute_calibration_error,
compute_information_coefficient,
)
# ---------------------------------------------------------------------------
# Property 1: Calibration Error Range and Round-Trip
# Validates: Requirements 5.1, 5.3, 17.1
# ---------------------------------------------------------------------------
@given(pairs=prediction_pairs_strategy)
@settings(max_examples=100)
def test_calibration_error_range(pairs: list[tuple[float, bool]]) -> None:
    """**Validates: Requirements 5.1, 5.3, 17.1**

    For any list of predictions with confidences in [0.50, 1.00] and boolean
    outcomes, the Expected Calibration Error (ECE) SHALL lie in [0.0, 1.0].
    Populated buckets are additionally checked for an average confidence
    and an observed win rate inside [0.0, 1.0].
    """
    confidences = [pair[0] for pair in pairs]
    outcomes = [pair[1] for pair in pairs]

    ece, buckets = compute_calibration_error(confidences, outcomes)

    assert 0.0 <= ece <= 1.0, (
        f"ECE {ece} is outside [0.0, 1.0]. "
        f"confidences={confidences}, outcomes={outcomes}"
    )

    for bucket in buckets:
        # Only buckets that actually received predictions are checked.
        if not bucket.prediction_count > 0:
            continue
        assert 0.0 <= bucket.avg_confidence <= 1.0, (
            f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) has "
            f"avg_confidence={bucket.avg_confidence} outside [0.0, 1.0]"
        )
        assert 0.0 <= bucket.observed_win_rate <= 1.0, (
            f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) has "
            f"observed_win_rate={bucket.observed_win_rate} outside [0.0, 1.0]"
        )
def test_calibration_error_zero_when_perfectly_calibrated() -> None:
    """**Validates: Requirements 5.1, 5.3, 17.1**

    When every bucket's observed win rate matches its average confidence,
    ECE SHALL be 0.0.

    The scenario places 100 predictions at each of the confidences 0.55,
    0.65, 0.75, 0.85 and 0.95, and marks exactly that percentage of them
    as True, so the win rate of each group equals its confidence.

    Comparisons use a small absolute tolerance rather than ``==``: the
    average of 100 copies of 0.55 is computed in floating point and is not
    guaranteed to be bit-identical to 55 / 100, so exact equality could
    fail on a correct implementation.
    """
    import math

    tolerance = 1e-9  # far above float rounding noise, far below any real miscalibration

    # Integer percentages keep the True-outcome counts exact (no float rounding).
    bucket_win_percents = [55, 65, 75, 85, 95]
    n_per_bucket = 100

    confidences: list[float] = []
    outcomes: list[bool] = []
    for percent in bucket_win_percents:
        midpoint = percent / 100
        n_true = n_per_bucket * percent // 100
        n_false = n_per_bucket - n_true
        confidences.extend([midpoint] * n_per_bucket)
        outcomes.extend([True] * n_true + [False] * n_false)

    ece, buckets = compute_calibration_error(confidences, outcomes)

    assert math.isclose(ece, 0.0, abs_tol=tolerance), (
        f"ECE should be 0.0 for perfectly calibrated predictions, got {ece}. "
        f"Buckets: {[(b.avg_confidence, b.observed_win_rate, b.prediction_count) for b in buckets]}"
    )

    # Every populated bucket must show matching confidence and win rate and
    # must not be flagged as miscalibrated.
    for bucket in buckets:
        if bucket.prediction_count > 0:
            assert math.isclose(
                bucket.avg_confidence, bucket.observed_win_rate, abs_tol=tolerance
            ), (
                f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) has "
                f"avg_confidence={bucket.avg_confidence} != "
                f"observed_win_rate={bucket.observed_win_rate}"
            )
            assert not bucket.miscalibrated, (
                f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) should not "
                f"be flagged as miscalibrated when perfectly calibrated"
            )
# ---------------------------------------------------------------------------
# Strategies for Brier score tests
# ---------------------------------------------------------------------------
# Finite bullish probabilities in [0.0, 1.0].
p_bull_strategy = st.floats(
    min_value=0.0,
    max_value=1.0,
    allow_nan=False,
    allow_infinity=False,
)

# Realized boolean outcome paired with each probability.
brier_outcome_strategy = st.booleans()

# One (p_bull, outcome) observation.
_brier_pair = st.tuples(p_bull_strategy, brier_outcome_strategy)

# Between 1 and 100 observations per generated example.
brier_pairs_strategy = st.lists(_brier_pair, min_size=1, max_size=100)
# ---------------------------------------------------------------------------
# Property 2: Brier Score Range and Perfect Prediction
# Validates: Requirements 5.4, 17.2
# ---------------------------------------------------------------------------
@given(pairs=brier_pairs_strategy)
@settings(max_examples=100)
def test_brier_score_range(pairs: list[tuple[float, bool]]) -> None:
    """**Validates: Requirements 5.4, 17.2**

    For any list of (p_bull, outcome) pairs with p_bull in [0.0, 1.0] and a
    boolean outcome, the Brier score SHALL lie in [0.0, 1.0].
    """
    p_bulls = [pair[0] for pair in pairs]
    outcomes = [pair[1] for pair in pairs]

    brier = compute_brier_score(p_bulls, outcomes)

    assert 0.0 <= brier <= 1.0, (
        f"Brier score {brier} is outside [0.0, 1.0]. "
        f"p_bulls={p_bulls}, outcomes={outcomes}"
    )
@given(n=st.integers(min_value=1, max_value=100))
@settings(max_examples=100)
def test_brier_score_perfect_prediction(n: int) -> None:
    """**Validates: Requirements 5.4, 17.2**

    When every prediction is exactly right (p_bull = 1.0 with outcome True,
    or p_bull = 0.0 with outcome False) the Brier score SHALL be 0.0.
    Both cases are checked with ``n`` identical predictions.
    """
    # Case 1: n certain-bullish predictions that all came true.
    brier_bull = compute_brier_score([1.0 for _ in range(n)], [True for _ in range(n)])
    assert brier_bull == 0.0, (
        f"Brier score should be 0.0 for perfect bullish predictions, "
        f"got {brier_bull} with n={n}"
    )

    # Case 2: n certain-bearish predictions that all came true.
    brier_bear = compute_brier_score([0.0 for _ in range(n)], [False for _ in range(n)])
    assert brier_bear == 0.0, (
        f"Brier score should be 0.0 for perfect bearish predictions, "
        f"got {brier_bear} with n={n}"
    )
# ---------------------------------------------------------------------------
# Strategies for Information Coefficient tests
# ---------------------------------------------------------------------------
# Finite values in [-100.0, 100.0]; used for both scores and returns.
ic_score_strategy = st.floats(
    min_value=-100.0,
    max_value=100.0,
    allow_nan=False,
    allow_infinity=False,
)

# One (score, return) observation.
_ic_pair = st.tuples(ic_score_strategy, ic_score_strategy)

# Lists of 30 to 100 (score, return) observations.
ic_pairs_strategy = st.lists(_ic_pair, min_size=30, max_size=100)
# ---------------------------------------------------------------------------
# Property 3: Information Coefficient Range and Perfect Correlation
# Validates: Requirements 6.1, 6.2, 17.3
# ---------------------------------------------------------------------------
@given(pairs=ic_pairs_strategy)
@settings(max_examples=100)
def test_information_coefficient_range(pairs: list[tuple[float, float]]) -> None:
    """**Validates: Requirements 6.1, 6.2, 17.3**

    For any list of at least 30 finite (score, return) pairs, the
    Information Coefficient (Pearson correlation) SHALL either lie in
    [-1.0, 1.0] or be None (the zero-variance case).
    """
    scores = [pair[0] for pair in pairs]
    returns = [pair[1] for pair in pairs]

    ic = compute_information_coefficient(scores, returns)

    # None is an accepted answer (zero variance in either list); there is
    # nothing further to range-check in that case.
    if ic is None:
        return

    assert -1.0 <= ic <= 1.0, (
        f"IC {ic} is outside [-1.0, 1.0]. "
        f"scores={scores[:5]}..., returns={returns[:5]}..."
    )
@given(
    scores=st.lists(
        st.floats(
            min_value=-100.0,
            max_value=100.0,
            allow_nan=False,
            allow_infinity=False,
        ),
        min_size=30,
        max_size=100,
    ).filter(lambda xs: max(xs) - min(xs) > 1e-6),
    a=st.floats(min_value=0.01, max_value=100.0, allow_nan=False, allow_infinity=False),
    b=st.floats(min_value=-100.0, max_value=100.0, allow_nan=False, allow_infinity=False),
)
@settings(max_examples=100)
def test_information_coefficient_perfect_positive_correlation(
    scores: list[float], a: float, b: float
) -> None:
    """**Validates: Requirements 6.1, 6.2, 17.3**

    When returns are an increasing linear function of scores
    (returns = a * scores + b with a > 0), IC SHALL be 1.0 within a
    tolerance of 1e-6. The score lists are filtered so that their spread
    exceeds 1e-6, which rules out constant inputs.
    """
    returns = [a * score + b for score in scores]

    ic = compute_information_coefficient(scores, returns)

    assert ic is not None, (
        f"IC should not be None for perfectly correlated data with variance. "
        f"a={a}, b={b}, scores={scores[:5]}..."
    )
    distance_from_one = abs(ic - 1.0)
    assert distance_from_one < 1e-6, (
        f"IC should be 1.0 for perfectly positively correlated data, "
        f"got {ic}. a={a}, b={b}"
    )
# ---------------------------------------------------------------------------
# Strategies for source reliability tests
# ---------------------------------------------------------------------------
from services.validation.calibration import compute_source_reliability
# Finite observed win rates in [0.0, 1.0].
observed_win_rate_strategy = st.floats(
    min_value=0.0,
    max_value=1.0,
    allow_nan=False,
    allow_infinity=False,
)

# Non-negative sample counts, from 0 up to 100,000.
sample_count_strategy = st.integers(min_value=0, max_value=100_000)
# ---------------------------------------------------------------------------
# Property 5: Source Reliability Bayesian Shrinkage Bounds and Convergence
# Validates: Requirements 8.1, 8.2, 17.5
# ---------------------------------------------------------------------------
@given(
    observed_win_rate=observed_win_rate_strategy,
    sample_count=sample_count_strategy,
)
@settings(max_examples=100)
def test_source_reliability_range(observed_win_rate: float, sample_count: int) -> None:
    """**Validates: Requirements 8.1, 8.2, 17.5**

    For any observed_win_rate in [0.0, 1.0] and any sample_count >= 0, the
    Bayesian-shrinkage source reliability SHALL lie in [0.0, 1.0].
    """
    reliability = compute_source_reliability(observed_win_rate, sample_count)

    assert 0.0 <= reliability <= 1.0, (
        f"Reliability {reliability} is outside [0.0, 1.0]. "
        f"observed_win_rate={observed_win_rate}, sample_count={sample_count}"
    )
def test_source_reliability_zero_samples() -> None:
    """**Validates: Requirements 8.1, 8.2, 17.5**

    With sample_count = 0 the reliability SHALL be exactly 0.5 (the prior
    mean), whatever observed win rate is passed in.
    """
    reliability = compute_source_reliability(observed_win_rate=0.8, sample_count=0)
    assert reliability == 0.5, (
        f"Reliability should be 0.5 when sample_count=0, got {reliability}"
    )

    # Repeat across the full range of win rates, including both extremes.
    win_rates_to_probe = [0.0, 0.25, 0.5, 0.75, 1.0]
    for wr in win_rates_to_probe:
        r = compute_source_reliability(observed_win_rate=wr, sample_count=0)
        assert r == 0.5, (
            f"Reliability should be 0.5 when sample_count=0 regardless of "
            f"observed_win_rate={wr}, got {r}"
        )
@given(
    observed_win_rate=st.floats(
        min_value=0.0,
        max_value=1.0,
        allow_nan=False,
        allow_infinity=False,
    ),
)
@settings(max_examples=100)
def test_source_reliability_convergence(observed_win_rate: float) -> None:
    """**Validates: Requirements 8.1, 8.2, 17.5**

    As sample_count grows, reliability SHALL approach the observed win
    rate. This is checked at sample_count = 10000, where reliability must
    be within 0.01 of observed_win_rate.
    """
    reliability = compute_source_reliability(observed_win_rate, sample_count=10_000)

    gap = abs(reliability - observed_win_rate)
    assert gap < 0.01, (
        f"Reliability {reliability} should be within 0.01 of "
        f"observed_win_rate {observed_win_rate} when sample_count=10000. "
        f"Difference: {abs(reliability - observed_win_rate)}"
    )
# ---------------------------------------------------------------------------
# Strategies for quality gate tests
# ---------------------------------------------------------------------------
from services.trading.model_quality_gate import (
GateThresholdResult,
QualityGateConfig,
_evaluate_thresholds,
)
# Shared building blocks: finite floats in [0, 1] and in [-1, 1], and
# integer counts in [0, 10000].
_unit_interval_floats = st.floats(
    min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False
)
_signed_unit_floats = st.floats(
    min_value=-1.0, max_value=1.0, allow_nan=False, allow_infinity=False
)
_prediction_counts = st.integers(min_value=0, max_value=10_000)

# Metric snapshot: a dict with one generated value per metric key.
snapshot_strategy = st.fixed_dictionaries({
    "prediction_count": _prediction_counts,
    "information_coefficient": _signed_unit_floats,
    "win_rate": _unit_interval_floats,
    "calibration_error": _unit_interval_floats,
    "avg_excess_return_vs_spy": _signed_unit_floats,
})

# Gate configuration: one generated threshold per QualityGateConfig field,
# drawn from the same range as the metric it is compared against.
gate_config_strategy = st.builds(
    QualityGateConfig,
    min_prediction_count=_prediction_counts,
    min_ic=_signed_unit_floats,
    min_win_rate=_unit_interval_floats,
    max_ece=_unit_interval_floats,
    min_excess_return_vs_spy=_signed_unit_floats,
)
# ---------------------------------------------------------------------------
# Property 6: Quality Gate Determinism and Threshold Monotonicity
# Validates: Requirements 11.1, 17.6
# ---------------------------------------------------------------------------
@given(snapshot=snapshot_strategy, config=gate_config_strategy)
@settings(max_examples=100)
def test_quality_gate_determinism(
    snapshot: dict, config: QualityGateConfig
) -> None:
    """**Validates: Requirements 11.1, 17.6**

    Evaluating the same metric snapshot against the same gate configuration
    twice SHALL give identical results: the same number of thresholds, and
    for each one the same name, threshold value, actual value and pass/fail
    flag. The overall gate verdict must agree as well.
    """
    first_run = _evaluate_thresholds(snapshot, config)
    second_run = _evaluate_thresholds(snapshot, config)

    assert len(first_run) == len(second_run), (
        f"Different number of threshold results: {len(first_run)} vs {len(second_run)}"
    )

    # Compare the two runs threshold by threshold, in order.
    for left, right in zip(first_run, second_run):
        assert left.name == right.name, (
            f"Threshold name mismatch: {left.name!r} vs {right.name!r}"
        )
        assert left.threshold == right.threshold, (
            f"Threshold value mismatch for {left.name}: "
            f"{left.threshold} vs {right.threshold}"
        )
        assert left.actual == right.actual, (
            f"Actual value mismatch for {left.name}: "
            f"{left.actual} vs {right.actual}"
        )
        assert left.passed == right.passed, (
            f"Determinism violated for threshold {left.name}: "
            f"first call passed={left.passed}, second call passed={right.passed}. "
            f"actual={left.actual}, threshold={left.threshold}"
        )

    # The gate as a whole passes only when every threshold passes.
    gate_passed_first = all(result.passed for result in first_run)
    gate_passed_second = all(result.passed for result in second_run)
    assert gate_passed_first == gate_passed_second, (
        f"Overall gate determinism violated: "
        f"first call passed={gate_passed_first}, second call passed={gate_passed_second}"
    )
@given(
    snapshot=snapshot_strategy,
    config=gate_config_strategy,
    relax_amount=st.floats(
        min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False
    ),
    threshold_to_relax=st.sampled_from([
        "min_prediction_count",
        "min_ic",
        "min_win_rate",
        "max_ece",
        "min_excess_return_vs_spy",
    ]),
)
@settings(max_examples=100)
def test_quality_gate_threshold_monotonicity(
    snapshot: dict,
    config: QualityGateConfig,
    relax_amount: float,
    threshold_to_relax: str,
) -> None:
    """**Validates: Requirements 11.1, 17.6**

    If the gate passes under a configuration, relaxing any single threshold
    (lowering a ``min_*`` value or raising ``max_ece``) SHALL NOT make the
    gate fail (monotonicity).

    Examples where the gate does not pass to begin with are skipped.
    """
    from dataclasses import replace

    # NOTE(review): examples that fail the original gate return early and
    # count as passes, so the share of examples that really exercise the
    # property depends on how often random configs pass. Consider
    # hypothesis.assume() or a strategy that builds passing configs.
    baseline_results = _evaluate_thresholds(snapshot, config)
    if not all(result.passed for result in baseline_results):
        return

    # Relaxed value for every field: minimums go down, the single maximum
    # (max_ece) goes up. The prediction count is an integer and is clamped
    # at zero.
    relaxed_value_by_field = {
        "min_prediction_count": max(
            0, config.min_prediction_count - int(relax_amount * 1000)
        ),
        "min_ic": config.min_ic - relax_amount,
        "min_win_rate": config.min_win_rate - relax_amount,
        "max_ece": config.max_ece + relax_amount,
        "min_excess_return_vs_spy": config.min_excess_return_vs_spy - relax_amount,
    }
    if threshold_to_relax not in relaxed_value_by_field:
        return  # pragma: no cover

    relaxed_config = replace(
        config,
        **{threshold_to_relax: relaxed_value_by_field[threshold_to_relax]},
    )

    # Re-evaluate the same snapshot against the relaxed configuration.
    relaxed_results = _evaluate_thresholds(snapshot, config=relaxed_config)
    relaxed_passed = all(result.passed for result in relaxed_results)

    assert relaxed_passed, (
        f"Monotonicity violated: gate passed with original config but failed "
        f"after relaxing {threshold_to_relax}. "
        f"Original config: min_prediction_count={config.min_prediction_count}, "
        f"min_ic={config.min_ic}, min_win_rate={config.min_win_rate}, "
        f"max_ece={config.max_ece}, "
        f"min_excess_return_vs_spy={config.min_excess_return_vs_spy}. "
        f"Relaxed threshold: {threshold_to_relax} by {relax_amount}. "
        f"Failed thresholds: "
        f"{[(r.name, r.actual, r.threshold) for r in relaxed_results if not r.passed]}"
    )