c85c0068a2
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files - Fix 12 failing tests to match current implementation behavior - Fix pytest_plugins in non-top-level conftest (moved to root conftest.py) - Auto-fix 189 lint issues (import sorting, unused imports) - Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests) - Add values-beta.yaml and values-paper.yaml for staged deployments - Update GitHub Actions workflow to use self-hosted-gremlin runners - Add integration-test job to CI pipeline Result: 1596 passed, 0 failed, 0 warnings
550 lines
20 KiB
Python
550 lines
20 KiB
Python
"""Integration tests for the macro pipeline end-to-end.
|
|
|
|
Exercises the macro signal path through all stages:
|
|
Macro Ingestion → Classification → Interpolation → Aggregation → Recommendation
|
|
|
|
Also tests lake publisher writes for global events and macro impacts,
|
|
and macro toggle state propagation.
|
|
|
|
Requirements: 1.1, 2.1, 4.1, 5.1, 7.3, 11.1
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from unittest.mock import MagicMock
|
|
|
|
from services.aggregation.interpolation import (
|
|
compute_macro_impact,
|
|
)
|
|
from services.aggregation.projection import (
|
|
MacroEventInfo,
|
|
TrendProjection,
|
|
compute_projection,
|
|
)
|
|
from services.aggregation.worker import (
|
|
AggregationConfig,
|
|
ImpactRow,
|
|
MacroImpactRow,
|
|
assemble_trend_with_evidence,
|
|
build_macro_weighted_signals,
|
|
build_weighted_signals,
|
|
)
|
|
from services.extractor.event_classifier import GlobalEvent
|
|
from services.lake_publisher.worker import (
|
|
publish_global_event_fact,
|
|
publish_macro_impact_fact,
|
|
publish_trend_projection_fact,
|
|
)
|
|
from services.recommendation.eligibility import evaluate_eligibility
|
|
from services.recommendation.worker import (
|
|
build_recommendation,
|
|
build_thesis,
|
|
)
|
|
from services.shared.schemas import (
|
|
ActionType,
|
|
ExposureProfileSchema,
|
|
MarketPositionTier,
|
|
ModelMetadata,
|
|
TrendDirection,
|
|
)
|
|
|
|
NOW = datetime(2026, 5, 15, 14, 0, 0, tzinfo=timezone.utc)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
SAMPLE_EVENT = GlobalEvent(
|
|
event_id=str(uuid.uuid4()),
|
|
event_types=["trade_barrier", "cost_increase"],
|
|
severity="high",
|
|
affected_regions=["US", "CN"],
|
|
affected_sectors=["Technology"],
|
|
affected_commodities=["semiconductors"],
|
|
summary="US imposes new tariffs on Chinese semiconductor imports",
|
|
key_facts=["25% tariff on semiconductor imports", "Effective in 30 days"],
|
|
estimated_duration="medium_term",
|
|
confidence=0.85,
|
|
source_document_id=str(uuid.uuid4()),
|
|
model_metadata=ModelMetadata(
|
|
provider="ollama", model_name="test-model",
|
|
prompt_version="event-v1", schema_version="1.0.0",
|
|
),
|
|
)
|
|
|
|
SAMPLE_PROFILE = ExposureProfileSchema(
|
|
company_id=str(uuid.uuid4()),
|
|
geographic_revenue_mix={"US": 0.45, "CN": 0.20, "EU": 0.25, "JP": 0.10},
|
|
supply_chain_regions=["CN", "TW", "KR"],
|
|
key_input_commodities=["semiconductors", "rare_earth"],
|
|
regulatory_jurisdictions=["US", "EU"],
|
|
market_position_tier=MarketPositionTier.MULTINATIONAL,
|
|
export_dependency_pct=0.55,
|
|
source="manual",
|
|
confidence=1.0,
|
|
version=1,
|
|
)
|
|
|
|
|
|
def _make_company_impacts() -> list[ImpactRow]:
|
|
"""Build company-specific impact rows for aggregation."""
|
|
return [
|
|
ImpactRow(
|
|
document_id="doc-company-1",
|
|
confidence=0.82,
|
|
novelty_score=0.6,
|
|
source_credibility=0.8,
|
|
sentiment="positive",
|
|
impact_score=0.7,
|
|
catalyst_type="earnings",
|
|
key_facts=["Revenue beat by 10%"],
|
|
risks=["Supply chain concerns"],
|
|
published_at=NOW - timedelta(hours=3),
|
|
),
|
|
ImpactRow(
|
|
document_id="doc-company-2",
|
|
confidence=0.75,
|
|
novelty_score=0.5,
|
|
source_credibility=0.7,
|
|
sentiment="positive",
|
|
impact_score=0.55,
|
|
catalyst_type="rating_change",
|
|
key_facts=["Analyst upgrade"],
|
|
risks=[],
|
|
published_at=NOW - timedelta(hours=6),
|
|
),
|
|
]
|
|
|
|
|
|
def _make_macro_impact_rows(event: GlobalEvent) -> list[MacroImpactRow]:
|
|
"""Build macro impact rows from a classified event."""
|
|
return [
|
|
MacroImpactRow(
|
|
event_id=event.event_id,
|
|
company_id=SAMPLE_PROFILE.company_id,
|
|
ticker="AAPL",
|
|
macro_impact_score=0.45,
|
|
impact_direction="negative",
|
|
contributing_factors=["geographic_overlap:0.650"],
|
|
confidence=0.8,
|
|
computed_at=NOW,
|
|
source_document_id=event.source_document_id,
|
|
event_published_at=NOW - timedelta(hours=2),
|
|
),
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 1: Classification → Interpolation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestClassificationToInterpolation:
|
|
"""Test that event classification feeds correctly into interpolation."""
|
|
|
|
def test_classified_event_produces_macro_impact(self):
|
|
"""A classified GlobalEvent should produce a MacroImpactRecord."""
|
|
impact = compute_macro_impact(SAMPLE_EVENT, SAMPLE_PROFILE)
|
|
|
|
assert impact.event_id == SAMPLE_EVENT.event_id
|
|
assert impact.company_id == SAMPLE_PROFILE.company_id
|
|
assert 0.0 < impact.macro_impact_score <= 1.0
|
|
assert impact.confidence > 0
|
|
assert len(impact.contributing_factors) > 0
|
|
|
|
def test_zero_overlap_event_produces_zero_score(self):
|
|
"""An event with no overlap should produce score 0.0."""
|
|
no_overlap_event = GlobalEvent(
|
|
event_id=str(uuid.uuid4()),
|
|
event_types=["geopolitical_risk"],
|
|
severity="high",
|
|
affected_regions=["BR", "AR"],
|
|
affected_sectors=["Agriculture"],
|
|
affected_commodities=["soybeans"],
|
|
summary="South American agricultural crisis",
|
|
confidence=0.9,
|
|
source_document_id=str(uuid.uuid4()),
|
|
)
|
|
no_overlap_profile = ExposureProfileSchema(
|
|
company_id=str(uuid.uuid4()),
|
|
geographic_revenue_mix={"DE": 0.5, "FR": 0.5},
|
|
supply_chain_regions=["DE", "FR"],
|
|
key_input_commodities=["steel"],
|
|
market_position_tier=MarketPositionTier.REGIONAL,
|
|
)
|
|
impact = compute_macro_impact(no_overlap_event, no_overlap_profile)
|
|
assert impact.macro_impact_score == 0.0
|
|
|
|
def test_multiple_impact_types_preserved(self):
|
|
"""Event with multiple impact types should preserve all in classification."""
|
|
assert len(SAMPLE_EVENT.event_types) == 2
|
|
assert "trade_barrier" in SAMPLE_EVENT.event_types
|
|
assert "cost_increase" in SAMPLE_EVENT.event_types
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 2: Interpolation → Aggregation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInterpolationToAggregation:
|
|
"""Test that macro impact signals merge into aggregation correctly."""
|
|
|
|
def test_macro_signals_merge_with_company_signals(self):
|
|
"""Macro signals should blend with company signals in aggregation."""
|
|
company_impacts = _make_company_impacts()
|
|
company_signals = build_weighted_signals(company_impacts, NOW, "7d")
|
|
|
|
macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT)
|
|
macro_signals = build_macro_weighted_signals(
|
|
macro_impacts, NOW, "7d", macro_signal_weight=0.3,
|
|
)
|
|
|
|
all_signals = company_signals + macro_signals
|
|
assembled = assemble_trend_with_evidence(
|
|
"AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
|
|
)
|
|
summary = assembled.summary
|
|
|
|
assert summary.entity_id == "AAPL"
|
|
assert summary.trend_strength > 0
|
|
assert summary.confidence > 0
|
|
|
|
def test_macro_signals_affect_contradiction_score(self):
|
|
"""Opposing macro signals should increase contradiction score."""
|
|
company_impacts = _make_company_impacts()
|
|
company_signals = build_weighted_signals(company_impacts, NOW, "7d")
|
|
|
|
# Company signals are positive, macro is negative → contradiction
|
|
macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT)
|
|
macro_signals = build_macro_weighted_signals(
|
|
macro_impacts, NOW, "7d", macro_signal_weight=0.3,
|
|
)
|
|
|
|
# With macro (opposing)
|
|
all_signals = company_signals + macro_signals
|
|
assembled_with = assemble_trend_with_evidence(
|
|
"AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
|
|
)
|
|
|
|
# Without macro
|
|
assembled_without = assemble_trend_with_evidence(
|
|
"AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
|
|
)
|
|
|
|
# Contradiction should be higher with opposing macro signals
|
|
assert assembled_with.summary.contradiction_score >= assembled_without.summary.contradiction_score
|
|
|
|
def test_no_macro_data_produces_identical_output(self):
|
|
"""Without macro data, output should be identical to company-only."""
|
|
company_impacts = _make_company_impacts()
|
|
company_signals = build_weighted_signals(company_impacts, NOW, "7d")
|
|
|
|
assembled = assemble_trend_with_evidence(
|
|
"AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
|
|
)
|
|
summary = assembled.summary
|
|
|
|
assert summary.trend_direction in (
|
|
TrendDirection.BULLISH, TrendDirection.BEARISH,
|
|
TrendDirection.MIXED, TrendDirection.NEUTRAL,
|
|
)
|
|
assert summary.confidence > 0
|
|
|
|
def test_macro_toggle_disabled_skips_macro_signals(self):
|
|
"""When macro is disabled, config should reflect that."""
|
|
cfg = AggregationConfig(macro_enabled=False)
|
|
assert not cfg.macro_enabled
|
|
# The actual toggle check happens in aggregate_company_window
|
|
# which reads from DB. Here we verify the config flag works.
|
|
cfg_enabled = AggregationConfig(macro_enabled=True)
|
|
assert cfg_enabled.macro_enabled
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stage 3: Aggregation → Projection → Recommendation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAggregationToRecommendation:
|
|
"""Test the full flow from aggregation through projection to recommendation."""
|
|
|
|
def _build_trend_with_macro(self):
|
|
"""Build a trend summary that includes macro signals."""
|
|
company_impacts = _make_company_impacts()
|
|
company_signals = build_weighted_signals(company_impacts, NOW, "7d")
|
|
|
|
macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT)
|
|
macro_signals = build_macro_weighted_signals(
|
|
macro_impacts, NOW, "7d", macro_signal_weight=0.3,
|
|
)
|
|
|
|
all_signals = company_signals + macro_signals
|
|
assembled = assemble_trend_with_evidence(
|
|
"AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
|
|
)
|
|
return assembled.summary
|
|
|
|
def test_projection_computed_from_trend(self):
|
|
"""A projection should be computed from the trend summary."""
|
|
summary = self._build_trend_with_macro()
|
|
|
|
macro_event_infos = [
|
|
MacroEventInfo(
|
|
event_id=SAMPLE_EVENT.event_id,
|
|
macro_impact_score=0.45,
|
|
impact_direction="negative",
|
|
confidence=0.8,
|
|
estimated_duration="medium_term",
|
|
severity="high",
|
|
event_age_hours=2.0,
|
|
),
|
|
]
|
|
|
|
projection = compute_projection(
|
|
summary=summary,
|
|
macro_events=macro_event_infos,
|
|
macro_enabled=True,
|
|
)
|
|
|
|
assert projection.projected_direction in ("bullish", "bearish", "mixed", "neutral")
|
|
assert 0.0 <= projection.projected_strength <= 1.0
|
|
assert 0.0 <= projection.projected_confidence <= 1.0
|
|
assert len(projection.driving_factors) > 0
|
|
|
|
def test_recommendation_includes_projection_in_thesis(self):
|
|
"""Recommendation thesis should cite projection when available."""
|
|
summary = self._build_trend_with_macro()
|
|
result = evaluate_eligibility(summary)
|
|
|
|
projection = TrendProjection(
|
|
projected_direction="bearish",
|
|
projected_strength=0.6,
|
|
projected_confidence=0.5,
|
|
projection_horizon="7d",
|
|
driving_factors=["Macro signals project bearish impact"],
|
|
macro_contribution_pct=0.3,
|
|
diverges_from_current=True,
|
|
low_confidence=False,
|
|
)
|
|
|
|
thesis = build_thesis(summary, result, projection=projection)
|
|
assert "Forward projection" in thesis
|
|
assert "bearish" in thesis
|
|
assert "diverges" in thesis.lower()
|
|
|
|
def test_low_confidence_projection_excluded_from_thesis(self):
|
|
"""Low-confidence projections should not appear in thesis."""
|
|
summary = self._build_trend_with_macro()
|
|
result = evaluate_eligibility(summary)
|
|
|
|
low_conf_projection = TrendProjection(
|
|
projected_direction="bearish",
|
|
projected_strength=0.3,
|
|
projected_confidence=0.2,
|
|
projection_horizon="7d",
|
|
driving_factors=["Weak signal"],
|
|
low_confidence=True,
|
|
)
|
|
|
|
thesis = build_thesis(summary, result, projection=low_conf_projection)
|
|
assert "Forward projection" not in thesis
|
|
|
|
def test_recommendation_time_horizon_includes_projection(self):
|
|
"""Recommendation time_horizon should reference projection horizon."""
|
|
summary = self._build_trend_with_macro()
|
|
result = evaluate_eligibility(summary)
|
|
|
|
projection = TrendProjection(
|
|
projected_direction="bullish",
|
|
projected_strength=0.7,
|
|
projected_confidence=0.6,
|
|
projection_horizon="7d",
|
|
driving_factors=["Positive momentum"],
|
|
low_confidence=False,
|
|
)
|
|
|
|
rec = build_recommendation(
|
|
summary, result, reference_time=NOW, projection=projection,
|
|
)
|
|
assert "proj:7d" in rec.time_horizon
|
|
|
|
def test_full_macro_pipeline_to_recommendation(self):
|
|
"""End-to-end: classification → interpolation → aggregation → recommendation."""
|
|
# 1. Classify event (already have SAMPLE_EVENT)
|
|
# 2. Compute macro impact
|
|
impact = compute_macro_impact(SAMPLE_EVENT, SAMPLE_PROFILE)
|
|
assert impact.macro_impact_score > 0
|
|
|
|
# 3. Build company + macro signals and aggregate
|
|
company_impacts = _make_company_impacts()
|
|
company_signals = build_weighted_signals(company_impacts, NOW, "7d")
|
|
|
|
macro_rows = [
|
|
MacroImpactRow(
|
|
event_id=SAMPLE_EVENT.event_id,
|
|
company_id=SAMPLE_PROFILE.company_id,
|
|
ticker="AAPL",
|
|
macro_impact_score=impact.macro_impact_score,
|
|
impact_direction=impact.impact_direction,
|
|
contributing_factors=impact.contributing_factors,
|
|
confidence=impact.confidence,
|
|
computed_at=NOW,
|
|
source_document_id=SAMPLE_EVENT.source_document_id,
|
|
event_published_at=NOW - timedelta(hours=2),
|
|
),
|
|
]
|
|
macro_signals = build_macro_weighted_signals(
|
|
macro_rows, NOW, "7d", macro_signal_weight=0.3,
|
|
)
|
|
|
|
all_signals = company_signals + macro_signals
|
|
assembled = assemble_trend_with_evidence(
|
|
"AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
|
|
)
|
|
summary = assembled.summary
|
|
|
|
# 4. Compute projection
|
|
projection = compute_projection(
|
|
summary=summary,
|
|
macro_events=[
|
|
MacroEventInfo(
|
|
event_id=SAMPLE_EVENT.event_id,
|
|
macro_impact_score=impact.macro_impact_score,
|
|
impact_direction=impact.impact_direction,
|
|
confidence=impact.confidence,
|
|
estimated_duration=SAMPLE_EVENT.estimated_duration,
|
|
severity=SAMPLE_EVENT.severity,
|
|
event_age_hours=2.0,
|
|
),
|
|
],
|
|
macro_enabled=True,
|
|
)
|
|
|
|
# 5. Generate recommendation
|
|
eligibility = evaluate_eligibility(summary)
|
|
rec = build_recommendation(
|
|
summary, eligibility, reference_time=NOW, projection=projection,
|
|
)
|
|
|
|
assert rec.ticker == "AAPL"
|
|
assert rec.action in (ActionType.BUY, ActionType.SELL, ActionType.HOLD, ActionType.WATCH)
|
|
assert len(rec.thesis) > 0
|
|
assert rec.confidence > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lake publisher writes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLakePublisherMacroFacts:
|
|
"""Test lake publisher writes correct Parquet partitions for macro data."""
|
|
|
|
def test_publish_global_event_fact(self):
|
|
"""Global event fact should be written to correct partition path."""
|
|
minio = MagicMock()
|
|
ref = publish_global_event_fact(
|
|
client=minio,
|
|
event_id=SAMPLE_EVENT.event_id,
|
|
event_types=SAMPLE_EVENT.event_types,
|
|
severity=SAMPLE_EVENT.severity,
|
|
affected_regions=SAMPLE_EVENT.affected_regions,
|
|
affected_sectors=SAMPLE_EVENT.affected_sectors,
|
|
affected_commodities=SAMPLE_EVENT.affected_commodities,
|
|
summary=SAMPLE_EVENT.summary,
|
|
estimated_duration=SAMPLE_EVENT.estimated_duration,
|
|
confidence=SAMPLE_EVENT.confidence,
|
|
source_document_id=SAMPLE_EVENT.source_document_id,
|
|
created_at=NOW,
|
|
)
|
|
|
|
assert ref.startswith("s3://")
|
|
assert "global_events" in ref
|
|
assert "dt=" in ref
|
|
minio.put_object.assert_called_once()
|
|
|
|
def test_publish_macro_impact_fact(self):
|
|
"""Macro impact fact should be written with ticker partition."""
|
|
minio = MagicMock()
|
|
ref = publish_macro_impact_fact(
|
|
client=minio,
|
|
event_id=SAMPLE_EVENT.event_id,
|
|
company_id=SAMPLE_PROFILE.company_id,
|
|
ticker="AAPL",
|
|
macro_impact_score=0.45,
|
|
impact_direction="negative",
|
|
contributing_factors=["geographic_overlap:0.650"],
|
|
confidence=0.8,
|
|
computed_at=NOW,
|
|
)
|
|
|
|
assert ref.startswith("s3://")
|
|
assert "macro_impacts" in ref
|
|
assert "ticker=AAPL" in ref
|
|
minio.put_object.assert_called_once()
|
|
|
|
def test_publish_trend_projection_fact(self):
|
|
"""Trend projection fact should be written with ticker partition."""
|
|
minio = MagicMock()
|
|
ref = publish_trend_projection_fact(
|
|
client=minio,
|
|
trend_window_id=str(uuid.uuid4()),
|
|
ticker="AAPL",
|
|
projected_direction="bullish",
|
|
projected_strength=0.7,
|
|
projected_confidence=0.6,
|
|
projection_horizon="7d",
|
|
driving_factors=["Positive momentum"],
|
|
macro_contribution_pct=0.3,
|
|
diverges_from_current=False,
|
|
computed_at=NOW,
|
|
)
|
|
|
|
assert ref.startswith("s3://")
|
|
assert "trend_projections" in ref
|
|
assert "ticker=AAPL" in ref
|
|
minio.put_object.assert_called_once()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Macro toggle propagation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMacroTogglePropagation:
|
|
"""Test that macro toggle state changes propagate correctly."""
|
|
|
|
def test_disabled_macro_config_skips_macro_weight(self):
|
|
"""When macro_enabled=False, macro_signal_weight should not matter."""
|
|
cfg = AggregationConfig(macro_enabled=False, macro_signal_weight=0.5)
|
|
assert not cfg.macro_enabled
|
|
# The aggregation worker checks macro_enabled before fetching macro data
|
|
|
|
def test_enabled_macro_config_uses_weight(self):
|
|
"""When macro_enabled=True, macro_signal_weight is applied."""
|
|
cfg = AggregationConfig(macro_enabled=True, macro_signal_weight=0.3)
|
|
assert cfg.macro_enabled
|
|
assert cfg.macro_signal_weight == 0.3
|
|
|
|
def test_macro_disabled_projection_has_reduced_confidence(self):
|
|
"""Projections without macro data should have lower confidence."""
|
|
company_impacts = _make_company_impacts()
|
|
company_signals = build_weighted_signals(company_impacts, NOW, "7d")
|
|
assembled = assemble_trend_with_evidence(
|
|
"AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
|
|
)
|
|
summary = assembled.summary
|
|
|
|
# With macro enabled but no events
|
|
proj_enabled = compute_projection(
|
|
summary=summary, macro_events=None, macro_enabled=True,
|
|
)
|
|
# With macro disabled
|
|
proj_disabled = compute_projection(
|
|
summary=summary, macro_events=None, macro_enabled=False,
|
|
)
|
|
|
|
assert proj_disabled.projected_confidence <= proj_enabled.projected_confidence
|