"""Property-based tests for the pattern matcher module. Feature: competitive-historical-patterns Uses Hypothesis to validate correctness properties of the pattern matcher: pattern computation, confidence monotonicity, insufficient data threshold, valid-only data filtering, catalyst tier classification, and lookback windows. """ from __future__ import annotations import uuid from datetime import datetime, timedelta, timezone from typing import Any from hypothesis import assume, given, settings from hypothesis import strategies as st from services.aggregation.pattern_matcher import ( _build_pattern, _lookback_days, classify_catalyst_tier, compute_pattern_confidence, ) from services.shared.config import CompetitiveConfig from services.shared.schemas import MAJOR_DECISION_CATALYSTS # --------------------------------------------------------------------------- # Hypothesis strategies # --------------------------------------------------------------------------- _ALL_MAJOR_CATALYSTS = sorted(MAJOR_DECISION_CATALYSTS) _ROUTINE_CATALYSTS = [ "earnings", "product_launch", "partnership", "analyst_upgrade", "analyst_downgrade", "guidance", "regulatory_approval", "patent", "market_expansion", "cost_cutting", "supply_chain", "hiring", ] _TREND_DIRECTIONS = ["bullish", "bearish", "neutral"] def _sample_count_strategy(min_val: int = 0, max_val: int = 50) -> st.SearchStrategy[int]: return st.integers(min_value=min_val, max_value=max_val) def _unit_float() -> st.SearchStrategy[float]: return st.floats(min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False) def _recency_days_strategy() -> st.SearchStrategy[float]: return st.floats(min_value=0.0, max_value=1000.0, allow_nan=False, allow_infinity=False) def _tier_strategy() -> st.SearchStrategy[str]: return st.sampled_from(["major_corporate_decision", "routine_signal"]) def _catalyst_type_strategy() -> st.SearchStrategy[str]: return st.sampled_from(_ALL_MAJOR_CATALYSTS + _ROUTINE_CATALYSTS) class _FakeRecord: """Minimal dict-like object mimicking asyncpg.Record for _build_pattern.""" def __init__(self, data: dict[str, Any]) -> None: self._data = data def __getitem__(self, key: str) -> Any: return self._data[key] def _fake_row_strategy( base_time: datetime | None = None, ) -> st.SearchStrategy[_FakeRecord]: """Generate a fake DB row compatible with _build_pattern.""" if base_time is None: base_time = datetime.now(timezone.utc) return st.fixed_dictionaries({ "dir_id": st.uuids().map(str), "published_at": st.integers(min_value=0, max_value=180).map( lambda d: base_time - timedelta(days=d) ), "sentiment": st.sampled_from(["positive", "negative", "neutral"]), "trend_direction": st.sampled_from(_TREND_DIRECTIONS), "trend_strength": _unit_float(), "generated_at": st.integers(min_value=0, max_value=30).map( lambda d: base_time - timedelta(days=d) ), "tw_window": st.sampled_from(["1d", "7d", "30d"]), }).map(_FakeRecord) # --------------------------------------------------------------------------- # Property 7: Pattern computation correctness # --------------------------------------------------------------------------- class TestProperty7PatternComputationCorrectness: """Feature: competitive-historical-patterns, Property 7: Pattern computation correctness For any set of historical records, the computed HistoricalPattern SHALL have: sample_count equal to the actual number of matching records, bullish_pct + bearish_pct + neutral_pct ≈ 1.0, avg_strength equal to the mean of the matched trend strengths, and all fields within their valid ranges. **Validates: Requirements 3.1, 3.2, 4.2** """ @given( rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30), tier=_tier_strategy(), ) @settings(max_examples=100) def test_sample_count_matches_unique_rows( self, rows: list[_FakeRecord], tier: str, ): """**Validates: Requirements 3.1, 3.2, 4.2** sample_count must equal the number of unique dir_id values in the input rows. """ pattern = _build_pattern( rows, "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is not None # Count unique dir_ids the same way _build_pattern does seen: set[str] = set() for r in rows: rid = str(r["dir_id"]) if rid not in seen: seen.add(rid) expected_count = len(seen) assert pattern.sample_count == expected_count @given( rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30), tier=_tier_strategy(), ) @settings(max_examples=100) def test_outcome_percentages_sum_to_one( self, rows: list[_FakeRecord], tier: str, ): """**Validates: Requirements 3.1, 3.2, 4.2** bullish_pct + bearish_pct + neutral_pct must approximately equal 1.0. neutral_pct is implicitly 1 - bullish_pct - bearish_pct. """ pattern = _build_pattern( rows, "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is not None neutral_pct = 1.0 - pattern.bullish_pct - pattern.bearish_pct total = pattern.bullish_pct + pattern.bearish_pct + neutral_pct assert abs(total - 1.0) < 1e-9, f"Outcome percentages sum to {total}, expected ~1.0" @given( rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30), tier=_tier_strategy(), ) @settings(max_examples=100) def test_avg_strength_equals_mean_of_trend_strengths( self, rows: list[_FakeRecord], tier: str, ): """**Validates: Requirements 3.1, 3.2, 4.2** avg_strength must equal the mean of trend_strength values from unique rows, clamped to [0, 1]. """ pattern = _build_pattern( rows, "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is not None # Replicate the unique-row logic seen: set[str] = set() unique_rows: list[_FakeRecord] = [] for r in rows: rid = str(r["dir_id"]) if rid not in seen: seen.add(rid) unique_rows.append(r) strengths = [ float(r["trend_strength"]) for r in unique_rows if r["trend_strength"] is not None ] expected = sum(strengths) / len(strengths) if strengths else 0.0 expected = min(max(expected, 0.0), 1.0) assert abs(pattern.avg_strength - expected) < 1e-9, ( f"avg_strength {pattern.avg_strength} != expected {expected}" ) @given( rows=st.lists(_fake_row_strategy(), min_size=1, max_size=30), tier=_tier_strategy(), ) @settings(max_examples=100) def test_all_fields_within_valid_ranges( self, rows: list[_FakeRecord], tier: str, ): """**Validates: Requirements 3.1, 3.2, 4.2** All numeric fields must be within their documented valid ranges. """ pattern = _build_pattern( rows, "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is not None assert pattern.sample_count >= 1 assert 0.0 <= pattern.bullish_pct <= 1.0 assert 0.0 <= pattern.bearish_pct <= 1.0 assert 0.0 <= pattern.avg_strength <= 1.0 assert 0.0 <= pattern.pattern_confidence <= 1.0 assert pattern.avg_time_to_resolution >= 0.0 assert pattern.data_start is not None assert pattern.data_end is not None assert pattern.tier in ("major_corporate_decision", "routine_signal") # --------------------------------------------------------------------------- # Property 8: Pattern confidence monotonicity # --------------------------------------------------------------------------- class TestProperty8PatternConfidenceMonotonicity: """Feature: competitive-historical-patterns, Property 8: Pattern confidence monotonicity For any two HistoricalPatterns where one has strictly more samples, more consistent outcomes, and more recent data than the other (all else equal), the first SHALL have a higher or equal pattern_confidence. Additionally, for any two patterns with identical statistics but different tiers, the major_corporate_decision pattern SHALL have higher confidence than the routine_signal pattern. **Validates: Requirements 3.3, 11.2** """ @given( low_samples=st.integers(min_value=1, max_value=9), high_samples=st.integers(min_value=10, max_value=40), consistency=_unit_float(), recency=_recency_days_strategy(), tier=_tier_strategy(), ) @settings(max_examples=100) def test_more_samples_yields_higher_or_equal_confidence( self, low_samples: int, high_samples: int, consistency: float, recency: float, tier: str, ): """**Validates: Requirements 3.3, 11.2** With more samples (all else equal), confidence must be >= the lower-sample confidence. """ assume(high_samples > low_samples) low_conf = compute_pattern_confidence(low_samples, consistency, recency, tier) high_conf = compute_pattern_confidence(high_samples, consistency, recency, tier) assert high_conf >= low_conf - 1e-9, ( f"More samples ({high_samples}) yielded lower confidence " f"{high_conf} < {low_conf} (samples={low_samples})" ) @given( samples=st.integers(min_value=3, max_value=40), low_consistency=st.floats(min_value=0.0, max_value=0.4, allow_nan=False, allow_infinity=False), high_consistency=st.floats(min_value=0.5, max_value=1.0, allow_nan=False, allow_infinity=False), recency=_recency_days_strategy(), tier=_tier_strategy(), ) @settings(max_examples=100) def test_more_consistent_outcomes_yield_higher_or_equal_confidence( self, samples: int, low_consistency: float, high_consistency: float, recency: float, tier: str, ): """**Validates: Requirements 3.3, 11.2** With more consistent outcomes (all else equal), confidence must be >= the less-consistent confidence. """ assume(high_consistency > low_consistency) low_conf = compute_pattern_confidence(samples, low_consistency, recency, tier) high_conf = compute_pattern_confidence(samples, high_consistency, recency, tier) assert high_conf >= low_conf - 1e-9, ( f"Higher consistency ({high_consistency}) yielded lower confidence " f"{high_conf} < {low_conf} (consistency={low_consistency})" ) @given( samples=st.integers(min_value=3, max_value=40), consistency=_unit_float(), ) @settings(max_examples=100) def test_more_recent_data_yields_higher_or_equal_confidence( self, samples: int, consistency: float, ): """**Validates: Requirements 3.3, 11.2** With more recent data (lower recency_days), confidence must be >= the stale-data confidence. """ tier = "routine_signal" recent_conf = compute_pattern_confidence(samples, consistency, 30.0, tier) stale_conf = compute_pattern_confidence(samples, consistency, 300.0, tier) assert recent_conf >= stale_conf - 1e-9, ( f"Recent data (30d) yielded lower confidence {recent_conf} " f"< stale data (300d) {stale_conf}" ) @given( samples=st.integers(min_value=3, max_value=40), consistency=_unit_float(), recency=st.floats(min_value=0.0, max_value=89.0, allow_nan=False, allow_infinity=False), ) @settings(max_examples=100) def test_major_decision_has_higher_confidence_than_routine( self, samples: int, consistency: float, recency: float, ): """**Validates: Requirements 3.3, 11.2** With identical statistics, major_corporate_decision tier must have higher confidence than routine_signal tier. """ major_conf = compute_pattern_confidence( samples, consistency, recency, "major_corporate_decision", ) routine_conf = compute_pattern_confidence( samples, consistency, recency, "routine_signal", ) assert major_conf >= routine_conf - 1e-9, ( f"Major decision confidence {major_conf} < routine {routine_conf}" ) # --------------------------------------------------------------------------- # Property 9: Insufficient data threshold # --------------------------------------------------------------------------- class TestProperty9InsufficientDataThreshold: """Feature: competitive-historical-patterns, Property 9: Insufficient data threshold For any HistoricalPattern with sample_count < 3, the pattern_confidence SHALL be below 0.3 and insufficient_data SHALL be True. **Validates: Requirements 3.4** """ @given( sample_count=st.integers(min_value=1, max_value=2), consistency=_unit_float(), recency=_recency_days_strategy(), tier=_tier_strategy(), ) @settings(max_examples=100) def test_low_sample_count_caps_confidence_below_threshold( self, sample_count: int, consistency: float, recency: float, tier: str, ): """**Validates: Requirements 3.4** When sample_count < 3 (min_pattern_samples), confidence must be capped below 0.3 (specifically at 0.25 per the implementation). """ cfg = CompetitiveConfig() confidence = compute_pattern_confidence( sample_count, consistency, recency, tier, cfg, ) assert confidence < 0.3, ( f"Confidence {confidence} >= 0.3 with only {sample_count} samples" ) # The cap is specifically 0.25 assert confidence <= 0.25 + 1e-9, ( f"Confidence {confidence} > 0.25 cap with {sample_count} samples" ) @given( rows=st.lists(_fake_row_strategy(), min_size=1, max_size=2), tier=_tier_strategy(), ) @settings(max_examples=100) def test_build_pattern_sets_insufficient_data_flag( self, rows: list[_FakeRecord], tier: str, ): """**Validates: Requirements 3.4** When _build_pattern receives fewer than 3 unique rows, the resulting pattern must have insufficient_data = True and pattern_confidence < 0.3. """ # Ensure unique dir_ids so we get exactly len(rows) samples for i, r in enumerate(rows): r._data["dir_id"] = str(uuid.uuid4()) pattern = _build_pattern( rows, "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is not None assert pattern.sample_count < 3 assert pattern.insufficient_data is True assert pattern.pattern_confidence < 0.3, ( f"Confidence {pattern.pattern_confidence} >= 0.3 with " f"{pattern.sample_count} samples" ) # --------------------------------------------------------------------------- # Property 10: Valid-only data filtering # --------------------------------------------------------------------------- class TestProperty10ValidOnlyDataFiltering: """Feature: competitive-historical-patterns, Property 10: Valid-only data filtering For any set of document_impact_records containing records linked to invalid intelligence (validation_status != 'valid') or rejected documents (status = 'rejected'), the Pattern_Matcher SHALL exclude those records from pattern computation — the resulting sample_count SHALL only reflect valid, non-rejected records. NOTE: This tests the _build_pattern function conceptually. Since we can't run real SQL, we verify that _build_pattern correctly counts only the rows it receives (the SQL already filters). **Validates: Requirements 3.5** """ @given( valid_count=st.integers(min_value=1, max_value=15), tier=_tier_strategy(), ) @settings(max_examples=100) def test_build_pattern_counts_only_provided_rows( self, valid_count: int, tier: str, ): """**Validates: Requirements 3.5** _build_pattern must count exactly the unique rows it receives. The SQL query pre-filters to valid/non-rejected records, so _build_pattern should faithfully reflect that filtered set. """ now = datetime.now(timezone.utc) rows: list[_FakeRecord] = [] for _ in range(valid_count): rows.append(_FakeRecord({ "dir_id": str(uuid.uuid4()), "published_at": now - timedelta(days=10), "sentiment": "positive", "trend_direction": "bullish", "trend_strength": 0.7, "generated_at": now - timedelta(days=9), "tw_window": "7d", })) pattern = _build_pattern( rows, "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is not None assert pattern.sample_count == valid_count, ( f"Expected sample_count={valid_count}, got {pattern.sample_count}" ) @given(tier=_tier_strategy()) @settings(max_examples=100) def test_empty_rows_returns_none(self, tier: str): """**Validates: Requirements 3.5** When all records are filtered out (empty input), _build_pattern returns None — no pattern is produced. """ pattern = _build_pattern( [], "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is None @given( valid_count=st.integers(min_value=1, max_value=10), extra_dupes=st.integers(min_value=1, max_value=5), tier=_tier_strategy(), ) @settings(max_examples=100) def test_duplicate_dir_ids_are_deduplicated( self, valid_count: int, extra_dupes: int, tier: str, ): """**Validates: Requirements 3.5** _build_pattern deduplicates rows by dir_id, so duplicate entries for the same document impact record are counted only once. """ now = datetime.now(timezone.utc) rows: list[_FakeRecord] = [] unique_ids: list[str] = [] for _ in range(valid_count): did = str(uuid.uuid4()) unique_ids.append(did) rows.append(_FakeRecord({ "dir_id": did, "published_at": now - timedelta(days=10), "sentiment": "positive", "trend_direction": "bullish", "trend_strength": 0.6, "generated_at": now - timedelta(days=9), "tw_window": "7d", })) # Add duplicates of the first row for _ in range(extra_dupes): rows.append(_FakeRecord({ "dir_id": unique_ids[0], "published_at": now - timedelta(days=10), "sentiment": "positive", "trend_direction": "bullish", "trend_strength": 0.6, "generated_at": now - timedelta(days=9), "tw_window": "7d", })) pattern = _build_pattern( rows, "SRC", "TGT", "earnings", "7d", tier, ) assert pattern is not None assert pattern.sample_count == valid_count, ( f"Expected {valid_count} unique samples, got {pattern.sample_count} " f"(input had {len(rows)} rows including {extra_dupes} dupes)" ) # --------------------------------------------------------------------------- # Property 19: Catalyst tier classification determinism # --------------------------------------------------------------------------- class TestProperty19CatalystTierClassificationDeterminism: """Feature: competitive-historical-patterns, Property 19: Catalyst tier classification determinism For any catalyst type, the tier classification SHALL be deterministic: m_and_a, legal, restructuring, leadership_change, strategic_pivot, buyback, and dividend_change SHALL always map to major_corporate_decision; all other catalyst types SHALL map to routine_signal. **Validates: Requirements 11.1** """ @given(catalyst=st.sampled_from(_ALL_MAJOR_CATALYSTS)) @settings(max_examples=100) def test_major_catalysts_always_map_to_major_corporate_decision( self, catalyst: str, ): """**Validates: Requirements 11.1** Every catalyst in MAJOR_DECISION_CATALYSTS must classify as major_corporate_decision, deterministically. """ result = classify_catalyst_tier(catalyst) assert result == "major_corporate_decision", ( f"Catalyst '{catalyst}' classified as '{result}', " f"expected 'major_corporate_decision'" ) # Determinism: calling again must produce the same result assert classify_catalyst_tier(catalyst) == result @given(catalyst=st.sampled_from(_ROUTINE_CATALYSTS)) @settings(max_examples=100) def test_routine_catalysts_always_map_to_routine_signal( self, catalyst: str, ): """**Validates: Requirements 11.1** Any catalyst NOT in MAJOR_DECISION_CATALYSTS must classify as routine_signal, deterministically. """ result = classify_catalyst_tier(catalyst) assert result == "routine_signal", ( f"Catalyst '{catalyst}' classified as '{result}', " f"expected 'routine_signal'" ) # Determinism: calling again must produce the same result assert classify_catalyst_tier(catalyst) == result @given( catalyst=st.text( alphabet=st.characters(whitelist_categories=("L", "N", "P")), min_size=1, max_size=30, ), ) @settings(max_examples=100) def test_arbitrary_strings_classify_deterministically( self, catalyst: str, ): """**Validates: Requirements 11.1** For any arbitrary string, classification is deterministic and returns one of the two valid tiers. """ result1 = classify_catalyst_tier(catalyst) result2 = classify_catalyst_tier(catalyst) assert result1 == result2, "Classification is not deterministic" assert result1 in ("major_corporate_decision", "routine_signal") if catalyst in MAJOR_DECISION_CATALYSTS: assert result1 == "major_corporate_decision" else: assert result1 == "routine_signal" # --------------------------------------------------------------------------- # Property 20: Major decision extended lookback # --------------------------------------------------------------------------- class TestProperty20MajorDecisionExtendedLookback: """Feature: competitive-historical-patterns, Property 20: Major decision extended lookback For any pattern mining query for a major_corporate_decision catalyst type, the lookback window SHALL be 365 days. For any routine_signal catalyst type, the lookback window SHALL be 180 days. **Validates: Requirements 11.3, 11.5** """ @given(catalyst=st.sampled_from(_ALL_MAJOR_CATALYSTS)) @settings(max_examples=100) def test_major_decision_lookback_is_365_days(self, catalyst: str): """**Validates: Requirements 11.3, 11.5** Major corporate decision catalysts must use a 365-day lookback. """ tier = classify_catalyst_tier(catalyst) assert tier == "major_corporate_decision" lookback = _lookback_days(tier) assert lookback == 365, ( f"Major decision lookback is {lookback}, expected 365" ) @given(catalyst=st.sampled_from(_ROUTINE_CATALYSTS)) @settings(max_examples=100) def test_routine_signal_lookback_is_180_days(self, catalyst: str): """**Validates: Requirements 11.3, 11.5** Routine signal catalysts must use a 180-day lookback. """ tier = classify_catalyst_tier(catalyst) assert tier == "routine_signal" lookback = _lookback_days(tier) assert lookback == 180, ( f"Routine signal lookback is {lookback}, expected 180" ) @given(catalyst=_catalyst_type_strategy()) @settings(max_examples=100) def test_lookback_matches_tier_for_any_catalyst(self, catalyst: str): """**Validates: Requirements 11.3, 11.5** For any catalyst type, the lookback window must match the tier: 365 for major_corporate_decision, 180 for routine_signal. """ tier = classify_catalyst_tier(catalyst) lookback = _lookback_days(tier) if tier == "major_corporate_decision": assert lookback == 365 else: assert lookback == 180 @given( major_catalyst=st.sampled_from(_ALL_MAJOR_CATALYSTS), routine_catalyst=st.sampled_from(_ROUTINE_CATALYSTS), ) @settings(max_examples=100) def test_major_lookback_strictly_greater_than_routine( self, major_catalyst: str, routine_catalyst: str, ): """**Validates: Requirements 11.3, 11.5** The major decision lookback window must always be strictly greater than the routine signal lookback window. """ major_tier = classify_catalyst_tier(major_catalyst) routine_tier = classify_catalyst_tier(routine_catalyst) major_lookback = _lookback_days(major_tier) routine_lookback = _lookback_days(routine_tier) assert major_lookback > routine_lookback, ( f"Major lookback {major_lookback} not > routine {routine_lookback}" )