Files
stonks-oracle/tests/test_aggregation_main.py
T
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

125 lines
5.5 KiB
Python

"""Tests for the aggregation main loop signal propagation wiring.
Validates:
- Signal propagation is triggered after aggregation when competitive layer is enabled
- Consecutive failure tracking and operator alerting (Requirement 9.4)
- Propagation is skipped when competitive layer is disabled
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from services.aggregation.main import _trigger_signal_propagation
from services.shared.config import CompetitiveConfig
@pytest.fixture
def competitive_config():
    """Provide a CompetitiveConfig with propagation_failure_threshold set to 5."""
    failure_threshold = 5
    return CompetitiveConfig(propagation_failure_threshold=failure_threshold)
@pytest.fixture
def mock_pool():
    """Provide an AsyncMock used as the pool argument of _trigger_signal_propagation."""
    return AsyncMock()
class TestTriggerSignalPropagation:
    """Tests for _trigger_signal_propagation.

    Each test feeds canned rows through ``mock_pool.fetch`` and, where the
    call is expected to reach it, patches
    ``services.aggregation.main.propagate_signals`` with an ``AsyncMock``.
    """

    @pytest.fixture(autouse=True)
    def _isolate_failure_counter(self, monkeypatch):
        """Zero the module-level consecutive-failure counter around every test.

        ``monkeypatch`` restores the previous value at teardown even when a
        test fails part-way through, so a failing assertion can no longer leak
        a non-zero counter into later tests.
        """
        import services.aggregation.main as main_mod

        # raising=False behaves like a plain assignment: it does not require
        # the attribute to exist on the module beforehand.
        monkeypatch.setattr(
            main_mod, "_propagation_consecutive_failures", 0, raising=False
        )

    @pytest.mark.asyncio
    async def test_no_records_returns_zero(self, mock_pool, competitive_config):
        """When no intelligence records exist, returns 0 signals."""
        mock_pool.fetch = AsyncMock(return_value=[])

        result = await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)

        assert result == 0

    @pytest.mark.asyncio
    async def test_skips_zero_impact_records(self, mock_pool, competitive_config):
        """Records with impact_score <= 0 are skipped."""
        mock_pool.fetch = AsyncMock(return_value=[
            {"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.0},
        ])

        with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
            result = await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)

        assert result == 0
        mock_prop.assert_not_called()

    @pytest.mark.asyncio
    async def test_calls_propagate_signals_for_each_record(self, mock_pool, competitive_config):
        """propagate_signals is called for each valid intelligence record."""
        mock_pool.fetch = AsyncMock(return_value=[
            {"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.8},
            {"document_id": "doc-2", "catalyst_type": "m_and_a", "impact_score": 0.6},
        ])

        with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
            mock_prop.return_value = []
            await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)

        assert mock_prop.call_count == 2
        # Verify correct args for first call
        first_call = mock_prop.call_args_list[0]
        assert first_call.kwargs["ticker"] == "AAPL"
        assert first_call.kwargs["catalyst_type"] == "earnings"
        assert first_call.kwargs["impact_score"] == 0.8
        assert first_call.kwargs["document_id"] == "doc-1"

    @pytest.mark.asyncio
    async def test_returns_total_signal_count(self, mock_pool, competitive_config):
        """Returns the total number of competitive signals produced."""
        mock_pool.fetch = AsyncMock(return_value=[
            {"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.8},
            {"document_id": "doc-2", "catalyst_type": "m_and_a", "impact_score": 0.6},
        ])
        mock_record = MagicMock()

        with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
            mock_prop.side_effect = [
                [mock_record, mock_record],  # 2 signals from first doc
                [mock_record],  # 1 signal from second doc
            ]
            result = await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)

        assert result == 3

    @pytest.mark.asyncio
    async def test_consecutive_failure_tracking(self, mock_pool, competitive_config):
        """After threshold consecutive failures, logs critical alert and stops."""
        import services.aggregation.main as main_mod

        # The counter starts at 0 here: the autouse fixture resets it.
        cfg = CompetitiveConfig(propagation_failure_threshold=3)
        mock_pool.fetch = AsyncMock(return_value=[
            {"document_id": f"doc-{i}", "catalyst_type": "earnings", "impact_score": 0.8}
            for i in range(5)
        ])

        with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
            mock_prop.side_effect = RuntimeError("DB connection lost")
            result = await _trigger_signal_propagation(mock_pool, "AAPL", cfg)

        # Should stop after 3 failures (threshold)
        assert mock_prop.call_count == 3
        assert main_mod._propagation_consecutive_failures == 3
        assert result == 0

    @pytest.mark.asyncio
    async def test_success_resets_failure_counter(self, mock_pool, competitive_config):
        """A successful propagation resets the consecutive failure counter."""
        import services.aggregation.main as main_mod

        # Near threshold; the autouse fixture restores the counter afterwards.
        main_mod._propagation_consecutive_failures = 4
        mock_pool.fetch = AsyncMock(return_value=[
            {"document_id": "doc-1", "catalyst_type": "earnings", "impact_score": 0.8},
        ])

        with patch("services.aggregation.main.propagate_signals", new_callable=AsyncMock) as mock_prop:
            mock_prop.return_value = []
            await _trigger_signal_propagation(mock_pool, "AAPL", competitive_config)

        assert main_mod._propagation_consecutive_failures == 0