"""Tests for operational alerting rules. Requirements: 12.3 """ from __future__ import annotations from datetime import datetime, timezone from unittest.mock import AsyncMock, patch import pytest from services.shared.alerting import ( Alert, AlertState, check_analytical_lag, check_broker_issues, check_schema_failure_spike, check_source_failures, evaluate_alerts, ) from services.shared.config import AlertingConfig @pytest.fixture def config(): return AlertingConfig( source_failure_threshold=3, source_failure_window_hours=6, schema_failure_rate_threshold=0.3, schema_failure_window_hours=1, lake_lag_threshold_minutes=60, broker_error_threshold=3, broker_error_window_hours=1, check_interval_seconds=120, ) @pytest.fixture def state(): return AlertState() # --------------------------------------------------------------------------- # AlertState unit tests # --------------------------------------------------------------------------- class TestAlertState: def test_fire_new_alert_returns_true(self, state): alert = Alert(rule="source_failures", severity="warning", summary="test", details={"key": "src1"}) assert state.fire(alert) is True def test_fire_existing_alert_returns_false(self, state): alert = Alert(rule="source_failures", severity="warning", summary="test", details={"key": "src1"}) state.fire(alert) assert state.fire(alert) is False def test_resolve_active_returns_true(self, state): alert = Alert(rule="source_failures", severity="warning", summary="test", details={"key": "src1"}) state.fire(alert) assert state.resolve("source_failures", "src1") is True def test_resolve_inactive_returns_false(self, state): assert state.resolve("source_failures", "src1") is False def test_is_firing(self, state): alert = Alert(rule="broker_issues", severity="critical", summary="test", details={"key": "global"}) assert state.is_firing("broker_issues", "global") is False state.fire(alert) assert state.is_firing("broker_issues", "global") is True def test_multiple_alerts_same_rule_different_keys(self, state): a1 = Alert(rule="source_failures", severity="warning", summary="s1", details={"key": "src1"}) a2 = Alert(rule="source_failures", severity="warning", summary="s2", details={"key": "src2"}) assert state.fire(a1) is True assert state.fire(a2) is True assert state.is_firing("source_failures", "src1") is True assert state.is_firing("source_failures", "src2") is True state.resolve("source_failures", "src1") assert state.is_firing("source_failures", "src1") is False assert state.is_firing("source_failures", "src2") is True # --------------------------------------------------------------------------- # check_source_failures # --------------------------------------------------------------------------- class TestCheckSourceFailures: @pytest.mark.asyncio async def test_returns_alerts_for_failing_sources(self, config): mock_pool = AsyncMock() mock_pool.fetch.return_value = [ { "source_id": "uuid-1", "consecutive_failures": 3, "source_type": "news_api", "source_name": "reuters", "ticker": "AAPL", }, ] alerts = await check_source_failures(mock_pool, config) assert len(alerts) == 1 assert alerts[0].rule == "source_failures" assert alerts[0].severity == "warning" assert "AAPL" in alerts[0].summary assert alerts[0].details["source_id"] == "uuid-1" @pytest.mark.asyncio async def test_returns_empty_when_no_failures(self, config): mock_pool = AsyncMock() mock_pool.fetch.return_value = [] alerts = await check_source_failures(mock_pool, config) assert alerts == [] # --------------------------------------------------------------------------- # check_schema_failure_spike # --------------------------------------------------------------------------- class TestCheckSchemaFailureSpike: @pytest.mark.asyncio async def test_fires_when_rate_exceeds_threshold(self, config): mock_pool = AsyncMock() mock_pool.fetchrow.return_value = {"total": 100, "failed": 40} alerts = await check_schema_failure_spike(mock_pool, config) assert len(alerts) == 1 assert alerts[0].rule == "schema_failure_spike" assert alerts[0].details["failure_rate"] == 0.4 @pytest.mark.asyncio async def test_critical_severity_above_50_percent(self, config): mock_pool = AsyncMock() mock_pool.fetchrow.return_value = {"total": 100, "failed": 60} alerts = await check_schema_failure_spike(mock_pool, config) assert len(alerts) == 1 assert alerts[0].severity == "critical" @pytest.mark.asyncio async def test_no_alert_below_threshold(self, config): mock_pool = AsyncMock() mock_pool.fetchrow.return_value = {"total": 100, "failed": 10} alerts = await check_schema_failure_spike(mock_pool, config) assert alerts == [] @pytest.mark.asyncio async def test_no_alert_when_no_extractions(self, config): mock_pool = AsyncMock() mock_pool.fetchrow.return_value = {"total": 0, "failed": 0} alerts = await check_schema_failure_spike(mock_pool, config) assert alerts == [] # --------------------------------------------------------------------------- # check_analytical_lag # --------------------------------------------------------------------------- class TestCheckAnalyticalLag: @pytest.mark.asyncio async def test_fires_for_stale_tables(self, config): mock_pool = AsyncMock() stale_time = datetime(2026, 4, 10, 10, 0, 0, tzinfo=timezone.utc) mock_pool.fetch.return_value = [ {"table_name": "market_bars", "last_publish": stale_time}, ] alerts = await check_analytical_lag(mock_pool, config) assert len(alerts) == 1 assert alerts[0].rule == "analytical_lag" assert "market_bars" in alerts[0].summary @pytest.mark.asyncio async def test_no_alert_when_recent(self, config): mock_pool = AsyncMock() mock_pool.fetch.return_value = [] alerts = await check_analytical_lag(mock_pool, config) assert alerts == [] # --------------------------------------------------------------------------- # check_broker_issues # --------------------------------------------------------------------------- class TestCheckBrokerIssues: @pytest.mark.asyncio async def test_fires_on_consecutive_errors(self, config): mock_pool = AsyncMock() mock_pool.fetch.return_value = [{"error_count": 5}] alerts = await check_broker_issues(mock_pool, config) assert len(alerts) == 1 assert alerts[0].rule == "broker_issues" assert alerts[0].severity == "critical" @pytest.mark.asyncio async def test_no_alert_below_threshold(self, config): mock_pool = AsyncMock() mock_pool.fetch.return_value = [{"error_count": 1}] alerts = await check_broker_issues(mock_pool, config) assert alerts == [] @pytest.mark.asyncio async def test_no_alert_when_no_events(self, config): mock_pool = AsyncMock() mock_pool.fetch.return_value = [] alerts = await check_broker_issues(mock_pool, config) assert alerts == [] # --------------------------------------------------------------------------- # evaluate_alerts integration # --------------------------------------------------------------------------- class TestEvaluateAlerts: @pytest.mark.asyncio async def test_newly_fired_alerts_returned(self, config, state): mock_pool = AsyncMock() with patch("services.shared.alerting.check_source_failures") as mock_src, \ patch("services.shared.alerting.check_schema_failure_spike") as mock_schema, \ patch("services.shared.alerting.check_analytical_lag") as mock_lag, \ patch("services.shared.alerting.check_broker_issues") as mock_broker: mock_src.return_value = [ Alert(rule="source_failures", severity="warning", summary="src fail", details={"key": "s1"}), ] mock_schema.return_value = [] mock_lag.return_value = [] mock_broker.return_value = [] fired = await evaluate_alerts(mock_pool, config, state) assert len(fired) == 1 assert fired[0].rule == "source_failures" assert state.is_firing("source_failures", "s1") @pytest.mark.asyncio async def test_repeated_alert_not_returned_again(self, config, state): mock_pool = AsyncMock() alert = Alert(rule="broker_issues", severity="critical", summary="broker down", details={"key": "global"}) with patch("services.shared.alerting.check_source_failures", return_value=[]), \ patch("services.shared.alerting.check_schema_failure_spike", return_value=[]), \ patch("services.shared.alerting.check_analytical_lag", return_value=[]), \ patch("services.shared.alerting.check_broker_issues", return_value=[alert]): fired1 = await evaluate_alerts(mock_pool, config, state) assert len(fired1) == 1 fired2 = await evaluate_alerts(mock_pool, config, state) assert len(fired2) == 0 @pytest.mark.asyncio async def test_resolved_alert_clears_state(self, config, state): mock_pool = AsyncMock() alert = Alert(rule="broker_issues", severity="critical", summary="broker down", details={"key": "global"}) with patch("services.shared.alerting.check_source_failures", return_value=[]), \ patch("services.shared.alerting.check_schema_failure_spike", return_value=[]), \ patch("services.shared.alerting.check_analytical_lag", return_value=[]), \ patch("services.shared.alerting.check_broker_issues") as mock_broker: # Fire mock_broker.return_value = [alert] await evaluate_alerts(mock_pool, config, state) assert state.is_firing("broker_issues", "global") # Resolve mock_broker.return_value = [] await evaluate_alerts(mock_pool, config, state) assert not state.is_firing("broker_issues", "global") @pytest.mark.asyncio async def test_rule_exception_does_not_crash(self, config, state): mock_pool = AsyncMock() with patch("services.shared.alerting.check_source_failures", side_effect=Exception("db down")), \ patch("services.shared.alerting.check_schema_failure_spike", return_value=[]), \ patch("services.shared.alerting.check_analytical_lag", return_value=[]), \ patch("services.shared.alerting.check_broker_issues", return_value=[]): # Should not raise fired = await evaluate_alerts(mock_pool, config, state) assert fired == []