# Observability and Metrics Reference

This document covers the full observability stack for Stonks Oracle: Prometheus metrics, operational alerting, structured logging, dead-letter queues, and recommended monitoring queries.

## Prometheus Metrics Endpoint

The Query API exposes a `/metrics` endpoint that returns all registered Prometheus metrics in the standard text exposition format.

**Endpoint**: `GET /metrics` on the Query API service (port 8000)

**Response**: `text/plain; version=0.0.4; charset=utf-8`, the standard Prometheus scrape format produced by `prometheus_client.generate_latest()`.

### Prometheus Scrape Configuration

Add the following job to your `prometheus.yml`:

```yaml
scrape_configs:
  - job_name: "stonks-oracle"
    scrape_interval: 15s
    scrape_timeout: 10s
    metrics_path: /metrics
    static_configs:
      - targets:
          # Docker Compose
          - "query-api:8000"
          # Kubernetes
          # - "query-api.stonks-oracle.svc.cluster.local:8000"
```

For Kubernetes deployments, you can also use a `ServiceMonitor` resource if the Prometheus Operator is installed:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: stonks-oracle
  namespace: stonks-oracle
spec:
  selector:
    matchLabels:
      app: query-api
  endpoints:
    - port: http
      path: /metrics
      interval: 15s
```

The `selector` matches labels on the Query API `Service` (not the pods), and `port: http` refers to a named port on that `Service`.

---

## Prometheus Metrics Reference

All metrics are defined in `services/shared/metrics.py`. Metric names use the `stonks_` prefix.

### Service Info

| Metric | Type | Description |
|--------|------|-------------|
| `stonks_oracle_info` | Info | Service metadata (build version, etc.) |
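To smoke-test the `/metrics` endpoint described above without running a Prometheus server, you can parse the text exposition format directly. The sketch below is a minimal, stdlib-only parser written for illustration. The `parse_exposition` name is hypothetical and not part of the codebase. The parser skips `# HELP`/`# TYPE` lines, ignores optional timestamps, and leaves label values escaped. For anything beyond spot checks, `prometheus_client.parser.text_string_to_metric_families` is the more complete option.

```python
import re
from typing import Dict, Tuple

# One label pair, e.g. source_type="polygon"; escaped quotes inside values are allowed.
_LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')

# Sample identity: (metric name, sorted (label, value) pairs).
SampleKey = Tuple[str, Tuple[Tuple[str, str], ...]]


def parse_exposition(text: str) -> Dict[SampleKey, float]:
    """Parse Prometheus text-format samples into {(name, labels): value}.

    Smoke-test sketch only: skips # HELP / # TYPE lines, ignores optional
    timestamps, and leaves label values escaped.
    """
    samples: Dict[SampleKey, float] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if "{" in line:
            open_brace = line.index("{")    # metric names never contain "{"
            close_brace = line.rindex("}")  # values and timestamps never contain "}"
            name = line[:open_brace]
            labels = tuple(sorted(_LABEL_RE.findall(line[open_brace + 1:close_brace])))
            fields = line[close_brace + 1:].split()
        else:
            name, *fields = line.split()
            labels = ()
        samples[(name, labels)] = float(fields[0])  # float() accepts "+Inf" and "NaN"
    return samples
```

Against a running stack, pass it the decoded body of `urllib.request.urlopen("http://localhost:8000/metrics").read()`. Adjust the host to wherever the Query API is reachable, then look up samples such as `("stonks_active_jobs", (("stage", "extraction"),))`.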
### Ingestion Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_ingestion_jobs_total` | Counter | `source_type`, `status` | Total ingestion jobs processed |
| `stonks_ingestion_items_fetched_total` | Counter | `source_type` | Total items fetched from external sources |
| `stonks_ingestion_items_new_total` | Counter | `source_type` | New (non-duplicate) items ingested |
| `stonks_ingestion_items_deduped_total` | Counter | `source_type` | Items skipped due to deduplication |
| `stonks_ingestion_errors_total` | Counter | `source_type` | Ingestion errors by source type |
| `stonks_ingestion_adapter_duration_seconds` | Histogram | `source_type` | Adapter fetch latency (buckets: 0.1, 0.5, 1, 2, 5, 10, 30, 60s) |

### Parsing Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_parse_jobs_total` | Counter | `status` | Total parse jobs processed |
| `stonks_parse_quality_score` | Histogram | — | Distribution of parser quality scores (buckets: 0.1–1.0 in 0.1 steps) |
| `stonks_parse_low_quality_total` | Counter | — | Documents flagged as low quality by the parser |
| `stonks_parse_duration_seconds` | Histogram | — | Parse job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |

### Extraction Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_extraction_jobs_total` | Counter | `status` | Total extraction jobs processed |
| `stonks_extraction_attempts_total` | Counter | — | Total Ollama extraction attempts (including retries) |
| `stonks_extraction_retries_total` | Counter | — | Extraction retry count |
| `stonks_extraction_duration_seconds` | Histogram | — | Total extraction duration (buckets: 1, 2, 5, 10, 20, 30, 60, 120s) |
| `stonks_extraction_confidence` | Histogram | — | Distribution of extraction confidence scores (buckets: 0.1–1.0) |
| `stonks_extraction_validation_errors_total` | Counter | — | Total validation errors across extractions |
| `stonks_extraction_tokens_total` | Counter | `direction` | Estimated token usage (label values: `input`, `output`) |

### Aggregation Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_aggregation_windows_total` | Counter | `window` | Trend windows computed |
| `stonks_aggregation_signals_total` | Counter | `window` | Signals processed during aggregation |
| `stonks_aggregation_contradiction_score` | Histogram | — | Distribution of contradiction scores in trend windows (buckets: 0.0–1.0) |
| `stonks_aggregation_duration_seconds` | Histogram | `window` | Aggregation job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |

### Recommendation Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_recommendations_total` | Counter | `action`, `mode` | Recommendations generated |
| `stonks_recommendations_suppressed_total` | Counter | — | Recommendations suppressed due to low data quality |
| `stonks_recommendation_confidence` | Histogram | — | Distribution of recommendation confidence scores (buckets: 0.1–1.0) |

### Lake Publication Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_lake_facts_published_total` | Counter | `table_name` | Analytical facts published to the lakehouse |
| `stonks_lake_publish_duration_seconds` | Histogram | `table_name` | Lake publication write latency (buckets: 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5s) |
| `stonks_lake_publish_errors_total` | Counter | `table_name` | Lake publication errors |
| `stonks_lake_publish_bytes_total` | Counter | `table_name` | Total bytes written to the lakehouse |

### Trading and Broker Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_orders_submitted_total` | Counter | `side`, `order_type`, `mode` | Orders submitted to broker |
| `stonks_orders_rejected_total` | Counter | `reason_category` | Orders rejected before broker submission |
| `stonks_orders_filled_total` | Counter | `side` | Orders filled by broker |
| `stonks_orders_duplicates_prevented_total` | Counter | `detected_via` | Duplicate orders prevented by idempotency checks |
| `stonks_risk_evaluations_total` | Counter | `result` | Risk evaluations performed |
| `stonks_risk_check_failures_total` | Counter | `check_name` | Individual risk check failures |
| `stonks_positions_synced_total` | Counter | — | Position sync operations completed |

### Alerting Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_alerts_fired_total` | Counter | `rule`, `severity` | Total alerts fired by rule |
| `stonks_alerts_resolved_total` | Counter | `rule` | Total alerts resolved by rule |
| `stonks_alert_check_duration_seconds` | Histogram | — | Duration of alert evaluation cycle (buckets: 0.01–5s) |
| `stonks_alert_active` | Gauge | `rule` | Whether an alert rule is currently firing (1) or resolved (0) |

### Dead-Letter Queue Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_dlq_items_total` | Counter | `queue` | Jobs sent to dead-letter queues |
| `stonks_dlq_replayed_total` | Counter | `queue` | Jobs replayed from dead-letter queues |
| `stonks_dlq_depth` | Gauge | `queue` | Current dead-letter queue depth |

### Active Jobs Gauge

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_active_jobs` | Gauge | `stage` | Jobs currently being processed, by pipeline stage |

---

## Alerting Module

The alerting module (`services/shared/alerting.py`) evaluates four operational alert rules against PostgreSQL state on a configurable interval. When a threshold is breached, the module emits a structured log event and updates the Prometheus alert metrics. When a previously firing alert clears, it logs a resolution event.

### Alert Rules

#### 1. `source_failures` — Sustained Source Retrieval Failures

Detects sources whose last N ingestion runs within the lookback window all failed.

| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Consecutive failure threshold | `ALERT_SOURCE_FAILURE_THRESHOLD` | `3` | Number of consecutive failures before the alert fires |
| Lookback window | `ALERT_SOURCE_FAILURE_WINDOW_HOURS` | `6` hours | How far back to check `ingestion_runs` |

**Severity**: `warning`

**Query**: Checks `ingestion_runs` for sources where the most recent N runs (within the window) all have `status = 'failed'`.

**Details emitted**: `source_id`, `source_type`, `source_name`, `ticker`, `consecutive_failures`

#### 2. `schema_failure_spike` — Extraction Validation Failure Rate

Detects when the extraction schema validation failure rate reaches the configured threshold.

| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Failure rate threshold | `ALERT_SCHEMA_FAILURE_RATE_THRESHOLD` | `0.3` (30%) | Failure rate that triggers the alert |
| Lookback window | `ALERT_SCHEMA_FAILURE_WINDOW_HOURS` | `1` hour | Window for computing the failure rate |

**Severity**: `warning` if the rate is at or above the threshold (30% by default), `critical` if it is at or above 50%

**Query**: Computes `failed / total` from `model_performance_metrics` within the window.

**Details emitted**: `total_extractions`, `failed_extractions`, `failure_rate`, `threshold`, `window_hours`

#### 3. `analytical_lag` — Lake Publication Lag

Detects when any table has gone longer than the threshold without a successful lake publication.

| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Lag threshold | `ALERT_LAKE_LAG_THRESHOLD_MINUTES` | `60` minutes | Maximum acceptable time since the last successful publish |

**Severity**: `warning`

**Query**: Checks `audit_events` for the most recent successful `lake_publish` event per table and alerts if any of them is older than the threshold.

**Details emitted**: `table_name`, `last_publish`, `lag_minutes`, `threshold_minutes`

#### 4. `broker_issues` — Consecutive Broker Errors

Detects consecutive broker submission errors (rejections, timeouts, connection failures).

| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Error threshold | `ALERT_BROKER_ERROR_THRESHOLD` | `3` | Consecutive broker errors before the alert fires |
| Lookback window | `ALERT_BROKER_ERROR_WINDOW_HOURS` | `1` hour | Window for checking `order_events` |

**Severity**: `critical`

**Query**: Counts recent `order_events` with `event_type IN ('broker_error', 'broker_timeout', 'connection_failed')`.

**Details emitted**: `error_count`, `threshold`, `window_hours`

### Evaluation Cycle

The alerting module runs on a configurable interval (default: every 120 seconds, controlled by `ALERT_CHECK_INTERVAL_SECONDS`). Each cycle:

1. Runs all four alert rules against PostgreSQL
2. Compares results to the current `AlertState` to detect new firings and resolutions
3. For new firings: increments `stonks_alerts_fired_total`, sets the `stonks_alert_active` gauge to 1, and logs a `WARNING`
4. For resolutions: increments `stonks_alerts_resolved_total`, sets the `stonks_alert_active` gauge to 0, and logs an `INFO`
5. Records the evaluation duration in `stonks_alert_check_duration_seconds`

Each rule check is wrapped in a try/except so a failure in one rule does not block the others.
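The cycle above amounts to diffing this cycle's results against the rules that `AlertState` recorded as firing on the previous cycle. The following is an illustrative, stdlib-only sketch and not the code in `services/shared/alerting.py`. The rule checks are plain callables standing in for the PostgreSQL queries. The dictionaries `fired_total`, `resolved_total`, `alert_active`, and `check_durations` are hypothetical stand-ins for the Prometheus counters, gauge, and histogram named above.

```python
import logging
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List

log = logging.getLogger("alerting")

# Hypothetical stand-ins for the Prometheus metrics named above.
fired_total: Dict[tuple, int] = {}   # stonks_alerts_fired_total{rule, severity}
resolved_total: Dict[str, int] = {}  # stonks_alerts_resolved_total{rule}
alert_active: Dict[str, int] = {}    # stonks_alert_active{rule}
check_durations: List[float] = []    # stonks_alert_check_duration_seconds

# A rule check returns one details dict per firing instance (empty list = healthy).
RuleCheck = Callable[[], List[dict]]


@dataclass
class AlertState:
    firing: Dict[str, List[dict]] = field(default_factory=dict)


def run_cycle(state: AlertState, rules: Dict[str, RuleCheck]) -> None:
    start = time.monotonic()
    for rule, check in rules.items():
        try:
            results = check()
        except Exception:
            # A failing rule is logged and skipped so the other rules still run.
            log.exception("alert rule %s failed", rule)
            continue

        was_firing = rule in state.firing
        if results and not was_firing:
            # New firing: count it, set the gauge to 1, log at WARNING.
            key = (rule, results[0].get("severity", "warning"))
            fired_total[key] = fired_total.get(key, 0) + 1
            alert_active[rule] = 1
            log.warning("alert fired: %s %s", rule, results)
            state.firing[rule] = results
        elif not results and was_firing:
            # Resolution: count it, set the gauge to 0, log at INFO.
            resolved_total[rule] = resolved_total.get(rule, 0) + 1
            alert_active[rule] = 0
            log.info("alert resolved: %s", rule)
            del state.firing[rule]
        elif results:
            # Still firing: refresh the details without counting a new firing.
            state.firing[rule] = results
    check_durations.append(time.monotonic() - start)
```

A caller would invoke `run_cycle` every `ALERT_CHECK_INTERVAL_SECONDS`. The sketch deliberately does not re-fire when a rule that is already firing changes severity (for example `schema_failure_spike` moving from `warning` to `critical`). Whether the real module does is not covered by this document.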
### ConfigMap Variables Summary

| Variable | Default | Description |
|----------|---------|-------------|
| `ALERT_SOURCE_FAILURE_THRESHOLD` | `3` | Consecutive source failures before alert |
| `ALERT_SOURCE_FAILURE_WINDOW_HOURS` | `6` | Source failure lookback window (hours) |
| `ALERT_SCHEMA_FAILURE_RATE_THRESHOLD` | `0.3` | Extraction failure rate threshold (0.0–1.0) |
| `ALERT_SCHEMA_FAILURE_WINDOW_HOURS` | `1` | Schema failure lookback window (hours) |
| `ALERT_LAKE_LAG_THRESHOLD_MINUTES` | `60` | Max minutes since last lake publish |
| `ALERT_BROKER_ERROR_THRESHOLD` | `3` | Consecutive broker errors before alert |
| `ALERT_BROKER_ERROR_WINDOW_HOURS` | `1` | Broker error lookback window (hours) |
| `ALERT_CHECK_INTERVAL_SECONDS` | `120` | Seconds between alert evaluation cycles |

---

## Structured Logging

All services use structured JSON logging configured via `services/shared/logging.py`. Call `setup_logging(service_name)` once at service startup.

### JSON Log Format

Each log line is a single JSON object with the following fields (pretty-printed here for readability):

```json
{
  "timestamp": "2025-01-15T12:34:56.789012+00:00",
  "level": "INFO",
  "logger": "ingestion_worker",
  "message": "Processed job for AAPL",
  "service": "ingestion_worker",
  "trace_id": "a1b2c3d4e5f67890",
  "span_id": "1a2b3c4d"
}
```

| Field | Type | Description |
|-------|------|-------------|
| `timestamp` | string (ISO 8601) | UTC timestamp of the log event |
| `level` | string | Log level: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` |
| `logger` | string | Python logger name |
| `message` | string | Human-readable log message |
| `service` | string | Service name set at startup (e.g., `ingestion_worker`, `scheduler`) |
| `trace_id` | string | 16-character hex trace ID for distributed tracing |
| `span_id` | string | 8-character hex span ID for the current operation |

### Additional Context Fields

When present, these fields are merged into the JSON output:

| Field | Source | Description |
|-------|--------|-------------|
| `span_operation` | `Span` context manager | Name of the traced operation |
| `span_status` | `Span` context manager | `ok` or `error` |
| `span_duration_ms` | `Span` context manager | Duration of the span in milliseconds |
| `span_parent_id` | `Span` context manager | Parent span ID for nested spans |
| `span_attributes` | `Span` context manager | Arbitrary key-value attributes set on the span |
| `ticker` | Manual `extra={}` | Company ticker symbol |
| `document_id` | Manual `extra={}` | Document UUID |
| `source_type` | Manual `extra={}` | Source type (e.g., `polygon`, `news_api`) |
| `job_id` | Manual `extra={}` | Job identifier |
| `duration_ms` | Manual `extra={}` | Operation duration |
| `error` | Manual `extra={}` | Error description |
| `count` | Manual `extra={}` | Item count |
| `exception` | Automatic | Formatted exception traceback (when `exc_info` is set) |

### Trace Context Propagation

Trace context flows through the pipeline via job payloads:

1. **Inject**: Before enqueuing a job to Redis, call `inject_trace_context(payload)` to add `_trace_id` to the payload dict.
2. **Extract**: At the start of job processing, call `extract_trace_context(payload)` to restore the trace context (or generate a new one if absent).
3. **Span**: Use the `Span` context manager to create child spans within a service:

```python
from services.shared.logging import Span

with Span("process_document", ticker="AAPL") as span:
    # ... do work ...
    span.set_attribute("doc_count", 5)
```

On exit, each span emits one structured log entry with its duration, status, and attributes (the `span_*` fields listed above).
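To make these pieces concrete, here is a minimal stdlib-only sketch of a JSON formatter, trace propagation through a job payload, and a `Span` context manager. It mirrors the names used in this section but is an illustrative reimplementation, not the contents of `services/shared/logging.py`. Two details are assumptions of the sketch: IDs are stored in `contextvars`, and `Span` takes an explicit `logger` argument.

```python
import contextvars
import json
import logging
import secrets
import time
from datetime import datetime, timezone
from typing import Optional

# Assumption: the current trace and span IDs live in context variables, so every
# record logged in the current thread or asyncio task picks them up.
_trace_id: contextvars.ContextVar = contextvars.ContextVar("trace_id", default=None)
_span_id: contextvars.ContextVar = contextvars.ContextVar("span_id", default=None)

# Optional fields copied from the record when present (via extra={} or Span).
_EXTRA_FIELDS = (
    "span_operation", "span_status", "span_duration_ms", "span_parent_id",
    "span_attributes", "ticker", "document_id", "source_type", "job_id",
    "duration_ms", "error", "count",
)


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with the documented fields."""

    def __init__(self, service: str) -> None:
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": self.service,
            "trace_id": _trace_id.get(),
            "span_id": _span_id.get(),
        }
        for key in _EXTRA_FIELDS:
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry, default=str)


def inject_trace_context(payload: dict) -> dict:
    """Copy the current trace ID into a job payload before it is enqueued."""
    payload["_trace_id"] = _trace_id.get() or secrets.token_hex(8)  # 16 hex chars
    return payload


def extract_trace_context(payload: dict) -> str:
    """Restore the trace ID from a dequeued payload, or start a new trace."""
    trace_id = payload.get("_trace_id") or secrets.token_hex(8)
    _trace_id.set(trace_id)
    return trace_id


class Span:
    """Emit one structured record on exit with duration, status, and attributes."""

    def __init__(self, operation: str, logger: Optional[logging.Logger] = None, **attributes):
        self.operation = operation
        self.logger = logger or logging.getLogger("span")
        self.attributes = dict(attributes)

    def set_attribute(self, key: str, value) -> None:
        self.attributes[key] = value

    def __enter__(self) -> "Span":
        self.parent_id = _span_id.get()
        self._token = _span_id.set(secrets.token_hex(4))  # 8 hex chars
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        extra = {
            "span_operation": self.operation,
            "span_status": "error" if exc_type else "ok",
            "span_duration_ms": round((time.perf_counter() - self._start) * 1000, 3),
            "span_parent_id": self.parent_id,
            "span_attributes": self.attributes,
        }
        # Log before resetting so the record still carries this span's ID.
        self.logger.info("span %s finished", self.operation, extra=extra)
        _span_id.reset(self._token)  # restore the parent span ID
        return False  # never swallow exceptions
```

Because `extract_trace_context` sets the trace ID for the current context, any logger whose handler uses `JsonFormatter` will stamp the same `trace_id` on every record for that job. This is what makes the `trace_id` filters in the next section work.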
### Log Querying

To trace a request through the pipeline, filter by `trace_id`:

```bash
# Kubernetes: find all logs for a specific trace.
# With a label selector, kubectl logs defaults to the last 10 lines per container,
# so --tail=-1 is set explicitly; fromjson? skips lines that are not valid JSON.
kubectl logs -n stonks-oracle -l app.kubernetes.io/part-of=stonks-oracle --all-containers --tail=-1 \
  | jq -cR 'fromjson? | select(.trace_id == "a1b2c3d4e5f67890")'

# Docker Compose: search across all services
# (the optional space matches both "key":"value" and "key": "value" serializations)
docker compose logs --no-color | grep -E '"trace_id": ?"a1b2c3d4e5f67890"'
```

To find errors in a specific service:

```bash
# Kubernetes (deployment/<name> reads the logs of a single pod)
kubectl logs -n stonks-oracle deployment/extractor --tail=500 \
  | jq -cR 'fromjson? | select(.level == "ERROR")'

# Docker Compose (--no-log-prefix drops the "service |" prefix so each line is plain JSON)
docker compose logs extractor --no-color --no-log-prefix --tail=500 \
  | jq -cR 'fromjson? | select(.level == "ERROR")'
```

To find slow extraction spans (over 30 seconds):

```bash
kubectl logs -n stonks-oracle deployment/extractor --tail=1000 \
  | jq -cR 'fromjson? | select(.span_operation == "extract_document" and .span_duration_ms > 30000)'
```

---

## Dead-Letter Queue System

When a worker fails to process a job after exhausting its retries (default: 3 attempts), the job is pushed to a per-queue dead-letter list in Redis. The DLQ system is implemented in `services/shared/dead_letter.py`.

### Queue Names

Dead-letter queues follow the naming pattern `stonks:dlq:<queue>`:

| DLQ Key | Source Queue | Description |
|---------|-------------|-------------|
| `stonks:dlq:ingestion` | `stonks:queue:ingestion` | Failed ingestion jobs (adapter errors, API failures) |
| `stonks:dlq:parsing` | `stonks:queue:parsing` | Failed parse jobs |
| `stonks:dlq:extraction` | `stonks:queue:extraction` | Failed extraction jobs (LLM errors, validation failures) |
| `stonks:dlq:aggregation` | `stonks:queue:aggregation` | Failed aggregation jobs |
| `stonks:dlq:recommendation` | `stonks:queue:recommendation` | Failed recommendation jobs |
| `stonks:dlq:broker_orders` | `stonks:queue:broker_orders` | Failed broker order submissions |

When `DEPLOY_STAGE` is set, the key becomes `stonks:<stage>:dlq:<queue>`.
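The key naming, entry format, and replay behavior documented in this section can be sketched end to end against an in-memory list store. This is a simplified, synchronous illustration, not the async code in `services/shared/dead_letter.py`. Several pieces are assumptions:

- `FakeRedis` is a hypothetical stand-in for the Redis client.
- The staged key is taken to be `stonks:<stage>:dlq:<queue>`.
- The list directions are RPUSH to append and LPOP so that the oldest entry is replayed first.
- The source-queue key is taken to be the unstaged `stonks:queue:<queue>`.

```python
import json
import os
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Optional


def dlq_key(queue_name: str, stage: Optional[str] = None) -> str:
    """stonks:dlq:<queue>, or (assumed) stonks:<stage>:dlq:<queue> with a deploy stage."""
    if stage is None:
        stage = os.environ.get("DEPLOY_STAGE", "")
    return f"stonks:{stage}:dlq:{queue_name}" if stage else f"stonks:dlq:{queue_name}"


class FakeRedis:
    """Hypothetical in-memory stand-in for the few Redis list commands used here."""

    def __init__(self) -> None:
        self.lists: Dict[str, List[str]] = defaultdict(list)

    def rpush(self, key: str, value: str) -> int:
        self.lists[key].append(value)
        return len(self.lists[key])

    def lpop(self, key: str) -> Optional[str]:
        items = self.lists[key]
        return items.pop(0) if items else None

    def llen(self, key: str) -> int:
        return len(self.lists[key])


def send_to_dlq(rds, queue_name: str, original_payload: dict, error: str,
                attempt: int, worker: str, stage: Optional[str] = None) -> None:
    """Wrap the failed payload with failure metadata and append it to the DLQ."""
    entry = {
        "original_payload": original_payload,
        "queue": queue_name,
        "error": error,
        "attempt": attempt,
        "worker": worker,
        "dead_lettered_at": datetime.now(timezone.utc).isoformat(),
    }
    rds.rpush(dlq_key(queue_name, stage), json.dumps(entry))  # newest at the tail


def replay_one(rds, queue_name: str, stage: Optional[str] = None) -> Optional[dict]:
    """Pop the oldest DLQ entry and re-enqueue only its original payload."""
    raw = rds.lpop(dlq_key(queue_name, stage))  # oldest at the head
    if raw is None:
        return None
    entry = json.loads(raw)
    # Assumed source-queue key and push direction; the failure metadata is dropped.
    rds.rpush(f"stonks:queue:{queue_name}", json.dumps(entry["original_payload"]))
    return entry
```

Re-enqueuing only `original_payload` means a replayed job looks exactly like a fresh one to the worker. If it fails again, it starts a new retry cycle rather than inheriting the old attempt count.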
### DLQ Entry Format

Each DLQ entry wraps the original job payload with failure metadata:

```json
{
  "original_payload": {
    "source_id": "...",
    "source_type": "polygon",
    "ticker": "AAPL",
    "company_id": "...",
    "config": {}
  },
  "queue": "ingestion",
  "error": "ConnectionError: API timeout after 30s",
  "attempt": 3,
  "worker": "ingestion_worker",
  "dead_lettered_at": "2025-01-15T12:34:56.789012+00:00"
}
```

| Field | Type | Description |
|-------|------|-------------|
| `original_payload` | object | The original job payload as it was enqueued |
| `queue` | string | Source queue name |
| `error` | string | Error message from the final failed attempt |
| `attempt` | integer | Number of attempts made before dead-lettering |
| `worker` | string | Worker identifier that dead-lettered the job |
| `dead_lettered_at` | string (ISO 8601) | UTC timestamp when the job was dead-lettered |

### Routing

Jobs are routed to the DLQ by calling `send_to_dlq()` from worker code after retry exhaustion:

```python
from services.shared.dead_letter import send_to_dlq

# Inside an async worker, after the final retry has failed with `exception`:
await send_to_dlq(
    rds=redis_client,
    queue_name="ingestion",
    original_payload=job,
    error=str(exception),
    attempt=3,
    worker="ingestion_worker",
)
```

The default maximum attempts before dead-lettering is `DEFAULT_MAX_ATTEMPTS = 3`.

### Replay Tooling

The `services/shared/dead_letter.py` module provides functions for inspecting and replaying DLQ items:

| Function | Description |
|----------|-------------|
| `peek_dlq(rds, queue_name, start=0, count=10)` | Inspect DLQ entries without removing them |
| `replay_one(rds, queue_name)` | Pop the oldest DLQ entry and re-enqueue its original payload to the source queue |
| `replay_all(rds, queue_name)` | Replay every item in the DLQ back to the source queue. Returns the count replayed |
| `dlq_length(rds, queue_name)` | Return the number of items in the DLQ |
| `dlq_summary(rds, queue_names)` | Return a mapping of queue_name → DLQ depth for multiple queues |
| `purge_dlq(rds, queue_name)` | Delete all items from the DLQ. Returns the count removed |

### Monitoring DLQ Depth

Use the `scripts/check_queues.py` script to inspect queue and DLQ depths from the command line:

```bash
# Docker Compose
REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD="" \
  python scripts/check_queues.py

# Kubernetes
kubectl exec -n stonks-oracle deployment/query-api -- \
  python scripts/check_queues.py
```

The Query API also exposes DLQ depths in the `/api/ops/pipeline/stream` SSE endpoint and the DevOps metrics endpoints, reporting `dlq:<queue>` keys alongside regular queue depths. The `stonks_dlq_depth` Prometheus gauge tracks DLQ depth per queue for dashboard alerting.

---

## Recommended Prometheus/Grafana Queries

### Ingestion Throughput

```promql
# Ingestion jobs per minute by source type and status
sum(rate(stonks_ingestion_jobs_total[5m])) by (source_type, status) * 60

# New items ingested per minute
sum(rate(stonks_ingestion_items_new_total[5m])) * 60

# Deduplication ratio (higher = more duplicates being filtered)
sum(rate(stonks_ingestion_items_deduped_total[5m])) / sum(rate(stonks_ingestion_items_fetched_total[5m]))

# Adapter latency p95 by source type
histogram_quantile(0.95, sum(rate(stonks_ingestion_adapter_duration_seconds_bucket[5m])) by (le, source_type))

# Ingestion errors per second by source type
sum(rate(stonks_ingestion_errors_total[5m])) by (source_type)
```

### Extraction Latency and Quality

```promql
# Extraction duration p50 and p95
histogram_quantile(0.5, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))
histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))

# Extraction success rate
sum(rate(stonks_extraction_jobs_total{status="success"}[5m])) / sum(rate(stonks_extraction_jobs_total[5m]))

# Median (p50) extraction confidence
histogram_quantile(0.5, sum(rate(stonks_extraction_confidence_bucket[5m])) by (le))

# Validation errors per second
sum(rate(stonks_extraction_validation_errors_total[5m]))

# Token usage rate (input vs output)
sum(rate(stonks_extraction_tokens_total[5m])) by (direction)
```

### Aggregation Volume

```promql
# Trend windows computed per minute by window size
sum(rate(stonks_aggregation_windows_total[5m])) by (window) * 60

# Signals processed per minute by window size
sum(rate(stonks_aggregation_signals_total[5m])) by (window) * 60

# Median (p50) contradiction score (higher = more conflicting signals)
histogram_quantile(0.5, sum(rate(stonks_aggregation_contradiction_score_bucket[5m])) by (le))

# Aggregation duration p95 by window size
histogram_quantile(0.95, sum(rate(stonks_aggregation_duration_seconds_bucket[5m])) by (le, window))
```

### Recommendation Generation

```promql
# Recommendations generated per minute by action and mode
sum(rate(stonks_recommendations_total[5m])) by (action, mode) * 60

# Suppression rate
sum(rate(stonks_recommendations_suppressed_total[5m])) / sum(rate(stonks_recommendations_total[5m]))

# Median (p50) recommendation confidence
histogram_quantile(0.5, sum(rate(stonks_recommendation_confidence_bucket[5m])) by (le))
```

### Trading Engine Activity

```promql
# Orders submitted per minute by side and mode
sum(rate(stonks_orders_submitted_total[5m])) by (side, mode) * 60

# Order rejections per second by reason
sum(rate(stonks_orders_rejected_total[5m])) by (reason_category)

# Fill rate
sum(rate(stonks_orders_filled_total[5m])) / sum(rate(stonks_orders_submitted_total[5m]))

# Duplicate orders prevented
sum(rate(stonks_orders_duplicates_prevented_total[5m])) by (detected_via)

# Risk evaluation outcomes
sum(rate(stonks_risk_evaluations_total[5m])) by (result)

# Risk check failure breakdown
sum(rate(stonks_risk_check_failures_total[5m])) by (check_name)
```

### Lake Publication

```promql
# Facts published per minute by table
sum(rate(stonks_lake_facts_published_total[5m])) by (table_name) * 60

# Write latency p95 by table
histogram_quantile(0.95, sum(rate(stonks_lake_publish_duration_seconds_bucket[5m])) by (le, table_name))

# Publication errors per second by table
sum(rate(stonks_lake_publish_errors_total[5m])) by (table_name)

# Bytes written per minute by table
sum(rate(stonks_lake_publish_bytes_total[5m])) by (table_name) * 60
```

### Alerting Health

```promql
# Alert state by rule (1 = firing, 0 = resolved)
stonks_alert_active

# Alert firing rate (alerts fired per second, averaged over 1h)
sum(rate(stonks_alerts_fired_total[1h])) by (rule, severity)

# Alert evaluation duration p95
histogram_quantile(0.95, sum(rate(stonks_alert_check_duration_seconds_bucket[5m])) by (le))
```

### Dead-Letter Queue Health

```promql
# Current DLQ depth by queue
stonks_dlq_depth

# DLQ inflow rate (jobs dead-lettered per minute)
sum(rate(stonks_dlq_items_total[5m])) by (queue) * 60

# DLQ replay rate (jobs replayed per minute)
sum(rate(stonks_dlq_replayed_total[5m])) by (queue) * 60
```

### Pipeline Overview (Active Jobs)

```promql
# Currently active jobs by pipeline stage
stonks_active_jobs

# Median (p50) parse quality score
histogram_quantile(0.5, sum(rate(stonks_parse_quality_score_bucket[5m])) by (le))

# Share of parsed documents flagged as low quality
sum(rate(stonks_parse_low_quality_total[5m])) / sum(rate(stonks_parse_jobs_total[5m]))
```

### Recommended Grafana Alert Rules

| Alert | Expression | For | Severity |
|-------|-----------|-----|----------|
| High DLQ depth | `stonks_dlq_depth > 10` | 5m | warning |
| Ingestion error spike | `sum(rate(stonks_ingestion_errors_total[5m])) > 0.5` | 5m | warning |
| Extraction latency high | `histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le)) > 60` | 10m | warning |
| Lake publication stale | `stonks_alert_active{rule="analytical_lag"} == 1` | 5m | warning |
| Broker errors active | `stonks_alert_active{rule="broker_issues"} == 1` | 1m | critical |
| Zero ingestion throughput | `sum(rate(stonks_ingestion_jobs_total[15m])) == 0` | 15m | critical |
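Most latency and score panels above use `histogram_quantile`, which never sees individual observations. It finds the bucket that contains the requested rank and interpolates linearly inside it, so an estimate is only as fine-grained as the bucket bounds listed in the metrics reference. One consequence follows because 60 is itself a `stonks_extraction_duration_seconds` bucket bound: the "Extraction latency high" rule (`p95 > 60`) fires exactly when more than 5% of extractions in the window took longer than 60s. The sketch below mirrors PromQL's algorithm for classic (non-native) histograms for illustration only. It skips Prometheus's correction for non-monotonic bucket counts.

```python
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative histogram buckets.

    `buckets` is a list of (upper_bound, cumulative_count) pairs sorted by bound
    and ending with (math.inf, total): the per-`le` values that
    sum(rate(..._bucket[5m])) by (le) would return.
    """
    if q < 0:
        return -math.inf
    if q > 1:
        return math.inf
    if len(buckets) < 2 or buckets[-1][0] != math.inf:
        return math.nan
    total = buckets[-1][1]
    if total == 0:
        return math.nan

    rank = q * total
    # Index of the first bucket whose cumulative count reaches the target rank.
    b = next(i for i, (_, count) in enumerate(buckets) if count >= rank)
    if b == len(buckets) - 1:
        # Rank lands in the +Inf bucket: fall back to the largest finite bound.
        return buckets[-2][0]

    upper, cum = buckets[b]
    if b == 0:
        if upper <= 0:
            return upper
        lower, prev_cum = 0.0, 0.0  # the first bucket is treated as starting at 0
    else:
        lower, prev_cum = buckets[b - 1]

    in_bucket = cum - prev_cum
    if in_bucket == 0:  # only reachable when q == 0 and the first bucket is empty
        return lower
    # Linear interpolation: observations are assumed to be spread evenly in the bucket.
    return lower + (upper - lower) * (rank - prev_cum) / in_bucket
```

For example, with cumulative extraction counts of 85 at `le="30"`, 95 at `le="60"`, and 100 in total, the p95 lands exactly on 60. Any additional share of slow extractions pushes it above the alert threshold.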