Files
stonks-oracle/services/shared/alerting.py
T

343 lines
11 KiB
Python

"""Operational alerting for Stonks Oracle pipeline health.
Evaluates alert rules against PostgreSQL operational state and emits
structured log events and Prometheus metrics when thresholds are breached.
Alert rules:
- source_failures: sustained source retrieval failures per source
- schema_failure_spike: extraction validation failure rate exceeds threshold
- analytical_lag: lake publication has not completed within threshold
- broker_issues: consecutive broker submission errors
Requirements: 12.3
Design: Section 12 (Observability and Operations)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
import asyncpg
from services.shared.config import AlertingConfig
from services.shared.metrics import (
ALERT_ACTIVE,
ALERT_CHECK_DURATION,
ALERTS_FIRED,
ALERTS_RESOLVED,
)
logger = logging.getLogger("alerting")
def _utc_now() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class Alert:
    """One alert produced by a rule check.

    ``details`` holds rule-specific context. Its optional ``"key"`` entry is
    combined with ``rule`` to identify the alert when tracking whether it is
    currently active.
    """

    # Name of the rule that produced the alert (e.g. "source_failures").
    rule: str
    severity: str  # "warning" | "critical"
    # Human-readable one-line description of the condition.
    summary: str
    # Rule-specific context; a fresh dict is created for every instance.
    details: dict[str, Any] = field(default_factory=dict)
    # Creation time of this instance, timezone-aware UTC.
    fired_at: datetime = field(default_factory=_utc_now)
@dataclass
class AlertState:
    """Registry of currently firing alerts, used to detect transitions.

    Alerts are stored in ``active`` under the composite key
    ``"<rule>:<key>"``, where ``<key>`` is the alert's ``details["key"]``
    (empty string when absent).
    """

    # Composite key -> most recently recorded alert for that key.
    active: dict[str, Alert] = field(default_factory=dict)

    @staticmethod
    def _compose_key(rule: str, key: Any = "") -> str:
        """Build the composite ``"<rule>:<key>"`` identifier."""
        return f"{rule}:{key}"

    def fire(self, alert: Alert) -> bool:
        """Store *alert* as active; return True only if it was not active before.

        An alert that is already active is overwritten with the new instance.
        """
        full_key = self._compose_key(alert.rule, alert.details.get("key", ""))
        already_active = full_key in self.active
        self.active[full_key] = alert
        return not already_active

    def resolve(self, rule: str, key: str = "") -> bool:
        """Drop the alert for *rule*/*key*; return True if one was active."""
        try:
            del self.active[self._compose_key(rule, key)]
        except KeyError:
            return False
        return True

    def is_firing(self, rule: str, key: str = "") -> bool:
        """Return True if an alert for *rule*/*key* is currently active."""
        return self._compose_key(rule, key) in self.active
async def check_source_failures(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Report sources whose most recent ingestion runs have all failed.

    Within the last ``config.source_failure_window_hours`` hours, the query
    ranks each source's ``ingestion_runs`` newest-first, keeps the latest
    ``config.source_failure_threshold`` of them, and selects the sources that
    have at least that many runs, every one with status ``'failed'``. The
    result is inner-joined to ``sources`` and ``companies``, so a source
    without a matching row in either table is not reported.

    Returns:
        One "warning" alert per selected source, keyed by the source id.
    """
    query = """WITH recent_runs AS (
SELECT source_id, status,
ROW_NUMBER() OVER (PARTITION BY source_id ORDER BY started_at DESC) AS rn
FROM ingestion_runs
WHERE started_at >= NOW() - INTERVAL '1 hour' * $1
),
failure_streaks AS (
SELECT source_id,
COUNT(*) FILTER (WHERE status = 'failed') AS consecutive_failures,
COUNT(*) AS total_runs
FROM recent_runs
WHERE rn <= $2
GROUP BY source_id
HAVING COUNT(*) FILTER (WHERE status = 'failed') = COUNT(*)
AND COUNT(*) >= $2
)
SELECT fs.source_id, fs.consecutive_failures,
s.source_type, s.source_name, c.ticker
FROM failure_streaks fs
JOIN sources s ON s.id = fs.source_id
JOIN companies c ON c.id = s.company_id"""

    failing_sources = await pool.fetch(
        query,
        config.source_failure_window_hours,
        config.source_failure_threshold,
    )

    def _to_alert(row) -> Alert:
        """Turn one result row into a source_failures alert."""
        source_id = str(row["source_id"])
        return Alert(
            rule="source_failures",
            severity="warning",
            summary=(
                f"Source {row['source_name']} ({row['source_type']}) for "
                f"{row['ticker']} has {row['consecutive_failures']} consecutive failures"
            ),
            details={
                "key": source_id,
                "source_id": source_id,
                "source_type": row["source_type"],
                "source_name": row["source_name"],
                "ticker": row["ticker"],
                "consecutive_failures": row["consecutive_failures"],
            },
        )

    return [_to_alert(row) for row in failing_sources]
async def check_schema_failure_spike(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Alert when the share of failed extractions reaches the configured rate.

    Counts all rows, and the rows whose ``success`` is false, in
    ``model_performance_metrics`` recorded during the last
    ``config.schema_failure_window_hours`` hours, then compares
    ``failed / total`` with ``config.schema_failure_rate_threshold``.

    Returns:
        A one-element list with a global alert ("critical" once the rate is
        at least 50%, otherwise "warning"), or an empty list when the window
        holds no rows or the rate is below the threshold.
    """
    stats = await pool.fetchrow(
        """SELECT
COUNT(*) AS total,
COUNT(*) FILTER (WHERE NOT success) AS failed
FROM model_performance_metrics
WHERE recorded_at >= NOW() - INTERVAL '1 hour' * $1""",
        config.schema_failure_window_hours,
    )

    # No measurements in the window: nothing to compute a rate from.
    if not stats or stats["total"] == 0:
        return []

    total, failed = stats["total"], stats["failed"]
    failure_rate = failed / total
    if failure_rate < config.schema_failure_rate_threshold:
        return []

    if failure_rate >= 0.5:
        severity = "critical"
    else:
        severity = "warning"

    spike = Alert(
        rule="schema_failure_spike",
        severity=severity,
        summary=(
            f"Extraction schema failure rate is {failure_rate:.1%} "
            f"({failed}/{total}) in the last {config.schema_failure_window_hours}h"
        ),
        details={
            "key": "global",
            "total_extractions": total,
            "failed_extractions": failed,
            "failure_rate": round(failure_rate, 4),
            "threshold": config.schema_failure_rate_threshold,
            "window_hours": config.schema_failure_window_hours,
        },
    )
    return [spike]
async def check_analytical_lag(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Report lake tables whose latest successful publish is too old.

    For every ``table_name`` found in successful ``lake_publish`` rows of
    ``audit_events``, the query takes the newest ``created_at`` and keeps the
    tables where it is older than ``config.lake_lag_threshold_minutes``
    minutes. A table with no successful publish event at all produces no row
    and therefore no alert.

    Returns:
        One "warning" alert per stale table, keyed by the table name.
    """
    stale_tables = await pool.fetch(
        """SELECT
details->>'table_name' AS table_name,
MAX(created_at) AS last_publish
FROM audit_events
WHERE event_type = 'lake_publish'
AND details->>'status' = 'success'
AND details->>'table_name' IS NOT NULL
GROUP BY details->>'table_name'
HAVING MAX(created_at) < NOW() - INTERVAL '1 minute' * $1""",
        config.lake_lag_threshold_minutes,
    )
    now = datetime.now(timezone.utc)

    def _to_alert(record) -> Alert:
        """Turn one stale-table row into an analytical_lag alert."""
        table_name = record["table_name"]
        last_publish = record["last_publish"]
        # A naive timestamp is interpreted as UTC so it can be subtracted
        # from the timezone-aware ``now``.
        if last_publish.tzinfo is None:
            last_publish = last_publish.replace(tzinfo=timezone.utc)
        lag_minutes = (now - last_publish).total_seconds() / 60
        return Alert(
            rule="analytical_lag",
            severity="warning",
            summary=(
                f"Lake table '{table_name}' last published {lag_minutes:.0f}m ago "
                f"(threshold: {config.lake_lag_threshold_minutes}m)"
            ),
            details={
                "key": table_name,
                "table_name": table_name,
                "last_publish": last_publish.isoformat(),
                "lag_minutes": round(lag_minutes, 1),
                "threshold_minutes": config.lake_lag_threshold_minutes,
            },
        )

    return [_to_alert(record) for record in stale_tables]
async def check_broker_issues(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Alert when enough broker-level error events occurred recently.

    The query looks only at ``order_events`` of type ``broker_error``,
    ``broker_timeout`` or ``connection_failed`` created during the last
    ``config.broker_error_window_hours`` hours, ranks them newest-first and
    counts those ranked within ``config.broker_error_threshold``. The count is
    therefore capped at the threshold, and events of other types occurring
    between the errors are not considered.

    Returns:
        A one-element list with a global "critical" alert when the count
        reaches the threshold, otherwise an empty list.
    """
    result = await pool.fetch(
        """WITH recent_events AS (
SELECT order_id, event_type, created_at,
ROW_NUMBER() OVER (ORDER BY created_at DESC) AS rn
FROM order_events
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
AND event_type IN ('broker_error', 'broker_timeout', 'connection_failed')
)
SELECT COUNT(*) AS error_count
FROM recent_events
WHERE rn <= $2""",
        config.broker_error_window_hours,
        config.broker_error_threshold,
    )
    if not result:
        return []

    error_count = result[0]["error_count"]
    if error_count < config.broker_error_threshold:
        return []

    broker_alert = Alert(
        rule="broker_issues",
        severity="critical",
        summary=(
            f"{error_count} broker errors in the last "
            f"{config.broker_error_window_hours}h"
        ),
        details={
            "key": "global",
            "error_count": error_count,
            "threshold": config.broker_error_threshold,
            "window_hours": config.broker_error_window_hours,
        },
    )
    return [broker_alert]
async def evaluate_alerts(
    pool: asyncpg.Pool,
    config: AlertingConfig,
    state: AlertState,
) -> list[Alert]:
    """Run every alert rule once and return the alerts that newly fired.

    Each rule check runs inside its own ``try`` so one failing query does not
    stop the remaining rules from being evaluated. ``state`` is updated in
    place: alerts returned by the checks are recorded as active, and active
    alerts that a successfully evaluated rule no longer reports are resolved.
    Each transition is logged and reflected in the Prometheus metrics.

    A rule whose check raised is treated as "unknown" for this cycle: its
    active alerts are left untouched instead of being resolved. Without this,
    a transient database error would log a false resolution, reset the gauge,
    and re-fire the same alerts on the next successful cycle.

    Args:
        pool: Connection pool passed to every rule check.
        config: Thresholds and lookback windows for the rules.
        state: Active-alert registry carried across evaluation cycles.

    Returns:
        The alerts that were not active before this call, in the order the
        rules produced them.
    """
    # (rule name as used in Alert.rule, check coroutine, error log message)
    checks = (
        ("source_failures", check_source_failures,
         "Error checking source failures"),
        ("schema_failure_spike", check_schema_failure_spike,
         "Error checking schema failure spike"),
        ("analytical_lag", check_analytical_lag,
         "Error checking analytical lag"),
        ("broker_issues", check_broker_issues,
         "Error checking broker issues"),
    )

    all_alerts: list[Alert] = []
    failed_rules: set[str] = set()

    with ALERT_CHECK_DURATION.time():
        # Collect alerts from all rules; remember which rules could not be
        # evaluated so their existing alerts are not mistaken for resolved.
        for rule_name, check, error_message in checks:
            try:
                all_alerts.extend(await check(pool, config))
            except Exception:
                logger.exception(error_message)
                failed_rules.add(rule_name)

    # Track which rule+key combos are currently firing
    current_keys: set[str] = set()
    newly_fired: list[Alert] = []
    for alert in all_alerts:
        current_keys.add(f"{alert.rule}:{alert.details.get('key', '')}")
        if not state.fire(alert):
            continue
        # New alert firing
        ALERTS_FIRED.labels(rule=alert.rule, severity=alert.severity).inc()
        ALERT_ACTIVE.labels(rule=alert.rule).set(1)
        newly_fired.append(alert)
        logger.warning(
            "ALERT FIRING: [%s] %s",
            alert.rule,
            alert.summary,
            extra={
                "alert_rule": alert.rule,
                "alert_severity": alert.severity,
                "alert_details": alert.details,
            },
        )

    # Check for resolved alerts. The key set is a snapshot, so resolving
    # (which deletes from state.active) is safe while iterating.
    for key in set(state.active) - current_keys:
        rule, _, detail_key = key.partition(":")
        if rule in failed_rules:
            # The rule was not evaluated this cycle; absence of its alerts
            # says nothing about recovery.
            continue
        if not state.resolve(rule, detail_key):
            continue
        ALERTS_RESOLVED.labels(rule=rule).inc()
        # Only set gauge to 0 if no more alerts for this rule
        prefix = f"{rule}:"
        if not any(active_key.startswith(prefix) for active_key in state.active):
            ALERT_ACTIVE.labels(rule=rule).set(0)
        logger.info(
            "ALERT RESOLVED: [%s] key=%s",
            rule,
            detail_key,
        )

    return newly_fired