"""Tests for aggregation worker — rolling window trend summary computation. Tests the pure logic functions (no DB required). The async DB functions are covered by integration tests. """ from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock import pytest from services.aggregation.scoring import ( ScoringConfig, WeightedSignal, compute_signal_weight, ) from services.aggregation.worker import ( AggregationConfig, AssembledTrend, ImpactRow, assemble_trend_summary, assemble_trend_with_evidence, build_weighted_signals, compute_contradiction_score, compute_trend_confidence, derive_trend_direction, extract_catalysts_and_risks, fetch_probabilistic_scoring_enabled, rank_evidence, ) from services.shared.schemas import MarketContext, TrendDirection, TrendWindow NOW = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc) def _make_impact( doc_id: str = "doc-1", sentiment: str = "positive", impact_score: float = 0.7, catalyst_type: str = "earnings", confidence: float = 0.8, source_credibility: float = 0.8, novelty_score: float = 0.5, published_at: datetime | None = None, risks: list[str] | None = None, ) -> ImpactRow: return ImpactRow( document_id=doc_id, confidence=confidence, novelty_score=novelty_score, source_credibility=source_credibility, sentiment=sentiment, impact_score=impact_score, catalyst_type=catalyst_type, key_facts=["some fact"], risks=risks or [], published_at=published_at or NOW - timedelta(hours=1), ) # --------------------------------------------------------------------------- # build_weighted_signals # --------------------------------------------------------------------------- def test_build_weighted_signals_basic(): impacts = [_make_impact("d1"), _make_impact("d2", sentiment="negative")] signals = build_weighted_signals(impacts, NOW, "7d") assert len(signals) == 2 assert signals[0].document_id == "d1" assert signals[0].sentiment_value == 1.0 assert signals[1].sentiment_value == -1.0 assert all(s.weight.combined > 0 for s in signals) def test_build_weighted_signals_low_confidence_gated(): impacts = [_make_impact("d1", confidence=0.1)] signals = build_weighted_signals(impacts, NOW, "7d") assert signals[0].weight.combined == 0.0 # --------------------------------------------------------------------------- # derive_trend_direction # --------------------------------------------------------------------------- def test_direction_bullish(): assert derive_trend_direction(0.5) == TrendDirection.BULLISH def test_direction_bearish(): assert derive_trend_direction(-0.5) == TrendDirection.BEARISH def test_direction_neutral(): assert derive_trend_direction(0.05) == TrendDirection.NEUTRAL def test_direction_mixed_high_contradiction(): assert derive_trend_direction(0.1, contradiction_score=0.2) == TrendDirection.MIXED def test_direction_bullish_despite_contradiction(): """Strong sentiment overrides contradiction.""" assert derive_trend_direction(0.5, contradiction_score=0.3) == TrendDirection.BULLISH # --------------------------------------------------------------------------- # compute_contradiction_score # --------------------------------------------------------------------------- def test_contradiction_no_signals(): assert compute_contradiction_score([]) == 0.0 def test_contradiction_all_positive(): sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) signals = [ WeightedSignal("d1", sw, sentiment_value=1.0, impact_score=0.5), WeightedSignal("d2", sw, sentiment_value=1.0, impact_score=0.5), ] assert compute_contradiction_score(signals) == 0.0 def test_contradiction_equal_opposing(): sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) signals = [ WeightedSignal("d1", sw, sentiment_value=1.0, impact_score=0.5), WeightedSignal("d2", sw, sentiment_value=-1.0, impact_score=0.5), ] score = compute_contradiction_score(signals) assert abs(score - 0.5) < 1e-4 def test_contradiction_mostly_positive(): sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) signals = [ WeightedSignal("d1", sw, sentiment_value=1.0, impact_score=0.8), WeightedSignal("d2", sw, sentiment_value=1.0, impact_score=0.8), WeightedSignal("d3", sw, sentiment_value=-1.0, impact_score=0.3), ] score = compute_contradiction_score(signals) assert 0.0 < score < 0.5 # --------------------------------------------------------------------------- # rank_evidence # --------------------------------------------------------------------------- def test_rank_evidence_separates_supporting_opposing(): sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) signals = [ WeightedSignal("pos1", sw, sentiment_value=1.0, impact_score=0.9), WeightedSignal("pos2", sw, sentiment_value=1.0, impact_score=0.3), WeightedSignal("neg1", sw, sentiment_value=-1.0, impact_score=0.7), WeightedSignal("neutral1", sw, sentiment_value=0.0, impact_score=0.5), ] supporting, opposing = rank_evidence(signals) assert supporting == ["pos1", "pos2"] assert opposing == ["neg1"] def test_rank_evidence_respects_max(): sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) signals = [ WeightedSignal(f"d{i}", sw, sentiment_value=1.0, impact_score=0.5) for i in range(20) ] supporting, opposing = rank_evidence(signals, max_refs=3) assert len(supporting) == 3 assert len(opposing) == 0 # --------------------------------------------------------------------------- # extract_catalysts_and_risks # --------------------------------------------------------------------------- def test_extract_catalysts_and_risks(): impacts = [ _make_impact("d1", catalyst_type="earnings", risks=["regulatory risk"]), _make_impact("d2", catalyst_type="earnings", risks=["supply chain"]), _make_impact("d3", catalyst_type="product", risks=["regulatory risk"]), ] signals = build_weighted_signals(impacts, NOW, "7d") catalysts, risks = extract_catalysts_and_risks(impacts, signals) assert catalysts[0] == "earnings" # highest cumulative weight assert "product" in catalysts # Risks should be deduplicated risk_lower = [r.lower() for r in risks] assert risk_lower.count("regulatory risk") == 1 # --------------------------------------------------------------------------- # compute_trend_confidence # --------------------------------------------------------------------------- def test_confidence_no_signals(): assert compute_trend_confidence([], 0.0) == 0.0 def test_confidence_increases_with_more_signals(): sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) few = [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(2)] many = [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(15)] c_few = compute_trend_confidence(few, 0.0) c_many = compute_trend_confidence(many, 0.0) assert c_many > c_few def test_confidence_penalized_by_contradiction(): sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) signals = [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(5)] c_low = compute_trend_confidence(signals, 0.0) c_high = compute_trend_confidence(signals, 0.5) assert c_high < c_low # --------------------------------------------------------------------------- # Agreement dampener — sample-size scaling # --------------------------------------------------------------------------- def test_single_signal_never_paper_eligible(): """One signal agreeing with itself should not reach paper threshold (0.50).""" # Use the highest credibility possible sw = compute_signal_weight(NOW, NOW, "7d", 1.0, extraction_confidence=1.0) signals = [WeightedSignal("d0", sw, 1.0, 0.5)] conf = compute_trend_confidence(signals, 0.0) assert conf < 0.50, f"Single signal confidence {conf} should be below paper threshold 0.50" def test_two_signals_below_paper_threshold(): """Two agreeing signals with moderate credibility should stay below paper threshold.""" sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) signals = [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(2)] conf = compute_trend_confidence(signals, 0.0) assert conf < 0.50, f"Two-signal confidence {conf} should be below paper threshold 0.50" def test_dampener_saturates_at_seven_plus(): """With 7+ unique sources the dampener should be at or near 1.0, producing the same confidence as the undampened formula would.""" sw = compute_signal_weight(NOW, NOW, "7d", 0.8, extraction_confidence=0.8) c7 = compute_trend_confidence( [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(7)], 0.0, ) c15 = compute_trend_confidence( [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(15)], 0.0, ) # 7 signals should be close to 15 signals (difference only from count_factor) # The agreement component should be identical (both dampened to ~1.0) assert c15 > c7 # more docs still helps via count_factor assert c7 > 0.55, f"7-signal confidence {c7} should be well above paper threshold" def test_three_good_signals_paper_eligible(): """Three signals with decent credibility (0.6+) should reach paper threshold.""" sw = compute_signal_weight(NOW, NOW, "7d", 0.7, extraction_confidence=0.7) signals = [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(3)] conf = compute_trend_confidence(signals, 0.0) assert conf >= 0.50, f"Three good-signal confidence {conf} should reach paper threshold 0.50" def test_dampener_monotonically_increases_with_sources(): """Confidence should strictly increase as we add more agreeing unique sources.""" sw = compute_signal_weight(NOW, NOW, "7d", 0.6, extraction_confidence=0.6) prev = 0.0 for n in [1, 2, 3, 5, 7, 10, 15]: signals = [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(n)] conf = compute_trend_confidence(signals, 0.0) assert conf > prev, f"Confidence should increase: n={n} conf={conf} <= prev={prev}" prev = conf def test_low_credibility_two_signals_not_paper_eligible(): """Two low-credibility signals agreeing should not be paper-eligible.""" sw = compute_signal_weight(NOW, NOW, "7d", 0.3, extraction_confidence=0.3) signals = [WeightedSignal(f"d{i}", sw, 1.0, 0.5) for i in range(2)] conf = compute_trend_confidence(signals, 0.0) assert conf < 0.50, f"Two low-cred signals confidence {conf} should be below 0.50" def test_mixed_sentiment_dampened_correctly(): """Mixed signals (some bullish, some bearish) should have lower agreement AND the dampener should further reduce it for small samples.""" sw = compute_signal_weight(NOW, NOW, "7d", 0.6, extraction_confidence=0.6) # 2 bullish, 1 bearish — agreement = 2/3 = 0.667 signals = [ WeightedSignal("d0", sw, 1.0, 0.5), WeightedSignal("d1", sw, 1.0, 0.5), WeightedSignal("d2", sw, -1.0, 0.5), ] conf = compute_trend_confidence(signals, 0.0) # With dampener: agreement 0.667 * dampener(3) ≈ 0.667 * 0.667 ≈ 0.445 # Should be well below paper threshold assert conf < 0.50, f"Mixed 3-signal confidence {conf} should be below 0.50" def test_assemble_trend_summary_bullish(): impacts = [ _make_impact("d1", sentiment="positive", impact_score=0.8), _make_impact("d2", sentiment="positive", impact_score=0.6), ] signals = build_weighted_signals(impacts, NOW, "7d") summary = assemble_trend_summary("AAPL", "7d", signals, impacts, reference_time=NOW) assert summary.entity_id == "AAPL" assert summary.window == TrendWindow.SEVEN_DAY assert summary.trend_direction == TrendDirection.BULLISH assert summary.trend_strength > 0 assert summary.confidence > 0 assert len(summary.top_supporting_evidence) > 0 assert summary.generated_at == NOW def test_assemble_trend_summary_mixed(): impacts = [ _make_impact("d1", sentiment="positive", impact_score=0.5), _make_impact("d2", sentiment="negative", impact_score=0.5), ] signals = build_weighted_signals(impacts, NOW, "7d") summary = assemble_trend_summary("TSLA", "7d", signals, impacts, reference_time=NOW) # Equal opposing signals → neutral or mixed assert summary.trend_direction in (TrendDirection.NEUTRAL, TrendDirection.MIXED) assert summary.contradiction_score > 0 def test_assemble_trend_summary_empty(): summary = assemble_trend_summary("AAPL", "7d", [], [], reference_time=NOW) assert summary.trend_direction == TrendDirection.NEUTRAL assert summary.trend_strength == 0.0 assert summary.confidence == 0.0 def test_assemble_trend_summary_with_market_context(): impacts = [_make_impact("d1")] ctx = MarketContext(ticker="AAPL", volatility=3.0, bars_available=5) signals = build_weighted_signals(impacts, NOW, "7d", market_ctx=ctx) summary = assemble_trend_summary("AAPL", "7d", signals, impacts, market_ctx=ctx, reference_time=NOW) assert summary.market_context is not None assert summary.market_context.ticker == "AAPL" # --------------------------------------------------------------------------- # AggregationConfig # --------------------------------------------------------------------------- def test_aggregation_config_defaults(): cfg = AggregationConfig() assert len(cfg.effective_windows()) == len(TrendWindow) assert isinstance(cfg.effective_scoring(), ScoringConfig) def test_aggregation_config_custom_windows(): cfg = AggregationConfig(windows=["7d", "30d"]) assert cfg.effective_windows() == ["7d", "30d"] # --------------------------------------------------------------------------- # assemble_trend_with_evidence # --------------------------------------------------------------------------- def test_assemble_trend_with_evidence_returns_ranked_details(): impacts = [ _make_impact("d1", sentiment="positive", impact_score=0.8), _make_impact("d2", sentiment="negative", impact_score=0.6), _make_impact("d3", sentiment="positive", impact_score=0.5), ] signals = build_weighted_signals(impacts, NOW, "7d") result = assemble_trend_with_evidence("AAPL", "7d", signals, impacts, reference_time=NOW) assert isinstance(result, AssembledTrend) assert result.summary.entity_id == "AAPL" # Supporting evidence should contain the positive docs assert len(result.supporting_evidence) == 2 assert all(e.sentiment_value > 0 for e in result.supporting_evidence) # Opposing evidence should contain the negative doc assert len(result.opposing_evidence) == 1 assert result.opposing_evidence[0].document_id == "d2" # Rank scores should be positive assert all(e.rank_score > 0 for e in result.supporting_evidence) assert all(e.rank_score > 0 for e in result.opposing_evidence) # Summary evidence IDs should match assert result.summary.top_supporting_evidence == [e.document_id for e in result.supporting_evidence] assert result.summary.top_opposing_evidence == [e.document_id for e in result.opposing_evidence] def test_assemble_trend_with_evidence_empty_signals(): result = assemble_trend_with_evidence("AAPL", "7d", [], [], reference_time=NOW) assert result.supporting_evidence == [] assert result.opposing_evidence == [] assert result.summary.trend_direction == TrendDirection.NEUTRAL # --------------------------------------------------------------------------- # AggregationConfig — probabilistic_scoring_enabled field # --------------------------------------------------------------------------- def test_aggregation_config_probabilistic_default_false(): """probabilistic_scoring_enabled defaults to False (heuristic pipeline).""" cfg = AggregationConfig() assert cfg.probabilistic_scoring_enabled is False def test_aggregation_config_probabilistic_explicit_true(): """probabilistic_scoring_enabled can be set to True.""" cfg = AggregationConfig(probabilistic_scoring_enabled=True) assert cfg.probabilistic_scoring_enabled is True # --------------------------------------------------------------------------- # fetch_probabilistic_scoring_enabled — DB toggle reading # --------------------------------------------------------------------------- class _FakeRecord(dict): """Minimal dict-like object that mimics an asyncpg Record.""" pass @pytest.mark.asyncio async def test_fetch_probabilistic_enabled_true(): """Returns True when risk_configs has probabilistic_scoring_enabled='true'.""" pool = AsyncMock() pool.fetchrow = AsyncMock( return_value=_FakeRecord({"probabilistic_scoring_enabled": "true"}), ) result = await fetch_probabilistic_scoring_enabled(pool) assert result is True @pytest.mark.asyncio async def test_fetch_probabilistic_enabled_false(): """Returns False when risk_configs has probabilistic_scoring_enabled='false'.""" pool = AsyncMock() pool.fetchrow = AsyncMock( return_value=_FakeRecord({"probabilistic_scoring_enabled": "false"}), ) result = await fetch_probabilistic_scoring_enabled(pool) assert result is False @pytest.mark.asyncio async def test_fetch_probabilistic_enabled_missing_key(): """Returns False when the key is missing from config JSONB (value is None).""" pool = AsyncMock() pool.fetchrow = AsyncMock( return_value=_FakeRecord({"probabilistic_scoring_enabled": None}), ) result = await fetch_probabilistic_scoring_enabled(pool) assert result is False @pytest.mark.asyncio async def test_fetch_probabilistic_enabled_no_config_row(): """Returns False when no risk_configs row exists.""" pool = AsyncMock() pool.fetchrow = AsyncMock(return_value=None) result = await fetch_probabilistic_scoring_enabled(pool) assert result is False @pytest.mark.asyncio async def test_fetch_probabilistic_enabled_invalid_value(): """Returns False when the value is not a valid boolean string.""" pool = AsyncMock() pool.fetchrow = AsyncMock( return_value=_FakeRecord({"probabilistic_scoring_enabled": "yes"}), ) result = await fetch_probabilistic_scoring_enabled(pool) assert result is False @pytest.mark.asyncio async def test_fetch_probabilistic_enabled_db_unreachable(): """Returns False (fail-safe) when the database query raises an exception.""" pool = AsyncMock() pool.fetchrow = AsyncMock(side_effect=Exception("connection refused")) result = await fetch_probabilistic_scoring_enabled(pool) assert result is False