343 lines
11 KiB
Python
343 lines
11 KiB
Python
"""Operational alerting for Stonks Oracle pipeline health.

Evaluates alert rules against PostgreSQL operational state and emits
structured log events and Prometheus metrics when thresholds are breached.

Alert rules:
- source_failures: sustained run of failed source retrievals, per source
- schema_failure_spike: extraction validation failure rate exceeds threshold
- analytical_lag: no successful lake publication within the lag threshold
- broker_issues: broker submission errors within the lookback window reach
  the error threshold

Requirements: 12.3
Design: Section 12 (Observability and Operations)
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
|
|
from services.shared.config import AlertingConfig
|
|
from services.shared.metrics import (
|
|
ALERT_ACTIVE,
|
|
ALERT_CHECK_DURATION,
|
|
ALERTS_FIRED,
|
|
ALERTS_RESOLVED,
|
|
)
|
|
|
|
# Module-wide logger; every record from this module is emitted under "alerting".
logger: logging.Logger = logging.getLogger("alerting")
|
|
|
|
|
|
def _utc_now() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class Alert:
    """One alert produced by a rule check.

    The ``check_*`` rule functions create instances and ``AlertState`` tracks
    them. When ``details`` holds a ``"key"`` entry, it distinguishes several
    alerts raised by the same rule.
    """

    # Name of the rule that produced this alert, e.g. "source_failures".
    rule: str
    # Either "warning" or "critical".
    severity: str
    # Human-readable one-line description of the breached condition.
    summary: str
    # Structured context; each instance gets its own fresh dict.
    details: dict[str, Any] = field(default_factory=dict)
    # Creation time of this instance, timezone-aware in UTC.
    fired_at: datetime = field(default_factory=_utc_now)
|
|
|
|
|
|
@dataclass
class AlertState:
    """Remember which alerts are currently firing so transitions can be detected.

    Alerts are stored under a composite ``"<rule>:<key>"`` string. ``<key>``
    comes from ``alert.details["key"]`` and defaults to the empty string.
    """

    # Composite "<rule>:<key>" -> most recently reported Alert for that slot.
    active: dict[str, Alert] = field(default_factory=dict)

    @staticmethod
    def _compose_key(rule: str, key: str) -> str:
        """Build the composite lookup key used by ``active``."""
        return f"{rule}:{key}"

    def fire(self, alert: Alert) -> bool:
        """Store *alert* as active; return True only if its slot was empty before."""
        slot = self._compose_key(alert.rule, alert.details.get("key", ""))
        already_active = slot in self.active
        # Always overwrite so the stored alert reflects the latest evaluation.
        self.active[slot] = alert
        return not already_active

    def resolve(self, rule: str, key: str = "") -> bool:
        """Remove the alert for (*rule*, *key*); return True if one was active."""
        slot = self._compose_key(rule, key)
        if slot not in self.active:
            return False
        del self.active[slot]
        return True

    def is_firing(self, rule: str, key: str = "") -> bool:
        """Return whether an alert for (*rule*, *key*) is currently stored."""
        return self._compose_key(rule, key) in self.active
|
|
|
|
|
|
async def check_source_failures(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Check for sources with sustained consecutive failures.

    For each source, ranks the ingestion_runs started within the last
    ``config.source_failure_window_hours`` hours from newest to oldest. It then
    measures the current failure streak: the number of most-recent runs with
    status ``'failed'`` before the first run with any other status (or all runs
    in the window if none is non-failed). A warning alert is produced for every
    source whose streak is at least ``config.source_failure_threshold``. A
    source with zero failures is never reported.

    Args:
        pool: asyncpg pool used to run the query.
        config: Alerting configuration supplying the window and threshold.

    Returns:
        One ``source_failures`` alert per failing source, keyed by source id.
        The alert reports the real streak length, not a value capped at the
        threshold.
    """
    rows = await pool.fetch(
        """WITH recent_runs AS (
            SELECT source_id, status,
                   ROW_NUMBER() OVER (PARTITION BY source_id ORDER BY started_at DESC) AS rn
            FROM ingestion_runs
            WHERE started_at >= NOW() - INTERVAL '1 hour' * $1
        ),
        failure_streaks AS (
            -- Streak = rank of the newest non-failed run minus one, or every
            -- run in the window when none of them is non-failed.
            SELECT source_id,
                   COALESCE(
                       MIN(rn) FILTER (WHERE status IS DISTINCT FROM 'failed') - 1,
                       COUNT(*)
                   ) AS consecutive_failures
            FROM recent_runs
            GROUP BY source_id
        )
        SELECT fs.source_id, fs.consecutive_failures,
               s.source_type, s.source_name, c.ticker
        FROM failure_streaks fs
        JOIN sources s ON s.id = fs.source_id
        JOIN companies c ON c.id = s.company_id
        WHERE fs.consecutive_failures >= $2
          AND fs.consecutive_failures > 0""",
        config.source_failure_window_hours,
        config.source_failure_threshold,
    )

    return [
        Alert(
            rule="source_failures",
            severity="warning",
            summary=(
                f"Source {row['source_name']} ({row['source_type']}) for "
                f"{row['ticker']} has {row['consecutive_failures']} consecutive failures"
            ),
            details={
                # "key" makes each source a distinct alert in AlertState.
                "key": str(row["source_id"]),
                "source_id": str(row["source_id"]),
                "source_type": row["source_type"],
                "source_name": row["source_name"],
                "ticker": row["ticker"],
                "consecutive_failures": row["consecutive_failures"],
            },
        )
        for row in rows
    ]
|
|
|
|
|
|
async def check_schema_failure_spike(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Alert when too many recent extractions failed schema validation.

    Aggregates model_performance_metrics rows recorded within the last
    ``config.schema_failure_window_hours`` hours into a total and a count of
    rows with ``NOT success``. The ratio is compared with
    ``config.schema_failure_rate_threshold``.

    Returns:
        An empty list when there is no row, the window is empty, or the rate is
        below the threshold. Otherwise, a single global
        ``schema_failure_spike`` alert, marked "critical" once at least half of
        the extractions failed and "warning" otherwise.
    """
    window_hours = config.schema_failure_window_hours
    counts = await pool.fetchrow(
        """SELECT
               COUNT(*) AS total,
               COUNT(*) FILTER (WHERE NOT success) AS failed
           FROM model_performance_metrics
           WHERE recorded_at >= NOW() - INTERVAL '1 hour' * $1""",
        window_hours,
    )

    # No data in the window means there is no rate to judge.
    if not counts or counts["total"] == 0:
        return []

    total = counts["total"]
    failed = counts["failed"]
    rate = failed / total
    threshold = config.schema_failure_rate_threshold

    if not rate >= threshold:
        return []

    if rate >= 0.5:
        severity = "critical"
    else:
        severity = "warning"

    headline = (
        f"Extraction schema failure rate is {rate:.1%} "
        f"({failed}/{total}) in the last {window_hours}h"
    )
    context = {
        "key": "global",
        "total_extractions": total,
        "failed_extractions": failed,
        "failure_rate": round(rate, 4),
        "threshold": threshold,
        "window_hours": window_hours,
    }
    return [
        Alert(
            rule="schema_failure_spike",
            severity=severity,
            summary=headline,
            details=context,
        )
    ]
|
|
|
|
|
|
async def check_analytical_lag(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Check if lake publication is lagging beyond threshold.

    Looks at audit_events for the most recent successful ``lake_publish``
    event per ``details->>'table_name'``. Emits one warning alert for every
    table whose latest success is older than
    ``config.lake_lag_threshold_minutes`` minutes.

    PostgreSQL computes the reported lag from the same ``NOW()`` used in the
    HAVING filter. The number in the alert therefore always agrees with the
    condition that triggered it. Clock skew between the application host and
    the database, or the time zone in which ``created_at`` is stored, cannot
    affect it.

    Tables that have never recorded a successful publish are not reported.

    Returns:
        One ``analytical_lag`` alert per stale table, keyed by table name.
    """
    threshold_minutes = config.lake_lag_threshold_minutes
    rows = await pool.fetch(
        """SELECT
               details->>'table_name' AS table_name,
               MAX(created_at) AS last_publish,
               (EXTRACT(EPOCH FROM NOW() - MAX(created_at)) / 60)::float8 AS lag_minutes
           FROM audit_events
           WHERE event_type = 'lake_publish'
             AND details->>'status' = 'success'
             AND details->>'table_name' IS NOT NULL
           GROUP BY details->>'table_name'
           HAVING MAX(created_at) < NOW() - INTERVAL '1 minute' * $1""",
        threshold_minutes,
    )

    alerts: list[Alert] = []
    for row in rows:
        table_name = row["table_name"]
        last_publish = row["last_publish"]
        # Naive timestamps are labelled UTC for display only.
        # NOTE(review): confirm created_at is stored in UTC.
        if last_publish.tzinfo is None:
            last_publish = last_publish.replace(tzinfo=timezone.utc)
        lag_minutes = float(row["lag_minutes"])

        alerts.append(Alert(
            rule="analytical_lag",
            severity="warning",
            summary=(
                f"Lake table '{table_name}' last published {lag_minutes:.0f}m ago "
                f"(threshold: {threshold_minutes}m)"
            ),
            details={
                "key": table_name,
                "table_name": table_name,
                "last_publish": last_publish.isoformat(),
                "lag_minutes": round(lag_minutes, 1),
                "threshold_minutes": threshold_minutes,
            },
        ))
    return alerts
|
|
|
|
|
|
async def check_broker_issues(
    pool: asyncpg.Pool,
    config: AlertingConfig,
) -> list[Alert]:
    """Check whether broker submission errors in the lookback window hit the threshold.

    Counts order_events whose event_type is 'broker_error', 'broker_timeout'
    or 'connection_failed' and that were created within the last
    ``config.broker_error_window_hours`` hours. Fires a single critical global
    alert when the count is at least ``config.broker_error_threshold``.

    Events are counted across all orders in the window; they are not required
    to be consecutive. The reported count is the true total. (It was
    previously capped at the threshold by a ROW_NUMBER filter, which
    under-reported bursts.)

    Returns:
        An empty list when below threshold, otherwise one ``broker_issues`` alert.
    """
    row = await pool.fetchrow(
        """SELECT COUNT(*) AS error_count
           FROM order_events
           WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
             AND event_type IN ('broker_error', 'broker_timeout', 'connection_failed')""",
        config.broker_error_window_hours,
    )

    # COUNT(*) always yields one row; guard anyway for defensive callers/fakes.
    if not row:
        return []

    error_count = row["error_count"]
    if error_count >= config.broker_error_threshold:
        return [Alert(
            rule="broker_issues",
            severity="critical",
            summary=(
                f"{error_count} broker errors in the last "
                f"{config.broker_error_window_hours}h"
            ),
            details={
                "key": "global",
                "error_count": error_count,
                "threshold": config.broker_error_threshold,
                "window_hours": config.broker_error_window_hours,
            },
        )]
    return []
|
|
|
|
|
|
async def evaluate_alerts(
    pool: asyncpg.Pool,
    config: AlertingConfig,
    state: AlertState,
) -> list[Alert]:
    """Run all alert rules and return newly fired alerts.

    Updates ``state`` to track firing/resolved transitions. For each
    transition, emits structured log events and Prometheus metrics
    (ALERTS_FIRED / ALERTS_RESOLVED counters, ALERT_ACTIVE gauge).

    A rule whose check raises is logged and skipped. Its currently active
    alerts are kept as-is, not treated as resolved. Otherwise a transient
    database error would emit a spurious RESOLVED followed by a spurious
    FIRING on the next successful cycle.

    Returns:
        Alerts that transitioned from not-firing to firing in this cycle.
    """
    # (rule name emitted by the check, check coroutine, message logged on error)
    checks = (
        ("source_failures", check_source_failures, "Error checking source failures"),
        ("schema_failure_spike", check_schema_failure_spike, "Error checking schema failure spike"),
        ("analytical_lag", check_analytical_lag, "Error checking analytical lag"),
        ("broker_issues", check_broker_issues, "Error checking broker issues"),
    )

    all_alerts: list[Alert] = []
    # Rules that could not be evaluated this cycle; their state is left untouched.
    errored_rules: set[str] = set()

    with ALERT_CHECK_DURATION.time():
        for rule, check, error_message in checks:
            try:
                all_alerts.extend(await check(pool, config))
            except Exception:
                errored_rules.add(rule)
                logger.exception(error_message)

    # Track which rule+key combos are currently firing
    current_keys: set[str] = set()
    newly_fired: list[Alert] = []

    for alert in all_alerts:
        key = f"{alert.rule}:{alert.details.get('key', '')}"
        current_keys.add(key)

        if state.fire(alert):
            # New alert firing
            ALERTS_FIRED.labels(rule=alert.rule, severity=alert.severity).inc()
            ALERT_ACTIVE.labels(rule=alert.rule).set(1)
            newly_fired.append(alert)
            logger.warning(
                "ALERT FIRING: [%s] %s",
                alert.rule,
                alert.summary,
                extra={
                    "alert_rule": alert.rule,
                    "alert_severity": alert.severity,
                    "alert_details": alert.details,
                },
            )

    # Anything still active but not reported this cycle has cleared, unless its
    # rule errored. sorted() snapshots the keys (state is mutated below) and
    # makes the log order deterministic.
    for key in sorted(set(state.active) - current_keys):
        rule, _, detail_key = key.partition(":")
        if rule in errored_rules:
            continue
        if state.resolve(rule, detail_key):
            ALERTS_RESOLVED.labels(rule=rule).inc()
            # Only set gauge to 0 if no more alerts for this rule
            if not any(k.startswith(f"{rule}:") for k in state.active):
                ALERT_ACTIVE.labels(rule=rule).set(0)
            logger.info(
                "ALERT RESOLVED: [%s] key=%s",
                rule,
                detail_key,
            )

    return newly_fired
|