"""Integration tests for the competitive pipeline end-to-end.

Exercises the competitive signal path through all stages:
  Document Intelligence → Pattern Mining → Signal Propagation → Aggregation

Also tests lake publisher writes for competitor relationships and
competitive signals, and competitive toggle state propagation.

Requirements: 4.1, 5.1, 6.1, 6.4, 7.3
"""

from __future__ import annotations

import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock

from services.aggregation.pattern_matcher import (
    HistoricalPattern,
)
from services.aggregation.signal_propagation import (
    CompetitiveSignalRecord,
    build_pattern_weighted_signals,
)
from services.aggregation.worker import (
    AggregationConfig,
    ImpactRow,
    assemble_trend_with_evidence,
    build_weighted_signals,
)
from services.lake_publisher.worker import (
    publish_competitive_signal_fact,
    publish_competitor_relationship_fact,
)
from services.shared.config import CompetitiveConfig
from services.shared.schemas import TrendDirection

# Fixed, timezone-aware reference instant so every test is deterministic.
NOW = datetime(2026, 6, 10, 12, 0, 0, tzinfo=timezone.utc)


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


def _make_company_impacts() -> list[ImpactRow]:
    """Build company-specific impact rows for aggregation.

    Returns two positive-sentiment rows published 3 and 6 hours before
    ``NOW``.
    """
    return [
        ImpactRow(
            document_id="doc-company-1",
            confidence=0.82,
            novelty_score=0.6,
            source_credibility=0.8,
            sentiment="positive",
            impact_score=0.7,
            catalyst_type="earnings",
            key_facts=["Revenue beat by 10%"],
            risks=["Supply chain concerns"],
            published_at=NOW - timedelta(hours=3),
        ),
        ImpactRow(
            document_id="doc-company-2",
            confidence=0.75,
            novelty_score=0.5,
            source_credibility=0.7,
            sentiment="positive",
            impact_score=0.55,
            catalyst_type="rating_change",
            key_facts=["Analyst upgrade"],
            risks=[],
            published_at=NOW - timedelta(hours=6),
        ),
    ]


def _make_self_pattern(
    ticker: str = "AAPL",
    catalyst_type: str = "earnings",
    bullish_pct: float = 0.8,
    bearish_pct: float = 0.2,
    confidence: float = 0.65,
) -> HistoricalPattern:
    """Build a self-company historical pattern.

    The same ``ticker`` is used as both source and target, which is what
    makes this a "self" pattern rather than a cross-company one.
    """
    return HistoricalPattern(
        source_ticker=ticker,
        target_ticker=ticker,
        catalyst_type=catalyst_type,
        time_horizon="7d",
        sample_count=10,
        bullish_pct=bullish_pct,
        bearish_pct=bearish_pct,
        avg_strength=0.6,
        avg_time_to_resolution=3.5,
        pattern_confidence=confidence,
        data_start=NOW - timedelta(days=90),
        data_end=NOW - timedelta(days=5),
        tier="routine_signal",
        insufficient_data=False,
    )


def _make_competitive_signal(
    source_ticker: str = "MSFT",
    target_ticker: str = "AAPL",
    direction: str = "bearish",
    strength: float = 0.35,
) -> CompetitiveSignalRecord:
    """Build a competitive signal record computed one hour before ``NOW``.

    A fresh random UUID is used for ``source_document_id`` on every call.
    """
    return CompetitiveSignalRecord(
        source_document_id=str(uuid.uuid4()),
        source_ticker=source_ticker,
        target_ticker=target_ticker,
        catalyst_type="product_launch",
        pattern_confidence=0.55,
        signal_direction=direction,
        signal_strength=strength,
        relationship_strength=0.7,
        computed_at=NOW - timedelta(hours=1),
    )


# ---------------------------------------------------------------------------
# Stage 1: Pattern Mining → Signal Propagation → Aggregation
# ---------------------------------------------------------------------------


class TestPatternMiningToAggregation:
    """Test that pattern mining feeds correctly into aggregation."""

    def test_self_patterns_merge_with_company_signals(self) -> None:
        """Self-company patterns should blend with company signals in aggregation."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        patterns = [_make_self_pattern()]
        competitive_signals: list[CompetitiveSignalRecord] = []

        pattern_ws = build_pattern_weighted_signals(
            patterns,
            competitive_signals,
            NOW,
            "7d",
        )

        all_signals = company_signals + pattern_ws
        assembled = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            all_signals,
            company_impacts,
            reference_time=NOW,
        )

        summary = assembled.summary
        assert summary.entity_id == "AAPL"
        assert summary.trend_strength > 0
        assert summary.confidence > 0

    def test_competitive_signals_merge_with_company_signals(self) -> None:
        """Competitive signals should blend with company signals in aggregation."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        patterns: list[HistoricalPattern] = []
        competitive_signals = [_make_competitive_signal()]

        pattern_ws = build_pattern_weighted_signals(
            patterns,
            competitive_signals,
            NOW,
            "7d",
        )

        all_signals = company_signals + pattern_ws
        assembled = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            all_signals,
            company_impacts,
            reference_time=NOW,
        )

        summary = assembled.summary
        assert summary.entity_id == "AAPL"
        assert summary.trend_strength > 0
        assert summary.confidence > 0

    def test_opposing_pattern_increases_contradiction(self) -> None:
        """Bearish pattern signals opposing bullish company signals should not lower contradiction.

        The comparison is deliberately non-strict (``>=``): the test only
        requires that adding opposing signals never reduces the score.
        """
        company_impacts = _make_company_impacts()  # positive sentiment
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        # Bearish pattern opposing positive company signals
        bearish_pattern = _make_self_pattern(
            bullish_pct=0.15,
            bearish_pct=0.85,
            confidence=0.7,
        )
        competitive_signals = [
            _make_competitive_signal(direction="bearish", strength=0.5),
        ]

        pattern_ws = build_pattern_weighted_signals(
            [bearish_pattern],
            competitive_signals,
            NOW,
            "7d",
        )

        # With pattern signals (opposing)
        all_signals = company_signals + pattern_ws
        assembled_with = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            all_signals,
            company_impacts,
            reference_time=NOW,
        )

        # Without pattern signals
        assembled_without = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            company_signals,
            company_impacts,
            reference_time=NOW,
        )

        assert (
            assembled_with.summary.contradiction_score
            >= assembled_without.summary.contradiction_score
        )

    def test_no_pattern_data_produces_identical_output(self) -> None:
        """Without pattern data, output should be identical to company-only."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        # Empty patterns and competitive signals
        pattern_ws = build_pattern_weighted_signals([], [], NOW, "7d")
        assert pattern_ws == []

        # Company-only baseline.
        assembled = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            company_signals,
            company_impacts,
            reference_time=NOW,
        )
        summary = assembled.summary
        assert summary.trend_direction in (
            TrendDirection.BULLISH,
            TrendDirection.BEARISH,
            TrendDirection.MIXED,
            TrendDirection.NEUTRAL,
        )
        assert summary.confidence > 0

        # Merge the (empty) pattern layer in and verify that the aggregate
        # really is unchanged, which is what this test's name promises.
        merged = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            company_signals + pattern_ws,
            company_impacts,
            reference_time=NOW,
        ).summary
        assert merged.trend_direction == summary.trend_direction
        assert merged.trend_strength == summary.trend_strength
        assert merged.confidence == summary.confidence
        assert merged.contradiction_score == summary.contradiction_score

    def test_full_three_layer_aggregation(self) -> None:
        """End-to-end: company signals + pattern signals + competitive signals."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        patterns = [_make_self_pattern()]
        competitive_signals = [
            _make_competitive_signal(direction="bullish", strength=0.3),
        ]

        pattern_ws = build_pattern_weighted_signals(
            patterns,
            competitive_signals,
            NOW,
            "7d",
        )

        all_signals = company_signals + pattern_ws
        assembled = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            all_signals,
            company_impacts,
            reference_time=NOW,
        )

        summary = assembled.summary
        assert summary.entity_id == "AAPL"
        assert summary.trend_strength > 0
        assert summary.confidence > 0

        # Some evidence must be surfaced on at least one side.
        # NOTE(review): this only checks that evidence is non-empty; it does
        # not verify that pattern-signal document IDs are among it.
        all_evidence = (
            summary.top_supporting_evidence + summary.top_opposing_evidence
        )
        assert len(all_evidence) > 0


# ---------------------------------------------------------------------------
# Lake publisher writes
# ---------------------------------------------------------------------------


class TestLakePublisherCompetitiveFacts:
    """Test lake publisher writes correct Parquet partitions for competitive data."""

    def test_publish_competitor_relationship_fact(self) -> None:
        """Competitor relationship fact should be written to correct partition path."""
        minio = MagicMock()

        ref = publish_competitor_relationship_fact(
            client=minio,
            relationship_id=str(uuid.uuid4()),
            company_a_id=str(uuid.uuid4()),
            company_b_id=str(uuid.uuid4()),
            relationship_type="direct_rival",
            strength=0.8,
            bidirectional=True,
            source="manual",
            active=True,
            created_at=NOW,
        )

        assert ref.startswith("s3://")
        assert "competitor_relationships" in ref
        assert "dt=" in ref
        minio.put_object.assert_called_once()

    def test_publish_competitive_signal_fact(self) -> None:
        """Competitive signal fact should be written with target_ticker partition."""
        minio = MagicMock()

        ref = publish_competitive_signal_fact(
            client=minio,
            signal_id=str(uuid.uuid4()),
            source_document_id=str(uuid.uuid4()),
            source_ticker="MSFT",
            target_ticker="AAPL",
            catalyst_type="product_launch",
            pattern_confidence=0.6,
            signal_direction="bearish",
            signal_strength=0.4,
            relationship_strength=0.7,
            computed_at=NOW,
        )

        assert ref.startswith("s3://")
        assert "competitive_signals" in ref
        assert "target_ticker=AAPL" in ref
        assert "dt=" in ref
        minio.put_object.assert_called_once()

    def test_publish_competitor_relationship_inferred(self) -> None:
        """Inferred relationship fact should be published like a manual one.

        NOTE(review): the written payload is not inspected here, so this
        does not actually verify that ``source='inferred'`` is preserved.
        """
        minio = MagicMock()

        ref = publish_competitor_relationship_fact(
            client=minio,
            relationship_id=str(uuid.uuid4()),
            company_a_id=str(uuid.uuid4()),
            company_b_id=str(uuid.uuid4()),
            relationship_type="same_sector",
            strength=0.5,
            bidirectional=True,
            source="inferred",
            active=True,
            created_at=NOW,
        )

        assert ref.startswith("s3://")
        assert "competitor_relationships" in ref
        minio.put_object.assert_called_once()


# ---------------------------------------------------------------------------
# Competitive toggle propagation
# ---------------------------------------------------------------------------


class TestCompetitiveTogglePropagation:
    """Test that competitive toggle state changes propagate correctly."""

    def test_disabled_competitive_config_flag(self) -> None:
        """When competitive_enabled=False, config should reflect that."""
        cfg = AggregationConfig(competitive_enabled=False)
        assert not cfg.competitive_enabled

    def test_enabled_competitive_config_uses_weight(self) -> None:
        """When competitive_enabled=True, competitive_signal_weight is applied."""
        cfg = AggregationConfig(
            competitive_enabled=True,
            competitive_signal_weight=0.2,
        )
        assert cfg.competitive_enabled
        assert cfg.competitive_signal_weight == 0.2

    def test_toggle_disable_reenable_preserves_data(self) -> None:
        """Disabling and re-enabling the toggle should not lose pattern data."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")
        patterns = [_make_self_pattern()]
        competitive_signals = [_make_competitive_signal()]

        # Simulate disabled: only company signals
        cfg_disabled = AggregationConfig(competitive_enabled=False)
        assert not cfg_disabled.competitive_enabled
        assembled_disabled = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            company_signals,
            company_impacts,
            reference_time=NOW,
        )

        # Simulate re-enabled: company + pattern signals
        cfg_enabled = AggregationConfig(competitive_enabled=True)
        assert cfg_enabled.competitive_enabled
        pattern_ws = build_pattern_weighted_signals(
            patterns,
            competitive_signals,
            NOW,
            "7d",
        )
        all_signals = company_signals + pattern_ws
        assembled_enabled = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            all_signals,
            company_impacts,
            reference_time=NOW,
        )

        # Both should produce valid summaries
        assert assembled_disabled.summary.entity_id == "AAPL"
        assert assembled_enabled.summary.entity_id == "AAPL"
        assert assembled_disabled.summary.confidence > 0
        assert assembled_enabled.summary.confidence > 0

    def test_competitive_weight_configurable(self) -> None:
        """CompetitiveConfig weight should be configurable."""
        cfg = CompetitiveConfig(competitive_signal_weight=0.4)
        assert cfg.competitive_signal_weight == 0.4

        patterns = [_make_self_pattern()]
        ws_default = build_pattern_weighted_signals(
            patterns,
            [],
            NOW,
            "7d",
            config=CompetitiveConfig(competitive_signal_weight=0.2),
        )
        ws_higher = build_pattern_weighted_signals(
            patterns,
            [],
            NOW,
            "7d",
            config=CompetitiveConfig(competitive_signal_weight=0.5),
        )

        # Fail with a clear message rather than an IndexError if the pattern
        # produced no weighted signals at all.
        assert ws_default, "expected at least one signal at weight 0.2"
        assert ws_higher, "expected at least one signal at weight 0.5"

        # Higher weight should produce an impact score at least as high
        assert ws_higher[0].impact_score >= ws_default[0].impact_score