Files
stonks-oracle/tests/test_macro_integration.py
T
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

550 lines
20 KiB
Python

"""Integration tests for the macro pipeline end-to-end.
Exercises the macro signal path through all stages:
Macro Ingestion → Classification → Interpolation → Aggregation → Recommendation
Also tests lake publisher writes for global events and macro impacts,
and macro toggle state propagation.
Requirements: 1.1, 2.1, 4.1, 5.1, 7.3, 11.1
"""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock
from services.aggregation.interpolation import (
compute_macro_impact,
)
from services.aggregation.projection import (
MacroEventInfo,
TrendProjection,
compute_projection,
)
from services.aggregation.worker import (
AggregationConfig,
ImpactRow,
MacroImpactRow,
assemble_trend_with_evidence,
build_macro_weighted_signals,
build_weighted_signals,
)
from services.extractor.event_classifier import GlobalEvent
from services.lake_publisher.worker import (
publish_global_event_fact,
publish_macro_impact_fact,
publish_trend_projection_fact,
)
from services.recommendation.eligibility import evaluate_eligibility
from services.recommendation.worker import (
build_recommendation,
build_thesis,
)
from services.shared.schemas import (
ActionType,
ExposureProfileSchema,
MarketPositionTier,
ModelMetadata,
TrendDirection,
)
# Fixed reference instant shared by every test, so all time-relative inputs
# (published_at, computed_at, reference_time) are reproducible run to run.
NOW = datetime(year=2026, month=5, day=15, hour=14, tzinfo=timezone.utc)

# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------

# High-severity US/CN semiconductor tariff event carrying two event types.
# Its regions and commodity overlap SAMPLE_PROFILE's revenue mix, supply
# chain and input commodities.
SAMPLE_EVENT = GlobalEvent(
    event_id=str(uuid.uuid4()),
    source_document_id=str(uuid.uuid4()),
    summary="US imposes new tariffs on Chinese semiconductor imports",
    event_types=["trade_barrier", "cost_increase"],
    severity="high",
    estimated_duration="medium_term",
    confidence=0.85,
    affected_regions=["US", "CN"],
    affected_sectors=["Technology"],
    affected_commodities=["semiconductors"],
    key_facts=["25% tariff on semiconductor imports", "Effective in 30 days"],
    model_metadata=ModelMetadata(
        provider="ollama",
        model_name="test-model",
        prompt_version="event-v1",
        schema_version="1.0.0",
    ),
)

# Manually sourced multinational exposure profile: US/CN/EU/JP revenue,
# an Asian supply chain and semiconductor / rare-earth inputs.
SAMPLE_PROFILE = ExposureProfileSchema(
    company_id=str(uuid.uuid4()),
    market_position_tier=MarketPositionTier.MULTINATIONAL,
    geographic_revenue_mix={"US": 0.45, "CN": 0.20, "EU": 0.25, "JP": 0.10},
    export_dependency_pct=0.55,
    supply_chain_regions=["CN", "TW", "KR"],
    key_input_commodities=["semiconductors", "rare_earth"],
    regulatory_jurisdictions=["US", "EU"],
    source="manual",
    confidence=1.0,
    version=1,
)
def _make_company_impacts() -> list[ImpactRow]:
    """Return two positive, company-specific impact rows for aggregation.

    The rows differ in catalyst (earnings vs. rating change), strength and
    age (published 3h and 6h before NOW).
    """
    earnings_beat = ImpactRow(
        document_id="doc-company-1",
        published_at=NOW - timedelta(hours=3),
        sentiment="positive",
        impact_score=0.7,
        confidence=0.82,
        novelty_score=0.6,
        source_credibility=0.8,
        catalyst_type="earnings",
        key_facts=["Revenue beat by 10%"],
        risks=["Supply chain concerns"],
    )
    analyst_upgrade = ImpactRow(
        document_id="doc-company-2",
        published_at=NOW - timedelta(hours=6),
        sentiment="positive",
        impact_score=0.55,
        confidence=0.75,
        novelty_score=0.5,
        source_credibility=0.7,
        catalyst_type="rating_change",
        key_facts=["Analyst upgrade"],
        risks=[],
    )
    return [earnings_beat, analyst_upgrade]
def _make_macro_impact_rows(event: GlobalEvent) -> list[MacroImpactRow]:
    """Return a one-element list holding a negative macro impact of *event* on AAPL.

    Only the identifiers (event_id, source_document_id) come from *event*.
    The score (0.45), confidence (0.8) and contributing factor are fixed test
    values. The row is attributed to SAMPLE_PROFILE's company and computed
    at NOW, for an event published two hours earlier.
    """
    row = MacroImpactRow(
        ticker="AAPL",
        company_id=SAMPLE_PROFILE.company_id,
        event_id=event.event_id,
        source_document_id=event.source_document_id,
        event_published_at=NOW - timedelta(hours=2),
        computed_at=NOW,
        impact_direction="negative",
        macro_impact_score=0.45,
        confidence=0.8,
        contributing_factors=["geographic_overlap:0.650"],
    )
    return [row]
# ---------------------------------------------------------------------------
# Stage 1: Classification → Interpolation
# ---------------------------------------------------------------------------
class TestClassificationToInterpolation:
    """Classified GlobalEvents flowing into compute_macro_impact."""

    def test_classified_event_produces_macro_impact(self):
        """An overlapping event yields a populated, positively scored impact."""
        impact = compute_macro_impact(SAMPLE_EVENT, SAMPLE_PROFILE)

        # Identity fields are carried through from the two inputs.
        assert impact.event_id == SAMPLE_EVENT.event_id
        assert impact.company_id == SAMPLE_PROFILE.company_id

        score = impact.macro_impact_score
        assert 0.0 < score <= 1.0
        assert impact.confidence > 0
        assert len(impact.contributing_factors) >= 1

    def test_zero_overlap_event_produces_zero_score(self):
        """An event sharing no region or commodity with the profile scores 0.0."""
        south_american_event = GlobalEvent(
            event_id=str(uuid.uuid4()),
            source_document_id=str(uuid.uuid4()),
            summary="South American agricultural crisis",
            event_types=["geopolitical_risk"],
            severity="high",
            confidence=0.9,
            affected_regions=["BR", "AR"],
            affected_sectors=["Agriculture"],
            affected_commodities=["soybeans"],
        )
        european_profile = ExposureProfileSchema(
            company_id=str(uuid.uuid4()),
            market_position_tier=MarketPositionTier.REGIONAL,
            geographic_revenue_mix={"DE": 0.5, "FR": 0.5},
            supply_chain_regions=["DE", "FR"],
            key_input_commodities=["steel"],
        )

        result = compute_macro_impact(south_american_event, european_profile)

        assert result.macro_impact_score == 0.0

    def test_multiple_impact_types_preserved(self):
        """Both event types on SAMPLE_EVENT are retained."""
        event_types = SAMPLE_EVENT.event_types
        assert len(event_types) == 2
        # Membership via ``in`` on the list (equality-based), not via a set,
        # so the check also holds if the model coerces values to str-enums.
        assert all(
            expected in event_types
            for expected in ("trade_barrier", "cost_increase")
        )
# ---------------------------------------------------------------------------
# Stage 2: Interpolation → Aggregation
# ---------------------------------------------------------------------------
class TestInterpolationToAggregation:
    """Test that macro impact signals merge into aggregation correctly."""

    def test_macro_signals_merge_with_company_signals(self):
        """Macro signals should blend with company signals in aggregation."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")
        macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT)
        macro_signals = build_macro_weighted_signals(
            macro_impacts, NOW, "7d", macro_signal_weight=0.3,
        )
        all_signals = company_signals + macro_signals
        assembled = assemble_trend_with_evidence(
            "AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
        )
        summary = assembled.summary
        assert summary.entity_id == "AAPL"
        assert summary.trend_strength > 0
        assert summary.confidence > 0

    def test_macro_signals_affect_contradiction_score(self):
        """Opposing macro signals must not lower the contradiction score."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")
        # Company signals are positive and the macro row is negative, so the
        # two sources disagree.
        macro_impacts = _make_macro_impact_rows(SAMPLE_EVENT)
        macro_signals = build_macro_weighted_signals(
            macro_impacts, NOW, "7d", macro_signal_weight=0.3,
        )
        # With macro (opposing)
        all_signals = company_signals + macro_signals
        assembled_with = assemble_trend_with_evidence(
            "AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
        )
        # Without macro
        assembled_without = assemble_trend_with_evidence(
            "AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
        )
        # Non-strict on purpose: opposing macro signals may raise contradiction
        # or leave it unchanged, but must never reduce it.
        assert (
            assembled_with.summary.contradiction_score
            >= assembled_without.summary.contradiction_score
        )

    def test_no_macro_data_produces_identical_output(self):
        """Without macro data, output should be identical to company-only."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        # An empty macro feed must contribute no signals at all ...
        macro_signals = build_macro_weighted_signals(
            [], NOW, "7d", macro_signal_weight=0.3,
        )
        assert list(macro_signals) == []

        baseline = assemble_trend_with_evidence(
            "AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
        ).summary
        merged = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            company_signals + macro_signals,
            company_impacts,
            reference_time=NOW,
        ).summary

        # ... so merging it in must leave every headline metric unchanged.
        # The previous version of this test only checked that the direction
        # was *some* TrendDirection, which never verified the claim above.
        assert merged.trend_direction == baseline.trend_direction
        assert merged.trend_strength == baseline.trend_strength
        assert merged.confidence == baseline.confidence
        assert merged.contradiction_score == baseline.contradiction_score

        # Sanity checks on the company-only baseline itself.
        assert baseline.trend_direction in (
            TrendDirection.BULLISH, TrendDirection.BEARISH,
            TrendDirection.MIXED, TrendDirection.NEUTRAL,
        )
        assert baseline.confidence > 0

    def test_macro_toggle_disabled_skips_macro_signals(self):
        """When macro is disabled, config should reflect that."""
        cfg = AggregationConfig(macro_enabled=False)
        assert not cfg.macro_enabled
        # Only the config flag is verified here. The toggle is presumably
        # consulted by aggregate_company_window (reads from DB), which this
        # test does not exercise.
        cfg_enabled = AggregationConfig(macro_enabled=True)
        assert cfg_enabled.macro_enabled
# ---------------------------------------------------------------------------
# Stage 3: Aggregation → Projection → Recommendation
# ---------------------------------------------------------------------------
class TestAggregationToRecommendation:
    """Aggregation → projection → recommendation, end to end."""

    def _build_trend_with_macro(self):
        """Aggregate the sample company signals plus one macro row; return the summary."""
        impacts = _make_company_impacts()
        signals = build_weighted_signals(impacts, NOW, "7d")
        signals = signals + build_macro_weighted_signals(
            _make_macro_impact_rows(SAMPLE_EVENT),
            NOW,
            "7d",
            macro_signal_weight=0.3,
        )
        return assemble_trend_with_evidence(
            "AAPL", "7d", signals, impacts, reference_time=NOW,
        ).summary

    def test_projection_computed_from_trend(self):
        """compute_projection returns bounded values and names its drivers."""
        summary = self._build_trend_with_macro()
        tariff_event = MacroEventInfo(
            event_id=SAMPLE_EVENT.event_id,
            severity="high",
            estimated_duration="medium_term",
            impact_direction="negative",
            macro_impact_score=0.45,
            confidence=0.8,
            event_age_hours=2.0,
        )

        projection = compute_projection(
            summary=summary, macro_events=[tariff_event], macro_enabled=True,
        )

        assert projection.projected_direction in (
            "bullish", "bearish", "mixed", "neutral",
        )
        for bounded in (
            projection.projected_strength,
            projection.projected_confidence,
        ):
            assert 0.0 <= bounded <= 1.0
        assert len(projection.driving_factors) >= 1

    def test_recommendation_includes_projection_in_thesis(self):
        """A confident, diverging projection is written into the thesis."""
        summary = self._build_trend_with_macro()
        eligibility = evaluate_eligibility(summary)
        diverging = TrendProjection(
            projection_horizon="7d",
            projected_direction="bearish",
            projected_strength=0.6,
            projected_confidence=0.5,
            low_confidence=False,
            diverges_from_current=True,
            macro_contribution_pct=0.3,
            driving_factors=["Macro signals project bearish impact"],
        )

        thesis = build_thesis(summary, eligibility, projection=diverging)

        for expected in ("Forward projection", "bearish"):
            assert expected in thesis
        assert "diverges" in thesis.lower()

    def test_low_confidence_projection_excluded_from_thesis(self):
        """A projection flagged low_confidence is left out of the thesis."""
        summary = self._build_trend_with_macro()
        eligibility = evaluate_eligibility(summary)
        weak = TrendProjection(
            projection_horizon="7d",
            projected_direction="bearish",
            projected_strength=0.3,
            projected_confidence=0.2,
            low_confidence=True,
            driving_factors=["Weak signal"],
        )

        thesis = build_thesis(summary, eligibility, projection=weak)

        assert "Forward projection" not in thesis

    def test_recommendation_time_horizon_includes_projection(self):
        """The recommendation's time_horizon carries a proj:<horizon> tag."""
        summary = self._build_trend_with_macro()
        eligibility = evaluate_eligibility(summary)
        bullish = TrendProjection(
            projection_horizon="7d",
            projected_direction="bullish",
            projected_strength=0.7,
            projected_confidence=0.6,
            low_confidence=False,
            driving_factors=["Positive momentum"],
        )

        rec = build_recommendation(
            summary, eligibility, reference_time=NOW, projection=bullish,
        )

        assert "proj:7d" in rec.time_horizon

    def test_full_macro_pipeline_to_recommendation(self):
        """End-to-end: classification → interpolation → aggregation → recommendation."""
        # Stage 1 (classification) is stood in for by the pre-built SAMPLE_EVENT.
        event = SAMPLE_EVENT

        # Stage 2: interpolate the event onto the sample company.
        impact = compute_macro_impact(event, SAMPLE_PROFILE)
        assert impact.macro_impact_score > 0

        # Stage 3: combine company and macro signals, then aggregate.
        company_impacts = _make_company_impacts()
        signals = build_weighted_signals(company_impacts, NOW, "7d")
        macro_row = MacroImpactRow(
            event_id=event.event_id,
            company_id=SAMPLE_PROFILE.company_id,
            ticker="AAPL",
            macro_impact_score=impact.macro_impact_score,
            impact_direction=impact.impact_direction,
            contributing_factors=impact.contributing_factors,
            confidence=impact.confidence,
            computed_at=NOW,
            source_document_id=event.source_document_id,
            event_published_at=NOW - timedelta(hours=2),
        )
        signals = signals + build_macro_weighted_signals(
            [macro_row], NOW, "7d", macro_signal_weight=0.3,
        )
        summary = assemble_trend_with_evidence(
            "AAPL", "7d", signals, company_impacts, reference_time=NOW,
        ).summary

        # Stage 4: project forward from the same event.
        event_info = MacroEventInfo(
            event_id=event.event_id,
            macro_impact_score=impact.macro_impact_score,
            impact_direction=impact.impact_direction,
            confidence=impact.confidence,
            estimated_duration=event.estimated_duration,
            severity=event.severity,
            event_age_hours=2.0,
        )
        projection = compute_projection(
            summary=summary, macro_events=[event_info], macro_enabled=True,
        )

        # Stage 5: turn eligibility + projection into a recommendation.
        rec = build_recommendation(
            summary,
            evaluate_eligibility(summary),
            reference_time=NOW,
            projection=projection,
        )

        assert rec.ticker == "AAPL"
        assert rec.action in (
            ActionType.BUY, ActionType.SELL, ActionType.HOLD, ActionType.WATCH,
        )
        assert len(rec.thesis) > 0
        assert rec.confidence > 0
# ---------------------------------------------------------------------------
# Lake publisher writes
# ---------------------------------------------------------------------------
class TestLakePublisherMacroFacts:
    """Lake-publisher writes for macro facts, checked against a mocked MinIO client."""

    @staticmethod
    def _assert_single_write(ref, client, *fragments):
        """Assert *ref* is an s3:// URI containing every fragment, and exactly one put_object ran."""
        assert ref.startswith("s3://")
        for fragment in fragments:
            assert fragment in ref
        client.put_object.assert_called_once()

    def test_publish_global_event_fact(self):
        """A global event is stored under global_events with a dt= segment."""
        minio = MagicMock()
        # These keyword arguments are copied straight off SAMPLE_EVENT.
        copied_fields = (
            "event_id",
            "event_types",
            "severity",
            "affected_regions",
            "affected_sectors",
            "affected_commodities",
            "summary",
            "estimated_duration",
            "confidence",
            "source_document_id",
        )

        ref = publish_global_event_fact(
            client=minio,
            created_at=NOW,
            **{name: getattr(SAMPLE_EVENT, name) for name in copied_fields},
        )

        self._assert_single_write(ref, minio, "global_events", "dt=")

    def test_publish_macro_impact_fact(self):
        """A macro impact is stored under macro_impacts with a ticker= segment."""
        minio = MagicMock()

        ref = publish_macro_impact_fact(
            client=minio,
            event_id=SAMPLE_EVENT.event_id,
            company_id=SAMPLE_PROFILE.company_id,
            ticker="AAPL",
            impact_direction="negative",
            macro_impact_score=0.45,
            confidence=0.8,
            contributing_factors=["geographic_overlap:0.650"],
            computed_at=NOW,
        )

        self._assert_single_write(ref, minio, "macro_impacts", "ticker=AAPL")

    def test_publish_trend_projection_fact(self):
        """A trend projection is stored under trend_projections with a ticker= segment."""
        minio = MagicMock()

        ref = publish_trend_projection_fact(
            client=minio,
            trend_window_id=str(uuid.uuid4()),
            ticker="AAPL",
            projection_horizon="7d",
            projected_direction="bullish",
            projected_strength=0.7,
            projected_confidence=0.6,
            macro_contribution_pct=0.3,
            diverges_from_current=False,
            driving_factors=["Positive momentum"],
            computed_at=NOW,
        )

        self._assert_single_write(ref, minio, "trend_projections", "ticker=AAPL")
# ---------------------------------------------------------------------------
# Macro toggle propagation
# ---------------------------------------------------------------------------
class TestMacroTogglePropagation:
    """Macro on/off state as seen by AggregationConfig and compute_projection."""

    def test_disabled_macro_config_skips_macro_weight(self):
        """A disabled config reports disabled even when a weight is supplied."""
        config = AggregationConfig(macro_enabled=False, macro_signal_weight=0.5)
        # Only the flag is checked here. The aggregation worker is expected to
        # consult it before fetching macro data (not exercised by this test).
        assert not config.macro_enabled

    def test_enabled_macro_config_uses_weight(self):
        """An enabled config keeps the supplied macro_signal_weight."""
        config = AggregationConfig(macro_enabled=True, macro_signal_weight=0.3)
        assert config.macro_enabled
        assert config.macro_signal_weight == 0.3

    def test_macro_disabled_projection_has_reduced_confidence(self):
        """With no macro events, disabling macro never raises projection confidence."""
        impacts = _make_company_impacts()
        summary = assemble_trend_with_evidence(
            "AAPL",
            "7d",
            build_weighted_signals(impacts, NOW, "7d"),
            impacts,
            reference_time=NOW,
        ).summary

        # Enabled is computed first, then disabled; no macro events either way.
        confidence_by_toggle = {
            enabled: compute_projection(
                summary=summary, macro_events=None, macro_enabled=enabled,
            ).projected_confidence
            for enabled in (True, False)
        }

        assert confidence_by_toggle[False] <= confidence_by_toggle[True]