"""Property-based tests for model validation, calibration, and signal quality. Feature: model-validation-calibration Tests correctness properties from the design specification covering canonical evidence key determinism/idempotence, contribution score invariants, calibration error bounds, Brier score bounds, information coefficient bounds, source reliability shrinkage, and quality gate determinism. """ from __future__ import annotations import urllib.parse from hypothesis import given, settings from hypothesis import strategies as st from services.validation.prediction_snapshot import ( compute_canonical_evidence_key, compute_contribution_scores, ) # --------------------------------------------------------------------------- # Strategies # --------------------------------------------------------------------------- # Titles: arbitrary text (including whitespace, unicode) title_strategy = st.text(min_size=0, max_size=200) # URLs: build realistic URLs with optional query params url_strategy = st.builds( lambda scheme, host, path, query: urllib.parse.urlunparse( (scheme, host, path, "", query, "") ), scheme=st.sampled_from(["http", "https"]), host=st.from_regex(r"[a-z0-9]{1,20}\.[a-z]{2,6}", fullmatch=True), path=st.from_regex(r"(/[a-z0-9\-]{0,15}){0,4}", fullmatch=True), query=st.from_regex(r"([a-z]{1,8}=[a-z0-9]{1,8}(&[a-z]{1,8}=[a-z0-9]{1,8}){0,3})?", fullmatch=True), ) # --------------------------------------------------------------------------- # Property 4: Canonical Evidence Key Determinism and Normalization Idempotence # Validates: Requirements 2.3, 17.4 # --------------------------------------------------------------------------- @given(title=title_strategy, url=url_strategy) @settings(max_examples=100) def test_canonical_evidence_key_determinism(title: str, url: str) -> None: """**Validates: Requirements 2.3, 17.4** For any (title, url) pair, computing the canonical evidence key twice with the same inputs SHALL produce the same result (determinism). """ key1 = compute_canonical_evidence_key(title, url) key2 = compute_canonical_evidence_key(title, url) assert key1 == key2, ( f"Determinism violated: same inputs produced different keys: " f"{key1!r} != {key2!r}" ) # Key should be a valid SHA256 hex digest (64 hex chars) assert len(key1) == 64, f"Expected 64-char hex digest, got {len(key1)}" assert all(c in "0123456789abcdef" for c in key1), ( f"Key contains non-hex characters: {key1!r}" ) @given(title=title_strategy, url=url_strategy) @settings(max_examples=100) def test_canonical_evidence_key_normalization_idempotence(title: str, url: str) -> None: """**Validates: Requirements 2.3, 17.4** Normalizing an already-normalized input and computing the key SHALL produce the same key as the original computation (idempotence). Normalization rules: - Title: lowercase, strip leading/trailing whitespace - URL: lowercase, strip query parameters (keep scheme, netloc, path) """ # Compute key from original (unnormalized) inputs key_original = compute_canonical_evidence_key(title, url) # Pre-normalize the inputs the same way the function does internally normalized_title = title.strip().lower() parsed = urllib.parse.urlparse(url.lower()) normalized_url = urllib.parse.urlunparse( (parsed.scheme, parsed.netloc, parsed.path, "", "", "") ) # Compute key from already-normalized inputs key_from_normalized = compute_canonical_evidence_key(normalized_title, normalized_url) assert key_original == key_from_normalized, ( f"Idempotence violated: key from original inputs ({key_original!r}) " f"differs from key from pre-normalized inputs ({key_from_normalized!r}). " f"title={title!r}, url={url!r}" ) # --------------------------------------------------------------------------- # Strategies for contribution score tests # --------------------------------------------------------------------------- positive_weights_strategy = st.lists( st.floats(min_value=0.01, max_value=1000.0, allow_nan=False, allow_infinity=False), min_size=1, max_size=50, ) # --------------------------------------------------------------------------- # Property 7: Contribution Score Sum-to-One and Range # Validates: Requirements 2.5, 17.7 # --------------------------------------------------------------------------- @given(weights=positive_weights_strategy) @settings(max_examples=100) def test_contribution_scores_sum_to_one_and_range(weights: list[float]) -> None: """**Validates: Requirements 2.5, 17.7** For any non-empty list of positive document weights, the computed contribution scores SHALL each be in [0.0, 1.0] and SHALL sum to 1.0 (within floating-point tolerance of 1e-9). """ scores = compute_contribution_scores(weights) # Same length as input assert len(scores) == len(weights), ( f"Expected {len(weights)} scores, got {len(scores)}" ) # Each score in [0.0, 1.0] for i, score in enumerate(scores): assert 0.0 <= score <= 1.0, ( f"Score at index {i} is {score}, expected in [0.0, 1.0]. " f"weights={weights}" ) # Scores sum to 1.0 within tolerance total = sum(scores) assert abs(total - 1.0) < 1e-9, ( f"Scores sum to {total}, expected 1.0 within 1e-9 tolerance. " f"weights={weights}" ) def test_contribution_scores_empty_input() -> None: """**Validates: Requirements 2.5, 17.7** For an empty weight list, the result SHALL be an empty list. """ scores = compute_contribution_scores([]) assert scores == [], f"Expected empty list for empty input, got {scores}" # --------------------------------------------------------------------------- # Strategies for calibration error tests # --------------------------------------------------------------------------- confidence_strategy = st.floats( min_value=0.50, max_value=1.00, allow_nan=False, allow_infinity=False ) outcome_strategy = st.booleans() prediction_pairs_strategy = st.lists( st.tuples(confidence_strategy, outcome_strategy), min_size=1, max_size=100, ) # Import metric functions from services.validation.metrics import ( compute_brier_score, compute_calibration_error, compute_information_coefficient, ) # --------------------------------------------------------------------------- # Property 1: Calibration Error Range and Round-Trip # Validates: Requirements 5.1, 5.3, 17.1 # --------------------------------------------------------------------------- @given(pairs=prediction_pairs_strategy) @settings(max_examples=100) def test_calibration_error_range(pairs: list[tuple[float, bool]]) -> None: """**Validates: Requirements 5.1, 5.3, 17.1** For any valid distribution of predictions with confidences in [0.50, 1.00] and boolean outcomes, the Expected Calibration Error (ECE) SHALL be in [0.0, 1.0]. """ confidences = [c for c, _ in pairs] outcomes = [o for _, o in pairs] ece, buckets = compute_calibration_error(confidences, outcomes) assert 0.0 <= ece <= 1.0, ( f"ECE {ece} is outside [0.0, 1.0]. " f"confidences={confidences}, outcomes={outcomes}" ) # Each bucket's metrics should also be well-formed for bucket in buckets: if bucket.prediction_count > 0: assert 0.0 <= bucket.avg_confidence <= 1.0, ( f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) has " f"avg_confidence={bucket.avg_confidence} outside [0.0, 1.0]" ) assert 0.0 <= bucket.observed_win_rate <= 1.0, ( f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) has " f"observed_win_rate={bucket.observed_win_rate} outside [0.0, 1.0]" ) def test_calibration_error_zero_when_perfectly_calibrated() -> None: """**Validates: Requirements 5.1, 5.3, 17.1** When every bucket's observed win rate exactly matches its average confidence, ECE SHALL be 0.0. Constructs a scenario with predictions in multiple buckets where the fraction of True outcomes in each bucket equals the bucket's average confidence. """ # For each bucket midpoint, place predictions so win_rate == avg_confidence. # Use 100 predictions per bucket at the midpoint confidence. # Set exactly round(100 * midpoint) outcomes to True. bucket_midpoints = [0.55, 0.65, 0.75, 0.85, 0.95] n_per_bucket = 100 confidences: list[float] = [] outcomes: list[bool] = [] for midpoint in bucket_midpoints: n_true = round(n_per_bucket * midpoint) n_false = n_per_bucket - n_true confidences.extend([midpoint] * n_per_bucket) outcomes.extend([True] * n_true + [False] * n_false) ece, buckets = compute_calibration_error(confidences, outcomes) assert ece == 0.0, ( f"ECE should be 0.0 for perfectly calibrated predictions, got {ece}. " f"Buckets: {[(b.avg_confidence, b.observed_win_rate, b.prediction_count) for b in buckets]}" ) # Verify each non-empty bucket has matching avg_confidence and win_rate for bucket in buckets: if bucket.prediction_count > 0: assert bucket.avg_confidence == bucket.observed_win_rate, ( f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) has " f"avg_confidence={bucket.avg_confidence} != " f"observed_win_rate={bucket.observed_win_rate}" ) assert not bucket.miscalibrated, ( f"Bucket [{bucket.bucket_low}, {bucket.bucket_high}) should not " f"be flagged as miscalibrated when perfectly calibrated" ) # --------------------------------------------------------------------------- # Strategies for Brier score tests # --------------------------------------------------------------------------- p_bull_strategy = st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ) brier_outcome_strategy = st.booleans() brier_pairs_strategy = st.lists( st.tuples(p_bull_strategy, brier_outcome_strategy), min_size=1, max_size=100, ) # --------------------------------------------------------------------------- # Property 2: Brier Score Range and Perfect Prediction # Validates: Requirements 5.4, 17.2 # --------------------------------------------------------------------------- @given(pairs=brier_pairs_strategy) @settings(max_examples=100) def test_brier_score_range(pairs: list[tuple[float, bool]]) -> None: """**Validates: Requirements 5.4, 17.2** For any list of (p_bull, outcome) pairs where p_bull ∈ [0.0, 1.0] and outcome is boolean, the Brier score SHALL be in [0.0, 1.0]. """ p_bulls = [p for p, _ in pairs] outcomes = [o for _, o in pairs] brier = compute_brier_score(p_bulls, outcomes) assert 0.0 <= brier <= 1.0, ( f"Brier score {brier} is outside [0.0, 1.0]. " f"p_bulls={p_bulls}, outcomes={outcomes}" ) @given(n=st.integers(min_value=1, max_value=100)) @settings(max_examples=100) def test_brier_score_perfect_prediction(n: int) -> None: """**Validates: Requirements 5.4, 17.2** When all predictions are perfectly correct — p_bull = 1.0 with outcome = True, or p_bull = 0.0 with outcome = False — the Brier score SHALL be 0.0. """ # Case 1: all p_bull = 1.0 and outcome = True p_bulls_all_bull = [1.0] * n outcomes_all_true = [True] * n brier_bull = compute_brier_score(p_bulls_all_bull, outcomes_all_true) assert brier_bull == 0.0, ( f"Brier score should be 0.0 for perfect bullish predictions, " f"got {brier_bull} with n={n}" ) # Case 2: all p_bull = 0.0 and outcome = False p_bulls_all_bear = [0.0] * n outcomes_all_false = [False] * n brier_bear = compute_brier_score(p_bulls_all_bear, outcomes_all_false) assert brier_bear == 0.0, ( f"Brier score should be 0.0 for perfect bearish predictions, " f"got {brier_bear} with n={n}" ) # --------------------------------------------------------------------------- # Strategies for Information Coefficient tests # --------------------------------------------------------------------------- ic_score_strategy = st.floats( min_value=-100.0, max_value=100.0, allow_nan=False, allow_infinity=False ) # Generate lists of at least 30 (score, return) pairs ic_pairs_strategy = st.lists( st.tuples(ic_score_strategy, ic_score_strategy), min_size=30, max_size=100, ) # --------------------------------------------------------------------------- # Property 3: Information Coefficient Range and Perfect Correlation # Validates: Requirements 6.1, 6.2, 17.3 # --------------------------------------------------------------------------- @given(pairs=ic_pairs_strategy) @settings(max_examples=100) def test_information_coefficient_range(pairs: list[tuple[float, float]]) -> None: """**Validates: Requirements 6.1, 6.2, 17.3** For any list of (score, return) pairs with at least 30 elements where scores and returns are finite floats, the Information Coefficient (Pearson correlation) SHALL be in [-1.0, 1.0] or None (when variance is zero). """ scores = [s for s, _ in pairs] returns = [r for _, r in pairs] ic = compute_information_coefficient(scores, returns) # IC may be None if variance is zero in either list if ic is not None: assert -1.0 <= ic <= 1.0, ( f"IC {ic} is outside [-1.0, 1.0]. " f"scores={scores[:5]}..., returns={returns[:5]}..." ) @given( scores=st.lists( st.floats(min_value=-100.0, max_value=100.0, allow_nan=False, allow_infinity=False), min_size=30, max_size=100, ).filter(lambda xs: max(xs) - min(xs) > 1e-6), a=st.floats(min_value=0.01, max_value=100.0, allow_nan=False, allow_infinity=False), b=st.floats(min_value=-100.0, max_value=100.0, allow_nan=False, allow_infinity=False), ) @settings(max_examples=100) def test_information_coefficient_perfect_positive_correlation( scores: list[float], a: float, b: float ) -> None: """**Validates: Requirements 6.1, 6.2, 17.3** When scores and returns are perfectly positively linearly correlated (returns = a * scores + b, a > 0), IC SHALL be 1.0 within floating-point tolerance. """ returns = [a * s + b for s in scores] ic = compute_information_coefficient(scores, returns) assert ic is not None, ( f"IC should not be None for perfectly correlated data with variance. " f"a={a}, b={b}, scores={scores[:5]}..." ) assert abs(ic - 1.0) < 1e-6, ( f"IC should be 1.0 for perfectly positively correlated data, " f"got {ic}. a={a}, b={b}" ) # --------------------------------------------------------------------------- # Strategies for source reliability tests # --------------------------------------------------------------------------- from services.validation.calibration import compute_source_reliability observed_win_rate_strategy = st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ) sample_count_strategy = st.integers(min_value=0, max_value=100_000) # --------------------------------------------------------------------------- # Property 5: Source Reliability Bayesian Shrinkage Bounds and Convergence # Validates: Requirements 8.1, 8.2, 17.5 # --------------------------------------------------------------------------- @given( observed_win_rate=observed_win_rate_strategy, sample_count=sample_count_strategy, ) @settings(max_examples=100) def test_source_reliability_range(observed_win_rate: float, sample_count: int) -> None: """**Validates: Requirements 8.1, 8.2, 17.5** For any observed_win_rate in [0.0, 1.0] and sample_count >= 0, the source reliability computed via Bayesian shrinkage SHALL be in [0.0, 1.0]. """ reliability = compute_source_reliability(observed_win_rate, sample_count) assert 0.0 <= reliability <= 1.0, ( f"Reliability {reliability} is outside [0.0, 1.0]. " f"observed_win_rate={observed_win_rate}, sample_count={sample_count}" ) def test_source_reliability_zero_samples() -> None: """**Validates: Requirements 8.1, 8.2, 17.5** When sample_count = 0, reliability SHALL be exactly 0.5 (the prior mean). """ reliability = compute_source_reliability(observed_win_rate=0.8, sample_count=0) assert reliability == 0.5, ( f"Reliability should be 0.5 when sample_count=0, got {reliability}" ) # Also verify with different win rates for wr in [0.0, 0.25, 0.5, 0.75, 1.0]: r = compute_source_reliability(observed_win_rate=wr, sample_count=0) assert r == 0.5, ( f"Reliability should be 0.5 when sample_count=0 regardless of " f"observed_win_rate={wr}, got {r}" ) @given( observed_win_rate=st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ), ) @settings(max_examples=100) def test_source_reliability_convergence(observed_win_rate: float) -> None: """**Validates: Requirements 8.1, 8.2, 17.5** As sample_count increases toward infinity, reliability SHALL approach the observed_win_rate. For a large sample_count (e.g., 10000), reliability should be within 0.01 of observed_win_rate. """ reliability = compute_source_reliability(observed_win_rate, sample_count=10_000) assert abs(reliability - observed_win_rate) < 0.01, ( f"Reliability {reliability} should be within 0.01 of " f"observed_win_rate {observed_win_rate} when sample_count=10000. " f"Difference: {abs(reliability - observed_win_rate)}" ) # --------------------------------------------------------------------------- # Strategies for quality gate tests # --------------------------------------------------------------------------- from services.trading.model_quality_gate import ( QualityGateConfig, _evaluate_thresholds, ) # Snapshot dict strategy: generate each metric value in a reasonable range snapshot_strategy = st.fixed_dictionaries({ "prediction_count": st.integers(min_value=0, max_value=10_000), "information_coefficient": st.floats( min_value=-1.0, max_value=1.0, allow_nan=False, allow_infinity=False ), "win_rate": st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ), "calibration_error": st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ), "avg_excess_return_vs_spy": st.floats( min_value=-1.0, max_value=1.0, allow_nan=False, allow_infinity=False ), }) # Config strategy: generate each threshold in a reasonable range gate_config_strategy = st.builds( QualityGateConfig, min_prediction_count=st.integers(min_value=0, max_value=10_000), min_ic=st.floats( min_value=-1.0, max_value=1.0, allow_nan=False, allow_infinity=False ), min_win_rate=st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ), max_ece=st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ), min_excess_return_vs_spy=st.floats( min_value=-1.0, max_value=1.0, allow_nan=False, allow_infinity=False ), ) # --------------------------------------------------------------------------- # Property 6: Quality Gate Determinism and Threshold Monotonicity # Validates: Requirements 11.1, 17.6 # --------------------------------------------------------------------------- @given(snapshot=snapshot_strategy, config=gate_config_strategy) @settings(max_examples=100) def test_quality_gate_determinism( snapshot: dict, config: QualityGateConfig ) -> None: """**Validates: Requirements 11.1, 17.6** For any set of model metric values and quality gate configuration, calling _evaluate_thresholds twice with the same inputs SHALL produce the same pass/fail result for every threshold (determinism). """ results1 = _evaluate_thresholds(snapshot, config) results2 = _evaluate_thresholds(snapshot, config) assert len(results1) == len(results2), ( f"Different number of threshold results: {len(results1)} vs {len(results2)}" ) for r1, r2 in zip(results1, results2): assert r1.name == r2.name, ( f"Threshold name mismatch: {r1.name!r} vs {r2.name!r}" ) assert r1.threshold == r2.threshold, ( f"Threshold value mismatch for {r1.name}: " f"{r1.threshold} vs {r2.threshold}" ) assert r1.actual == r2.actual, ( f"Actual value mismatch for {r1.name}: " f"{r1.actual} vs {r2.actual}" ) assert r1.passed == r2.passed, ( f"Determinism violated for threshold {r1.name}: " f"first call passed={r1.passed}, second call passed={r2.passed}. " f"actual={r1.actual}, threshold={r1.threshold}" ) # Overall gate pass/fail should also be deterministic all_passed_1 = all(r.passed for r in results1) all_passed_2 = all(r.passed for r in results2) assert all_passed_1 == all_passed_2, ( f"Overall gate determinism violated: " f"first call passed={all_passed_1}, second call passed={all_passed_2}" ) @given( snapshot=snapshot_strategy, config=gate_config_strategy, relax_amount=st.floats( min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False ), threshold_to_relax=st.sampled_from([ "min_prediction_count", "min_ic", "min_win_rate", "max_ece", "min_excess_return_vs_spy", ]), ) @settings(max_examples=100) def test_quality_gate_threshold_monotonicity( snapshot: dict, config: QualityGateConfig, relax_amount: float, threshold_to_relax: str, ) -> None: """**Validates: Requirements 11.1, 17.6** For any configuration where the gate passes, relaxing any single threshold (decreasing min values or increasing max values to make them easier to satisfy) SHALL NOT cause the gate to fail (monotonicity). """ # Evaluate with original config original_results = _evaluate_thresholds(snapshot, config) original_passed = all(r.passed for r in original_results) # Only test monotonicity when the gate originally passes if not original_passed: return # Create a relaxed config by making one threshold easier to satisfy from dataclasses import replace if threshold_to_relax == "min_prediction_count": # Decrease min → easier to satisfy relaxed_value = max(0, config.min_prediction_count - int(relax_amount * 1000)) relaxed_config = replace(config, min_prediction_count=relaxed_value) elif threshold_to_relax == "min_ic": # Decrease min → easier to satisfy relaxed_config = replace(config, min_ic=config.min_ic - relax_amount) elif threshold_to_relax == "min_win_rate": # Decrease min → easier to satisfy relaxed_config = replace(config, min_win_rate=config.min_win_rate - relax_amount) elif threshold_to_relax == "max_ece": # Increase max → easier to satisfy relaxed_config = replace(config, max_ece=config.max_ece + relax_amount) elif threshold_to_relax == "min_excess_return_vs_spy": # Decrease min → easier to satisfy relaxed_config = replace( config, min_excess_return_vs_spy=config.min_excess_return_vs_spy - relax_amount, ) else: return # pragma: no cover # Evaluate with relaxed config relaxed_results = _evaluate_thresholds(snapshot, config=relaxed_config) relaxed_passed = all(r.passed for r in relaxed_results) assert relaxed_passed, ( f"Monotonicity violated: gate passed with original config but failed " f"after relaxing {threshold_to_relax}. " f"Original config: min_prediction_count={config.min_prediction_count}, " f"min_ic={config.min_ic}, min_win_rate={config.min_win_rate}, " f"max_ece={config.max_ece}, " f"min_excess_return_vs_spy={config.min_excess_return_vs_spy}. " f"Relaxed threshold: {threshold_to_relax} by {relax_amount}. " f"Failed thresholds: " f"{[(r.name, r.actual, r.threshold) for r in relaxed_results if not r.passed]}" )