"""Operational alerting for Stonks Oracle pipeline health. Evaluates alert rules against PostgreSQL operational state and emits structured log events and Prometheus metrics when thresholds are breached. Alert rules: - source_failures: sustained source retrieval failures per source - schema_failure_spike: extraction validation failure rate exceeds threshold - analytical_lag: lake publication has not completed within threshold - broker_issues: consecutive broker submission errors Requirements: 12.3 Design: Section 12 (Observability and Operations) """ from __future__ import annotations import logging from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any import asyncpg from services.shared.config import AlertingConfig from services.shared.metrics import ( ALERT_ACTIVE, ALERT_CHECK_DURATION, ALERTS_FIRED, ALERTS_RESOLVED, ) logger = logging.getLogger("alerting") @dataclass class Alert: """A single alert instance.""" rule: str severity: str # "warning" | "critical" summary: str details: dict[str, Any] = field(default_factory=dict) fired_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @dataclass class AlertState: """Tracks which rules are currently firing to detect transitions.""" active: dict[str, Alert] = field(default_factory=dict) def fire(self, alert: Alert) -> bool: """Record an alert firing. Returns True if this is a new firing.""" key = f"{alert.rule}:{alert.details.get('key', '')}" is_new = key not in self.active self.active[key] = alert return is_new def resolve(self, rule: str, key: str = "") -> bool: """Resolve an alert. Returns True if it was previously active.""" full_key = f"{rule}:{key}" if full_key in self.active: del self.active[full_key] return True return False def is_firing(self, rule: str, key: str = "") -> bool: return f"{rule}:{key}" in self.active async def check_source_failures( pool: asyncpg.Pool, config: AlertingConfig, ) -> list[Alert]: """Check for sources with sustained consecutive failures. Queries ingestion_runs for sources where the last N runs all failed within the lookback window. """ rows = await pool.fetch( """WITH recent_runs AS ( SELECT source_id, status, ROW_NUMBER() OVER (PARTITION BY source_id ORDER BY started_at DESC) AS rn FROM ingestion_runs WHERE started_at >= NOW() - INTERVAL '1 hour' * $1 ), failure_streaks AS ( SELECT source_id, COUNT(*) FILTER (WHERE status = 'failed') AS consecutive_failures, COUNT(*) AS total_runs FROM recent_runs WHERE rn <= $2 GROUP BY source_id HAVING COUNT(*) FILTER (WHERE status = 'failed') = COUNT(*) AND COUNT(*) >= $2 ) SELECT fs.source_id, fs.consecutive_failures, s.source_type, s.source_name, c.ticker FROM failure_streaks fs JOIN sources s ON s.id = fs.source_id JOIN companies c ON c.id = s.company_id""", config.source_failure_window_hours, config.source_failure_threshold, ) alerts = [] for row in rows: alerts.append(Alert( rule="source_failures", severity="warning", summary=( f"Source {row['source_name']} ({row['source_type']}) for " f"{row['ticker']} has {row['consecutive_failures']} consecutive failures" ), details={ "key": str(row["source_id"]), "source_id": str(row["source_id"]), "source_type": row["source_type"], "source_name": row["source_name"], "ticker": row["ticker"], "consecutive_failures": row["consecutive_failures"], }, )) return alerts async def check_schema_failure_spike( pool: asyncpg.Pool, config: AlertingConfig, ) -> list[Alert]: """Check if extraction schema validation failure rate exceeds threshold. Queries model_performance_metrics for the recent window and computes the failure rate. """ row = await pool.fetchrow( """SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE NOT success) AS failed FROM model_performance_metrics WHERE recorded_at >= NOW() - INTERVAL '1 hour' * $1""", config.schema_failure_window_hours, ) if not row or row["total"] == 0: return [] total = row["total"] failed = row["failed"] failure_rate = failed / total if failure_rate >= config.schema_failure_rate_threshold: return [Alert( rule="schema_failure_spike", severity="critical" if failure_rate >= 0.5 else "warning", summary=( f"Extraction schema failure rate is {failure_rate:.1%} " f"({failed}/{total}) in the last {config.schema_failure_window_hours}h" ), details={ "key": "global", "total_extractions": total, "failed_extractions": failed, "failure_rate": round(failure_rate, 4), "threshold": config.schema_failure_rate_threshold, "window_hours": config.schema_failure_window_hours, }, )] return [] async def check_analytical_lag( pool: asyncpg.Pool, config: AlertingConfig, ) -> list[Alert]: """Check if lake publication is lagging beyond threshold. Looks at the audit_events table for the most recent successful lake_publish events per table, and alerts if any are stale. """ rows = await pool.fetch( """SELECT details->>'table_name' AS table_name, MAX(created_at) AS last_publish FROM audit_events WHERE event_type = 'lake_publish' AND details->>'status' = 'success' AND details->>'table_name' IS NOT NULL GROUP BY details->>'table_name' HAVING MAX(created_at) < NOW() - INTERVAL '1 minute' * $1""", config.lake_lag_threshold_minutes, ) alerts = [] now = datetime.now(timezone.utc) for row in rows: table_name = row["table_name"] last_publish = row["last_publish"] if last_publish.tzinfo is None: last_publish = last_publish.replace(tzinfo=timezone.utc) lag_minutes = (now - last_publish).total_seconds() / 60 alerts.append(Alert( rule="analytical_lag", severity="warning", summary=( f"Lake table '{table_name}' last published {lag_minutes:.0f}m ago " f"(threshold: {config.lake_lag_threshold_minutes}m)" ), details={ "key": table_name, "table_name": table_name, "last_publish": last_publish.isoformat(), "lag_minutes": round(lag_minutes, 1), "threshold_minutes": config.lake_lag_threshold_minutes, }, )) return alerts async def check_broker_issues( pool: asyncpg.Pool, config: AlertingConfig, ) -> list[Alert]: """Check for consecutive broker submission errors. Queries order_events for recent broker-level errors (rejections, timeouts, connection failures) within the lookback window. """ rows = await pool.fetch( """WITH recent_events AS ( SELECT order_id, event_type, created_at, ROW_NUMBER() OVER (ORDER BY created_at DESC) AS rn FROM order_events WHERE created_at >= NOW() - INTERVAL '1 hour' * $1 AND event_type IN ('broker_error', 'broker_timeout', 'connection_failed') ) SELECT COUNT(*) AS error_count FROM recent_events WHERE rn <= $2""", config.broker_error_window_hours, config.broker_error_threshold, ) if not rows: return [] error_count = rows[0]["error_count"] if error_count >= config.broker_error_threshold: return [Alert( rule="broker_issues", severity="critical", summary=( f"{error_count} broker errors in the last " f"{config.broker_error_window_hours}h" ), details={ "key": "global", "error_count": error_count, "threshold": config.broker_error_threshold, "window_hours": config.broker_error_window_hours, }, )] return [] async def evaluate_alerts( pool: asyncpg.Pool, config: AlertingConfig, state: AlertState, ) -> list[Alert]: """Run all alert rules and return newly fired alerts. Updates AlertState to track firing/resolved transitions and emits structured log events and Prometheus metrics for each transition. """ all_alerts: list[Alert] = [] with ALERT_CHECK_DURATION.time(): # Collect alerts from all rules try: all_alerts.extend(await check_source_failures(pool, config)) except Exception: logger.exception("Error checking source failures") try: all_alerts.extend(await check_schema_failure_spike(pool, config)) except Exception: logger.exception("Error checking schema failure spike") try: all_alerts.extend(await check_analytical_lag(pool, config)) except Exception: logger.exception("Error checking analytical lag") try: all_alerts.extend(await check_broker_issues(pool, config)) except Exception: logger.exception("Error checking broker issues") # Track which rule+key combos are currently firing current_keys: set[str] = set() newly_fired: list[Alert] = [] for alert in all_alerts: key = f"{alert.rule}:{alert.details.get('key', '')}" current_keys.add(key) if state.fire(alert): # New alert firing ALERTS_FIRED.labels(rule=alert.rule, severity=alert.severity).inc() ALERT_ACTIVE.labels(rule=alert.rule).set(1) newly_fired.append(alert) logger.warning( "ALERT FIRING: [%s] %s", alert.rule, alert.summary, extra={ "alert_rule": alert.rule, "alert_severity": alert.severity, "alert_details": alert.details, }, ) # Check for resolved alerts resolved_keys = set(state.active.keys()) - current_keys for key in resolved_keys: rule = key.split(":")[0] detail_key = key[len(rule) + 1:] if state.resolve(rule, detail_key): ALERTS_RESOLVED.labels(rule=rule).inc() # Only set gauge to 0 if no more alerts for this rule still_firing = any(k.startswith(f"{rule}:") for k in state.active) if not still_firing: ALERT_ACTIVE.labels(rule=rule).set(0) logger.info( "ALERT RESOLVED: [%s] key=%s", rule, detail_key, ) return newly_fired