c85c0068a2
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files - Fix 12 failing tests to match current implementation behavior - Fix pytest_plugins in non-top-level conftest (moved to root conftest.py) - Auto-fix 189 lint issues (import sorting, unused imports) - Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests) - Add values-beta.yaml and values-paper.yaml for staged deployments - Update GitHub Actions workflow to use self-hosted-gremlin runners - Add integration-test job to CI pipeline Result: 1596 passed, 0 failed, 0 warnings
125 lines
5.5 KiB
Python
125 lines
5.5 KiB
Python
"""Tests for the aggregation main loop signal propagation wiring.
|
|
|
|
Validates:
|
|
- Signal propagation is triggered after aggregation when competitive layer is enabled
|
|
- Consecutive failure tracking and operator alerting (Requirement 9.4)
|
|
- Propagation is skipped when competitive layer is disabled
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from services.aggregation.main import _trigger_signal_propagation
|
|
from services.shared.config import CompetitiveConfig
|
|
|
|
|
|
@pytest.fixture
|
|
def competitive_config():
|
|
return CompetitiveConfig(
|
|
propagation_failure_threshold=5,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_pool():
|
|
pool = AsyncMock()
|
|
return pool
|
|
|
|
|
|
class TestTriggerSignalPropagation:
|
|
"""Tests for _trigger_signal_propagation."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_records_returns_zero(self, mock_pool, competitive_config):
|
|
"""When no intelligence records exist, returns 0 signals."""
|
|
mock_pool.fetch = AsyncMock(return_value=[])
|
|
result = await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)
|
|
assert result == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_skips_zero_impact_records(self, mock_pool, competitive_config):
|
|
"""Records with impact_score <= 0 are skipped."""
|
|
mock_pool.fetch = AsyncMock(return_value=[
|
|
{"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.0},
|
|
])
|
|
with patch("services.aggregation.main.propagate_signals") as mock_prop:
|
|
result = await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)
|
|
assert result == 0
|
|
mock_prop.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_calls_propagate_signals_for_each_record(self, mock_pool, competitive_config):
|
|
"""propagate_signals is called for each valid intelligence record."""
|
|
mock_pool.fetch = AsyncMock(return_value=[
|
|
{"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.8},
|
|
{"document_id": "doc-2", "catalyst_type": "m_and_a", "impact_score": 0.6},
|
|
])
|
|
with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
|
|
mock_prop.return_value = []
|
|
result = await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)
|
|
assert mock_prop.call_count == 2
|
|
# Verify correct args for first call
|
|
call_args = mock_prop.call_args_list[0]
|
|
assert call_args.kwargs["ticker"] == "AAPL"
|
|
assert call_args.kwargs["catalyst_type"] == "earnings"
|
|
assert call_args.kwargs["impact_score"] == 0.8
|
|
assert call_args.kwargs["document_id"] == "doc-1"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_total_signal_count(self, mock_pool, competitive_config):
|
|
"""Returns the total number of competitive signals produced."""
|
|
mock_pool.fetch = AsyncMock(return_value=[
|
|
{"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.8},
|
|
{"document_id": "doc-2", "catalyst_type": "m_and_a", "impact_score": 0.6},
|
|
])
|
|
mock_record = MagicMock()
|
|
with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
|
|
mock_prop.side_effect = [
|
|
[mock_record, mock_record], # 2 signals from first doc
|
|
[mock_record], # 1 signal from second doc
|
|
]
|
|
result = await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)
|
|
assert result == 3
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_consecutive_failure_tracking(self, mock_pool, competitive_config):
|
|
"""After threshold consecutive failures, logs critical alert and stops."""
|
|
import services.aggregation.main as main_mod
|
|
# Reset the global counter
|
|
main_mod._propagation_consecutive_failures = 0
|
|
|
|
cfg = CompetitiveConfig(propagation_failure_threshold=3)
|
|
mock_pool.fetch = AsyncMock(return_value=[
|
|
{"document_id": f"doc-{i}", "catalyst_type": "earnings", "impact_score": 0.8}
|
|
for i in range(5)
|
|
])
|
|
with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
|
|
mock_prop.side_effect = RuntimeError("DB connection lost")
|
|
result = await _trigger_signal_propagation(mock_pool, "AAPL", cfg)
|
|
# Should stop after 3 failures (threshold)
|
|
assert mock_prop.call_count == 3
|
|
assert main_mod._propagation_consecutive_failures == 3
|
|
assert result == 0
|
|
|
|
# Reset for other tests
|
|
main_mod._propagation_consecutive_failures = 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_success_resets_failure_counter(self, mock_pool, competitive_config):
|
|
"""A successful propagation resets the consecutive failure counter."""
|
|
import services.aggregation.main as main_mod
|
|
main_mod._propagation_consecutive_failures = 4 # Near threshold
|
|
|
|
mock_pool.fetch = AsyncMock(return_value=[
|
|
{"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.8},
|
|
])
|
|
with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
|
|
mock_prop.return_value = []
|
|
await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)
|
|
assert main_mod._propagation_consecutive_failures == 0
|
|
|
|
# Reset for other tests
|
|
main_mod._propagation_consecutive_failures = 0
|