# Source: stonks-oracle/tests/test_competitive_integration.py (394 lines, 14 KiB, Python)
"""Integration tests for the competitive pipeline end-to-end.
Exercises the competitive signal path through all stages:
Document Intelligence → Pattern Mining → Signal Propagation → Aggregation
Also tests lake publisher writes for competitor relationships and competitive
signals, and competitive toggle state propagation.
Requirements: 4.1, 5.1, 6.1, 6.4, 7.3
"""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock
import pytest
from services.aggregation.pattern_matcher import (
HistoricalPattern,
classify_catalyst_tier,
compute_pattern_confidence,
find_self_patterns,
)
from services.aggregation.signal_propagation import (
CompetitiveSignalRecord,
build_pattern_weighted_signals,
propagate_signals,
)
from services.aggregation.worker import (
AggregationConfig,
ImpactRow,
assemble_trend_with_evidence,
build_weighted_signals,
)
from services.lake_publisher.worker import (
publish_competitor_relationship_fact,
publish_competitive_signal_fact,
)
from services.shared.config import CompetitiveConfig
from services.shared.schemas import TrendDirection
# Fixed, timezone-aware (UTC) reference instant. The helpers and tests in this
# module derive every relative timestamp from it and pass it as the
# aggregation reference time, instead of reading the wall clock.
NOW = datetime(2026, 6, 10, 12, 0, 0, tzinfo=timezone.utc)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _make_company_impacts() -> list[ImpactRow]:
    """Return the two positive-sentiment company impact rows used as aggregation input.

    The first row is an ``earnings`` catalyst published three hours before
    ``NOW``; the second is a ``rating_change`` catalyst published six hours
    before ``NOW``.
    """
    earnings_row = ImpactRow(
        document_id="doc-company-1",
        confidence=0.82,
        novelty_score=0.6,
        source_credibility=0.8,
        sentiment="positive",
        impact_score=0.7,
        catalyst_type="earnings",
        key_facts=["Revenue beat by 10%"],
        risks=["Supply chain concerns"],
        published_at=NOW - timedelta(hours=3),
    )
    rating_row = ImpactRow(
        document_id="doc-company-2",
        confidence=0.75,
        novelty_score=0.5,
        source_credibility=0.7,
        sentiment="positive",
        impact_score=0.55,
        catalyst_type="rating_change",
        key_facts=["Analyst upgrade"],
        risks=[],
        published_at=NOW - timedelta(hours=6),
    )
    return [earnings_row, rating_row]
def _make_self_pattern(
    ticker: str = "AAPL",
    catalyst_type: str = "earnings",
    bullish_pct: float = 0.8,
    bearish_pct: float = 0.2,
    confidence: float = 0.65,
) -> HistoricalPattern:
    """Return a historical pattern whose source and target are the same ticker.

    Args:
        ticker: Used for both ``source_ticker`` and ``target_ticker``.
        catalyst_type: Catalyst label stored on the pattern.
        bullish_pct: Bullish share stored on the pattern.
        bearish_pct: Bearish share stored on the pattern.
        confidence: Value stored as ``pattern_confidence``.

    All remaining fields are fixed: a "7d" horizon, 10 samples, and a data
    window running from 90 to 5 days before ``NOW``.
    """
    window_start = NOW - timedelta(days=90)
    window_end = NOW - timedelta(days=5)
    return HistoricalPattern(
        source_ticker=ticker,
        target_ticker=ticker,
        catalyst_type=catalyst_type,
        time_horizon="7d",
        sample_count=10,
        bullish_pct=bullish_pct,
        bearish_pct=bearish_pct,
        avg_strength=0.6,
        avg_time_to_resolution=3.5,
        pattern_confidence=confidence,
        data_start=window_start,
        data_end=window_end,
        tier="routine_signal",
        insufficient_data=False,
    )
def _make_competitive_signal(
    source_ticker: str = "MSFT",
    target_ticker: str = "AAPL",
    direction: str = "bearish",
    strength: float = 0.35,
) -> CompetitiveSignalRecord:
    """Return a ``product_launch`` competitive signal record for tests.

    Args:
        source_ticker: Ticker the signal originates from.
        target_ticker: Ticker the signal is aimed at.
        direction: Value stored as ``signal_direction``.
        strength: Value stored as ``signal_strength``.

    Each call gets a fresh random ``source_document_id``; the record is
    stamped as computed one hour before ``NOW``.
    """
    document_id = str(uuid.uuid4())
    computed_at = NOW - timedelta(hours=1)
    return CompetitiveSignalRecord(
        source_document_id=document_id,
        source_ticker=source_ticker,
        target_ticker=target_ticker,
        catalyst_type="product_launch",
        pattern_confidence=0.55,
        signal_direction=direction,
        signal_strength=strength,
        relationship_strength=0.7,
        computed_at=computed_at,
    )
# ---------------------------------------------------------------------------
# Stage 1: Pattern Mining → Signal Propagation → Aggregation
# ---------------------------------------------------------------------------
class TestPatternMiningToAggregation:
    """Test that pattern mining feeds correctly into aggregation.

    Every test builds company signals from the shared impact rows, optionally
    adds pattern / competitive weighted signals, and inspects the summary
    produced by ``assemble_trend_with_evidence`` for ticker "AAPL" on the
    "7d" horizon.
    """

    def test_self_patterns_merge_with_company_signals(self):
        """Self-company patterns should blend with company signals in aggregation."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")
        patterns = [_make_self_pattern()]
        competitive_signals: list[CompetitiveSignalRecord] = []

        pattern_ws = build_pattern_weighted_signals(
            patterns, competitive_signals, NOW, "7d",
        )
        all_signals = company_signals + pattern_ws
        assembled = assemble_trend_with_evidence(
            "AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
        )

        summary = assembled.summary
        assert summary.entity_id == "AAPL"
        assert summary.trend_strength > 0
        assert summary.confidence > 0

    def test_competitive_signals_merge_with_company_signals(self):
        """Competitive signals should blend with company signals in aggregation."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")
        patterns: list[HistoricalPattern] = []
        competitive_signals = [_make_competitive_signal()]

        pattern_ws = build_pattern_weighted_signals(
            patterns, competitive_signals, NOW, "7d",
        )
        all_signals = company_signals + pattern_ws
        assembled = assemble_trend_with_evidence(
            "AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
        )

        summary = assembled.summary
        assert summary.entity_id == "AAPL"
        assert summary.trend_strength > 0
        assert summary.confidence > 0

    def test_opposing_pattern_increases_contradiction(self):
        """Bearish pattern signals opposing bullish company signals must not lower contradiction."""
        company_impacts = _make_company_impacts()  # positive sentiment
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        # Bearish pattern and bearish competitive signal opposing the
        # positive company signals.
        bearish_pattern = _make_self_pattern(
            bullish_pct=0.15, bearish_pct=0.85, confidence=0.7,
        )
        competitive_signals = [
            _make_competitive_signal(direction="bearish", strength=0.5),
        ]
        pattern_ws = build_pattern_weighted_signals(
            [bearish_pattern], competitive_signals, NOW, "7d",
        )

        # With pattern signals (opposing)
        all_signals = company_signals + pattern_ws
        assembled_with = assemble_trend_with_evidence(
            "AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
        )
        # Without pattern signals
        assembled_without = assemble_trend_with_evidence(
            "AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
        )

        # Non-strict on purpose: equality is accepted, a decrease is not.
        assert (
            assembled_with.summary.contradiction_score
            >= assembled_without.summary.contradiction_score
        )

    def test_no_pattern_data_produces_identical_output(self):
        """Without pattern data, output should be identical to company-only."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")

        # Empty patterns and competitive signals must contribute no signals.
        pattern_ws = build_pattern_weighted_signals([], [], NOW, "7d")
        assert pattern_ws == []

        baseline = assemble_trend_with_evidence(
            "AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
        ).summary
        # Run the merged path as well so the "identical output" claim is
        # actually compared rather than only implied by the empty list.
        merged = assemble_trend_with_evidence(
            "AAPL", "7d", company_signals + pattern_ws, company_impacts,
            reference_time=NOW,
        ).summary

        assert baseline.trend_direction in (
            TrendDirection.BULLISH, TrendDirection.BEARISH,
            TrendDirection.MIXED, TrendDirection.NEUTRAL,
        )
        assert baseline.confidence > 0

        assert merged.trend_direction == baseline.trend_direction
        assert merged.trend_strength == baseline.trend_strength
        assert merged.confidence == baseline.confidence
        assert merged.contradiction_score == baseline.contradiction_score

    def test_full_three_layer_aggregation(self):
        """End-to-end: company signals + pattern signals + competitive signals."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")
        patterns = [_make_self_pattern()]
        competitive_signals = [
            _make_competitive_signal(direction="bullish", strength=0.3),
        ]

        pattern_ws = build_pattern_weighted_signals(
            patterns, competitive_signals, NOW, "7d",
        )
        all_signals = company_signals + pattern_ws
        assembled = assemble_trend_with_evidence(
            "AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
        )

        summary = assembled.summary
        assert summary.entity_id == "AAPL"
        assert summary.trend_strength > 0
        assert summary.confidence > 0

        # Only checks that some evidence is surfaced; it does not verify
        # which document IDs (company or pattern) the evidence refers to.
        all_evidence = summary.top_supporting_evidence + summary.top_opposing_evidence
        assert len(all_evidence) > 0
# ---------------------------------------------------------------------------
# Lake publisher writes
# ---------------------------------------------------------------------------
class TestLakePublisherCompetitiveFacts:
    """Check the references returned by the competitive lake-publisher functions.

    The storage client is a ``MagicMock``, so these tests only verify the
    returned reference string and that exactly one ``put_object`` call was
    made; the uploaded payload itself is not inspected.
    """

    def test_publish_competitor_relationship_fact(self):
        """A manual direct_rival relationship yields a dated s3 reference and one upload."""
        storage = MagicMock()
        fact_fields = dict(
            relationship_id=str(uuid.uuid4()),
            company_a_id=str(uuid.uuid4()),
            company_b_id=str(uuid.uuid4()),
            relationship_type="direct_rival",
            strength=0.8,
            bidirectional=True,
            source="manual",
            active=True,
            created_at=NOW,
        )

        uri = publish_competitor_relationship_fact(client=storage, **fact_fields)

        assert uri.startswith("s3://")
        assert "competitor_relationships" in uri
        assert "dt=" in uri
        storage.put_object.assert_called_once()

    def test_publish_competitive_signal_fact(self):
        """A competitive signal reference carries the target ticker and a date segment."""
        storage = MagicMock()
        fact_fields = dict(
            signal_id=str(uuid.uuid4()),
            source_document_id=str(uuid.uuid4()),
            source_ticker="MSFT",
            target_ticker="AAPL",
            catalyst_type="product_launch",
            pattern_confidence=0.6,
            signal_direction="bearish",
            signal_strength=0.4,
            relationship_strength=0.7,
            computed_at=NOW,
        )

        uri = publish_competitive_signal_fact(client=storage, **fact_fields)

        assert uri.startswith("s3://")
        assert "competitive_signals" in uri
        assert "target_ticker=AAPL" in uri
        assert "dt=" in uri
        storage.put_object.assert_called_once()

    def test_publish_competitor_relationship_inferred(self):
        """An inferred same_sector relationship is published with a single upload."""
        storage = MagicMock()
        fact_fields = dict(
            relationship_id=str(uuid.uuid4()),
            company_a_id=str(uuid.uuid4()),
            company_b_id=str(uuid.uuid4()),
            relationship_type="same_sector",
            strength=0.5,
            bidirectional=True,
            source="inferred",
            active=True,
            created_at=NOW,
        )

        uri = publish_competitor_relationship_fact(client=storage, **fact_fields)

        assert uri.startswith("s3://")
        assert "competitor_relationships" in uri
        storage.put_object.assert_called_once()
# ---------------------------------------------------------------------------
# Competitive toggle propagation
# ---------------------------------------------------------------------------
class TestCompetitiveTogglePropagation:
    """Test that competitive toggle state changes propagate correctly.

    The toggle is simulated: "disabled" aggregates company signals only,
    "enabled" aggregates company signals plus pattern / competitive signals.
    """

    def test_disabled_competitive_config_flag(self):
        """When competitive_enabled=False, config should reflect that."""
        cfg = AggregationConfig(competitive_enabled=False)
        assert not cfg.competitive_enabled

    def test_enabled_competitive_config_uses_weight(self):
        """When competitive_enabled=True, the configured weight is stored on the config."""
        cfg = AggregationConfig(competitive_enabled=True, competitive_signal_weight=0.2)
        assert cfg.competitive_enabled
        assert cfg.competitive_signal_weight == 0.2

    def test_toggle_disable_reenable_preserves_data(self):
        """Disabling and re-enabling the toggle should not lose pattern data."""
        company_impacts = _make_company_impacts()
        company_signals = build_weighted_signals(company_impacts, NOW, "7d")
        patterns = [_make_self_pattern()]
        competitive_signals = [_make_competitive_signal()]

        # Simulate disabled: only company signals are aggregated.
        cfg_disabled = AggregationConfig(competitive_enabled=False)
        assert not cfg_disabled.competitive_enabled
        assembled_disabled = assemble_trend_with_evidence(
            "AAPL", "7d", company_signals, company_impacts, reference_time=NOW,
        )

        # Simulate re-enabled: company + pattern signals are aggregated.
        cfg_enabled = AggregationConfig(competitive_enabled=True)
        assert cfg_enabled.competitive_enabled
        pattern_ws = build_pattern_weighted_signals(
            patterns, competitive_signals, NOW, "7d",
        )
        all_signals = company_signals + pattern_ws
        assembled_enabled = assemble_trend_with_evidence(
            "AAPL", "7d", all_signals, company_impacts, reference_time=NOW,
        )

        # Both should produce valid summaries
        assert assembled_disabled.summary.entity_id == "AAPL"
        assert assembled_enabled.summary.entity_id == "AAPL"
        assert assembled_disabled.summary.confidence > 0
        assert assembled_enabled.summary.confidence > 0

    def test_competitive_weight_configurable(self):
        """CompetitiveConfig weight should be configurable."""
        cfg = CompetitiveConfig(competitive_signal_weight=0.4)
        assert cfg.competitive_signal_weight == 0.4

        patterns = [_make_self_pattern()]
        ws_lower = build_pattern_weighted_signals(
            patterns, [], NOW, "7d",
            config=CompetitiveConfig(competitive_signal_weight=0.2),
        )
        ws_higher = build_pattern_weighted_signals(
            patterns, [], NOW, "7d",
            config=CompetitiveConfig(competitive_signal_weight=0.5),
        )

        # Fail with a clear message instead of an IndexError if no weighted
        # signal was produced for either configuration.
        assert ws_lower, "expected at least one weighted signal at weight 0.2"
        assert ws_higher, "expected at least one weighted signal at weight 0.5"

        # A higher weight must not produce a lower impact score.
        assert ws_higher[0].impact_score >= ws_lower[0].impact_score