Files
stonks-oracle/tests/test_alerting.py
T

307 lines
11 KiB
Python

"""Tests for operational alerting rules.
Requirements: 12.3
"""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from services.shared.alerting import (
Alert,
AlertState,
check_analytical_lag,
check_broker_issues,
check_schema_failure_spike,
check_source_failures,
evaluate_alerts,
)
from services.shared.config import AlertingConfig
@pytest.fixture
def config():
    """Provide the ``AlertingConfig`` shared by the alerting rule tests.

    The values are small and explicit so each test can pick inputs that sit
    clearly above or below a threshold.
    """
    settings = {
        "source_failure_threshold": 3,
        "source_failure_window_hours": 6,
        "schema_failure_rate_threshold": 0.3,
        "schema_failure_window_hours": 1,
        "lake_lag_threshold_minutes": 60,
        "broker_error_threshold": 3,
        "broker_error_window_hours": 1,
        "check_interval_seconds": 120,
    }
    return AlertingConfig(**settings)
@pytest.fixture
def state():
    """Provide a newly constructed ``AlertState`` for each test that requests it."""
    fresh_state = AlertState()
    return fresh_state
# ---------------------------------------------------------------------------
# AlertState unit tests
# ---------------------------------------------------------------------------
class TestAlertState:
    """Unit tests for the ``fire`` / ``resolve`` / ``is_firing`` methods of ``AlertState``."""

    @staticmethod
    def _alert(rule, key, severity="warning", summary="test"):
        """Build an ``Alert`` whose ``details`` contain only the given ``key``."""
        return Alert(rule=rule, severity=severity, summary=summary,
                     details={"key": key})

    def test_fire_new_alert_returns_true(self, state) -> None:
        """Firing an alert on an empty state returns ``True``."""
        new_alert = self._alert("source_failures", "src1")
        outcome = state.fire(new_alert)
        assert outcome is True

    def test_fire_existing_alert_returns_false(self, state) -> None:
        """Firing the same alert a second time returns ``False``."""
        repeated = self._alert("source_failures", "src1")
        state.fire(repeated)
        outcome = state.fire(repeated)
        assert outcome is False

    def test_resolve_active_returns_true(self, state) -> None:
        """Resolving an alert that was fired returns ``True``."""
        active = self._alert("source_failures", "src1")
        state.fire(active)
        outcome = state.resolve("source_failures", "src1")
        assert outcome is True

    def test_resolve_inactive_returns_false(self, state) -> None:
        """Resolving on an empty state returns ``False``."""
        outcome = state.resolve("source_failures", "src1")
        assert outcome is False

    def test_is_firing(self, state) -> None:
        """``is_firing`` is ``False`` before ``fire`` and ``True`` afterwards."""
        broker_alert = self._alert("broker_issues", "global",
                                   severity="critical")
        assert state.is_firing("broker_issues", "global") is False
        state.fire(broker_alert)
        assert state.is_firing("broker_issues", "global") is True

    def test_multiple_alerts_same_rule_different_keys(self, state) -> None:
        """Two alerts of one rule with different keys fire and resolve independently."""
        first = self._alert("source_failures", "src1", summary="s1")
        second = self._alert("source_failures", "src2", summary="s2")

        assert state.fire(first) is True
        assert state.fire(second) is True
        assert state.is_firing("source_failures", "src1") is True
        assert state.is_firing("source_failures", "src2") is True

        # Resolving one key must leave the other key firing.
        state.resolve("source_failures", "src1")
        assert state.is_firing("source_failures", "src1") is False
        assert state.is_firing("source_failures", "src2") is True
# ---------------------------------------------------------------------------
# check_source_failures
# ---------------------------------------------------------------------------
class TestCheckSourceFailures:
    """Tests for ``check_source_failures`` against a mocked pool ``fetch`` result."""

    @pytest.mark.asyncio
    async def test_returns_alerts_for_failing_sources(self, config) -> None:
        """One failing-source row yields one warning alert describing that source."""
        failing_row = {
            "source_id": "uuid-1",
            "consecutive_failures": 3,
            "source_type": "news_api",
            "source_name": "reuters",
            "ticker": "AAPL",
        }
        pool = AsyncMock()
        pool.fetch.return_value = [failing_row]

        alerts = await check_source_failures(pool, config)

        assert len(alerts) == 1
        alert = alerts[0]
        assert alert.rule == "source_failures"
        assert alert.severity == "warning"
        assert "AAPL" in alert.summary
        assert alert.details["source_id"] == "uuid-1"

    @pytest.mark.asyncio
    async def test_returns_empty_when_no_failures(self, config) -> None:
        """An empty ``fetch`` result yields no alerts."""
        pool = AsyncMock()
        pool.fetch.return_value = []

        alerts = await check_source_failures(pool, config)

        assert alerts == []
# ---------------------------------------------------------------------------
# check_schema_failure_spike
# ---------------------------------------------------------------------------
class TestCheckSchemaFailureSpike:
    """Tests for ``check_schema_failure_spike`` against a mocked ``fetchrow`` result.

    The ``config`` fixture sets ``schema_failure_rate_threshold=0.3``; each
    test feeds ``total`` / ``failed`` counts on one side of that threshold.
    """

    @staticmethod
    def _pool_with_counts(total, failed):
        """Return an ``AsyncMock`` pool whose ``fetchrow`` yields the given counts."""
        pool = AsyncMock()
        pool.fetchrow.return_value = {"total": total, "failed": failed}
        return pool

    @pytest.mark.asyncio
    async def test_fires_when_rate_exceeds_threshold(self, config):
        """40 failures out of 100 produce one alert that reports the failure rate."""
        pool = self._pool_with_counts(total=100, failed=40)
        alerts = await check_schema_failure_spike(pool, config)
        assert len(alerts) == 1
        assert alerts[0].rule == "schema_failure_spike"
        # The rate is a computed float: compare with a tolerance instead of
        # exact equality so the test does not depend on how it is derived.
        assert alerts[0].details["failure_rate"] == pytest.approx(0.4)

    @pytest.mark.asyncio
    async def test_critical_severity_above_50_percent(self, config):
        """60 failures out of 100 produce one alert with ``critical`` severity."""
        pool = self._pool_with_counts(total=100, failed=60)
        alerts = await check_schema_failure_spike(pool, config)
        assert len(alerts) == 1
        assert alerts[0].severity == "critical"

    @pytest.mark.asyncio
    async def test_no_alert_below_threshold(self, config):
        """10 failures out of 100 produce no alert."""
        pool = self._pool_with_counts(total=100, failed=10)
        alerts = await check_schema_failure_spike(pool, config)
        assert alerts == []

    @pytest.mark.asyncio
    async def test_no_alert_when_no_extractions(self, config):
        """Zero total extractions produce no alert."""
        pool = self._pool_with_counts(total=0, failed=0)
        alerts = await check_schema_failure_spike(pool, config)
        assert alerts == []
# ---------------------------------------------------------------------------
# check_analytical_lag
# ---------------------------------------------------------------------------
class TestCheckAnalyticalLag:
    """Tests for ``check_analytical_lag`` against a mocked pool ``fetch`` result."""

    @pytest.mark.asyncio
    async def test_fires_for_stale_tables(self, config) -> None:
        """A row for ``market_bars`` yields one ``analytical_lag`` alert naming it."""
        # NOTE(review): fixed timestamp; the assertions below do not depend on
        # the computed lag, but confirm the rule tolerates a ``last_publish``
        # that may be in the future relative to the test clock.
        last_publish = datetime(2026, 4, 10, 10, 0, 0, tzinfo=timezone.utc)
        stale_rows = [
            {"table_name": "market_bars", "last_publish": last_publish},
        ]
        pool = AsyncMock()
        pool.fetch.return_value = stale_rows

        alerts = await check_analytical_lag(pool, config)

        assert len(alerts) == 1
        lag_alert = alerts[0]
        assert lag_alert.rule == "analytical_lag"
        assert "market_bars" in lag_alert.summary

    @pytest.mark.asyncio
    async def test_no_alert_when_recent(self, config) -> None:
        """An empty ``fetch`` result yields no alerts."""
        pool = AsyncMock()
        pool.fetch.return_value = []

        alerts = await check_analytical_lag(pool, config)

        assert alerts == []
# ---------------------------------------------------------------------------
# check_broker_issues
# ---------------------------------------------------------------------------
class TestCheckBrokerIssues:
    """Tests for ``check_broker_issues`` against a mocked pool ``fetch`` result.

    The ``config`` fixture sets ``broker_error_threshold=3``.
    """

    @staticmethod
    def _pool_returning(rows):
        """Return an ``AsyncMock`` pool whose ``fetch`` yields ``rows``."""
        pool = AsyncMock()
        pool.fetch.return_value = rows
        return pool

    @pytest.mark.asyncio
    async def test_fires_on_consecutive_errors(self, config) -> None:
        """An ``error_count`` of 5 yields one critical ``broker_issues`` alert."""
        pool = self._pool_returning([{"error_count": 5}])

        alerts = await check_broker_issues(pool, config)

        assert len(alerts) == 1
        broker_alert = alerts[0]
        assert broker_alert.rule == "broker_issues"
        assert broker_alert.severity == "critical"

    @pytest.mark.asyncio
    async def test_no_alert_below_threshold(self, config) -> None:
        """An ``error_count`` of 1 yields no alert."""
        pool = self._pool_returning([{"error_count": 1}])

        alerts = await check_broker_issues(pool, config)

        assert alerts == []

    @pytest.mark.asyncio
    async def test_no_alert_when_no_events(self, config) -> None:
        """An empty ``fetch`` result yields no alert."""
        pool = self._pool_returning([])

        alerts = await check_broker_issues(pool, config)

        assert alerts == []
# ---------------------------------------------------------------------------
# evaluate_alerts integration
# ---------------------------------------------------------------------------
class TestEvaluateAlerts:
    """Integration-style tests for ``evaluate_alerts`` with the four rule checks patched."""

    # Patch targets: the rule-check functions as named in the alerting module.
    _SOURCE_RULE = "services.shared.alerting.check_source_failures"
    _SCHEMA_RULE = "services.shared.alerting.check_schema_failure_spike"
    _LAG_RULE = "services.shared.alerting.check_analytical_lag"
    _BROKER_RULE = "services.shared.alerting.check_broker_issues"

    @pytest.mark.asyncio
    async def test_newly_fired_alerts_returned(self, config, state) -> None:
        """An alert reported by one rule is returned and recorded as firing."""
        pool = AsyncMock()
        source_alert = Alert(rule="source_failures", severity="warning",
                             summary="src fail", details={"key": "s1"})
        with patch(self._SOURCE_RULE, return_value=[source_alert]), \
                patch(self._SCHEMA_RULE, return_value=[]), \
                patch(self._LAG_RULE, return_value=[]), \
                patch(self._BROKER_RULE, return_value=[]):
            fired = await evaluate_alerts(pool, config, state)
            assert len(fired) == 1
            assert fired[0].rule == "source_failures"
            assert state.is_firing("source_failures", "s1")

    @pytest.mark.asyncio
    async def test_repeated_alert_not_returned_again(self, config, state) -> None:
        """An alert that keeps being reported is only returned on the first evaluation."""
        pool = AsyncMock()
        broker_alert = Alert(rule="broker_issues", severity="critical",
                             summary="broker down", details={"key": "global"})
        with patch(self._SOURCE_RULE, return_value=[]), \
                patch(self._SCHEMA_RULE, return_value=[]), \
                patch(self._LAG_RULE, return_value=[]), \
                patch(self._BROKER_RULE, return_value=[broker_alert]):
            first_pass = await evaluate_alerts(pool, config, state)
            assert len(first_pass) == 1
            second_pass = await evaluate_alerts(pool, config, state)
            assert len(second_pass) == 0

    @pytest.mark.asyncio
    async def test_resolved_alert_clears_state(self, config, state) -> None:
        """Once a rule stops reporting an alert, the state no longer shows it firing."""
        pool = AsyncMock()
        broker_alert = Alert(rule="broker_issues", severity="critical",
                             summary="broker down", details={"key": "global"})
        with patch(self._SOURCE_RULE, return_value=[]), \
                patch(self._SCHEMA_RULE, return_value=[]), \
                patch(self._LAG_RULE, return_value=[]), \
                patch(self._BROKER_RULE) as broker_rule:
            # First evaluation: the broker rule reports the alert.
            broker_rule.return_value = [broker_alert]
            await evaluate_alerts(pool, config, state)
            assert state.is_firing("broker_issues", "global")

            # Second evaluation: the broker rule reports nothing.
            broker_rule.return_value = []
            await evaluate_alerts(pool, config, state)
            assert not state.is_firing("broker_issues", "global")

    @pytest.mark.asyncio
    async def test_rule_exception_does_not_crash(self, config, state) -> None:
        """A rule that raises does not propagate out of ``evaluate_alerts``."""
        pool = AsyncMock()
        with patch(self._SOURCE_RULE, side_effect=Exception("db down")), \
                patch(self._SCHEMA_RULE, return_value=[]), \
                patch(self._LAG_RULE, return_value=[]), \
                patch(self._BROKER_RULE, return_value=[]):
            # Must complete without raising; no alerts are expected.
            fired = await evaluate_alerts(pool, config, state)
            assert fired == []