"""Property-based tests for the pattern matcher module. Feature: competitive-historical-patterns Uses Hypothesis to validate correctness properties of the pattern matcher: pattern computation, confidence monotonicity, insufficient data threshold, valid-only data filtering, catalyst tier classification, and lookback windows. """ from __future__ import annotations import uuid from datetime import datetime, timedelta, timezone from typing import Any import pytest from hypothesis import assume, given, settings from hypothesis import strategies as st from services.aggregation.pattern_matcher import ( HistoricalPattern, _build_pattern, _lookback_days, classify_catalyst_tier, compute_pattern_confidence, ) from services.shared.config import CompetitiveConfig from services.shared.schemas import MAJOR_DECISION_CATALYSTS # --------------------------------------------------------------------------- # Hypothesis strategies # --------------------------------------------------------------------------- _ALL_MAJOR_CATALYSTS = sorted(MAJOR_DECISION_CATALYSTS) _ROUTINE_CATALYSTS = [ "earnings", "product_launch", "partnership", "analyst_upgrade", "analyst_downgrade", "guidance", "regulatory_approval", "patent", "market_expansion", "cost_cutting", "supply_chain", "hiring", ] _TREND_DIRECTIONS = ["bullish", "bearish", "neutral"] def _sample_count_strategy(min_val: int = 0, max_val: int = 50) -> st.SearchStrategy[int]: return st.integers(min_value=min_val, max_value=max_val) def _unit_float() -> st.SearchStrategy[float]: return st.floats(min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False) def _recency_days_strategy() -> st.SearchStrategy[float]: return st.floats(min_value=0.0, max_value=1000.0, allow_nan=False, allow_infinity=False) def _tier_strategy() -> st.SearchStrategy[str]: return st.sampled_from(["major_corporate_decision", "routine_signal"]) def _catalyst_type_strategy() -> st.SearchStrategy[str]: return st.sampled_from(_ALL_MAJOR_CATALYSTS + _ROUTINE_CATALYSTS) 
class _FakeRecord:
    """Minimal dict-like object mimicking asyncpg.Record for _build_pattern."""

    def __init__(self, data: dict[str, Any]) -> None:
        self._data = data

    def __getitem__(self, key: str) -> Any:
        return self._data[key]


def _fake_row_strategy(
    base_time: datetime | None = None,
) -> st.SearchStrategy[_FakeRecord]:
    """Generate a fake DB row compatible with _build_pattern.

    Args:
        base_time: Reference instant the generated timestamps are offset
            from. Defaults to the current UTC time at the moment the
            strategy is built.

    Returns:
        A strategy producing ``_FakeRecord`` rows with the keys ``dir_id``,
        ``published_at``, ``sentiment``, ``trend_direction``,
        ``trend_strength``, ``generated_at`` and ``tw_window``.
    """
    if base_time is None:
        base_time = datetime.now(timezone.utc)

    return st.fixed_dictionaries({
        "dir_id": st.uuids().map(str),
        "published_at": st.integers(min_value=0, max_value=180).map(
            lambda d: base_time - timedelta(days=d)
        ),
        "sentiment": st.sampled_from(["positive", "negative", "neutral"]),
        "trend_direction": st.sampled_from(_TREND_DIRECTIONS),
        "trend_strength": _unit_float(),
        "generated_at": st.integers(min_value=0, max_value=30).map(
            lambda d: base_time - timedelta(days=d)
        ),
        "tw_window": st.sampled_from(["1d", "7d", "30d"]),
    }).map(_FakeRecord)


def _dedupe_by_dir_id(rows: list[_FakeRecord]) -> list[_FakeRecord]:
    """Return *rows* without repeated ``dir_id`` values; the first occurrence wins."""
    first_seen: dict[str, _FakeRecord] = {}
    for row in rows:
        # setdefault keeps the earliest row for each id and preserves order.
        first_seen.setdefault(str(row["dir_id"]), row)
    return list(first_seen.values())


def _bullish_row(dir_id: str, now: datetime, trend_strength: float) -> _FakeRecord:
    """Build a fixed positive/bullish 7d row published 10 days before *now*."""
    return _FakeRecord({
        "dir_id": dir_id,
        "published_at": now - timedelta(days=10),
        "sentiment": "positive",
        "trend_direction": "bullish",
        "trend_strength": trend_strength,
        "generated_at": now - timedelta(days=9),
        "tw_window": "7d",
    })


# ---------------------------------------------------------------------------
# Property 7: Pattern computation correctness
# ---------------------------------------------------------------------------


class TestProperty7PatternComputationCorrectness:
    """Feature: competitive-historical-patterns, Property 7: Pattern computation correctness

    For any set of historical records, the computed HistoricalPattern SHALL have:
    sample_count equal to the actual number of matching records,
    bullish_pct + bearish_pct + neutral_pct ≈ 1.0,
    avg_strength equal to the mean of the matched trend strengths,
    and all fields within their valid ranges.

    **Validates: Requirements 3.1, 3.2, 4.2**
    """

    @given(
        rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_sample_count_matches_unique_rows(
        self,
        rows: list[_FakeRecord],
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.1, 3.2, 4.2**

        sample_count must equal the number of unique dir_id values
        in the input rows.
        """
        pattern = _build_pattern(
            rows, "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is not None

        expected_count = len(_dedupe_by_dir_id(rows))
        assert pattern.sample_count == expected_count

    @given(
        rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_outcome_percentages_sum_to_one(
        self,
        rows: list[_FakeRecord],
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.1, 3.2, 4.2**

        bullish_pct + bearish_pct + neutral_pct must approximately equal 1.0.
        neutral_pct is implicitly 1 - bullish_pct - bearish_pct, so the three
        shares can only sum to 1.0 with every share valid when the implied
        neutral share itself lies in [0, 1].
        """
        pattern = _build_pattern(
            rows, "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is not None

        directional = pattern.bullish_pct + pattern.bearish_pct
        neutral_pct = 1.0 - directional
        # Summing the three shares back together would be true by construction
        # of neutral_pct; the meaningful check is that it is a valid share.
        assert -1e-9 <= neutral_pct <= 1.0 + 1e-9, (
            f"bullish_pct + bearish_pct = {directional}; implied neutral_pct "
            f"{neutral_pct} is outside [0, 1]"
        )

    @given(
        rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_avg_strength_equals_mean_of_trend_strengths(
        self,
        rows: list[_FakeRecord],
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.1, 3.2, 4.2**

        avg_strength must equal the mean of trend_strength values
        from unique rows, clamped to [0, 1].
        """
        pattern = _build_pattern(
            rows, "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is not None

        strengths = [
            float(row["trend_strength"])
            for row in _dedupe_by_dir_id(rows)
            if row["trend_strength"] is not None
        ]
        expected = sum(strengths) / len(strengths) if strengths else 0.0
        expected = min(max(expected, 0.0), 1.0)

        assert abs(pattern.avg_strength - expected) < 1e-9, (
            f"avg_strength {pattern.avg_strength} != expected {expected}"
        )

    @given(
        rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_all_fields_within_valid_ranges(
        self,
        rows: list[_FakeRecord],
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.1, 3.2, 4.2**

        All numeric fields must be within their documented valid ranges.
        """
        pattern = _build_pattern(
            rows, "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is not None

        assert pattern.sample_count >= 1
        assert 0.0 <= pattern.bullish_pct <= 1.0
        assert 0.0 <= pattern.bearish_pct <= 1.0
        assert 0.0 <= pattern.avg_strength <= 1.0
        assert 0.0 <= pattern.pattern_confidence <= 1.0
        assert pattern.avg_time_to_resolution >= 0.0
        assert pattern.data_start is not None
        assert pattern.data_end is not None
        assert pattern.tier in ("major_corporate_decision", "routine_signal")


# ---------------------------------------------------------------------------
# Property 8: Pattern confidence monotonicity
# ---------------------------------------------------------------------------


class TestProperty8PatternConfidenceMonotonicity:
    """Feature: competitive-historical-patterns, Property 8: Pattern confidence monotonicity

    For any two HistoricalPatterns where one has strictly more samples,
    more consistent outcomes, and more recent data than the other
    (all else equal), the first SHALL have a higher or equal pattern_confidence.

    Additionally, for any two patterns with identical statistics but different
    tiers, the major_corporate_decision pattern SHALL have higher confidence
    than the routine_signal pattern.

    **Validates: Requirements 3.3, 11.2**
    """

    @given(
        low_samples=st.integers(min_value=1, max_value=9),
        high_samples=st.integers(min_value=10, max_value=40),
        consistency=_unit_float(),
        recency=_recency_days_strategy(),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_more_samples_yields_higher_or_equal_confidence(
        self,
        low_samples: int,
        high_samples: int,
        consistency: float,
        recency: float,
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.3, 11.2**

        With more samples (all else equal), confidence must be >= the
        lower-sample confidence.
        """
        # The strategy ranges are already disjoint; the assume() keeps the
        # precondition explicit should those ranges ever be edited.
        assume(high_samples > low_samples)

        low_conf = compute_pattern_confidence(low_samples, consistency, recency, tier)
        high_conf = compute_pattern_confidence(high_samples, consistency, recency, tier)

        assert high_conf >= low_conf - 1e-9, (
            f"More samples ({high_samples}) yielded lower confidence "
            f"{high_conf} < {low_conf} (samples={low_samples})"
        )

    @given(
        samples=st.integers(min_value=3, max_value=40),
        low_consistency=st.floats(
            min_value=0.0, max_value=0.4, allow_nan=False, allow_infinity=False,
        ),
        high_consistency=st.floats(
            min_value=0.5, max_value=1.0, allow_nan=False, allow_infinity=False,
        ),
        recency=_recency_days_strategy(),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_more_consistent_outcomes_yield_higher_or_equal_confidence(
        self,
        samples: int,
        low_consistency: float,
        high_consistency: float,
        recency: float,
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.3, 11.2**

        With more consistent outcomes (all else equal), confidence must
        be >= the less-consistent confidence.
        """
        # Redundant with the disjoint strategy ranges; kept as an explicit
        # precondition.
        assume(high_consistency > low_consistency)

        low_conf = compute_pattern_confidence(samples, low_consistency, recency, tier)
        high_conf = compute_pattern_confidence(samples, high_consistency, recency, tier)

        assert high_conf >= low_conf - 1e-9, (
            f"Higher consistency ({high_consistency}) yielded lower confidence "
            f"{high_conf} < {low_conf} (consistency={low_consistency})"
        )

    @given(
        samples=st.integers(min_value=3, max_value=40),
        consistency=_unit_float(),
    )
    @settings(max_examples=100)
    def test_more_recent_data_yields_higher_or_equal_confidence(
        self,
        samples: int,
        consistency: float,
    ) -> None:
        """**Validates: Requirements 3.3, 11.2**

        With more recent data (lower recency_days), confidence must
        be >= the stale-data confidence.

        Recency is compared at two fixed points (30 vs 300 days) for the
        routine_signal tier only.
        """
        tier = "routine_signal"

        recent_conf = compute_pattern_confidence(samples, consistency, 30.0, tier)
        stale_conf = compute_pattern_confidence(samples, consistency, 300.0, tier)

        assert recent_conf >= stale_conf - 1e-9, (
            f"Recent data (30d) yielded lower confidence {recent_conf} "
            f"< stale data (300d) {stale_conf}"
        )

    @given(
        samples=st.integers(min_value=3, max_value=40),
        consistency=_unit_float(),
        recency=st.floats(
            min_value=0.0, max_value=89.0, allow_nan=False, allow_infinity=False,
        ),
    )
    @settings(max_examples=100)
    def test_major_decision_has_higher_confidence_than_routine(
        self,
        samples: int,
        consistency: float,
        recency: float,
    ) -> None:
        """**Validates: Requirements 3.3, 11.2**

        With identical statistics, major_corporate_decision tier must have
        confidence at least as high as routine_signal tier (the check is
        non-strict, with a 1e-9 tolerance).
        """
        major_conf = compute_pattern_confidence(
            samples, consistency, recency, "major_corporate_decision",
        )
        routine_conf = compute_pattern_confidence(
            samples, consistency, recency, "routine_signal",
        )

        assert major_conf >= routine_conf - 1e-9, (
            f"Major decision confidence {major_conf} < routine {routine_conf}"
        )


# ---------------------------------------------------------------------------
# Property 9: Insufficient data threshold
# ---------------------------------------------------------------------------


class TestProperty9InsufficientDataThreshold:
    """Feature: competitive-historical-patterns, Property 9: Insufficient data threshold

    For any HistoricalPattern with sample_count < 3, the pattern_confidence
    SHALL be below 0.3 and insufficient_data SHALL be True.

    **Validates: Requirements 3.4**
    """

    @given(
        sample_count=st.integers(min_value=1, max_value=2),
        consistency=_unit_float(),
        recency=_recency_days_strategy(),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_low_sample_count_caps_confidence_below_threshold(
        self,
        sample_count: int,
        consistency: float,
        recency: float,
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.4**

        When sample_count < 3 (min_pattern_samples), confidence must be
        capped below 0.3 (specifically at 0.25 per the implementation).
        """
        cfg = CompetitiveConfig()
        confidence = compute_pattern_confidence(
            sample_count, consistency, recency, tier, cfg,
        )

        assert confidence < 0.3, (
            f"Confidence {confidence} >= 0.3 with only {sample_count} samples"
        )
        # The cap is specifically 0.25
        assert confidence <= 0.25 + 1e-9, (
            f"Confidence {confidence} > 0.25 cap with {sample_count} samples"
        )

    @given(
        # unique_by guarantees distinct dir_ids, so the pattern sees exactly
        # len(rows) samples without the test mutating generated data.
        rows=st.lists(
            _fake_row_strategy(),
            min_size=1,
            max_size=2,
            unique_by=lambda row: row["dir_id"],
        ),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_build_pattern_sets_insufficient_data_flag(
        self,
        rows: list[_FakeRecord],
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.4**

        When _build_pattern receives fewer than 3 unique rows, the
        resulting pattern must have insufficient_data = True and
        pattern_confidence < 0.3.
        """
        pattern = _build_pattern(
            rows, "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is not None

        assert pattern.sample_count < 3
        assert pattern.insufficient_data is True
        assert pattern.pattern_confidence < 0.3, (
            f"Confidence {pattern.pattern_confidence} >= 0.3 with "
            f"{pattern.sample_count} samples"
        )


# ---------------------------------------------------------------------------
# Property 10: Valid-only data filtering
# ---------------------------------------------------------------------------


class TestProperty10ValidOnlyDataFiltering:
    """Feature: competitive-historical-patterns, Property 10: Valid-only data filtering

    For any set of document_impact_records containing records linked to
    invalid intelligence (validation_status != 'valid') or rejected documents
    (status = 'rejected'), the Pattern_Matcher SHALL exclude those records
    from pattern computation — the resulting sample_count SHALL only reflect
    valid, non-rejected records.

    NOTE: This tests the _build_pattern function conceptually. Since we
    can't run real SQL, we verify that _build_pattern correctly counts only
    the rows it receives (the SQL already filters).

    **Validates: Requirements 3.5**
    """

    @given(
        valid_count=st.integers(min_value=1, max_value=15),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_build_pattern_counts_only_provided_rows(
        self,
        valid_count: int,
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.5**

        _build_pattern must count exactly the unique rows it receives.
        The SQL query pre-filters to valid/non-rejected records, so
        _build_pattern should faithfully reflect that filtered set.
        """
        now = datetime.now(timezone.utc)
        rows = [
            _bullish_row(str(uuid.uuid4()), now, 0.7)
            for _ in range(valid_count)
        ]

        pattern = _build_pattern(
            rows, "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is not None

        assert pattern.sample_count == valid_count, (
            f"Expected sample_count={valid_count}, got {pattern.sample_count}"
        )

    @given(tier=_tier_strategy())
    @settings(max_examples=100)
    def test_empty_rows_returns_none(self, tier: str) -> None:
        """**Validates: Requirements 3.5**

        When all records are filtered out (empty input), _build_pattern
        returns None — no pattern is produced.
        """
        pattern = _build_pattern(
            [], "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is None

    @given(
        valid_count=st.integers(min_value=1, max_value=10),
        extra_dupes=st.integers(min_value=1, max_value=5),
        tier=_tier_strategy(),
    )
    @settings(max_examples=100)
    def test_duplicate_dir_ids_are_deduplicated(
        self,
        valid_count: int,
        extra_dupes: int,
        tier: str,
    ) -> None:
        """**Validates: Requirements 3.5**

        _build_pattern deduplicates rows by dir_id, so duplicate entries
        for the same document impact record are counted only once.
        """
        now = datetime.now(timezone.utc)
        unique_ids = [str(uuid.uuid4()) for _ in range(valid_count)]
        rows = [_bullish_row(dir_id, now, 0.6) for dir_id in unique_ids]

        # Add duplicates of the first row
        rows.extend(
            _bullish_row(unique_ids[0], now, 0.6) for _ in range(extra_dupes)
        )

        pattern = _build_pattern(
            rows, "SRC", "TGT", "earnings", "7d", tier,
        )
        assert pattern is not None

        assert pattern.sample_count == valid_count, (
            f"Expected {valid_count} unique samples, got {pattern.sample_count} "
            f"(input had {len(rows)} rows including {extra_dupes} dupes)"
        )


# ---------------------------------------------------------------------------
# Property 19: Catalyst tier classification determinism
# ---------------------------------------------------------------------------


class TestProperty19CatalystTierClassificationDeterminism:
    """Feature: competitive-historical-patterns, Property 19: Catalyst tier classification determinism

    For any catalyst type, the tier classification SHALL be deterministic:
    m_and_a, legal, restructuring, leadership_change, strategic_pivot,
    buyback, and dividend_change SHALL always map to major_corporate_decision;
    all other catalyst types SHALL map to routine_signal.

    **Validates: Requirements 11.1**
    """

    @given(catalyst=st.sampled_from(_ALL_MAJOR_CATALYSTS))
    @settings(max_examples=100)
    def test_major_catalysts_always_map_to_major_corporate_decision(
        self,
        catalyst: str,
    ) -> None:
        """**Validates: Requirements 11.1**

        Every catalyst in MAJOR_DECISION_CATALYSTS must classify as
        major_corporate_decision, deterministically.
        """
        result = classify_catalyst_tier(catalyst)
        assert result == "major_corporate_decision", (
            f"Catalyst '{catalyst}' classified as '{result}', "
            f"expected 'major_corporate_decision'"
        )
        # Determinism: calling again must produce the same result
        assert classify_catalyst_tier(catalyst) == result

    @given(catalyst=st.sampled_from(_ROUTINE_CATALYSTS))
    @settings(max_examples=100)
    def test_routine_catalysts_always_map_to_routine_signal(
        self,
        catalyst: str,
    ) -> None:
        """**Validates: Requirements 11.1**

        Any catalyst NOT in MAJOR_DECISION_CATALYSTS must classify as
        routine_signal, deterministically.
        """
        result = classify_catalyst_tier(catalyst)
        assert result == "routine_signal", (
            f"Catalyst '{catalyst}' classified as '{result}', "
            f"expected 'routine_signal'"
        )
        # Determinism: calling again must produce the same result
        assert classify_catalyst_tier(catalyst) == result

    @given(
        catalyst=st.text(
            alphabet=st.characters(whitelist_categories=("L", "N", "P")),
            min_size=1,
            max_size=30,
        ),
    )
    @settings(max_examples=100)
    def test_arbitrary_strings_classify_deterministically(
        self,
        catalyst: str,
    ) -> None:
        """**Validates: Requirements 11.1**

        For any arbitrary string, classification is deterministic and
        returns one of the two valid tiers.
        """
        result1 = classify_catalyst_tier(catalyst)
        result2 = classify_catalyst_tier(catalyst)

        assert result1 == result2, "Classification is not deterministic"
        assert result1 in ("major_corporate_decision", "routine_signal")

        if catalyst in MAJOR_DECISION_CATALYSTS:
            assert result1 == "major_corporate_decision"
        else:
            assert result1 == "routine_signal"


# ---------------------------------------------------------------------------
# Property 20: Major decision extended lookback
# ---------------------------------------------------------------------------


class TestProperty20MajorDecisionExtendedLookback:
    """Feature: competitive-historical-patterns, Property 20: Major decision extended lookback

    For any pattern mining query for a major_corporate_decision catalyst type,
    the lookback window SHALL be 365 days. For any routine_signal catalyst type,
    the lookback window SHALL be 180 days.

    **Validates: Requirements 11.3, 11.5**
    """

    @given(catalyst=st.sampled_from(_ALL_MAJOR_CATALYSTS))
    @settings(max_examples=100)
    def test_major_decision_lookback_is_365_days(self, catalyst: str) -> None:
        """**Validates: Requirements 11.3, 11.5**

        Major corporate decision catalysts must use a 365-day lookback.
        """
        tier = classify_catalyst_tier(catalyst)
        assert tier == "major_corporate_decision"

        lookback = _lookback_days(tier)
        assert lookback == 365, (
            f"Major decision lookback is {lookback}, expected 365"
        )

    @given(catalyst=st.sampled_from(_ROUTINE_CATALYSTS))
    @settings(max_examples=100)
    def test_routine_signal_lookback_is_180_days(self, catalyst: str) -> None:
        """**Validates: Requirements 11.3, 11.5**

        Routine signal catalysts must use a 180-day lookback.
        """
        tier = classify_catalyst_tier(catalyst)
        assert tier == "routine_signal"

        lookback = _lookback_days(tier)
        assert lookback == 180, (
            f"Routine signal lookback is {lookback}, expected 180"
        )

    @given(catalyst=_catalyst_type_strategy())
    @settings(max_examples=100)
    def test_lookback_matches_tier_for_any_catalyst(self, catalyst: str) -> None:
        """**Validates: Requirements 11.3, 11.5**

        For any catalyst type, the lookback window must match the tier:
        365 for major_corporate_decision, 180 for routine_signal.
        """
        tier = classify_catalyst_tier(catalyst)
        lookback = _lookback_days(tier)

        if tier == "major_corporate_decision":
            assert lookback == 365
        else:
            assert lookback == 180

    @given(
        major_catalyst=st.sampled_from(_ALL_MAJOR_CATALYSTS),
        routine_catalyst=st.sampled_from(_ROUTINE_CATALYSTS),
    )
    @settings(max_examples=100)
    def test_major_lookback_strictly_greater_than_routine(
        self,
        major_catalyst: str,
        routine_catalyst: str,
    ) -> None:
        """**Validates: Requirements 11.3, 11.5**

        The major decision lookback window must always be strictly
        greater than the routine signal lookback window.
        """
        major_tier = classify_catalyst_tier(major_catalyst)
        routine_tier = classify_catalyst_tier(routine_catalyst)

        major_lookback = _lookback_days(major_tier)
        routine_lookback = _lookback_days(routine_tier)

        assert major_lookback > routine_lookback, (
            f"Major lookback {major_lookback} not > routine {routine_lookback}"
        )