"""Integration tests for the macro pipeline end-to-end. Exercises the macro signal path through all stages: Macro Ingestion → Classification → Interpolation → Aggregation → Recommendation Also tests lake publisher writes for global events and macro impacts, and macro toggle state propagation. Requirements: 1.1, 2.1, 4.1, 5.1, 7.3, 11.1 """ from __future__ import annotations import json import uuid from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock import pytest from services.aggregation.interpolation import ( MacroImpactRecord, compute_macro_impact, ) from services.aggregation.projection import ( MacroEventInfo, TrendProjection, compute_projection, ) from services.aggregation.worker import ( AggregationConfig, ImpactRow, MacroImpactRow, assemble_trend_with_evidence, build_macro_weighted_signals, build_weighted_signals, ) from services.extractor.event_classifier import GlobalEvent from services.lake_publisher.worker import ( publish_global_event_fact, publish_macro_impact_fact, publish_trend_projection_fact, ) from services.recommendation.eligibility import evaluate_eligibility from services.recommendation.worker import ( build_recommendation, build_thesis, ) from services.shared.schemas import ( ActionType, ExposureProfileSchema, MarketPositionTier, ModelMetadata, RecommendationMode, TrendDirection, TrendWindow, ) NOW = datetime(2026, 5, 15, 14, 0, 0, tzinfo=timezone.utc) # --------------------------------------------------------------------------- # Shared fixtures # --------------------------------------------------------------------------- SAMPLE_EVENT = GlobalEvent( event_id=str(uuid.uuid4()), event_types=["trade_barrier", "cost_increase"], severity="high", affected_regions=["US", "CN"], affected_sectors=["Technology"], affected_commodities=["semiconductors"], summary="US imposes new tariffs on Chinese semiconductor imports", key_facts=["25% tariff on semiconductor imports", "Effective in 30 days"], estimated_duration="medium_term", confidence=0.85, source_document_id=str(uuid.uuid4()), model_metadata=ModelMetadata( provider="ollama", model_name="test-model", prompt_version="event-v1", schema_version="1.0.0", ), ) SAMPLE_PROFILE = ExposureProfileSchema( company_id=str(uuid.uuid4()), geographic_revenue_mix={"US": 0.45, "CN": 0.20, "EU": 0.25, "JP": 0.10}, supply_chain_regions=["CN", "TW", "KR"], key_input_commodities=["semiconductors", "rare_earth"], regulatory_jurisdictions=["US", "EU"], market_position_tier=MarketPositionTier.MULTINATIONAL, export_dependency_pct=0.55, source="manual", confidence=1.0, version=1, ) def _make_company_impacts() -> list[ImpactRow]: """Build company-specific impact rows for aggregation.""" return [ ImpactRow( document_id="doc-company-1", confidence=0.82, novelty_score=0.6, source_credibility=0.8, sentiment="positive", impact_score=0.7, catalyst_type="earnings", key_facts=["Revenue beat by 10%"], risks=["Supply chain concerns"], published_at=NOW - timedelta(hours=3), ), ImpactRow( document_id="doc-company-2", confidence=0.75, novelty_score=0.5, source_credibility=0.7, sentiment="positive", impact_score=0.55, catalyst_type="rating_change", key_facts=["Analyst upgrade"], risks=[], published_at=NOW - timedelta(hours=6), ), ] def _make_macro_impact_rows(event: GlobalEvent) -> list[MacroImpactRow]: """Build macro impact rows from a classified event.""" return [ MacroImpactRow( event_id=event.event_id, company_id=SAMPLE_PROFILE.company_id, ticker="AAPL", macro_impact_score=0.45, impact_direction="negative", contributing_factors=["geographic_overlap:0.650"], confidence=0.8, computed_at=NOW, source_document_id=event.source_document_id, event_published_at=NOW - timedelta(hours=2), ), ] # --------------------------------------------------------------------------- # Stage 1: Classification → Interpolation # --------------------------------------------------------------------------- class TestClassificationToInterpolation: """Test that event classification feeds correctly into interpolation.""" def test_classified_event_produces_macro_impact(self): """A classified GlobalEvent should produce a MacroImpactRecord.""" impact = compute_macro_impact(SAMPLE_EVENT, SAMPLE_PROFILE) assert impact.event_id == SAMPLE_EVENT.event_id assert impact.company_id == SAMPLE_PROFILE.company_id assert 0.0 < impact.macro_impact_score <= 1.0 assert impact.confidence > 0 assert len(impact.contributing_factors) > 0 def test_zero_overlap_event_produces_zero_score(self): """An event with no overlap should produce score 0.0.""" no_overlap_event = GlobalEvent( event_id=str(uuid.uuid4()), event_types=["geopolitical_risk"], severity="high", affected_regions=["BR", "AR"], affected_sectors=["Agriculture"], affected_commodities=["soybeans"], summary="South American agricultural crisis", confidence=0.9, source_document_id=str(uuid.uuid4()), ) no_overlap_profile = ExposureProfileSchema( company_id=str(uuid.uuid4()), geographic_revenue_mix={"DE": 0.5, "FR": 0.5}, supply_chain_regions=["DE", "FR"], key_input_commodities=["steel"], market_position_tier=MarketPositionTier.REGIONAL, ) impact = compute_macro_impact(no_overlap_event, no_overlap_profile) assert impact.macro_impact_score == 0.0 def test_multiple_impact_types_preserved(self): """Event with multiple impact types should preserve all in classification.""" assert len(SAMPLE_EVENT.event_types) == 2 assert "trade_barrier" in SAMPLE_EVENT.event_types assert "cost_increase" in SAMPLE_EVENT.event_types # --------------------------------------------------------------------------- # Stage 2: Interpolation → Aggregation # --------------------------------------------------------------------------- class TestInterpolationToAggregation: """Test that macro impact signals merge into aggregation correctly.""" def test_macro_signals_merge_with_company_signals(self): """Macro signals should blend with company signals in aggregation.""" company_impacts = _make_company_impacts() company_signals = build_weighted_signals(company_impacts, NOW, "7d") macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT) macro_signals = build_macro_weighted_signals( macro_impacts, NOW, "7d", macro_signal_weight=0.3, ) all_signals = company_signals + macro_signals assembled = assemble_trend_with_evidence( "AAPL", "7d", all_signals, company_impacts, reference_time=NOW, ) summary = assembled.summary assert summary.entity_id == "AAPL" assert summary.trend_strength > 0 assert summary.confidence > 0 def test_macro_signals_affect_contradiction_score(self): """Opposing macro signals should increase contradiction score.""" company_impacts = _make_company_impacts() company_signals = build_weighted_signals(company_impacts, NOW, "7d") # Company signals are positive, macro is negative → contradiction macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT) macro_signals = build_macro_weighted_signals( macro_impacts, NOW, "7d", macro_signal_weight=0.3, ) # With macro (opposing) all_signals = company_signals + macro_signals assembled_with = assemble_trend_with_evidence( "AAPL", "7d", all_signals, company_impacts, reference_time=NOW, ) # Without macro assembled_without = assemble_trend_with_evidence( "AAPL", "7d", company_signals, company_impacts, reference_time=NOW, ) # Contradiction should be higher with opposing macro signals assert assembled_with.summary.contradiction_score >= assembled_without.summary.contradiction_score def test_no_macro_data_produces_identical_output(self): """Without macro data, output should be identical to company-only.""" company_impacts = _make_company_impacts() company_signals = build_weighted_signals(company_impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "AAPL", "7d", company_signals, company_impacts, reference_time=NOW, ) summary = assembled.summary assert summary.trend_direction in ( TrendDirection.BULLISH, TrendDirection.BEARISH, TrendDirection.MIXED, TrendDirection.NEUTRAL, ) assert summary.confidence > 0 def test_macro_toggle_disabled_skips_macro_signals(self): """When macro is disabled, config should reflect that.""" cfg = AggregationConfig(macro_enabled=False) assert not cfg.macro_enabled # The actual toggle check happens in aggregate_company_window # which reads from DB. Here we verify the config flag works. cfg_enabled = AggregationConfig(macro_enabled=True) assert cfg_enabled.macro_enabled # --------------------------------------------------------------------------- # Stage 3: Aggregation → Projection → Recommendation # --------------------------------------------------------------------------- class TestAggregationToRecommendation: """Test the full flow from aggregation through projection to recommendation.""" def _build_trend_with_macro(self): """Build a trend summary that includes macro signals.""" company_impacts = _make_company_impacts() company_signals = build_weighted_signals(company_impacts, NOW, "7d") macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT) macro_signals = build_macro_weighted_signals( macro_impacts, NOW, "7d", macro_signal_weight=0.3, ) all_signals = company_signals + macro_signals assembled = assemble_trend_with_evidence( "AAPL", "7d", all_signals, company_impacts, reference_time=NOW, ) return assembled.summary def test_projection_computed_from_trend(self): """A projection should be computed from the trend summary.""" summary = self._build_trend_with_macro() macro_event_infos = [ MacroEventInfo( event_id=SAMPLE_EVENT.event_id, macro_impact_score=0.45, impact_direction="negative", confidence=0.8, estimated_duration="medium_term", severity="high", event_age_hours=2.0, ), ] projection = compute_projection( summary=summary, macro_events=macro_event_infos, macro_enabled=True, ) assert projection.projected_direction in ("bullish", "bearish", "mixed", "neutral") assert 0.0 <= projection.projected_strength <= 1.0 assert 0.0 <= projection.projected_confidence <= 1.0 assert len(projection.driving_factors) > 0 def test_recommendation_includes_projection_in_thesis(self): """Recommendation thesis should cite projection when available.""" summary = self._build_trend_with_macro() result = evaluate_eligibility(summary) projection = TrendProjection( projected_direction="bearish", projected_strength=0.6, projected_confidence=0.5, projection_horizon="7d", driving_factors=["Macro signals project bearish impact"], macro_contribution_pct=0.3, diverges_from_current=True, low_confidence=False, ) thesis = build_thesis(summary, result, projection=projection) assert "Forward projection" in thesis assert "bearish" in thesis assert "diverges" in thesis.lower() def test_low_confidence_projection_excluded_from_thesis(self): """Low-confidence projections should not appear in thesis.""" summary = self._build_trend_with_macro() result = evaluate_eligibility(summary) low_conf_projection = TrendProjection( projected_direction="bearish", projected_strength=0.3, projected_confidence=0.2, projection_horizon="7d", driving_factors=["Weak signal"], low_confidence=True, ) thesis = build_thesis(summary, result, projection=low_conf_projection) assert "Forward projection" not in thesis def test_recommendation_time_horizon_includes_projection(self): """Recommendation time_horizon should reference projection horizon.""" summary = self._build_trend_with_macro() result = evaluate_eligibility(summary) projection = TrendProjection( projected_direction="bullish", projected_strength=0.7, projected_confidence=0.6, projection_horizon="7d", driving_factors=["Positive momentum"], low_confidence=False, ) rec = build_recommendation( summary, result, reference_time=NOW, projection=projection, ) assert "proj:7d" in rec.time_horizon def test_full_macro_pipeline_to_recommendation(self): """End-to-end: classification → interpolation → aggregation → recommendation.""" # 1. Classify event (already have SAMPLE_EVENT) # 2. Compute macro impact impact = compute_macro_impact(SAMPLE_EVENT, SAMPLE_PROFILE) assert impact.macro_impact_score > 0 # 3. Build company + macro signals and aggregate company_impacts = _make_company_impacts() company_signals = build_weighted_signals(company_impacts, NOW, "7d") macro_rows = [ MacroImpactRow( event_id=SAMPLE_EVENT.event_id, company_id=SAMPLE_PROFILE.company_id, ticker="AAPL", macro_impact_score=impact.macro_impact_score, impact_direction=impact.impact_direction, contributing_factors=impact.contributing_factors, confidence=impact.confidence, computed_at=NOW, source_document_id=SAMPLE_EVENT.source_document_id, event_published_at=NOW - timedelta(hours=2), ), ] macro_signals = build_macro_weighted_signals( macro_rows, NOW, "7d", macro_signal_weight=0.3, ) all_signals = company_signals + macro_signals assembled = assemble_trend_with_evidence( "AAPL", "7d", all_signals, company_impacts, reference_time=NOW, ) summary = assembled.summary # 4. Compute projection projection = compute_projection( summary=summary, macro_events=[ MacroEventInfo( event_id=SAMPLE_EVENT.event_id, macro_impact_score=impact.macro_impact_score, impact_direction=impact.impact_direction, confidence=impact.confidence, estimated_duration=SAMPLE_EVENT.estimated_duration, severity=SAMPLE_EVENT.severity, event_age_hours=2.0, ), ], macro_enabled=True, ) # 5. Generate recommendation eligibility = evaluate_eligibility(summary) rec = build_recommendation( summary, eligibility, reference_time=NOW, projection=projection, ) assert rec.ticker == "AAPL" assert rec.action in (ActionType.BUY, ActionType.SELL, ActionType.HOLD, ActionType.WATCH) assert len(rec.thesis) > 0 assert rec.confidence > 0 # --------------------------------------------------------------------------- # Lake publisher writes # --------------------------------------------------------------------------- class TestLakePublisherMacroFacts: """Test lake publisher writes correct Parquet partitions for macro data.""" def test_publish_global_event_fact(self): """Global event fact should be written to correct partition path.""" minio = MagicMock() ref = publish_global_event_fact( client=minio, event_id=SAMPLE_EVENT.event_id, event_types=SAMPLE_EVENT.event_types, severity=SAMPLE_EVENT.severity, affected_regions=SAMPLE_EVENT.affected_regions, affected_sectors=SAMPLE_EVENT.affected_sectors, affected_commodities=SAMPLE_EVENT.affected_commodities, summary=SAMPLE_EVENT.summary, estimated_duration=SAMPLE_EVENT.estimated_duration, confidence=SAMPLE_EVENT.confidence, source_document_id=SAMPLE_EVENT.source_document_id, created_at=NOW, ) assert ref.startswith("s3://") assert "global_events" in ref assert "dt=" in ref minio.put_object.assert_called_once() def test_publish_macro_impact_fact(self): """Macro impact fact should be written with ticker partition.""" minio = MagicMock() ref = publish_macro_impact_fact( client=minio, event_id=SAMPLE_EVENT.event_id, company_id=SAMPLE_PROFILE.company_id, ticker="AAPL", macro_impact_score=0.45, impact_direction="negative", contributing_factors=["geographic_overlap:0.650"], confidence=0.8, computed_at=NOW, ) assert ref.startswith("s3://") assert "macro_impacts" in ref assert "ticker=AAPL" in ref minio.put_object.assert_called_once() def test_publish_trend_projection_fact(self): """Trend projection fact should be written with ticker partition.""" minio = MagicMock() ref = publish_trend_projection_fact( client=minio, trend_window_id=str(uuid.uuid4()), ticker="AAPL", projected_direction="bullish", projected_strength=0.7, projected_confidence=0.6, projection_horizon="7d", driving_factors=["Positive momentum"], macro_contribution_pct=0.3, diverges_from_current=False, computed_at=NOW, ) assert ref.startswith("s3://") assert "trend_projections" in ref assert "ticker=AAPL" in ref minio.put_object.assert_called_once() # --------------------------------------------------------------------------- # Macro toggle propagation # --------------------------------------------------------------------------- class TestMacroTogglePropagation: """Test that macro toggle state changes propagate correctly.""" def test_disabled_macro_config_skips_macro_weight(self): """When macro_enabled=False, macro_signal_weight should not matter.""" cfg = AggregationConfig(macro_enabled=False, macro_signal_weight=0.5) assert not cfg.macro_enabled # The aggregation worker checks macro_enabled before fetching macro data def test_enabled_macro_config_uses_weight(self): """When macro_enabled=True, macro_signal_weight is applied.""" cfg = AggregationConfig(macro_enabled=True, macro_signal_weight=0.3) assert cfg.macro_enabled assert cfg.macro_signal_weight == 0.3 def test_macro_disabled_projection_has_reduced_confidence(self): """Projections without macro data should have lower confidence.""" company_impacts = _make_company_impacts() company_signals = build_weighted_signals(company_impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "AAPL", "7d", company_signals, company_impacts, reference_time=NOW, ) summary = assembled.summary # With macro enabled but no events proj_enabled = compute_projection( summary=summary, macro_events=None, macro_enabled=True, ) # With macro disabled proj_disabled = compute_projection( summary=summary, macro_events=None, macro_enabled=False, ) assert proj_disabled.projected_confidence <= proj_enabled.projected_confidence