stonks-oracle/docs/observability.md
Celes Renata 88ad1e8d99 feat: comprehensive docs, unit tests, docker-compose app services
- Add scheduler and ingestion unit tests (test_scheduler_unit.py, test_ingestion_unit.py)
- Add all 13 app services + dashboard to docker-compose.yml
- Add full documentation suite: API reference, Helm reference, Docker deployment guide,
  3 architecture diagrams (K8s, Docker Compose, data pipeline), AI agent guide,
  backup/restore guide, observability/metrics reference, per-service docs
- Add intelligence pipeline deep-dive docs with Mermaid diagrams
- Update README with documentation index and links
- Add specs for comprehensive-quality-docs, intelligence-pipeline-deep-dive,
  sanitized-pipeline-docs
2026-04-22 02:56:41 +00:00

Observability and Metrics Reference

This document covers the full observability stack for Stonks Oracle: Prometheus metrics, operational alerting, structured logging, dead-letter queues, and recommended monitoring queries.

Prometheus Metrics Endpoint

The Query API exposes a /metrics endpoint that returns all registered Prometheus metrics in the standard text exposition format.

Endpoint: GET /metrics on the Query API service (port 8000)

Response: text/plain; version=0.0.4; charset=utf-8 — standard Prometheus scrape format via prometheus_client.generate_latest().

Prometheus Scrape Configuration

Add the following job to your prometheus.yml:

scrape_configs:
  - job_name: "stonks-oracle"
    scrape_interval: 15s
    scrape_timeout: 10s
    metrics_path: /metrics
    static_configs:
      - targets:
          # Docker Compose
          - "query-api:8000"
          # Kubernetes
          # - "query-api.stonks-oracle.svc.cluster.local:8000"

For Kubernetes deployments, you can also use a ServiceMonitor resource if the Prometheus Operator is installed:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: stonks-oracle
  namespace: stonks-oracle
spec:
  selector:
    matchLabels:
      app: query-api
  endpoints:
    - port: http
      path: /metrics
      interval: 15s

Prometheus Metrics Reference

All metrics are defined in services/shared/metrics.py. Metric names use the stonks_ prefix.

Service Info

| Metric | Type | Description |
| --- | --- | --- |
| stonks_oracle_info | Info | Service metadata (build version, etc.) |

Ingestion Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_ingestion_jobs_total | Counter | source_type, status | Total ingestion jobs processed |
| stonks_ingestion_items_fetched_total | Counter | source_type | Total items fetched from external sources |
| stonks_ingestion_items_new_total | Counter | source_type | New (non-duplicate) items ingested |
| stonks_ingestion_items_deduped_total | Counter | source_type | Items skipped due to deduplication |
| stonks_ingestion_errors_total | Counter | source_type | Ingestion errors by source type |
| stonks_ingestion_adapter_duration_seconds | Histogram | source_type | Adapter fetch latency (buckets: 0.1, 0.5, 1, 2, 5, 10, 30, 60s) |
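
As an illustration of how a counter and a histogram like the ones above are typically declared with prometheus_client, here is a minimal sketch. It is not the contents of services/shared/metrics.py: the private registry, the variable names, and the sample label values are assumptions made for the example. Metric names, label names, and bucket boundaries are copied from the table.

```python
from prometheus_client import CollectorRegistry, Counter, Histogram, generate_latest

# A private registry keeps this sketch isolated from the process-wide default
# registry (assumption: the real module may simply use the default one).
registry = CollectorRegistry()

ingestion_jobs = Counter(
    "stonks_ingestion_jobs_total",
    "Total ingestion jobs processed",
    ["source_type", "status"],
    registry=registry,
)

adapter_duration = Histogram(
    "stonks_ingestion_adapter_duration_seconds",
    "Adapter fetch latency",
    ["source_type"],
    buckets=(0.1, 0.5, 1, 2, 5, 10, 30, 60),
    registry=registry,
)

# Record one successful job and one 0.42 s adapter fetch.
ingestion_jobs.labels(source_type="polygon", status="success").inc()
adapter_duration.labels(source_type="polygon").observe(0.42)

# generate_latest() renders the registry in the text exposition format,
# which is what a /metrics handler returns as its response body.
exposition = generate_latest(registry).decode("utf-8")
print(exposition)
```

Histogram buckets are cumulative, so the 0.42 s observation is counted in the le="0.5" bucket and in every larger bucket.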

Parsing Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_parse_jobs_total | Counter | status | Total parse jobs processed |
| stonks_parse_quality_score | Histogram | | Distribution of parser quality scores (buckets: 0.1 to 1.0 in 0.1 steps) |
| stonks_parse_low_quality_total | Counter | | Documents flagged as low quality by the parser |
| stonks_parse_duration_seconds | Histogram | | Parse job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |

Extraction Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_extraction_jobs_total | Counter | status | Total extraction jobs processed |
| stonks_extraction_attempts_total | Counter | | Total Ollama extraction attempts (including retries) |
| stonks_extraction_retries_total | Counter | | Extraction retry count |
| stonks_extraction_duration_seconds | Histogram | | Extraction total duration (buckets: 1, 2, 5, 10, 20, 30, 60, 120s) |
| stonks_extraction_confidence | Histogram | | Distribution of extraction confidence scores (buckets: 0.1 to 1.0) |
| stonks_extraction_validation_errors_total | Counter | | Total validation errors across extractions |
| stonks_extraction_tokens_total | Counter | direction | Estimated token usage (direction is input or output) |

Aggregation Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_aggregation_windows_total | Counter | window | Trend windows computed |
| stonks_aggregation_signals_total | Counter | window | Signals processed during aggregation |
| stonks_aggregation_contradiction_score | Histogram | | Distribution of contradiction scores in trend windows (buckets: 0.0 to 1.0) |
| stonks_aggregation_duration_seconds | Histogram | window | Aggregation job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |

Recommendation Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_recommendations_total | Counter | action, mode | Recommendations generated |
| stonks_recommendations_suppressed_total | Counter | | Recommendations suppressed due to low data quality |
| stonks_recommendation_confidence | Histogram | | Distribution of recommendation confidence scores (buckets: 0.1 to 1.0) |

Lake Publication Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_lake_facts_published_total | Counter | table_name | Analytical facts published to the lakehouse |
| stonks_lake_publish_duration_seconds | Histogram | table_name | Lake publication write latency (buckets: 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5s) |
| stonks_lake_publish_errors_total | Counter | table_name | Lake publication errors |
| stonks_lake_publish_bytes_total | Counter | table_name | Total bytes written to the lakehouse |

Trading and Broker Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_orders_submitted_total | Counter | side, order_type, mode | Orders submitted to broker |
| stonks_orders_rejected_total | Counter | reason_category | Orders rejected before broker submission |
| stonks_orders_filled_total | Counter | side | Orders filled by broker |
| stonks_orders_duplicates_prevented_total | Counter | detected_via | Duplicate orders prevented by idempotency checks |
| stonks_risk_evaluations_total | Counter | result | Risk evaluations performed |
| stonks_risk_check_failures_total | Counter | check_name | Individual risk check failures |
| stonks_positions_synced_total | Counter | | Position sync operations completed |

Alerting Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_alerts_fired_total | Counter | rule, severity | Total alerts fired by rule |
| stonks_alerts_resolved_total | Counter | rule | Total alerts resolved by rule |
| stonks_alert_check_duration_seconds | Histogram | | Duration of alert evaluation cycle (buckets: 0.01 to 5s) |
| stonks_alert_active | Gauge | rule | Whether an alert rule is currently firing (1) or resolved (0) |

Dead-Letter Queue Metrics

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_dlq_items_total | Counter | queue | Jobs sent to dead-letter queues |
| stonks_dlq_replayed_total | Counter | queue | Jobs replayed from dead-letter queues |
| stonks_dlq_depth | Gauge | queue | Current dead-letter queue depth |

Active Jobs Gauge

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| stonks_active_jobs | Gauge | stage | Currently processing jobs by pipeline stage |

Alerting Module

The alerting module (services/shared/alerting.py) evaluates four operational alert rules against PostgreSQL state on a configurable interval. When a threshold is breached, the module emits structured log events and increments Prometheus counters. When a previously firing alert clears, it logs a resolution event.

Alert Rules

1. source_failures — Sustained Source Retrieval Failures

Detects sources where the last N ingestion runs all failed within the lookback window.

| Parameter | ConfigMap Variable | Default | Description |
| --- | --- | --- | --- |
| Consecutive failure threshold | ALERT_SOURCE_FAILURE_THRESHOLD | 3 | Number of consecutive failures before alert fires |
| Lookback window | ALERT_SOURCE_FAILURE_WINDOW_HOURS | 6 hours | How far back to check ingestion_runs |

Severity: warning

Query: Checks ingestion_runs for sources where the most recent N runs (within the window) all have status = 'failed'.

Details emitted: source_id, source_type, source_name, ticker, consecutive_failures
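
The "last N runs all failed" condition can be sketched in plain Python. This is an illustration of the rule's logic only; the real check runs as a SQL query against ingestion_runs. The function name is hypothetical, and treating a source with fewer than N runs in the window as healthy is an assumption.

```python
from typing import Sequence


def has_sustained_failures(statuses_newest_first: Sequence[str], threshold: int = 3) -> bool:
    """Return True when the most recent `threshold` runs all failed.

    `statuses_newest_first` holds the run statuses found inside the lookback
    window, newest first. Assumption: a source with fewer than `threshold`
    runs in the window does not fire the alert.
    """
    recent = statuses_newest_first[:threshold]
    return len(recent) == threshold and all(status == "failed" for status in recent)


# Three failures in a row after an earlier success: the rule fires.
print(has_sustained_failures(["failed", "failed", "failed", "success"]))
# A success among the latest three runs breaks the streak.
print(has_sustained_failures(["failed", "success", "failed"]))
```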

2. schema_failure_spike — Extraction Validation Failure Rate

Detects when the extraction schema validation failure rate exceeds a threshold.

| Parameter | ConfigMap Variable | Default | Description |
| --- | --- | --- | --- |
| Failure rate threshold | ALERT_SCHEMA_FAILURE_RATE_THRESHOLD | 0.3 (30%) | Failure rate that triggers the alert |
| Lookback window | ALERT_SCHEMA_FAILURE_WINDOW_HOURS | 1 hour | Window for computing failure rate |

Severity: warning if rate ≥ 30%, critical if rate ≥ 50%

Query: Computes failed / total from model_performance_metrics within the window.

Details emitted: total_extractions, failed_extractions, failure_rate, threshold, window_hours
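
A small sketch of the severity decision described above. The function name is hypothetical, the 0.5 critical cutoff is taken from the severity line, and returning no alert when the window contains zero extractions is an assumption.

```python
from typing import Optional


def schema_failure_severity(
    failed: int,
    total: int,
    threshold: float = 0.3,
    critical_rate: float = 0.5,
) -> Optional[str]:
    """Map a failure count to None, "warning", or "critical"."""
    if total == 0:
        # Assumption: an empty window is treated as healthy, not as an error.
        return None
    rate = failed / total
    if rate < threshold:
        return None
    return "critical" if rate >= critical_rate else "warning"


print(schema_failure_severity(failed=2, total=10))  # below the 30% threshold
print(schema_failure_severity(failed=3, total=10))  # at the threshold
print(schema_failure_severity(failed=6, total=10))  # past the critical cutoff
```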

3. analytical_lag — Lake Publication Lag

Detects when lake publication has not completed within the threshold for any table.

| Parameter | ConfigMap Variable | Default | Description |
| --- | --- | --- | --- |
| Lag threshold | ALERT_LAKE_LAG_THRESHOLD_MINUTES | 60 minutes | Maximum acceptable time since last successful publish |

Severity: warning

Query: Checks audit_events for the most recent successful lake_publish event per table, alerts if any are older than the threshold.

Details emitted: table_name, last_publish, lag_minutes, threshold_minutes
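
The lag computation can be illustrated as follows. This is a sketch of the arithmetic only, not the module's query; the function name and the table names in the example are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


def stale_tables(
    last_publish: Dict[str, datetime],
    now: datetime,
    threshold_minutes: int = 60,
) -> Dict[str, float]:
    """Return lag in minutes for every table published longer ago than the threshold."""
    lagging = {}
    for table_name, published_at in last_publish.items():
        lag_minutes = (now - published_at).total_seconds() / 60
        if lag_minutes > threshold_minutes:
            lagging[table_name] = round(lag_minutes, 1)
    return lagging


now = datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc)
example = stale_tables(
    {
        "example_table_a": now - timedelta(minutes=90),  # stale
        "example_table_b": now - timedelta(minutes=10),  # fresh
    },
    now,
)
print(example)
```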

4. broker_issues — Consecutive Broker Errors

Detects consecutive broker submission errors (rejections, timeouts, connection failures).

| Parameter | ConfigMap Variable | Default | Description |
| --- | --- | --- | --- |
| Error threshold | ALERT_BROKER_ERROR_THRESHOLD | 3 | Consecutive broker errors before alert fires |
| Lookback window | ALERT_BROKER_ERROR_WINDOW_HOURS | 1 hour | Window for checking order_events |

Severity: critical

Query: Counts recent order_events with event_type IN ('broker_error', 'broker_timeout', 'connection_failed').

Details emitted: error_count, threshold, window_hours

Evaluation Cycle

The alerting module runs on a configurable interval (default: every 120 seconds, controlled by ALERT_CHECK_INTERVAL_SECONDS). Each cycle:

  1. Runs all four alert rules against PostgreSQL
  2. Compares results to the current AlertState to detect new firings and resolutions
  3. For new firings: increments stonks_alerts_fired_total, sets stonks_alert_active gauge to 1, logs a WARNING
  4. For resolutions: increments stonks_alerts_resolved_total, sets stonks_alert_active gauge to 0, logs an INFO
  5. Records the evaluation duration in stonks_alert_check_duration_seconds

Each rule check is wrapped in a try/except so a failure in one rule does not block the others.
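
The cycle above boils down to a set difference between what fired last time and what fires now. The following is a minimal sketch under stated assumptions: evaluate_cycle and its arguments are hypothetical names, the active set stands in for AlertState, metric updates are shown only as comments, and keeping a rule's previous state when its check raises is an assumption.

```python
import logging
from typing import Callable, Dict, Set

logger = logging.getLogger("alerting_sketch")


def evaluate_cycle(rules: Dict[str, Callable[[], bool]], active: Set[str]) -> Set[str]:
    """Run every rule once and return the set of rule names that are firing.

    `rules` maps a rule name to a check returning True while the rule is
    breaching. `active` is the set that was firing after the previous cycle.
    """
    firing: Set[str] = set()
    for name, check in rules.items():
        try:
            if check():
                firing.add(name)
        except Exception:
            # One broken rule must not block the others. Assumption: a rule
            # whose check raises keeps its previous state for this cycle.
            logger.exception("alert rule %s failed to evaluate", name)
            if name in active:
                firing.add(name)

    for name in sorted(firing - active):
        # New firing: increment the fired counter and set the gauge to 1 here.
        logger.warning("alert fired: %s", name)
    for name in sorted(active - firing):
        # Resolution: increment the resolved counter and set the gauge to 0 here.
        logger.info("alert resolved: %s", name)
    return firing


state: Set[str] = set()
state = evaluate_cycle({"source_failures": lambda: True, "broker_issues": lambda: False}, state)
state = evaluate_cycle({"source_failures": lambda: False, "broker_issues": lambda: False}, state)
```

After the first call the state contains source_failures; after the second call it is empty and a resolution is logged.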

ConfigMap Variables Summary

| Variable | Default | Description |
| --- | --- | --- |
| ALERT_SOURCE_FAILURE_THRESHOLD | 3 | Consecutive source failures before alert |
| ALERT_SOURCE_FAILURE_WINDOW_HOURS | 6 | Source failure lookback window (hours) |
| ALERT_SCHEMA_FAILURE_RATE_THRESHOLD | 0.3 | Extraction failure rate threshold (0.0 to 1.0) |
| ALERT_SCHEMA_FAILURE_WINDOW_HOURS | 1 | Schema failure lookback window (hours) |
| ALERT_LAKE_LAG_THRESHOLD_MINUTES | 60 | Max minutes since last lake publish |
| ALERT_BROKER_ERROR_THRESHOLD | 3 | Consecutive broker errors before alert |
| ALERT_BROKER_ERROR_WINDOW_HOURS | 1 | Broker error lookback window (hours) |
| ALERT_CHECK_INTERVAL_SECONDS | 120 | Seconds between alert evaluation cycles |
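
One way to read these variables with their documented defaults is a small frozen dataclass. This is a sketch, not the module's actual configuration code; the AlertConfig class and its field names are hypothetical, while the variable names and defaults come from the table above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class AlertConfig:
    source_failure_threshold: int = 3
    source_failure_window_hours: int = 6
    schema_failure_rate_threshold: float = 0.3
    schema_failure_window_hours: int = 1
    lake_lag_threshold_minutes: int = 60
    broker_error_threshold: int = 3
    broker_error_window_hours: int = 1
    check_interval_seconds: int = 120

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "AlertConfig":
        """Build a config from environment variables, falling back to defaults."""

        def read(name, cast, default):
            return cast(env.get(name, default))

        return cls(
            source_failure_threshold=read("ALERT_SOURCE_FAILURE_THRESHOLD", int, 3),
            source_failure_window_hours=read("ALERT_SOURCE_FAILURE_WINDOW_HOURS", int, 6),
            schema_failure_rate_threshold=read("ALERT_SCHEMA_FAILURE_RATE_THRESHOLD", float, 0.3),
            schema_failure_window_hours=read("ALERT_SCHEMA_FAILURE_WINDOW_HOURS", int, 1),
            lake_lag_threshold_minutes=read("ALERT_LAKE_LAG_THRESHOLD_MINUTES", int, 60),
            broker_error_threshold=read("ALERT_BROKER_ERROR_THRESHOLD", int, 3),
            broker_error_window_hours=read("ALERT_BROKER_ERROR_WINDOW_HOURS", int, 1),
            check_interval_seconds=read("ALERT_CHECK_INTERVAL_SECONDS", int, 120),
        )


# Override a single value; everything else keeps its default.
config = AlertConfig.from_env({"ALERT_CHECK_INTERVAL_SECONDS": "30"})
print(config)
```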

Structured Logging

All services use structured JSON logging configured via services/shared/logging.py. Call setup_logging(service_name) once at service startup.

JSON Log Format

Each log line is a single JSON object with the following fields:

{
  "timestamp": "2025-01-15T12:34:56.789012+00:00",
  "level": "INFO",
  "logger": "ingestion_worker",
  "message": "Processed job for AAPL",
  "service": "ingestion_worker",
  "trace_id": "a1b2c3d4e5f67890",
  "span_id": "1a2b3c4d"
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | string (ISO 8601) | UTC timestamp of the log event |
| level | string | Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL |
| logger | string | Python logger name |
| message | string | Human-readable log message |
| service | string | Service name set at startup (e.g., ingestion_worker, scheduler) |
| trace_id | string | 16-character hex trace ID for distributed tracing |
| span_id | string | 8-character hex span ID for the current operation |
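
To show how a log line with these fields can be produced, here is a minimal formatter built on the standard logging module. It is a sketch, not the implementation in services/shared/logging.py: the JsonFormatter class is hypothetical, and passing the trace IDs through extra={} is an assumption (the real module may take them from a trace context instead).

```python
import io
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with the documented fields."""

    def __init__(self, service: str) -> None:
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": self.service,
            # Assumption: IDs arrive as attributes set through `extra={...}`.
            "trace_id": getattr(record, "trace_id", None),
            "span_id": getattr(record, "span_id", None),
        }
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


# Write to an in-memory stream so the example is self-contained.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter("ingestion_worker"))

log = logging.getLogger("json_formatter_sketch")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info(
    "Processed job for %s",
    "AAPL",
    extra={"trace_id": "a1b2c3d4e5f67890", "span_id": "1a2b3c4d"},
)
line = stream.getvalue().strip()
print(line)
```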

Additional Context Fields

When present, these fields are merged into the JSON output:

| Field | Source | Description |
| --- | --- | --- |
| span_operation | Span context manager | Name of the traced operation |
| span_status | Span context manager | ok or error |
| span_duration_ms | Span context manager | Duration of the span in milliseconds |
| span_parent_id | Span context manager | Parent span ID for nested spans |
| span_attributes | Span context manager | Arbitrary key-value attributes set on the span |
| ticker | Manual extra={} | Company ticker symbol |
| document_id | Manual extra={} | Document UUID |
| source_type | Manual extra={} | Source type (e.g., polygon, news_api) |
| job_id | Manual extra={} | Job identifier |
| duration_ms | Manual extra={} | Operation duration |
| error | Manual extra={} | Error description |
| count | Manual extra={} | Item count |
| exception | Automatic | Formatted exception traceback (when exc_info is set) |

Trace Context Propagation

Trace context flows through the pipeline via job payloads:

  1. Inject: Before enqueuing a job to Redis, call inject_trace_context(payload) to add _trace_id to the payload dict.
  2. Extract: At the start of job processing, call extract_trace_context(payload) to restore the trace context (or generate a new one if absent).
  3. Span: Use the Span context manager to create child spans within a service:

from services.shared.logging import Span

with Span("process_document", ticker="AAPL") as span:
    # ... do work ...
    span.set_attribute("doc_count", 5)

This produces a structured log entry on span exit with duration, status, and attributes.

Log Querying

To trace a request through the pipeline, filter by trace_id:

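# Kubernetes: find all logs for a specific trace. --tail=-1 is needed because
# kubectl returns only the last 10 lines per pod when a label selector is used.
kubectl logs -n stonks-oracle -l app.kubernetes.io/part-of=stonks-oracle --all-containers --tail=-1 \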
  | jq -r 'select(.trace_id == "a1b2c3d4e5f67890")'

# Docker Compose — search across all services
docker compose logs --no-color | grep -E '"trace_id": ?"a1b2c3d4e5f67890"'

To find errors in a specific service:

# Kubernetes
kubectl logs -n stonks-oracle deployment/extractor --tail=500 \
  | jq 'select(.level == "ERROR")'

# Docker Compose
docker compose logs --no-color --no-log-prefix --tail=500 extractor \
  | jq 'select(.level == "ERROR")'

To find slow extraction spans:

kubectl logs -n stonks-oracle deployment/extractor --tail=1000 \
  | jq 'select(.span_operation == "extract_document" and .span_duration_ms > 30000)'

Dead-Letter Queue System

When a worker fails to process a job after exhausting retries (default: 3 attempts), the job is pushed to a per-queue dead-letter list in Redis. The DLQ system is implemented in services/shared/dead_letter.py.

Queue Names

Dead-letter queues follow the naming pattern stonks:dlq:<queue_name>:

| DLQ Key | Source Queue | Description |
| --- | --- | --- |
| stonks:dlq:ingestion | stonks:queue:ingestion | Failed ingestion jobs (adapter errors, API failures) |
| stonks:dlq:parsing | stonks:queue:parsing | Failed parse jobs |
| stonks:dlq:extraction | stonks:queue:extraction | Failed extraction jobs (LLM errors, validation failures) |
| stonks:dlq:aggregation | stonks:queue:aggregation | Failed aggregation jobs |
| stonks:dlq:recommendation | stonks:queue:recommendation | Failed recommendation jobs |
| stonks:dlq:broker_orders | stonks:queue:broker_orders | Failed broker order submissions |

When DEPLOY_STAGE is set, the prefix becomes stonks:<stage>:dlq:<queue_name>.
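
The naming rule can be written as a small helper. The function name dlq_key and the stage value "staging" are hypothetical and used only for illustration.

```python
from typing import Optional


def dlq_key(queue_name: str, deploy_stage: Optional[str] = None) -> str:
    """Build the Redis key of a dead-letter queue, with an optional stage prefix."""
    prefix = f"stonks:{deploy_stage}" if deploy_stage else "stonks"
    return f"{prefix}:dlq:{queue_name}"


print(dlq_key("ingestion"))             # stonks:dlq:ingestion
print(dlq_key("ingestion", "staging"))  # stonks:staging:dlq:ingestion
```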

DLQ Entry Format

Each DLQ entry wraps the original job payload with failure metadata:

{
  "original_payload": {
    "source_id": "...",
    "source_type": "polygon",
    "ticker": "AAPL",
    "company_id": "...",
    "config": {}
  },
  "queue": "ingestion",
  "error": "ConnectionError: API timeout after 30s",
  "attempt": 3,
  "worker": "ingestion_worker",
  "dead_lettered_at": "2025-01-15T12:34:56.789012+00:00"
}

| Field | Type | Description |
| --- | --- | --- |
| original_payload | object | The original job payload as it was enqueued |
| queue | string | Source queue name |
| error | string | Error message from the final failed attempt |
| attempt | integer | Number of attempts made before dead-lettering |
| worker | string | Worker identifier that dead-lettered the job |
| dead_lettered_at | string (ISO 8601) | UTC timestamp when the job was dead-lettered |

Routing

Jobs are routed to the DLQ by calling send_to_dlq() from worker code after retry exhaustion:

from services.shared.dead_letter import send_to_dlq

await send_to_dlq(
    rds=redis_client,
    queue_name="ingestion",
    original_payload=job,
    error=str(exception),
    attempt=3,
    worker="ingestion_worker",
)

The default maximum attempts before dead-lettering is DEFAULT_MAX_ATTEMPTS = 3.
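
For context, a worker's retry loop around this call might look like the sketch below. It runs without Redis: send_to_dlq_stub is an in-memory stand-in for send_to_dlq, and process_with_retries, the handler, and the absence of any backoff between attempts are assumptions made for the example.

```python
import asyncio
from datetime import datetime, timezone
from typing import Awaitable, Callable, List

DEFAULT_MAX_ATTEMPTS = 3
dead_letters: List[dict] = []  # in-memory stand-in for the Redis DLQ list


async def send_to_dlq_stub(
    queue_name: str, original_payload: dict, error: str, attempt: int, worker: str
) -> None:
    """Stand-in for send_to_dlq(): wrap the payload in the documented entry format."""
    dead_letters.append(
        {
            "original_payload": original_payload,
            "queue": queue_name,
            "error": error,
            "attempt": attempt,
            "worker": worker,
            "dead_lettered_at": datetime.now(timezone.utc).isoformat(),
        }
    )


async def process_with_retries(
    job: dict,
    handler: Callable[[dict], Awaitable[None]],
    queue_name: str,
    worker: str,
    max_attempts: int = DEFAULT_MAX_ATTEMPTS,
) -> bool:
    """Try the handler up to max_attempts times, then dead-letter the job."""
    last_error = ""
    for _attempt in range(1, max_attempts + 1):
        try:
            await handler(job)
            return True
        except Exception as exc:
            last_error = f"{type(exc).__name__}: {exc}"
    await send_to_dlq_stub(queue_name, job, last_error, max_attempts, worker)
    return False


async def always_times_out(job: dict) -> None:
    raise ConnectionError("API timeout after 30s")


succeeded = asyncio.run(
    process_with_retries({"ticker": "AAPL"}, always_times_out, "ingestion", "ingestion_worker")
)
print(succeeded, dead_letters[0]["error"])
```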

Replay Tooling

The services/shared/dead_letter.py module provides functions for inspecting and replaying DLQ items:

| Function | Description |
| --- | --- |
| peek_dlq(rds, queue_name, start=0, count=10) | Inspect DLQ entries without removing them |
| replay_one(rds, queue_name) | Pop the oldest DLQ entry and re-enqueue its original payload to the source queue |
| replay_all(rds, queue_name) | Replay every item in the DLQ back to the source queue. Returns the count replayed |
| dlq_length(rds, queue_name) | Return the number of items in the DLQ |
| dlq_summary(rds, queue_names) | Return a mapping of queue_name → DLQ depth for multiple queues |
| purge_dlq(rds, queue_name) | Delete all items from the DLQ. Returns count removed |
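
The replay semantics (oldest entry first, only the original payload re-enqueued) can be illustrated with in-memory queues. This sketch does not call the real module or Redis: the *_sketch function names are hypothetical, and storing entries as JSON strings with the oldest entry at the left end is an assumption.

```python
import json
from collections import deque
from typing import Deque


def replay_one_sketch(dlq: Deque[str], source_queue: Deque[str]) -> bool:
    """Move the oldest DLQ entry's original payload back to the source queue."""
    if not dlq:
        return False
    entry = json.loads(dlq.popleft())  # oldest entry first
    # Only the original payload is re-enqueued; failure metadata is dropped.
    source_queue.append(json.dumps(entry["original_payload"]))
    return True


def replay_all_sketch(dlq: Deque[str], source_queue: Deque[str]) -> int:
    """Replay every entry and return how many were replayed."""
    replayed = 0
    while replay_one_sketch(dlq, source_queue):
        replayed += 1
    return replayed


dlq_items: Deque[str] = deque(
    [
        json.dumps({"original_payload": {"ticker": "AAPL"}, "queue": "ingestion", "attempt": 3}),
        json.dumps({"original_payload": {"ticker": "MSFT"}, "queue": "ingestion", "attempt": 3}),
    ]
)
ingestion_queue: Deque[str] = deque()
replayed_count = replay_all_sketch(dlq_items, ingestion_queue)
print(replayed_count, list(ingestion_queue))
```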

Monitoring DLQ Depth

Use the scripts/check_queues.py script to inspect queue and DLQ depths from the command line:

# Docker Compose
REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD="" \
  python scripts/check_queues.py

# Kubernetes
kubectl exec -n stonks-oracle deployment/query-api -- \
  python scripts/check_queues.py

The Query API also exposes DLQ depths in the /api/ops/pipeline/stream SSE endpoint and the DevOps metrics endpoints, reporting dlq:<queue_name> keys alongside regular queue depths.

The stonks_dlq_depth Prometheus gauge tracks DLQ depth per queue for dashboards and alerting.

Recommended Monitoring Queries

Ingestion Throughput

# Ingestion jobs per minute by source type and status
sum(rate(stonks_ingestion_jobs_total[5m])) by (source_type, status) * 60

# New items ingested per minute
sum(rate(stonks_ingestion_items_new_total[5m])) * 60

# Deduplication ratio (higher = more duplicates being filtered)
sum(rate(stonks_ingestion_items_deduped_total[5m]))
  / sum(rate(stonks_ingestion_items_fetched_total[5m]))

# Adapter latency p95 by source type
histogram_quantile(0.95, sum(rate(stonks_ingestion_adapter_duration_seconds_bucket[5m])) by (le, source_type))

# Ingestion error rate
sum(rate(stonks_ingestion_errors_total[5m])) by (source_type)

Extraction Latency and Quality

# Extraction duration p50 and p95
histogram_quantile(0.5, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))
histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))

# Extraction success rate
sum(rate(stonks_extraction_jobs_total{status="success"}[5m]))
  / sum(rate(stonks_extraction_jobs_total[5m]))

# Median extraction confidence (p50 estimated from histogram buckets)
histogram_quantile(0.5, sum(rate(stonks_extraction_confidence_bucket[5m])) by (le))

# Validation error rate
sum(rate(stonks_extraction_validation_errors_total[5m]))

# Token usage rate (input vs output)
sum(rate(stonks_extraction_tokens_total[5m])) by (direction)

Aggregation Volume

# Trend windows computed per minute by window size
sum(rate(stonks_aggregation_windows_total[5m])) by (window) * 60

# Signals processed per minute
sum(rate(stonks_aggregation_signals_total[5m])) by (window) * 60

# Median contradiction score (higher = more conflicting signals)
histogram_quantile(0.5, sum(rate(stonks_aggregation_contradiction_score_bucket[5m])) by (le))

# Aggregation duration p95
histogram_quantile(0.95, sum(rate(stonks_aggregation_duration_seconds_bucket[5m])) by (le, window))

Recommendation Generation

# Recommendations generated per minute by action and mode
sum(rate(stonks_recommendations_total[5m])) by (action, mode) * 60

# Suppression rate
sum(rate(stonks_recommendations_suppressed_total[5m]))
  / sum(rate(stonks_recommendations_total[5m]))

# Median recommendation confidence
histogram_quantile(0.5, sum(rate(stonks_recommendation_confidence_bucket[5m])) by (le))

Trading Engine Activity

# Orders submitted per minute by side and mode
sum(rate(stonks_orders_submitted_total[5m])) by (side, mode) * 60

# Order rejection rate by reason
sum(rate(stonks_orders_rejected_total[5m])) by (reason_category)

# Fill rate
sum(rate(stonks_orders_filled_total[5m]))
  / sum(rate(stonks_orders_submitted_total[5m]))

# Duplicate orders prevented
sum(rate(stonks_orders_duplicates_prevented_total[5m])) by (detected_via)

# Risk evaluation outcomes
sum(rate(stonks_risk_evaluations_total[5m])) by (result)

# Risk check failure breakdown
sum(rate(stonks_risk_check_failures_total[5m])) by (check_name)

Lake Publication

# Facts published per minute by table
sum(rate(stonks_lake_facts_published_total[5m])) by (table_name) * 60

# Write latency p95 by table
histogram_quantile(0.95, sum(rate(stonks_lake_publish_duration_seconds_bucket[5m])) by (le, table_name))

# Publication error rate
sum(rate(stonks_lake_publish_errors_total[5m])) by (table_name)

# Bytes written per minute
sum(rate(stonks_lake_publish_bytes_total[5m])) by (table_name) * 60

Alerting Health

# Currently active alerts by rule
stonks_alert_active

# Alert firing rate
sum(rate(stonks_alerts_fired_total[1h])) by (rule, severity)

# Alert evaluation duration
histogram_quantile(0.95, sum(rate(stonks_alert_check_duration_seconds_bucket[5m])) by (le))

Dead-Letter Queue Health

# Current DLQ depth by queue
stonks_dlq_depth

# DLQ inflow rate (jobs dead-lettered per minute)
sum(rate(stonks_dlq_items_total[5m])) by (queue) * 60

# DLQ replay rate
sum(rate(stonks_dlq_replayed_total[5m])) by (queue) * 60

Pipeline Overview (Active Jobs)

# Currently active jobs by pipeline stage
stonks_active_jobs

# Median parse quality score
histogram_quantile(0.5, sum(rate(stonks_parse_quality_score_bucket[5m])) by (le))

# Low quality document rate
sum(rate(stonks_parse_low_quality_total[5m]))
  / sum(rate(stonks_parse_jobs_total[5m]))

| Alert | Expression | For | Severity |
| --- | --- | --- | --- |
| High DLQ depth | stonks_dlq_depth > 10 | 5m | warning |
| Ingestion error spike | sum(rate(stonks_ingestion_errors_total[5m])) > 0.5 | 5m | warning |
| Extraction latency high | histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le)) > 60 | 10m | warning |
| Lake publication stale | stonks_alert_active{rule="analytical_lag"} == 1 | 5m | warning |
| Broker errors active | stonks_alert_active{rule="broker_issues"} == 1 | 1m | critical |
| Zero ingestion throughput | sum(rate(stonks_ingestion_jobs_total[15m])) == 0 | 15m | critical |
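
Rows like these map directly onto the fields of a Prometheus alerting rule (alert, expr, for, labels). The sketch below builds that structure for three of the rows and prints it as JSON. The CamelCase alert names and the group name are assumptions, and loading the JSON output as a rule file relies on JSON being a subset of YAML, which should be confirmed with promtool check rules before use.

```python
import json
from typing import Iterable, Tuple

# (alert name, expression, for, severity), taken from the table above.
# The CamelCase names are derived for this example.
ALERTS = [
    ("HighDLQDepth", "stonks_dlq_depth > 10", "5m", "warning"),
    ("LakePublicationStale", 'stonks_alert_active{rule="analytical_lag"} == 1', "5m", "warning"),
    ("BrokerErrorsActive", 'stonks_alert_active{rule="broker_issues"} == 1', "1m", "critical"),
]


def build_rule_file(alerts: Iterable[Tuple[str, str, str, str]], group_name: str = "stonks-oracle") -> dict:
    """Arrange alert tuples into the groups/rules layout of a Prometheus rule file."""
    rules = [
        {"alert": name, "expr": expr, "for": duration, "labels": {"severity": severity}}
        for name, expr, duration, severity in alerts
    ]
    return {"groups": [{"name": group_name, "rules": rules}]}


rule_file = build_rule_file(ALERTS)
rendered = json.dumps(rule_file, indent=2)
print(rendered)
```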