"""Tests for aggregation worker — rolling window trend summary computation.

Tests the pure logic functions (no DB required). The async DB functions
are covered by integration tests.
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from services.aggregation.scoring import (
|
|
ScoringConfig,
|
|
WeightedSignal,
|
|
compute_signal_weight,
|
|
)
|
|
from services.aggregation.worker import (
|
|
AggregationConfig,
|
|
AssembledTrend,
|
|
ImpactRow,
|
|
assemble_trend_summary,
|
|
assemble_trend_with_evidence,
|
|
build_weighted_signals,
|
|
compute_contradiction_score,
|
|
compute_trend_confidence,
|
|
derive_trend_direction,
|
|
extract_catalysts_and_risks,
|
|
rank_evidence,
|
|
)
|
|
from services.shared.schemas import MarketContext, TrendDirection, TrendWindow
|
|
|
|
NOW = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
|
|
|
|
|
|
def _make_impact(
    doc_id: str = "doc-1",
    sentiment: str = "positive",
    impact_score: float = 0.7,
    catalyst_type: str = "earnings",
    confidence: float = 0.8,
    source_credibility: float = 0.8,
    novelty_score: float = 0.5,
    published_at: datetime | None = None,
    risks: list[str] | None = None,
) -> ImpactRow:
    """Build an ``ImpactRow`` fixture with overridable defaults.

    ``published_at`` falls back to one hour before ``NOW`` and ``risks``
    falls back to an empty list when they are not supplied. ``key_facts``
    is always a single placeholder fact.
    """
    one_hour_before_now = NOW - timedelta(hours=1)
    row_fields = {
        "document_id": doc_id,
        "confidence": confidence,
        "novelty_score": novelty_score,
        "source_credibility": source_credibility,
        "sentiment": sentiment,
        "impact_score": impact_score,
        "catalyst_type": catalyst_type,
        "key_facts": ["some fact"],
        "risks": risks or [],
        "published_at": published_at or one_hour_before_now,
    }
    return ImpactRow(**row_fields)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# build_weighted_signals
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_build_weighted_signals_basic():
    """Two impacts yield two signals, in order, with +1.0 / -1.0 sentiment."""
    positive_row = _make_impact("d1")
    negative_row = _make_impact("d2", sentiment="negative")
    weighted = build_weighted_signals([positive_row, negative_row], NOW, "7d")

    assert len(weighted) == 2
    assert weighted[0].document_id == "d1"
    assert weighted[0].sentiment_value == 1.0
    assert weighted[1].sentiment_value == -1.0
    # Both rows use the default confidence, so each combined weight must be positive.
    assert all(signal.weight.combined > 0 for signal in weighted)
|
|
|
|
|
|
def test_build_weighted_signals_low_confidence_gated():
    """An impact with confidence 0.1 is expected to get a combined weight of 0.0."""
    low_confidence_rows = [_make_impact("d1", confidence=0.1)]
    weighted = build_weighted_signals(low_confidence_rows, NOW, "7d")
    only_signal = weighted[0]
    assert only_signal.weight.combined == 0.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# derive_trend_direction
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_direction_bullish():
    """A sentiment of 0.5 is expected to map to BULLISH."""
    direction = derive_trend_direction(0.5)
    assert direction == TrendDirection.BULLISH
|
|
|
|
|
|
def test_direction_bearish():
    """A sentiment of -0.5 is expected to map to BEARISH."""
    direction = derive_trend_direction(-0.5)
    assert direction == TrendDirection.BEARISH
|
|
|
|
|
|
def test_direction_neutral():
    """A sentiment of 0.05 is expected to map to NEUTRAL."""
    direction = derive_trend_direction(0.05)
    assert direction == TrendDirection.NEUTRAL
|
|
|
|
|
|
def test_direction_mixed_high_contradiction():
    """Sentiment 0.1 with contradiction score 0.2 is expected to map to MIXED."""
    direction = derive_trend_direction(0.1, contradiction_score=0.2)
    assert direction == TrendDirection.MIXED
|
|
|
|
|
|
def test_direction_bullish_despite_contradiction():
    """Sentiment 0.5 stays BULLISH even with a contradiction score of 0.3."""
    direction = derive_trend_direction(0.5, contradiction_score=0.3)
    assert direction == TrendDirection.BULLISH
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# compute_contradiction_score
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_contradiction_no_signals():
    """An empty signal list is expected to score 0.0 contradiction."""
    no_signals = []
    assert compute_contradiction_score(no_signals) == 0.0
|
|
|
|
|
|
def test_contradiction_all_positive():
    """Two positive signals sharing one weight are expected to score 0.0."""
    shared_weight = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8)
    positives = [
        WeightedSignal(doc, shared_weight, sentiment_value=1.0, impact_score=0.5)
        for doc in ("d1", "d2")
    ]
    assert compute_contradiction_score(positives) == 0.0
|
|
|
|
|
|
def test_contradiction_equal_opposing():
    """One positive and one negative signal of equal weight score about 0.5."""
    shared_weight = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8)
    opposing_pair = [
        WeightedSignal(doc, shared_weight, sentiment_value=polarity, impact_score=0.5)
        for doc, polarity in (("d1", 1.0), ("d2", -1.0))
    ]
    contradiction = compute_contradiction_score(opposing_pair)
    # Compared with a tolerance rather than exact float equality.
    assert abs(contradiction - 0.5) < 1e-4
|
|
|
|
|
|
def test_contradiction_mostly_positive():
    """Two strong positives and one weaker negative score strictly between 0 and 0.5."""
    shared_weight = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8)
    # (document id, sentiment value, impact score)
    specs = (
        ("d1", 1.0, 0.8),
        ("d2", 1.0, 0.8),
        ("d3", -1.0, 0.3),
    )
    lopsided = [
        WeightedSignal(doc, shared_weight, sentiment_value=polarity, impact_score=impact)
        for doc, polarity, impact in specs
    ]
    contradiction = compute_contradiction_score(lopsided)
    assert 0.0 < contradiction < 0.5
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# rank_evidence
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_rank_evidence_separates_supporting_opposing():
    """Positive docs land in supporting, negative in opposing, neutral in neither."""
    shared_weight = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8)
    # (document id, sentiment value, impact score)
    specs = (
        ("pos1", 1.0, 0.9),
        ("pos2", 1.0, 0.3),
        ("neg1", -1.0, 0.7),
        ("neutral1", 0.0, 0.5),
    )
    candidates = [
        WeightedSignal(doc, shared_weight, sentiment_value=polarity, impact_score=impact)
        for doc, polarity, impact in specs
    ]
    supporting_ids, opposing_ids = rank_evidence(candidates)
    assert supporting_ids == ["pos1", "pos2"]
    assert opposing_ids == ["neg1"]
|
|
|
|
|
|
def test_rank_evidence_respects_max():
    """With 20 positive signals and ``max_refs=3``, only 3 supporting ids come back."""
    shared_weight = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8)
    candidates = []
    for index in range(20):
        candidates.append(
            WeightedSignal(f"d{index}", shared_weight, sentiment_value=1.0, impact_score=0.5)
        )
    supporting_ids, opposing_ids = rank_evidence(candidates, max_refs=3)
    assert len(supporting_ids) == 3
    assert len(opposing_ids) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# extract_catalysts_and_risks
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_extract_catalysts_and_risks():
    """Catalysts are ordered with "earnings" first and duplicate risks collapse."""
    # (document id, catalyst type, risks)
    specs = (
        ("d1", "earnings", ["regulatory risk"]),
        ("d2", "earnings", ["supply chain"]),
        ("d3", "product", ["regulatory risk"]),
    )
    rows = [
        _make_impact(doc, catalyst_type=catalyst, risks=risk_list)
        for doc, catalyst, risk_list in specs
    ]
    weighted = build_weighted_signals(rows, NOW, "7d")
    catalysts, risks = extract_catalysts_and_risks(rows, weighted)

    # "earnings" appears on two of the three rows, so it is expected to rank first.
    assert catalysts[0] == "earnings"  # highest cumulative weight
    assert "product" in catalysts
    # Risks should be deduplicated
    lowered_risks = [risk.lower() for risk in risks]
    assert lowered_risks.count("regulatory risk") == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# compute_trend_confidence
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_confidence_no_signals():
    """No signals and zero contradiction are expected to give confidence 0.0."""
    confidence = compute_trend_confidence([], 0.0)
    assert confidence == 0.0
|
|
|
|
|
|
def test_confidence_increases_with_more_signals():
    """Fifteen identical signals must yield higher confidence than two."""
    shared_weight = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8)

    def _batch(count):
        # Identical positive signals that differ only in document id.
        return [WeightedSignal(f"d{i}", shared_weight, 1.0, 0.5) for i in range(count)]

    small_batch = _batch(2)
    large_batch = _batch(15)
    confidence_small = compute_trend_confidence(small_batch, 0.0)
    confidence_large = compute_trend_confidence(large_batch, 0.0)
    assert confidence_large > confidence_small
|
|
|
|
|
|
def test_confidence_penalized_by_contradiction():
    """The same five signals score lower confidence at contradiction 0.5 than at 0.0."""
    shared_weight = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8)
    batch = [WeightedSignal(f"d{i}", shared_weight, 1.0, 0.5) for i in range(5)]
    without_contradiction = compute_trend_confidence(batch, 0.0)
    with_contradiction = compute_trend_confidence(batch, 0.5)
    assert with_contradiction < without_contradiction
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# assemble_trend_summary
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_assemble_trend_summary_bullish():
    """Two positive impacts produce a BULLISH 7-day summary stamped with NOW."""
    rows = [
        _make_impact("d1", sentiment="positive", impact_score=0.8),
        _make_impact("d2", sentiment="positive", impact_score=0.6),
    ]
    weighted = build_weighted_signals(rows, NOW, "7d")
    trend = assemble_trend_summary(
        "AAPL",
        "7d",
        weighted,
        rows,
        reference_time=NOW,
    )

    assert trend.entity_id == "AAPL"
    assert trend.window == TrendWindow.SEVEN_DAY
    assert trend.trend_direction == TrendDirection.BULLISH
    assert trend.trend_strength > 0
    assert trend.confidence > 0
    assert len(trend.top_supporting_evidence) > 0
    assert trend.generated_at == NOW
|
|
|
|
|
|
def test_assemble_trend_summary_mixed():
    """Equal positive and negative impacts give NEUTRAL or MIXED with contradiction > 0."""
    rows = [
        _make_impact("d1", sentiment="positive", impact_score=0.5),
        _make_impact("d2", sentiment="negative", impact_score=0.5),
    ]
    weighted = build_weighted_signals(rows, NOW, "7d")
    trend = assemble_trend_summary(
        "TSLA",
        "7d",
        weighted,
        rows,
        reference_time=NOW,
    )

    # Equal opposing signals → neutral or mixed
    acceptable_directions = (TrendDirection.NEUTRAL, TrendDirection.MIXED)
    assert trend.trend_direction in acceptable_directions
    assert trend.contradiction_score > 0
|
|
|
|
|
|
def test_assemble_trend_summary_empty():
    """With no signals or impacts the summary is NEUTRAL with zero strength and confidence."""
    trend = assemble_trend_summary(
        "AAPL",
        "7d",
        [],
        [],
        reference_time=NOW,
    )
    assert trend.trend_direction == TrendDirection.NEUTRAL
    assert trend.trend_strength == 0.0
    assert trend.confidence == 0.0
|
|
|
|
|
|
def test_assemble_trend_summary_with_market_context():
    """A supplied ``MarketContext`` is carried through onto the summary."""
    rows = [_make_impact("d1")]
    market = MarketContext(ticker="AAPL", volatility=3.0, bars_available=5)
    weighted = build_weighted_signals(rows, NOW, "7d", market_ctx=market)
    trend = assemble_trend_summary(
        "AAPL",
        "7d",
        weighted,
        rows,
        market_ctx=market,
        reference_time=NOW,
    )
    assert trend.market_context is not None
    assert trend.market_context.ticker == "AAPL"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# AggregationConfig
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_aggregation_config_defaults():
    """A default config exposes one window per ``TrendWindow`` member and a ``ScoringConfig``."""
    default_config = AggregationConfig()
    windows = default_config.effective_windows()
    assert len(windows) == len(TrendWindow)
    scoring = default_config.effective_scoring()
    assert isinstance(scoring, ScoringConfig)
|
|
|
|
|
|
def test_aggregation_config_custom_windows():
    """Explicitly configured windows are returned unchanged by ``effective_windows``."""
    custom_config = AggregationConfig(windows=["7d", "30d"])
    windows = custom_config.effective_windows()
    assert windows == ["7d", "30d"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# assemble_trend_with_evidence
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_assemble_trend_with_evidence_returns_ranked_details():
    """Two positive and one negative impact yield matching ranked evidence lists."""
    # (document id, sentiment, impact score)
    specs = (
        ("d1", "positive", 0.8),
        ("d2", "negative", 0.6),
        ("d3", "positive", 0.5),
    )
    rows = [
        _make_impact(doc, sentiment=label, impact_score=score)
        for doc, label, score in specs
    ]
    weighted = build_weighted_signals(rows, NOW, "7d")
    assembled = assemble_trend_with_evidence(
        "AAPL",
        "7d",
        weighted,
        rows,
        reference_time=NOW,
    )

    assert isinstance(assembled, AssembledTrend)
    assert assembled.summary.entity_id == "AAPL"
    # Supporting evidence should contain the positive docs
    assert len(assembled.supporting_evidence) == 2
    assert all(item.sentiment_value > 0 for item in assembled.supporting_evidence)
    # Opposing evidence should contain the negative doc
    assert len(assembled.opposing_evidence) == 1
    assert assembled.opposing_evidence[0].document_id == "d2"
    # Rank scores should be positive
    assert all(item.rank_score > 0 for item in assembled.supporting_evidence)
    assert all(item.rank_score > 0 for item in assembled.opposing_evidence)
    # Summary evidence IDs should match
    supporting_ids = [item.document_id for item in assembled.supporting_evidence]
    assert assembled.summary.top_supporting_evidence == supporting_ids
    opposing_ids = [item.document_id for item in assembled.opposing_evidence]
    assert assembled.summary.top_opposing_evidence == opposing_ids
|
|
|
|
|
|
def test_assemble_trend_with_evidence_empty_signals():
    """With no input both evidence lists are empty and the direction is NEUTRAL."""
    assembled = assemble_trend_with_evidence(
        "AAPL",
        "7d",
        [],
        [],
        reference_time=NOW,
    )
    assert assembled.supporting_evidence == []
    assert assembled.opposing_evidence == []
    assert assembled.summary.trend_direction == TrendDirection.NEUTRAL
|